RELATEED CONSULTING
相关咨询
选择下列产品马上在线沟通
服务时间:8:30-17:00
你可能遇到了下面的问题
关闭右侧工具栏

新闻中心

这里有您想知道的互联网营销解决方案
Nacos 配置中心源码分析

本文主要和大家一起以源码的角度来分析 Nacos 配置中心的配置信息获取,以及配置信息动态同步的过程和原理。环境介绍和使用 环境介绍:

  • Jdk 1.8
  • nacos-server-1.4.2
  • spring-boot-2.3.5.RELEASE
  • spring-cloud-Hoxton.SR8
  • spring-cloiud-alibab-2.2.5.RELEASE

如果我们需要使用 Nacos 作为配置中心,我们首先需要导入 Nacos Config 的依赖信息,如下所示:

 
 
 
 
  1.  
  2.   com.alibaba.cloud 
  3.   spring-cloud-starter-alibaba-nacos-config 
  4.  

然后再 bootstartp.yml 文件中配置 Nacos 服务信息。

 
 
 
 
  1. spring: 
  2.   cloud: 
  3.     nacos: 
  4.       config: 
  5.         server-addr: 127.0.0.1:8848 

客户端初始化

主要是通过 NacosConfigBootstrapConfiguration 类来进行初始化 NacosConfigManager 、NacosPropertySourceLocator

 
 
 
 
  1. @Configuration(proxyBeanMethods = false) 
  2. @ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true) 
  3. public class NacosConfigBootstrapConfiguration { 
  4.  
  5.  @Bean 
  6.  @ConditionalOnMissingBean 
  7.  public NacosConfigManager nacosConfigManager( 
  8.    NacosConfigProperties nacosConfigProperties) { 
  9.   return new NacosConfigManager(nacosConfigProperties); 
  10.  } 
  11.      
  12.     @Bean 
  13.  public NacosPropertySourceLocator nacosPropertySourceLocator( 
  14.    NacosConfigManager nacosConfigManager) { 
  15.   return new NacosPropertySourceLocator(nacosConfigManager); 
  16.  } 
  17.     // ... 

在 NacosConfigManager 的构造方法中会调用 createConfigService 方法来创建 ConfigService 实例,内部调用工厂方法 ConfigFactory#createConfigService 通过反射实例化一个com.alibaba.nacos.client.config.NacosConfigService 的实例对象。

 
 
 
 
  1. public static ConfigService createConfigService(Properties properties) throws NacosException { 
  2.     try { 
  3.         Class driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService"); 
  4.         Constructor constructor = driverImplClass.getConstructor(Properties.class); 
  5.         ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties); 
  6.         return vendorImpl; 
  7.     } catch (Throwable e) { 
  8.         throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e); 
  9.     } 

NacosPropertySourceLocator 继承 PropertySourceLocator(PropertySourceLocator接口支持扩展自定义配置加载到 Spring Environment中)通过 locate 加载配置信息。

 
 
 
 
  1. @Override 
  2. public PropertySource locate(Environment env) { 
  3.  nacosConfigProperties.setEnvironment(env); 
  4.  ConfigService configService = nacosConfigManager.getConfigService(); 
  5.  
  6.  if (null == configService) { 
  7.   log.warn("no instance of config service found, can't load config from nacos"); 
  8.   return null; 
  9.  } 
  10.  long timeout = nacosConfigProperties.getTimeout(); 
  11.  nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService, 
  12.    timeout); 
  13.  String name = nacosConfigProperties.getName(); 
  14.  
  15.  String dataIdPrefix = nacosConfigProperties.getPrefix(); 
  16.  if (StringUtils.isEmpty(dataIdPrefix)) { 
  17.   dataIdPrefix = name; 
  18.  } 
  19.  
  20.  if (StringUtils.isEmpty(dataIdPrefix)) { 
  21.   dataIdPrefix = env.getProperty("spring.application.name"); 
  22.  } 
  23.  
  24.  CompositePropertySource composite = new CompositePropertySource( 
  25.    NACOS_PROPERTY_SOURCE_NAME); 
  26.  
  27.        // 共享配置 
  28.  loadSharedConfiguration(composite); 
  29.  // 拓展配置 
  30.        loadExtConfiguration(composite); 
  31.  // 应用配置 
  32.        loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env); 
  33.  return composite; 

