How the zk (ZooKeeper) factory method implements NIOServerCnxnFactory

Updated 2025-01-19. From: SLTechnology News&Howtos (shulou), Internet Technology

Shulou(Shulou.com)06/02 Report--

This article introduces how the zk (ZooKeeper) factory method implements NIOServerCnxnFactory. It should be a useful reference for interested readers. Let's walk through it.

NIOServerCnxnFactory class

Inner classes

AbstractSelectThread

AcceptThread

SelectorThread

Attributes

ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT

Expiration time for connections that have not yet established a session; the default is 10 s

ZOOKEEPER_NIO_NUM_SELECTOR_THREADS

Number of selector threads

ZOOKEEPER_NIO_NUM_WORKER_THREADS

Number of worker threads
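As a rough sketch of how the three settings above are typically resolved, the snippet below reads them from JVM system properties and falls back to defaults. The property keys and the default formulas are as I recall them from the ZooKeeper source, so treat them as assumptions to verify against your version. The class `NioSettings` is hypothetical.

```java
class NioSettings {
    // Property keys as recalled from ZooKeeper (assumption; verify for your version).
    static final String SESSIONLESS_CNXN_TIMEOUT = "zookeeper.nio.sessionlessCnxnTimeout";
    static final String NUM_SELECTOR_THREADS = "zookeeper.nio.numSelectorThreads";
    static final String NUM_WORKER_THREADS = "zookeeper.nio.numWorkerThreads";

    final int sessionlessCnxnTimeoutMs;
    final int numSelectorThreads;
    final int numWorkerThreads;

    NioSettings(int numCores) {
        // 10 s expiration for connections that have no session yet.
        sessionlessCnxnTimeoutMs = Integer.getInteger(SESSIONLESS_CNXN_TIMEOUT, 10000);
        // Assumed default: sqrt(cores / 2), but at least one selector thread.
        numSelectorThreads = Integer.getInteger(
                NUM_SELECTOR_THREADS, Math.max((int) Math.sqrt(numCores / 2.0), 1));
        // Assumed default: two worker threads per core.
        numWorkerThreads = Integer.getInteger(NUM_WORKER_THREADS, 2 * numCores);
    }

    public static void main(String[] args) {
        NioSettings s = new NioSettings(Runtime.getRuntime().availableProcessors());
        System.out.println("sessionless timeout (ms): " + s.sessionlessCnxnTimeoutMs);
        System.out.println("selector threads: " + s.numSelectorThreads);
        System.out.println("worker threads: " + s.numWorkerThreads);
    }
}
```

Any of the values can be overridden at startup with a `-D` flag, for example `-Dzookeeper.nio.numWorkerThreads=8`.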

directBuffer

Thread-local direct ByteBuffer that each I/O thread uses for socket reads and writes
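As far as I recall, this field is a `ThreadLocal<ByteBuffer>` in the ZooKeeper source, so each I/O thread gets its own direct buffer for socket I/O. A minimal sketch of that pattern follows. The class name `DirectBufferDemo`, the helper `bufferSeenByNewThread`, and the 64 KB size are illustrative assumptions.

```java
import java.nio.ByteBuffer;

class DirectBufferDemo {
    // Illustrative size only; the real value is configurable.
    static final int BUFFER_BYTES = 64 * 1024;

    // One direct (off-heap) buffer per thread, allocated lazily on first use.
    static final ThreadLocal<ByteBuffer> DIRECT_BUFFER =
            ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(BUFFER_BYTES));

    static ByteBuffer getDirectBuffer() {
        return DIRECT_BUFFER.get();
    }

    // Hypothetical helper: returns the buffer a freshly started thread receives.
    static ByteBuffer bufferSeenByNewThread() {
        ByteBuffer[] seen = new ByteBuffer[1];
        Thread t = new Thread(() -> seen[0] = getDirectBuffer());
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        ByteBuffer mine = getDirectBuffer();
        System.out.println("same thread reuses its buffer: " + (mine == getDirectBuffer()));
        System.out.println("another thread gets its own: " + (mine != bufferSeenByNewThread()));
    }
}
```

Because no two threads ever share a buffer, the buffers need no locking, and the number of allocations is bounded by the number of I/O threads rather than the number of connections.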

ipMap

Maps each client IP address to its set of connections; used to limit the number of connections per IP
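The per-IP limit can be sketched as a concurrent map from address to a set of connections, checked before a new connection is registered. This is a simplified illustration, not the ZooKeeper class: `PerIpLimiter` and its methods are hypothetical, and connections are represented by plain string IDs.

```java
import java.net.InetAddress;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class PerIpLimiter {
    private final ConcurrentHashMap<InetAddress, Set<String>> ipMap = new ConcurrentHashMap<>();
    private final int maxClientCnxns; // 0 or less disables the limit

    PerIpLimiter(int maxClientCnxns) {
        this.maxClientCnxns = maxClientCnxns;
    }

    // Number of connections currently registered for this address.
    int count(InetAddress addr) {
        Set<String> s = ipMap.get(addr);
        return s == null ? 0 : s.size();
    }

    // Registers the connection unless the address is already at its limit.
    // Note: the check and the add are not one atomic step, so under heavy
    // concurrency the limit can be exceeded slightly.
    boolean register(InetAddress addr, String cnxnId) {
        if (maxClientCnxns > 0 && count(addr) >= maxClientCnxns) {
            return false;
        }
        // Initial capacity 2: most hosts hold a single connection.
        ipMap.computeIfAbsent(addr, k -> ConcurrentHashMap.newKeySet(2)).add(cnxnId);
        return true;
    }

    void remove(InetAddress addr, String cnxnId) {
        Set<String> s = ipMap.get(addr);
        if (s != null) {
            s.remove(cnxnId);
        }
    }

    public static void main(String[] args) {
        PerIpLimiter limiter = new PerIpLimiter(2);
        InetAddress ip = InetAddress.getLoopbackAddress();
        System.out.println(limiter.register(ip, "c1"));
        System.out.println(limiter.register(ip, "c2"));
        System.out.println(limiter.register(ip, "c3")); // rejected: limit is 2
    }
}
```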

cnxnExpiryQueue

Expiry queue that groups connections into time buckets by expiration time
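To illustrate the time-bucket idea, here is a simplified, single-threaded sketch: each element's expiry time is rounded up to the next interval boundary, so elements that expire around the same time land in one bucket and can be expired together. `BucketExpiryQueue` is a hypothetical class, not ZooKeeper's own queue, and it takes the current time as a parameter to keep the example deterministic.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class BucketExpiryQueue<E> {
    private final Map<E, Long> elemToBucket = new HashMap<>();
    private final TreeMap<Long, Set<E>> buckets = new TreeMap<>();
    private final long intervalMs;

    BucketExpiryQueue(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Rounds a time up to the next bucket boundary.
    private long roundUp(long timeMs) {
        return (timeMs / intervalMs + 1) * intervalMs;
    }

    // Adds or refreshes an element so that it expires timeoutMs after nowMs.
    void update(E elem, long nowMs, long timeoutMs) {
        long bucket = roundUp(nowMs + timeoutMs);
        Long prev = elemToBucket.put(elem, bucket);
        if (prev != null && prev != bucket) {
            Set<E> old = buckets.get(prev);
            if (old != null) {
                old.remove(elem);
                if (old.isEmpty()) {
                    buckets.remove(prev);
                }
            }
        }
        buckets.computeIfAbsent(bucket, k -> new HashSet<>()).add(elem);
    }

    // Removes and returns every element whose bucket time is at or before nowMs.
    Set<E> pollExpired(long nowMs) {
        Set<E> expired = new HashSet<>();
        Iterator<Map.Entry<Long, Set<E>>> it =
                buckets.headMap(nowMs, true).entrySet().iterator();
        while (it.hasNext()) {
            Set<E> bucket = it.next().getValue();
            expired.addAll(bucket);
            elemToBucket.keySet().removeAll(bucket);
            it.remove();
        }
        return expired;
    }
}
```

Touching a connection is then just another `update` call, which moves it to a later bucket. The expiry pass only has to look at whole buckets instead of scanning every connection.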

workerPool

WorkerService instance that executes I/O work on the worker threads

acceptThread

Accepts new connections and assigns them to the selector threads in simple round-robin order
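The round-robin assignment can be sketched with an iterator that is reset once it runs out, which is the same idea the `selectorIterator` in `doAccept` uses. The generic class `RoundRobin` below is hypothetical and stands in for the set of selector threads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

class RoundRobin<T> {
    private final List<T> targets;
    private Iterator<T> iterator;

    RoundRobin(Collection<T> targets) {
        if (targets.isEmpty()) {
            throw new IllegalArgumentException("need at least one target");
        }
        this.targets = new ArrayList<>(targets);
        this.iterator = this.targets.iterator();
    }

    // Returns the next target, starting over once every target has had a turn.
    T next() {
        if (!iterator.hasNext()) {
            iterator = targets.iterator();
        }
        return iterator.next();
    }

    public static void main(String[] args) {
        RoundRobin<String> rr = new RoundRobin<>(Arrays.asList("selector-0", "selector-1"));
        for (int i = 0; i < 5; i++) {
            System.out.println("connection " + i + " goes to " + rr.next());
        }
    }
}
```

This sketch is not thread-safe. That matches the situation here, where a single accept thread is the only caller.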

selectorThreads

Methods

Pause accepting and accept new connections

    // AcceptThread: stop accepting new connections for the given time
    private void pauseAccept(long millisecs) {
        acceptKey.interestOps(0);
        try {
            selector.select(millisecs);
        } catch (IOException e) {
            // ignore
        } finally {
            acceptKey.interestOps(SelectionKey.OP_ACCEPT);
        }
    }

    // AcceptThread: accept one connection and hand it to a selector thread
    private boolean doAccept() {
        boolean accepted = false;
        SocketChannel sc = null;
        try {
            sc = acceptSocket.accept();
            accepted = true;
            InetAddress ia = sc.socket().getInetAddress();
            int cnxncount = getClientCnxnCount(ia);

            if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
                throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
            }

            LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());

            sc.configureBlocking(false);

            // Round-robin assign this connection to a selector thread
            if (!selectorIterator.hasNext()) {
                selectorIterator = selectorThreads.iterator();
            }
            SelectorThread selectorThread = selectorIterator.next();
            if (!selectorThread.addAcceptedConnection(sc)) {
                throw new IOException("Unable to add connection to selector queue"
                        + (stopped ? " (shutdown in progress)" : ""));
            }
            acceptErrorLogger.flush();
        } catch (IOException e) {
            // accept, maxClientCnxns, configureBlocking
            ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
            acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
            fastCloseSock(sc);
        }
        return accepted;
    }

    // SelectorThread: register queued connections with this thread's selector
    private void processAcceptedConnections() {
        SocketChannel accepted;
        while (!stopped && (accepted = acceptedQueue.poll()) != null) {
            SelectionKey key = null;
            try {
                key = accepted.register(selector, SelectionKey.OP_READ);
                NIOServerCnxn cnxn = createConnection(accepted, key, this);
                key.attach(cnxn);
                addCnxn(cnxn);
            } catch (IOException e) {
                // register, createConnection
                cleanupSelectionKey(key);
                fastCloseSock(accepted);
            }
        }
    }

Get the number of client connections

    private int getClientCnxnCount(InetAddress cl) {
        Set<NIOServerCnxn> s = ipMap.get(cl);
        if (s == null) {
            return 0;
        }
        return s.size();
    }

Create a connection

    protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk,
            SelectorThread selectorThread) throws IOException {
        return new NIOServerCnxn(zkServer, sock, sk, this, selectorThread);
    }

Register the connection

    private void addCnxn(NIOServerCnxn cnxn) throws IOException {
        InetAddress addr = cnxn.getSocketAddress();
        if (addr == null) {
            throw new IOException("Socket of " + cnxn + " has been closed");
        }
        Set<NIOServerCnxn> set = ipMap.get(addr);
        if (set == null) {
            // in general we will see 1 connection from each
            // host, setting the initial cap to 2 allows us
            // to minimize mem usage in the common case
            // of 1 entry - we need to set the initial cap
            // to 2 to avoid rehash when the first entry is added
            // Construct a ConcurrentHashSet using a ConcurrentHashMap
            set = Collections.newSetFromMap(new ConcurrentHashMap<NIOServerCnxn, Boolean>(2));
            // Put the new set in the map, but only if another thread
            // hasn't beaten us to it
            Set<NIOServerCnxn> existingSet = ipMap.putIfAbsent(addr, set);
            if (existingSet != null) {
                set = existingSet;
            }
        }
        set.add(cnxn);

        cnxns.add(cnxn);
        touchCnxn(cnxn);
    }

Question to consider: why are standalone mode and cluster mode not the same?

In standalone mode, the server can start directly by recovering its data from the transaction log and snapshots.

In cluster mode, the servers are divided by role, so data synchronization is also involved.

Thank you for reading this article carefully. I hope this article on how the zk factory method implements NIOServerCnxnFactory will be helpful to you.