This article explains the basic principles of Kafka. The explanation is simple and clear and easy to learn and understand.
1. What is Kafka?
Kafka is a messaging system written in Scala, originally developed at LinkedIn and used as the foundation of LinkedIn's activity stream (Activity Stream) and operational data processing pipeline (Pipeline). It is now used by many different kinds of companies for various data pipelines and messaging systems.
Kafka is a distributed, publish/subscribe-based messaging system.
Kafka uses ZooKeeper as its distributed coordination framework, which ties together message production, message storage, and message consumption. With the help of ZooKeeper, all of Kafka's components, including producers, consumers, and brokers, can remain stateless while subscription relationships between producers and consumers are established and load is balanced between them.
2. Characteristics of Kafka
(1) Message persistence is provided with O(1) time complexity, so even for data beyond the TB level, access performance stays constant-time.
(2) High throughput: even on very cheap commodity machines, a single machine can handle transmitting more than 100K messages per second.
(3) Message partitioning and distributed consumption across Kafka servers are supported, while messages within each Partition are guaranteed to be stored and delivered in order.
(4) Both offline data processing (Offline) and real-time data processing (Online) are supported.
(5) Scale out: online horizontal expansion is supported; machines can be added without downtime.
(6) Data can be deleted on a schedule, either by time period or by file size.
(7) Consumers consume data in pull mode and track their own consumption state, which reduces the load on the Broker.
3. Kafka architecture
(1) Broker: similar to the concept of a Broker in RabbitMQ. A Kafka server is a Broker, and a Kafka cluster contains one or more Brokers. A Broker persists data directly into the corresponding Partition, so it carries no cache pressure.
(2) Topic: topic. Every message belongs to a category, which is called its Topic. A Kafka Topic can be understood like a RabbitMQ Queue: messages of the same category are sent to the same Topic and then consumed by the Consumers of that Topic. Topic is a logical concept; its physical implementation is the Partition.
(3) Partition: partition. Partitioning is a physical concept. Each Topic contains one or more Partitions, and each Partition is an ordered queue. A message sent to a Topic goes through a partitioning algorithm (which can be customized) that decides which Partition it is stored in. Each message within a Partition is assigned a sequential id, the Offset. Note: Kafka only guarantees that messages are delivered to Consumers in order within a single Partition, not in order across the Topic as a whole (i.e., across multiple Partitions).
(4) Replication: replica. Replication is per Partition rather than per Topic. Each Partition has its own replicas, which are distributed over different Brokers.
(5) Offset: offset. Kafka's storage files are named by offset; the advantage of naming by offset is that it makes lookup easy. For example, to find the position 2049, just locate the 2048.log file; of course, the first file is 00000000000.log. Note: the Offsets within each Partition form an ordered sequence starting from 0 that is independent of other Partitions.
(6) Producer: message producer.
(7) Consumer: message consumer. Consumers fetch messages from Brokers in pull mode, and each Consumer maintains its own consumption state, so in a Kafka system the business focus is usually on the Consumer side, unlike RabbitMQ, where the Broker does most of the work.
(8) Consumer Group: consumer group. Each Consumer belongs to a specific Consumer Group (you can specify a group name for each Consumer; if none is specified, it belongs to the default group). Each Topic can be subscribed to by multiple Groups, and each Group can contain multiple Consumers. A message sent to a Topic is consumed by only one Consumer in each Group, and the Consumers in a Group divide the Topic's messages among themselves to achieve load balancing.
(9) Record: message. Each message consists of a Key, a Value, and a timestamp. A minimal producer/consumer sketch tying these concepts together follows the figures below.
Kafka internal structure diagram (image source: web)
Kafka topology diagram (image source: web)
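To make the concepts above concrete, here is a minimal sketch using the Java client (kafka-clients): a producer sends a keyed message to a Topic, and a consumer in a Consumer Group pulls it from the Partition assigned to it. The broker address localhost:9092, the topic name topic1, and the group id demo-group are placeholder assumptions, not values from the article.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class QuickStartSketch {
    public static void main(String[] args) {
        // Producer: messages with the same Key are routed to the same Partition of the Topic
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            producer.send(new ProducerRecord<>("topic1", "user-42", "hello kafka"));
        }

        // Consumer: joins a Consumer Group and pulls messages from the Partitions assigned to it
        Properties c = new Properties();
        c.put("bootstrap.servers", "localhost:9092");
        c.put("group.id", "demo-group"); // assumed group name
        c.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        c.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c)) {
            consumer.subscribe(Collections.singletonList("topic1"));
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> r : records) {
                System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                        r.partition(), r.offset(), r.key(), r.value());
            }
        }
    }
}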
4. Topic and Partition file storage
4.1 The relationship between Topic and Partition
A Topic can be thought of logically as a queue, and every message must specify its Topic, which can simply be understood as declaring which queue the message should be put into. To increase Kafka's throughput linearly, a Topic is physically divided into one or more Partitions, and each Partition physically corresponds to a folder that stores all the messages and index files of that Partition. If you create two topics, topic1 and topic2, with 13 and 19 partitions respectively, a total of 32 folders will be created across the cluster. The partition naming convention is topic name + ordinal number: the first partition's ordinal starts at 0, and the largest ordinal is the number of partitions minus 1.
4.2 Characteristics of Partition file storage
(1) Each partition directory can be viewed as one giant file that is split evenly into multiple segment data files of equal size. The number of messages in each segment file, however, is not necessarily equal, and this characteristic makes it easy to delete old segment files quickly.
(2) Each partition only needs to support sequential reads and writes, and the life cycle of a segment file is determined by server configuration parameters.
(3) A segment file consists of two parts, an index file (suffix ".index") and a data file (suffix ".log"), which correspond to each other and always appear in pairs.
(4) Segment file naming rule: the first segment of a partition starts from 0, and each subsequent segment file is named after the offset value of the last message in the previous segment file. The value is at most a 64-bit long, the name is 19 characters long, and positions without digits are padded with 0.
Segment file structure diagram (image source: web)
Taking a pair of segment files as an example, the physical structure of the correspondence between index and log files in a segment is as follows: each entry in the sparse .index file maps a message's relative position in the segment to its physical byte position in the .log file, so a message is found by first choosing the right segment and then searching that segment's index.
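A sketch of this lookup principle (an illustration, not Kafka's actual code): the segment files of a partition are modelled as a map from each segment's base offset to its sparse index, and a target offset is located by first choosing the segment whose base offset is the largest one not exceeding the target, then finding the nearest index entry at or before the target, and finally scanning the .log file sequentially from that byte position.

import java.util.Map;
import java.util.TreeMap;

public class SegmentLookupSketch {
    // Illustrative model: base offset of each segment -> sparse index of that segment,
    // where the sparse index maps (message offset -> physical byte position in the .log file).
    private final TreeMap<Long, TreeMap<Long, Long>> segments = new TreeMap<>();

    // Returns the byte position from which a sequential scan for targetOffset should start.
    public long locate(long targetOffset) {
        // Step 1: pick the segment whose base offset is the largest one <= targetOffset
        Long baseOffset = segments.floorKey(targetOffset);
        if (baseOffset == null) {
            throw new IllegalArgumentException("offset " + targetOffset + " is before the first segment");
        }
        TreeMap<Long, Long> sparseIndex = segments.get(baseOffset);

        // Step 2: the index is sparse, so take the nearest entry at or before the target
        Map.Entry<Long, Long> entry = sparseIndex.floorEntry(targetOffset);

        // Step 3: the caller scans the .log file sequentially starting from this position
        return entry == null ? 0L : entry.getValue();
    }
}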
4.3 Partition distribution in a Kafka cluster
Kafka cluster Partition distribution figure 1 (image source: web)
When 2 nodes are added to the cluster and the number of Partitions increases to 6, the distribution is as follows:
Kafka cluster Partition distribution figure 2 (image source: web)
In a Kafka cluster, every Broker gets an equal chance of being allocated Leader Partitions.
In the figures above, the arrows point from a Leader Partition to its replica. Take Partition-0 as an example: Partition-0 on Broker1 is the Leader, and Partition-0 on Broker2 is the replica. Each Broker (ordered by BrokerId) is assigned a Leader Partition in turn, and the next Broker holds its replica; with multiple replicas, the same rule is applied repeatedly.
4.4 Replica allocation algorithm
(1) Sort all n Brokers and the i Partitions to be allocated.
(2) Assign the i-th Partition to the (i mod n)-th Broker.
(3) Assign the j-th replica of the i-th Partition to the ((i + j) mod n)-th Broker.
For example, the third Partition in figure 2, partition-2, is assigned to Broker3 (3 mod 6 = 3), and a replica of partition-2 is assigned to Broker4 ((3 + 1) mod 6 = 4).
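A small sketch of this allocation rule, using 0-based broker and partition indices (the example above counts from 1); this is an illustration of the formula, not Kafka's actual implementation.

public class ReplicaAssignmentSketch {
    // leader of partition i        -> broker (i mod n)
    // j-th replica of partition i  -> broker ((i + j) mod n)
    public static int[][] assign(int numPartitions, int numBrokers, int numReplicas) {
        int[][] assignment = new int[numPartitions][numReplicas + 1];
        for (int i = 0; i < numPartitions; i++) {
            assignment[i][0] = i % numBrokers;               // leader
            for (int j = 1; j <= numReplicas; j++) {
                assignment[i][j] = (i + j) % numBrokers;     // replicas
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 6 partitions, 6 brokers, 1 replica each, as in the distribution described above
        int[][] a = assign(6, 6, 1);
        for (int i = 0; i < a.length; i++) {
            System.out.printf("partition-%d: leader=broker%d, replica=broker%d%n", i, a[i][0], a[i][1]);
        }
    }
}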
4.5 Characteristics of Kafka file storage
(1) Kafka splits a topic's large partition file into multiple small segment files. With many small segments, it is easy to periodically clear out or delete consumed files and reduce disk usage; you can configure a segment file size for size-based deletion and a message expiration time for time-based deletion.
(2) Messages can be located quickly through the index information.
(3) By mapping the index metadata into memory, disk I/O on the segment index files is avoided.
(4) Because index files are stored sparsely, the space occupied by index file metadata is greatly reduced.
4.6 The relationship between Consumer and Partition
With multiple Partitions and multiple Consumers:
(1) If there are more consumers than partitions, the extra consumers are wasted, because Kafka's design does not allow concurrent consumers on a single partition, so the number of consumers should not exceed the number of partitions.
(2) If there are fewer consumers than partitions, one consumer will correspond to multiple partitions. The numbers of consumers and partitions should be matched sensibly, otherwise the data in the partitions will be consumed unevenly. Ideally the number of partitions is an integral multiple of the number of consumers, so the partition count is very important; for example, choosing 24 makes it easy to pick a matching consumer count.
(3) If a consumer reads data from multiple partitions, ordering across them is not guaranteed: Kafka only guarantees that data is ordered within one partition, and the interleaving across partitions depends on the order in which you read them.
(4) Adding or removing consumers, brokers, or partitions triggers a rebalance, so the partitions assigned to a consumer will change after the rebalance.
(5) With the High-level API, the consumer blocks when no data is available.
About the initial value of the Offset in ZooKeeper:
The initial Offset in ZooKeeper is illegal by default, so you tell the Consumer what to do when it reads an illegal Offset by setting the Consumer parameter auto.offset.reset.
auto.offset.reset has three values:
(1) smallest: automatically reset the offset in ZooKeeper to the smallest offset in the Partition.
(2) largest: automatically reset the offset in ZooKeeper to the largest offset in the Partition.
(3) anything else: throw an exception.
The default value of auto.offset.reset is largest. In that case, if a producer first sends 10 messages to a Partition and a Consumer then starts up, the illegal offset in ZooKeeper is reset to the largest offset in the Partition, which is 9 (Offset starts from 0), so the Consumer skips those 10 messages. Even if you then change the setting to smallest, you still cannot read the previous 10 messages, because the Offset is now legal.
So, to read the earlier data, you need to specify auto.offset.reset=smallest from the very beginning.
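A minimal configuration sketch for the ZooKeeper-based high-level consumer that this section describes (note that the modern Java consumer keeps the auto.offset.reset name but uses the values earliest/latest instead of smallest/largest); the ZooKeeper address and group id are placeholder assumptions.

import java.util.Properties;

public class OffsetResetConfigSketch {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // assumed ZooKeeper address
        props.put("group.id", "demo-group");              // assumed group name
        // When no legal Offset exists in ZooKeeper, start from the smallest Offset in the Partition
        props.put("auto.offset.reset", "smallest");
        return props;
    }
}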
5. Replication (replica) synchronization mechanism
Replication is per Partition rather than per Topic. Each Partition has its own replicas, distributed over different Brokers. One of them is the Leader and the others are Followers. The Leader Partition handles reads and writes, while the Follower Partitions only copy data from the Leader to keep themselves consistent with it. ZooKeeper is responsible for failover between them (fail over, which can be understood as Leader election).
Message replication latency is limited by the slowest Follower, and the Leader tracks the state of all Followers. If a Follower lags too far behind or fails, the Leader removes it from the replication synchronization list; as long as the Follower is still alive, it keeps pulling data from the Leader, and once the gap drops below the replica.lag.max.messages value it rejoins the synchronization list. A message is considered "committed" only after all Followers in that list have saved it, and only then can Consumers consume it. This kind of synchronization requires a good network environment between Leader and Followers.
When a partition's follower lags far behind the leader, it is considered out of the in-sync replica list, i.e., in a lagged state. In Kafka 0.8.2.x, replica lag is judged by the maximum number of messages a replica may lag behind the leader (replica.lag.max.messages) or the maximum waiting time since the replica last requested data from the leader partition (replica.lag.time.max.ms). The former detects slow replicas, and the latter detects stalled or dead replicas. Suppose replica.lag.max.messages is set to 4: as long as a follower lags behind the leader by fewer than 4 messages, it is not removed from the in-sync replica list. If replica.lag.time.max.ms is set to 500 ms, then whenever the interval between a follower's fetch requests to the leader exceeds 500 ms, the follower is marked as dead and removed from the in-sync replica list.
When the Leader hits a traffic peak, for example receiving more messages in an instant than replica.lag.max.messages allows (4 in the example above), all Followers are considered "out of sync" and removed from the synchronization list; the Followers then pull data to catch up with the Leader and rejoin the list, so Followers end up frequently bouncing in and out of the replica synchronization list.
Even if only one replica survives, messages can still be sent and received normally, as long as the ZooKeeper cluster survives (note: this differs from other distributed storage systems, such as HBase, which require a "majority" to survive).
When the leader fails, a new leader must be elected from the followers. A follower may lag behind the leader at that moment, so an "up-to-date" follower needs to be chosen. Kafka's leader election does not use a "majority vote" algorithm, because such algorithms have strict requirements on conditions like network stability and the number of voting participants, and the Kafka cluster is also designed to tolerate replica failures. For Kafka, all replica information for each partition is available in ZooKeeper, so electing a leader is simple. When choosing a follower, the number of partition leaders already hosted on the server that would host the new leader must be taken into account: if a server already hosts too many partition leaders, it is under heavier I/O pressure. So "load balancing" is considered when electing a new leader, and a Broker hosting fewer partition leaders is more likely to become the new leader. In the entire cluster, as long as one replica is alive, the partition can continue to accept reads and writes.
6. Consumer balancing algorithm
A partition rebalance is triggered when a Consumer joins or leaves a Group. The ultimate goal of the rebalance is to improve the concurrency of Topic consumption.
(1) Suppose topic1 has the following partitions: P0, P1, P2, P3.
(2) The following consumers join the group: C0, C1.
(3) First, sort the partitions by partition index: P0, P1, P2, P3.
(4) Then sort the consumers by consumer.id: C0, C1.
(5) Compute the multiple: M = [P0, P1, P2, P3].size / [C0, C1].size; in this example M = 2 (rounded up).
(6) Then assign the partitions: C0 = [P0, P1], C1 = [P2, P3], i.e., Ci = [P(i * M), P((i + 1) * M - 1)].
Through this algorithm you can tell which partitions a specific Consumer consumes.
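A minimal sketch of this range-style assignment (an illustration of the formula above, not Kafka's internal code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RangeAssignmentSketch {
    // Assign partitions P0..P(n-1) to consumers sorted by consumer.id, with M = ceil(partitions / consumers).
    public static Map<String, List<Integer>> assign(List<String> sortedConsumers, int numPartitions) {
        int numConsumers = sortedConsumers.size();
        int m = (numPartitions + numConsumers - 1) / numConsumers; // M, rounded up
        Map<String, List<Integer>> result = new HashMap<>();
        for (int i = 0; i < numConsumers; i++) {
            List<Integer> parts = new ArrayList<>();
            // Ci = [P(i*M), P((i+1)*M - 1)], clipped to the real partition count
            for (int p = i * m; p <= (i + 1) * m - 1 && p < numPartitions; p++) {
                parts.add(p);
            }
            result.put(sortedConsumers.get(i), parts);
        }
        return result;
    }

    public static void main(String[] args) {
        // The article's example: partitions P0..P3 and consumers C0, C1 -> C0=[0, 1], C1=[2, 3]
        System.out.println(assign(Arrays.asList("C0", "C1"), 4));
    }
}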
7. Producer message routing mechanism
In kafka-clients-0.11.0.0.jar, default KafkaProducer and DefaultPartitioner implementations are provided. DefaultPartitioner supplies the routing algorithm that decides which partition the Producer sends a message to. If a key is given, the partition is computed by taking the hash of the Key modulo the number of partitions; if no key is given, a per-topic counter seeded with the random number ThreadLocalRandom.current().nextInt() is incremented and taken modulo the number of partitions (the exact steps are in the code below). The specific code is as follows:
public class DefaultPartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic      The topic name
     * @param key        The key to partition on (or null if no key)
     * @param keyBytes   serialized key to partition on (or null if no key)
     * @param value      The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster    The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

    public void close() {}
}
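We can also plug in our own routing rules. Below is a minimal custom partitioner sketch (the class name KeyHashPartitioner and the rule of sending keyless messages to partition 0 are illustrative assumptions, not from the article); it implements the same Partitioner interface and would be registered on the producer via the partitioner.class property.

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class KeyHashPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            return 0; // illustrative choice: keyless messages all go to partition 0
        }
        // route by the murmur2 hash of the key, like the default implementation above
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {}
}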
As in the sketch above, we can also define our own partition routing rules by implementing the Partitioner interface; the method that must be provided is:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
8. Kafka message delivery guarantee (delivery guarantee)
There are three main delivery guarantees for Kafka messages:
(1) At most once: at most once. Messages may be lost, but they are never delivered more than once.
(2) At least once: at least once. Messages are never lost, but may be delivered more than once.
(3) Exactly once: exactly once. Each message is delivered and consumed exactly once.
8.1. Producer delivery guarantee
The delivery guarantee of the Producer can be controlled by setting the parameter request.required.acks:
(1) request.required.acks=0.
Equivalent to sending messages asynchronously: the next message is sent as soon as the previous one has been sent out. Because no ack is required, data may be lost; this amounts to At most once.
(2) request.required.acks=1.
The message is sent to the Leader Partition, and the next message is sent only after the Leader Partition has confirmed the message and returned an ack to the producer.
(3) request.required.acks=-1.
The message is sent to the Leader, and the next message is sent only after the Leader has received acks from all Followers confirming that they saved the message and has then acked the producer.
So a message going from Producer to Broker guarantees at least At least once: because of Replication, once a message reaches the Broker it will not be lost. But if something goes wrong with the ack, for example a network interruption, the producer may not receive the ack and will resend the message, producing duplicates. No implementation of Exactly once was found for this path. (A producer configuration sketch follows the replication steps below.)
The specific steps of (3) are as follows:
A. The producer first finds the leader of the partition from the "/brokers/.../state" node in ZooKeeper.
B. The producer sends the message to the leader.
C. The leader writes the message to its local log.
D. The followers pull the message from the leader, write it to their local logs, and send an ACK to the leader.
E. After the leader has received ACKs from all replicas in the ISR, it advances the HW (high watermark, the offset of the last committed message) and sends an ACK to the producer.
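A configuration sketch for the modern Java producer, which names this setting acks rather than the older request.required.acks used above; the broker address and topic are placeholder assumptions.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AcksConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // "0": fire and forget (roughly At most once)
        // "1": wait for the Leader's ack
        // "all" (equivalent to -1): wait until every replica in the ISR has saved the message
        props.put("acks", "all");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("topic1", "key", "value"));
        }
    }
}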
8.2. Consumer delivery guarantee
After a Consumer pulls data from the Broker, it can choose to commit. This operation stores in ZooKeeper the Offset up to which this Consumer has read the corresponding Partition, so that the next pull starts from the next Offset of the Partition and repeated consumption is avoided.
Alternatively, the Consumer can acknowledge messages automatically by setting the parameter enable.auto.commit=true, i.e., the consumer commits automatically as soon as it receives a message. Looking only at the reading of messages, Kafka then guarantees Exactly once; but in practice a Consumer does not just read data, it usually has to process it, so the order in which the Consumer processes messages and commits them determines which delivery guarantee actually holds.
(1) Commit after processing.
This implements At least once. The Consumer receives and processes the message first, then commits. If the machine crashes after processing but before the commit, the Offset is not updated, and the next time the Consumer starts it re-reads the last consumed data even though that data has already been processed.
(2) Commit before processing.
This implements At most once. As soon as the Consumer receives the message it commits, updating the Offset in ZooKeeper, and only then processes the message. If the Consumer crashes before processing finishes, the next start reads from the Offset after the update, which results in data loss.
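A sketch of the "process first, commit afterwards" (At least once) pattern with the Java consumer; moving commitSync() in front of the processing loop would turn it into the "commit first" (At most once) pattern. The broker address, topic, and group id are placeholder assumptions.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtLeastOnceConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "demo-group");              // assumed group name
        props.put("enable.auto.commit", "false");         // commit manually instead of auto-committing
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic1"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> r : records) {
                    process(r);                           // handle the message first...
                }
                consumer.commitSync();                    // ...then commit the Offset (At least once)
            }
        }
    }

    private static void process(ConsumerRecord<String, String> r) {
        System.out.printf("offset=%d value=%s%n", r.offset(), r.value());
    }
}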
9. High Level API and Low Level API
Kafka provides two kinds of Consumer API, and the choice of API depends on the specific situation.
9.1 High Level Consumer API
The High Level Consumer API is organized around the logical concept of the Consumer Group. It hides the Offset management of every Partition of every Topic (automatically reading the Partition's last Offset from ZooKeeper), handles Broker failover, and performs load balancing when Partitions or Consumers are added or removed (when the number of Partitions or Consumers changes, Kafka rebalances automatically).
9.2 Low Level Consumer API
The Low Level Consumer API is the lower-level Consumer API. It gives finer control over the consumption of Kafka messages, and users can implement features such as re-reading and skipping messages.
When using the Low Level Consumer API, increases or decreases of Brokers, Consumers, and Partitions are not handled for you; if they occur, you need to handle the load balancing yourself.
The Low Level Consumer API provides more flexible control at the expense of extra complexity:
(1) The Offset is no longer transparent.
(2) Broker failover has to be handled manually.
(3) When Consumers, Partitions, or Brokers are added or removed, you have to do the load balancing yourself.
Thank you for reading. These are the basic principles of Kafka. After studying this article you should have a deeper understanding of them; how they are used concretely still needs to be verified in practice.