配置读取过程

配置加载有三个方法 loadSharedConfiguration、loadSharedConfiguration、 loadApplicationConfiguration 以 loadApplicationConfiguration 继续跟进。

 
 
 
 
  1. private void loadApplicationConfiguration( 
  2.     CompositePropertySource compositePropertySource, String dataIdPrefix, 
  3.     NacosConfigProperties properties, Environment environment) { 
  4.     String fileExtension = properties.getFileExtension(); 
  5.     String nacosGroup = properties.getGroup(); 
  6.     // load directly once by default 
  7.     loadNacosDataIfPresent(compositePropertySource, dataIdPrefix, nacosGroup, 
  8.                            fileExtension, true); 
  9.     // load with suffix, which have a higher priority than the default 
  10.     loadNacosDataIfPresent(compositePropertySource, 
  11.                            dataIdPrefix + DOT + fileExtension, nacosGroup, fileExtension, true); 
  12.     // Loaded with profile, which have a higher priority than the suffix 
  13.     for (String profile : environment.getActiveProfiles()) { 
  14.         String dataId = dataIdPrefix + SEP1 + profile + DOT + fileExtension; 
  15.         loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup, 
  16.                                fileExtension, true); 
  17.     } 
  18.  

主要通过 loadNacosDataIfPresent 读取配置信息, 其实我们可以通过参数看出,主要配置文件包含以下部分:dataId, group, fileExtension

 
 
 
 
  1. private void loadNacosDataIfPresent(final CompositePropertySource composite, 
  2.                                     final String dataId, final String group, String fileExtension, 
  3.                                     boolean isRefreshable) { 
  4.     if (null == dataId || dataId.trim().length() < 1) { 
  5.         return; 
  6.     } 
  7.     if (null == group || group.trim().length() < 1) { 
  8.         return; 
  9.     } 
  10.     NacosPropertySource propertySource = this.loadNacosPropertySource(dataId, group, 
  11.                                                                       fileExtension, isRefreshable); 
  12.     this.addFirstPropertySource(composite, propertySource, false); 

然后调用 loadNacosPropertySource 最后一步步的会调用到 NacosConfigService#getConfigInner

 
 
 
 
  1. private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException { 
  2.         group = null2defaultGroup(group); 
  3.         ParamUtils.checkKeyParam(dataId, group); 
  4.         ConfigResponse cr = new ConfigResponse(); 
  5.          
  6.         cr.setDataId(dataId); 
  7.         cr.setTenant(tenant); 
  8.         cr.setGroup(group); 
  9.          
  10.         // 优先使用本地配置 
  11.         String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); 
  12.         if (content != null) { 
  13.             LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(), 
  14.                     dataId, group, tenant, ContentUtils.truncateContent(content)); 
  15.             cr.setContent(content); 
  16.             configFilterChainManager.doFilter(null, cr); 
  17.             content = cr.getContent(); 
  18.             return content; 
  19.         } 
  20.          
  21.         try { 
  22.             // 获取远程配置 
  23.             String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs); 
  24.             cr.setContent(ct[0]); 
  25.              
  26.             configFilterChainManager.doFilter(null, cr); 
  27.             content = cr.getContent(); 
  28.              
  29.             return content; 
  30.         } catch (NacosException ioe) { 
  31.             if (NacosException.NO_RIGHT == ioe.getErrCode()) { 
  32.                 throw ioe; 
  33.             } 
  34.             LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}", 
  35.                     agent.getName(), dataId, group, tenant, ioe.toString()); 
  36.         } 
  37.          
  38.         LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", agent.getName(), 
  39.                 dataId, group, tenant, ContentUtils.truncateContent(content)); 
  40.         content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant); 
  41.         cr.setContent(content); 
  42.         configFilterChainManager.doFilter(null, cr); 
  43.         content = cr.getContent(); 
  44.         return content; 
  45.     } 

