Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is rebalance in KAFKA

2025-03-01 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)06/01 Report--

This article introduces the relevant knowledge of "what is rebalance in KAFKA". In the operation of actual cases, many people will encounter such a dilemma. Then let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!

First, write in front

Let's comb through rebalance from beginning to end.

What is rebalance?

Chinese literal translation means rebalancing.

What is to rebalance? Consumer members in the consumer group rebalance. (if you are not clear about the concept of consumer group, you will only mention these concepts later when I write about the consumption module.)

Why do you need rebalancing? Because of the failover and dynamic partition allocation of members in the consumer group.

Translate:

Failover of members in a consumer group: when there are three consumers in a consumer group, respectively, the consumption zone: amemb

A-> aB-> bC-> c

If there is something wrong with consumer An at this time, it means that there are no consumers to consume in zone a, which is definitely not possible. Then partition an is assigned to other surviving consumer clients through rebalance, and the consumption strategy that may be obtained after rebalance:

A-> a (GG) B-> bmai a C-> c

This is the failover of the members of the consumer group, where a consumer client has a problem and assigns its original consumption partition to other surviving consumer clients through REBALNACE.

Dynamic partition allocation: when the number of partitions of a topic changes, the number of partitions that can be consumed for consumer groups changes, so you need rebalance to re-allocate dynamic partitions. For example, a topic has only 3 partitions, but now I have expanded to 10 partitions. Does it not mean that there are 7 more partitions without consumer consumption? This is obviously not possible, so the rebalance process is needed to allocate partitions, allowing existing consumers to consume all 10 partitions.

Third, how is rebalance triggered?

In fact, this has already been mentioned in the above section, and I will make a little more supplement and summary in this section.

Trigger condition:

Changes in the membership of the consumer group: offline / online / failure is kicked out.

The number of partitions for consumption has changed: topic has been deleted and the number of topic partitions has increased.

There is a problem with the coordinator node: because the metadata information of the consumer group is in the coordinator node, a problem with the coordinator node will also trigger rebalance to find a new coordinator node. How can I find it? Obviously just go through the FIND_COORDINATOR request, and then find the node with the lowest load and ask, where is my new coordinator? Then get the answer and ask the consumer client to connect to the new coordinator node.

IV. The macroscopic process of rebalance

The whole process of rebalance is a process of state machine transfer. The schematic diagram of the whole process is as follows: source: https://www.cnblogs.com/huxi2b/p/6815797.html

In fact, the above state machine transfer process in the case of understanding the principle, has been very clear, but if you have not seen the source code, still do not know why it is so circulating, under what circumstances is Empty, what state is Stable? When does the Empty state transition to the PreparingRebalance state?

Let me take a look at the flow process of the whole state according to the order of requests:

Let's answer some of the more detailed questions raised later in the last section:

What key data do these requests carry?

When FIND_COORDINATOR requests, it will bring its own group. id value, which is used to calculate where its coordinator is. The corresponding calculation method is: coordinatorId=groupId.hash% 50 is a number that represents a specific partition. Which topic partition? It's obviously _ _ consumer_offsets.

In the JOIN_GROUP request, there are no key parameters, but in the response, a client is selected as the leader, and then in the response, it is told that it is selected as leader and the consumer group metadata information is sent to it, and then the client is assigned to the partition.

When making a SYNC_GROUP request, leader will bring the partition allocation scheme it has assigned according to the specific policy. After receiving it, the server will update it to the metadata, and then the rest of the consumer client will tell it which partitions to consume as soon as it sends the SYNC request, and then let it consume the ok.

Which stage causes the rebalance process to deteriorate to a few minutes?

In my diagram, the JOIN phase is specially marked in red to make this stage more conspicuous, and yes, this stage will cause the whole rebalance process to deteriorate to a few minutes.

