2025-01-18 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article introduces the principle and function of DistroConsistencyServiceImpl in Nacos.
Preface
This article mainly studies the DistroConsistencyServiceImpl class of Nacos.
ConsistencyService
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ConsistencyService.java
public interface ConsistencyService {

    /**
     * Put a data related to a key to Nacos cluster
     *
     * @param key   key of data, this key should be globally unique
     * @param value value of data
     * @throws NacosException
     * @see
     */
    void put(String key, Record value) throws NacosException;

    /**
     * Remove a data from Nacos cluster
     *
     * @param key key of data
     * @throws NacosException
     */
    void remove(String key) throws NacosException;

    /**
     * Get a data from Nacos cluster
     *
     * @param key key of data
     * @return data related to the key
     * @throws NacosException
     */
    Datum get(String key) throws NacosException;

    /**
     * Listen for changes of a data
     *
     * @param key      key of data
     * @param listener callback of data change
     * @throws NacosException
     */
    void listen(String key, RecordListener listener) throws NacosException;

    /**
     * Cancel listening of a data
     *
     * @param key      key of data
     * @param listener callback of data change
     * @throws NacosException
     */
    void unlisten(String key, RecordListener listener) throws NacosException;

    /**
     * Tell the status of this consistency service
     *
     * @return true if available
     */
    boolean isAvailable();
}
ConsistencyService defines six methods: put, remove, get, listen, unlisten, and isAvailable.
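To make the contract concrete, here is a minimal in-memory sketch of the same six operations. It is not Nacos code: ChangeListener and InMemoryConsistencySketch are hypothetical names, values are plain Strings instead of Record and Datum, and nothing throws NacosException.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for RecordListener (values are plain Strings).
interface ChangeListener {
    void onChange(String key, String value);
    void onDelete(String key);
}

// Hypothetical single-node store offering the same six operations as ConsistencyService.
class InMemoryConsistencySketch {
    private final Map<String, String> store = new HashMap<>();
    private final Map<String, List<ChangeListener>> listeners = new HashMap<>();

    // Store the value, then tell every listener of this key about the change.
    void put(String key, String value) {
        store.put(key, value);
        for (ChangeListener l : listeners.getOrDefault(key, Collections.emptyList())) {
            l.onChange(key, value);
        }
    }

    // Drop the value, then tell every listener of this key about the deletion.
    void remove(String key) {
        store.remove(key);
        for (ChangeListener l : listeners.getOrDefault(key, Collections.emptyList())) {
            l.onDelete(key);
        }
    }

    // Returns null when the key is unknown.
    String get(String key) {
        return store.get(key);
    }

    void listen(String key, ChangeListener listener) {
        listeners.computeIfAbsent(key, k -> new ArrayList<>()).add(listener);
    }

    void unlisten(String key, ChangeListener listener) {
        List<ChangeListener> list = listeners.get(key);
        if (list != null) {
            list.remove(listener);
        }
    }

    // A single in-memory node has nothing to wait for, so it is always available.
    boolean isAvailable() {
        return true;
    }
}
```

A caller would typically register a listener with listen before the first put, so that no change for that key is missed.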
EphemeralConsistencyService
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/EphemeralConsistencyService.java
public interface EphemeralConsistencyService extends ConsistencyService {
}
The EphemeralConsistencyService interface extends ConsistencyService and declares no methods of its own.
DistroConsistencyServiceImpl
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {

    private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("com.alibaba.nacos.naming.distro.notifier");
            return t;
        }
    });

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private DataStore dataStore;

    @Autowired
    private TaskDispatcher taskDispatcher;

    @Autowired
    private DataSyncer dataSyncer;

    @Autowired
    private Serializer serializer;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private GlobalConfig globalConfig;

    private boolean initialized = false;

    public volatile Notifier notifier = new Notifier();

    private Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();

    private Map syncChecksumTasks = new ConcurrentHashMap(16);

    @PostConstruct
    public void init() {
        GlobalExecutor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    load();
                } catch (Exception e) {
                    Loggers.DISTRO.error("load data failed.", e);
                }
            }
        });

        executor.submit(notifier);
    }

    public void load() throws Exception {
        if (SystemUtils.STANDALONE_MODE) {
            initialized = true;
            return;
        }
        // size = 1 means only myself in the list, we need at least one another server alive:
        while (serverListManager.getHealthyServers().size() <= 1) {
            //......
        }
        //......
    }

    //......

    public void processData(byte[] data) throws Exception {
        if (data.length > 0) {
            Map<String, Datum<Instances>> datumMap =
                serializer.deserializeMap(data, Instances.class);

            for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
                dataStore.put(entry.getKey(), entry.getValue());

                if (!listeners.containsKey(entry.getKey())) {
                    // pretty sure the service not exist:
                    if (switchDomain.isDefaultInstanceEphemeral()) {
                        // create empty service
                        Loggers.DISTRO.info("creating service {}", entry.getKey());
                        Service service = new Service();
                        String serviceName = KeyBuilder.getServiceName(entry.getKey());
                        String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                        service.setName(serviceName);
                        service.setNamespaceId(namespaceId);
                        service.setGroupName(Constants.DEFAULT_GROUP);
                        // now validate the service. If failed, exception will be thrown
                        service.setLastModifiedMillis(System.currentTimeMillis());
                        service.recalculateChecksum();
                        listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0)
                            .onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
                    }
                }
            }

            for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {

                if (!listeners.containsKey(entry.getKey())) {
                    // Should not happen:
                    Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
                    continue;
                }

                try {
                    for (RecordListener listener : listeners.get(entry.getKey())) {
                        listener.onChange(entry.getKey(), entry.getValue().value);
                    }
                } catch (Exception e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
                    continue;
                }

                // Update data store if listener executed successfully:
                dataStore.put(entry.getKey(), entry.getValue());
            }
        }
    }

    //......

    @Override
    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        taskDispatcher.addTask(key);
    }

    @Override
    public void remove(String key) throws NacosException {
        onRemove(key);
        listeners.remove(key);
    }

    @Override
    public Datum get(String key) throws NacosException {
        return dataStore.get(key);
    }

    //......

    @Override
    public void listen(String key, RecordListener listener) throws NacosException {
        if (!listeners.containsKey(key)) {
            listeners.put(key, new CopyOnWriteArrayList<>());
        }

        if (listeners.get(key).contains(listener)) {
            return;
        }

        listeners.get(key).add(listener);
    }

    @Override
    public void unlisten(String key, RecordListener listener) throws NacosException {
        if (!listeners.containsKey(key)) {
            return;
        }
        for (RecordListener recordListener : listeners.get(key)) {
            if (recordListener.equals(listener)) {
                listeners.get(key).remove(listener);
                break;
            }
        }
    }

    @Override
    public boolean isAvailable() {
        return isInitialized() || ServerStatus.UP.name().equals(switchDomain.getOverriddenServerStatus());
    }

    //......
}
DistroConsistencyServiceImpl implements the EphemeralConsistencyService interface.
Its init method runs load asynchronously. load initializes the node by calling syncAllDataFromRemote, which fetches data through NamingProxy.getAllData and passes it to processData; processData deserializes the data, invokes the registered listener callbacks, and writes the entries to dataStore. Finally, init submits the Notifier to its own executor so that it also runs asynchronously.
Its put method calls onPut and then taskDispatcher.addTask(key); its remove method calls onRemove and then listeners.remove(key); its get method reads directly from dataStore; listen registers a RecordListener for a key and unlisten removes it; isAvailable returns true when the service is initialized or the overridden server status is ServerStatus.UP.
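The listener bookkeeping behind listen, unlisten and remove can be sketched on its own. This is a variation under stated assumptions, not the Nacos implementation: ListenerRegistrySketch, removeAll and count are hypothetical names. Where the original checks containsKey and contains before inserting, the sketch uses computeIfAbsent and addIfAbsent from the JDK, which do each check-and-insert step atomically.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical per-key listener registry; T stands in for RecordListener.
class ListenerRegistrySketch<T> {
    private final Map<String, CopyOnWriteArrayList<T>> listeners = new ConcurrentHashMap<>();

    // Register a listener for a key; registering the same listener twice has no effect.
    void listen(String key, T listener) {
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>())
                 .addIfAbsent(listener);
    }

    // Remove one listener; unknown keys are ignored.
    void unlisten(String key, T listener) {
        CopyOnWriteArrayList<T> list = listeners.get(key);
        if (list != null) {
            list.remove(listener);
        }
    }

    // Drop every listener of a key, as remove(key) does with listeners.remove(key).
    void removeAll(String key) {
        listeners.remove(key);
    }

    // Number of listeners currently registered for a key.
    int count(String key) {
        List<T> list = listeners.get(key);
        return list == null ? 0 : list.size();
    }
}
```

CopyOnWriteArrayList suits this workload because listener lists are read on every notification but change rarely, so iteration needs no locking.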
Notifier
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java
public class Notifier implements Runnable {

    private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

    private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);

    public void addTask(String datumKey, ApplyAction action) {

        if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
            return;
        }
        if (action == ApplyAction.CHANGE) {
            services.put(datumKey, StringUtils.EMPTY);
        }
        tasks.add(Pair.with(datumKey, action));
    }

    public int getTaskSize() {
        return tasks.size();
    }

    @Override
    public void run() {
        Loggers.DISTRO.info("distro notifier started");

        while (true) {
            try {

                Pair pair = tasks.take();

                if (pair == null) {
                    continue;
                }

                String datumKey = (String) pair.getValue0();
                ApplyAction action = (ApplyAction) pair.getValue1();

                services.remove(datumKey);

                int count = 0;

                if (!listeners.containsKey(datumKey)) {
                    continue;
                }

                for (RecordListener listener : listeners.get(datumKey)) {

                    count++;

                    try {
                        if (action == ApplyAction.CHANGE) {
                            listener.onChange(datumKey, dataStore.get(datumKey).value);
                            continue;
                        }

                        if (action == ApplyAction.DELETE) {
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                    }
                }

                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                        datumKey, count, action.name());
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }
}
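Notifier implements the Runnable interface. Its run method takes tasks from the LinkedBlockingQueue one at a time and, for each task, calls onChange or onDelete on every listener registered for the task's key. addTask skips a CHANGE task when one for the same key is already waiting in the queue.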
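The queueing side of Notifier can be illustrated in isolation: a bounded blocking queue plus a map of keys that already have a CHANGE waiting. The sketch below is an assumption-laden simplification, not the Nacos class: SketchAction, NotifierSketch, Task and takeOne are hypothetical names, the queue capacity is arbitrary, and putIfAbsent replaces the original's separate containsKey and put calls.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for ApplyAction.
enum SketchAction { CHANGE, DELETE }

class NotifierSketch {
    // One queued notification: which key, and what happened to it.
    static final class Task {
        final String key;
        final SketchAction action;

        Task(String key, SketchAction action) {
            this.key = key;
            this.action = action;
        }
    }

    // Keys that already have a CHANGE task waiting in the queue.
    private final Map<String, Boolean> pending = new ConcurrentHashMap<>();
    private final BlockingQueue<Task> tasks = new LinkedBlockingQueue<>(1024);

    // Queue a task; a CHANGE is dropped if one for the same key is still waiting.
    void addTask(String key, SketchAction action) {
        if (action == SketchAction.CHANGE && pending.putIfAbsent(key, Boolean.TRUE) != null) {
            return;
        }
        tasks.add(new Task(key, action));
    }

    int getTaskSize() {
        return tasks.size();
    }

    // Block until a task is available, then clear the pending mark for its key
    // so that later CHANGE events for that key are queued again.
    Task takeOne() throws InterruptedException {
        Task task = tasks.take();
        pending.remove(task.key);
        return task;
    }
}
```

Collapsing repeated CHANGE events is safe in this design because the consumer reads the current value from the store when it handles the task, so one notification covers every update made before it runs.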
Summary
DistroConsistencyServiceImpl implements the EphemeralConsistencyService interface.
Its init method runs load asynchronously; load calls syncAllDataFromRemote, which fetches data through NamingProxy.getAllData and hands it to processData, and processData invokes the listener callbacks and writes the entries to dataStore. init then submits the Notifier so that it runs asynchronously as well.
put calls onPut and taskDispatcher.addTask(key); remove calls onRemove and listeners.remove(key); get reads directly from dataStore; listen and unlisten add and remove a RecordListener; isAvailable returns true when the service is initialized or the overridden server status is ServerStatus.UP.