加载远程配置

worker.getServerConfig 主要是获取远程配置, ClIentWorker 的 getServerConfig 定义如下:

 
 
 
 
  1. public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout) 
  2.     throws NacosException { 
  3.     String[] ct = new String[2]; 
  4.     if (StringUtils.isBlank(group)) { 
  5.         group = Constants.DEFAULT_GROUP; 
  6.     } 
  7.  
  8.     HttpRestResult result = null; 
  9.     try { 
  10.         Map params = new HashMap(3); 
  11.         if (StringUtils.isBlank(tenant)) { 
  12.             params.put("dataId", dataId); 
  13.             params.put("group", group); 
  14.         } else { 
  15.             params.put("dataId", dataId); 
  16.             params.put("group", group); 
  17.             params.put("tenant", tenant); 
  18.         } 
  19.         result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout); 
  20.     } catch (Exception ex) { 
  21.         String message = String 
  22.             .format("[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", 
  23.                     agent.getName(), dataId, group, tenant); 
  24.         LOGGER.error(message, ex); 
  25.         throw new NacosException(NacosException.SERVER_ERROR, ex); 
  26.     } 
  27.  
  28.     switch (result.getCode()) { 
  29.         case HttpURLConnection.HTTP_OK: 
  30.             LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.getData()); 
  31.             ct[0] = result.getData(); 
  32.             if (result.getHeader().getValue(CONFIG_TYPE) != null) { 
  33.                 ct[1] = result.getHeader().getValue(CONFIG_TYPE); 
  34.             } else { 
  35.                 ct[1] = ConfigType.TEXT.getType(); 
  36.             } 
  37.             return ct; 
  38.         case HttpURLConnection.HTTP_NOT_FOUND: 
  39.             LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null); 
  40.             return ct; 
  41.         case HttpURLConnection.HTTP_CONFLICT: { 
  42.             LOGGER.error( 
  43.                 "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, " 
  44.                 + "tenant={}", agent.getName(), dataId, group, tenant); 
  45.             throw new NacosException(NacosException.CONFLICT, 
  46.                                      "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); 
  47.         } 
  48.         case HttpURLConnection.HTTP_FORBIDDEN: { 
  49.             LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), 
  50.                          dataId, group, tenant); 
  51.             throw new NacosException(result.getCode(), result.getMessage()); 
  52.         } 
  53.         default: { 
  54.             LOGGER.error("[{}] [sub-server-error]  dataId={}, group={}, tenant={}, code={}", agent.getName(), 
  55.                          dataId, group, tenant, result.getCode()); 
  56.             throw new NacosException(result.getCode(), 
  57.                                      "http error, code=" + result.getCode() + ",dataId=" + dataId + ",group=" + group + ",tenant=" 
  58.                                      + tenant); 
  59.         } 
  60.     } 

agent 默认使用 MetricsHttpAgent 实现类

配置同步过程

Nacos 配置同步过程如下图所示:

客户端请求

客户端初始请求配置完成后,会通过 WorkClient 进行长轮询查询配置, 它的构造方法如下:

 
 
 
 
  1. public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, 
  2.             final Properties properties) { 
  3.         this.agent = agent; 
  4.         this.configFilterChainManager = configFilterChainManager; 
  5.          
  6.         // Initialize the timeout parameter 
  7.          
  8.         init(properties); 
  9.  
  10.         // 检查线程池 
  11.         this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { 
  12.             @Override 
  13.             public Thread newThread(Runnable r) { 
  14.                 Thread t = new Thread(r); 
  15.                 t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); 
  16.                 t.setDaemon(true); 
  17.                 return t; 
  18.             } 
  19.         }); 
  20.                  
  21.         // 长轮询线程 
  22.         this.executorService = Executors 
  23.                 .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { 
  24.                     @Override 
  25.                     public Thread newThread(Runnable r) { 
  26.                         Thread t = new Thread(r); 
  27.                         t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); 
  28.                         t.setDaemon(true); 
  29.                         return t; 
  30.                     } 
  31.                 }); 
  32.          
  33.         this.executor.scheduleWithFixedDelay(new Runnable() { 
  34.             @Override 
  35.             public void run() { 
  36.                 try { 
  37.                     checkConfigInfo(); 
  38.                 } catch (Throwable e) { 
  39.                     LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); 
  40.                 } 
  41.             } 
  42.         }, 1L, 10L, TimeUnit.MILLISECONDS); 
  43.     } 