The specific reason is that the JOIN phase will wait for the surviving members of the original group to send the JOIN_GROUP request. If the members of the original group have not sent the JOIN_GROUP request because of business processing, the server will wait until the timeout. This timeout is the value of max.poll.interval.ms, and the default is 5 minutes, so in this case, the time spent on rebalance will be reduced to 5 minutes, resulting in all consumers being unable to spend normally, which has a great impact.

Why is it divided into so many stages?

This is mainly a design consideration, and the whole process is designed very elegantly. Three requests are needed for the first connection, and the normal consumer only needs two requests for rebalance, because it already knows where its coordinator is, so it does not need a FIND_COORDINATOR request, unless its coordinator is down.

After answering these questions, do you have a better understanding of the whole rebalance process? In fact, there are many details that have not been covered, such as when will the consumer client enter the rebalance state? How does the server wait for members of the original consumer group to send JOIN_GROUP requests? These questions can only be looked at the source code step by step.

I will not type and write the source code of the FIND_COORDINATOR request, it is very simple that you can turn it over by yourself, that is, you have brought a group.id, which is mentioned above.

VI. Source code analysis of JOIN stage

We know from this function that if you join a new consumer group, the server will create a group when it receives the first JOIN request. The initial state of the group is Empty.

/ / if the group does not exist and there is a memberId, it is considered an illegal request and rejected directly. GroupManager.getGroup (groupId) match {case None = > / / where group does not exist yet MemberId is naturally empty if (memberId! = JoinGroupRequest.UNKNOWN_MEMBER_ID) {responseCallback (joinError (memberId, Errors.UNKNOWN_MEMBER_ID))} else {/ / initial state is EMPTY val group = groupManager.addGroup (new GroupMetadata (groupId, initialState = Empty)) / / perform specific add-in operations doJoinGroup (group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType) Protocols, responseCallback)} case Some (group) = > doJoinGroup (group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)}

Let's go to the doJoinGroup function and take a look at the core logic:

