An Example Analysis of Kafka Group Consumption

2025-01-16 Update From: SLTechnology News&Howtos shulou

Shulou(Shulou.com)06/01 Report--

This article explains Kafka group consumption in detail, with examples. It is shared here as a reference for readers who want a working understanding of the topic.

1 Kafka consumption modes

For consuming messages from Kafka, the Kafka client provides two modes: partition consumption and group consumption.

Partition consumption corresponds to Spark Streaming's DirectKafkaInputDStream.

Group consumption corresponds to Spark Streaming's KafkaInputDStream.

Relationship between the number of consumers and the number of partitions:

1), a single consumer can consume data from one partition up to all partitions

2), group consumption. All consumers in the same group together consume one complete copy of the data. In this case, a partition can be consumed by only one consumer in the group, while a consumer can consume multiple partitions.

3), in the same consumer group, when the number of consumers is greater than the number of partitions, some consumers are idle: idle consumers = number of consumers - number of partitions

2 Rebalancing strategies for group consumption

When a consumer joins or leaves a group, a partition rebalance is triggered. The partition.assignment.strategy setting determines how partitions are allocated to consumers. There are two allocation strategies:

1, org.apache.kafka.clients.consumer.RangeAssignor

This strategy is used by default. It works one topic at a time: the partitions of each subscribed topic are divided among the consumers independently of the other topics. The algorithm for a consumer rebalance is as follows:

1), sort all the partitions of the target topic and store them in TP

2), sort all consumers in the consumer group by name in lexicographic order, store them in CG, and denote the i-th consumer (counting from 0) as Ci

3), N = size(TP) / size(CG) (integer division)

4), R = size(TP) % size(CG)

5), the starting position of the partitions obtained by Ci = N*i + min(i, R)

6), the total number of partitions obtained by Ci = N + (if (i + 1 > R) 0 else 1)
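
The six steps above can be sketched in plain Java. This is only an illustration of the arithmetic for a single topic, not Kafka's own RangeAssignor code; the class name RangeAssignSketch and its method are hypothetical, and partitions and consumers are represented by their sorted indices.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors steps 3) to 6) for ONE topic.
final class RangeAssignSketch {

    // Element i of the result holds the partition indices owned by Ci.
    // Assumes partitions and consumers are already sorted (steps 1 and 2).
    static List<List<Integer>> assign(int partitionCount, int consumerCount) {
        int n = partitionCount / consumerCount; // N = size(TP) / size(CG)
        int r = partitionCount % consumerCount; // R = size(TP) % size(CG)
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            int start = n * i + Math.min(i, r);  // step 5
            int count = n + (i + 1 > r ? 0 : 1); // step 6
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                owned.add(p);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(3, 2)); // three partitions, two consumers
        System.out.println(assign(2, 3)); // more consumers than partitions
    }
}
```

With three partitions and two consumers, the first consumer receives two partitions and the second receives one. With two partitions and three consumers, the last consumer receives an empty list, which is the idle-consumer case described in section 1.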

2, org.apache.kafka.clients.consumer.RoundRobinAssignor

This strategy allocates across all partitions of all topics consumed by the consumers. A rebalance is triggered when a new consumer joins or a consumer exits. This approach has two requirements:

A), every consumer is instantiated with the same number of streams for each topic

B), every consumer instance must subscribe to the same set of topics

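Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1)); // one stream for this topic
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);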

Here the value mapped to topic is the number of streams. In the Kafka source code, the consume method of kafka.consumer.ZookeeperConsumerConnector builds that number of KafkaStream instances based on this parameter.

The specific allocation steps for this strategy:

1), sort all partitions of all topics by the hash of the topic+partition string

2), sort the consumers in lexicographic order

3), assign the partitions to the consumers in turn, round-robin
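
A minimal sketch of these three steps in plain Java follows. It is not Kafka's implementation: the class RoundRobinAssignSketch is hypothetical, partitions are plain strings such as "t0p0", and, as a simplifying assumption, partitions are sorted by name rather than by hash so that the output is deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper illustrating round-robin assignment.
final class RoundRobinAssignSketch {

    static Map<String, List<String>> assign(List<String> partitions, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        // Step 1 (assumption): name order stands in for the hash order.
        // Plain string order is only adequate for single-digit partition ids.
        List<String> sortedPartitions = new ArrayList<>(partitions);
        Collections.sort(sortedPartitions);
        // Step 2: consumers in lexicographic order.
        List<String> sortedConsumers = new ArrayList<>(consumers);
        Collections.sort(sortedConsumers);

        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String consumer : sortedConsumers) {
            result.put(consumer, new ArrayList<>());
        }
        // Step 3: hand out partitions to the consumers in turn.
        for (int i = 0; i < sortedPartitions.size(); i++) {
            String owner = sortedConsumers.get(i % sortedConsumers.size());
            result.get(owner).add(sortedPartitions.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("t0p0", "t0p1", "t0p2", "t1p0", "t1p1", "t1p2");
        System.out.println(assign(partitions, Arrays.asList("c0", "c1")));
    }
}
```

Because the partitions of all topics go into one sorted list, the load is spread across topics, whereas the range strategy repeats its per-topic split for every topic.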

3, an example for comparison

For example, there are two consumers (c0, c1) and two topics (t0, t1), and each topic has three partitions, p0 to p2.

Using RangeAssignor, the result is:

* C0: [t0p0, t0p1, t1p0, t1p1]

* C1: [t0p2, t1p2]

Using RoundRobinAssignor, the results are as follows:

* C0: [t0p0, t0p2, t1p1]

* C1: [t0p1, t1p0, t1p2]

4 Liveness detection of group members

A useful feature of group consumption is that failed consumers are detected automatically, removed from the group, and their partitions are redistributed. So how does Kafka detect failed consumers? Let's take 0.10.x as an example.

After a consumer subscribes to a set of topics, it joins the group when the poll(long) function is called, and the group is rebalanced to include the new consumer. The poll function is designed to keep the consumer alive: as long as poll is called continuously, the consumer stays in the group and keeps consuming messages from the partitions assigned to it. The consumer also uses a background thread to send periodic heartbeats to the broker. If the consumer dies or fails to send a heartbeat within session.timeout.ms, it is considered dead and its partitions are reassigned. session.timeout.ms defaults to 10000 ms. Its value must lie between the broker settings group.min.session.timeout.ms (6000 ms) and group.max.session.timeout.ms (300000 ms).
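
As a rough illustration, the bounds quoted above can be checked before the consumer configuration is built. The sketch below uses only java.util.Properties; the class SessionTimeoutConfigSketch and its methods are hypothetical, the two bounds are the values quoted in this article, and treating the bounds as inclusive is an assumption. On a real cluster the bounds depend on the broker configuration.

```java
import java.util.Properties;

// Hypothetical helper: validates session.timeout.ms against the bounds quoted in the text.
final class SessionTimeoutConfigSketch {
    // Values quoted in the article for group.min/max.session.timeout.ms.
    static final int GROUP_MIN_SESSION_TIMEOUT_MS = 6000;
    static final int GROUP_MAX_SESSION_TIMEOUT_MS = 300000;

    // Assumption: both bounds are inclusive.
    static boolean isAccepted(int sessionTimeoutMs) {
        return sessionTimeoutMs >= GROUP_MIN_SESSION_TIMEOUT_MS
                && sessionTimeoutMs <= GROUP_MAX_SESSION_TIMEOUT_MS;
    }

    // Builds only the relevant fragment of a consumer configuration.
    static Properties consumerProps(int sessionTimeoutMs) {
        if (!isAccepted(sessionTimeoutMs)) {
            throw new IllegalArgumentException(
                    "session.timeout.ms outside the broker bounds: " + sessionTimeoutMs);
        }
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps(10000)); // the default quoted above
        System.out.println(isAccepted(5000));     // below the minimum
    }
}
```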

Because heartbeats are sent periodically by the background thread, a consumer can keep sending heartbeats normally without actually consuming messages. To prevent such a consumer from occupying its assigned partitions indefinitely, Kafka provides a liveness check based on the max.poll.interval.ms setting. If the interval between two calls to poll exceeds this value, the consumer leaves the group and its partitions are consumed by other consumers. When this happens, the next offset commit fails with an exception (CommitFailedException in the Java client). This mechanism ensures that only active consumers can commit offsets.

Consumers have two settings that control the behavior of the poll function:

max.poll.interval.ms: increasing the allowed interval between two calls to poll gives the consumer more time to process the messages returned by the previous poll. The downside is that a larger value also lengthens rebalancing of the consumer group, because a consumer can take part in a rebalance only while it is calling poll. Note that request.timeout.ms, which defaults to 305000, must stay larger than max.poll.interval.ms (5 min by default), so raise it as well when you raise max.poll.interval.ms. This value bounds how long the JoinGroup request can block on the server side while the consumers rebalance.

max.poll.records: limits the maximum number of messages returned by each call to poll. With this parameter, we can estimate the maximum amount of work to be done between two calls to poll.
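
The two settings can be related with a back-of-the-envelope estimate. The sketch below is plain arithmetic, not a Kafka API: the class PollBudgetSketch is hypothetical, and perRecordMs stands for a worst-case per-message processing time that you would have to measure yourself.

```java
// Hypothetical helper: checks whether one full batch can be processed
// before max.poll.interval.ms expires.
final class PollBudgetSketch {

    // Worst-case time to process one full batch returned by poll.
    static long worstCaseBatchMs(int maxPollRecords, long perRecordMs) {
        return (long) maxPollRecords * perRecordMs;
    }

    // True if the batch is finished before the next poll is due.
    static boolean fitsPollInterval(int maxPollRecords, long perRecordMs, long maxPollIntervalMs) {
        return worstCaseBatchMs(maxPollRecords, perRecordMs) < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300000; // the 5 min default mentioned above
        int maxPollRecords = 500;        // assumed batch limit
        System.out.println(fitsPollInterval(maxPollRecords, 200, maxPollIntervalMs));  // 100 s of work
        System.out.println(fitsPollInterval(maxPollRecords, 1000, maxPollIntervalMs)); // 500 s of work
    }
}
```

If the estimate does not fit, either lower max.poll.records or raise max.poll.interval.ms, keeping in mind the rebalancing cost described above.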

In some cases the data processing time is unpredictable, and the two parameters above are not enough to meet the requirements. In that case, it is recommended to process the messages in a separate background thread so that the consumer can keep calling the poll function. The offset commits then have to be handled explicitly. A typical practice is to disable automatic offset commits and to commit the offsets manually after the messages have been processed. You also need to call the pause function on the consumed partitions, so that poll returns no new data while processing is in progress, and then call resume(Collection) after processing to continue consuming.

This concludes the example analysis of Kafka group consumption.
