Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How does Kafka Consumer understand

2025-01-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/02 Report--

In this issue, the editor will bring you about how to understand Kafka Consumer. The article is rich in content and analyzes and narrates it from a professional point of view. I hope you can get something after reading this article.

Kafka consumer concept

The application uses KafkaConsumer to subscribe to topics from Kafka and receive messages from those topics before saving them. The application first needs to create a KafkaConsumer object, subscribe to the topic and start accepting messages, validate the messages, and save the results. What should I do when the producer writes to the topic faster than the application validates the data after a period of time? If only a single consumer is used, the application will not be able to keep up with the speed of message generation, just as multiple producers write messages on the same topic, which requires multiple consumers to participate in consuming messages in the topic. split the message.

Kafka consumers belong to consumer groups. Consumers in a group subscribe to the same topic, and each consumer receives messages that are partitioned in part of the topic. The following is a schematic diagram of Kafka partition consumption

The topic T1 in the figure above has four partitions, partition 0, partition 1, partition 2, and partition 3. We create a consumer group 1, and there is only one consumer in the consumer group, which subscribes to topic T1 and receives all the messages in T1. Because one consumer handles messages sent by four producers to the partition, there is a lot of pressure and the need for help to share the task, so it evolves into the following figure

In this way, the spending power of consumers has been greatly improved, but in some environments, such as when users generate a lot of information, the information generated by producers is still too much for consumers, so continue to increase consumers.

As shown in the figure above, the messages generated by each partition can be consumed by consumers in each consumer group. If more consumers are added to the consumer group, the extra consumers will be idle, as shown in the following figure.

Adding consumers to the group is the main way to scale consumption power horizontally. All in all, we can expand horizontally to increase spending power by increasing the number of consumers in the consumer group. This is why it is recommended to use a large number of partitions when creating themes, so that you can increase consumers to improve performance when the consumption load is high. In addition, the number of consumers should not be more than the number of sub-regions, because the extra consumers are idle and do not help.

A very important feature of Kafka is that it only needs to be written once to support any number of applications to read the message. In other words, every application can read all the messages. In order for each application to read all messages, the application needs to have different consumer groups. For the above example, if we add a new consumer group G2 and this consumer group has two consumers, it will evolve to look like this

In this scenario, consumer group G1 and consumer group G2 both receive full messages on T1 topics, which logically belong to different applications.

To sum up, if the application needs to read all the messages, set up a consumer group for the application; if the application does not have enough spending power, consider adding consumers to this consumer group.

Consumer group and divisional rebalancing

What is the consumer group?

A consumer group (Consumer Group) is a scalable and fault-tolerant mechanism consisting of one or more consumer instances (Consumer Instance). Consumers in the consumer group share a consumer group ID, which is also called Group ID. Consumers in the group subscribe to and consume a topic together. Consumers in the same group can only consume messages in one section, and redundant consumers will be idle and useless.

We mentioned two ways of consumption above.

A consumer group consumes messages in a topic, which is also known as peer-to-peer consumption, and peer-to-peer consumption is also known as message queue.

Messages in a topic are consumed by multiple consumer groups, which is also called publish-subscribe model.

Consumer rebalancing

We can know this process from the consumer evolution diagram above: at first, a consumer subscribes to a topic and consumes the message of all its partitions, and then a consumer joins the group, and then more consumers join the group, and the new consumer instance apportion part of the message of the original consumer. This transfer of ownership of a partition from one consumer to another is called rebalancing, also known as Rebalance. As shown in the following figure

Rebalancing is very important, it brings high availability and scalability to consumer groups, and we can safely add or remove consumers, but under normal circumstances we don't want this to happen. During rebalancing, consumers cannot read messages, making the entire consumer group unavailable during rebalancing. In addition, when the partition is reassigned to another consumer, the message's current read state is lost, and it may need to flush the cache, slowing down the application before it resumes state.

Consumers maintain their membership of the consumer group and confirm the partition they own by sending a heartbeat to the organizer (Kafka Broker). For different consumer groups, their organizers and coordinators can be different. As long as the consumer sends a heartbeat on a regular basis, it will assume that the consumer is alive and process the messages in their partition. A heartbeat is sent when a consumer retrieves a record or submits a record it consumes.

