2025-01-16 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
How should you understand Kafka's consumption and heartbeat mechanisms? Many people who are new to Kafka are unsure where to start, so this article summarizes the common problems, their causes, and their solutions.
Kafka is a distributed, partitioned, replicated, multi-subscriber publish-subscribe messaging system (a distributed MQ system) that is commonly used for search logs, monitoring logs, access logs, and so on. This article walks through how Kafka consumption and the consumer heartbeat work.
1. Kafka consumption
First, let's look at consumption. Kafka provides a very simple consumer API. You only need to configure the Kafka Broker Server address and then instantiate the KafkaConsumer class to read the data in a Topic. A simple Kafka consumer example looks like this:
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class JConsumerSubscribe extends Thread {

        public static void main(String[] args) {
            JConsumerSubscribe jconsumer = new JConsumerSubscribe();
            jconsumer.start();
        }

        /** Initialize Kafka cluster information. */
        private Properties configure() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092"); // Kafka cluster address
            props.put("group.id", "ke"); // consumer group
            props.put("enable.auto.commit", "true"); // enable auto-commit
            props.put("auto.commit.interval.ms", "1000"); // auto-commit interval
            // deserializer for the message key
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // deserializer for the message value
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            return props;
        }

        /** Implement a single-threaded consumer. */
        @Override
        public void run() {
            // create a consumer instance
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configure());
            try {
                // subscribe to the set of topics to consume
                consumer.subscribe(Arrays.asList("test_kafka_topic"));
                // flag that keeps the real-time consumption loop running
                boolean flag = true;
                while (flag) {
                    // fetch message data from the topic
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        // print each message record
                        System.out.printf("offset = %d, key = %s, value = %s%n",
                                record.offset(), record.key(), record.value());
                    }
                }
            } finally {
                // close the consumer, including when an exception occurs
                consumer.close();
            }
        }
    }
With the code above, we can read the data in a Topic very easily. But what actually happens when we call the poll method to pull data? To find out, we can look at the implementation details in the source code. The core code is as follows:
org.apache.kafka.clients.consumer.KafkaConsumer

    private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) {
        acquireAndEnsureOpen();
        try {
            if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative");

            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
            }

            // poll for new data until the timeout expires
            long elapsedTime = 0L;
            do {
                client.maybeTriggerWakeup();

                final long metadataEnd;
                if (includeMetadataInTimeout) {
                    final long metadataStart = time.milliseconds();
                    if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {
                        return ConsumerRecords.empty();
                    }
                    metadataEnd = time.milliseconds();
                    elapsedTime += metadataEnd - metadataStart;
                } else {
                    while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) {
                        log.warn("Still waiting for metadata");
                    }
                    metadataEnd = time.milliseconds();
                }

                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records =
                        pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));

                if (!records.isEmpty()) {
                    // before returning the fetched records, we can send off the next round of fetches
                    // and avoid block waiting for their responses to enable pipelining while the user
                    // is handling the fetched records.
                    //
                    // NOTE: since the consumed position has already been updated, we must not allow
                    // wakeups or any other errors to be triggered prior to returning the fetched records.
                    if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                        client.pollNoWakeup();
                    }

                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }
                final long fetchEnd = time.milliseconds();
                elapsedTime += fetchEnd - metadataEnd;

            } while (elapsedTime < timeoutMs);

            return ConsumerRecords.empty();
        } finally {
            release();
        }
    }

The code above calls a method named pollForFetches, which is implemented as follows:

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {
        final long startMs = time.milliseconds();
        long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);

        // if data is available already, return it immediately
        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty()) {
            return records;
        }

        // send any new fetches (won't resend pending fetches)
        fetcher.sendFetches();

        // We do not want to be stuck blocking in poll if we are missing some positions
        // since the offset lookup may be backing off after a failure

        // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
        // updateAssignmentMetadataIfNeeded before this method.
        if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
            pollTimeout = retryBackoffMs;
        }

        client.poll(pollTimeout, startMs, () -> {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        });

        // after the long poll, we should check whether the group needs to rebalance
        // prior to returning data so that the group can stabilize faster
        if (coordinator.rejoinNeededOrPending()) {
            return Collections.emptyMap();
        }

        return fetcher.fetchedRecords();
    }
As the pollForFetches code above shows, each time the consumer client pulls data through poll, it first calls the fetchedRecords function in fetcher. If no data is available yet, it issues a new sendFetches request. When consuming, each batch handed back by poll is capped at a maximum number of records. The default is 500, controlled by max.poll.records. You can set this property on the client to adjust how many records each poll returns.
Tip: max.poll.records caps the total number of records returned by one poll call, regardless of how many partitions are involved. So the number of records that a single poll returns from all partitions of the Topic never exceeds the value of max.poll.records.
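To make the "total across partitions" point concrete, here is a minimal sketch in plain Java. FetchBufferSketch is a hypothetical stand-in for the client's buffer of already-fetched records, not Kafka's Fetcher class, and the way it walks partitions in insertion order is an assumption made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a fetch buffer; it only illustrates the max.poll.records cap.
final class FetchBufferSketch {
    // Records already fetched over the network, grouped by partition number.
    private final Map<Integer, Deque<String>> buffered = new LinkedHashMap<>();

    // Add records that a (simulated) network fetch returned for one partition.
    void buffer(int partition, List<String> records) {
        buffered.computeIfAbsent(partition, p -> new ArrayDeque<>()).addAll(records);
    }

    // Drain at most maxPollRecords records in total, walking partitions in insertion order.
    List<String> fetchedRecords(int maxPollRecords) {
        List<String> out = new ArrayList<>();
        for (Deque<String> queue : buffered.values()) {
            while (out.size() < maxPollRecords && !queue.isEmpty()) {
                out.add(queue.poll());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        FetchBufferSketch buf = new FetchBufferSketch();
        buf.buffer(0, Arrays.asList("a0", "a1", "a2"));
        buf.buffer(1, Arrays.asList("b0", "b1", "b2"));
        // With a cap of 4, the first call returns 4 records spanning both partitions.
        System.out.println(buf.fetchedRecords(4));
        // The second call returns the 2 records that were left over.
        System.out.println(buf.fetchedRecords(4));
    }
}
```

The cap applies to the combined result, so a partition with a large backlog can use up most of one poll's budget.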
In the Fetcher class, the sendFetches method also limits how much data is pulled per partition, through max.partition.fetch.bytes, which defaults to 1 MB. Consider this scenario: a single network fetch that stays within the max.partition.fetch.bytes limit brings back 10000 records. With the default of 500 records per poll, we need to call poll 20 times to drain everything that this one network request returned.
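The arithmetic behind the 20 polls is a ceiling division. A minimal sketch, with PollCountSketch and pollsNeeded as hypothetical names chosen for this example:

```java
// Hypothetical helper: how many poll() calls it takes to hand out buffered records.
final class PollCountSketch {
    // Ceiling of buffered / maxPollRecords, computed with integer arithmetic.
    static int pollsNeeded(int buffered, int maxPollRecords) {
        if (buffered < 0 || maxPollRecords <= 0) {
            throw new IllegalArgumentException("buffered must be >= 0 and maxPollRecords > 0");
        }
        return (buffered + maxPollRecords - 1) / maxPollRecords;
    }

    public static void main(String[] args) {
        // 10000 buffered records, 500 per poll
        System.out.println(pollsNeeded(10_000, 500)); // prints 20
    }
}
```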
At this point you may ask: can't we just raise max.poll.records to 10000? You can, but another setting has to be tuned along with it: the timeout passed to each poll call, Duration.ofMillis(100) in the example. Set the timeout according to the actual size of your records. If you raise the maximum to 10000 while each record is large and the timeout is still 100 ms, a poll may return fewer than 10000 records.
One more thing to watch is the session timeout. session.timeout.ms defaults to 10 s, group.min.session.timeout.ms defaults to 6 s, and group.max.session.timeout.ms defaults to 30 min (these are the defaults for the versions discussed here; newer releases may differ). If your consumption business logic does not finish within 10 seconds, the consumer client is disconnected from Kafka Broker Server and the offsets it produced are not committed to Kafka, because Kafka Broker Server considers the consumer gone. Even if you enable auto-commit or set auto.offset.reset, the same messages are consumed again. This is caused by the session.timeout.ms timeout. Strictly speaking, in clients that run the background heartbeat thread described in section 2, heartbeats continue while your code is processing, and the limit on the time between poll calls is max.poll.interval.ms. The visible effect, a rebalance followed by repeated consumption, is the same.
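As a rough illustration of how these timeouts constrain each other, the sketch below builds the timeout part of a consumer configuration and rejects values outside the broker bounds quoted above. SessionTimeoutCheckSketch and its constants are hypothetical. heartbeat.interval.ms is a real consumer setting that this article does not otherwise discuss, and the rule that it must be lower than session.timeout.ms is enforced here only as a local check.

```java
import java.util.Properties;

// Hypothetical validator for the timeout settings discussed above.
final class SessionTimeoutCheckSketch {
    // Broker-side bounds as quoted in the text (group.min/max.session.timeout.ms).
    static final int GROUP_MIN_SESSION_TIMEOUT_MS = 6_000;
    static final int GROUP_MAX_SESSION_TIMEOUT_MS = 30 * 60 * 1_000;

    // Returns consumer properties, or throws if the values cannot work together.
    static Properties timeouts(int sessionTimeoutMs, int heartbeatIntervalMs) {
        if (sessionTimeoutMs < GROUP_MIN_SESSION_TIMEOUT_MS
                || sessionTimeoutMs > GROUP_MAX_SESSION_TIMEOUT_MS) {
            throw new IllegalArgumentException(
                    "session.timeout.ms outside the broker bounds: " + sessionTimeoutMs);
        }
        if (heartbeatIntervalMs <= 0 || heartbeatIntervalMs >= sessionTimeoutMs) {
            throw new IllegalArgumentException(
                    "heartbeat.interval.ms must be positive and lower than session.timeout.ms");
        }
        Properties props = new Properties();
        props.put("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        props.put("heartbeat.interval.ms", Integer.toString(heartbeatIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // A 10 s session timeout with a 3 s heartbeat interval passes both checks.
        System.out.println(timeouts(10_000, 3_000));
    }
}
```

The returned Properties can be merged into the consumer configuration before the KafkaConsumer is created.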
2. Heartbeat mechanism
The previous section ended by noting that a session timeout leads to repeated consumption. Why does the timeout happen? You may have wondered: my consumer thread is clearly running and has not exited, so why is it not consuming Kafka messages, and why can't my ConsumerGroupID be found in the consumer group? This may be caused by a timeout. Kafka controls the timeout through the heartbeat mechanism, which is transparent to the consumer client: it runs in an asynchronous thread that starts working as soon as we start a consumer instance.
In org.apache.kafka.clients.consumer.internals.AbstractCoordinator, a HeartbeatThread is started to send heartbeats periodically and to check the status of the consumer. Each consumer has its own org.apache.kafka.clients.consumer.internals.ConsumerCoordinator, and each ConsumerCoordinator starts a HeartbeatThread to maintain the heartbeat. The heartbeat state is stored in org.apache.kafka.clients.consumer.internals.Heartbeat, which declares the following fields:
    private final int sessionTimeoutMs;
    private final int heartbeatIntervalMs;
    private final int maxPollIntervalMs;
    private final long retryBackoffMs;
    private volatile long lastHeartbeatSend;
    private long lastHeartbeatReceive;
    private long lastSessionReset;
    private long lastPoll;
    private boolean heartbeatFailed;
The implementation code of the run method in the heartbeat thread is as follows:
    public void run() {
        try {
            log.debug("Heartbeat thread started");
            while (true) {
                synchronized (AbstractCoordinator.this) {
                    if (closed)
                        return;

                    if (!enabled) {
                        AbstractCoordinator.this.wait();
                        continue;
                    }

                    if (state != MemberState.STABLE) {
                        // the group is not stable (perhaps because we left the group or because the coordinator
                        // kicked us out), so disable heartbeats and wait for the main thread to rejoin.
                        disable();
                        continue;
                    }

                    client.pollNoWakeup();
                    long now = time.milliseconds();

                    if (coordinatorUnknown()) {
                        if (findCoordinatorFuture != null || lookupCoordinator().failed())
                            // the immediate future check ensures that we backoff properly in the case that no
                            // brokers are available to connect to.
                            AbstractCoordinator.this.wait(retryBackoffMs);
                    } else if (heartbeat.sessionTimeoutExpired(now)) {
                        // the session timeout has expired without seeing a successful heartbeat, so we should
                        // probably make sure the coordinator is still healthy.
                        markCoordinatorUnknown();
                    } else if (heartbeat.pollTimeoutExpired(now)) {
                        // the poll timeout has expired, which means that the foreground thread has stalled
                        // in between calls to poll(), so we explicitly leave the group.
                        maybeLeaveGroup();
                    } else if (!heartbeat.shouldHeartbeat(now)) {
                        // poll again after waiting for the retry backoff in case the heartbeat failed or the
                        // coordinator disconnected
                        AbstractCoordinator.this.wait(retryBackoffMs);
                    } else {
                        heartbeat.sentHeartbeat(now);

                        sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
                            @Override
                            public void onSuccess(Void value) {
                                synchronized (AbstractCoordinator.this) {
                                    heartbeat.receiveHeartbeat(time.milliseconds());
                                }
                            }

                            @Override
                            public void onFailure(RuntimeException e) {
                                synchronized (AbstractCoordinator.this) {
                                    if (e instanceof RebalanceInProgressException) {
                                        // it is valid to continue heartbeating while the group is rebalancing. This
                                        // ensures that the coordinator keeps the member in the group for as long
                                        // as the duration of the rebalance timeout. If we stop sending heartbeats,
                                        // however, then the session timeout may expire before we can rejoin.
                                        heartbeat.receiveHeartbeat(time.milliseconds());
                                    } else {
                                        heartbeat.failHeartbeat();

                                        // wake up the thread if it's sleeping to reschedule the heartbeat
                                        AbstractCoordinator.this.notify();
                                    }
                                }
                            }
                        });
                    }
                }
            }
        } catch (AuthenticationException e) {
            log.error("An authentication error occurred in the heartbeat thread", e);
            this.failed.set(e);
        } catch (GroupAuthorizationException e) {
            log.error("A group authorization error occurred in the heartbeat thread", e);
            this.failed.set(e);
        } catch (InterruptedException | InterruptException e) {
            Thread.interrupted();
            log.error("Unexpected interrupt received in heartbeat thread", e);
            this.failed.set(new RuntimeException(e));
        } catch (Throwable e) {
            log.error("Heartbeat thread failed due to unexpected error", e);
            if (e instanceof RuntimeException)
                this.failed.set((RuntimeException) e);
            else
                this.failed.set(new RuntimeException(e));
        } finally {
            log.debug("Heartbeat thread has closed");
        }
    }
This contains the two most important timeout functions in the heartbeat thread, which are sessionTimeoutExpired and pollTimeoutExpired.
    public boolean sessionTimeoutExpired(long now) {
        return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeoutMs;
    }

    public boolean pollTimeoutExpired(long now) {
        return now - lastPoll > maxPollIntervalMs;
    }
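To see how the two checks behave over time, here is a self-contained sketch that copies only the two predicates and the fields they read. HeartbeatTimersSketch is a hypothetical class, not Kafka's Heartbeat. The 10 s session timeout comes from the text, while the 300 s poll interval used in main is an assumed value for illustration.

```java
// Hypothetical, trimmed-down model of the timers behind the two timeout checks.
final class HeartbeatTimersSketch {
    private final long sessionTimeoutMs;
    private final long maxPollIntervalMs;
    private long lastSessionReset;
    private long lastHeartbeatReceive;
    private long lastPoll;

    HeartbeatTimersSketch(long sessionTimeoutMs, long maxPollIntervalMs, long now) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastSessionReset = now;
        this.lastHeartbeatReceive = now;
        this.lastPoll = now;
    }

    // Record a successful heartbeat response at time `now`.
    void receiveHeartbeat(long now) { lastHeartbeatReceive = now; }

    // Record that the application called poll() at time `now`.
    void poll(long now) { lastPoll = now; }

    // True once no heartbeat response has been seen for longer than the session timeout.
    boolean sessionTimeoutExpired(long now) {
        return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeoutMs;
    }

    // True once the application has gone longer than the poll interval without calling poll().
    boolean pollTimeoutExpired(long now) {
        return now - lastPoll > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        HeartbeatTimersSketch h = new HeartbeatTimersSketch(10_000, 300_000, 0);
        h.receiveHeartbeat(5_000);
        System.out.println(h.sessionTimeoutExpired(15_000)); // false: exactly 10 s since last response
        System.out.println(h.sessionTimeoutExpired(15_001)); // true: more than 10 s
        System.out.println(h.pollTimeoutExpired(300_001));   // true: no poll() for more than 300 s
    }
}
```

Both comparisons are strict, so a gap exactly equal to the configured timeout does not yet count as expired.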
2.1 sessionTimeoutExpired
If the session timeout expires, the current coordinator is marked as unknown (treated as disconnected). At that point the consumer is removed from the group and the mapping between partitions and consumers is reassigned. In Kafka Broker Server, a ConsumerGroup has five states (six if Unknown is counted), defined in org.apache.kafka.common.ConsumerGroupState: PreparingRebalance, CompletingRebalance, Stable, Dead, and Empty.
[Figure: ConsumerGroup states]
2.2 pollTimeoutExpired
If the poll timeout is triggered, the consumer client leaves the ConsumerGroup. The next time it calls poll, it rejoins the ConsumerGroup and triggers a group rebalance. The KafkaConsumer client does not call poll repeatedly on our behalf, so our consumption logic has to keep calling the poll method itself.
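Since the application has to get back to poll in time, a rough budget check can help when choosing max.poll.records. The sketch below is a back-of-the-envelope estimate, not part of the Kafka client: PollBudgetSketch is a hypothetical name, and the per-record processing time is a number you would have to measure for your own workload.

```java
// Hypothetical estimate: can one full batch be processed before the poll interval runs out?
final class PollBudgetSketch {
    // Worst-case time to process a full batch, assuming records are handled one by one.
    static long worstCaseBatchMs(int maxPollRecords, long perRecordMs) {
        return Math.multiplyExact((long) maxPollRecords, perRecordMs);
    }

    // The timeout check uses a strict ">", so a batch that takes exactly the interval still fits.
    static boolean fitsPollInterval(int maxPollRecords, long perRecordMs, long maxPollIntervalMs) {
        return worstCaseBatchMs(maxPollRecords, perRecordMs) <= maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000; // assumed value for illustration
        System.out.println(fitsPollInterval(500, 500, maxPollIntervalMs)); // 250 s of work
        System.out.println(fitsPollInterval(500, 700, maxPollIntervalMs)); // 350 s of work
    }
}
```

If the batch does not fit, the usual options are a smaller max.poll.records, a longer max.poll.interval.ms, or faster per-record processing.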
3. Partition and consumption thread
On the relationship between partitions and consumer threads: in theory, the number of consumer threads should be less than or equal to the number of partitions. The traditional view is that one consumer thread corresponds to one partition, and that thread utilization is highest when the number of consumer threads equals the number of partitions. If you use the KafkaConsumer client instance directly, that view holds. However, if CPU is plentiful, you can use more threads than partitions to increase consumption throughput. This requires wrapping the KafkaConsumer client with your own logic: precompute the consumption strategy, then use the spare CPU to run extra threads that each handle a shard of the consumption work.
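One possible shape for this kind of task sharding is sketched below in plain Java. It is only an illustration under stated assumptions: ShardedProcessingSketch is a hypothetical class, records are represented by their keys alone, sharding by key hash is one strategy among many, and the per-record work is a placeholder. In a real consumer you would wait for every shard to finish before committing offsets for the batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: split one polled batch across more workers than partitions.
final class ShardedProcessingSketch {
    // The same key always maps to the same shard, which keeps per-key ordering.
    static int shardFor(String key, int shards) {
        return Math.floorMod(key.hashCode(), shards);
    }

    // Split a batch (represented by record keys) into per-shard lists, preserving order.
    static List<List<String>> split(List<String> keys, int shards) {
        List<List<String>> out = new ArrayList<>();
        for (int i = 0; i < shards; i++) {
            out.add(new ArrayList<>());
        }
        for (String key : keys) {
            out.get(shardFor(key, shards)).add(key);
        }
        return out;
    }

    // Process each shard on its own worker and return the total once all workers finish.
    static int processAll(List<String> keys, int shards) {
        ExecutorService pool = Executors.newFixedThreadPool(shards);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (List<String> shard : split(keys, shards)) {
                Callable<Integer> task = () -> shard.size(); // placeholder for real work
                futures.add(pool.submit(task));
            }
            int processed = 0;
            for (Future<Integer> future : futures) {
                processed += future.get(); // blocks until that shard is done
            }
            return processed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for shards", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a shard failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("user-1", "user-2", "user-1", "user-3", "user-2");
        System.out.println(processAll(batch, 3)); // all 5 records are processed
    }
}
```

Sharding by key keeps records with the same key in order, but ordering across the whole partition is no longer guaranteed.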
That covers how Kafka consumption and the consumer heartbeat work. Thank you for reading!