Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Introduction to the Design principle of Kafka

2025-01-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/02 Report--

This article introduces the relevant knowledge of "introduction to the Design principles of Kafka". In the operation of actual cases, many people will encounter such a dilemma, so let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!

What is a message queue? In a nutshell, a message queue is a container for messages. The client can send a message to the message server or get the message from the message server.

Today, I will share around the following questions:

Why do you need a messaging system?

The principle of Kafka architecture?

How does Kafka store messages?

How does Producer send messages?

How does Consumer consume messages?

How do I save Offset?

What problems may be encountered in the messaging system?

Why do you need a messaging system?

Peak clipping

The processing capacity of the database is limited, in the peak period, too many requests fall into the background, once the processing capacity of the system is exceeded, the system may fail.

As shown in the figure above, the processing capacity of the system is 2kUniverse MQ is 8k/s, and the processing capacity of peak request 5kUniverse MQ is much larger than that of the database. During the peak period, requests can be overstocked in MQ first, and the system can consume these requests at the speed of 2k/s according to its own processing capacity.

As soon as the peak is over, the request may only be 100 seconds, and the system can quickly consume the backlog of requests in the MQ.

Note that the above request refers to a write request, and query requests are generally resolved through caching.

Decoupling

In the following scenario, the S system is closely coupled with the A, B, C systems. Due to the change of demand, system A modifies the relevant code, and system S also needs to adjust the code related to A.

In a few days, C system needs to be deleted, S immediately removes C-related code; after a few days, D system needs to be added, S system needs to add D-related code; in a few days, programmers are crazy.

In this way, each system is closely coupled, which is not conducive to maintenance and expansion. Now the introduction of MQ,A system changes, A can modify their own code; C system delete, directly unsubscribe; D system add, subscribe to related messages.

In this way, through the introduction of message middleware, each system can interact with MQ, thus avoiding the complicated calling relationship between them.

The principle of Kafka architecture?

Concepts related to Kafka:

The servers included in the Broker:Kafka cluster.

Producer: message producer.

Consumer: message consumer.

Consumer Group: each Consumer belongs to one Consumer Group, and each message can only be consumed by one Consumer in the Consumer Group, but can be consumed by multiple Consumer Group.

Topic: the category of the message. Each message belongs to a Topic, and different Topic are independent of each other, that is, the Kafka is Topic-oriented.

Partition: each Topic is divided into multiple Partition,Partition, which is the unit assigned by Kafka. The physical concept of Kafka is equivalent to a directory in which the log files make up the Partition.

A copy of Replica:Partition to ensure the high availability of Partition.

A character in Leader:Replica, Producer and Consumer only interact with Leader.

A role in Follower:Replica that copies data from Leader.

One of the servers in a Controller:Kafka cluster that is used for Leader Election and various Failover.

Zookeeper:Kafka stores the Meta information of the cluster through Zookeeper.

Topic and Logs

Message is organized according to Topic, and each Topic can be divided into multiple Partition (corresponding to server.properties/num.partitions).

Partition is a sequential append log, which belongs to sequential write disk (sequential write disk is more efficient than random write memory, ensuring Kafka throughput).

Its structure is as follows: server.properties/num.partitions represents the num.partitions configuration item in the file server.properties, same as below.

Each record (Message) in Partition contains three attributes: Offset,messageSize and Data.

Where Offset represents the message offset, messageSize represents the size of the message, and Data represents the specific content of the message.

Partition is stored in the file system as a file, the location is specified by server.properties/log.dirs, and its naming rule is -.

For example, a message whose Topic is "page_visits" is divided into five Partition with a directory structure of:

Partition may be on a different Broker, Partition is segmented, and each segment is a Segment file.

The common configurations of Segment are:

# server.properties # segment file size, default is 1G log.segment.bytes=1024*1024*1024 # maximum time to scroll to generate new segment files, maximum length of time retained by log.roll.hours=24*7 # segment files, timeout will be deleted log.retention.hours=24*7

Data files and index files are included in the Partition directory. The following figure shows the directory structure of a Partition:

Index uses sparse storage, it does not build an index for every Message, but an index every certain number of bytes to avoid index files taking up too much space.

The disadvantage is that the Offset without indexing can not locate the location of the Message at once and needs to do a sequential scan, but the scope of the scan is very small.

The index contains two parts (both 4-byte numbers), which are relative Offset and Position, respectively.

The relative Offset represents the Offset,Position in the Segment file indicates the location of the Message in the data file.

Summary: Kafka Message storage uses Partition, disk sequential read and write, LogSegment and sparse index to achieve high efficiency.

Partition and Replica

A Topic is physically divided into multiple Partition, located on different Broker. If there is no Replica, once Broker goes down, all Patition on it will not be available.

Each Partition can have multiple Replica (corresponding to server.properties/default.replication.factor), assigned to a different Broker.

One of these Leader is responsible for reading and writing, processing requests from Producer and Consumer; the others, as Follower from Leader Pull messages, keep synchronized with Leader.

