
How RocketMQ and Kafka Implement the Push and Pull Modes of a Message Queue

Updated: 2025-02-27. From: SLTechnology News&Howtos


Shulou(Shulou.com)06/01 Report--

Many newcomers are not very clear about how RocketMQ and Kafka implement the push and pull modes of a message queue. To help with that, this article explains it in detail. If this is something you need, read on; I hope you get something out of it.

Today let's talk about the push and pull modes of message queues, which are also a popular interview topic. For example, if you list RocketMQ on your resume, you will almost certainly be asked whether RocketMQ uses push mode or pull mode. Pull mode, you say? Then why is there a PushConsumer?

So let's go through the push and pull modes and then look at how RocketMQ and Kafka handle them.

Push-pull mode

First, let's be clear about which step in a message queue the push-pull discussion refers to. Generally, when we talk about push and pull modes, we mean the interaction between the Consumer and the Broker.

Between the Producer and the Broker, push is the default: the Producer pushes messages to the Broker, rather than the Broker actively pulling them.

Imagine the Broker had to pull messages. Each Producer would then have to persist its messages locally, as a log, and wait for the Broker to pull them. With many producers, message reliability would depend not only on the Broker itself but also on hundreds of Producers.

The Broker can rely on mechanisms such as multiple replicas to keep messages reliable, whereas keeping hundreds of Producers reliable is much harder. That is why, by default, Producers push messages to the Broker.

So in some cases being distributed is good, and in others centralized management is better.

Push mode

Push mode means the Broker pushes messages to the Consumer: the Consumer receives messages passively, and the Broker decides when they are sent.

Let's think about the benefits of push mode.

Messages are delivered in real time. As soon as the Broker receives a message, it can push it to the Consumer.

It is simpler for consumers to use. They just wait; whenever a message arrives, it is pushed to them.

What are the disadvantages of push mode?

The push rate is hard to match to the consumption rate. The goal of push mode is to deliver messages as fast as possible. When producers send messages to the Broker faster than consumers can consume them, consumers may eventually "burst" because they simply cannot keep up. When the push rate is too high, the consumer is overwhelmed, much as in a DDoS attack.

Different consumers also consume at different rates, and it is hard for the Broker to balance the push rate for each of them. To make the push rate adaptive, the consumer has to tell the Broker during pushing, "I can't keep up, please slow down," and the Broker then has to track each consumer's state in order to adjust its push rate.

This increases the complexity of the Broker itself.

Therefore, push mode has trouble controlling the push rate according to consumer state. It suits cases with a small volume of messages, strong consumption capacity, and high real-time requirements.
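To make the rate mismatch concrete, here is a small simulation. It is only a sketch under assumptions of my own: the class name, the per-second rates, and the buffer capacity are hypothetical and have nothing to do with RocketMQ or Kafka internals. It counts how many seconds pass before a consumer-side buffer overflows when the Broker pushes faster than the consumer drains.

```java
// Hypothetical simulation of push mode; not RocketMQ or Kafka code.
final class PushBacklogSketch {

    /**
     * Returns the first second at which the consumer's backlog exceeds
     * bufferCapacity, or -1 if that never happens within maxSeconds.
     */
    static int secondsUntilOverflow(int pushPerSec, int consumePerSec,
                                    int bufferCapacity, int maxSeconds) {
        int backlog = 0;
        for (int second = 1; second <= maxSeconds; second++) {
            backlog += pushPerSec;                        // broker pushes regardless of consumer state
            backlog -= Math.min(backlog, consumePerSec);  // consumer drains what it can
            if (backlog > bufferCapacity) {
                return second;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Push faster than the consumer can handle: the backlog grows by 40 per second.
        System.out.println(secondsUntilOverflow(100, 60, 200, 60));
        // Push slower than the consumer can handle: no backlog builds up.
        System.out.println(secondsUntilOverflow(50, 60, 200, 60));
    }
}
```

The point of the sketch is that nothing in the loop lets the consumer slow the Broker down; adding that feedback is exactly the per-consumer state the Broker would have to maintain.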

Pull mode

Pull mode means the Consumer actively requests messages from the Broker: the Broker sends messages only in response to the Consumer's requests.

Let's think about the benefits of pull mode.

The initiative lies with the consumer, which can issue pull requests according to its own situation. If the consumer finds it cannot keep up, it can stop pulling according to some policy, or pull at intervals.

The Broker has it relatively easy in pull mode. It only stores the messages that producers send; consumption is initiated by consumers. It hands over messages when asked, and the consumer tells it where to start and how many to return. It is an emotionless tool, and it does not care if consumers never come to fetch anything.

Pull mode is better suited to sending messages in batches. In push mode, the Broker can push one message at a time, or buffer some messages and then push them, but when pushing it does not know whether the consumer can handle that many messages at once. Pull mode is more reasonable here: the Broker can use the information in the consumer's request to decide how many buffered messages to send in one batch.
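The division of labor described above can be sketched in a few lines. This is a minimal illustration, assuming an in-memory list as the Broker's log; the `Broker` class, `pull(offset, maxMessages)`, and `drain` are hypothetical names of my own, not an API of either system. The consumer owns the offset and the batch size, and the Broker only answers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of pull mode; not RocketMQ or Kafka code.
final class PullBatchSketch {

    /** A passive broker: it only stores messages and answers pull requests. */
    static final class Broker {
        private final List<String> log = new ArrayList<>();

        void append(String message) {
            log.add(message);
        }

        /** Returns at most maxMessages messages starting at offset. */
        List<String> pull(int offset, int maxMessages) {
            int from = Math.min(offset, log.size());
            int to = Math.min(from + maxMessages, log.size());
            return new ArrayList<>(log.subList(from, to));
        }
    }

    /** Pulls until the broker has nothing left; returns how many pulls returned data. */
    static int drain(Broker broker, int batchSize) {
        int offset = 0;   // the consumer, not the broker, tracks its position
        int pulls = 0;
        while (true) {
            List<String> batch = broker.pull(offset, batchSize);
            if (batch.isEmpty()) {
                break;    // a real consumer would back off here and try again later
            }
            offset += batch.size();
            pulls++;
        }
        return pulls;
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        for (int i = 0; i < 10; i++) {
            broker.append("m" + i);
        }
        // Ten messages pulled four at a time: batches of 4, 4 and 2.
        System.out.println(drain(broker, 4));
    }
}
```

A slow consumer simply passes a smaller `batchSize` or waits longer between calls; the Broker needs no knowledge of either decision.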

What are the disadvantages of pull mode?

Message delay. Since the consumer pulls messages, how does it know when one has arrived? It can only keep pulling, but it cannot request too frequently, or else the consumers end up attacking the Broker. So the request frequency has to be reduced, for example to once every 2 seconds, and then a message may be seen up to 2 seconds late.

Wasted requests. If, for example, messages arrive only every few hours, then all of the consumer's requests during those hours return nothing and are useless work.
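Both costs are simple arithmetic, which the following sketch works out. It assumes a consumer that polls at fixed times 0, interval, 2 × interval, and so on; the class and method names are hypothetical.

```java
// Hypothetical arithmetic for fixed-interval (short) polling.
final class ShortPollingSketch {

    /** Time between a message's arrival and the first poll at or after it. */
    static long deliveryDelayMs(long arrivalMs, long intervalMs) {
        long remainder = arrivalMs % intervalMs;
        return remainder == 0 ? 0 : intervalMs - remainder;
    }

    /** Number of polls, at times strictly before the arrival, that return nothing. */
    static long emptyPolls(long arrivalMs, long intervalMs) {
        return (arrivalMs + intervalMs - 1) / intervalMs;   // ceil(arrival / interval)
    }

    public static void main(String[] args) {
        long interval = 2_000;                       // poll every 2 seconds
        // A message arriving just after a poll waits almost a full interval.
        System.out.println(deliveryDelayMs(2_100, interval));
        // A message arriving after three quiet hours was preceded by thousands of empty polls.
        System.out.println(emptyPolls(3L * 60 * 60 * 1000, interval));
    }
}
```

Shrinking the interval reduces the delay but multiplies the empty requests, and vice versa, which is the trade-off that long polling is meant to avoid.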

So, push or pull?

We can see that push mode and pull mode have their own advantages and disadvantages, so how to choose?

Both RocketMQ and Kafka chose pull mode. There are, of course, push-based message queues in the industry, such as ActiveMQ.

Personally, I think pull mode is more appropriate. Today's message queues need to persist messages, which means the queue has a storage role: its job is to accept messages and store them safely so that consumers can consume them.

Consumers come in all kinds, and the Broker should not depend on them. Its attitude should be: I have stored the messages for you, so come and get them.

Generally speaking the Broker does not become the bottleneck, because business processing on the consumer side is usually slower. Still, the Broker is a central point, so it should be kept as lightweight as possible.

So RocketMQ and Kafka both chose pull mode. Are they not worried about its disadvantages? They are, so they each use a technique that reduces them.

Long polling

Both RocketMQ and Kafka use "long polling" to implement pull mode, so let's look at how they do it.

To keep things simple, from here on I describe every case in which a pull's conditions are not met (too few messages, too small a total size, and so on) as "there is no message yet". Either way, the conditions are not satisfied.

Long polling in RocketMQ

PushConsumer in RocketMQ is really pull mode wearing a push-mode disguise; it only looks like push.

That is because RocketMQ quietly requests data from the Broker on our behalf, behind the scenes.

A RebalanceService thread runs in the background and balances load according to the number of queues in the topic and the number of consumers in the current consumer group. The pullRequest generated for each queue is put into the blocking queue pullRequestQueue. A separate PullMessageService thread then keeps taking pullRequests from pullRequestQueue and sends requests to the broker over the network to pull messages in near real time.
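The hand-off between the two threads can be sketched with a plain `LinkedBlockingQueue`. This is not RocketMQ's source: the `PullRequest` class here is a hypothetical stand-in, the modulo assignment of queues to consumers is just one simple way to split them (I am not claiming it is RocketMQ's allocation strategy), and the "network pull" is replaced by recording the queue id.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the rebalance -> blocking queue -> pull thread hand-off.
final class PullRequestQueueSketch {

    /** Stand-in for a pull request: which queue to pull and from which offset. */
    static final class PullRequest {
        final int queueId;
        final long nextOffset;

        PullRequest(int queueId, long nextOffset) {
            this.queueId = queueId;
            this.nextOffset = nextOffset;
        }
    }

    /** Returns the queue ids that the pull thread handled for one consumer. */
    static List<Integer> runOnce(int queueCount, int consumerCount, int consumerIndex) {
        BlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();

        // "Rebalance": enqueue one pull request per queue assigned to this consumer.
        int assigned = 0;
        for (int queueId = 0; queueId < queueCount; queueId++) {
            if (queueId % consumerCount == consumerIndex) {
                pullRequestQueue.add(new PullRequest(queueId, 0L));
                assigned++;
            }
        }

        final int expected = assigned;
        List<Integer> pulled = new ArrayList<>();
        Thread pullService = new Thread(() -> {
            try {
                for (int i = 0; i < expected; i++) {
                    PullRequest request = pullRequestQueue.take(); // blocks while the queue is empty
                    pulled.add(request.queueId);                   // a real client would pull over the network here
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "pull-service-sketch");

        pullService.start();
        try {
            pullService.join();   // join makes the thread's writes to 'pulled' visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pulled;
    }

    public static void main(String[] args) {
        // 8 queues shared by 2 consumers; show what consumer 0 pulls.
        System.out.println(runOnce(8, 2, 0));
    }
}
```

In a long-running client the pull thread would loop forever and put each request back on the queue after a pull completes, which is what makes the consumer look like it is being pushed to.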

I won't paste this part of the code; that is all there is to it, and a diagram later shows it.

On the Broker, the processRequest method of PullMessageProcessor handles the pull request. If there are messages, they are returned directly. What if there are none? Let's take a look at the code.

Let's take a look at what the suspendPullRequest method does.

The PullRequestHoldService thread takes the PullRequest entries from pullRequestTable every 5 seconds and checks whether the offset requested by each pull is less than the maximum offset of the current consume queue. If it is, the thread calls notifyMessageArriving, which ultimately calls the executeRequestWhenWakeup() method of PullMessageProcessor to process the pull request again. The whole long-polling period defaults to 30 seconds.

Put simply, every 5 seconds it checks whether messages have arrived, and if so it calls processRequest to handle the request again. That does not sound very real-time, does it? Five seconds?

Don't worry. There is also a ReputMessageService thread that continuously parses data from the commitLog and dispatches it to build two kinds of data, ConsumeQueue and IndexFile. It also wakes up held requests, which makes up for the slow 5-second check.

I won't paste the code; in short, once a message has been written, pullRequestHoldService#notifyMessageArriving is called.
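The RocketMQ source is not reproduced here, but the idea of holding a request and waking it on write can be sketched independently. Everything below is an assumption for illustration: the class and method names are mine, the log is an in-memory list, and the timeout uses `CompletableFuture.completeOnTimeout` (Java 9+) instead of RocketMQ's own threads.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical long-polling broker; not RocketMQ source code.
final class LongPollingBrokerSketch {

    private static final class HeldPull {
        final int offset;
        final CompletableFuture<List<String>> response;

        HeldPull(int offset, CompletableFuture<List<String>> response) {
            this.offset = offset;
            this.response = response;
        }
    }

    private final List<String> log = new ArrayList<>();
    private final List<HeldPull> held = new ArrayList<>();

    /** Answers at once if data exists at offset; otherwise holds the request. */
    synchronized CompletableFuture<List<String>> pull(int offset, long timeoutMs) {
        if (offset < log.size()) {
            return CompletableFuture.completedFuture(new ArrayList<>(log.subList(offset, log.size())));
        }
        CompletableFuture<List<String>> response = new CompletableFuture<>();
        held.add(new HeldPull(offset, response));
        // On timeout the consumer gets an empty result and is expected to pull again.
        return response.completeOnTimeout(Collections.emptyList(), timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Write path: store the message, then wake every held request it satisfies. */
    synchronized void append(String message) {
        log.add(message);
        checkHeldRequests();
    }

    /** A periodic checker thread could call this too, as a safety net for missed wake-ups. */
    synchronized void checkHeldRequests() {
        held.removeIf(h -> {
            if (h.response.isDone()) {
                return true;                       // already timed out
            }
            if (h.offset < log.size()) {
                h.response.complete(new ArrayList<>(log.subList(h.offset, log.size())));
                return true;
            }
            return false;                          // keep holding
        });
    }

    public static void main(String[] args) throws Exception {
        LongPollingBrokerSketch broker = new LongPollingBrokerSketch();
        CompletableFuture<List<String>> pending = broker.pull(0, 30_000);
        System.out.println("held: " + !pending.isDone());
        broker.append("m0");                       // the write wakes the held request
        System.out.println(pending.get());
    }
}
```

The two wake-up paths mirror the description above: the write path notifies held requests immediately, while a periodic check only has to catch whatever the write path missed.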

Finally, I will draw a picture to describe the whole process.

Long polling in Kafka

Kafka, for its part, has parameters in the pull request that let the consumer's request block and wait as a "long poll".

Put simply, the consumer pulls messages from the Broker with a timeout. If there are messages, they are returned immediately. If not, the consumer waits until the timeout expires and then issues the pull request again.

The Broker has to cooperate too. When a consumer's request arrives and there are messages, it returns them immediately. If there are none, it creates a delayed operation and responds once the conditions are met.
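On the consumer side, the settings that as far as I know govern this waiting are `fetch.min.bytes` and `fetch.max.wait.ms`. The sketch below only builds the `Properties` object, so it runs without the Kafka client library; the helper name, the broker address, the group id, and the values chosen are placeholders for illustration, not recommendations.

```java
import java.util.Properties;

// Builds consumer settings only; passing them to a KafkaConsumer is left out
// so that the sketch has no dependency on the Kafka client library.
final class FetchConfigSketch {

    static Properties longPollConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // The broker answers a fetch once at least this many bytes are available...
        props.setProperty("fetch.min.bytes", "1024");
        // ...or once it has held the request for this long, whichever comes first.
        props.setProperty("fetch.max.wait.ms", "500");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" and "demo-group" are placeholder values.
        Properties props = longPollConsumerProps("localhost:9092", "demo-group");
        System.out.println("fetch.min.bytes=" + props.getProperty("fetch.min.bytes"));
        System.out.println("fetch.max.wait.ms=" + props.getProperty("fetch.max.wait.ms"));
    }
}
```

These two settings are the "not enough messages yet" condition from earlier in concrete form: the fetch is held until the size threshold is reached or the wait limit expires.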

Let's take a brief look at the source code. In order to highlight the point, I will delete some of the code.

Let's take a look at the consumer-side code first.

The above poll interface must be familiar to all of you. In fact, we know directly from the comments that we are really waiting for the arrival of data or timeout. Let's simply move on.

Let's take a look at what client.poll calls in the end.

The final call goes to the selector that Kafka wraps, which eventually calls Java NIO's select(timeout).
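To see what blocking in select(timeout) looks like on its own, here is a tiny standalone sketch using the JDK's `java.nio.channels.Selector` directly rather than Kafka's wrapper. No channels are registered, so the call can only return by timing out; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Standalone demonstration of Selector.select(timeout); not Kafka's wrapper class.
final class SelectTimeoutSketch {

    /** Blocks in select(timeoutMs) and returns {readyKeyCount, elapsedMillis}. */
    static long[] selectOnce(long timeoutMs) {
        try (Selector selector = Selector.open()) {
            long start = System.nanoTime();
            // Returns when a registered channel is ready, wakeup() is called,
            // the thread is interrupted, or the timeout expires.
            int ready = selector.select(timeoutMs);
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            return new long[] {ready, elapsedMs};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] result = selectOnce(200);
        System.out.println("ready keys: " + result[0] + ", waited about " + result[1] + " ms");
    }
}
```

With real sockets registered, the same call returns early as soon as the Broker's response arrives, so the thread sleeps instead of spinning while it waits.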

Now that the consumer code is clear, let's take a look at how Broker does it.

I introduced the entry point where the Broker handles all requests in a previous article: the handle method in KafkaApis.scala. This time the protagonist is handleFetchRequest.

Inside this method, I have excerpted the most important part.

The next picture shows the internal implementation of the fetchMessages method. The comments in the source code are already very clear.

The name purgatory is an interesting choice. Put simply, it uses the timing wheel mentioned in my previous article to run scheduled tasks. delayedFetchPurgatory, for example, is dedicated to handling delayed pull operations.

First, let's think briefly about which methods such a delayed operation needs. The delayed operation needs a check to see whether messages have arrived, a method to run once they have, a method for what to do after that has run, and of course a method for what to do on timeout.

These methods correspond to DelayedFetch in the code, which extends DelayedOperation and relies on the following methods:

isCompleted: checks whether the operation has already been completed

tryComplete: checks whether the completion condition is met and, if so, completes the operation

onComplete: the logic that runs when the operation completes

onExpiration: the method to execute after the operation expires
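How these four methods fit together can be sketched as a small abstract class. Kafka's own DelayedOperation is written in Scala and has more to it; what follows is a simplified Java analogue, and the `DelayedOp`, `DelayedFetchSketch`, `expire`, and field names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified Java analogue of a delayed operation; not Kafka's Scala class.
final class DelayedOperationSketch {

    abstract static class DelayedOp {
        private final AtomicBoolean completed = new AtomicBoolean(false);

        /** True once the operation has been completed, by data or by timeout. */
        final boolean isCompleted() {
            return completed.get();
        }

        /** Runs onComplete() at most once; true only for the caller that completed it. */
        final boolean forceComplete() {
            if (completed.compareAndSet(false, true)) {
                onComplete();
                return true;
            }
            return false;
        }

        /** Checks the completion condition and calls forceComplete() if it holds. */
        abstract boolean tryComplete();

        /** Completion logic, for example sending the fetch response. */
        abstract void onComplete();

        /** Extra callback that runs only when completion was caused by the timeout. */
        abstract void onExpiration();

        /** What a timer (a timing wheel in Kafka's case) would invoke at the deadline. */
        final void expire() {
            if (forceComplete()) {
                onExpiration();
            }
        }
    }

    /** A fetch that completes once enough bytes are available. */
    static final class DelayedFetchSketch extends DelayedOp {
        private final int minBytes;
        int availableBytes;   // updated by the (hypothetical) write path
        int responsesSent;
        boolean expired;

        DelayedFetchSketch(int minBytes) {
            this.minBytes = minBytes;
        }

        @Override
        boolean tryComplete() {
            return availableBytes >= minBytes && forceComplete();
        }

        @Override
        void onComplete() {
            responsesSent++;
        }

        @Override
        void onExpiration() {
            expired = true;
        }
    }

    public static void main(String[] args) {
        DelayedFetchSketch fetch = new DelayedFetchSketch(100);
        System.out.println(fetch.tryComplete());   // not enough data yet
        fetch.availableBytes = 150;                // the write path reports new data
        System.out.println(fetch.tryComplete());   // condition met, response sent once
    }
}
```

The atomic flag is what lets the write path and the timer race safely: whichever gets there first sends the response, and the other call becomes a no-op.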

Whether an operation has expired is driven by the timing wheel. But you can't wait until it expires to check whether a message has arrived, right?

Here Kafka's mechanism is the same as RocketMQ's: when messages are written, the delayed requests are notified as well. I will not post the specific code; you can find it by digging into the ReplicaManager#appendRecords method.

Although the code is not posted, the picture still needs to be drawn.

Both RocketMQ and Kafka use long polling. Specifically, the consumer issues a request and waits for messages. If there are messages, the Broker returns them directly. If there are none, it defers processing the request. To keep messages timely, when new messages arrive in the corresponding queue or partition, the held request is notified and the messages are returned promptly.

In a word, consumers and the Broker cooperate. When a pull request's conditions are not met, the request is held, which avoids frequent pulling, and as soon as messages arrive the request is woken up and answered.