If the Kafka stops sending heartbeats after a period of time, the Session expires and the organizer assumes that the Consumer is dead, triggering a rebalance. If the consumer goes down and stops sending messages, the organizer waits a few seconds to confirm that it is dead before rebalancing is triggered. During this time, dead consumers will not process any messages. When cleaning up the consumer, the consumer will notify the coordinator that it is leaving the group, and the organizer will trigger a rebalance to minimize the processing pause.

Rebalancing is a double-edged sword that brings high availability and scalability to consumer groups, but also has some obvious disadvantages (bug), which the community still can't modify.

The process of rebalancing has a great impact on the consumer group. Because every rebalancing process causes everything to come to a standstill, refer to the garbage collection mechanism in JVM, that is, Stop The World, STW, (quoted from p76's description of the Java collector in "understanding the Serial virtual machine"):

More importantly, when it does garbage collection, it must pause all other worker threads. Until it's collected. The name Stop The World sounds cool, but in fact, the work is initiated and done automatically by the virtual machine in the background, stopping all the user's working threads when the user is invisible, which is unacceptable for many applications.

In other words, during the rebalancing period, the consumer instances in the consumer group will stop spending and wait for the rebalancing to be completed. And the process of rebalancing is slow.

Create consumers

The above theory says a lot. Let's explain how consumers spend through the code.

Before reading the message, you need to create a KafkaConsumer object. Creating a KafkaConsumer object is very similar to creating a KafkaProducer object-put the properties that need to be passed to the consumer in the properties object. Later, we will focus on some configuration of Kafka. Here we simply create it and use three attributes, namely bootstrap.server,key.deserializer,value.deserializer.

We have used these three attributes many times, if you are not very clear, you can refer to take you up posture to get to know Kafka Producer

Another attribute is group.id, which is not required and specifies which consumer group KafkaConsumer belongs to. It is also possible to create consumers who do not belong to any group.

Properties properties = new Properties (); properties.put ("bootstrap.server", "192.168.1.9 bootstrap.server"); properties.put ("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put ("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaConsumer consumer = new KafkaConsumer (properties)

Topic subscription

Once the consumer is created, the next step is to subscribe to the topic. The subscribe () method takes a list of topics as a parameter, which is easy to use.

Consumer.subscribe (Collections.singletonList ("customerTopic"))

For simplicity, we subscribe to only one topic customerTopic, the parameter is passed in a regular expression, and the regular expression can match multiple topics. If someone creates a new topic and the name of the topic matches the regular expression, then a rebalance is immediately triggered and the consumer can read the new topic.

To subscribe to all test related topics, you can do this

Consumer.subscribe ("test.*")

Polling

We know that Kafka supports the subscription / publish model, and producers send data to Kafka Broker, so how do consumers know that producers send data? In fact, consumers do not know the data generated by producers. KafkaConsumer uses polling to retrieve data in Kafka Broker periodically. If there is any data, it will be used for consumption. If not, it will continue to poll and wait. The following is the specific implementation of polling waiting.

Try {while (true) {ConsumerRecords records = consumer.poll (Duration.ofSeconds); for (ConsumerRecord record: records) {int updateCount = 1; if (map.containsKey (record.value () {updateCount = (int) map.get (record.value () + 1);} map.put (record.value (), updateCount);}} finally {consumer.close () }

This is an infinite cycle. The consumer is actually a long-running application that requests data from Kafka by polling.

The third line of code is very important that the Kafka must periodically cycle through the request data, otherwise the Consumer will be assumed to be dead, rebalancing will be triggered, and its partition will be handed over to other consumers in the group. The poll () method is passed a supermarket time, represented by the java.time.Duration class, and if this parameter is set to 0, the poll () method will return immediately, otherwise it will wait for broker to return data within the specified number of milliseconds.

The poll () method returns a list of records. Each record contains information about the topic to which the record belongs, the information about the partition in which the record is located, the offset recorded in the partition, and the key-value pair of the record. We typically iterate through this list, processing each record one by one.

Use the close () method to close the consumer before exiting the application. The network connection and socket are also shut down and a rebalance is triggered immediately, rather than waiting for the group coordinator to discover that it is no longer sending a heartbeat and assume that it is dead.

Thread safety

In the same group, we cannot have a thread run multiple consumers, nor can multiple threads safely share a consumer. According to the rule, a consumer uses one thread, and if multiple consumers in a consumer group want to run, then each consumer must be allowed to run in its own thread, and multiple consumers can be started to process it using ExecutorService in Java.

Consumer configuration

So far, we have learned how to use the consumer API, but only a few of the most basic properties are introduced, and the Kafka document lists all the consumer-related configuration instructions. Most parameters have reasonable default values and generally do not need to be modified. Let's take a look at these parameters.

Fetch.min.bytes

This property specifies the minimum number of bytes that the consumer gets the record from the server. When broker receives a data request from a consumer, if the amount of data available is less than the size specified by fetch.min.bytes, it will wait until enough data is available before returning it to the consumer. This reduces the workload on consumers and broker, because they don't have to process messages back and forth when topics are not used very frequently. If you don't have a lot of data available, but consumers have high CPU usage, you need to set the value of this property to be higher than the default value. If there are a large number of consumers, increasing the value of this attribute can reduce the workload of broker.

Fetch.max.wait.ms

We tell Kafka through the fetch.min.bytes above that we won't return it to the consumer until there is enough data. Fetch.max.wait.ms, on the other hand, is used to specify the wait time for broker, which defaults to 500ms. If there is not enough data to flow into kafka, the minimum amount of data obtained by consumers will not be met, resulting in a delay of 500ms. If you want to reduce the potential delay, you can set the parameter value to be smaller. If fetch.max.wait.ms is set to a delay of 100ms and the value of fetch.min.bytes is set to 1MB, then Kafka either returns the data of 1MB or returns all available data after 100 ms after receiving the consumer request. It depends on which condition is met first.

Max.partition.fetch.bytes

This property specifies the maximum number of bytes that the server returns to the consumer from each partition. Its default value is 1MB, that is, the record returned by the KafkaConsumer.poll () method from each partition is no more than the bytes specified by max.partition.fetch.bytes. If a topic has 20 partitions and 5 consumers, each consumer needs at least 4 MB of available memory to receive records. When allocating memory to consumers, you can allocate them more, because if a consumer in the group crashes, the remaining consumers need to deal with more partitions. The value of max.partition.fetch.bytes must be greater than the maximum number of bytes of messages that broker can receive (configured through the max.message.size property), otherwise the consumer may not be able to read these messages, causing the consumer to hang and retry all the time. Another consideration when setting this property is the time it takes the consumer to process the data. Consumers need to call the poll () method frequently to avoid session expiration and partition rebalancing. If too much data is returned by a single call to poll (), consumers need more time to process, and may not be able to conduct the next poll in time to avoid session expiration. If this happens, you can reduce the max.partition.fetch.bytes value or extend the session expiration time.

Session.timeout.ms

This property specifies the time that the consumer can disconnect from the server before it is considered dead, which defaults to 3s. If the consumer does not send a heartbeat to the group coordinator within the time specified by session.timeout.ms, it will be considered dead and the coordinator will trigger rebalancing. Assign its partition to other consumers in the consumer group, which is closely related to heartbeat.interval.ms. Heartbeat.interval.ms specifies the frequency at which the poll () method sends the heartbeat to the group coordinator, while session.timeout.ms specifies how long the consumer can not send the heartbeat. Therefore, these two properties generally need to be modified at the same time, and heartbeat.interval.ms must be smaller than session.timeout.ms, which is usually 1/3 of session.timeout.ms. If session.timeout.ms is 3s, then heartbeat.interval.ms should be 1s. Setting the session.timeout.ms value lower than the default value can detect and recover angry nodes more quickly, but long polling or garbage collection can lead to unexpected rebalancing. Setting the value of this property higher can reduce unexpected rebalancing, but it takes longer to detect node crashes.

Auto.offset.reset

This property specifies what to do if the consumer reads a partition without an offset or if the offset is invalid. Its default value is latest, which means that if the offset is invalid, the consumer will start reading the data from the latest record. The other value is earliest, which means that if the offset is invalid, the consumer will start reading the record of the partition from the starting position.

Enable.auto.commit

We will introduce several different ways to submit offsets later. This attribute specifies whether the consumer automatically commits the offset, and the default value is true. To avoid duplicate data and data loss, you can set it to false and control when the offset is submitted. If you set it to true, you can also control the frequency of submissions through the auto.commit.interval.ms property

Partition.assignment.strategy

We know that partitions are assigned to consumers in the group. PartitionAssignor decides which partitions should be assigned to which consumers based on a given consumer and topic. Kafka has two default allocation policies, Range and RoundRobin.

Client.id

This attribute can be any string that broker uses to identify messages sent from the client and is commonly used in logs, metrics, and quotas

Max.poll.records

This property is used to control the number of records that can be returned by a single call to the call () method, and can help you control the amount of data that needs to be processed in the poll.

Receive.buffer.bytes and send.buffer.bytes

The TCP buffer that socket uses to read and write data can also be sized. If they are set to-1, the operating system defaults are used. If the producer or consumer is in a different data center from broker, you can increase these values appropriately, because networks across data centers generally have higher latency and lower bandwidth.

The concept of submission and offset

Special offset

As we mentioned above, each time a consumer calls the poll () method for a scheduled poll, it returns records written to Kafka by the producer but not yet consumed by the consumer, so we can track which records are read by which consumers in the group. Consumers can use Kafka to track the location (offset) of messages in the partition

The consumer will send a message to a special topic called _ consumer_offset, which will save the partition offset in each message sent. The main function of this topic is to record the offset after the consumer triggers the rebalance. Every time the consumer sends a message to this topic, normally the rebalance is not triggered. This topic does not work. When the rebalance is triggered, the consumer stops working. Each consumer may be assigned a corresponding partition, and this topic is set so that consumers can continue to process messages.

If the submitted offset is less than the last offset processed by the client, the message between the two offsets will be reprocessed

If the submitted offset is greater than the offset at the last consumption, the message between the two offsets will be lost

Since _ consumer_offset is so important, how does it submit? Let's talk about it next.

Submission mode

KafkaConsumer API provides several ways to submit offsets

Automatic submission

The easiest way is to have the consumer automatically submit the offset. If enable.auto.commit is set to true, the consumer will automatically submit the maximum offset polled from the poll () method every 5 seconds. The commit interval is controlled by auto.commit.interval.ms, and the default is 5s. Like everything else in the consumer, automatic submission is done in a poll. In each poll, the consumer checks to see if the offset is committed, and if so, the offset returned from the previous poll is submitted.

Submit the current offset

Setting auto.commit.offset to false allows the application to decide when to commit the offset. Use commitSync () to submit the offset. This API commits the latest offset returned by the poll () method, returns as soon as the submission succeeds, and throws an exception if the submission fails.

CommitSync () will submit the latest offset returned by poll (). If you have finished processing all the records, make sure that commitSync () is called, otherwise there is still a risk of losing messages. If it occurs in equalization, all messages from the most recent batch of messages to those that occur in equilibrium will be repeated.

Asynchronous submission

The biggest difference between asynchronous commit commitAsync () and synchronous commit commitSync () is that asynchronous commit is not retried, synchronous commit is consistent with retry.

Combined synchronous and asynchronous submission

In general, for occasional commit failures, it will not be a big problem not to retry, because if the commit failure is caused by a temporary problem, then subsequent submissions will always be successful. But if you commit for the last time before closing the consumer or rebalancing, make sure the commit is successful.

Therefore, it is common to use a combination of commitAsync and commitSync to commit offsets before the consumer shuts down.

Submit a specific offset

The consumer API allows you to pass in the map of the partition and offset you want to commit when calling the commitSync () and commitAsync () methods, that is, commit a specific offset.

This is how the Kafka Consumer shared by the editor is understood. If you happen to have similar doubts, you might as well refer to the above analysis to understand. If you want to know more about it, you are welcome to follow the industry information channel.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report