2025-03-31 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article explains the principle and function of UpdatedServiceProcessor in nacos ServiceManager by walking through the relevant source code.
Preface
This article examines the UpdatedServiceProcessor of nacos ServiceManager.
ServiceManager.init
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
    @Component
    @DependsOn("nacosApplicationContext")
    public class ServiceManager implements RecordListener<Service> {

        /**
         * Map<namespace, Map<group::serviceName, Service>>
         */
        private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

        private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

        private Synchronizer synchronizer = new ServiceStatusSynchronizer();

        private final Lock lock = new ReentrantLock();

        @Resource(name = "consistencyDelegate")
        private ConsistencyService consistencyService;

        @Autowired
        private SwitchDomain switchDomain;

        @Autowired
        private DistroMapper distroMapper;

        @Autowired
        private ServerListManager serverListManager;

        @Autowired
        private PushService pushService;

        private final Object putServiceLock = new Object();

        @PostConstruct
        public void init() {

            // Run a ServiceReporter once, 60 seconds after startup
            UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);

            // Start the long-running consumer of toBeUpdatedServicesQueue
            UtilsAndCommons.SERVICE_UPDATE_EXECUTOR.submit(new UpdatedServiceProcessor());

            try {
                Loggers.SRV_LOG.info("listen for service meta change");
                consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
            } catch (NacosException e) {
                Loggers.SRV_LOG.error("listen for service meta change failed!");
            }
        }

        //......
    }
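The init method of ServiceManager schedules a ServiceReporter to run after 60000 ms on UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR, submits an UpdatedServiceProcessor task to UtilsAndCommons.SERVICE_UPDATE_EXECUTOR, and registers itself as a listener for service meta changes.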
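The startup wiring can be tried out in isolation. The following is a minimal sketch, not the Nacos implementation: it assumes plain java.util.concurrent executors as stand-ins for UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR and UtilsAndCommons.SERVICE_UPDATE_EXECUTOR (whose actual configuration is not shown in this article), and the class name InitWiringSketch and the method startTasks are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the wiring done in ServiceManager.init().
class InitWiringSketch {

    // Starts both kinds of task and reports whether each one actually ran.
    // Index 0: the delayed "reporter" task, index 1: the submitted "processor" task.
    static boolean[] startTasks(long reportDelayMillis) {
        // Assumption: single-threaded executors; the real pool sizes are not shown in the article.
        ScheduledExecutorService syncExecutor = Executors.newSingleThreadScheduledExecutor();
        ExecutorService updateExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch reporterRan = new CountDownLatch(1);
        CountDownLatch processorRan = new CountDownLatch(1);
        Runnable reporter = reporterRan::countDown;
        Runnable processor = processorRan::countDown;
        try {
            // One-shot task that fires after a delay (60000 ms in the article).
            syncExecutor.schedule(reporter, reportDelayMillis, TimeUnit.MILLISECONDS);
            // Task handed to a worker thread right away; in Nacos this one loops forever.
            updateExecutor.submit(processor);
            boolean processorStarted = processorRan.await(2, TimeUnit.SECONDS);
            boolean reporterStarted = reporterRan.await(reportDelayMillis + 2000, TimeUnit.MILLISECONDS);
            return new boolean[] {reporterStarted, processorStarted};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new boolean[] {false, false};
        } finally {
            syncExecutor.shutdownNow();
            updateExecutor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] started = startTasks(100);
        System.out.println("reporter ran: " + started[0] + ", processor ran: " + started[1]);
    }
}
```

Note that schedule runs its task only once; if the reporter is meant to repeat, it has to reschedule itself or be registered with a fixed-rate method instead.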
UpdatedServiceProcessor
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
    private class UpdatedServiceProcessor implements Runnable {
        //get changed service from other server asynchronously
        @Override
        public void run() {
            ServiceKey serviceKey = null;

            try {
                while (true) {
                    try {
                        // Blocks until a key is available
                        serviceKey = toBeUpdatedServicesQueue.take();
                    } catch (Exception e) {
                        Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");
                    }

                    if (serviceKey == null) {
                        continue;
                    }
                    GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey));
                }
            } catch (Exception e) {
                Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);
            }
        }
    }
UpdatedServiceProcessor implements the Runnable interface. Its run method loops forever, taking a ServiceKey from toBeUpdatedServicesQueue (take() blocks until one is available) and submitting a ServiceUpdater for it through GlobalExecutor.submitServiceUpdate.
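The take-and-dispatch loop described above can be sketched with standard library classes only. This is an illustration under stated assumptions, not the Nacos code: plain String keys stand in for ServiceKey, a single-threaded worker executor stands in for GlobalExecutor (its real pool configuration is not shown in this article), and the names QueueDispatchSketch and drain are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a dispatcher thread feeding a worker executor.
class QueueDispatchSketch {

    // Enqueues the keys, lets the dispatcher hand them to the worker,
    // and returns the keys in the order the worker handled them.
    static List<String> drain(List<String> keys) {
        BlockingQueue<String> queue = new LinkedBlockingDeque<>(1024);
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        // Assumption: one worker thread, which keeps handling order equal to queue order.
        ExecutorService workers = Executors.newSingleThreadExecutor();
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(keys.size());

        Runnable loop = () -> {
            try {
                while (true) {
                    String key = queue.take(); // blocks until a key is available
                    Runnable update = () -> {
                        handled.add(key);      // stands in for the real update work
                        done.countDown();
                    };
                    workers.submit(update);    // the dispatcher itself never does the work
                }
            } catch (InterruptedException e) {
                // shutdownNow() interrupts take(), which ends the loop
                Thread.currentThread().interrupt();
            }
        };
        dispatcher.submit(loop);

        try {
            for (String k : keys) {
                queue.put(k);
            }
            done.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            dispatcher.shutdownNow();
            workers.shutdownNow();
        }
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        System.out.println(drain(Arrays.asList("svc-a", "svc-b", "svc-c")));
    }
}
```

Unlike the Nacos loop, this sketch exits on interruption so that the demo can shut down cleanly. Keeping the dispatcher free of real work means a slow update delays only the worker pool, not the draining of the queue.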
ServiceUpdater
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
    private class ServiceUpdater implements Runnable {

        String namespaceId;
        String serviceName;
        String serverIP;

        public ServiceUpdater(ServiceKey serviceKey) {
            this.namespaceId = serviceKey.getNamespaceId();
            this.serviceName = serviceKey.getServiceName();
            this.serverIP = serviceKey.getServerIP();
        }

        @Override
        public void run() {
            try {
                updatedHealthStatus(namespaceId, serviceName, serverIP);
            } catch (Exception e) {
                Loggers.SRV_LOG.warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}",
                    serviceName, serverIP, e);
            }
        }
    }
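ServiceUpdater implements the Runnable interface. Its constructor copies namespaceId, serviceName and serverIP from the ServiceKey, and its run method calls updatedHealthStatus with them, logging a warning if the call throws.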
ServiceManager.updatedHealthStatus
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
    @Component
    @DependsOn("nacosApplicationContext")
    public class ServiceManager implements RecordListener<Service> {

        /**
         * Map<namespace, Map<group::serviceName, Service>>
         */
        private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

        private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

        private Synchronizer synchronizer = new ServiceStatusSynchronizer();

        private final Lock lock = new ReentrantLock();

        @Resource(name = "consistencyDelegate")
        private ConsistencyService consistencyService;

        @Autowired
        private SwitchDomain switchDomain;

        @Autowired
        private DistroMapper distroMapper;

        @Autowired
        private ServerListManager serverListManager;

        @Autowired
        private PushService pushService;

        private final Object putServiceLock = new Object();

        //......

        public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {
            Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            JSONObject serviceJson = JSON.parseObject(msg.getData());

            // Each entry is split on "_" into an address and a health flag
            JSONArray ipList = serviceJson.getJSONArray("ips");
            Map<String, String> ipsMap = new HashMap<>(ipList.size());
            for (int i = 0; i < ipList.size(); i++) {
                String ip = ipList.getString(i);
                String[] strings = ip.split("_");
                ipsMap.put(strings[0], strings[1]);
            }

            Service service = getService(namespaceId, serviceName);

            if (service == null) {
                return;
            }

            boolean changed = false;

            List<Instance> instances = service.allIPs();
            for (Instance instance : instances) {
                boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIPAddr()));
                if (valid != instance.isHealthy()) {
                    changed = true;
                    instance.setHealthy(valid);
                    Loggers.EVT_LOG.info("{} {SYNC} IP- {}: {} @ {} {}",
                        serviceName, (instance.isHealthy() ? "ENABLED" : "DISABLED"),
                        instance.getIp(), instance.getPort(), instance.getClusterName());
                }
            }

            if (changed) {
                pushService.serviceChanged(service);
            }

            StringBuilder stringBuilder = new StringBuilder();
            List<Instance> allIps = service.allIPs();
            for (Instance instance : allIps) {
                stringBuilder.append(instance.toIPAddr()).append("_").append(instance.isHealthy()).append(",");
            }

            if (changed && Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}",
                    service.getNamespaceId(), service.getName(), stringBuilder.toString());
            }
        }

        //......
    }
The updatedHealthStatus method fetches a Message for the service from the synchronizer, parses its "ips" array, and splits each entry on "_" to build ipsMap (address to health flag). If the service does not exist locally, it returns. Otherwise it iterates over service.allIPs() and reads each instance's valid flag from ipsMap; when the flag differs from instance.isHealthy(), it sets changed to true, updates the instance's healthy state, and logs the change. If anything changed, it publishes the change through pushService.serviceChanged(service) and, when debug logging is enabled, logs the resulting health status of all instances.
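The parse-and-compare core of this method can be sketched without any Nacos dependencies. The sketch below is illustrative only: the nested Instance class is a hypothetical, trimmed-down stand-in for the Nacos Instance, its addr field plays the role of toIPAddr(), the sample addresses are made up, and HealthSyncSketch, parse and apply are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the health-flag synchronization step.
class HealthSyncSketch {

    // Trimmed-down stand-in for the Nacos Instance class (assumption).
    static final class Instance {
        final String addr;   // plays the role of toIPAddr() in the article
        boolean healthy;

        Instance(String addr, boolean healthy) {
            this.addr = addr;
            this.healthy = healthy;
        }
    }

    // Turns entries such as "10.0.0.1:8080_true" into a map of address -> flag.
    static Map<String, String> parse(List<String> ips) {
        Map<String, String> ipsMap = new HashMap<>(ips.size());
        for (String ip : ips) {
            String[] parts = ip.split("_");
            ipsMap.put(parts[0], parts[1]);
        }
        return ipsMap;
    }

    // Applies the remote flags to the local instances.
    // Returns true if at least one instance changed its health state.
    static boolean apply(Map<String, String> ipsMap, List<Instance> instances) {
        boolean changed = false;
        for (Instance instance : instances) {
            // Boolean.parseBoolean(null) is false, so an address that is
            // missing from the message is treated as unhealthy.
            boolean valid = Boolean.parseBoolean(ipsMap.get(instance.addr));
            if (valid != instance.healthy) {
                changed = true;
                instance.healthy = valid;
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        List<Instance> instances = new ArrayList<>();
        instances.add(new Instance("10.0.0.1:8080", true));
        instances.add(new Instance("10.0.0.2:8080", true));
        Map<String, String> ipsMap = parse(Arrays.asList("10.0.0.1:8080_true", "10.0.0.2:8080_false"));
        System.out.println("changed: " + apply(ipsMap, instances));
    }
}
```

Returning the changed flag lets the caller decide whether to notify subscribers, which mirrors how the article's method calls pushService.serviceChanged only when at least one instance flipped.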
Summary
The init method of ServiceManager submits an UpdatedServiceProcessor task to UtilsAndCommons.SERVICE_UPDATE_EXECUTOR.
UpdatedServiceProcessor implements the Runnable interface; its run method loops, taking ServiceKey elements from toBeUpdatedServicesQueue and submitting a ServiceUpdater for each through GlobalExecutor.submitServiceUpdate.
ServiceUpdater implements the Runnable interface, and its run method calls updatedHealthStatus, which synchronizes the health state of local instances with the status reported by another server and publishes a change through pushService when any instance changed.