In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
This article mainly introduces the zk factory method to achieve NIOServerCnxnFactory, has a certain reference value, interested friends can refer to, I hope you can learn a lot after reading this article, the following let Xiaobian take you to understand.
NIOServerCnxnFactory class
Inner class
AbstractSelectThread
AcceptThread
SelectorThread
Attribute
ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT
10s session expiration time
ZOOKEEPER_NIO_NUM_SELECTOR_THREADS
Number of selector threads
ZOOKEEPER_NIO_NUM_WORKER_THREADS
Number of worker threads
DirectBuffer
Buffer is used to exchange data between threads.
IpMap
Limit the number of connections on ip
CnxnExpiryQueue
Connection failure time bucket queue
WorkerPool
WorkerService worker execution service
AcceptThread
Receive a new connection and assign the simple round-robin to the selection thread
SelectorThreads
Method
Stop receiving
Private void pauseAccept (long millisecs) {acceptKey.interestOps (0); try {selector.select (millisecs);} catch (IOException e) {/ / ignore} finally {acceptKey.interestOps (SelectionKey.OP_ACCEPT);}} private boolean doAccept () {boolean accepted = false; SocketChannel sc = null; try {sc = acceptSocket.accept (); accepted = true InetAddress ia = sc.socket (). GetInetAddress (); int cnxncount = getClientCnxnCount (ia); if (maxClientCnxns > 0 & & cnxncount > = maxClientCnxns) {throw new IOException ("Too many connections from" + ia + "- max is" + maxClientCnxns);} LOG.debug ("Accepted socket connection from {}", sc.socket (). GetRemoteSocketAddress ()); sc.configureBlocking (false) / / Round-robin assign this connection to a selector thread if (! selectorIterator.hasNext ()) {selectorIterator = selectorThreads.iterator ();} SelectorThread selectorThread = selectorIterator.next (); if (! selectorThread.addAcceptedConnection (sc)) {throw new IOException ("Unable to add connection to selector queue" + (stopped? " (shutdown in progress) ":"));} acceptErrorLogger.flush ();} catch (IOException e) {/ / accept, maxClientCnxns, configureBlocking ServerMetrics.getMetrics (). CONNECTION_REJECTED.add (1); acceptErrorLogger.rateLimitLog ("Error accepting new connection:" + e.getMessage ()); fastCloseSock (sc);} return accepted;} private void processAcceptedConnections () {SocketChannel accepted While (! stopped & & (accepted = acceptedQueue.poll ())! = null) {SelectionKey key = null; try {key = accepted.register (selector, SelectionKey.OP_READ); NIOServerCnxn cnxn = createConnection (accepted, key, this); key.attach (cnxn); addCnxn (cnxn) } catch (IOException e) {/ / register, createConnection cleanupSelectionKey (key); fastCloseSock (accepted);} configure get the number of client connections private int getClientCnxnCount (InetAddress cl) {Set s = ipMap.get (cl); if (s = = null) {return 0;} return s.size () } create connection protected NIOServerCnxn createConnection (SocketChannel sock, SelectionKey sk, SelectorThread selectorThread) throws IOException {return new NIOServerCnxn (zkServer, sock, sk, this, selectorThread);} create connection private void addCnxn (NIOServerCnxn cnxn) throws IOException {InetAddress addr = cnxn.getSocketAddress (); if (addr = = null) {throw new IOException ("Socket of" + cnxn + "has been closed");} Set set = ipMap.get (addr) If (set = = null) {/ / in general we will see 1 connection from each / / host, setting the initial cap to 2 allows us / / to minimize mem usage in the common case / / of 1 entry-we need to set the initial cap / / to 2 to avoid rehash when the first entry is added / / Construct a ConcurrentHashSet using a ConcurrentHashMap set = Collections.newSetFromMap (new ConcurrentHashMap (2)) / / Put the new set in the map, but only if another thread / / hasn't beaten us to it Set existingSet = ipMap.putIfAbsent (addr, set); if (existingSet! = null) {set = existingSet;}} set.add (cnxn); cnxns.add (cnxn); touchCnxn (cnxn) } think: why stand-alone and cluster mode are not the same stand-alone can be started directly from the log, snapshot recovery data cluster is divided according to roles, when it comes to data synchronization, thank you for reading this article carefully. I hope the article "zk factory method how to achieve NIOServerCnxnFactory" shared by the editor will be helpful to you. At the same time, I also hope you can support and pay attention to the industry information channel. More related knowledge is waiting for you to learn!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.