Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is the principle and function of UpdatedServiceProcessor in nacos ServiceManager

2025-03-31 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/02 Report--

This article mainly explains "what is the principle and function of UpdatedServiceProcessor in nacos ServiceManager". Interested friends may wish to have a look. The method introduced in this paper is simple, fast and practical. Next, let the editor take you to learn "what is the principle and function of UpdatedServiceProcessor in nacos ServiceManager"?

Order

This paper mainly studies the UpdatedServiceProcessor of nacos ServiceManager.

ServiceManager.init

Nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@ Component@DependsOn ("nacosApplicationContext") public class ServiceManager implements RecordListener {/ * * Map * / private Map serviceMap = new ConcurrentHashMap (); private LinkedBlockingDeque toBeUpdatedServicesQueue = new LinkedBlockingDeque (1024 * 1024); private Synchronizer synchronizer = new ServiceStatusSynchronizer (); private final Lock lock = new ReentrantLock (); @ Resource (name = "consistencyDelegate") private ConsistencyService consistencyService; @ Autowired private SwitchDomain switchDomain; @ Autowired private DistroMapper distroMapper; @ Autowired private ServerListManager serverListManager; @ Autowired private PushService pushService Private final Object putServiceLock = new Object (); @ PostConstruct public void init () {UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule (new ServiceReporter (), 60000, TimeUnit.MILLISECONDS); UtilsAndCommons.SERVICE_UPDATE_EXECUTOR.submit (new UpdatedServiceProcessor ()); try {Loggers.SRV_LOG.info ("listen for service meta change"); consistencyService.listen (KeyBuilder.SERVICE_META_KEY_PREFIX, this) } catch (NacosException e) {Loggers.SRV_LOG.error ("listen for service meta change failed!");}} /.}

The init method of ServiceManager submits a UpdatedServiceProcessor task to UtilsAndCommons.SERVICE_UPDATE_EXECUTOR

UpdatedServiceProcessor

Nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

Private class UpdatedServiceProcessor implements Runnable {/ / get changed service from other server asynchronously @ Override public void run () {ServiceKey serviceKey = null; try {while (true) {try {serviceKey = toBeUpdatedServicesQueue.take () } catch (Exception e) {Loggers.EVT_LOG.error ("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");} if (serviceKey = = null) {continue;} GlobalExecutor.submitServiceUpdate (new ServiceUpdater (serviceKey)) } catch (Exception e) {Loggers.EVT_LOG.error ("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);}

UpdatedServiceProcessor implements the Runnable method, whose run method iteratively fetches elements from the toBeUpdatedServicesQueue and then submits the ServiceUpdater using GlobalExecutor.submitServiceUpdate

ServiceUpdater

Nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

Private class ServiceUpdater implements Runnable {String namespaceId; String serviceName; String serverIP; public ServiceUpdater (ServiceKey serviceKey) {this.namespaceId = serviceKey.getNamespaceId (); this.serviceName = serviceKey.getServiceName (); this.serverIP = serviceKey.getServerIP () } @ Override public void run () {try {updatedHealthStatus (namespaceId, serviceName, serverIP);} catch (Exception e) {Loggers.SRV_LOG.warn ("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}", serviceName, serverIP, e) }}}

ServiceUpdater implements the Runnable interface, and its run method executes updatedHealthStatus

ServiceManager.updatedHealthStatus

Nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@ Component@DependsOn ("nacosApplicationContext") public class ServiceManager implements RecordListener {/ * * Map * / private Map serviceMap = new ConcurrentHashMap (); private LinkedBlockingDeque toBeUpdatedServicesQueue = new LinkedBlockingDeque (1024 * 1024); private Synchronizer synchronizer = new ServiceStatusSynchronizer (); private final Lock lock = new ReentrantLock (); @ Resource (name = "consistencyDelegate") private ConsistencyService consistencyService; @ Autowired private SwitchDomain switchDomain; @ Autowired private DistroMapper distroMapper; @ Autowired private ServerListManager serverListManager; @ Autowired private PushService pushService Private final Object putServiceLock = new Object () /. Public void updatedHealthStatus (String namespaceId, String serviceName, String serverIP) {Message msg = synchronizer.get (serverIP, UtilsAndCommons.assembleFullServiceName (namespaceId, serviceName)); JSONObject serviceJson = JSON.parseObject (msg.getData ()); JSONArray ipList = serviceJson.getJSONArray ("ips"); Map ipsMap = new HashMap (ipList.size ()); for (int I = 0; I < ipList.size (); iTunes +) {String ip = ipList.getString (I) String [] strings = ip.split ("_"); ipsMap.put (strings [0], strings [1]);} Service service = getService (namespaceId, serviceName); if (service = = null) {return;} boolean changed = false; List instances = service.allIPs () For (Instance instance: instances) {boolean valid = Boolean.parseBoolean (ipsMap.get (instance.toIPAddr (); if (valid! = instance.isHealthy ()) {changed = true; instance.setHealthy (valid) Loggers.EVT_LOG.info ("{} {SYNC} IP- {}: {} @ {} {}", serviceName, (instance.isHealthy ()? "ENABLED": "DISABLED"), instance.getIp (), instance.getPort (), instance.getClusterName ();}} if (changed) {pushService.serviceChanged (service);} StringBuilder stringBuilder = new StringBuilder (); List allIps = service.allIPs () For (Instance instance: allIps) {stringBuilder.append (instance.toIPAddr ()) .append ("_") .append (instance.isHealthy ()) .append (",") } if (changed & & Loggers.EVT_LOG.isDebugEnabled ()) {Loggers.EVT_LOG.debug ("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}", service.getNamespaceId (), service.getName (), stringBuilder.toString ());}} /.}

The updatedHealthStatus method gets the msg from synchronizer, assembles the ipsMap, then obtains the instances information through service.allIPs (), then traverses the instances to obtain the valid status of the instance from ipsMap, marks it as changed if it does not match the isHealthy () of instance, updates the healthy; of instance, publishes the event through pushService.serviceChanged (service), and finally prints the log

Summary

The init method of ServiceManager submits a UpdatedServiceProcessor task to UtilsAndCommons.SERVICE_UPDATE_EXECUTOR

UpdatedServiceProcessor implements the Runnable method, whose run method iteratively fetches elements from the toBeUpdatedServicesQueue and then submits the ServiceUpdater using GlobalExecutor.submitServiceUpdate

ServiceUpdater implements the Runnable interface, and its run method executes updatedHealthStatus

At this point, I believe you have a deeper understanding of "what is the principle and function of UpdatedServiceProcessor in nacos ServiceManager". You might as well do it in practice. Here is the website, more related content can enter the relevant channels to inquire, follow us, continue to learn!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report