Thoughts prompted by an abnormal Kafka consumer

2025-02-23 Update From: SLTechnology News&Howtos

Problem description:

A production server became very slow, so the Kafka broker running on it was shut down. After the shutdown, some Kafka consumers could no longer consume data. The log showed this error:

o.a.kafka.clients.consumer.internals.AbstractCoordinator Marking the coordinator (39.0.2.100) as dead.

Reason:

After some investigation, it turned out that the consumer group metadata messages (shown here in the format printed by kafka.coordinator.GroupMetadataMessageFormatter):

groupId::[groupId,Some(consumer),groupState,Map(memberId -> [memberId,clientId,clientHost,sessionTimeoutMs], ... -> [ ... ])]

are stored in the Kafka internal topic __consumer_offsets, keyed by groupId.

It also turned out that the broker parameter offsets.topic.replication.factor had been mistakenly set to 1. This parameter is the replication factor of the topic __consumer_offsets. With a single replica, shutting down a broker that is the leader of some partitions of __consumer_offsets makes every consumer group stored in those partitions unavailable. The parameter only takes effect when the topic is first created, so after the broker has been restarted, the number of replicas has to be increased manually from the command line.

reassignment.json:

{"version": 1, "partitions": [{"topic": "xxx", "partition": 0, "replicas": [brokerId1, brokerId2]}]}

Command:

kafka-reassign-partitions --zookeeper localhost:2818 --reassignment-json-file reassignment.json --execute
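Writing one entry per partition by hand is tedious for a topic with many partitions. The following is a minimal sketch of generating the reassignment file for __consumer_offsets, not a definitive tool. The function name build_reassignment, the broker ids 1, 2 and 3, and the round-robin placement are all assumptions made for illustration; 50 is the default value of offsets.topic.num.partitions.

```python
import json


def build_reassignment(topic, num_partitions, broker_ids, replication_factor):
    """Build a reassignment plan that spreads replicas round-robin over brokers."""
    if replication_factor > len(broker_ids):
        raise ValueError("replication factor cannot exceed the number of brokers")
    partitions = []
    for p in range(num_partitions):
        # Start each partition on a different broker so that the first
        # replica (the preferred leader) is spread across the cluster.
        replicas = [broker_ids[(p + i) % len(broker_ids)]
                    for i in range(replication_factor)]
        partitions.append({"topic": topic, "partition": p, "replicas": replicas})
    return {"version": 1, "partitions": partitions}


# Hypothetical cluster with brokers 1, 2 and 3 (assumed ids).
plan = build_reassignment("__consumer_offsets", 50, [1, 2, 3], 3)
reassignment_json = json.dumps(plan, indent=2)
```

The string reassignment_json can be written to reassignment.json and passed to the command above. In a real cluster it is usually better to keep each partition's existing replica in its list, so that only the new copies have to be transferred.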

How the client looks up the consumer coordinator:

Client: org.apache.kafka.clients.consumer.internals.AbstractCoordinator

If the coordinator is unknown (AbstractCoordinator.coordinatorUnknown()), the client calls lookupCoordinator and sends a FindCoordinatorRequest to the least-loaded node.

On the server, KafkaApis.handleFindCoordinatorRequest receives the request:

First, it calls GroupMetadataManager.partitionFor(consumerGroupId), which takes the hashCode of consumerGroupId modulo the total number of partitions of __consumer_offsets to get a partition id. It then looks up the metadata of that partition in the __consumer_offsets topic, finds the partition leader, and returns it to the client. That leader is the group's coordinator.
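The mapping from a group id to a partition of __consumer_offsets can be sketched in a few lines. This is an illustrative reimplementation, not Kafka's own code: java_string_hash reproduces Java's String.hashCode, and partition_for assumes the broker takes a non-negative value of that hash (with Integer.MIN_VALUE mapped to 0) before the modulo. The function names and the default of 50 partitions are assumptions for this example.

```python
def java_string_hash(s):
    """Reproduce Java's String.hashCode() as a signed 32-bit integer."""
    data = s.encode("utf-16-be")  # Java hashes UTF-16 code units
    h = 0
    for i in range(0, len(data), 2):
        unit = (data[i] << 8) | data[i + 1]
        h = (31 * h + unit) & 0xFFFFFFFF  # emulate 32-bit overflow
    return h - (1 << 32) if h >= (1 << 31) else h


def partition_for(group_id, num_partitions=50):
    """Map a consumer group id to a partition of __consumer_offsets."""
    h = java_string_hash(group_id)
    # Assumed behaviour: absolute value, with Integer.MIN_VALUE mapped to 0.
    non_negative = 0 if h == -(1 << 31) else abs(h)
    return non_negative % num_partitions


# A group named "hello" hashes to 99162322, so it lands on partition 22.
hello_partition = partition_for("hello")
```

With a replication factor of 1, a group becomes unavailable exactly when the only broker holding its partition goes down, which matches the failure described above.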

Further thoughts

What exactly is Kafka's failover mechanism? If __consumer_offsets had been given the correct number of replicas, what would the re-election process look like? If some replicas become unavailable after a broker goes down, are they automatically migrated to other nodes? With these questions in mind, I took a brief look at the relevant Kafka code:

When a broker is shut down, the controller does two things:

KafkaController.onBrokerFailure -> KafkaController.onReplicasBecomeOffline

This step mainly calls PartitionStateMachine.handleStateChanges to tell the partition state machine to mark the affected partitions as offline. ReplicaStateMachine.handleStateChanges then changes the replica state to OfflineReplica and updates the partition ISR. If the broker was a partition leader, a new leader election is triggered for that partition, and finally a LeaderAndIsrRequest is sent so that the brokers get the latest leader and ISR information.

KafkaController.unregisterBrokerModificationsHandler unregisters the BrokerModificationsHandler and stops listening for that broker's events in ZooKeeper.
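The effect of the first step on a single partition can be modelled with a toy function. This is a deliberately simplified sketch, not the controller's real logic: it assumes the new leader is the first assigned replica that is both alive and in the ISR, and the names elect_leader and on_broker_failure and the dictionary layout are hypothetical.

```python
def elect_leader(assigned_replicas, isr, live_brokers):
    """Return the first assigned replica that is alive and in the ISR, else None."""
    for broker_id in assigned_replicas:
        if broker_id in live_brokers and broker_id in isr:
            return broker_id
    return None


def on_broker_failure(partition, failed_broker, live_brokers):
    """Drop the failed broker from the ISR and re-elect a leader if needed."""
    live = set(live_brokers) - {failed_broker}
    isr = [b for b in partition["isr"] if b != failed_broker]
    leader = partition["leader"]
    if leader == failed_broker:
        leader = elect_leader(partition["replicas"], isr, live)
    return {"replicas": partition["replicas"], "isr": isr, "leader": leader}


# Replication factor 1: no other replica exists, so the partition has no leader.
single = on_broker_failure({"replicas": [1], "isr": [1], "leader": 1}, 1, [1, 2, 3])

# Replication factor 3: leadership moves to the next in-sync replica.
triple = on_broker_failure({"replicas": [1, 2, 3], "isr": [1, 2, 3], "leader": 1}, 1, [1, 2, 3])
```

In this model, single ends up with no leader, which corresponds to the unavailable consumer groups in the incident, while triple fails over to broker 2.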

When a broker receives a LeaderAndIsrRequest, KafkaApis.handleLeaderAndIsrRequest() is called. If a partition whose leadership changes belongs to the special topic __consumer_offsets, then GroupCoordinator.handleGroupImmigration or GroupCoordinator.handleGroupEmigration is called, depending on whether the current broker is now the partition leader. If it is the partition leader, GroupCoordinator.handleGroupImmigration -> GroupMetadataManager.loadGroupsForPartition re-reads the group data from __consumer_offsets into the local metadata cache. If it is a partition follower, GroupCoordinator.handleGroupEmigration -> GroupMetadataManager.removeGroupsForPartition removes the group information from the metadata cache. The onGroupUnloaded callback then changes the group state to Dead and notifies all group members that are waiting on a join or sync.
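The load and unload behaviour can be pictured with a small in-memory model. This is only a sketch of the idea under stated assumptions: the class GroupMetadataCache, its method names, and the (group_id, state) record shape are hypothetical and much simpler than the real GroupMetadataManager.

```python
class GroupMetadataCache:
    """Toy stand-in for a coordinator's in-memory group metadata cache."""

    def __init__(self):
        # group_id -> {"partition": int, "state": str}
        self.groups = {}

    def load_groups_for_partition(self, partition, records):
        """Broker became leader: replay (group_id, state) records into the cache."""
        for group_id, state in records:
            self.groups[group_id] = {"partition": partition, "state": state}

    def remove_groups_for_partition(self, partition):
        """Broker became follower: evict the partition's groups and mark them Dead."""
        owned = [g for g, meta in self.groups.items() if meta["partition"] == partition]
        unloaded = {}
        for group_id in owned:
            meta = self.groups.pop(group_id)
            meta["state"] = "Dead"
            unloaded[group_id] = meta
        return unloaded


cache = GroupMetadataCache()
cache.load_groups_for_partition(22, [("hello", "Stable")])
cache.load_groups_for_partition(5, [("ab", "Stable")])
evicted = cache.remove_groups_for_partition(22)
```

After the last call, only the group on partition 5 remains cached on this broker; the evicted group has to be looked up again by its members, which will find the new coordinator.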

Kafka does not automatically migrate partition replicas when a broker is shut down. Instead, the partitions that had a replica on that broker become under-replicated. They stay in this state until the broker is brought back and catches up with the new data, or until the user manually reassigns the replicas to another node from the command line.
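Under-replication is easy to state as a check: a partition is under-replicated when its ISR is smaller than its assigned replica list. The sketch below applies that check to a hand-written snapshot; the function name under_replicated and the sample data are assumptions for illustration, not output from a real cluster.

```python
def under_replicated(partitions):
    """Return the ids of partitions whose ISR is smaller than the replica list."""
    return [pid for pid, state in sorted(partitions.items())
            if len(state["isr"]) < len(state["replicas"])]


# Hypothetical snapshot after broker 1 has been shut down.
cluster_state = {
    0: {"replicas": [1, 2, 3], "isr": [2, 3]},
    1: {"replicas": [2, 3, 4], "isr": [2, 3, 4]},
}
lagging = under_replicated(cluster_state)
```

Here only partition 0 is reported, because broker 1 has dropped out of its ISR while its replica assignment is unchanged.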

The official documentation recommends setting two parameters: controlled.shutdown.enable=true and auto.leader.rebalance.enable=true. The former makes the shutdown graceful: log data is synced to disk and partition leadership is moved to other replicas before the broker stops. The latter ensures that, after the broker recovers, it regains the leadership it held before the downtime, which avoids read and write hotspots caused by an uneven distribution of leaders.
