2025-04-06 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article explains in detail, with examples, how the Nacos client configuration center updates its cache dynamically. I find it very practical, so I am sharing it for reference, and I hope you get something out of it.
When Nacos serves as the configuration center, the configuration an application obtains from Nacos is cached in local memory and on disk.
Because Nacos is a dynamic configuration center, every client must become aware of subsequent configuration changes and update its local memory.
Client configuration cache update
After the client has fetched a configuration, the local copy must be refreshed dynamically so that it stays consistent with the server. How is this implemented? This section analyzes the process in detail.
Nacos uses a long-polling mechanism to synchronize data changes.
The overall workflow is as follows:
The client initiates a long-polling request.
After the server receives the request, it first compares the MD5 values sent by the client with the data in the server cache. If they differ, it returns immediately.
If they are the same, the server schedules the comparison to run again after a delay of 29.5s.
To make sure the server can notify the client promptly when data changes within those 29.5s, the server uses event subscription to listen for local data-change events on the server. Once such an event is received, it triggers a DataChangeTask, which traverses the ClientLongPolling objects in the allSubs queue and writes the result back to the client, thus completing a data "push".
What if the scheduled task in ClientLongPolling fires again after the DataChangeTask has already "pushed" the data?
The answer is simple: cancel the pending scheduled task before the "push". This prevents the scheduled task from writing a response after the push has already written one, which would certainly cause an error. For the same reason, the first thing the scheduled task in ClientLongPolling does is remove its own subscription from allSubs.
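The hold-then-push-or-timeout behaviour described above can be sketched in a few lines of Java. This is only an illustration under assumptions: HeldRequest is a hypothetical stand-in for a suspended client request, a CompletableFuture plays the role of the HTTP response, and none of the names below are Nacos classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a held long-polling request; not a Nacos class.
class HeldRequest {
    // Stands in for the HTTP response; completed by a push or by the timeout task.
    final CompletableFuture<String> response = new CompletableFuture<>();
    private final ScheduledFuture<?> timeoutTask;

    HeldRequest(ScheduledExecutorService scheduler, long holdMillis) {
        // If nothing changes during the hold period, answer with an empty result.
        this.timeoutTask = scheduler.schedule(
                () -> { response.complete(""); }, holdMillis, TimeUnit.MILLISECONDS);
    }

    // Called when a data-change event arrives while the request is held.
    void push(String changedKey) {
        // Cancel the pending timeout first so it cannot write a second response.
        timeoutTask.cancel(false);
        response.complete(changedKey);
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        HeldRequest held = new HeldRequest(scheduler, 29_500L);
        held.push("example-data-id"); // simulated change during the hold period
        System.out.println(held.response.join());
        scheduler.shutdownNow();
    }
}
```

The point of the sketch is the ordering inside push: the timeout is cancelled before the response is written, which mirrors the "cancel the scheduled task before the push" rule.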
Entry point of the long-polling task
The constructor of NacosConfigService does the following when the class is instantiated:
It initializes an HttpAgent using the decorator pattern. The class that does the actual work is ServerHttpAgent; MetricsHttpAgent calls the ServerHttpAgent methods internally and adds monitoring and statistics.
It creates a ClientWorker, the client-side worker class. The agent is passed to ClientWorker as a parameter, so you can guess that the worker uses the agent for remote calls.
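The decorator arrangement can be pictured with a minimal sketch. The Agent interface and both classes below are hypothetical, heavily simplified stand-ins (the real HttpAgent has a much richer API); they only show how a wrapper can add statistics around a delegate that does the real work.

```java
// Hypothetical, simplified interface; not the real HttpAgent.
interface Agent {
    String httpGet(String path);
}

// Plays the role of the class that does the actual work.
class PlainAgent implements Agent {
    @Override
    public String httpGet(String path) {
        return "response for " + path;
    }
}

// Plays the role of the decorator: same interface, extra bookkeeping.
class CountingAgent implements Agent {
    private final Agent delegate;
    private int calls;

    CountingAgent(Agent delegate) {
        this.delegate = delegate;
    }

    @Override
    public String httpGet(String path) {
        calls++;                       // record a statistic
        return delegate.httpGet(path); // then delegate the real call
    }

    int calls() {
        return calls;
    }

    public static void main(String[] args) {
        CountingAgent agent = new CountingAgent(new PlainAgent());
        System.out.println(agent.httpGet("/example"));
        System.out.println("calls: " + agent.calls());
    }
}
```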
    public NacosConfigService(Properties properties) throws NacosException {
        ValidatorUtils.checkInitParam(properties);
        String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
        if (StringUtils.isBlank(encodeTmp)) {
            this.encode = Constants.ENCODE;
        } else {
            this.encode = encodeTmp.trim();
        }
        initNamespace(properties);
        this.configFilterChainManager = new ConfigFilterChainManager(properties);
        // initialize the network communication component
        this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
        this.agent.start();
        // initialize ClientWorker
        this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
    }

ClientWorker
In the initialization code above, the class to focus on is ClientWorker. Its constructor is as follows.
    public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
            final Properties properties) {
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager; // configuration filter manager

        // Initialize the timeout parameter
        init(properties);

        // initialize a scheduled thread pool with a custom ThreadFactory
        this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });

        // initialize a second scheduled thread pool. Judging from the thread name, it is used
        // for long polling, presumably against the Nacos server
        this.executorService = Executors
                .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                        t.setDaemon(true);
                        return t;
                    }
                });

        // schedule checkConfigInfo, which checks whether the configuration has changed:
        // initial delay 1 millisecond, then 10 milliseconds between runs
        this.executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }
You can see that, in addition to keeping a reference to the HttpAgent, ClientWorker creates two thread pools:
The first thread pool, executor, has a single thread for scheduled tasks. It runs the checkConfigInfo() method with a 10ms delay between runs; as the method name suggests, the configuration information is checked roughly every 10ms.
The second thread pool, executorService, is also a scheduled pool, sized to the number of available processors. The thread name set in its ThreadFactory shows that this pool is used for long polling.
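To make the thread-pool setup concrete, here is a small self-contained sketch of the same pattern: a ThreadFactory that produces named daemon threads, and a task scheduled with a fixed delay. The class name WorkerPools and the thread name are made up for the example; only the java.util.concurrent calls are real.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class WorkerPools {
    // Builds a factory that produces named daemon threads, like both pools above.
    static ThreadFactory daemonFactory(String name) {
        return r -> {
            Thread t = new Thread(r);
            t.setName(name);
            t.setDaemon(true); // daemon threads do not keep the JVM alive
            return t;
        };
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger checks = new AtomicInteger();
        ScheduledExecutorService executor =
                Executors.newScheduledThreadPool(1, daemonFactory("example.Worker"));
        // First run after 1 ms, then 10 ms after each run finishes.
        executor.scheduleWithFixedDelay(() -> { checks.incrementAndGet(); },
                1L, 10L, TimeUnit.MILLISECONDS);
        Thread.sleep(100L);
        executor.shutdownNow();
        System.out.println("checks run: " + checks.get());
    }
}
```

Note that scheduleWithFixedDelay measures the delay from the end of one run to the start of the next, so a slow check never overlaps with the following one.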
checkConfigInfo
While ClientWorker is being constructed, a scheduled task is started to run the checkConfigInfo() method, which periodically checks the local configuration against configuration changes on the server. The method is defined as follows.
    public void checkConfigInfo() {
        // Dispatch tasks.
        int listenerSize = cacheMap.size();
        // Round up the longingTaskCount: divide the listener count by 3000 and round up
        // to get the number of long-polling tasks.
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        // currentLongingTaskCount is the number of long-polling tasks already started;
        // if it is smaller than the computed count, more tasks are created.
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                // The task list is no order.So it maybe has issues when changing.
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }

The main purpose of this method is to check whether the configuration on the server has changed. If it has, listener notification is triggered.
cacheMap: AtomicReference cacheMap, the cache collection that stores the configurations being listened to. The key is the value concatenated from dataId/group/tenant. The value is the content of the corresponding configuration file stored on the Nacos server.
By default, each long-polling LongPollingRunnable task handles 3000 listened configuration sets. If there are more than 3000, several LongPollingRunnable tasks have to be started.
currentLongingTaskCount holds the number of LongPollingRunnable tasks already started.
executorService is the thread pool initialized in the ClientWorker constructor.

LongPollingRunnable.run
This is the implementation of the long-polling task. The code is fairly long, so we analyze it in sections.
The first part mainly does two things:
It sorts the tasks by batch.
It checks whether the cache of the current batch is consistent with the data in the local file, and triggers the listeners if something has changed.

    class LongPollingRunnable implements Runnable {

        private final int taskId; // id of the current task batch

        public LongPollingRunnable(int taskId) {
            this.taskId = taskId;
        }

        @Override
        public void run() {
            List<CacheData> cacheDatas = new ArrayList<CacheData>();
            List<String> inInitializingCacheList = new ArrayList<String>();
            try {
                // traverse cacheMap, collect the caches whose task id matches the current task
                // into cacheDatas, and check each one with checkLocalConfig
                for (CacheData cacheData : cacheMap.values()) {
                    if (cacheData.getTaskId() == taskId) {
                        cacheDatas.add(cacheData);
                        try {
                            checkLocalConfig(cacheData);
                            if (cacheData.isUseLocalConfigInfo()) {
                                // the data has changed, so the listeners must be notified:
                                // notify every listener registered for this configuration
                                cacheData.checkListenerMd5();
                            }
                        } catch (Exception e) {
                            LOGGER.error("get local config info error", e);
                        }
                    }
                }
                // part omitted
            } catch (Throwable e) {
                // If the polling task fails, its next execution is delayed as a penalty:
                // the task runs again after taskPenaltyTime.
                LOGGER.error("longPolling error : ", e);
                executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
            }
        }
    }

checkLocalConfig
This method checks the local configuration (the failover file in the code below). There are three cases:
If isUseLocalConfigInfo is false (the local configuration is not in use) but the local file exists, isUseLocalConfigInfo is set to true, and the content of cacheData and the file's modification time are updated.
If isUseLocalConfigInfo is true (the local configuration file is in use) but the local file does not exist, it is set to false and the listeners are not notified.
If isUseLocalConfigInfo is true and the local file exists, but the cached version time differs from the file's modification time, the content of cacheData is updated and isUseLocalConfigInfo stays true.

    private void checkLocalConfig(CacheData cacheData) {
        final String dataId = cacheData.dataId;
        final String group = cacheData.group;
        final String tenant = cacheData.tenant;
        File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);

        // No -> yes: the failover file did not exist before and exists now.
        if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
            String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
            final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
            cacheData.setUseLocalConfigInfo(true);
            cacheData.setLocalConfigInfoVersion(path.lastModified());
            cacheData.setContent(content);
            String encryptedDataKey = LocalEncryptedDataKeyProcessor
                    .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
            cacheData.setEncryptedDataKey(encryptedDataKey);
            LOGGER.warn("[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",
                    agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
            return;
        }

        // Yes -> no: do not notify the business listener; notify after getting the configuration from the server.
        // If use local config info, then it doesn't notify business listener and notify after getting from server.
        if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
            cacheData.setUseLocalConfigInfo(false);
            LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}",
                    agent.getName(), dataId, group, tenant);
            return;
        }

        // The failover file has changed.
        if (cacheData.isUseLocalConfigInfo() && path.exists()
                && cacheData.getLocalConfigInfoVersion() != path.lastModified()) {
            String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
            final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
            cacheData.setUseLocalConfigInfo(true);
            cacheData.setLocalConfigInfoVersion(path.lastModified());
            cacheData.setContent(content);
            String encryptedDataKey = LocalEncryptedDataKeyProcessor
                    .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
            cacheData.setEncryptedDataKey(encryptedDataKey);
            LOGGER.warn("[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",
                    agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
        }
    }

checkListenerMd5
This method traverses the listeners added by the user and sends a notification to each listener whose last-notified MD5 differs from the current MD5 of the data.

    void checkListenerMd5() {
        for (ManagerListenerWrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                safeNotifyListener(dataId, group, content, type, md5, wrap);
            }
        }
    }

Checking the server configuration
In the first part of LongPollingRunnable.run, change notification is driven by reading and checking the local configuration to decide whether the data has changed.
Next, the current thread has to ask the remote server for the latest data and find out which data has changed:
Obtain the dataIds of the data that changed on the remote server through checkUpdateDataIds.
Iterate over the changed items and call getServerConfig for each one to fetch its content from the remote server.
Update the local cache with the content returned by the server.
Finally, traverse cacheDatas, find the changed data and notify the listeners.
    // check server config
    // get the list of changed dataIds from the server and store it in a List
    List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
    if (!CollectionUtils.isEmpty(changedGroupKeys)) {
        LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
    }

    // traverse the changed configuration items
    for (String groupKey : changedGroupKeys) {
        String[] key = GroupKey.parseKey(groupKey);
        String dataId = key[0];
        String group = key[1];
        String tenant = null;
        if (key.length == 3) {
            tenant = key[2];
        }
        try {
            // fetch the configuration item by item
            ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);
            // save the configuration to CacheData
            CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
            cache.setContent(response.getContent());
            cache.setEncryptedDataKey(response.getEncryptedDataKey());
            if (null != response.getConfigType()) {
                cache.setType(response.getConfigType());
            }
            LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                    agent.getName(), dataId, group, tenant, cache.getMd5(),
                    ContentUtils.truncateContent(response.getContent()), response.getConfigType());
        } catch (NacosException ioe) {
            String message = String
                    .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                            agent.getName(), dataId, group, tenant);
            LOGGER.error(message, ioe);
        }
    }

    // iterate over the CacheData collection, find the changed data and notify
    for (CacheData cacheData : cacheDatas) {
        if (!cacheData.isInitializing() || inInitializingCacheList
                .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
            cacheData.checkListenerMd5();
            cacheData.setInitializing(false);
        }
    }
    inInitializingCacheList.clear();
    // resubmit the current task to keep polling
    executorService.execute(this);

checkUpdateDataIds
The main purpose of this method is to send a check request to the server to determine whether the local configuration is consistent with the server's.
First, find the caches in the cacheDatas collection whose isUseLocalConfigInfo is false.
Then assemble the configuration items to be checked into a string and call checkUpdateConfigStr for verification.
    /**
     * Get the list of dataIds whose values have changed from the server. Only dataId and group
     * are valid in the returned objects. Guarantees that null is never returned.
     */
    List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList)
            throws Exception {
        StringBuilder sb = new StringBuilder();
        for (CacheData cacheData : cacheDatas) {
            // concatenate the configuration items to be checked into a string
            if (!cacheData.isUseLocalConfigInfo()) { // only caches with isUseLocalConfigInfo == false
                sb.append(cacheData.dataId).append(WORD_SEPARATOR);
                sb.append(cacheData.group).append(WORD_SEPARATOR);
                if (StringUtils.isBlank(cacheData.tenant)) {
                    sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
                } else {
                    sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                    sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
                }
                if (cacheData.isInitializing()) {
                    // cacheData appears in cacheMap for the first time and this is its first update check
                    inInitializingCacheList
                            .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
                }
            }
        }
        boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
        return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
    }

checkUpdateConfigStr
This method gets the list of dataIds whose values have changed from the server. Only dataId and group are valid in the returned objects, and null is never returned.
    List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
        // assemble the parameters and headers
        Map<String, String> params = new HashMap<String, String>(2);
        params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
        Map<String, String> headers = new HashMap<String, String>(2);
        headers.put("Long-Pulling-Timeout", "" + timeout);

        // told server do not hang me up if new initializing cacheData added in
        if (isInitializingCacheList) {
            headers.put("Long-Pulling-Timeout-No-Hangup", "true");
        }

        // if the probe string is empty there is nothing to check, so return directly
        if (StringUtils.isBlank(probeUpdateString)) {
            return Collections.emptyList();
        }

        try {
            // In order to prevent the server from handling the delay of the client's long task,
            // increase the client's read timeout to avoid this problem.
            // readTimeoutMs is the time to wait for this request's response:
            // timeout plus half of it (timeout defaults to 30s)
            long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
            // initiate the remote call
            HttpRestResult<String> result = agent
                    .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),
                            readTimeoutMs);

            if (result.ok()) { // the response succeeded
                setHealthServer(true);
                // parse the response and return the keys of the data that really changed: tenant/group/dataId
                return parseUpdateDataIdResponse(result.getData());
            } else { // the response failed
                setHealthServer(false);
                LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(),
                        result.getCode());
            }
        } catch (Exception e) {
            setHealthServer(false);
            LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
            throw e;
        }
        return Collections.emptyList();
    }

Summary of the client-side long-polling mechanism for the configuration cache
The core of the overall implementation comes down to the following points:
The configurations in the local cache are split into task batches of 3000 each.
One long-polling task is created for every 3000 configurations.
Each batch first compares its cache with the data in the local disk file.
If the cache is inconsistent with the local configuration, the cache is updated and the client's listeners are notified directly.
If the local cache and the disk data are consistent, a remote request is needed to check for configuration changes.
A string is first assembled from tenant/groupId/dataId and sent to the server for checking, and the server returns the configurations that changed.
The client receives the list of changed configurations, then goes through it item by item and asks the server for the content of each configuration.
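Two of these points, the batching and the probe string, are easy to try out in isolation. The sketch below follows the formulas shown in checkConfigInfo and checkUpdateDataIds; the class name ProbeSketch and the concrete separator values are assumptions made for the example, not values taken from this article.

```java
class ProbeSketch {
    // Assumed placeholder separators; the article does not show the real constant values.
    static final String WORD_SEPARATOR = String.valueOf((char) 2);
    static final String LINE_SEPARATOR = String.valueOf((char) 1);

    // Number of long-polling tasks needed for the given number of listened configurations.
    static int taskCount(int listenerSize, double perTaskConfigSize) {
        return (int) Math.ceil(listenerSize / perTaskConfigSize);
    }

    // One probe entry: dataId, group, md5 and, only when present, the tenant.
    static String probeEntry(String dataId, String group, String md5, String tenant) {
        StringBuilder sb = new StringBuilder();
        sb.append(dataId).append(WORD_SEPARATOR);
        sb.append(group).append(WORD_SEPARATOR);
        if (tenant == null || tenant.trim().isEmpty()) {
            sb.append(md5).append(LINE_SEPARATOR);
        } else {
            sb.append(md5).append(WORD_SEPARATOR);
            sb.append(tenant).append(LINE_SEPARATOR);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(taskCount(3000, 3000)); // exactly one full batch
        System.out.println(taskCount(3001, 3000)); // one extra config needs a second task
        System.out.println(probeEntry("app.yaml", "DEFAULT_GROUP", "abc123", null).length());
    }
}
```

Because the divisor is a double, the division is not truncated before Math.ceil is applied, so 3001 configurations really do yield two tasks.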
Server-side push of configuration updates
Having analyzed the client, the natural next question is how the server handles the client's request. Again there are a few questions to think about:
How does the server implement the long-polling mechanism?
Why is the client's timeout set to 30s?
The request initiated by the client goes to /v1/cs/configs/listener, so we look up that API, shown below.
    // # ConfigController.java
    @PostMapping("/listener")
    @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
    public void listener(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
        String probeModify = request.getParameter("Listening-Configs");
        if (StringUtils.isBlank(probeModify)) {
            throw new IllegalArgumentException("invalid probeModify");
        }

        probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);

        Map<String, String> clientMd5Map;
        try {
            // parse the possibly changed configuration items sent by the client
            // into a Map (key = dataId, value = md5)
            clientMd5Map = MD5Util.getClientMd5Map(probeModify);
        } catch (Throwable e) {
            throw new IllegalArgumentException("invalid probeModify");
        }

        // start long polling
        inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
    }

doPollingConfig
This method mainly decides between long polling and short polling.
For long polling, it goes straight to the addLongPollingClient method.
For short polling, it compares the client data with the server data directly and, if any MD5 differs, returns the data immediately.
    public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
            Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {

        // determine whether the current request supports long polling
        if (LongPollingService.isSupportLongPolling(request)) {
            longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
            return HttpServletResponse.SC_OK + "";
        }

        // short polling takes the path below: the data sent by the client is compared with the
        // server data item by item and the differences are saved to changedGroups
        // Compatible with short polling logic.
        List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);

        // Compatible with short polling result.
        String oldResult = MD5Util.compareMd5OldResult(changedGroups);
        String newResult = MD5Util.compareMd5ResultString(changedGroups);

        String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
        if (version == null) {
            version = "2.0.0";
        }
        int versionNum = Protocol.getVersionNumber(version);

        // Before 2.0.4 version, return value is put into header.
        if (versionNum < START_LONG_POLLING_VERSION_NUM) {
            response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
            response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
        } else {
            request.setAttribute("content", newResult);
        }

        Loggers.AUTH.info("new content:" + newResult);

        // Disable cache.
        response.setHeader("Pragma", "no-cache");
        response.setDateHeader("Expires", 0);
        response.setHeader("Cache-Control", "no-cache,no-store");
        response.setStatus(HttpServletResponse.SC_OK);
        return HttpServletResponse.SC_OK + "";
    }

addLongPollingClient
This method saves the client's request into the long-polling execution engine.

    public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp,
            Map<String, String> clientMd5Map, int probeRequestSize) {

        // the long-polling timeout sent by the client
        String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
        // the "do not hang up" flag
        String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
        // application name
        String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
        String tag = req.getHeader("Vipserver-Tag");
        // delay time, 500ms by default
        int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);

        // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
        long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
        if (isFixedPolling()) {
            timeout = Math.max(10000, getFixedPollingInterval());
            // Do nothing but set fix polling timeout.
        } else {
            long start = System.currentTimeMillis();
            // use MD5 to find the keys in the client request that differ from the server;
            // any such keys are saved to changedGroups
            List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
            if (changedGroups.size() > 0) {
                // a change was found, so the response is returned to the client directly
                generateResponse(req, rsp, changedGroups);
                LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
                        RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                        changedGroups.size());
                return;
            } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
                // noHangUpFlag is true, so the client must not be suspended; return directly
                LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                        RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                        changedGroups.size());
                return;
            }
        }
        // the IP of the requesting side
        String ip = RequestUtil.getRemoteIp(req);

        // Must be called by http thread, or send response.
        // convert the current request into an asynchronous request: the Tomcat thread is released here,
        // and the response has to be triggered manually through asyncContext, otherwise the
        // client request stays suspended
        final AsyncContext asyncContext = req.startAsync();

        // AsyncContext.setTimeout() is incorrect, Control by oneself
        asyncContext.setTimeout(0L); // set the asynchronous request timeout

        // execute the long-polling request
        ConfigExecutor.executeLongPolling(
                new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
    }

ClientLongPolling
Let's look at what ClientLongPolling actually does. Before reading the code, we can guess what it should do:
The task has to be held for 29.5s before it runs, because running it immediately makes no sense; the comparison has already been done once.
If the data changes within those 29.5s, the client must be notified early, so some listening mechanism is needed.
With these guesses in mind, we can look at the implementation.
At a coarse level, the code matches our guess. In the run method, a scheduled task is created through ConfigExecutor.scheduleLongPolling, and its delay is exactly the 29.5s computed earlier. Inside this task, the comparison is done through MD5Util.compareMd5.
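The 29.5s figure, and the answer to the earlier question about the client's timeout, follow from two formulas that appear in the code quoted in this article. The sketch below just replays that arithmetic; the class and method names are made up for the example, and the inputs 30000 and 500 are the defaults mentioned in the text.

```java
class PollingTimeouts {
    // Server side: hold the request for the client's timeout minus a safety margin,
    // but never for less than 10 seconds (see addLongPollingClient).
    static long serverHoldMillis(long clientTimeoutMillis, long delayMillis) {
        return Math.max(10000, clientTimeoutMillis - delayMillis);
    }

    // Client side: the HTTP read timeout is the long-polling timeout plus half of it
    // (see checkUpdateConfigStr).
    static long clientReadTimeoutMillis(long timeoutMillis) {
        return timeoutMillis + (timeoutMillis >> 1);
    }

    public static void main(String[] args) {
        System.out.println(serverHoldMillis(30000L, 500L));  // 29500
        System.out.println(clientReadTimeoutMillis(30000L)); // 45000
    }
}
```

So with a 30s long-polling timeout the server answers after at most 29.5s, comfortably before the client's 45s read timeout would expire.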
The other point: when the data changes, we certainly cannot wait until the 29.5s have passed before notifying the client. So what happens then? The code contains something called allSubs, which looks related to publish and subscribe. Could it be that the current ClientLongPolling subscribes to data-change events?
    class ClientLongPolling implements Runnable {

        @Override
        public void run() {
            // build an asynchronous task whose execution is delayed by 29.5s
            asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
                @Override
                public void run() {
                    // reaching this point after 29.5s means no configuration change
                    // happened in the meantime, so the task runs automatically
                    try {
                        getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());

                        // Delete subsciber's relations.
                        allSubs.remove(ClientLongPolling.this); // remove the subscription

                        if (isFixedPolling()) { // long polling with a fixed interval
                            LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}",
                                    (System.currentTimeMillis() - createTime), "fix",
                                    RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                    "polling", clientMd5Map.size(), probeRequestSize);
                            // compare to find the changed keys
                            List<String> changedGroups = MD5Util
                                    .compareMd5((HttpServletRequest) asyncContext.getRequest(),
                                            (HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
                            if (changedGroups.size() > 0) { // more than 0 means something changed: respond directly
                                sendResponse(changedGroups);
                            } else {
                                sendResponse(null); // otherwise respond with null
                            }
                        } else {
                            LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}",
                                    (System.currentTimeMillis() - createTime), "timeout",
                                    RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                    "polling", clientMd5Map.size(), probeRequestSize);
                            sendResponse(null);
                        }
                    } catch (Throwable t) {
                        LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
                    }
                }
            }, timeoutTime, TimeUnit.MILLISECONDS);

            allSubs.add(this); // add this ClientLongPolling to the subscription queue
        }
    }

allSubs
allSubs is a queue that holds ClientLongPolling objects. This queue appears to be related to configuration changes.
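How such a queue can serve as a subscription list is sketched below. Everything here is an assumption made for illustration: Waiter is a hypothetical stand-in for ClientLongPolling, responses are recorded in a list instead of being written to an HTTP response, and matching by the keys of the client's MD5 map is one plausible way to decide which held requests care about a change.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class SubscriptionSketch {
    // Hypothetical stand-in for a held ClientLongPolling request.
    static class Waiter {
        final Map<String, String> clientMd5Map;       // groupKey -> md5 the client holds
        final List<String> sent = new ArrayList<>();  // records what was "written back"

        Waiter(Map<String, String> clientMd5Map) {
            this.clientMd5Map = clientMd5Map;
        }

        void sendResponse(String groupKey) {
            sent.add(groupKey);
        }
    }

    final Queue<Waiter> allSubs = new ConcurrentLinkedQueue<>();

    // Roughly what a data-change task has to do: find the held requests that
    // listen to the changed key, unsubscribe them, and answer them.
    int onDataChange(String groupKey) {
        int notified = 0;
        for (Iterator<Waiter> it = allSubs.iterator(); it.hasNext();) {
            Waiter waiter = it.next();
            if (waiter.clientMd5Map.containsKey(groupKey)) {
                it.remove();                   // drop the subscription first
                waiter.sendResponse(groupKey); // then write the result back
                notified++;
            }
        }
        return notified;
    }

    public static void main(String[] args) {
        SubscriptionSketch service = new SubscriptionSketch();
        service.allSubs.add(new Waiter(Collections.singletonMap("app.yaml+DEFAULT_GROUP", "md5-a")));
        service.allSubs.add(new Waiter(Collections.singletonMap("db.yaml+DEFAULT_GROUP", "md5-b")));
        System.out.println(service.onDataChange("app.yaml+DEFAULT_GROUP"));
        System.out.println(service.allSubs.size());
    }
}
```

A concurrent queue fits here because held requests are added by request-handling threads while change tasks and timeout tasks remove them from other threads.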
What has to happen, then, is this: when a user modifies a configuration in the Nacos console, the long-polling connections of the clients that care about that configuration must be taken out of the subscription queue and answered with the change. So we look at the constructor of LongPollingService to find where the subscription is set up.
    /**
     * long polling subscription relationship
     */
    final Queue<ClientLongPolling> allSubs;

    allSubs.add(this);

LongPollingService
The constructor of LongPollingService uses NotifyCenter to subscribe to an event. It is easy to see that when the event is an instance of LocalDataChangeEvent, that is, when data on the server changes, a DataChangeTask is submitted for execution.
    public LongPollingService() {
        allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();

        ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);

        // Register LocalDataChangeEvent to NotifyCenter.
        NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);

        // register the subscriber for LocalDataChangeEvent
        NotifyCenter.registerSubscriber(new Subscriber() {

            @Override
            public void onEvent(Event event) {
                if (isFixedPolling()) {
                    // Ignore.
                } else {
                    if (event instanceof LocalDataChangeEvent) {
                        // when a LocalDataChangeEvent is triggered, execute the following code
                        LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
                        ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
                    }
                }
            }

            @Override
            public Class