How do I allocate Partition and Replica to Broker? The steps are as follows:

Sort all Broker (assuming a total of n Broker) and the Partition to be assigned.

Assign the 1st Partition to the (i mod n) th Broker.

Assign the j Replica of the I Partition to the ((I + j) mode n) Broker.

According to the above allocation rules, if the number of Replica is greater than the number of Broker, there must be two identical Replica assigned to the same Broker, resulting in redundancy. Therefore, the number of Replica should be less than or equal to the number of Broker.

Leader election

Kafka dynamically maintains an ISR (in-sync replicas) in Zookeeper (/ brokers/topics/ / partitions/ / state).

All the Replica in the ISR are "up". Leader,Controller will choose one of the ISR to be the Leader.

The specific process is as follows:

Controller registers Watcher at the / brokers/ids/ [brokerId] node of Zookeeper, and Zookeeper will Fire Watch when Broker goes down.

Controller reads the available Broker from the / brokers/ids node.

Controller determines the set_p, which contains all the Partition on the down Broker.

For each Partition in set_p, read the ISR from the / brokers/topics/ [topic] / partitions/ [partition] / state node, decide on the new Leader, and write the new Leader, ISR, controller_epoch and leader_epoch information to the State node.

Send leaderAndISRRequest commands to the relevant Broker through RPC.

When ISR is empty, a Replica (not necessarily a member of ISR) is chosen as the Leader;. When all Replica is closed, it will wait for any Replica to be resurrected as a Leader.

The Follower in the ISR (synchronization list) is "keeping up" with Leader. "keeping up" does not mean it is exactly the same. It is configured by server.properties/replica.lag.time.max.ms.

Represents the maximum time that Leader waits for Follower synchronization messages. If it times out, Leader removes the ISR from Follower. The configuration item replica.lag.max.messages has been removed.

Replica synchronization

Kafka synchronizes through "pull mode" synchronization messages, that is, Follower pulls data in batches from Leader.

The specific reliability is determined by the producer (according to the configuration item producer.properties/acks).

In Kafka 0.9 request. Requests. Ackswords 1 which configration of producer is replaced by acks=all, but this old config is remained in docs.

In version 0. 9, the producer configuration item request.required.acks=-1 was replaced by acks=all, but the old configuration item remained in the documentation.

PS: the latest document 2.2.x request.required.acks no longer exists.

In Acks=-1, if the number of ISR is less than the number specified by min.insync.replicas, a NotEnoughReplicas or NotEnoughReplicasAfterAppend exception will be thrown.

How does Producer send messages?

Producer first encapsulates the message in a ProducerRecord instance.

Message routing:

If Partition is specified when sending a message, it is used directly.

If Key is specified, hash the Key and select a Partition. This Hash (that is, the partitioning mechanism) is implemented by a class specified by producer.properties/partitioner.class, which needs to implement the Partitioner interface.

If none is specified, select Partition through Round-Robin.

Instead of being sent immediately, the message is serialized and sent to Partitioner, the Hash function mentioned above, where the Partitioner determines the target partition and sends it to a memory buffer (send queue).

Another worker thread of Producer, that is, the Sender thread, is responsible for extracting the prepared message from the buffer in real time and encapsulating it into a batch and sending it to the corresponding Broker.

The process goes something like this:

The picture is from 123archu

How does Consumer consume messages?

Each Consumer is grouped into a logical Consumer Group, and a Partition can only be consumed by one Consumer in the same Consumer Group, but it can be consumed by a different Consumer Group.

If the number of Partition of Topic is c, the number of Consumer subscribing to this Topic in the Group is c, then:

P

< c: 会有 c - p 个 consumer闲置,造成浪费 p >

C: one consumer corresponds to multiple partition p = c: one consumer corresponds to one partition

The number of Consumer and Partition should be allocated reasonably to avoid resource tilt, preferably the number of Partiton is an integral multiple of the number of Consumer.

How ① assigns Partition to Consumer

In the production process, Broker needs to allocate Partition, and here in the consumption process, Partition should also be assigned to consumers.

If a Controller is selected from the Broker, the consumer also chooses a Coordinator from the Broker to allocate the Partition.

Rebalance occurs when the number of Partition or Consumer changes, such as increasing Consumer, decreasing Consumer (active or passive), and increasing Partition.

The process is as follows:

Consumer sends a JoinGroupRequest request to Coordinator. At this point, when other Consumer sends a Heartbeat request, Coordinator will tell them that it is time for Rebalance. Other Consumer also sends JoinGroupRequest requests.

Coordinator selects one Leader from the Consumer, the other as Follower, notifies each Consumer, and for Leader, it brings the Metadata of Follower to it.

Consumer Leader reassigns Partition based on Consumer Metadata.

Consumer sends a SyncGroupRequest to the Coordinator, where the SyncGroupRequest of the Leader contains the allocation. Coordinator returns the packet, telling Consumer about the allocation, including Leader.

② Consumer Fetch Message

Consumer uses "pull mode" to consume messages, so that Consumer can decide the behavior of consumption on its own.

