2025-01-19 Update From: SLTechnology News&Howtos shulou
This article explains how to understand Kafka partitioning, production, and consumption. The content is kept simple and clear so that it is easy to follow. Please follow the editor's train of thought to study "how to understand Kafka partitioning, production, and consumption".
Kafka partition description
Partitioning means dividing each Topic into multiple Partitions. Each partition is an ordered message log, and each message produced by the producer is sent to exactly one of the partitions.
A Partition is an ordered, immutable sequence of data, and new messages are continually appended to the end of the sequence. Each message in the partition is assigned a sequential numeric ID, the offset, which uniquely identifies that message within the partition.
The purpose of Partitions is to provide load balancing. Different partitions of a single topic can be stored on the same node or on different nodes. To make the system highly scalable, different partitions are placed on different nodes, and each node independently handles the reads and writes for its own partitions. If performance is insufficient, you can add new nodes to increase the throughput of the overall system.
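The ordered, append-only sequence with offsets described above can be pictured with a minimal in-memory sketch. PartitionLogSketch and its methods are hypothetical names chosen for illustration; this is not how Kafka stores data on disk, only a model of the offset semantics.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative in-memory stand-in for one partition; not Kafka's implementation.
final class PartitionLogSketch {
    private final List<String> records = new ArrayList<>();

    // Appends a record to the end of the log and returns the offset assigned to it.
    long append(String value) {
        records.add(value);
        return records.size() - 1;
    }

    // Reads the record stored at the given offset; existing records never change.
    String read(long offset) {
        return records.get((int) offset);
    }

    // Offset that the next appended record will receive.
    long logEndOffset() {
        return records.size();
    }

    public static void main(String[] args) {
        PartitionLogSketch log = new PartitionLogSketch();
        long first = log.append("order-created");
        long second = log.append("order-paid");
        System.out.println(first + ", " + second + ", next=" + log.logEndOffset());
    }
}
```

Because records are only ever added at the end, an offset handed out once always refers to the same message.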
Kafka partition structure
Kafka stores the data of a partition in a message log (Log): a physical file on disk to which messages can only be appended (append-only). Because writes are append-only, slow random I/O is avoided in favor of faster sequential I/O. The Kafka log is divided into multiple Log Segments, and messages are appended to the newest segment. When a segment is full, Kafka automatically rolls over to a new segment and seals the old one.
Kafka stores message data by Partition, and each Partition is divided into several Segments, each limited to the same configured maximum size. A Segment consists of an index file, a log file, a timeindex file, and so on, with the suffixes ".index", ".log", and ".timeindex". The ".index" and ".log" files are the Segment's index file and data file, respectively. Each Segment stores multiple messages.
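To make the segment layout concrete, here is a small sketch of how a reader might find the segment that contains a given offset. It assumes, as Kafka does, that a segment file is named after the offset of its first message (the base offset); the class SegmentLookupSketch and its methods are hypothetical, and the sparse ".index" lookup inside a segment is left out.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of locating the segment that holds a given offset.
final class SegmentLookupSketch {
    // Base offset of each segment mapped to its data file name.
    private final TreeMap<Long, String> segments = new TreeMap<>();

    // Registers a segment whose first message has the given base offset.
    void addSegment(long baseOffset) {
        segments.put(baseOffset, String.format("%020d.log", baseOffset));
    }

    // Returns the file of the segment with the largest base offset <= offset, or null if none.
    String segmentFor(long offset) {
        Map.Entry<Long, String> entry = segments.floorEntry(offset);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        SegmentLookupSketch lookup = new SegmentLookupSketch();
        lookup.addSegment(0L);
        lookup.addSegment(368769L);
        lookup.addSegment(737337L);
        System.out.println(lookup.segmentFor(500000L));
    }
}
```

The sorted map plays the role of the directory listing: finding the right file depends on the number of segments, not on how much data each one holds.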
Kafka partitioning policy
The partitioning policy is the algorithm that determines which partition the producer sends the message to. Kafka provides a default partitioning policy and supports custom partitioning policies.
Built-in partitioning strategies
Kafka's default partitioner combines two policies: if a Key is specified, messages are partitioned by Key, which preserves per-Key ordering; if no Key is specified, the polling (round-robin) policy is used.
Polling strategy
The polling policy (round-robin) assigns partitions in sequence. If a Topic has three partitions, the first message is sent to partition 0, the second to partition 1, the third to partition 2, and so on. The fourth message starts the cycle again and is assigned to partition 0.
Polling is the default policy that the Kafka Java producer API applies to messages without a Key. If the partitioner.class parameter is not specified, the producer spreads such messages evenly across all partitions of the Topic in round-robin fashion. (Since Kafka 2.4 the default partitioner instead fills one batch for a single partition at a time for keyless messages, known as sticky partitioning, which still evens out over time.) The polling strategy balances load very well and distributes messages across all partitions as evenly as possible.
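The round-robin assignment described above can be sketched in a few lines. RoundRobinSketch is a hypothetical, standalone class and not Kafka's own partitioner; it only shows the cycling counter.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Minimal round-robin partition chooser; a sketch, not Kafka's partitioner.
final class RoundRobinSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Returns the next partition index in 0..numPartitions-1, cycling in order.
    int nextPartition(int numPartitions) {
        // floorMod keeps the result non-negative even after the counter overflows.
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }

    public static void main(String[] args) {
        RoundRobinSketch chooser = new RoundRobinSketch();
        for (int i = 0; i < 4; i++) {
            System.out.println("message " + (i + 1) + " goes to partition " + chooser.nextPartition(3));
        }
    }
}
```

An AtomicInteger is used so that several producer threads can share one counter without skipping or repeating a slot.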
Random strategy
The random policy (Randomness) places each message on a randomly chosen partition. A Java version of the partition method for the random policy looks like this:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
The code first obtains the partitions of the Topic, then returns a random non-negative integer that is less than the number of partitions. In essence, the random strategy also tries to spread data evenly across partitions, but in practice it is less even than the polling strategy. If an even distribution of data is the goal, the polling strategy is recommended.
Key-ordering strategy (partition by message key)
Kafka allows you to define a message key, called Key for short, for each message. A Key can be a string with a clear business meaning, such as a customer code, department number, or business ID, or it can represent message metadata. Once a message Key is defined, all messages with the same Key are guaranteed to enter the same partition.
The partition method that implements this policy requires only two lines of code (Math.floorMod is used rather than Math.abs because Math.abs(Integer.MIN_VALUE) is still negative):
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.floorMod(key.hashCode(), partitions.size());
Location-based partitioning strategy
Location-based partitioning strategies are usually relevant only to large Kafka clusters, especially those that span cities, countries, or even continents. Suppose Tmall plans to give each newly registered user a gift: European and American users get a free iPhone SE when they sign up, while newly registered Chinese users get a Huawei P40 Pro. To implement this registration logic, you only need to create a topic with two partitions and two consumer programs, one handling registrations from Europe and the United States and the other handling registrations from China. Registration messages from different geographical locations must then be sent to different data centers, because the consumer program that handles a given region's registrations runs in only one data center. The location-based partitioning strategy can be implemented according to the IP address of the Broker.
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream()
    .filter(p -> isChina(p.leader().host()))
    .map(PartitionInfo::partition)
    .findAny()
    .get();
This code selects, from all partitions, those whose Leader replica is located in China, and then picks any one of them as the destination of the message. Here isChina is a helper you would write yourself.
Custom partitioning policy
If you want to customize the partitioning policy, you need to explicitly configure the producer-side parameter partitioner.class. When writing a producer program, you write a concrete class that implements the org.apache.kafka.clients.producer.Partitioner interface, whose main methods are partition() and close(). Usually only partition(), the most important one, needs real logic.
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
The topic, key, keyBytes, value, and valueBytes parameters are all message data, while cluster carries cluster information (such as how many topics and Brokers the current Kafka cluster has). Set the partitioner.class parameter to the fully qualified name of your implementation class, and the producer will partition messages according to the logic of your custom policy.
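As a minimal sketch of the configuration step just described, the following builds producer properties with plain string keys so that it runs without the Kafka client library. The broker address and the class name com.example.RegionPartitioner are placeholders assumed for illustration; the property names are the standard producer configuration keys.

```java
import java.util.Properties;

// Builds producer settings that select a custom partitioner; values are placeholders.
final class PartitionerConfigSketch {
    static Properties producerProps(String partitionerClassName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Fully qualified name of the class that implements Partitioner.
        props.put("partitioner.class", partitionerClassName);
        return props;
    }

    public static void main(String[] args) {
        // com.example.RegionPartitioner is a hypothetical class name.
        System.out.println(producerProps("com.example.RegionPartitioner"));
    }
}
```

In a real program these properties would be passed to the KafkaProducer constructor.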
Kafka partition storage policy
Kafka retains all messages regardless of whether the messages are consumed or not, and periodically checks whether the old log segments can be deleted to reclaim disk space. There are two deletion strategies:
Based on time: log.retention.hours=168 (7 days)
Based on size: log.retention.bytes=1073741824 (1 GiB)
Note that because the time Kafka needs to locate a specific message is effectively O(1), that is, independent of file size, deleting expired files frees disk space but does nothing to improve Kafka's performance.
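The two deletion strategies can be sketched as a single check over a partition's closed segments, oldest first. This is a simplified model under stated assumptions: RetentionSketch and Segment are hypothetical names, a negative size limit is treated as "no limit", and the size rule only deletes a segment if the remaining log would still hold at least the configured number of bytes. The broker's real cleanup has more conditions.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of time-based and size-based log retention.
final class RetentionSketch {
    // A closed (non-active) segment of one partition.
    static final class Segment {
        final long baseOffset;
        final long sizeBytes;
        final long lastModifiedMs;

        Segment(long baseOffset, long sizeBytes, long lastModifiedMs) {
            this.baseOffset = baseOffset;
            this.sizeBytes = sizeBytes;
            this.lastModifiedMs = lastModifiedMs;
        }
    }

    // Returns the oldest segments that the time limit or the size limit allows deleting.
    static List<Segment> deletable(List<Segment> closedOldestFirst, long nowMs,
                                   long retentionMs, long retentionBytes) {
        long total = 0;
        for (Segment s : closedOldestFirst) {
            total += s.sizeBytes;
        }
        long excess = total - retentionBytes; // bytes above the size limit
        List<Segment> result = new ArrayList<>();
        for (Segment s : closedOldestFirst) {
            boolean expired = nowMs - s.lastModifiedMs > retentionMs;
            boolean sizeBreached = retentionBytes >= 0 && excess - s.sizeBytes >= 0;
            if (!expired && !sizeBreached) {
                break; // newer segments are younger, so stop at the first one that is kept
            }
            result.add(s);
            excess -= s.sizeBytes;
        }
        return result;
    }
}
```

With the values from the list above, retentionMs would be 168 hours expressed in milliseconds and retentionBytes would be 1073741824.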
Kafka compression
Producer compression
Before version 2.1.0, Kafka supported three compression algorithms: GZIP, Snappy, and LZ4. Version 2.1.0 added official support for the Zstandard algorithm (abbreviated zstd, an open source compression algorithm from Facebook), which provides a very high compression ratio. A compression algorithm can be measured by two indicators: compression ratio and compression/decompression throughput. The algorithms compare as follows.
In production environments, GZIP, Snappy, LZ4, and zstd each have their strengths. In terms of throughput: LZ4 > Snappy > zstd > GZIP. In terms of compression ratio: zstd > LZ4 > GZIP > Snappy.
To enable compression on the Producer side, the machine running the Producer program must have sufficient CPU resources. Beyond that, enabling compression on the Producer side is also recommended when bandwidth in the production environment is limited. Bandwidth is generally much more expensive than CPU and memory, and a Kafka cluster on a gigabit network can easily exhaust it. If the client machine has plenty of spare CPU, enabling zstd compression on the Producer side is recommended, because it greatly reduces network usage. Also avoid unnecessary decompression on the Broker, which happens when the Broker has to convert message formats or when the Broker and the Producer are configured with different compression algorithms.
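To illustrate why compressing a batch saves bandwidth, the sketch below compresses a batch of similar messages with GZIP from the JDK. GZIP is used only because it ships with Java; BatchCompressionSketch, the sample message format, and the batch size are assumptions for illustration. In a real producer, compression is switched on with the compression.type setting rather than by hand.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Compares the raw and GZIP-compressed size of a batch of similar messages.
final class BatchCompressionSketch {
    // Compresses the bytes with GZIP and returns the compressed form.
    static byte[] gzip(byte[] input) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(input);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Builds a batch of similar JSON-like messages, one per line.
    static byte[] sampleBatch(int messages) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < messages; i++) {
            sb.append("{\"event\":\"user_registered\",\"userId\":").append(i).append("}\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = sampleBatch(1000);
        byte[] compressed = gzip(raw);
        System.out.printf("raw=%d bytes, gzip=%d bytes%n", raw.length, compressed.length);
    }
}
```

Messages in one batch tend to share field names and structure, which is why compressing the batch as a whole works much better than compressing each message alone.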
Consumer decompression
After the Producer sends a compressed message to the Broker, the Broker stores it as is. When a Consumer requests the message, the Broker sends it unchanged, and the Consumer decompresses it automatically on arrival. Kafka records the compression algorithm in the message set, so when the Consumer reads a message set it knows which algorithm was used. Decompression does not happen only on the Consumer side: the Broker also decompresses each compressed message set as it is written, in order to run various validations on the messages. This decompression has some impact on Broker performance.
Kafka partition message ordering
Ordering of stored messages
If a Topic is configured with a single partition, all of its messages are read and written in that one partition. This guarantees global ordering, but gives up the high throughput and load balancing that multiple partitions provide.
To preserve order with multiple partitions, use the key-ordering strategy: determine from the business logic which entity's messages must stay in order, use that entity's identifier (ID) as the message key or as the input to a dedicated partitioning policy, and make sure that all messages with the same identifier are sent to the same partition. This preserves message order within the partition while still benefiting from multi-partition throughput.
Note: a message retry simply resends the message to the original partition and does not select a partition again.
Ordering of consumed messages
Kafka guarantees order only within a partition, not across partitions, so consumed data is ordered per partition rather than globally.
Message routing strategy
When publishing a message through the API, the producer wraps it in a Record. The Record contains a key and a value: the value is the message itself, and the key is used to determine the Partition to which the message is routed. The Partition a message is written to is not random, but is determined by the following routing policy.
If a Partition is specified, the message is written directly to that Partition.
If no Partition is specified but a key is, the hash value of the key modulo the number of Partitions gives the index of the Partition to use.
If neither a Partition nor a key is specified, a Partition is selected using the polling algorithm.
When partitions are added, messages already stored in existing Partitions are not redistributed. The new partition takes part in load balancing only as new data is written.
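The three routing rules above can be sketched as one method. RoutingSketch is a hypothetical standalone class, and String.hashCode stands in for the hash function; the real Java client hashes the serialized key bytes with murmur2, so actual partition numbers will differ.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the routing order: explicit partition, then key hash, then round-robin.
final class RoutingSketch {
    private final AtomicInteger roundRobin = new AtomicInteger(0);

    // explicitPartition and key may each be null when not specified by the caller.
    int route(Integer explicitPartition, String key, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition; // rule 1: use the partition the caller asked for
        }
        if (key != null) {
            // rule 2: hash of the key modulo the partition count (stand-in hash)
            return Math.floorMod(key.hashCode(), numPartitions);
        }
        // rule 3: no partition and no key, so cycle through the partitions
        return Math.floorMod(roundRobin.getAndIncrement(), numPartitions);
    }

    public static void main(String[] args) {
        RoutingSketch router = new RoutingSketch();
        System.out.println(router.route(2, "customer-42", 3));
        System.out.println(router.route(null, "customer-42", 3));
        System.out.println(router.route(null, null, 3));
    }
}
```

Because rule 2 depends on the partition count, the same key can map to a different partition after partitions are added, which is one reason existing messages are not moved.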
Producer message production process
The Producer first determines the target partition through the partitioning policy, and then looks up the Leader of that Partition in the cluster metadata (the article's source describes this as a lookup in ZooKeeper; current Java clients obtain the metadata from the Brokers).
The Producer sends the message to the Leader of the partition.
The Leader appends the message to its local Log and notifies the Followers in the ISR (In-Sync Replicas, the list of replicas that are in sync).
The Followers in the ISR pull the message from the Leader, write it to their local Logs, and send an ACK (acknowledgement) to the Leader.
After the Leader has received ACKs from all Followers in the ISR, it advances the HW (high watermark, the offset up to which messages are committed) and sends an ACK to the Producer, indicating that the message was written successfully.
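The last step above can be pictured as follows: the high watermark only moves as far as the slowest replica in the ISR has written. HighWatermarkSketch is a hypothetical helper that takes the log end offset (the offset of the next message to be written) of each ISR member, Leader included; it is a model of the rule, not broker code.

```java
// Sketch: the high watermark is the smallest log end offset among the ISR replicas.
final class HighWatermarkSketch {
    static long highWatermark(long... isrLogEndOffsets) {
        if (isrLogEndOffsets.length == 0) {
            throw new IllegalArgumentException("the ISR always contains at least the Leader");
        }
        long hw = Long.MAX_VALUE;
        for (long logEndOffset : isrLogEndOffsets) {
            hw = Math.min(hw, logEndOffset);
        }
        return hw;
    }

    public static void main(String[] args) {
        // Leader has written up to offset 10, the two Followers have fetched up to 8 and 9.
        System.out.println(highWatermark(10L, 8L, 9L));
        // Once both Followers catch up, the high watermark can advance.
        System.out.println(highWatermark(10L, 10L, 10L));
    }
}
```

Messages below the high watermark exist on every in-sync replica, which is why only those are treated as committed.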
Guaranteeing successful delivery
Send messages with the producer.send(msg, callback) interface, so that the callback reports whether each send succeeded.
On the Producer side, set the acks parameter to all. With acks=all, a message is considered committed only after all replicas in the ISR have received it.
On the Producer side, set the retries parameter, which is the number of automatic retries, to a large value. Transient network jitter can cause a send to fail, and automatic retries let the Producer resend the message instead of losing it.
On the Broker side, set unclean.leader.election.enable = false. This parameter controls which Brokers are eligible to be elected partition Leader. If a Broker that lags far behind the original Leader becomes the new Leader, messages will inevitably be lost, so the parameter should be false.
On the Broker side, set replication.factor >= 3 so that multiple copies of each message are kept.
On the Broker side, set min.insync.replicas > 1. This parameter is the minimum number of replicas in the ISR that must have written a message before it counts as committed, and it only takes effect when acks=-1 (all). A value greater than 1 improves message durability; the default value of 1 should not be used in production.
Make sure that replication.factor > min.insync.replicas. If the two are equal, the partition stops accepting writes as soon as a single replica fails. The recommended setting is replication.factor = min.insync.replicas + 1.
Commit offsets only after message consumption has completed: set the Consumer parameter enable.auto.commit to false and commit offsets manually.
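The client-side settings from the list above can be collected in a short sketch, together with a check of the replication rule. It uses plain string keys so that it runs without the Kafka client library; ReliabilityConfigSketch and its method names are hypothetical, and the retries value is only one possible choice of "a large value".

```java
import java.util.Properties;

// Collects the no-message-loss settings described above; a sketch, not a full configuration.
final class ReliabilityConfigSketch {
    // Producer side: wait for all in-sync replicas and retry transient failures.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("acks", "all");
        props.put("retries", Integer.toString(Integer.MAX_VALUE));
        return props;
    }

    // Consumer side: disable auto commit so offsets are committed after processing.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("enable.auto.commit", "false");
        return props;
    }

    // Checks the rule replication.factor > min.insync.replicas > 1.
    static boolean isDurableTopology(int replicationFactor, int minInsyncReplicas) {
        return minInsyncReplicas > 1 && replicationFactor > minInsyncReplicas;
    }

    public static void main(String[] args) {
        System.out.println(producerProps());
        System.out.println(consumerProps());
        System.out.println(isDurableTopology(3, 2));
    }
}
```

The Broker-side parameters (unclean.leader.election.enable, replication factor, min.insync.replicas) are set in the broker or topic configuration rather than in client code.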
Producer interceptor
An interceptor class on the Producer side implements the org.apache.kafka.clients.producer.ProducerInterceptor interface. The ProducerInterceptor interface has two core methods:
onSend: called before the message is sent.
onAcknowledgement: called after a message has been successfully committed or has failed to send. onAcknowledgement is called before the callback passed to send. The onAcknowledgement and onSend methods are not called on the same thread, so if both methods use a shared mutable object, access to it must be thread-safe.
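The thread-safety point can be illustrated with a small sketch. CounterInterceptorSketch is a hypothetical stand-in that mirrors the two methods with simplified signatures so that it runs without the Kafka client library; a real interceptor would implement ProducerInterceptor and receive ProducerRecord and RecordMetadata arguments.

```java
import java.util.concurrent.atomic.AtomicLong;

// Stand-in for a counting interceptor; shared state is kept in atomic counters.
final class CounterInterceptorSketch {
    private final AtomicLong sent = new AtomicLong();
    private final AtomicLong acked = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();

    // Mirrors onSend: runs on the thread that calls send, before the message goes out.
    String onSend(String value) {
        sent.incrementAndGet();
        return value;
    }

    // Mirrors onAcknowledgement: runs on a different thread; error is null on success.
    void onAcknowledgement(Exception error) {
        if (error == null) {
            acked.incrementAndGet();
        } else {
            failed.incrementAndGet();
        }
    }

    // Messages handed to the producer that have no outcome yet.
    long inFlight() {
        return sent.get() - acked.get() - failed.get();
    }

    public static void main(String[] args) {
        CounterInterceptorSketch interceptor = new CounterInterceptorSketch();
        interceptor.onSend("a");
        interceptor.onSend("b");
        interceptor.onAcknowledgement(null);
        System.out.println("in flight: " + interceptor.inFlight());
    }
}
```

AtomicLong avoids explicit locking for simple counters; more complex shared state would need synchronization or a concurrent collection.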
Suppose the fully qualified class name of the first interceptor is com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor and that of the second is com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor. Java code that registers these interceptors with the Producer looks like this:
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // interceptor 1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // interceptor 2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
Consumer consumption process
The Consumer submits a connection request to a Broker, and that Broker returns the communication URL of the Broker Controller, that is, the listeners address in the configuration file. (In current Kafka versions, the broker that plays this role for a consumer group is called the group coordinator.)
When the Consumer specifies the Topic to consume, it sends a consumption request to the Broker Controller.
The Broker Controller assigns one or more Partition Leaders to the Consumer and sends it the current offset of each Partition.
The Consumer consumes messages from the Partitions assigned by the Broker Controller.
After consuming a message, the Consumer reports to the Broker that the message has been consumed by sending its offset.
After the Broker receives the offset from the Consumer, it updates the corresponding entry in __consumer_offsets.
Consumer interceptor
A Consumer interceptor class implements the org.apache.kafka.clients.consumer.ConsumerInterceptor interface, which has two core methods.
onConsume: called before messages are returned to the Consumer program. The interceptor can do some processing here before the Consumer formally handles the messages.
onCommit: called after the Consumer has committed offsets. It can be used for logging and similar bookkeeping.
Solving the repeated-consumption problem
Repeated consumption by the same Consumer
When a Consumer processes messages too slowly and consumption times out, messages may be consumed repeatedly.
If a message has been processed but the timeout expires just before its offset is committed, the Broker considers the message not consumed, and it is delivered again.
The solution: extend the time allowed before the offset must be committed, for example by raising max.poll.interval.ms or by fetching fewer messages per poll with max.poll.records.
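One way to give a slow Consumer more time, sketched under assumptions: the article does not name specific parameters, so the use of max.poll.interval.ms and max.poll.records here is this sketch's interpretation, and the values are examples rather than recommendations. SlowConsumerConfigSketch is a hypothetical name, and plain string keys are used so that it runs without the Kafka client library.

```java
import java.util.Properties;

// Consumer settings that allow more processing time between polls; example values only.
final class SlowConsumerConfigSketch {
    static Properties consumerProps() {
        Properties props = new Properties();
        // Allow up to 10 minutes between polls before the consumer is considered failed.
        props.put("max.poll.interval.ms", "600000");
        // Fetch fewer records per poll so each batch finishes sooner.
        props.put("max.poll.records", "100");
        // Commit offsets manually, after processing has finished.
        props.put("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps());
    }
}
```

Raising the interval trades faster failure detection for fewer spurious timeouts, so the value should reflect the longest realistic processing time for one batch.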
Repeated consumption by different Consumers
If a Consumer has consumed a message but goes down before committing the offset, the Consumer that takes over the partition will consume that message again.
Thank you for reading. That concludes "how to understand Kafka partitioning, production, and consumption". After studying this article you should have a deeper understanding of the topic; how to apply it in your own environment still needs to be verified in practice.