这里初始化了两个线程池:

  • 第一个线程池主要是用来初始化做长轮询的;
  • 第二个线程池使用来做检查的,会每间隔 10 秒钟执行一次检查方法 checkConfigInfo

checkConfigInfo

在这个方法里面主要是分配任务,给每个 task 分配一个 taskId , 后面会去检查本地配置和远程配置,最终调用的是 LongPollingRunable 的 run 方法。

 
 
 
 
  1. public void checkConfigInfo() { 
  2.     // Dispatch taskes. 
  3.     int listenerSize = cacheMap.size(); 
  4.     // Round up the longingTaskCount. 
  5.     int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); 
  6.     if (longingTaskCount > currentLongingTaskCount) { 
  7.         for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { 
  8.             // The task list is no order.So it maybe has issues when changing. 
  9.             executorService.execute(new LongPollingRunnable(i)); 
  10.         } 
  11.         currentLongingTaskCount = longingTaskCount; 
  12.     } 

LongPollingRunnable

长轮询线程实现,首先第一步检查本地配置信息,然后通过 dataId 去检查服务端是否有变动的配置信息,如果有就更新下来然后刷新配置。

 
 
 
 
  1. public void run() { 
  2.  
  3.         List cacheDatas = new ArrayList(); 
  4.         List inInitializingCacheList = new ArrayList(); 
  5.         try { 
  6.             // check failover config 
  7.             for (CacheData cacheData : cacheMap.values()) { 
  8.                 if (cacheData.getTaskId() == taskId) { 
  9.                     cacheDatas.add(cacheData); 
  10.                     try { 
  11.                         checkLocalConfig(cacheData); 
  12.                         if (cacheData.isUseLocalConfigInfo()) { 
  13.                             // 触发回调 
  14.                             cacheData.checkListenerMd5(); 
  15.                         } 
  16.                     } catch (Exception e) { 
  17.                         LOGGER.error("get local config info error", e); 
  18.                     } 
  19.                 } 
  20.             } 
  21.  
  22.             // check server config 
  23.             List changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); 
  24.             if (!CollectionUtils.isEmpty(changedGroupKeys)) { 
  25.                 LOGGER.info("get changedGroupKeys:" + changedGroupKeys); 
  26.             } 
  27.  
  28.             for (String groupKey : changedGroupKeys) { 
  29.                 String[] key = GroupKey.parseKey(groupKey); 
  30.                 String dataId = key[0]; 
  31.                 String group = key[1]; 
  32.                 String tenant = null; 
  33.                 if (key.length == 3) { 
  34.                     tenant = key[2]; 
  35.                 } 
  36.                 try { 
  37.                     String[] ct = getServerConfig(dataId, group, tenant, 3000L); 
  38.                     CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant)); 
  39.                     cache.setContent(ct[0]); 
  40.                     if (null != ct[1]) { 
  41.                         cache.setType(ct[1]); 
  42.                     } 
  43.                     LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", 
  44.                                 agent.getName(), dataId, group, tenant, cache.getMd5(), 
  45.                                 ContentUtils.truncateContent(ct[0]), ct[1]); 
  46.                 } catch (NacosException ioe) { 
  47.                     String message = String 
  48.                         .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", 
  49.                                 agent.getName(), dataId, group, tenant); 
  50.                     LOGGER.error(message, ioe); 
  51.                 } 
  52.             } 
  53.             for (CacheData cacheData : cacheDatas) { 
  54.                 if (!cacheData.isInitializing() || inInitializingCacheList 
  55.                     .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { 
  56.                     cacheData.checkListenerMd5(); 
  57.                     cacheData.setInitializing(false); 
  58.                 } 
  59.             } 
  60.             inInitializingCacheList.clear(); 
  61.  
  62.             executorService.execute(this); 
  63.  
  64.         } catch (Throwable e) { 
  65.  
  66.             // If the rotation training task is abnormal, the next execution time of the task will be punished 
  67.             LOGGER.error("longPolling error : ", e); 
  68.             executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); 
  69.         } 
  70.     } 

