2025-01-18 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article walks through how the ZooKeeper server establishes a connection with a client, following the relevant source code step by step.
The server can process requests with one of two connection factories, NIOServerCnxnFactory or NettyServerCnxnFactory. The default is NIOServerCnxnFactory; it can be changed by setting the zookeeper.serverCnxnFactory system property.
Both classes follow the same logic; one is built on Java's native NIO and the other on Netty. This article analyzes NIOServerCnxnFactory.
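As a minimal sketch of how such a property-driven choice can work, the snippet below reads zookeeper.serverCnxnFactory and falls back to the NIO factory when it is unset. The class name CnxnFactoryChooser and its method are hypothetical names for illustration only; this is not ZooKeeper's own code.

```java
// Hypothetical helper, for illustration only (not part of ZooKeeper).
final class CnxnFactoryChooser {
    // Property name taken from the article.
    static final String PROPERTY = "zookeeper.serverCnxnFactory";
    // Fully qualified name of the default factory class.
    static final String DEFAULT_FACTORY =
            "org.apache.zookeeper.server.NIOServerCnxnFactory";

    /** Returns the configured factory class name, or the NIO default if unset. */
    static String factoryClassName() {
        String name = System.getProperty(PROPERTY);
        return name == null ? DEFAULT_FACTORY : name;
    }

    public static void main(String[] args) {
        System.out.println(factoryClassName());
    }
}
```

To switch to the Netty implementation, the property would typically be passed on the JVM command line, for example -Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory.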
NIOServerCnxnFactory implements the Runnable interface. Its run method loops over the selector and handles incoming requests:
// NIOServerCnxnFactory.java, line 200
public void run() {
    while (!ss.socket().isClosed()) {
        try {
            selector.select(1000);
            Set<SelectionKey> selected;
            synchronized (this) {
                selected = selector.selectedKeys();
            }
            ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
            Collections.shuffle(selectedList);
            for (SelectionKey k : selectedList) {
                // a new connection request
                if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                    SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                    InetAddress ia = sc.socket().getInetAddress();
                    // number of client connections already open from this IP address
                    int cnxncount = getClientCnxnCount(ia);
                    // close the socket if the per-IP limit is exceeded
                    if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
                        LOG.warn("Too many connections from " + ia + " - max is " + maxClientCnxns);
                        sc.close();
                    } else {
                        LOG.info("Accepted socket connection from " + sc.socket().getRemoteSocketAddress());
                        sc.configureBlocking(false);
                        SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
                        // each connection is represented by a NIOServerCnxn
                        NIOServerCnxn cnxn = createConnection(sc, sk);
                        sk.attach(cnxn);
                        addCnxn(cnxn);
                    }
                } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                    // on a later pass of the loop, execution enters here
                    // to process the actual connect request
                    NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                    c.doIO(k);
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Unexpected ops in select " + k.readyOps());
                    }
                }
            }
            selected.clear();
        } catch (RuntimeException e) {
            LOG.warn("Ignoring unexpected runtime exception", e);
        } catch (Exception e) {
            LOG.warn("Ignoring exception", e);
        }
    }
    closeAll();
    LOG.info("NIOServerCnxn factory exited run method");
}

// NIOServerCnxn.java, line 237
void doIO(SelectionKey k) throws InterruptedException {
    try {
        if (isSocketOpen() == false) {
            LOG.warn("trying to do i/o on a null socket for session:0x" + Long.toHexString(sessionId));
            return;
        }
        if (k.isReadable()) {
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from client sessionid 0x"
                        + Long.toHexString(sessionId)
                        + ", likely client has closed socket");
            }
            if (incomingBuffer.remaining() == 0) {
                boolean isPayload;
                if (incomingBuffer == lenBuffer) { // start of next request
                    incomingBuffer.flip();
                    isPayload = readLength(k);
                    incomingBuffer.clear();
                } else {
                    // continuation
                    isPayload = true;
                }
                if (isPayload) { // not the case for 4letterword
                    readPayload();
                } else {
                    // four letter words take care
                    // need not do anything else
                    return;
                }
            }
        }
        // part of the code omitted
    } catch (CancelledKeyException e) {
    } catch (CloseRequestException e) {
    } catch (EndOfStreamException e) {
    } catch (IOException e) {
    }
}

// NIOServerCnxn.java, line 194
private void readPayload() throws IOException, InterruptedException {
    if (incomingBuffer.remaining() != 0) { // have we read length bytes?
        int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
        if (rc < 0) {
            throw new EndOfStreamException(
                    "Unable to read additional data from client sessionid 0x"
                    + Long.toHexString(sessionId)
                    + ", likely client has closed socket");
        }
    }
    if (incomingBuffer.remaining() == 0) { // have we read length bytes?
        packetReceived();
        incomingBuffer.flip();
        if (!initialized) {
            readConnectRequest();
        } else {
            readRequest();
        }
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
}

// NIOServerCnxn.java, line 434
private void readConnectRequest() throws IOException, InterruptedException {
    if (!isZKServerRunning()) {
        throw new IOException("ZooKeeperServer not running");
    }
    zkServer.processConnectRequest(this, incomingBuffer);
    initialized = true;
}

// ZooKeeperServer.java, line 886
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
    ConnectRequest connReq = new ConnectRequest();
    connReq.deserialize(bia, "connect");
    if (LOG.isDebugEnabled()) {
        LOG.debug("Session establishment request from client "
                + cnxn.getRemoteSocketAddress()
                + " client's lastZxid is 0x"
                + Long.toHexString(connReq.getLastZxidSeen()));
    }
    boolean readOnly = false;
    try {
        readOnly = bia.readBool("readOnly");
        cnxn.isOldClient = false;
    } catch (IOException e) {
        // this is ok -- just a packet from an old client which
        // doesn't contain readOnly field
        LOG.warn("Connection request from old client "
                + cnxn.getRemoteSocketAddress()
                + "; will be dropped if server is in r-o mode");
    }
    // if the client did not set readOnly but the server is read-only,
    // throw an exception, which closes the connection
    if (readOnly == false && this instanceof ReadOnlyZooKeeperServer) {
        String msg = "Refusing session request for not-read-only client "
                + cnxn.getRemoteSocketAddress();
        LOG.info(msg);
        throw new CloseRequestException(msg);
    }
    if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
        String msg = "Refusing session request for client "
                + cnxn.getRemoteSocketAddress()
                + " as it has seen zxid 0x"
                + Long.toHexString(connReq.getLastZxidSeen())
                + " our last zxid is 0x"
                + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
                + " client must try another server";
        LOG.info(msg);
        throw new CloseRequestException(msg);
    }
    // negotiate the session timeout
    int sessionTimeout = connReq.getTimeOut();
    byte passwd[] = connReq.getPasswd();
    int minSessionTimeout = getMinSessionTimeout();
    if (sessionTimeout < minSessionTimeout) {
        sessionTimeout = minSessionTimeout;
    }
    int maxSessionTimeout = getMaxSessionTimeout();
    if (sessionTimeout > maxSessionTimeout) {
        sessionTimeout = maxSessionTimeout;
    }
    cnxn.setSessionTimeout(sessionTimeout);
    // We don't want to receive any packets until we are sure that the
    // session is setup
    cnxn.disableRecv();
    long sessionId = connReq.getSessionId();
    if (sessionId != 0) {
        // a non-zero sessionId means a previously connected client is
        // reconnecting, for example after a dropped connection
        long clientSessionId = connReq.getSessionId();
        LOG.info("Client attempting to renew session 0x"
                + Long.toHexString(clientSessionId)
                + " at " + cnxn.getRemoteSocketAddress());
        serverCnxnFactory.closeSession(sessionId);
        cnxn.setSessionId(sessionId);
        reopenSession(cnxn, sessionId, passwd, sessionTimeout);
    } else {
        LOG.info("Client attempting to establish new session at "
                + cnxn.getRemoteSocketAddress());
        createSession(cnxn, passwd, sessionTimeout);
    }
}

// ZooKeeperServer.java, line 617
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
    // create the session; ZooKeeper's session management is fairly complex
    // and is analyzed in detail in a separate chapter
    long sessionId = sessionTracker.createSession(timeout);
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd);
    ByteBuffer to = ByteBuffer.allocate(4);
    to.putInt(timeout);
    cnxn.setSessionId(sessionId);
    // respond to the client
    submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
    return sessionId;
}

// ZooKeeperServer.java, line 728
public void submitRequest(Request si) {
    // part of the code omitted
    try {
        // refresh the session timeout
        touch(si.cnxn);
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            // hand the request to PrepRequestProcessor for further processing
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type " + si.type);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Dropping request: " + e.getMessage());
        }
    } catch (RequestProcessorException e) {
        LOG.error("Unable to process request:" + e.getMessage(), e);
    }
}

// PrepRequestProcessor.java, line 294
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
        throws KeeperException, IOException, RequestProcessorException {
    request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type);
    switch (type) {
    // part of the code omitted
    case OpCode.createSession:
        request.request.rewind();
        int to = request.request.getInt();
        request.txn = new CreateSessionTxn(to);
        request.request.rewind();
        // addSession is called again here, although the session was already
        // added by the earlier code; I am not sure why
        zks.sessionTracker.addSession(request.sessionId, to);
        zks.setOwner(request.sessionId, request.getOwner());
        break;
    // part of the code omitted
    default:
        LOG.error("Invalid OpCode: {} received by PrepRequestProcessor", type);
    }
}

That concludes this walkthrough of how the ZooKeeper server establishes a connection. I hope it is helpful.
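The way doIO and readPayload switch between lenBuffer and incomingBuffer is a length-prefixed framing scheme: read a 4-byte length, allocate a buffer of that size, fill it, then go back to reading a length. The following is a minimal standalone sketch of that idea, assuming bytes arrive in arbitrary chunks. FrameReader, feed, frames, and encode are hypothetical names, and the sketch omits the four-letter-word handling and any maximum-length check that a real server would need.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of length-prefixed framing (not ZooKeeper code).
final class FrameReader {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    // Points at lenBuffer while reading a length, otherwise at the payload buffer.
    private ByteBuffer incomingBuffer = lenBuffer;
    private final List<byte[]> frames = new ArrayList<>();

    /** Consumes one chunk of bytes, as a non-blocking read might deliver it. */
    void feed(ByteBuffer chunk) {
        while (true) {
            // Copy bytes until the chunk is empty or the current buffer is full.
            while (chunk.hasRemaining() && incomingBuffer.hasRemaining()) {
                incomingBuffer.put(chunk.get());
            }
            if (incomingBuffer.hasRemaining()) {
                return; // buffer not full yet; wait for the next chunk
            }
            if (incomingBuffer == lenBuffer) {
                // Length prefix complete: switch to a payload buffer of that size.
                lenBuffer.flip();
                int len = lenBuffer.getInt();
                lenBuffer.clear();
                if (len < 0) {
                    throw new IllegalArgumentException("negative length " + len);
                }
                incomingBuffer = ByteBuffer.allocate(len);
            } else {
                // Payload complete: store it and go back to reading a length.
                frames.add(incomingBuffer.array());
                incomingBuffer = lenBuffer;
            }
        }
    }

    List<byte[]> frames() {
        return frames;
    }

    /** Encodes a string as a 4-byte big-endian length followed by UTF-8 bytes. */
    static ByteBuffer encode(String s) {
        byte[] body = s.getBytes(StandardCharsets.UTF_8);
        ByteBuffer out = ByteBuffer.allocate(4 + body.length);
        out.putInt(body.length);
        out.put(body);
        out.flip();
        return out;
    }
}
```

Feeding the reader a message split across two chunks yields no frame after the first chunk and one complete frame after the second, which mirrors why the server can return from doIO with a partially filled buffer and resume on the next selector pass.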