2025-02-25 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article explains the transport startup process used for communication between Elasticsearch nodes. It walks through the process using the actual NettyTransport source code as a worked case.
Transport
As the name implies, transport is the basic channel for cluster communication. Both cluster state information and search and index requests are transmitted through transport. Elasticsearch defines all the basic types this requires: Transport, TransportMessage, TransportChannel, TransportRequest, TransportResponse, and so on. Here we focus on Transport; the other types are introduced as the analysis proceeds. First, take a look at the definition of the Transport interface, as shown in the following figure:
NettyTransport implements this interface. Before analyzing NettyTransport, let's briefly cover how Netty is used. Using Netty (v3.x) involves three pieces: ServerBootstrap, ClientBootstrap, and a MessageHandler. ServerBootstrap starts the server, ClientBootstrap starts the client and connects it to a server, and the MessageHandler holds the message processing logic, that is, the business logic. For other details, please refer to the official Netty documentation.
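To make the three roles concrete, here is a minimal sketch that uses only java.net rather than Netty. It is not how Elasticsearch or Netty implement things; the names TinyServer, TinyClient, and LineHandler are hypothetical stand-ins for the server bootstrap, the client bootstrap, and the message handler respectively.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the message handler: it holds only the business logic.
interface LineHandler {
    String handle(String request);
}

// Hypothetical stand-in for the server bootstrap: binds an address and
// passes every incoming line to the handler.
final class TinyServer implements AutoCloseable {
    private final ServerSocket serverSocket;

    TinyServer(LineHandler handler) throws IOException {
        // Port 0 asks the OS for any free port on the loopback address.
        ServerSocket ss = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
        this.serverSocket = ss;
        Thread acceptor = new Thread(() -> {
            while (!ss.isClosed()) {
                try (Socket s = ss.accept();
                     BufferedReader in = new BufferedReader(
                             new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
                     PrintWriter out = new PrintWriter(
                             new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true)) {
                    String line;
                    while ((line = in.readLine()) != null) {
                        out.println(handler.handle(line));
                    }
                } catch (IOException e) {
                    // accept() fails once the server socket is closed; the loop then exits.
                }
            }
        });
        acceptor.setDaemon(true);
        acceptor.start();
    }

    int port() {
        return serverSocket.getLocalPort();
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
    }
}

// Hypothetical stand-in for the client bootstrap: connects to a server and sends one request.
final class TinyClient {
    static String request(int port, String message) throws IOException {
        try (Socket s = new Socket(InetAddress.getLoopbackAddress(), port);
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8))) {
            out.println(message);
            return in.readLine();
        }
    }
}

final class BootstrapSketch {
    public static void main(String[] args) throws IOException {
        try (TinyServer server = new TinyServer(req -> "ack:" + req)) {
            System.out.println(TinyClient.request(server.port(), "ping")); // prints "ack:ping"
        }
    }
}
```

The point of the split is the same as in Netty: the server and client parts deal only with connections, while the handler can be swapped without touching either.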
Starting the ServerBootstrap
NettyTransport starts both the ServerBootstrap and the ClientBootstrap in its doStart() method and binds the IP address, as shown in the following code:
protected void doStart() throws ElasticsearchException {
    clientBootstrap = createClientBootstrap(); // start the client according to the configuration
    ...
    // unrelated code omitted
    createServerBootstrap(name, mergedSettings); // start the server
    bindServerBootstrap(name, mergedSettings); // bind the IP
}
Each node needs to both send and receive, so both the client and the server must be started. Each is started in its corresponding method, and the startup itself is the standard Netty startup process; if you are interested, read those methods. bindServerBootstrap(name, mergedSettings) binds the local address to Netty and also sets the export host. (I do not yet fully understand what the export host does, and I did not see where it is bound, so this needs further study.)
The MessageHandler is injected into the ChannelPipeline while the client and server start. At this point the startup process is complete, but the client has not connected to any server yet. Connections to other nodes are only established once the node has started.
How to connect to a node
The code of the connectToNode method is as follows:
public void connectToNode(DiscoveryNode node, boolean light) {
    // the transport must be started
    if (!lifecycle.started()) {
        throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
    }
    // acquire the read lock; each node can establish connections with multiple nodes, so the read lock is used here
    globalLock.readLock().lock();
    try {
        // acquire a lock based on node.id(), which ensures that the connection to each node is established only once
        connectionLock.acquire(node.id());
        try {
            if (!lifecycle.started()) {
                throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
            }
            NodeChannels nodeChannels = connectedNodes.get(node);
            if (nodeChannels != null) {
                return;
            }
            try {
                if (light) {
                    // light means that only one channel is opened to this node, and all five
                    // connection types (described below) share that channel
                    nodeChannels = connectToChannelsLight(node);
                } else {
                    nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]);
                    try {
                        connectToChannels(nodeChannels, node);
                    } catch (Throwable e) {
                        logger.trace("failed to connect to [{}], cleaning dangling connections", e, node);
                        nodeChannels.close();
                        throw e;
                    }
                }
                // we acquire a connection lock, so no way there is an existing connection
                connectedNodes.put(node, nodeChannels);
                if (logger.isDebugEnabled()) {
                    logger.debug("connected to node [{}]", node);
                }
                transportServiceAdapter.raiseNodeConnected(node);
            } catch (ConnectTransportException e) {
                throw e;
            } catch (Exception e) {
                throw new ConnectTransportException(node, "general node connection failure", e);
            }
        } finally {
            connectionLock.release(node.id());
        }
    } finally {
        globalLock.readLock().unlock();
    }
}
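The locking pattern in connectToNode (a shared read lock, then a per-node lock, then a check for an existing connection) can be isolated in a small sketch. This is a simplified illustration using only java.util.concurrent, not the Elasticsearch implementation: ConnectionRegistry, its string-valued "channels", and the closeAll method are hypothetical, and the idea that an exclusive operation would take the write lock is an assumption not shown in the code above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical registry mirroring the lock structure of connectToNode.
final class ConnectionRegistry {
    private final ReentrantReadWriteLock globalLock = new ReentrantReadWriteLock();
    private final Map<String, ReentrantLock> perNodeLocks = new ConcurrentHashMap<>();
    private final Map<String, String> connectedNodes = new ConcurrentHashMap<>();
    // Counts how many times a connection was really opened.
    final AtomicInteger connectAttempts = new AtomicInteger();

    void connectToNode(String nodeId) {
        // Read lock: connections to different nodes may proceed concurrently.
        globalLock.readLock().lock();
        try {
            // Per-node lock: at most one thread connects to a given node at a time.
            ReentrantLock nodeLock = perNodeLocks.computeIfAbsent(nodeId, id -> new ReentrantLock());
            nodeLock.lock();
            try {
                if (connectedNodes.containsKey(nodeId)) {
                    return; // someone else already connected this node
                }
                connectAttempts.incrementAndGet();
                connectedNodes.put(nodeId, "channels-for-" + nodeId);
            } finally {
                nodeLock.unlock();
            }
        } finally {
            globalLock.readLock().unlock();
        }
    }

    // Assumption: an operation that must exclude all connects takes the write lock.
    void closeAll() {
        globalLock.writeLock().lock();
        try {
            connectedNodes.clear();
        } finally {
            globalLock.writeLock().unlock();
        }
    }
}

final class ConnectLockSketch {
    public static void main(String[] args) throws InterruptedException {
        ConnectionRegistry registry = new ConnectionRegistry();
        Thread[] workers = new Thread[8];
        for (int i = 0; i < workers.length; i++) {
            String nodeId = "node-" + (i % 2); // eight threads, two distinct nodes
            workers[i] = new Thread(() -> registry.connectToNode(nodeId));
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        System.out.println("connect attempts: " + registry.connectAttempts.get()); // prints "connect attempts: 2"
    }
}
```

Because the existence check happens while the per-node lock is held, each node is connected exactly once no matter how many threads race on it.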
If it is not a light connection, there are five kinds of connections between each server and client (recovery, bulk, reg, state, and ping), and each kind handles a different task.
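As a rough illustration of the difference between a light and a full connection, the sketch below models channels as plain strings. NodeChannelsSketch and ChannelType are hypothetical names, the channel counts are arbitrary, and the round-robin selection within a group is an assumption made for the example. The fallback for empty groups follows the one at the end of connectToChannels (recovery falls back to bulk, then reg; bulk falls back to reg).

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

enum ChannelType { RECOVERY, BULK, REG, STATE, PING }

// Hypothetical model of the per-node channel groups; a channel is just a label here.
final class NodeChannelsSketch {
    private final Map<ChannelType, String[]> channels = new EnumMap<>(ChannelType.class);
    private final Map<ChannelType, AtomicInteger> counters = new EnumMap<>(ChannelType.class);

    private NodeChannelsSketch() {
        for (ChannelType type : ChannelType.values()) {
            counters.put(type, new AtomicInteger());
        }
    }

    private static String[] open(String prefix, int count) {
        String[] group = new String[count];
        for (int i = 0; i < count; i++) {
            group[i] = prefix + "-" + i;
        }
        return group;
    }

    // Full connection: a separate group of channels for each type.
    static NodeChannelsSketch full(int recovery, int bulk, int reg, int state, int ping) {
        NodeChannelsSketch n = new NodeChannelsSketch();
        n.channels.put(ChannelType.RECOVERY, open("recovery", recovery));
        n.channels.put(ChannelType.BULK, open("bulk", bulk));
        n.channels.put(ChannelType.REG, open("reg", reg));
        n.channels.put(ChannelType.STATE, open("state", state));
        n.channels.put(ChannelType.PING, open("ping", ping));
        // Same fallback order as at the end of connectToChannels.
        if (n.channels.get(ChannelType.RECOVERY).length == 0) {
            String[] bulkGroup = n.channels.get(ChannelType.BULK);
            n.channels.put(ChannelType.RECOVERY,
                    bulkGroup.length > 0 ? bulkGroup : n.channels.get(ChannelType.REG));
        }
        if (n.channels.get(ChannelType.BULK).length == 0) {
            n.channels.put(ChannelType.BULK, n.channels.get(ChannelType.REG));
        }
        return n;
    }

    // Light connection: one channel shared by all five types.
    static NodeChannelsSketch light() {
        NodeChannelsSketch n = new NodeChannelsSketch();
        String[] shared = {"shared-0"};
        for (ChannelType type : ChannelType.values()) {
            n.channels.put(type, shared);
        }
        return n;
    }

    // Assumption: channels within a group are picked round-robin.
    String channel(ChannelType type) {
        String[] group = channels.get(type);
        int next = counters.get(type).getAndIncrement();
        return group[Math.floorMod(next, group.length)];
    }
}

final class ChannelTypesSketch {
    public static void main(String[] args) {
        NodeChannelsSketch full = NodeChannelsSketch.full(0, 2, 3, 1, 1);
        System.out.println(full.channel(ChannelType.RECOVERY)); // prints "bulk-0" (fallback)
        System.out.println(full.channel(ChannelType.REG));      // prints "reg-0"
        System.out.println(full.channel(ChannelType.REG));      // prints "reg-1"
        NodeChannelsSketch light = NodeChannelsSketch.light();
        System.out.println(light.channel(ChannelType.PING));    // prints "shared-0"
    }
}
```

Separating the groups keeps heavy traffic such as recovery or bulk requests from delaying small, latency-sensitive messages such as pings, which is presumably why the full connection pays for the extra channels.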
The code of the connection method is as follows:
protected void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
    // five connection types; different types correspond to different cluster operations
    ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length];
    ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length];
    ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length];
    ChannelFuture[] connectState = new ChannelFuture[nodeChannels.state.length];
    ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length];
    InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
    // attempt to establish the connections
    for (int i = 0; i < connectRecovery.length; i++) {
        connectRecovery[i] = clientBootstrap.connect(address);
    }
    for (int i = 0; i < connectBulk.length; i++) {
        connectBulk[i] = clientBootstrap.connect(address);
    }
    for (int i = 0; i < connectReg.length; i++) {
        connectReg[i] = clientBootstrap.connect(address);
    }
    for (int i = 0; i < connectState.length; i++) {
        connectState[i] = clientBootstrap.connect(address);
    }
    for (int i = 0; i < connectPing.length; i++) {
        connectPing[i] = clientBootstrap.connect(address);
    }
    // get the channel of each connection and store it in the corresponding channels array for later use
    try {
        for (int i = 0; i < connectRecovery.length; i++) {
            connectRecovery[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
            if (!connectRecovery[i].isSuccess()) {
                throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectRecovery[i].getCause());
            }
            nodeChannels.recovery[i] = connectRecovery[i].getChannel();
            nodeChannels.recovery[i].getCloseFuture().addListener(new ChannelCloseListener(node));
        }
        for (int i = 0; i < connectBulk.length; i++) {
            connectBulk[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
            if (!connectBulk[i].isSuccess()) {
                throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectBulk[i].getCause());
            }
            nodeChannels.bulk[i] = connectBulk[i].getChannel();
            nodeChannels.bulk[i].getCloseFuture().addListener(new ChannelCloseListener(node));
        }
        for (int i = 0; i < connectReg.length; i++) {
            connectReg[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
            if (!connectReg[i].isSuccess()) {
                throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectReg[i].getCause());
            }
            nodeChannels.reg[i] = connectReg[i].getChannel();
            nodeChannels.reg[i].getCloseFuture().addListener(new ChannelCloseListener(node));
        }
        for (int i = 0; i < connectState.length; i++) {
            connectState[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
            if (!connectState[i].isSuccess()) {
                throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectState[i].getCause());
            }
            nodeChannels.state[i] = connectState[i].getChannel();
            nodeChannels.state[i].getCloseFuture().addListener(new ChannelCloseListener(node));
        }
        for (int i = 0; i < connectPing.length; i++) {
            connectPing[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
            if (!connectPing[i].isSuccess()) {
                throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectPing[i].getCause());
            }
            nodeChannels.ping[i] = connectPing[i].getChannel();
            nodeChannels.ping[i].getCloseFuture().addListener(new ChannelCloseListener(node));
        }
        // if a type has no channels of its own, fall back to another type's channels
        if (nodeChannels.recovery.length == 0) {
            if (nodeChannels.bulk.length > 0) {
                nodeChannels.recovery = nodeChannels.bulk;
            } else {
                nodeChannels.recovery = nodeChannels.reg;
            }
        }
        if (nodeChannels.bulk.length == 0) {
            nodeChannels.bulk = nodeChannels.reg;
        }
    } catch (RuntimeException e) {
        // clean the futures
        for (ChannelFuture future : ImmutableList.<ChannelFuture>builder().add(connectRecovery).add(connectBulk).add(connectReg).add(connectState).add(connectPing).build()) {
            future.cancel();
            if (future.getChannel() != null && future.getChannel().isOpen()) {
                try {
                    future.getChannel().close();
                } catch (Exception e1) {
                    // ignore
                }
            }
        }
        throw e;
    }
}

This concludes the introduction to the transport startup process for communication between Elasticsearch nodes. Thank you for reading.