Brief introduction of Kafka 0.10 KafkaConsumer process

2025-03-28 Update From: SLTechnology News&Howtos

ConsumerConfig.scala stores the configuration of Consumer

As far as I understand, Kafka 0.10 does not ship a SimpleConsumer of its own; it still uses the one from version 0.8.

1. Starting with poll

The rules of consumption are as follows:

A partition can be consumed by only one thread within the same consumer group.

If the number of threads is less than the number of partitions, some threads consume multiple partitions.

If the number of threads equals the number of partitions, each thread consumes exactly one partition.

Adding a consumer thread triggers a rebalance, which changes the partition assignment.

Messages within a single partition are consumed in offset order; no ordering is guaranteed across different partitions.

Basic usage of the Consumer API:

private final KafkaConsumer<Integer, String> consumer; // consumer for communicating with Kafka
...
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(this.topic));
ConsumerRecords<Integer, String> records = consumer.poll(1000); // timeout in milliseconds
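The props object passed to the constructor above is an ordinary java.util.Properties. A minimal sketch of how it might be filled in follows; the broker address and group name are placeholder values chosen for illustration (they are not from the original), the class name ConsumerPropsSketch is hypothetical, and the keys are standard consumer configuration names.

```java
import java.util.Properties;

final class ConsumerPropsSketch {
    // Builds the Properties that would be handed to new KafkaConsumer<>(props).
    // bootstrapServers and groupId are supplied by the caller; the values used
    // in the usage note are placeholders.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Let the consumer commit offsets automatically (see section 2.3).
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // Deserializers matching an Integer key and a String value.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

For example, ConsumerPropsSketch.build("localhost:9092", "demo-group") returns a Properties object whose group.id is demo-group.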

The consumer is a purely single-threaded program: all the mechanisms discussed later (coordinator, rebalance, heartbeat, and so on) run inside this single-threaded poll function. As a result, there are no locks inside the consumer code.

1.1 Components

As you can see from the constructor of KafkaConsumer, KafkaConsumer has the following core components:

Metadata: stores the mapping between topics/partitions and brokers.

NetworkClient: the network layer; a network client for asynchronous request/response network I/O.

ConsumerNetworkClient: higher-level consumer access to the network layer; a wrapper around NetworkClient, not thread-safe.

ConsumerCoordinator: just a client-side class that acts as the intermediary for talking to the GroupCoordinator on the server side. (The Coordinator on the broker side is responsible for rebalance, offset commits and heartbeats.)

SubscriptionState: tracks the topics the consumer subscribes to and the offset state of each of its partitions.

Fetcher: manages the fetching process with the brokers, that is, it retrieves the messages.

The workflow of the consumer is explained component by component below.

1.2 Consumer workflow

When the consumer starts, or when the coordinator node fails over, the consumer sends a ConsumerMetadataRequest to any broker. The ConsumerMetadataResponse tells it the location of the Coordinator that its consumer group belongs to.

The consumer connects to the Coordinator node and sends HeartbeatRequests. If a HeartbeatResponse carries the IllegalGeneration error code, the coordinator has already started a rebalance. The consumer then stops fetching data, commits its offsets, and sends a JoinGroupRequest to the coordinator. In the JoinGroupResponse it receives the list of topic-partitions it should own, as well as the new generation number of the consumer group (in the protocol described in section 2.1, the partition list itself arrives in the subsequent SyncGroup response). At this point group management is complete, and the consumer can start fetching data and committing offsets for the partitions it owns.

If the HeartbeatResponse returns without errors, the consumer keeps fetching data from the partitions it last owned, and the process is not interrupted.
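The workflow above can be read as a small state machine. The following is a minimal sketch of that reading, not the client's actual implementation: the class, the state names and the method names are all hypothetical, and only the transitions named in the text are modelled.

```java
final class GroupMembershipSketch {
    enum State { DISCOVER_COORDINATOR, JOINING, STABLE }
    enum HeartbeatResult { OK, ILLEGAL_GENERATION, COORDINATOR_UNAVAILABLE }

    private State state = State.DISCOVER_COORDINATOR;
    private int generation = -1;

    State state() { return state; }
    int generation() { return generation; }

    // The coordinator has been located: the next step is to join the group.
    void onCoordinatorFound() { state = State.JOINING; }

    // The join finished: remember the new generation and resume fetching.
    void onJoinComplete(int newGeneration) {
        generation = newGeneration;
        state = State.STABLE;
    }

    // React to a heartbeat response as described in the text.
    void onHeartbeat(HeartbeatResult result) {
        if (state != State.STABLE) return; // only meaningful inside a stable generation
        switch (result) {
            case OK:
                break; // keep fetching from the partitions already owned
            case ILLEGAL_GENERATION:
                state = State.JOINING; // a rebalance is in progress: stop fetching and rejoin
                break;
            case COORDINATOR_UNAVAILABLE:
                state = State.DISCOVER_COORDINATOR; // coordinator failed over: look it up again
                break;
        }
    }

    // Fetching is allowed only while the consumer holds a valid assignment.
    boolean mayFetch() { return state == State.STABLE; }
}
```

In this sketch, a heartbeat answered with ILLEGAL_GENERATION makes mayFetch() return false until onJoinComplete is called with the new generation number.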

2 Design

2.0 Metadata

See the analysis in Producer.

In addition, both KafkaConsumer and KafkaProducer initialize their metadata in the constructor, by calling the metadata.update method.

2.1 Coordinator: why it exists and what it does

1. Removing the ZooKeeper dependency: why?

In the client API prior to 0.9, the consumer relied on ZooKeeper, because all the consumers in the same consumer group need to coordinate with each other; this is related to the rebalance discussed later. (ConsumerConnector, KafkaStream, ConsumerIterator; package kafka.consumer)

Since 0.9, the new consumer no longer depends on ZooKeeper, and the consumers in a consumer group are managed by the Coordinator. (KafkaConsumer; package org.apache.kafka.clients.consumer)

Why? This is discussed later.

Question: why can a partition be owned by only one consumer within a group?

2. Coordinator protocol and the partition assignment problem

Given a topic with four partitions (p0, p1, p2, p3) and a group with three consumers (c0, c1, c2):

With the RangeAssignor policy, the resulting assignment is:

c0: p0, p1; c1: p2; c2: p3

With the RoundRobinAssignor policy:

c0: p0, p3; c1: p1; c2: p2

partition.assignment.strategy=RangeAssignor is the default.
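The two policies can be sketched for the single-topic case as follows. This is a simplified illustration under stated assumptions, not Kafka's assignor code: the class AssignorSketch is hypothetical, partitions are plain integers, and consumers are assumed to be passed in sorted order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AssignorSketch {
    // Range: each consumer gets a contiguous block of partitions; when the
    // division is uneven, the first (partitions % consumers) consumers get one extra.
    static Map<String, List<Integer>> range(List<String> consumers, int partitions) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        int perConsumer = partitions / consumers.size();
        int extra = partitions % consumers.size();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) owned.add(next++);
            out.put(consumers.get(i), owned);
        }
        return out;
    }

    // Round robin: partition p goes to consumer number (p % consumers).
    static Map<String, List<Integer>> roundRobin(List<String> consumers, int partitions) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        for (String c : consumers) out.put(c, new ArrayList<>());
        for (int p = 0; p < partitions; p++) {
            out.get(consumers.get(p % consumers.size())).add(p);
        }
        return out;
    }
}
```

For four partitions and the consumers c0, c1, c2, range yields {c0=[0, 1], c1=[2], c2=[3]} and roundRobin yields {c0=[0, 3], c1=[1], c2=[2]}.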

How is the assignment actually carried out? The whole process takes three steps.

[Figure: three-step assignment process]

1. Step 1: for each consumer group, the Kafka cluster selects one broker from the cluster as its coordinator. So the first step is to find the coordinator. (One consumer group corresponds to one coordinator.)

GroupCoordinatorRequest (GCR): the request that ConsumerNetworkClient sends to find the coordinator.

2. Step 2: after finding the coordinator, send the JoinGroup request.

Here the consumers are divided into a leader and followers (loosely speaking, the first consumer is chosen as the leader).

Leader function: perform the leader synchronization and send back the assignment for the group (that is, it is responsible for sending the partition assignment results).

Follower function: send the follower's sync group request with an empty assignment.

3. Step 3: after JoinGroup returns, send SyncGroup to obtain the partitions assigned to you.

SyncGroupRequest

The leader sends a SyncGroupRequest carrying the assignment for the whole group to the Coordinator, and the Coordinator replies with the leader's own share of it.

A follower sends a SyncGroupRequest with an empty assignment to the Coordinator, and the Coordinator returns the partitions assigned to that follower.

Note that in the above three steps, there is a key point:

The partition assignment strategy and its result are actually determined by the client, not by the coordinator. What does this mean? In step 2, after all the consumers have sent their JoinGroup messages to the coordinator, the coordinator designates one of them as the leader and the others as followers.

This leader then performs the partition assignment.

Then, in step 3, the leader sends the assignment result to the coordinator in a SyncGroup message, and the other consumers also send a SyncGroup message to obtain their part of the result.
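The division of labour between coordinator and leader can be sketched with an in-memory stand-in for the coordinator. This is only an illustration under simplifying assumptions: CoordinatorSketch and its methods are hypothetical, calls are sequential (the real requests block until the whole group has joined or synced), and the leader is assumed to sync before the followers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CoordinatorSketch {
    private final List<String> members = new ArrayList<>();
    private Map<String, List<Integer>> assignment = new HashMap<>();

    // JoinGroup: register the member and report who the leader is.
    // In this sketch the leader is simply the first member that joined.
    String joinGroup(String memberId) {
        if (!members.contains(memberId)) members.add(memberId);
        return members.get(0);
    }

    // What the leader needs in order to compute the assignment on the client side.
    List<String> members() { return new ArrayList<>(members); }

    // SyncGroup: the leader passes the full assignment, followers pass an empty map.
    // The coordinator only stores and redistributes it; it never computes it.
    // Every caller gets back its own partitions.
    List<Integer> syncGroup(String memberId, Map<String, List<Integer>> proposed) {
        if (memberId.equals(members.get(0)) && !proposed.isEmpty()) {
            assignment = new HashMap<>(proposed);
        }
        return assignment.getOrDefault(memberId, new ArrayList<>());
    }
}
```

With members c0 (leader) and c1, the leader calls syncGroup("c0", plan) with a plan it computed itself, and c1 calls syncGroup("c1", emptyMap) to receive its own partitions from that plan.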

Next, we move on to the Fetcher, which pulls the data.

2.2 Fetcher

Four steps

Step 0: get the offset of consumer

Step 1: generate FetchRequest and put it in the sending queue

Step 2: network poll

Step 3: get the result

1. Get the offset of consumer

When a consumer starts for the first time, one of the most important questions is from which offset it should begin consuming.

Before poll fetches anything, the client sends a request to the cluster asking for the current offset of each TopicPartition. This is done through SubscriptionState and ConsumerCoordinator:

if (!subscriptions.hasAllFetchPositions())
    updateFetchPositions(this.subscriptions.missingFetchPositions());

The core is this: send an OffsetFetchRequest to the Coordinator and block synchronously until the initial offset is obtained, and only then start the next poll. (In other words, when offsets are stored in Kafka, they are kept by the GroupCoordinator.)
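The effect of this step can be sketched as a pure function. The names below are hypothetical, partitions are plain strings, and resetOffset is an assumed stand-in for whatever position the consumer falls back to when the coordinator has no committed offset for a partition; the real client resolves that position separately.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class FetchPositionSketch {
    // missing:   partitions that have no fetch position yet
    // committed: offsets returned by the coordinator; a partition without an
    //            entry has no committed offset
    // Returns the starting position for every partition in missing.
    static Map<String, Long> resolve(Set<String> missing,
                                     Map<String, Long> committed,
                                     long resetOffset) {
        Map<String, Long> positions = new HashMap<>();
        for (String tp : missing) {
            Long c = committed.get(tp);
            // Prefer the committed offset; otherwise use the assumed fallback.
            positions.put(tp, c != null ? c : resetOffset);
        }
        return positions;
    }
}
```

Only after every assigned partition has a position does the fetch loop have enough information to build its requests.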

Once every TopicPartition of the consumer has an initial offset, the consumer can loop continuously to fetch messages; this is the Fetch process:

2. Generate FetchRequest and put it in the sending queue: fetcher.initFetches(cluster)

The core is to generate the FetchRequests. Suppose a consumer subscribes to three topics (t0, t1, t2), and the partitions assigned to it are: t0: p0, p1; t1: p1; t2: p2

That is four TopicPartitions in total: t0p0, t0p1, t1p1, t2p2. These four TopicPartitions may be distributed over two machines, n0 and n1: n0: t0p0, t1p1; n1: t0p1, t2p2

One FetchRequest is generated per machine, giving a Map<Node, FetchRequest>. In other words, all the TopicPartitions that belong to the same Node are put together into a single FetchRequest.
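The grouping step can be sketched as follows. This is an illustration only: FetchGroupingSketch is a hypothetical class, and topic-partitions and nodes are represented as plain strings rather than Kafka's TopicPartition and Node types.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FetchGroupingSketch {
    // assigned: the topic-partitions this consumer owns
    // leaderOf: which node currently hosts each topic-partition
    // Returns one entry per node; each entry would become one FetchRequest.
    static Map<String, List<String>> groupByNode(List<String> assigned,
                                                 Map<String, String> leaderOf) {
        Map<String, List<String>> perNode = new LinkedHashMap<>();
        for (String tp : assigned) {
            String node = leaderOf.get(tp);
            if (node == null) continue; // node unknown: skip until metadata is refreshed
            perNode.computeIfAbsent(node, n -> new ArrayList<>()).add(tp);
        }
        return perNode;
    }
}
```

With the four partitions t0p0, t0p1, t1p1, t2p2 hosted as in the example above, the result is {n0=[t0p0, t1p1], n1=[t0p1, t2p2]}, that is, two requests instead of four.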

3. Network poll

Call ConsumerNetworkClient.poll to perform the network I/O: it sends the queued requests to the server and receives the server's responses. (default: executeDelayedTasks=true)

4. Get the result: fetcher.fetchedRecords()

Gets the response returned by the broker, which contains the List<ConsumerRecord> of records.

2.3 Offset confirmation (commit) mechanism

Automatic confirmation: controlled by the parameter enable.auto.commit=true

Manual confirmation: the application itself controls when offsets are committed

The following analysis takes the automatic case, which is implemented by ConsumerCoordinator's AutoCommitTask.

The task is placed in ConsumerNetworkClient's DelayedTaskQueue delayedTasks and is then invoked periodically. Commit requests are sent periodically, much like heartbeats; the underlying mechanism is DelayedQueue + DelayedTask.

One confirmation means one offset commit.

Comments in the poll function:

// execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records

This can be understood as follows: on the second poll call, the offsets and the heartbeat for the previous poll are submitted.

The offsets are committed first, and only then are records pulled. So the offset committed in this call is actually the offset of the records returned by the previous poll.
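The "delayed tasks first, then fetch" ordering can be sketched with a priority queue keyed by due time. This is a simplified model under stated assumptions, not the client's DelayedTaskQueue: the class and method names are hypothetical, time is passed in explicitly, and the fetch is represented by the string "fetch".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

final class DelayedTaskSketch {
    static final class Task implements Comparable<Task> {
        final String name;
        final long dueMs;
        final long intervalMs;
        Task(String name, long dueMs, long intervalMs) {
            this.name = name;
            this.dueMs = dueMs;
            this.intervalMs = intervalMs;
        }
        @Override
        public int compareTo(Task other) { return Long.compare(dueMs, other.dueMs); }
    }

    private final PriorityQueue<Task> queue = new PriorityQueue<>();

    // Registers a periodic task such as "heartbeat" or "autocommit".
    void schedule(String name, long firstDueMs, long intervalMs) {
        if (intervalMs <= 0) throw new IllegalArgumentException("intervalMs must be positive");
        queue.add(new Task(name, firstDueMs, intervalMs));
    }

    // One poll call: run every task that is due, reschedule it, then fetch.
    // Returns the names of the actions in the order they were executed.
    List<String> poll(long nowMs) {
        List<String> ran = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().dueMs <= nowMs) {
            Task t = queue.poll();
            ran.add(t.name);
            queue.add(new Task(t.name, nowMs + t.intervalMs, t.intervalMs));
        }
        ran.add("fetch");
        return ran;
    }
}
```

Because tasks run only inside poll, a caller that does not call poll for a long time also runs no heartbeat and no autocommit in that period.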

Because of this ordering, a program written with the logic below may cause the heartbeat between the Consumer and the Coordinator to time out.

while (true) {
    consumer.poll(timeout);
    // process the messages here
    // If processing takes too long, the consumer cannot send a heartbeat to the
    // coordinator, which then mistakenly assumes the consumer has lost contact and
    // triggers an unnecessary rebalance. In bad cases, data is consumed more than once.
}

Therefore, the offset submission needs to be pulled out into a separate thread.

At this point, the whole Consumer process is finished.
