Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Principle and Application of PushService in nacos server

2025-03-31 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/02 Report--

This article mainly explains "the principle and application of PushService in nacos server". The content of the explanation is simple and clear, and it is easy to learn and understand. Please follow the editor's train of thought to study and learn "the principle and application of PushService in nacos server".

Order

This paper mainly studies the PushService of nacos server.

PushService

Nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

Componentpublic class PushService implements ApplicationContextAware, ApplicationListener {@ Autowired private SwitchDomain switchDomain; private ApplicationContext applicationContext; private static final long ACK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos (10L); private static final int MAX_RETRY_TIMES = 1; private static volatile ConcurrentMap ackMap = new ConcurrentHashMap (); private static ConcurrentMap clientMap = new ConcurrentHashMap (); private static volatile ConcurrentHashMap udpSendTimeMap = new ConcurrentHashMap (); public static volatile ConcurrentHashMap pushCostMap = new ConcurrentHashMap (); private static int totalPush = 0 Private static int failedPush = 0; private static ConcurrentHashMap lastPushMillisMap = new ConcurrentHashMap (); private static DatagramSocket udpSocket; private static Map futureMap = new ConcurrentHashMap (); private static ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor (new ThreadFactory () {@ Override public Thread newThread (Runnable r) {Thread t = newThread (r); t.setDaemon (true); t.setName ("com.alibaba.nacos.naming.push.retransmitter") Return t;}}); private static ScheduledExecutorService udpSender = Executors.newSingleThreadScheduledExecutor (new ThreadFactory () {@ Override public Thread newThread (Runnable r) {Thread t = newThread (r); t.setDaemon (true); t.setName ("com.alibaba.nacos.naming.push.udpSender"); return t;}}) Static {try {udpSocket = new DatagramSocket (); Receiver receiver = new Receiver (); Thread inThread = new Thread (receiver); inThread.setDaemon (true); inThread.setName ("com.alibaba.nacos.naming.push.receiver"); inThread.start () ExecutorService.scheduleWithFixedDelay (new Runnable () {@ Override public void run () {try {removeClientIfZombie ();} catch (Throwable e) {Loggers.PUSH.warn ("[NACOS-PUSH] failed to remove client zombie") }, 0,20, TimeUnit.SECONDS);} catch (SocketException e) {Loggers.SRV_LOG.error ("[NACOS-PUSH] failed to init push service");} @ Override public void setApplicationContext (ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;} / /. Public static void removeClientIfZombie () {int size = 0; for (Map.Entry entry: clientMap.entrySet ()) {ConcurrentMap clientConcurrentMap = entry.getValue (); for (Map.Entry entry1: clientConcurrentMap.entrySet ()) {PushClient client = entry1.getValue (); if (client.zombie ()) {clientConcurrentMap.remove (entry1.getKey ()) }} size + = clientConcurrentMap.size ();} if (Loggers.PUSH.isDebugEnabled ()) {Loggers.PUSH.debug ("[NACOS-PUSH] clientMap size: {}", size);}} /.}

PushService implements ApplicationContextAware and ApplicationListener interfaces; it has two ScheduledExecutorService, one for retransmitter and one for udpSender; its static code block creates a Deamon thread execution Receiver and registers a scheduled task execution removeClientIfZombie, which traverses the clientMap and removes the client of the zombie

Receiver

Nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

Public static class Receiver implements Runnable {@ Override public void run () {while (true) {byte [] buffer = new byte [1024 * 64]; DatagramPacket packet = new DatagramPacket (buffer, buffer.length); try {udpSocket.receive (packet) String json = new String (packet.getData (), 0, packet.getLength (), Charset.forName ("UTF-8")). Trim (); AckPacket ackPacket = JSON.parseObject (json, AckPacket.class); InetSocketAddress socketAddress = (InetSocketAddress) packet.getSocketAddress (); String ip = socketAddress.getAddress (). GetHostAddress () Int port = socketAddress.getPort (); if (System.nanoTime ()-ackPacket.lastRefTime > ACK_TIMEOUT_NANOS) {Loggers.PUSH.warn ("ack takes too long from {} ack json: {}", packet.getSocketAddress (), json);} String ackKey = getACKKey (ip, port, ackPacket.lastRefTime) AckEntry ackEntry = ackMap.remove (ackKey); if (ackEntry = = null) {throw new IllegalStateException ("unable to find ackEntry for key:" + ackKey + ", ack json:" + json);} long pushCost = System.currentTimeMillis ()-udpSendTimeMap.get (ackKey) Loggers.PUSH.info ("received ack: {} from: {}:, cost: {} ms, unacked: {}, total push: {}", json, ip, port, pushCost, ackMap.size (), totalPush); pushCostMap.put (ackKey, pushCost); udpSendTimeMap.remove (ackKey) } catch (Throwable e) {Loggers.PUSH.error ("[NACOS-PUSH] error while receiving ack data", e);} / /. Public static class AckPacket {public String type; public long lastRefTime; public String data;}}

Receiver implements the Runnable interface, and its run method uses a while true loop to execute udpSocket.receive, then parses the AckPacket, removes the ackKey from the ackMap, updates the pushCostMap, and removes the ackKey from the udpSendTimeMap

PushClient

Nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

Public class PushClient {private String namespaceId; private String serviceName; private String clusters; private String agent; private String tenant; private String app; private InetSocketAddress socketAddr; private DataSource dataSource; private Map params; public Map getParams () {return params;} public void setParams (Map params) {this.params = params } public long lastRefTime = System.currentTimeMillis () Public PushClient (String namespaceId, String serviceName, String clusters, String agent, InetSocketAddress socketAddr, DataSource dataSource, String tenant String app) {this.namespaceId = namespaceId This.serviceName = serviceName; this.clusters = clusters; this.agent = agent; this.socketAddr = socketAddr; this.dataSource = dataSource; this.tenant = tenant; this.app = app;} public DataSource getDataSource () {return dataSource } public PushClient (InetSocketAddress socketAddr) {this.socketAddr = socketAddr;} public boolean zombie () {return System.currentTimeMillis ()-lastRefTime > switchDomain.getPushCacheMillis (serviceName) } @ Override public String toString () {return "serviceName:" + serviceName + ", clusters:" + clusters + ", ip:" + socketAddr.getAddress () .getHostAddress () + ", port:" + socketAddr.getPort () + ", agent:" + agent } public String getAgent () {return agent;} public String getAddrStr () {return socketAddr.getAddress () .getHostAddress () + ":" + socketAddr.getPort ();} public String getIp () {return socketAddr.getAddress () .getHostAddress () } @ Override public int hashCode () {return Objects.hash (serviceName, clusters, socketAddr);} @ Override public boolean equals (Object obj) {if (! (obj instanceof PushClient)) {return false;} PushClient other = (PushClient) obj Return serviceName.equals (other.serviceName) & & clusters.equals (other.clusters) & & socketAddr.equals (other.socketAddr);} public String getClusters () {return clusters;} public void setClusters (String clusters) {this.clusters = clusters;} public String getNamespaceId () {return namespaceId } public void setNamespaceId (String namespaceId) {this.namespaceId = namespaceId;} public String getServiceName () {return serviceName;} public void setServiceName (String serviceName) {this.serviceName = serviceName;} public String getTenant () {return tenant } public void setTenant (String tenant) {this.tenant = tenant;} public String getApp () {return app;} public void setApp (String app) {this.app = app;} public InetSocketAddress getSocketAddr () {return socketAddr } public void refresh () {lastRefTime = System.currentTimeMillis ();}}

PushClient encapsulates the information such as the address of the target service to be pushed. It provides a zombie method to determine whether the target service is zombie. It determines whether the time difference from lastRefTime exceeds the PushCacheMillis of the serviceName specified by switchDomain (the default is 10 seconds). If it exceeds, it is determined to be zombie.

PushService.onApplicationEvent@Componentpublic class PushService implements ApplicationContextAware, ApplicationListener {/ /. @ Override public void onApplicationEvent (ServiceChangeEvent event) {Service service = event.getService (); String serviceName = service.getName (); String namespaceId = service.getNamespaceId (); Future future = udpSender.schedule (new Runnable () {@ Override public void run () {try {Loggers.PUSH.info (serviceName + "is changed, add it to push queue.") ConcurrentMap clients = clientMap.get (UtilsAndCommons.assembleFullServiceName (namespaceId, serviceName)); if (MapUtils.isEmpty (clients)) {return;} Map cache = new HashMap (16); long lastRefTime = System.nanoTime () For (PushClient client: clients.values ()) {if (client.zombie ()) {Loggers.PUSH.debug ("client is zombie:" + client.toString ()); clients.remove (client.toString ()) Loggers.PUSH.debug ("client is zombie:" + client.toString ()); continue;} Receiver.AckEntry ackEntry; Loggers.PUSH.debug ("push serviceName: {} to client: {}", serviceName, client.toString ()) String key = getPushCacheKey (serviceName, client.getIp (), client.getAgent ()); byte [] compressData = null; Map data = null If (switchDomain.getDefaultPushCacheMillis () > = 20000 & & cache.containsKey (key)) {org.javatuples.Pair pair = (org.javatuples.Pair) cache.get (key); compressData = (byte []) (pair.getValue0 ()); data = (Map) pair.getValue1 () Loggers.PUSH.debug ("[PUSH-CACHE] cache hit: {}: {}", serviceName, client.getAddrStr ());} if (compressData! = null) {ackEntry = prepareAckEntry (client, compressData, data, lastRefTime) } else {ackEntry = prepareAckEntry (client, prepareHostsData (client), lastRefTime); if (ackEntry! = null) {cache.put (key, new org.javatuples.Pair (ackEntry.origin.getData (), ackEntry.data)) }} Loggers.PUSH.info ("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}", client.getServiceName (), client.getAddrStr (), client.getAgent (), (ackEntry = = null? Null: ackEntry.key); udpPush (ackEntry);}} catch (Exception e) {Loggers.PUSH.error ("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);} finally {futureMap.remove (UtilsAndCommons.assembleFullServiceName (namespaceId, serviceName)) }, 1000, TimeUnit.MILLISECONDS); futureMap.put (UtilsAndCommons.assembleFullServiceName (namespaceId, serviceName), future);} / /. Public void serviceChanged (Service service) {/ / merge some change events to reduce the push frequency: if (futureMap.containsKey (UtilsAndCommons.assembleFullServiceName (service.getNamespaceId (), service.getName () {return;} this.applicationContext.publishEvent (new ServiceChangeEvent (this, service));} / /. }

OnApplicationEvent processes ServiceChangeEvent, it registers a deferred task and puts the future into futureMap;. The deferred task gets the specified namespaceId from clientMap, the clients; of serviceName then traverses the clients to determine whether it is zombie, if so, removes the client, otherwise creates Receiver.AckEntry, then executes udpPush (ackEntry), and finally removes the future;serviceChanged method from futureMap and provides it to external calls to publish ServiceChangeEvent

PushService.udpPush

Nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

Componentpublic class PushService implements ApplicationContextAware, ApplicationListener {/ / Public static class Receiver implements Runnable {/ /. Public static class AckEntry {public AckEntry (String key, DatagramPacket packet) {this.key = key; this.origin = packet;} public void increaseRetryTime () {retryTimes.incrementAndGet ();} public int getRetryTimes () {return retryTimes.get () } public String key; public DatagramPacket origin; private AtomicInteger retryTimes = new AtomicInteger (0); public Map data;} / /. } private static Receiver.AckEntry udpPush (Receiver.AckEntry ackEntry) {if (ackEntry = = null) {Loggers.PUSH.error ("[NACOS-PUSH] ackEntry is null."); return null;} if (ackEntry.getRetryTimes () > MAX_RETRY_TIMES) {Loggers.PUSH.warn ("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key) AckMap.remove (ackEntry.key); udpSendTimeMap.remove (ackEntry.key); failedPush + = 1; return ackEntry;} try {if (! ackMap.containsKey (ackEntry.key)) {totalPush++;} ackMap.put (ackEntry.key, ackEntry) UdpSendTimeMap.put (ackEntry.key, System.currentTimeMillis ()); Loggers.PUSH.info ("send udp packet:" + ackEntry.key); udpSocket.send (ackEntry.origin); ackEntry.increaseRetryTime (); executorService.schedule (new Retransmitter (ackEntry), TimeUnit.NANOSECONDS.toMillis (ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS); return ackEntry } catch (Exception e) {Loggers.PUSH.error ("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data, ackEntry.origin.getAddress (). GetHostAddress (), e); ackMap.remove (ackEntry.key); udpSendTimeMap.remove (ackEntry.key); failedPush + = 1; return null }} / /.}

The udpPush method determines based on the information of Receiver.AckEntry, terminates push if the number of retries is greater than MAX_RETRY_TIMES, removes it from ackMap and udpSendTimeMap; if it can be retried, put its ackEntry.key into ackMap and udpSendTimeMap, then executes udpSocket.send (ackEntry.origin) and ackEntry.increaseRetryTime (), and registers Retransmitter's deferred task; if there is an exception, remove it from ackMap and udpSendTimeMap

Retransmitter

Nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

Public static class Retransmitter implements Runnable {Receiver.AckEntry ackEntry; public Retransmitter (Receiver.AckEntry ackEntry) {this.ackEntry = ackEntry;} @ Override public void run () {if (ackMap.containsKey (ackEntry.key)) {Loggers.PUSH.info ("retry to push data, key:" + ackEntry.key); udpPush (ackEntry) }}}

Retransmitter implements the Runnable method, and its run method performs a udpPush retry if the ackMap contains ackEntry.key

Summary

PushService implements ApplicationContextAware and ApplicationListener interfaces.

Its static code block creates a Deamon thread execution Receiver and registers a scheduled task execution removeClientIfZombie, which traverses the clientMap and removes the client of the zombie

Its onApplicationEvent handles ServiceChangeEvent, it registers a deferred task and puts the future into futureMap;. The deferred task gets the specified namespaceId from clientMap, serviceName's clients; then traverses the clients to determine whether it is zombie, if so, removes the client, otherwise creates Receiver.AckEntry, then executes udpPush (ackEntry), and finally removes the future;serviceChanged method from futureMap and provides it to external calls to publish ServiceChangeEvent

Thank you for reading, the above is the content of "the principle and application of PushService in nacos server". After the study of this article, I believe you have a deeper understanding of the principle and application of PushService in nacos server, and the specific use needs to be verified in practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report