Consumer calls Poll (duration) to pull the message from the server. The specific behavior of pulling messages is determined by the following configuration items:

# consumer.properties # maximum number of record max.poll.records=500 for consumer poll # maximum amount of data returned by partition when consumer poll max.partition.fetch.bytes=1048576 # Consumer maximum poll interval # if this value is exceeded, the server will consider this consumer failed # and kick the consumer out of the corresponding consumer group max.poll.interval.ms=300000

In Partition, each message has an Offset. New messages are written to the end of the Partition (the end of the latest Segment file), messages on each Partition are consumed sequentially, and the order of consumption of messages between different Partition is uncertain.

If a Consumer consumes multiple Partition, the previous consumption order of each Partition is uncertain, but it is sequential consumption on each Partition.

If multiple Consumer from different Consumer Group consume the same Partition, the consumption between each Consumer will not affect each other, and each Consumer will have its own Offset.

Consumer An and Consumer B belong to different Consumer Group. Cosumer A reads the Offset=9, and Consumer B reads the Offset=11, which indicates the location of the next read.

That is to say, Consumer A has read the message with Offset 0: 8, and Consumer B has read the message with Offset 0: 10.

The next Consumer to be read from Offset=9 is not necessarily Consumer A because Rebalance may occur.

How do I save Offset?

When Consumer consumes Partition, you need to save the Offset to record the current consumption location.

Offset can choose to submit automatically or call Consumer's commitSync () or commitAsync () to submit manually, which is configured as follows:

# whether to automatically commit offset enable.auto.commit=true # autocommit interval. Effective auto.commit.interval.ms=5000 in enable.auto.commit=true

The Offset is stored in a Topic called _ _ consumeroffsets. The Key that writes the message consists of GroupId, Topic, and Partition, and Value is Offset.

In general, the Offset of each Key is cached in memory. When querying, you do not have to traverse the Partition. If there is no cache, you will traverse the Partition for the first time to establish the cache, and then the query returns.

The number of Partition for _ _ consumeroffsets is determined by the following Server configuration:

Offsets.topic.num.partitions=50

The partition on which the Offset is saved, that is, the partition mechanism of _ _ consumeroffsets, can be expressed as follows:

GroupId.hashCode () mode groupMetadataTopicPartitionCount

GroupMetadataTopicPartitionCount is the number of partitions configured above. Because a Partition can only be consumed by one Consumer of the same Consumer Group, you can use GroupId to represent the partition where the Consumer consumption Offeset resides.

What problems may be encountered in the messaging system?

Kafka supports three message delivery semantics:

At most once: once at most, the message may be lost, but will not be repeated

Get data-> commit offset-> Business processing

At least once: at least once, the message will not be lost and may be repeated

Get data-> Business processing-> commit offset.

Exactly once: only once, the message is not lost, not repeated, and consumed only once (implemented in 0.11, only downstream is also Kafka)

How does ① ensure that messages are not consumed repeatedly? (idempotency of messages)

For update operations, it is naturally idempotent. For new operations, you can give each message a unique id to determine whether it has been processed before processing. This id can be stored in Redis and can be constrained with a primary key if you are writing to the database.

How does ② ensure the reliable transmission of messages? (problem of message loss)

According to the Kafka architecture, there are three places where messages can be lost: Consumer,Producer and Server.

The consumer lost the data: when server.properties/enable.auto.commit is set to True, Kafka will Commit Offset before processing the message. If an exception occurs at this time, the message will be lost.

Therefore, you can turn off the auto-commit Offset and manually submit the Offset after the processing is completed, which ensures that the message will not be lost. However, if you fail to submit the Offset, it may lead to the problem of repeated consumption, so you can ensure idempotency.

Kafka lost the message: if a Broker accidentally dies, if there is only one Replica at this time, the message on the Broker will be lost.

If Replica > 1, reselect a Follower for Leader as the new Leader, and if there are some messages in Follower that are not synchronized, these messages will be lost.

The above problems can be avoided by configuring as follows:

Set the replication.factor parameter for Topic: this value must be greater than 1, requiring at least 2 copies of each Partition.

Set the min.insync.replicas parameter on the Kafka server: this value must be greater than 1. This requires a Leader to perceive that at least one Follower is still in contact with itself and is not left behind, so as to ensure that there is a Follower when the Leader is dead.

Set acks=all on the Producer side: this requires that each piece of data must be written to all Replica before it can be considered a successful write.

Set retries=MAX on the Producer side (a large value, meaning unlimited retries): this requires that if the write fails, the retry will be infinite, which is stuck here.

Producer lost the message: set acks=all on the Producer side to ensure that all ISR synchronize the message before the write is considered successful.

How does ③ ensure the ordering of messages?

Messages on the Partition in Kafka are sequential, and messages that need to be consumed sequentially can be sent to the same Partition and consumed with a single Consumer.

This is the end of the introduction to the Design principles of Kafka. Thank you for your reading. If you want to know more about the industry, you can follow the website, the editor will output more high-quality practical articles for you!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report