A Brief Look at the NIO Network Communication Model in Kafka

Updated 2025-01-22, from SLTechnology News&Howtos (shulou)

This article gives a brief explanation of the NIO network communication model in Kafka.

Abstract: RocketMQ and Kafka are often compared, and the network communication layers of the two message queues are in fact similar. This article briefly introduces Kafka's NIO network communication model.

The following sections use the Kafka source code to describe the multithreaded, Reactor-based network communication model and its overall framework, and briefly introduce the design and implementation of the Kafka network communication layer.

I. Overview of the Kafka network communication model

Kafka's network communication model is a multithreaded Reactor model built on Java NIO. The following comment is quoted from the Kafka source code:

An NIO socket server. The threading model is 1 Acceptor thread that handles new connections. Acceptor has N Processor threads that each have their own selector and read requests from sockets. M Handler threads that handle requests and produce responses back to the processor threads for writing.

The comment above gives a rough picture of Kafka's network communication layer: it uses a 1 + N + M threading model, with 1 Acceptor thread, N Processor threads, and M business processing (request handler) threads. The table below summarizes the three kinds of threads; each is explained in more detail later:

| Thread count | Thread name | Description |
|---|---|---|
| 1 | kafka-socket-acceptor_%x | Acceptor thread; listens for connection requests initiated by clients |
| N | kafka-network-thread_%d | Processor thread; reads from and writes to the sockets |
| M | kafka-request-handler-_%d | Worker thread; handles the business logic and generates the Response to return |

The complete framework diagram of the Kafka network communication layer is shown in the following figure:

[Figure: Communication layer model of the Kafka message queue (1+N+M model)]

The diagram may not be entirely clear at first; a general impression of the structure of the network communication layer is enough for now. The flow is described in detail later in this article, alongside the important parts of the Kafka source code. The key concepts in the network communication model are summarized below:

(1) Acceptor: one acceptor thread that listens for new connection requests. It registers the OP_ACCEPT event and hands each new connection over to a Processor thread, chosen in round-robin fashion.

(2) Processor: N processor threads, each with its own selector. Each Processor registers the OP_READ event for the SocketChannels assigned to it by the Acceptor. N is set by "num.network.threads".

(3) KafkaRequestHandler: M request handler threads, held in the thread pool KafkaRequestHandlerPool. Each one takes requests from the RequestChannel's global request queue, requestQueue, and hands them to KafkaApis for processing. M is set by "num.io.threads".

(4) RequestChannel: the request channel of the Kafka server. Its data structure contains one global request queue, requestQueue, and one response queue, responseQueue, per Processor. It is where the Processors exchange data with the request handler threads (KafkaRequestHandler) and KafkaApis.

(5) NetworkClient: a wrapper over Java NIO that sits in Kafka's network interface layer. The send method of the Kafka message producer object, KafkaProducer, mainly calls NetworkClient to send messages.

(6) SocketServer: an NIO server that starts one Acceptor thread and multiple Processor threads. It implements a typical multithreaded Reactor pattern that separates receiving client requests from processing them.

(7) KafkaServer: represents a Kafka broker instance; its startup method is the entry point for starting the instance.

(8) KafkaApis: Kafka's business logic API layer. It handles the different types of requests, such as sending messages, fetching message offsets, and handling heartbeat requests.
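To make the sizing in items (1) to (3) concrete, here is a minimal Java sketch that derives the 1 + N + M thread counts from broker-style properties. The class `ThreadModelSketch` and its methods are hypothetical helpers, not part of Kafka; the fallback values 3 and 8 are assumed to mirror Kafka's documented defaults for `num.network.threads` and `num.io.threads`, and the sketch assumes a single listener.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): derives the 1 + N + M thread
// counts for a single listener from broker-style properties.
final class ThreadModelSketch {
    /** N: number of Processor (network) threads. */
    static int networkThreads(Properties props) {
        // Assumed default of 3, mirroring Kafka's documented default.
        return Integer.parseInt(props.getProperty("num.network.threads", "3"));
    }

    /** M: number of KafkaRequestHandler (I/O) threads. */
    static int ioThreads(Properties props) {
        // Assumed default of 8, mirroring Kafka's documented default.
        return Integer.parseInt(props.getProperty("num.io.threads", "8"));
    }

    /** 1 Acceptor + N Processors + M request handlers. */
    static int totalThreads(Properties props) {
        return 1 + networkThreads(props) + ioThreads(props);
    }
}
```

With an empty `Properties` object the sketch reports 1 + 3 + 8 threads; setting `num.network.threads` to 5 changes only the N term.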

II. Design and implementation of the Kafka network communication layer

This section analyzes the design and implementation of the Kafka network communication layer with reference to its source code. It covers the key elements of that layer in detail: SocketServer, Acceptor, Processor, RequestChannel, and KafkaRequestHandler. The source code analyzed in this article is from version 0.11.0 of Kafka.

1. SocketServer

SocketServer is the core class that accepts client socket connections, processes the requests, and returns the results. The initialization and processing logic of Acceptor and Processor are implemented here. When the KafkaServer instance starts, it calls SocketServer's startup method, which initializes one Acceptor and N Processor threads for each EndPoint (in general, a server is configured with only one port). The implementation is as follows:

    def startup() {
      this.synchronized {
        connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

        val sendBufferSize = config.socketSendBufferBytes
        val recvBufferSize = config.socketReceiveBufferBytes
        val brokerId = config.brokerId

        var processorBeginIndex = 0
        // A broker generally sets only one port
        config.listeners.foreach { endpoint =>
          val listenerName = endpoint.listenerName
          val securityProtocol = endpoint.securityProtocol
          val processorEndIndex = processorBeginIndex + numProcessorThreads

          // N processors
          // [the remainder of startup() is missing from the source text]
        }
      }
    }

2. Acceptor

    // Main loop of the Acceptor thread. The beginning of run() is missing from
    // the source text; "ready" is presumably the result of the preceding
    // nioSelector.select call.
    if (ready > 0) {
      val keys = nioSelector.selectedKeys()
      val iter = keys.iterator()
      while (iter.hasNext && isRunning) {
        try {
          val key = iter.next
          iter.remove()
          // If an OP_ACCEPT event occurred, call accept() to handle it
          if (key.isAcceptable)
            accept(key, processors(currentProcessor))
          else
            throw new IllegalStateException("Unrecognized key state for acceptor thread.")

          // Round robin to the next processor thread
          currentProcessor = (currentProcessor + 1) % processors.length
        } catch {
          case e: Throwable => error("Error while accepting connection", e)
        }
      }
    }
    // Code omitted

    // Acceptor.accept: accept the connection and hand it to a Processor
    def accept(key: SelectionKey, processor: Processor) {
      val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
      val socketChannel = serverSocketChannel.accept()
      try {
        connectionQuotas.inc(socketChannel.socket().getInetAddress)
        socketChannel.configureBlocking(false)
        socketChannel.socket().setTcpNoDelay(true)
        socketChannel.socket().setKeepAlive(true)
        if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
          socketChannel.socket().setSendBufferSize(sendBufferSize)

        processor.accept(socketChannel)
      } catch {
        // Some code omitted
      }
    }

    // Processor.accept: queue the new connection and wake up the Processor
    def accept(socketChannel: SocketChannel) {
      newConnections.add(socketChannel)
      wakeup()
    }

As the source code above shows, once the Acceptor thread starts, it first registers the OP_ACCEPT event on the server-side socket object, ServerSocketChannel, which listens on the port. It then polls in a loop, waiting for that event. When the event occurs, it calls the accept() method to handle it. The Processor is chosen by round robin, which keeps the connections, and with them the load, spread roughly evenly across the Processor threads.
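The round-robin choice described above can be isolated into a few lines. This is a minimal Java sketch under the assumption that a single acceptor thread owns the cursor; `RoundRobinSketch` and `nextProcessor` are hypothetical names for illustration, while the modulo step mirrors the `currentProcessor = (currentProcessor + 1) % processors.length` line in the excerpt.

```java
// Hypothetical sketch of round-robin assignment; names are illustrative.
final class RoundRobinSketch {
    private final int processorCount;
    // Only the single acceptor thread touches this, so no synchronization.
    private int current = 0;

    RoundRobinSketch(int processorCount) {
        if (processorCount <= 0) {
            throw new IllegalArgumentException("need at least one processor");
        }
        this.processorCount = processorCount;
    }

    /** Returns the processor index for the next connection, then advances. */
    int nextProcessor() {
        int chosen = current;
        current = (current + 1) % processorCount;
        return chosen;
    }
}
```

With three processors, successive connections are assigned indexes 0, 1, 2, 0, 1, 2, and so on, regardless of how busy each connection later turns out to be.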

The accept() method of Acceptor does the following:

(1) It obtains the serverSocketChannel instance from the SelectionKey and calls its accept() method to establish the connection with the client.

(2) It calls connectionQuotas.inc() to increment the connection count, and configures the socketChannel returned in step (1) (non-blocking mode via configureBlocking, TcpNoDelay, KeepAlive, sendBufferSize, and so on).

(3) It hands the socketChannel to the processor.accept() method. That method adds the socketChannel to the Processor's newConnections queue and then wakes up the Processor thread, which takes the socketChannel from the queue and processes it. Because newConnections is accessed concurrently by the Acceptor thread and the Processor thread, it is a ConcurrentLinkedQueue (an unbounded thread-safe queue based on linked nodes).
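The hand-off in step (3) can be sketched with plain Java NIO. The class below is a hypothetical, simplified stand-in for a Processor, not Kafka's actual class: `HandOffSketch`, `configureNewConnections`, `pendingConnections`, and `unconnectedChannel` are names chosen for illustration. One deliberate difference: the sketch switches the channel to non-blocking mode on the processor side to stay self-contained, whereas the excerpt shows Kafka doing that in the Acceptor.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified stand-in for a Processor; not Kafka's actual class.
final class HandOffSketch {
    // Written by the acceptor thread, drained by the processor thread.
    private final Queue<SocketChannel> newConnections = new ConcurrentLinkedQueue<>();
    private final Selector selector;

    HandOffSketch() {
        try {
            this.selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Acceptor side: enqueue the channel, then wake the processor's selector. */
    void accept(SocketChannel channel) {
        newConnections.add(channel);
        selector.wakeup(); // a blocked or upcoming select() returns promptly
    }

    /** Processor side: register every queued channel for OP_READ. */
    int configureNewConnections() {
        int registered = 0;
        SocketChannel channel;
        while ((channel = newConnections.poll()) != null) {
            try {
                channel.configureBlocking(false); // required before register()
                channel.register(selector, SelectionKey.OP_READ);
                registered++;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return registered;
    }

    int pendingConnections() {
        return newConnections.size();
    }

    /** Demo helper: an unconnected channel, enough to try the hand-off locally. */
    static SocketChannel unconnectedChannel() {
        try {
            return SocketChannel.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The queue decouples the two threads: the acceptor never touches the processor's selector registrations directly, and `wakeup()` only ensures the processor notices the new work without waiting for its select timeout.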

3. Processor

Processor, like Acceptor, is a thread class that extends the abstract class AbstractServerThread. It reads the request data sent by clients and sends back to the client the responses produced by KafkaRequestHandler. The important variables in this thread class are:

(1) newConnections: as mentioned in the Acceptor section above, a queue of type ConcurrentLinkedQueue[SocketChannel] that holds the socketChannels of new connections waiting to be processed by the Processor.

(2) inflightResponses: a collection of type Map[String, RequestChannel.Response] that records responses that have not yet been fully sent.

(3) selector: a variable of type KSelector used to manage network connections.

The following is the flow chart for the execution of the run method of the Processor processor thread:

[Figure: Processing flowchart of the Kafka Processor thread]

As the flowchart shows, the main loop of the Processor thread performs the following steps:

(1) Process the socketChannels in the newConnections queue: take each socketChannel from the queue in turn and register the OP_READ event for it on the selector.

(2) Process the Responses in this Processor's response queue in the RequestChannel. The handling depends on the responseAction type (NoOpAction/SendAction/CloseConnectionAction). "NoOpAction" means that no response needs to be sent for the request on that connection. "SendAction" means that the Response must be sent to the client: "selector.send" registers the OP_WRITE event, and the Response is moved from the responseQueue to the inflightResponses collection. "CloseConnectionAction" means that the connection is to be closed.

(3) Call the selector.poll() method, which in turn calls nioSelector.select().

(4) Process the queue of completely received packets, completedReceives. The processCompletedReceives method calls "requestChannel.sendRequest" to add each Request to the RequestChannel's global request queue, requestQueue, where it waits for a KafkaRequestHandler. It also calls "selector.mute" to cancel the OP_READ event on the request's connection channel, so that no further request is read from that connection until the current one has been answered.

(5) Process the queue of completed sends, completedSends. Once a response has been fully sent to the client, it is removed from inflightResponses, and "selector.unmute" re-registers the OP_READ event on the corresponding connection channel.

(6) Process the queue of disconnected connections: remove the corresponding responses from the inflightResponses collection and decrement the connectionQuotas count by 1.
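Steps (4) and (5) come down to toggling the OP_READ bit in a connection's interest set. The Java sketch below shows that idea with the standard `SelectionKey.interestOps` API; it is an illustration of the technique, not Kafka's own `selector.mute`/`selector.unmute` code, and `MuteSketch` with its `demoKey` helper is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Hypothetical sketch of muting a connection by editing its interest set.
final class MuteSketch {
    /** Stop reading from this connection: clear OP_READ, keep other bits. */
    static void mute(SelectionKey key) {
        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
    }

    /** Resume reading: set OP_READ again, keep other bits. */
    static void unmute(SelectionKey key) {
        key.interestOps(key.interestOps() | SelectionKey.OP_READ);
    }

    static boolean isMuted(SelectionKey key) {
        return (key.interestOps() & SelectionKey.OP_READ) == 0;
    }

    /** Demo helper: registers an unconnected channel so the methods can be tried. */
    static SelectionKey demoKey() {
        try {
            SocketChannel channel = SocketChannel.open();
            channel.configureBlocking(false);
            return channel.register(Selector.open(), SelectionKey.OP_READ);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because only the OP_READ bit is changed, a pending OP_WRITE interest for the response in flight is left untouched while the connection is muted.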

4. RequestChannel

In the network communication layer of Kafka, RequestChannel is the buffer through which Processor threads and KafkaRequestHandler threads exchange data; Requests and Responses are held there during communication. In other words, it acts as a set of buffer queues. A Processor thread adds each request it has read to RequestChannel's global request queue, requestQueue. A KafkaRequestHandler thread takes the request from that queue and processes it, then adds the Response to the matching response queue, responseQueue, in RequestChannel and wakes up the corresponding Processor thread through responseListeners. Finally, the Processor thread takes the Response out of its response queue and sends it to the client.
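The queue layout described above can be sketched in Java as one bounded request queue plus one response queue per Processor. Everything here is an assumption made for illustration: `RequestChannelSketch`, its nested `Request` and `Response` types with a plain string payload, and the single `IntConsumer` standing in for the responseListeners are hypothetical and much simpler than Kafka's real RequestChannel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;

// Hypothetical sketch of the queue layout; not Kafka's RequestChannel.
final class RequestChannelSketch {
    static final class Request {
        final int processorId;
        final String payload;
        Request(int processorId, String payload) {
            this.processorId = processorId;
            this.payload = payload;
        }
    }

    static final class Response {
        final int processorId;
        final String payload;
        Response(int processorId, String payload) {
            this.processorId = processorId;
            this.payload = payload;
        }
    }

    private final BlockingQueue<Request> requestQueue;                 // shared by all handlers
    private final List<BlockingQueue<Response>> responseQueues = new ArrayList<>(); // one per processor
    private final IntConsumer responseListener;                        // wakes the given processor

    RequestChannelSketch(int numProcessors, int queueSize, IntConsumer responseListener) {
        this.requestQueue = new ArrayBlockingQueue<>(queueSize);
        for (int i = 0; i < numProcessors; i++) {
            responseQueues.add(new LinkedBlockingQueue<>());
        }
        this.responseListener = responseListener;
    }

    /** Processor side: blocks while the bounded request queue is full. */
    void sendRequest(Request request) {
        try {
            requestQueue.put(request);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while enqueuing", e);
        }
    }

    /** Handler side: waits up to timeoutMs; returns null if nothing arrived. */
    Request receiveRequest(long timeoutMs) {
        try {
            return requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Handler side: route the response to its processor, then notify it. */
    void sendResponse(Response response) {
        responseQueues.get(response.processorId).add(response);
        responseListener.accept(response.processorId);
    }

    /** Processor side: non-blocking; returns null if the queue is empty. */
    Response receiveResponse(int processorId) {
        return responseQueues.get(processorId).poll();
    }
}
```

The request carries its Processor's id so that the response can be routed back to the same Processor, which owns the client connection; the bounded request queue applies back-pressure to the network threads when the handlers fall behind.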

5. KafkaRequestHandler

KafkaRequestHandler is also a thread class. When the KafkaServer instance starts, it instantiates a thread pool, KafkaRequestHandlerPool, containing several KafkaRequestHandler threads that run in the background as daemon threads. In its run method, a KafkaRequestHandler reads requests from the RequestChannel in a loop and hands each one to KafkaApis for processing.
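The loop described above can be sketched as a small pool of daemon threads polling a shared queue. This is a hedged Java illustration rather than Kafka's implementation: `HandlerPoolSketch` is a hypothetical class, requests are reduced to strings, the `Consumer<String>` stands in for KafkaApis, and the 50 ms poll timeout is an arbitrary choice.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of a request handler pool; not Kafka's actual class.
final class HandlerPoolSketch {
    private final BlockingQueue<String> requestQueue;
    private final Consumer<String> apis; // stand-in for KafkaApis
    private final Thread[] threads;
    private volatile boolean running = true;

    HandlerPoolSketch(int numThreads, BlockingQueue<String> requestQueue, Consumer<String> apis) {
        this.requestQueue = requestQueue;
        this.apis = apis;
        this.threads = new Thread[numThreads];
    }

    /** Starts M daemon threads that all poll the same request queue. */
    void start() {
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(this::runLoop, "sketch-request-handler-" + i);
            threads[i].setDaemon(true);
            threads[i].start();
        }
    }

    private void runLoop() {
        while (running) {
            try {
                // Arbitrary 50 ms timeout so the loop can notice shutdown.
                String request = requestQueue.poll(50, TimeUnit.MILLISECONDS);
                if (request != null) {
                    apis.accept(request);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                // Keep the handler thread alive if one request fails.
                System.err.println("Error while handling request: " + e);
            }
        }
    }

    void shutdown() {
        running = false;
        for (Thread t : threads) {
            if (t != null) {
                t.interrupt();
            }
        }
    }
}
```

Because every handler polls the same queue, any idle thread picks up the next request, so M bounds how many requests are processed at once rather than tying a handler to a particular connection.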

6. KafkaApis

KafkaApis is the central dispatching component for the business requests that arrive through the network communication layer. It reflects which services a Kafka broker can provide.
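A dispatcher of this kind is, at its core, a switch on the request type. The Java sketch below illustrates that shape only: `ApiDispatchSketch`, its `ApiKey` enum, and the returned handler labels are hypothetical and cover just the request types mentioned earlier in this article, not Kafka's full set.

```java
// Hypothetical sketch of dispatch by request type; not Kafka's KafkaApis.
final class ApiDispatchSketch {
    // Illustrative subset of request types.
    enum ApiKey { PRODUCE, FETCH, LIST_OFFSETS, HEARTBEAT }

    /** Returns a label for the handler that would process this request type. */
    static String handle(ApiKey key) {
        switch (key) {
            case PRODUCE:
                return "handleProduce";     // sending messages
            case FETCH:
                return "handleFetch";       // reading messages
            case LIST_OFFSETS:
                return "handleListOffsets"; // getting message offsets
            case HEARTBEAT:
                return "handleHeartbeat";   // heartbeat requests
            default:
                throw new IllegalArgumentException("unsupported request type: " + key);
        }
    }
}
```

The list of cases doubles as a catalogue of what the broker can do, which is the sense in which the dispatching component reflects the services on offer.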