Case Empty | Stable = > / / initial state is EMPTY, add member and execute rebalance if (memberId = = JoinGroupRequest.UNKNOWN_MEMBER_ID) {/ / if the member id is unknown, register the member to the group addMemberAndRebalance (rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group ResponseCallback)} else {/ /...} else {/ /...} private def addMemberAndRebalance (rebalanceTimeoutMs: Int, sessionTimeoutMs: Int, clientId: String ClientHost: String, protocolType: String, protocols: List [(String, Array [byte])], group: GroupMetadata Callback: JoinCallback) = {/ / initialize memberID val memberId = clientId + "-" > def add (member: MemberMetadata) {if (members.isEmpty) this.protocolType = Some (member.protocolType) assert (groupId = = member.groupId) assert (this.protocolType.orNull = = member.protocolType) assert (supportsProtocols (member.protocols)) / coordinator to elect leader is simple For the member if (leaderId.isEmpty) leaderId = Some (member.memberId) members.put (member.memberId, member)} of the first join_group request sent

The translation of the above code is very simple, that is, there is a new member, encapsulated and added to the group. What we need to say is that when the group status is Empty, the one who connects first is leader. Then prepare the rebalance:

Private def maybePrepareRebalance (group: GroupMetadata) {group.inLock {if (group.canRebalance) prepareRebalance (group)}} / / here is the input PreparingRebalance status, and then get a SET / / translation: only the status in this SET (Stable, CompletingRebalance, Empty) To open rebalance def canRebalance = GroupMetadata.validPreviousStates (PreparingRebalance). Privateval validPreviousStates: Map [GroupState, set [GroupState]] = Map (Dead-> Set (Stable, PreparingRebalance, CompletingRebalance, Empty, Dead), CompletingRebalance-> Set (PreparingRebalance), Stable-> Set (CompletingRebalance), PreparingRebalance-> Set (Stable, CompletingRebalance, Empty), Empty-> Set (PreparingRebalance) private def prepareRebalance (group: GroupMetadata) {/ / if any members are awaiting sync Cancel their request and have them rejoin if (group.is (CompletingRebalance)) resetAndPropagateAssignmentError (group, Errors.REBALANCE_IN_PROGRESS) val delayedRebalance = if (group.is (Empty)) new InitialDelayedJoin (this, joinPurgatory, group, groupConfig.groupInitialRebalanceDelayMs,// default 3000ms, i.e. 3s groupConfig.groupInitialRebalanceDelayMs, max (group.rebalanceTimeoutMs-groupConfig.groupInitialRebalanceDelayMs, 0) else new DelayedJoin (this, group) Group.rebalanceTimeoutMs) / / the timeout here is the poll interval of the client Default 5-minute / / state machine conversion: EMPTY-> PreparingRebalance group.transitionTo (PreparingRebalance) / / rebalance start marking log info (s "Preparing to rebalance group ${group.groupId} with old generation ${group.generationId}" + s "(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor (group.groupId)})") / / add val groupKey = GroupKey (group.groupId) joinPurgatory.tryCompleteElseWatch (delayedRebalance, Seq (groupKey))}

There are two key points in the above code. One is to determine whether you can enter the rebalance process. You can see that only the status in (Stable, CompletingRebalance, Empty) can be enabled for rebalance. When you first came to the first member, the status of the group was obviously Empty, but it was changed to PreparingRebalance recently, so the subsequent member cannot enter after sending JOIN requests, so you can only set a callback and wait.

So when do we have to wait? The second code clearly states that waiting for a deferred task timed out. The creation of this deferred task is based on the current state. If it is Empty, create an InitialDelayedJoin deferred task with a timeout of 3s; if it is not Empty, create a DelayedJoin. The timeout is 5min by default. Look, the source code tells the truth, and this is the JOIN phase waiting for the member code to be implemented.

I need to add here, why do you have to wait for 3s in the state of Empty? This is actually an optimization, mainly to optimize the situation in which multiple consumers are connected at the same time. Take Chestnut, 10 consumers can start and practice in 3 seconds. If you wait for 3 seconds, then the rebalance process will be done. If you don't wait, it means that you have to open rebalance again. A total of 10 times of rebalance will take a long time. Details can be found at https://www.cnblogs.com/huxi2b/p/6815797.html

In addition, why is the delay of 5 minutes when the status is not Empty? In fact, the above answer is to wait for online consumers in the original consumer group to send JOIN requests, which is also the main reason for the time-consuming deterioration of the rebalance process.

Next, let's take a look at what these two deferred tasks do when they time out. The first is InitialDelayedJoin:

/ * * Delayed rebalance operation that is added to the purgatory when a group is transitioning from * Empty to PreparingRebalance * * When onComplete is triggered we check if any new members have been added and if there is still time remaining * before the rebalance timeout. If both are true we then schedule a further delay. Otherwise we complete the * rebalance. * / private [group] class InitialDelayedJoin (coordinator: GroupCoordinator, purgatory: DelayedOperationPurgatory [DelayedJoin], group: GroupMetadata, configuredRebalanceDelay: Int, delayMs: Int RemainingMs: Int) extends DelayedJoin (coordinator, group, delayMs) {/ / Dead is false here In order not to be completed in tryComplete override def tryComplete (): Boolean = false override def onComplete (): Unit = {/ / deferred task processing group.inLock {/ / newMemberAdded is followed by a new member, it will be 3s when true / / remainingMs first creates the deferred service. / / so this condition holds for the first time if (group.newMemberAdded & & remainingMs! = 0) {group.newMemberAdded = falseval delay = min (configuredRebalanceDelay, remainingMs) / / the newly calculated remaining is always equal to 0, in fact, it is essentially 3-3 zero, / / so even if a new InitialDelayedJoin is created here. The timeout of this task is the next moment / / the purpose of writing this is actually equivalent to completing the delayed task val remaining = max (remainingMs-delayMs, 0) purgatory.tryCompleteElseWatch (new InitialDelayedJoin (coordinator, purgatory, group, configuredRebalanceDelay, delay, remaining) Seq (GroupKey (group.groupId))} else / / if no new member is added Directly call the function of the parent class / / complete the JOIN phase super.onComplete ()}

The general idea I wrote in the comments, in fact, is to wait for 3 seconds, and then call the parent function to complete the entire JOIN phase, but do not contact the context to see, or quite painstaking, right to see this need to have an understanding of the time wheel source code, as I have written in front, if you have anything unclear can take a look.

Then take a look at what happens when DelayedJoin times out:

/ * Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance * * Whenever a join-group request is received, check if all known group members have requested * to re-join the group; if yes, complete this operation to proceed rebalance. * * When the operation has expired, any known members that have not requested to re-join * the group are marked as failed, and complete this operation to proceed rebalance with * the rest of the group. * / private [group] class DelayedJoin (coordinator: GroupCoordinator, group: GroupMetadata, rebalanceTimeout: Long) extends DelayedOperation (rebalanceTimeout, Some (group.lock)) {override def tryComplete (): Boolean = coordinator.tryCompleteJoin (group, forceComplete _) override def onExpiration () = coordinator.onExpireJoin () override def onComplete () = coordinator.onCompleteJoin (group)} / / nothing after timeout, Because you really don't have to do anything. Just leave it empty / / the core is onComplete function and tryComplete function def onExpireJoin () {/ / TODO: add metrics for restabilize timeouts} def tryCompleteJoin (group: GroupMetadata, forceComplete: () = > Boolean) = {group.inLock {if (group.notYetRejoinedMembers.isEmpty) forceComplete () else false}} def notYetRejoinedMembers = members.values.filter (_ .awaitingJoinCallba ck = = null). ToList def forceComplete (): Boolean = {if (completed.compareAndSet (false) True) {/ / cancel the timeout timer cancel () onComplete () true} else {false}} def onCompleteJoin (group: GroupMetadata) {group.inLock {/ / remove any members who haven't joined the group yet / / if members of the group still fail to connect Then delete it. Receive the current JOIN phase group.notYetRejoinedMembers.foreach {failedMember = > group.remove (failedMember.memberId) / / TODO: cut the socket connection to the client} if (! group.is (Dead)) {/ / State Machine transfer: preparingRebalancing-> CompletingRebalance group.initNextGeneration () if (group.is (Empty)) {info (s "Group ${group.groupId}) With generation ${group.generationId} is now empty "+ s" (${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor (group.groupId)}) ") groupManager.storeGroup (group Map.empty, error = > {if (error! = Errors.NONE) {/ / we failed to write the empty group metadata. If the broker fails before another rebalance, / / the previous generation written to the log will become active again (and most likely timeout). / / This should be safe since there are no active members in an empty generation, so we just warn. Warn (s "Failed to write empty metadata for group ${group.groupId}: ${error.message}")} else {/ / JOIN phase flag end log info (s "Stabilized group ${group.groupId} generation ${group.generationId}" + s "(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor (group.groupId)}) ) ") / / trigger the awaiting join group response callback for all the members after rebalancing for (member responseCallback (Array.empty) Errors.UNKNOWN_MEMBER_ID) case PreparingRebalance = > responseCallback (Array.empty, Errors.REBALANCE_IN_PROGRESS) / / only group will be processed when it is in compeletingRebalance state / / the rest of the state is error case CompletingRebalance = > / / set callback to the current member After that, it does nothing and does not return / / wait until leader's partition scheme is in place before it is returned. Group.get (memberId) .awaitingSyncCallback = responseCallback / / if this is the leader, then we can attempt to persist state and transition to stable / / only the SYNC that receives the leader will be processed And transfer if (group.isLeader (memberId)) {info (s "Assignment received from leader for group ${group.groupId} for generation ${group.generationId}") / / fill any missing members with an empty assignment val missing = group.allMembers-- groupAssignment.keySet val assignment = groupAssignment + + missing.map (_-> Array.empty [Byte]). ToMap groupManager.storeGroup (group) Assignment, (error: Errors) = > {group.inLock {/ / another member may have joined the group while we were awaiting this callback, / / so we must ensure we are still in the CompletingRebalance state and the same generation / / when it gets invoked If we have transitioned to another state, then do nothing if (group.is (CompletingRebalance) & & generationId = = group.generationId) {if (error! = Errors.NONE) {resetAndPropagateAssignmentError (group, error) maybePrepareRebalance (group)} else {setAndPropagateAssignment (group) Assignment) / / State machine flow: CompletingRebalance-> Stable group.transitionTo (Stable)})} / / if it is already in stable state It indicates that leader has uploaded the partition allocation scheme / / then it is good to return the corresponding scheme directly from the metadata of group case Stable = > / / if the group is stable, we just return the current assignment val memberMetadata = group.get (memberId) responseCallback (memberMetadata.assignment, Errors.NONE) / / enable heartbeat detection completeAndScheduleNextHeartbeatExpiration (group) Group.get (memberId))}}

We may have a question about the above code case handling, why only leader's SYNC requests are processed? If the other consumer comes in the morning than leader, will it be stuck here? Instead of adding a time wheel to set a maximum timeout as in the JOIN phase? If leader doesn't send SNYC requests all the time, won't all the members wait here and wait indefinitely?

Let's answer them one by one. First of all, let's look at the code above. The first thing to do with each request is to set a callback, and then go and wait until leader brings up the partition allocation scheme through the SYNC request.

The second question is, if the other consumer arrives earlier than leader, will you just wait? Yes, that's right. That's what the code says.

Third question, why not set a maximum timeout or something? We can take a look at the client code. Once rebalance is enabled, only relevant requests will be sent and received, which means that after receiving the return of the JOIN phase, leader will not have any influence from the business code, directly assigning partitions and then sending SYNC requests. This means that there is theoretically no blocking between leader's JOIN response and SYNC request, so there is no need to set a timeout or join the time round.

Fourth, leader has been waiting without sending a SYNC request. Yes, that's what the code says. But if you think about it, what can keep leader from sending SYNC requests all the time? All I can think of is that the GC/leader is down. In either case, it will be detected by the server because of a problem with the heartbeat thread, so restart the next round of rebalance after the corresponding heartbeat task times out. Even if GC resumes and continues to send SYNC requests after a long time, it will get an error and return to start the next round of rebalance due to generation mismatch.

Finally, let's take a look at what leader will do when he arrives:

Private def setAndPropagateAssignment (group: GroupMetadata, assignment: Map [String, Array [Byte]]) {assert (group.is (CompletingRebalance)) / / assign group.allMemberMetadata.foreach (member = > member.assignment = assignment (member.memberId)) / / propagate this allocation scheme propagateAssignment (group, Errors.NONE)} private def propagateAssignment (group: GroupMetadata) throughout group Error: Errors) {/ / traversal / / if follower arrives at the SYNC request before leader / / then only a callback will be set I will do nothing and will not return / / until leader comes with the allocation scheme and changes the status to stable before traversing / / to see which member has sent the request, set the callback, and then return them to the corresponding partition scheme / / so the name is [Propagation allocation Scheme] / / wonderful for (member sessionTimeout). } public synchronized void maybeLeaveGroup () {if (! coordinatorUnknown () & & state! = MemberState.UNJOINED & & generation! = Generation.NO_GENERATION) {/ / this is a minimal effort attempt to leave the group. We do not / / attempt any resending if the request fails or times out. Log.debug ("Sending LeaveGroup request to coordinator {}", coordinator); LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder (groupId, generation.memberId); client.send (coordinator, request) .compose (new LeaveGroupResponseHandler ()); client.pollNoWakeup ();} resetGeneration ();}

To sum up, the problem of how to locate rebalance is to find the flag log, and then eliminate it. If it really doesn't work, open the debug log.

Let's move on to the second question: how long does it take to rebalance at a time? Why did it deteriorate to a few minutes? Because the whole rebalance process is a linear process, that is, the status flows according to the request order, it is good to find the corresponding flag log. Open flag log:

/ / rebalance starts marking log info (s "Preparing to rebalance group ${group.groupId} with old generation ${group.generationId}" + s "(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor (group.groupId)})")

Two end identification logs: both end logs are fine, because both represent the completion of the rebalance process, and the reason is clear above.

/ / JOIN phase flag end log info (s "Stabilized group ${group.groupId} generation ${group.generationId}" + s "(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor (group.groupId)})") / / SYNC phase end log info (s "Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")

So how do you count the time of the entire rebalance process? Obviously, the end time-the start time.

After knowing why and why rebalance is turned on, how to locate the business problem? Heartbeat timeout: because the heartbeat thread is a daemon thread, it is usually due to the fact that the CPU cannot be obtained at the heartbeat site due to the high machine load on the client.

The poll interval exceeds the configuration: obviously, after the data is released by poll, the time for business processing is too slow. It is recommended to optimize the consumption logic according to the business and change it to multi-thread consumption or asynchronous consumption.

9. How do consumers perceive rebalance?

This is very simple. Let's think about it. All the metadata related to this group is in coordinator. Which requests will interact with coordinator? HEARTBEAT/OFFSET_COMMIT, just these two. In fact, normal member relies on these two requests to perceive that they are going to do rebalance. Let's take a look at them separately.

The first is the HEARTBEAT request, each time with the generation value of the current consumer group, that is, the era value. If the server rebalance has been completed, the era value is + 1, then you will find that you do not match, and then you will set the logo of your own RejoinNeeded and open rebalance in the next round of poll.

If the rebalance has not been completed, it is even easier. If you find that the status of the group is not stable, directly return the corresponding error, then set the identity and add it to the rebalance process.

Server source code:

Case Some (group) = > group.inLock {group.currentState match {case Dead = > / / if the group is marked as dead, it means some other thread has just removed the group / / from the coordinator metadata; this is likely that the group has migrated to some other / / coordinator OR the group is ina transient unstable phase. Let the member retry / / joining without the specified member id ResponseCallback (Errors.UNKNOWN_MEMBER_ID) case Empty = > responseCallback (Errors.UNKNOWN_MEMBER_ID) case CompletingRebalance = > if (! group.has (memberId) responseCallback (Errors.UNKNOWN_MEMBER_ID) else responseCallback (Errors.REBALANCE_IN_PROGRESS) Case PreparingRebalance = > if (! group.has (memberId)) {responseCallback (Errors.UNKNOWN_MEMBER_ID)} else if (generationId! = group.generationId) {responseCallback (Errors.ILLEGAL_GENERATION)} else {val member = group.get (memberId) completeAndScheduleNextHeartbeatExpiration (group Member) responseCallback (Errors.REBALANCE_IN_PROGRESS)} case Stable = > if (! group.has (memberId)) {responseCallback (Errors.UNKNOWN_MEMBER_ID) / / Epoch switching} else if (generationId! = group.generationId) { ResponseCallback (Errors.ILLEGAL_GENERATION)} else {val member = group.get (memberId) / / complete the last delay Create a new deferred task completeAndScheduleNextHeartbeatExpiration (group, member) / / callback response responseCallback (Errors.NONE)}

Client source code:

Private class HeartbeatResponseHandler extends CoordinatorResponseHandler {@ Override public void handle (HeartbeatResponse heartbeatResponse, RequestFuture future) {sensors.heartbeatLatency.record (response.requestLatencyMs ()); Errors error = heartbeatResponse.error (); if (error = = Errors.NONE) {log.debug ("Received successful Heartbeat response"); future.complete (null) } else if (error = = Errors.COORDINATOR_NOT_AVAILABLE | | error = = Errors.NOT_COORDINATOR) {log.debug ("Attempt to heartbeat since coordinator {} is either not started or not valid.", coordinator ()); markCoordinatorUnknown (); future.raise (error) } else if (error = = Errors.REBALANCE_IN_PROGRESS) {log.debug ("Attempt to heartbeat failed since group is rebalancing"); requestRejoin (); future.raise (Errors.REBALANCE_IN_PROGRESS);} else if (error = = Errors.ILLEGAL_GENERATION) {log.debug ("Attempt to heartbeat failed since generation {} is not current", generation.generationId) ResetGeneration (); future.raise (Errors.ILLEGAL_GENERATION);} else if (error = = Errors.UNKNOWN_MEMBER_ID) {log.debug ("Attempt to heartbeat failed for since member id {} is not valid.", generation.memberId); resetGeneration (); future.raise (Errors.UNKNOWN_MEMBER_ID) } else if (error = = Errors.GROUP_AUTHORIZATION_FAILED) {future.raise (new GroupAuthorizationException (groupId));} else {future.raise (new KafkaException ("Unexpected error in heartbeat response:" + error.message ();}} protected synchronized void requestRejoin () {this.rejoinNeeded = true;}

So when our client sees this exception, we know what's going on, that is, I'm in the process of rebalance, or I've already finished it, and the era on the client is wrong.

REBALANCE_IN_PROGRESS (27, "The group is rebalancing, so a rejoin is needed.", new ApiExceptionBuilder () {@ Override public ApiException build (String message) {return new RebalanceInProgressException (message) }}), ILLEGAL_GENERATION (22, "Specified group generation id is not valid.", new ApiExceptionBuilder () {@ Override public ApiException build (String message) {return new IllegalGenerationException (message);}})

If we look at the OFFSET_COMMIT request, it is basically the same as the HEARTBEAT request.

Server:

Group.inLock {if (group.is (Dead)) {responseCallback (offsetMetadata.mapValues (_ = > Errors.UNKNOWN_MEMBER_ID))} else if ((generationId)

< 0 && group.is(Empty)) || (producerId != NO_PRODUCER_ID)) { // The group is only using Kafka to store offsets. // Also, for transactional offset commits we don't need to validate group membership and the generation. groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch) } else if (group.is(CompletingRebalance)) { responseCallback(offsetMetadata.mapValues(_ =>

Errors.REBALANCE_IN_PROGRESS)} else if (! group.has (memberId)) {responseCallback (offsetMetadata.mapValues (_ = > Errors.UNKNOWN_MEMBER_ID))} else if (generationId! = group.generationId) {responseCallback (offsetMetadata.mapValues (_ = > Errors.ILLEGAL_GENERATION))} else {val member = group.get (memberId) completeAndScheduleNextHeartbeatExpiration (group, member) groupManager.storeOffsets (group, memberId, offsetMetadata ResponseCallback)}

Client:

Else if (error = = Errors.UNKNOWN_MEMBER_ID | | error = = Errors.ILLEGAL_GENERATION | | error = = Errors.REBALANCE_IN_PROGRESS) {/ / need to re-join group resetGeneration (); future.raise (new CommitFailedException ()); return / * Reset the generation and memberId because we have fallen out of the group. * / protected synchronized void resetGeneration () {this.generation = Generation.NO_GENERATION; this.rejoinNeeded = true; this.state = MemberState.UNJOINED;}

From the source code, we can see that the client perceives rebalance mainly through two mechanisms, one is the state, the other is the era; the state takes effect in the rebalance process, and the era takes effect after the end of the JOIN phase of rebalance.

Both requests that interact with coordinator will bring their own era information, and will check the epoch information before being processed by the server, and if it is wrong, tell the client that you need rebalance.

How to reduce the impact of rebalance online?

First of all, what will be the impact of rebalance? To quote the terminology of JVM, it is STOP THE WORLD.

Once the rebalance process is started, consumers can no longer consume after they enter the JOIN stage, that is, all the members of the group are STW, so it still has a great impact on the business.

This is the end of the content of "what is rebalance in KAFKA". Thank you for reading. If you want to know more about the industry, you can follow the website, the editor will output more high-quality practical articles for you!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report