addTenantListeners

添加监听,这里主要是通过 dataId , group 来获取 cache 本地缓存的配置信息,然后再将 Listener 也传给 cache 统一管理。

 
 
 
 
  1. public void addTenantListeners(String dataId, String group, List listeners) 
  2.         throws NacosException { 
  3.     group = null2defaultGroup(group); 
  4.     String tenant = agent.getTenant(); 
  5.     CacheData cache = addCacheDataIfAbsent(dataId, group, tenant); 
  6.     for (Listener listener : listeners) { 
  7.         cache.addListener(listener); 
  8.     } 

回调触发

如果 md5 值发生变化过后就会调用 safeNotifyListener 方法然后将配置信息发送给对应的监听器

 
 
 
 
  1. void checkListenerMd5() { 
  2.     for (ManagerListenerWrap wrap : listeners) { 
  3.         if (!md5.equals(wrap.lastCallMd5)) { 
  4.             safeNotifyListener(dataId, group, content, type, md5, wrap); 
  5.         } 
  6.     } 

服务端响应

当服务端收到请求后,会 hold 住当前请求,如果有变化就返回,如果没有变化就等待超时之前返回无变化。

 
 
 
 
  1. /** 
  2.  * The client listens for configuration changes. 
  3.  */ 
  4. @PostMapping("/listener") 
  5. @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class) 
  6. public void listener(HttpServletRequest request, HttpServletResponse response) 
  7.     throws ServletException, IOException { 
  8.     request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true); 
  9.     String probeModify = request.getParameter("Listening-Configs"); 
  10.     if (StringUtils.isBlank(probeModify)) { 
  11.         throw new IllegalArgumentException("invalid probeModify"); 
  12.     } 
  13.  
  14.     probeModify = URLDecoder.decode(probeModify, Constants.ENCODE); 
  15.  
  16.     Map clientMd5Map; 
  17.     try { 
  18.         clientMd5Map = MD5Util.getClientMd5Map(probeModify); 
  19.     } catch (Throwable e) { 
  20.         throw new IllegalArgumentException("invalid probeModify"); 
  21.     } 
  22.  
  23.     // do long-polling 
  24.     inner.doPollingConfig(request, response, clientMd5Map, probeModify.length()); 

LongPollingService

核心处理类 LongPollingService

 
 
 
 
  1. /** 
  2.    * Add LongPollingClient. 
  3.    * 
  4.    * @param req              HttpServletRequest. 
  5.    * @param rsp              HttpServletResponse. 
  6.    * @param clientMd5Map     clientMd5Map. 
  7.    * @param probeRequestSize probeRequestSize. 
  8.    */ 
  9.   public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map clientMd5Map, 
  10.           int probeRequestSize) { 
  11.        
  12.       String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER); 
  13.       String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER); 
  14.       String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER); 
  15.       String tag = req.getHeader("Vipserver-Tag"); 
  16.       int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500); 
  17.        
  18.       // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout. 
  19.       long timeout = Math.max(10000, Long.parseLong(str) - delayTime); 
  20.       if (isFixedPolling()) { 
  21.           timeout = Math.max(10000, getFixedPollingInterval()); 
  22.           // Do nothing but set fix polling timeout. 
  23.       } else { 
  24.           long start = System.currentTimeMillis(); 
  25.           List changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map); 
  26.           if (changedGroups.size() > 0) { 
  27.               generateResponse(req, rsp, changedGroups); 
  28.               LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}",&nb
    名称栏目:Nacos 配置中心源码分析
    分享网址:http://www.jxjierui.cn/article/dphocch.html