In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-01 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)05/31 Report--
In this issue, the editor will bring you an analysis of the process of client connection creation in ZooKeeper. The article is rich in content and analyzed and described from a professional point of view. I hope you can get something after reading this article.
Client-side API simply uses 0.1 demo case 1
The simplest demo is as follows:
Public class ZookeeperConstructorSimple implements Watcher {private static CountDownLatch connectedSemaphone=new CountDownLatch (1); public static void main (String [] args) throws IOException {ZooKeeper zooKeeper=new ZooKeeper ("127.0.0.1 args 2181", 5000 new ZookeeperConstructorSimple ()); System.out.println (zooKeeper.getState ()); try {connectedSemaphone.await ();} catch (Exception e) {} System.out.println ("ZooKeeper session established") System.out.println ("sessionId=" + zooKeeper.getSessionId ()); System.out.println ("password=" + zooKeeper.getSessionPasswd ());} @ Override public void process (WatchedEvent event) {System.out.println ("my ZookeeperConstructorSimple watcher Receive watched event:" + event); if (KeeperState.SyncConnected==event.getState ()) {connectedSemaphone.countDown ();}
The maven dependencies used are as follows:
Org.apache.zookeeper zookeeper 3.4.6
For now, ZooKeeper's server-side code and client-side code are still mixed, and it is estimated that they can be changed in the future.
The constructor of the ZooKeeper used consists of three parameters
Server address list for ZooKeeper cluster
Multiple addresses can be filled in, separated by commas. For example, "127.0.0.1virtual 2181127.0.0.1laz2182127.0.0.1", which one is used when connecting to the client? First randomly disturb, and then poll for use, which will be described in detail later.
SessionTimeout
It will eventually lead to three time settings: sessionTimeout, readTimeout, and connectTimeout negotiated with the server.
The server side uses the negotiated sessionTimeout: that is, the client does not send any request to the server after the time has elapsed (normally, the client will send a heartbeat request every other time, and the server will calculate the client's timeout point from the new one), then the server considers session timeout and cleans up the data. At this point, the client's ZooKeeper object no longer works, and a new object needs to be re-new.
The client uses connectTimeout and readTimeout to detect connection timeout and read timeout respectively. Once the client times out, the client thinks the server is unstable and will reconnect to the next server address.
Watcher
As a default Watcher for the ZooKeeper object, it is used to receive event notifications. Such as notification of successful connection with server, notification of disconnection, notification of expiration of Session, and so on.
At the same time, we can see that once the connection with the ZooKeeper server is successfully established, the server-side assigned sessionId and password will be obtained, as follows:
SessionId=94249128002584594password= [B@4de3aaf6
Here is a detailed description of the process of establishing a connection through the source code.
1 the process of establishing a connection for a client 1.1 Overview of the connection process
First, establish a connection with the ZooKeeper server, and there are two layers of connections to be established.
TCP connection between client and server
Establish session association on the basis of TCP connection
After the TCP connection is established, the client sends a ConnectRequest request to apply for the establishment of session association. At this time, the server assigns the client sessionId and password, and enables the detection of whether the session is timed out.
When the TCP connection is disconnected within the sessionTimeout time, that is, before the timeout, the server still thinks that the sessionId is alive. At this point, the client will select the next ZooKeeper server address to establish the TCP connection. After the TCP connection is established, send the ConnectRequest request with the previous sessionId and password. If the timeout of the sessionId is not reached, the automatic reconnection is successful, transparent to the client user, everything is executed silently behind, and the ZooKeeper object is valid.
If the timeout of the sessionId has been reached after the TCP connection is re-established (the server will clean up the data related to the sessionId), the sessionTimeout time returned to the client is 0, the sessionid is 0, and the password is an empty byte array. After receiving the data, the client will determine whether the negotiated sessionTimeout time is less than or equal to 0, and if it is less than or equal to 0, the eventthread thread will first issue a KeeperState.Expired event to notify the corresponding Watcher, and then end the loop of the EventThread thread and begin to end. At this point, the ZooKeeper object is invalid, and you have to re-new a new ZooKeeper object and assign a new sessionId.
1.2 ZooKeeper object
It is user-oriented and provides some operations API.
It has two more important attributes:
ClientCnxn cnxn: responsible for execution of all ZooKeeper node operations
ZKWatchManager watchManager: responsible for maintaining the Watcher registered on a path
For example, create a node operation (synchronization method):
The ZooKeeper object is responsible for creating the Request and handing it over to ClientCnxn for execution, and the ZooKeeper object processes the returned results.
After a request is submitted in synchronous mode, it starts to cycle to determine whether the status of the request packet ends, that is, it is in a blocking state, and once it is finished, it continues to go down and return the result. In asynchronous mode, a request is submitted and returned directly, and the processing logic for the result is included in the callback function. Once the response to the request packet is completed, the callback function is taken out and the corresponding callback method is executed.
The ZooKeeper object mainly encapsulates the user's request and processes the response and other operations. All the execution of user requests is left to ClientCnxn, so let's take a detailed look at the source and general content of ClientCnxn.
Let's take a look at how ClientCnxn comes from:
Step 1: set a default Watcher for ZKWatchManager watchManager
Step 2: give the connection string information to ConnectStringParser for parsing
Connection string such as "192.168.12.1:2181192.168.12.2:2181192.168.12.3:2181/root"
Get two data String chrootPath default follow path and ArrayList serverAddresses, that is, multiple host and port information.
Step 3: create a HostProvider based on the above parsed host and port list results
With the parsing result of ConnectStringParser, why do you need a HostProvider to package it? Mainly to leave room for expansion in the future.
Let's take a look at the detailed API introduction of HostProvider:
HostProvider is mainly responsible for continuously providing available ZooKeeper server addresses, which can be loaded from a url or obtained from other sources. At the same time, for different ZooKeeper clients, give the nearest ZooKeeper server address and so on.
There are three attributes, one of which is the server address list (randomly messed up in the following way):
Collections.shuffle (this.serverAddresses)
The other two attributes are used for tagging. Let's take a look at how StaticHostProvider continuously provides ZooKeeper server addresses:
The code is also very simple, is in the disrupted server address list, constantly traversing, after the end, starting from 0.
What about the spinDelay above?
Normally, currentIndex adds 1 first, and then returns the address of currentIndex+1. When the address is connected successfully, it executes the onConnected method, that is, lastIndex = currentIndex. However, when the connection to the returned currentIndex+1 address is not successful, continue to try the next one, and continue to the next one, you will encounter the case of currentIndex=lastIndex. After polling, no address can be connected. The strategy at this time is to pause and rest before continuing.
Step 4: prepare the parameters for creating the ClientCnxn and create the ClientCnxn.
The first step is to get a ClientCnxnSocket through getClientCnxnSocket (). Let's take a look at what ClientCnxnSocket mainly does:
A ClientCnxnSocket does the lower level communication with a socket implementation. This code has been moved out of ClientCnxn so that a Netty implementation can be provided as an alternative to the NIO socket code.
Dedicated to socket communication, some of the common parts are abstracted and others are left to different implementers. If you can choose the default ClientCnxnSocketNIO, you can also use netty and so on.
First get the system parameter "zookeeper.clientCnxnSocket" and, if not, use the default ClientCnxnSocketNIO, so we can replace the default implementation by specifying this parameter.
When the parameters are ready, how is the ClientCnxn created?
The first thing is to save some object parameters, which are not available for sessionId and sessionPasswd at this time. Then there are two timeout parameters: connectTimeout and readTimeout. In the thread of sending and receiving data in ClientCnxn, the connection timeout and read timeout are constantly detected. Once a timeout occurs, the service is considered to be unstable and the server needs to be replaced, and the next server address will be obtained from HostProvider to connect.
Finally, there are two threads, an event thread, EventThread, and a thread that sends and receives socket data, SendThread.
The event thread EventThread is constantly fetching events from an event queue and processing them.
Take a look at the specific processing process, which is mainly divided into two cases, one is the watch event that we register, and the other is to handle the asynchronous callback function:
You can see that this is the case that triggered our registration of Watch, as well as the asynchronous callback mentioned above.
To understand how EventThread handles events, you need to know how these events come from:
Three methods are provided to add different types of events, such as the SendThread thread, which is called to add events. For event notifications, all Watcher that are concerned about the event are first obtained according to ZKWatchManager watchManager, and then triggered.
Let's take a look at what SendThread does:
SendThread = new SendThread (clientCnxnSocket); pass the clientCnxnSocket passed to ClientCnxn and then pass it to SendThread to serve SendThread.
In SendThread's run method, there is a while loop that constantly does the following:
Task 1: constantly check whether the clientCnxnSocket is connected to the server, and if it is not connected, take a server address from the hostProvider and use clientCnxnSocket to connect.
After a successful connection with the server is established, the ConnectRequest request is sent, and the request is placed in the outgoingQueue request queue waiting to be sent to the server
Task 2: detect timeout: detect read timeout when connected, and connection timeout when unconnected
Once you time out, throw a SessionTimeoutException and see how it is handled.
You can see that once a timeout exception or other exception occurs, it cleans up, sets the connection status to unconnected, and then sends a Disconnected event. At this point, we will enter the process of Task 1.
Task 3: keep sending ping notifications. Every time the server receives a ping request, it will recalculate the session expiration time from the current time. Therefore, when the client continuously sends ping requests at a certain time interval, it can ensure that the session of the client will not expire. The sending interval is as follows:
ClientCnxnSocket.getIdleSend (): is the interval between the last time the packet was sent and the current time. When more than half of the time of readTimeout has passed and no packet has been sent, a Ping transmission is performed. Or if the MAX_SEND_PING_INTERVAL (10s) has not sent a packet in the past, a Ping transmission is performed.
The content sent by ping is only marked by the request header OpCode.ping, and the rest is empty. To send a ping request, you also put the request in the outgoingQueue sending queue, waiting to be executed.
Task 4: perform the IO operation, that is, send the request in the request queue and read the response data on the server side.
The first request is taken from the outgoingQueue request queue, serialized, and then sent using socket.
Read server-side data
There are two types: one is to read the response to the ConnectRequest request, and the other is the other response, not to mention for the time being.
Let's first take a look at the response to the ConnectRequest request:
First, we deserialize to get the ConnectResponse object, and then we can get the sessionId and passwd assigned to our client by the server, as well as the negotiated sessionTimeOut time.
It is preferred to recalculate the readTimeout and connectTimeout values based on the negotiated sessionTimeout time. Then keep and record sessionId and passwd. Finally, a SyncConnected connection success event is sent through EventThread. At this point, the TCP connection and the session initialization request are complete, and the client's ZooKeeper object is ready for normal use.
At this point, we understand the process of establishing a connection between the client and the server.
4 the process of processing the connection on the server side
There are many kinds of server-side situations, let's talk about the simplest stand-alone version for the time being. At the same time, the server-side startup process is no longer given (described in more detail in a later article).
Let's start with an overview of the server side:
The first is the server-side configuration file, with tickTime, minSessionTimeout, maxSessionTimeout-related attributes. By default, tickTime is 3000 Ms, minSessionTimeout is 2 times, tickTime,maxSessionTimeout is 20 times tickTime.
By default, NIOServerCnxnFactory is used on the server side to handle socket. For each client socket request, a NIOServerCnxn is created for that client. The interaction with the client is then left to NIOServerCnxn to handle. For the ConnectRequest request from the client, the processing is as follows:
First deserialize the ConnectRequest
Then start negotiating the sessionTimeout time.
That is to judge whether the sessionTimeout time passed by the user is between minSessionTimeout and maxSessionTimeout. After the negotiation is completed, different processing is carried out according to whether the sessionId passed by the user is 0. The first time the client requests, the sessionId is 0. When the client has connected to a server address and assigned a sessionId, and then if an exception such as a timeout occurs, the client will take the assigned sessionId to connect to the next server address, and the sessionId is not 0. A sessionId of 0 means that you want to create a session. If the sessionId is not 0, you need to check the validity of the sessionId and whether it has expired. Let's first take a look at the situation where sessionId is 0:! [create session] (https://static.oschina.net/uploads/img/201508/01065436_4nHs.png "create session") is generally divided into three steps: 1. Use sessionTracker to create a new session 2 based on sessionTimeout time, create a password based on sessionId 3, submit the request to create session to the request processor chain, and finally return the sessionId and password to the client
Here is a detailed description of these three processes:
4.1 create a session using sessionTracker
SessionTracker is used to create delete session and perform expiration check of session.
Take a look at the SessionTrackerImpl used by default:
Let's first take a look at the attributes of session:
Unique indication of final long sessionId:session
Final int timeout: the timeout time of this session (that is, the timeout time agreed by the client and server above)
Long tickTime: the next timeout of this session (as the client keeps sending PING requests, the time will be refreshed and changed later)
An identifier for boolean isClosing:session to indicate whether session is still in normal use
Object owner: the owner that created the session. Will be used when the client changes the connected server (more on later)
Then take a look at a few pieces of data from SessionTracker:
HashMap sessionsById: it's easy to store session in sessionId
ConcurrentHashMap sessionsWithTimeout: store the timeout time of each session in sessionId
HashMap sessionSets: a collection of session at a point in time (for session expiration checking)
Long nextSessionId: the initial sessionId, and then the sessionId created is self-increasing on this basis
NextExpirationTime: the next expiration time point, a session expiration check will be carried out at that time point.
Cycle of expirationInterval:session expiration check
The contents to be clarified are as follows: 1 the process of creating session 2 the process of session expiration check
Let's take a look at the process of creating a session: the code is simple: create a SessionImpl object, store it in SessionTracker, and start calculating the session timeout. There is a content here is the origin of sessionId, we can see that it is based on nextSessionId, and is constantly increasing.
SessionId is an important label for a client and is globally unique. Let's first take a look at the stand-alone nextSessionId initialization:
The stand-alone server uses 1 to initialize the nextSessionId by calculation. The corresponding id of the cluster version is the sid specified by each machine.
The first step is to take the current time, which is 1010011101110011011001010111001100011 and 41 is binary.
Step 2: long has 64 bits and moves 24 bits to the left, which actually removes the previous 1 and adds the 24-bit 0 after it.
Step 3: the result of the second step may be positive or negative, at present it is positive, and then it may be negative. You can calculate how many years it will take, . In order to ensure that when you move to the right, you need to use the unsigned right shift, that is, >. Unsigned right shift of 8 bits is used here
Step 4: move the passed id here, that is, 1 to the left, 56 bits. Then perform or operate with the positive number result of the third step to get the final benchmark nextSessionId, so when the id value here is not very large, usually just a few machines, it also ensures that sessionId is a positive number, and the first eight digits are the machine's sid number. Therefore, the first eight bits of each machine are different, ensuring that the same sessionId will not be configured in each machine, and the sessionId of each machine is self-increasing operation, so the sessionId in a single machine will not be repeated.
To sum up, the results show that the sessionId is unique and there is no redistribution.
After figuring out the allocation of sessionId, the next step is to figure out how to check the expiration of session:
Let's take a look at how the session activation process is handled:
First get the session data, and then calculate its expiration time
Long expireTime = roundToInterval (System.currentTimeMillis () + timeout)
Private long roundToInterval (long time) {
/ / We give an one interval grace period return (time / expirationInterval + 1) * expirationInterval
}
That is, take the current time plus the timeout time of the session, and then take the whole expirationInterval, that is, it will always be a positive multiple of the expirationInterval, that is, the expiration time of each session will eventually fall on the integer multiple of the expirationInterval.
If the overdue time of the original session is greater than the overdue time calculated by you, no processing is done, otherwise the overdue time of the session is set to the overdue time of the above calculation results.
Take out the expiration time of the original session and delete it from the collection
Retrieve the collection where the current expiration time is located and add it to it
To sum up, the activation of session is to recalculate the timeout and finally take a positive multiple of expirationInterval, then remove it from the collection of previous points in time, and then add it to the collection of new points in time.
At this point, the session check is much more convenient, just take out the collection at the expirationInterval integer point in time and mark it as expired one by one. On the other hand, those session that are constantly activated are constantly switched from the collection of one point in time to the collection of the next point in time.
SessionTrackerImpl is also a thread that performs an expiration check for session.
4.2 create a password based on sessionId
Go back to the three main steps of creating a session:
Let's take a look at how passwords are generated:
Random r = new Random (sessionId ^ superSecret); r.nextBytes (passwd)
Where superSecret is constant
Static final private long superSecret = 0XB3415C00L
Use Random to randomly generate byte arrays. But for this byte array, the contents of the byte array are the same as long as the parameter is the same as sessionId. That is, when we know the sessionId, we can use the above method to calculate the corresponding password, I think the password is basically useless.
Again, when the client connects with a sessionId and a password, the password will be checked.
Read the above code, once again verified that the password is of no use, know the sessionId, you will fully know the password. So this piece needs to be improved, it should not be completely decided by sessionId, such as plus the current time, etc., so that the client can not create a password, while the server stores encrypted passwords.
4.2 submit the request to create the session to the request handler chain
There is too much in this article, so let's briefly describe it here, and then explain it in detail later.
If the session is created successfully, pass sessionTimeout, sessionId, and passwd to the client. If it is not created successfully, the values of the above three are 0d0rem new byte [16]
After that, the process of the client processing the response, as mentioned above, can be looked back.
The above is the analysis of how to create a connection with the client in ZooKeeper shared by the editor. If you happen to have similar doubts, please refer to the above analysis to understand. If you want to know more about it, you are welcome to follow the industry information channel.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.