How to analyze the principles of kafka

2025-03-27 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/01 Report--

Many newcomers are unclear about how Kafka works internally. This article explains the underlying principles in detail for anyone who needs them.

Analysis of the principles of kafka

How a topic is created and deleted

The creation of topic

The process is as follows:

1. The controller registers a watcher on the /brokers/topics node in ZooKeeper. When a topic is created, the controller obtains the topic's partition/replica assignment through the watch.

2. The controller reads the list of all currently available brokers from /brokers/ids. For each partition in set_p (the set of partitions being assigned):

2.1. From all the replicas assigned to the partition (called AR), it selects any available broker as the new leader and sets AR as the new ISR.

2.2. It writes the new leader and ISR to /brokers/topics/[topic]/partitions/[partition]/state.

3. The controller sends a LeaderAndISRRequest to the relevant brokers through RPC.
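The leader selection in the creation flow can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the function name `elect_initial_leaders`, the dictionary layout, and the choice of the first live replica are hypothetical, and nothing here talks to a real cluster.

```python
def elect_initial_leaders(assignment, live_brokers):
    """assignment maps partition id -> list of replica broker ids (the AR)."""
    state = {}
    for partition, replicas in assignment.items():
        candidates = [b for b in replicas if b in live_brokers]
        if not candidates:
            continue  # no live replica: this sketch leaves the partition leaderless
        # Any live replica may be chosen; this sketch simply takes the first one.
        # Following the text above, the whole AR becomes the new ISR.
        state[partition] = {"leader": candidates[0], "isr": list(replicas)}
    return state


assignment = {0: [1001, 1002], 1: [1002, 1003], 2: [1003, 1001]}
state = elect_initial_leaders(assignment, live_brokers={1001, 1003})
for partition, info in sorted(state.items()):
    print(partition, info)
```

The resulting dictionary plays the role of the per-partition state that the controller writes to ZooKeeper before notifying the brokers.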

Note: this process is very similar to partition leader election. Both require ZooKeeper, and the relevant state is recorded there.

Controller plays a very important role in these processes.

Deletion of topic

The process is as follows:

1. The controller registers a watcher on the /brokers/topics node in ZooKeeper. When a topic is deleted, the controller obtains the topic's partition/replica assignment through the watch.

2. If delete.topic.enable=false, the process ends here. Otherwise, the watch that the controller registered on /admin/delete_topics fires, and the controller sends a StopReplicaRequest to the corresponding brokers through a callback.

Many of the fault-handling processes mentioned earlier, including topic creation and deletion, partition leader switching, and keeping the cluster available when a broker fails, involve one component: the controller. I will write a separate blog post about the concepts and terms used in Kafka, so it is only briefly described here.

Controller: one of the servers in a Kafka cluster, used for leader election and various failover tasks.

Have you ever wondered what happens if the controller itself fails, and how it fails over? Let's take a look.

First, as in our last experiment, we look in ZooKeeper to find which broker the controller is on, and check the controller_epoch counter.

[zk: localhost:2181(CONNECTED) 14] ls /kafkagroup/controller
controller_epoch   controller
[zk: localhost:2181(CONNECTED) 14] ls /kafkagroup/controller
[]
[zk: localhost:2181(CONNECTED) 15] get /kafkagroup/controller
{"version": 1, "brokerid": 1002, "timestamp": "1566648802297"}
[zk: localhost:2181(CONNECTED) 22] get /kafkagroup/controller_epoch
23

We can see that the controller is currently on broker 1002, and that controller switches have happened 23 times so far.

We manually kill the Kafka process on node 1002:

[hadoop@kafka02-55-12]$ jps
11665 Jps
10952 Kafka
11068 ZooKeeperMain
10495 QuorumPeerMain
[hadoop@kafka02-55-12]$ kill -9 10952
[hadoop@kafka02-55-12]$ jps
11068 ZooKeeperMain
11678 Jps
10495 QuorumPeerMain

Looking at ZooKeeper again, the change has already been recorded there: the controller has moved to broker 1003, the epoch is now 24, and broker 1002 is no longer listed.

[zk: localhost:2181(CONNECTED) 16] get /kafkagroup/controller
{"version": 1, "brokerid": 1003, "timestamp": "1566665835022"}
[zk: localhost:2181(CONNECTED) 22] get /kafkagroup/controller_epoch
24
[zk: localhost:2181(CONNECTED) 25] ls /kafkagroup/brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 26] ls /kafkagroup/brokers/ids
[1003, 1001]

The background logs also record a lot of detail.

[hadoop@kafka03-55-13 logs]$ vim state-change.log

[2019-08-25 01 Received response 01V 07886] TRACE [Controller id=1003 epoch=24] Received response {error_code=0} for request UPDATE_METADATA with correlation id 7 sent to broker 10.211.55.13 id (id: 1003 rack: null) (state.change.logger)

[hadoop@kafka03-55-13 logs]$ pwd

/data/kafka/kafka-server-logs/logs

Information about state changes

[hadoop@kafka03-55-13 logs]$ tailf controller.log

[2019-08-25 01 TRACE [Controller id=1003] Leader imbalance ratio for broker 1002 is 1.0 (kafka.controller.KafkaController)

[2019-08-25 01 INFO [Controller id=1003] Starting preferred replica leader election for partitions (kafka.controller.KafkaController)

Next, let's analyze in detail what happened internally and how the switch took place.

Controller failover is triggered when the controller goes down. Every broker registers a watcher on the "/controller" node in ZooKeeper. When the controller goes down, its ephemeral node in ZooKeeper disappears and the watch fires on all surviving brokers. Each of them then tries to create the controller path again; only one succeeds, and that broker becomes the new controller.
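The "first to create the node wins" election can be illustrated with a small in-memory simulation. `FakeZooKeeper` and `elect_controller` are hypothetical names made up for this sketch. No real ZooKeeper client is involved, and the winner here is decided by list order rather than by a genuine race.

```python
class FakeZooKeeper:
    """In-memory stand-in for ZooKeeper (hypothetical, not a real client)."""

    def __init__(self):
        self.nodes = {}  # path -> id of the broker whose session owns the node

    def create_ephemeral(self, path, owner):
        if path in self.nodes:
            return False  # somebody else already holds the node
        self.nodes[path] = owner
        return True

    def close_session(self, owner):
        # Ephemeral nodes disappear together with their owner's session.
        self.nodes = {p: o for p, o in self.nodes.items() if o != owner}


def elect_controller(zk, live_brokers, epoch):
    """Every live broker tries to create /controller; exactly one succeeds."""
    winner = None
    for broker in live_brokers:
        if zk.create_ephemeral("/controller", broker):
            winner = broker
    if winner is None:
        return zk.nodes.get("/controller"), epoch  # no change of controller
    return winner, epoch + 1  # the new controller increments the epoch


zk = FakeZooKeeper()
controller, epoch = elect_controller(zk, [1002, 1003, 1001], epoch=22)
print(controller, epoch)

zk.close_session(1002)  # the controller's broker is killed
controller2, epoch2 = elect_controller(zk, [1003, 1001], epoch)
print(controller2, epoch2)
```

The starting epoch of 22 is chosen only so that the numbers line up with the experiment above, where the epoch moved from 23 to 24 after broker 1002 was killed.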

When a new controller is elected, the KafkaController.onControllerFailover method is triggered. It does the following:

1. Read and increment the Controller Epoch.

2. Register a watcher on the reassignedPartitions path (/admin/reassign_partitions).

3. Register a watcher on the preferredReplicaElection path (/admin/preferred_replica_election).

4. Register a watcher on the broker topics path (/brokers/topics) through partitionStateMachine.

5. If delete.topic.enable=true (default is false), partitionStateMachine registers a watcher on the delete topic path (/admin/delete_topics).

6. Register a watcher on the broker ids path (/brokers/ids) through replicaStateMachine.

7. Initialize the ControllerContext object, which holds all current topics, the list of "live" brokers, the leader and ISR of every partition, and so on.

8. Start replicaStateMachine and partitionStateMachine.

9. Set the brokerState status to RunningAsController.

10. Send the leadership information of each partition to all "live" brokers.

11. If auto.leader.rebalance.enable=true (default is true), start the partition-rebalance thread.

12. If delete.topic.enable=true and the delete topic path (/admin/delete_topics) contains entries, delete the corresponding topics.

As you can see, all of these steps interact with ZooKeeper. When a new controller is elected, the relevant nodes in ZooKeeper are updated and the watchers are registered in turn. During this process, partition leader election takes place, along with operations such as partition rebalancing and deletion of unneeded topics (the scenario considered here is the worst case: a broker goes down, and the one that goes down is the controller).

How a consumer consumes messages

Important concept: every consumer belongs to a logical consumer group. Within one consumer group, a partition can be consumed by only one consumer, but the same partition can be consumed by consumers in different groups.

If a topic has p partitions and c consumers in the group subscribe to it, then:

p < c: c - p consumers sit idle, which is wasteful

p > c: one consumer handles multiple partitions

p = c: one consumer handles exactly one partition

The numbers of consumers and partitions should be chosen sensibly to avoid skewed resource usage.

I suggest making the number of partitions an integer multiple of the number of consumers.
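The three cases can be checked with a small Python sketch that spreads partition ids over consumers as evenly as possible. The function `assign_partitions` is a hypothetical helper written for illustration; it is not Kafka's actual assignor, although it spreads partitions in a similar range-style fashion.

```python
def assign_partitions(num_partitions, consumers):
    """Give each consumer a contiguous range of partition ids."""
    base, extra = divmod(num_partitions, len(consumers))
    assignment, start = {}, 0
    for i, consumer in enumerate(consumers):
        # The first `extra` consumers receive one additional partition.
        count = base + (1 if i < extra else 0)
        assignment[consumer] = list(range(start, start + count))
        start += count
    return assignment


consumers = ["c0", "c1", "c2"]
fewer = assign_partitions(2, consumers)   # p < c: one consumer is idle
equal = assign_partitions(3, consumers)   # p = c: one partition each
uneven = assign_partitions(7, consumers)  # p > c, not a multiple: skewed load
even = assign_partitions(6, consumers)    # p is a multiple of c: balanced load
print(fewer, equal, uneven, even, sep="\n")
```

With 7 partitions one consumer carries an extra partition, while with 6 every consumer carries exactly two, which is the reason for preferring an integer multiple.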

How are partitions assigned to consumers during consumption?

Put another way: what does the consumer rebalance process look like?

On the producing side, partitions have to be allocated among brokers; on the consuming side, partitions likewise have to be assigned to consumers.

Just as a controller is chosen from among the brokers, a coordinator is chosen from among the brokers to manage partition assignment for consumers. // Coordinator and Controller are similar concepts: both coordinate and organize

When the number of partitions or consumers changes, for example when a consumer is added, a consumer is removed (actively or passively), or partitions are added, a consumer rebalance takes place. // the rebalance occurs on the client side

The process, in text form:

1. A consumer sends a JoinGroupRequest to the coordinator. From this point on, when the other consumers send a heartbeat request, the coordinator tells them that a rebalance is needed, and they send JoinGroupRequest as well.

2. The coordinator selects one consumer as the leader and treats the others as followers, and notifies every consumer of the result. The leader also receives the metadata of the followers.

3. The consumer leader reassigns the partitions based on the consumer metadata.

4. Every consumer sends a SyncGroupRequest to the coordinator; the leader's SyncGroupRequest contains the assignment.

5. The coordinator responds and tells every consumer, including the leader, its assignment.
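The five steps above can be sketched as a toy, single-process simulation. Everything here is hypothetical: the `Coordinator` class, the `leader_assign` function, the round-robin strategy, and the rule of picking the alphabetically first member as leader are assumptions for illustration only. The real protocol is asynchronous and the coordinator holds follower responses until the leader has synced; this sketch sidesteps that by letting the leader sync first.

```python
class Coordinator:
    """Toy group coordinator: collects members, relays the leader's plan."""

    def __init__(self):
        self.members = {}     # member id -> metadata
        self.assignment = {}  # member id -> list of partitions

    def join_group(self, member_id, metadata):
        self.members[member_id] = metadata  # steps 1: JoinGroupRequest

    def complete_join(self):
        leader = sorted(self.members)[0]    # step 2: arbitrary choice in this sketch
        return leader, dict(self.members)   # the leader gets everyone's metadata

    def sync_group(self, member_id, assignment=None):
        if assignment is not None:          # step 4: only the leader sends a plan
            self.assignment = assignment
        return self.assignment.get(member_id, [])  # step 5: each member's share


def leader_assign(members, partitions):
    """Step 3: the consumer leader computes a round-robin assignment."""
    ids = sorted(members)
    plan = {m: [] for m in ids}
    for i, partition in enumerate(partitions):
        plan[ids[i % len(ids)]].append(partition)
    return plan


coordinator = Coordinator()
for member in ["consumer-b", "consumer-a"]:
    coordinator.join_group(member, {"topics": ["demo"]})

leader, members = coordinator.complete_join()
plan = leader_assign(members, partitions=[0, 1, 2])
leader_share = coordinator.sync_group(leader, plan)
follower_share = coordinator.sync_group("consumer-b")
print(leader, leader_share, follower_share)
```

Note the design point this makes visible: the coordinator never computes the assignment itself; it only relays what the consumer leader decided.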

Summary:

Consumer groups and partition rebalancing

As you can see, when a new consumer joins a consumer group, it takes over one or more partitions that were previously handled by other consumers. Likewise, when a consumer leaves the group (for example on restart or crash), the partitions it was consuming are assigned to other consumers. This is called a rebalance.

Rebalancing is a very important property of Kafka: it provides high availability and horizontal scalability.

Note, however, that during a rebalance none of the consumers can consume messages, so the whole consumer group is temporarily unavailable. In addition, reassigning partitions invalidates the consumers' existing state, so they have to rebuild it, which further reduces consumption performance during that period.

How it works:

Consumers stay alive within the consumer group by periodically sending a heartbeat to the broker that acts as the group coordinator. This broker is not fixed; it may differ from one consumer group to another. A heartbeat is sent whenever a consumer pulls messages or commits offsets.

If a consumer sends no heartbeat for longer than a certain period, its session expires; the group coordinator assumes the consumer is down and triggers a rebalance. There is therefore a gap between the moment the consumer goes down and the moment its session expires, during which the messages on its partitions are not consumed. Usually we can shut a consumer down gracefully: it then sends a leave message to the group coordinator, which can rebalance immediately instead of waiting for the session to expire.

In version 0.10.1, Kafka changed the heartbeat mechanism to decouple sending heartbeats from pulling messages, so that the heartbeat frequency is no longer affected by the pull frequency. In addition, later versions of Kafka let you configure how long a consumer may go without pulling messages while still being considered alive, which avoids livelock. Livelock means that the application has not failed but, for some reason, cannot make further progress consuming.
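The difference between session expiry and a graceful leave can be made concrete with a minimal sketch. The class `GroupCoordinator`, its method names, and the 10-second timeout are hypothetical; time is passed in explicitly as milliseconds so the example is deterministic.

```python
class GroupCoordinator:
    """Toy coordinator that tracks the last heartbeat of each group member."""

    def __init__(self, session_timeout_ms):
        self.session_timeout_ms = session_timeout_ms
        self.last_heartbeat = {}  # member id -> time of last heartbeat (ms)

    def heartbeat(self, member, now_ms):
        self.last_heartbeat[member] = now_ms

    def leave(self, member):
        # Graceful shutdown: the member is removed at once, no timeout needed.
        self.last_heartbeat.pop(member, None)

    def expire(self, now_ms):
        """Drop members whose session timed out; a non-empty result means rebalance."""
        dead = [m for m, t in self.last_heartbeat.items()
                if now_ms - t > self.session_timeout_ms]
        for member in dead:
            del self.last_heartbeat[member]
        return sorted(dead)


coordinator = GroupCoordinator(session_timeout_ms=10_000)
coordinator.heartbeat("c1", now_ms=0)
coordinator.heartbeat("c2", now_ms=0)
coordinator.heartbeat("c3", now_ms=0)
coordinator.heartbeat("c1", now_ms=8_000)  # c1 keeps sending heartbeats
coordinator.leave("c3")                    # c3 shuts down gracefully

expired = coordinator.expire(now_ms=12_000)  # c2 crashed silently at some point
print(expired, sorted(coordinator.last_heartbeat))
```

In this run the silently crashed consumer is only detected once the timeout has elapsed, whereas the consumer that left gracefully was removed immediately.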

Next question: how does a consumer fetch messages?

Consumers use "pull mode" to consume messages, so each consumer decides its own consumption behavior.

A consumer calls poll(duration) to pull messages from the server. The exact pulling behavior is determined by the following configuration items:

# consumer.properties
# maximum number of records returned by a single poll
max.poll.records=500
# maximum interval between two polls; if it is exceeded, the server considers the consumer failed
# and kicks it out of its consumer group
max.poll.interval.ms=300000

Summary:

1. Within a partition, every message has an offset. New messages are appended to the end of the partition (the end of the latest segment file). Messages within one partition are consumed in order, while the consumption order across different partitions is not defined.

2. If one consumer consumes several partitions, the order in which it consumes across those partitions is not defined, but within each partition consumption is sequential.

3. If consumers from different consumer groups consume the same partition, they do not affect each other; each consumer has its own offset.


How is the offset saved?

When a consumer consumes a partition, it needs to save an offset that records its current consumption position.

Offsets can be committed automatically, or manually by calling the consumer's commitSync() or commitAsync(). The configuration is as follows:

# whether to commit offsets automatically
enable.auto.commit=true
# auto-commit interval; only effective when enable.auto.commit=true
auto.commit.interval.ms=5000
// enable.auto.commit defaults to true, so the auto-commit mechanism is used by default. auto.commit.interval.ms defaults to 5000 milliseconds, i.e. 5 seconds.

Offsets are stored in a topic called __consumer_offsets. The key of each message written there consists of GroupId, Topic, and Partition, and the value is the offset.

Normally the offset for each key is cached in memory, so a query does not need to scan the partition. If there is no cache yet, the first query scans the partition to build the cache and then returns the result.
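Building that cache amounts to replaying the log and keeping the last value seen for each key. A minimal sketch follows; the tuple layout and the function name `build_offset_cache` are assumptions made for illustration and do not reflect the real on-disk record format.

```python
def build_offset_cache(log):
    """Replay (group, topic, partition, offset) records; the latest record wins."""
    cache = {}
    for group, topic, partition, offset in log:
        cache[(group, topic, partition)] = offset
    return cache


# Hypothetical commit history as it might appear in the offsets topic.
log = [
    ("group-a", "orders", 0, 5),
    ("group-a", "orders", 1, 7),
    ("group-a", "orders", 0, 9),   # a later commit for the same key
    ("group-b", "orders", 0, 2),   # another group tracks its own offset
]

cache = build_offset_cache(log)
print(cache[("group-a", "orders", 0)], cache[("group-b", "orders", 0)])
```

After the one-time replay, each lookup is a single dictionary access rather than a scan of the partition.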

The number of partitions of __consumer_offsets is determined by the following server configuration:

offsets.topic.num.partitions=50

By default, __consumer_offsets has no additional replicas. Replicas can be configured either by specifying a parameter up front or by adding them dynamically later with a replica-assignment JSON for __consumer_offsets.

auto.create.topics.enable=true
default.replication.factor=2
num.partitions=3

In testing, with the three parameters above specified when Kafka was first set up, the replica count of __consumer_offsets was still 1. The key is a different parameter that must be specified in the configuration file.

The Kafka configuration file explains this parameter as follows:

# Internal Topic Settings

# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state".

# For anything other than development testing, a value greater than 1 is recommended to ensure availability, such as 3.

offsets.topic.replication.factor=2

transaction.state.log.replication.factor=2

transaction.state.log.min.isr=2

// These three parameters, set in the Kafka configuration, control the replica count of __consumer_offsets.

@Shanghai-Ma Jihui then suggested that setting num.partitions=3 would be enough to give __consumer_offsets 3 replicas. I tested it, and the replica count was still 1.

So the replica count is controlled by the offsets.topic.replication.factor parameter.

Note that these settings only take effect at the first startup; if Kafka has already been started before, changing them has no effect. In the Apache Kafka download the value is 1 by default, and it is still 1 in 2.*.

It can be changed as follows.

First stop the Kafka cluster and delete all __consumer_offsets* entries under each broker's data directory.

Then, in ZooKeeper, run rmr /kafkatest/brokers/topics/__consumer_offsets and restart Kafka.

After the next consumption, __consumer_offsets will be created again.

Note: this topic is created on first consumption, not when the broker cluster starts. Likewise, the __transaction_state topic is created when transactions are used for the first time.

Summary: in production nobody deletes content in ZooKeeper, because the risk is too high. The recommended approach is to expand the replicas dynamically, which works as long as the JSON is written correctly.

The key parameters controlling the replica count of __consumer_offsets are the three listed above.

The partition in which an offset is saved, that is, the partitioning scheme of __consumer_offsets, can be expressed as

groupId.hashCode() % groupMetadataTopicPartitionCount

groupMetadataTopicPartitionCount is the partition count configured above. Because a partition can be consumed by only one consumer of a given consumer group, the GroupId alone is enough to determine which partition holds that group's consumption offsets.
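To see which partition a given group would map to, the formula can be reproduced in Python by reimplementing Java's `String.hashCode()`. This is a sketch under assumptions: the helper names are made up, the partition count of 50 comes from the configuration shown earlier, and clearing the sign bit to handle negative hash codes is an assumption about how the absolute value is taken.

```python
def java_string_hash(s):
    """Reimplementation of Java's String.hashCode() over UTF-16 code units."""
    data = s.encode("utf-16-be")
    h = 0
    for i in range(0, len(data), 2):
        unit = int.from_bytes(data[i:i + 2], "big")
        h = (31 * h + unit) & 0xFFFFFFFF   # emulate 32-bit overflow
    return h - 2**32 if h >= 2**31 else h  # reinterpret as a signed int


def offsets_partition_for(group_id, partition_count=50):
    # Assumption: the sign bit is cleared so the result is never negative.
    return (java_string_hash(group_id) & 0x7FFFFFFF) % partition_count


print(java_string_hash("test"), offsets_partition_for("test"))
```

Since the mapping depends only on the group id and the partition count, every offset commit of one group lands in the same partition of the offsets topic.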

Kafka's reliability guarantees come down to the following four points:

1. Messages within a partition are ordered. If a producer writes message A and then message B to a partition, consumers read message A first and then message B.

2. A message is considered committed once it has been written to all in-sync replicas. "Written" here may only mean written to the file system cache, not necessarily flushed to disk. The producer can choose how long to wait for acknowledgement, for example returning as soon as the partition leader has written the message, or waiting until all in-sync replicas have written it.

3. Once a message has been committed, it will not be lost as long as at least one replica survives.

4. Consumers can only read committed messages.
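Points 2 and 4 can be illustrated with a small sketch: a message counts as committed once every in-sync replica has it, so the committed boundary is the smallest log end position among the ISR. The function names and the sample numbers are hypothetical, and "log end offset" is used here to mean the number of messages a replica has written.

```python
def committed_offset(log_end_offsets, isr):
    """Messages below the returned offset exist on every in-sync replica."""
    return min(log_end_offsets[replica] for replica in isr)


def readable_messages(leader_log, committed):
    # Consumers only ever see the committed prefix of the leader's log.
    return leader_log[:committed]


leader_log = ["m%d" % i for i in range(10)]           # leader 1001 holds 10 messages
log_end_offsets = {1001: 10, 1002: 8, 1003: 5}
isr = [1001, 1002]                                     # 1003 lags and is not in sync

committed = committed_offset(log_end_offsets, isr)
visible = readable_messages(leader_log, committed)
print(committed, visible[-1])
```

The lagging replica 1003 does not hold back the committed boundary because it is not part of the ISR in this example.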

By now we have a better understanding of Kafka. So why is Kafka so powerful? Here is a summary in 11 points.

1. Batch processing

2. Client optimization

3. Optimization of log format

4. Log encoding

5. Message compression

6. Build an index

7. Partitioning

8. Consistency

9. Write the disk sequentially

10. Page cache

11. Zero copy

On memory mapping:

Even with sequential writes, disk access cannot keep up with memory. Kafka therefore does not write data to disk in real time; it takes full advantage of the paged memory management of modern operating systems and uses memory to improve I/O efficiency. Memory Mapped Files (hereinafter mmap) work by using operating system pages to map a file directly onto physical memory. Once the mapping is established, operations on that memory are synchronized to disk by the operating system at an appropriate time. With mmap, a process reads and writes the file as if it were memory, and it does not have to worry about the size of physical memory, because virtual memory backs it up. mmap is the function used to implement memory mapping on Linux; in Java NIO, MappedByteBuffer can be used for the same purpose.
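The idea is easy to try out with Python's standard `mmap` module. The following is a minimal sketch, not a picture of how Kafka itself uses mapped files; the file name `segment.log` and the 4096-byte size are arbitrary choices for the demonstration.

```python
import mmap
import os
import tempfile

# Create a small pre-sized file; a mapping cannot be created over an empty file.
path = os.path.join(tempfile.mkdtemp(), "segment.log")
with open(path, "wb") as f:
    f.write(b"\x00" * 4096)

with open(path, "r+b") as f:
    with mmap.mmap(f.fileno(), 0) as mapped:  # length 0 maps the whole file
        mapped[0:5] = b"hello"   # an ordinary memory write, no write() call
        mapped.flush()           # ask the OS to push dirty pages to disk now

# Reading the file through the normal API shows the bytes written via the mapping.
with open(path, "rb") as f:
    data = f.read(5)
print(data)
```

Without the explicit `flush()`, the operating system would still write the dirty pages back at a time of its own choosing, which is exactly the trade-off described above.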

Kafka message compression

A producer sends compressed messages by batching several messages together and compressing them into a single wrapper message. As with ordinary messages, the data stored on disk has the same format as the data sent from producer to broker, and the data sent on to consumers has that same format as well.
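Why compress a batch rather than each message separately? A quick experiment with Python's standard `gzip` module shows the effect. The message contents and the newline-delimited framing are made up for this sketch and have nothing to do with Kafka's actual wire format.

```python
import gzip
import json

# 100 small, similar messages (hypothetical payloads).
messages = [
    json.dumps({"id": i, "event": "page_view", "user": "user-%d" % (i % 3)}).encode()
    for i in range(100)
]

# Option 1: compress every message on its own.
individual_size = sum(len(gzip.compress(m)) for m in messages)

# Option 2: wrap the whole batch into one compressed payload.
batch = gzip.compress(b"\n".join(messages))

# The receiving side decompresses the wrapper and splits it back into messages.
restored = gzip.decompress(batch).split(b"\n")

print(individual_size, len(batch), restored == messages)
```

Batching helps twice: the fixed per-payload overhead is paid once, and the compressor can exploit the repetition between similar messages.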
