How does Kafka implement log storage? This article analyzes the question in detail, hoping to give readers looking for an answer a simple, practical explanation.
1. Log storage format
Recent versions of Kafka store logs in batches: the producer packs multiple messages into a single batch (the producer's batch.size setting defaults to 16KB), and subsequent work such as index creation and message lookup is then done per batch. Each batch stores metadata such as the base offset and timestamp once for the whole batch, while each message inside it stores its own offset and timestamp only as deltas relative to that batch-level metadata. This lets Kafka sharply reduce the disk space each message occupies. Here is the data format of each batch:
Every field in the batch header shown in the figure has a fixed length; only the trailing messages section is variable. The main attributes of a batch are as follows:
Base offset: 8 bytes, storing the offset of the first message in the current batch.
Length: 4 bytes, storing the amount of disk space occupied by the rest of the batch. This field lets Kafka jump straight to the next batch when scanning through messages.
Partition leader epoch: records the leader epoch of the partition at the time the batch was written; it is mainly used to detect and truncate stale data after leader changes.
CRC: a checksum computed over the batch data that follows it, used to detect corruption.
Attributes: 2 bytes. The lowest 3 bits record the compression codec of the messages in the batch (gzip, Snappy, LZ4, and, in newer versions, zstd); the next bit records the timestamp type, and the two bits after that record the transactional and control flags introduced in newer versions.
Last offset delta: the offset of the newest message in the batch relative to the first message.
First timestamp: 8 bytes, recording the timestamp of the first message in the batch.
Max timestamp: 8 bytes, recording the largest timestamp of any message in the batch.
PID, producer epoch, and base sequence number: these three fields support transactions and idempotence. The PID and producer epoch determine whether the current producer is legitimate (fencing out zombie producers), while the base sequence number is used for idempotent de-duplication of messages.
Message count: 4 bytes, recording the number of messages in the current batch.
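To make the layout concrete, here is a minimal sketch of reading these header fields in order from a ByteBuffer. One caveat: the v2 format also carries a one-byte magic (format version) field between the leader epoch and the CRC, which the list above omits; with it included, the fixed header adds up to the 61 bytes noted below. The class and field names are descriptive choices for this sketch, not Kafka's internal API.

```java
import java.nio.ByteBuffer;

// Minimal sketch: reading the fixed 61-byte header of a v2 record batch.
public class BatchHeaderSketch {
    public static void readHeader(ByteBuffer buf) {
        long baseOffset     = buf.getLong();  // 8B: offset of the first message
        int batchLength     = buf.getInt();   // 4B: size of the rest of the batch
        int leaderEpoch     = buf.getInt();   // 4B: partition leader epoch
        byte magic          = buf.get();      // 1B: format version (2), omitted in the list above
        int crc             = buf.getInt();   // 4B: checksum over the data that follows
        short attributes    = buf.getShort(); // 2B: codec, timestamp type, txn/control flags
        int lastOffsetDelta = buf.getInt();   // 4B: last offset minus baseOffset
        long firstTimestamp = buf.getLong();  // 8B: timestamp of the first message
        long maxTimestamp   = buf.getLong();  // 8B: largest timestamp in the batch
        long producerId     = buf.getLong();  // 8B: PID for idempotence/transactions
        short producerEpoch = buf.getShort(); // 2B: fences zombie producers
        int baseSequence    = buf.getInt();   // 4B: first sequence number, for dedup
        int recordCount     = buf.getInt();   // 4B: number of messages in the batch
        int codec = attributes & 0x7;         // lowest 3 bits select the compression codec
        System.out.printf("baseOffset=%d, records=%d, codec=%d%n",
                baseOffset, recordCount, codec);
    }
}
```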
As the list above shows, the header of each batch is fixed at 61 bytes; only the message payload section varies with the specific messages. Next, let's look at the format of each message inside a batch:
The per-message header differs markedly from the batch header: most of its fields are variable-length. That variability raises two points worth emphasizing:
For integer fields, Kafka uses Zig-Zag encoding: instead of two's-complement, negative numbers are mapped to positive ones, so -1 maps to 1, 1 maps to 2, -2 maps to 3, 2 maps to 4, and so on. The mapping is illustrated below:
As the figure shows, decoding only requires reversing the mapping to recover the original value.
With Zig-Zag encoding, each byte contributes 7 data bits, i.e. 128 distinct values: half of them cover small positive numbers, half cover small negative numbers, plus zero, so a single byte can represent signed values roughly in the range -64 to 63. For numbers outside that range, Kafka uses multiple bytes in varint fashion, reserving the highest bit of each byte as a continuation flag: if the highest bit is 1, the number continues into the following byte; if it is 0, the current byte completes the number.
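A minimal sketch of the Zig-Zag mapping and the varint byte layout described above (the helper names are this sketch's own, not Kafka's API):

```java
import java.io.ByteArrayOutputStream;

// Zig-Zag + varint encoding as used for the delta fields in the v2 record format.
public class ZigZagVarint {
    // Map a signed int to an unsigned one: 0->0, -1->1, 1->2, -2->3, 2->4 ...
    static int zigZagEncode(int n) { return (n << 1) ^ (n >> 31); }
    static int zigZagDecode(int n) { return (n >>> 1) ^ -(n & 1); }

    // Write 7 bits per byte, setting the high bit while more bytes follow.
    static byte[] writeVarint(int value) {
        int v = zigZagEncode(value);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((v & ~0x7F) != 0) {
            out.write((v & 0x7F) | 0x80); // continuation bit set
            v >>>= 7;
        }
        out.write(v);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(writeVarint(-1).length);  // 1 byte: small delta
        System.out.println(writeVarint(300).length); // 2 bytes: larger value
    }
}
```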
The benefit of this encoding is that most of the deltas stored per message are very small numbers that fit in a single byte; compared with a fixed 8-byte field, that saves roughly seven-eighths of the space.
Beyond the key- and value-related fields, each message carries an attributes field (reserved and unused in the v2 format, since compression is recorded at the batch level) and headers, which let users attach custom metadata to a message for application-specific purposes. Stepping back, Kafka's log format hoists information shared by a batch's messages up into the batch header so it is stored only once; although that header looks large, amortized across the messages it contains it costs few bytes per message. At the message level, Kafka combines delta encoding with Zig-Zag/varint encoding to compress the data further. Together, these choices greatly reduce the disk space Kafka's logs consume.
2. Log storage layout
When using Kafka, messages are sent to a topic, and the producer computes which partition each message goes to. Within a partition, Kafka assigns every message an offset, so a single message can be uniquely located by the triple <topic, partition, offset>. Under Kafka's architecture, partitions are spread evenly across the brokers, with each broker providing log storage for one or more partitions. On the broker, logs are likewise stored per partition: inside the configured log directory, Kafka creates one directory per topic partition, named in the form <topic>-<partition>, and stores that partition's log data there. The following is a schematic of Kafka's log storage directory:
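As an illustration of how a keyed message lands in a partition, here is a hedged sketch in the spirit of the producer's default partitioner. The real partitioner hashes the key bytes with murmur2; String.hashCode() below is a stand-in so the sketch stays self-contained.

```java
// Illustrative only: maps a message key to a partition index.
public class PartitionFor {
    static int partitionFor(String key, int numPartitions) {
        int hash = key.hashCode() & 0x7fffffff; // force non-negative
        return hash % numPartitions;            // same key -> same partition
    }

    public static void main(String[] args) {
        // The message is then uniquely addressed by <topic, partition, offset>.
        System.out.println(partitionFor("user-42", 6));
    }
}
```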
Note that each broker stores only the logs of the partitions assigned to it. For example, the connect-status topic in the figure has directories only for partitions 1 and 4, not for partitions 2 and 3, because those partitions are assigned to other nodes in the cluster. Inside each partition's log directory there are three kinds of files, with the suffixes .log, .index, and .timeindex: the .log file holds the actual message data, the .index file holds the offset index, and the .timeindex file holds the time index. The following figure shows the message log data of one partition:
As the figure shows, every type of log file is split into segments. The segmentation rules deserve some explanation:
When the log is segmented, each file is named after the offset of the first message in that segment (see the sketch after this list).
When a log file reaches 1GB, Kafka closes it and opens a new file for subsequent writes. Accordingly, every log file in the figure except the newest one is 1GB in size.
The .index and .timeindex files are segmented together with the log file, which is why their file names match those of the .log files.
Kafka retains logs for 7 days by default, deleting logs stored longer than that. Within a single file, however, some messages may be younger than 7 days and some older; such a file is kept until all of its messages exceed the retention period.
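The naming rule from the first point above can be sketched as follows: the segment's base offset is zero-padded to 20 digits and the suffix appended, which matches the file names visible in a partition directory.

```java
// Sketch of the segment file naming convention: 20-digit zero-padded base offset.
public class SegmentName {
    static String fileName(long baseOffset, String suffix) {
        return String.format("%020d%s", baseOffset, suffix);
    }

    public static void main(String[] args) {
        System.out.println(fileName(0L, ".log"));            // 00000000000000000000.log
        System.out.println(fileName(170210L, ".index"));     // 00000000000000170210.index
        System.out.println(fileName(170210L, ".timeindex")); // 00000000000000170210.timeindex
    }
}
```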
3. Index files
Kafka has two main kinds of index files: offset index files and timestamp index files. The offset index maps a message's offset to its physical position in the log file, while the timestamp index maps a message's timestamp to its offset. To look up a message by timestamp, Kafka therefore first finds the corresponding offset via the timestamp index, then finds the message's physical position via the offset index. Two points about the offset index are worth noting:
Because messages are stored in batches, the smallest unit indexed is the batch: the offset index can locate the batch containing a message, but not the message's exact position within it, so finding the message still requires scanning inside the batch.
The offset recorded in each index entry is not the message's absolute offset but its offset relative to the base offset of the index file, which keeps the index file much smaller. The following figure shows the offset index file format:
Here is an example of a concrete offset index file:
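For illustration, here is a sketch of one offset-index entry, assuming the common fixed 8-byte layout: a 4-byte offset relative to the segment's base offset, followed by a 4-byte physical position in the .log file.

```java
import java.nio.ByteBuffer;

// Sketch of an 8-byte offset-index entry: relative offset + byte position.
public class OffsetIndexEntry {
    static ByteBuffer write(long baseOffset, long absoluteOffset, int position) {
        ByteBuffer entry = ByteBuffer.allocate(8);
        entry.putInt((int) (absoluteOffset - baseOffset)); // relative offset
        entry.putInt(position);                            // byte position in the .log file
        entry.flip();
        return entry;
    }

    public static void main(String[] args) {
        // Segment 00000000000000170210.log: message 170215 lives at byte 4522.
        ByteBuffer e = write(170210L, 170215L, 4522);
        System.out.println(e.getInt() + " -> " + e.getInt()); // 5 -> 4522
    }
}
```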
For the timestamp index, timestamps vary far more than offsets do, so even stored as increments they would not benefit from Zig-Zag encoding; the timestamp index therefore stores each message's timestamp directly. The offset half of each entry, by contrast, changes little, so it is still stored as a relative offset. This fixed-width layout also lets the index file be memory-mapped and searched directly, with no decoding needed. The following figure shows the timestamp index file format:
Here is a storage example of a timestamp index file:
As you can see, locating a message by timestamp takes two steps: first find the offset in the timestamp index, then find the message's physical position in the offset index.
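Here is a toy sketch of that two-step lookup, using in-memory TreeMaps as stand-ins for the .timeindex and .index files. Real Kafka binary-searches the memory-mapped index files, but the floor-entry semantics are the same.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Two-step lookup: timestamp -> offset (.timeindex), offset -> position (.index).
public class IndexLookup {
    static final NavigableMap<Long, Long> timeIndex = new TreeMap<>();
    static final NavigableMap<Long, Integer> offsetIndex = new TreeMap<>();

    static int positionForTimestamp(long targetTs) {
        // Step 1: the largest indexed timestamp <= target yields an offset.
        long offset = timeIndex.floorEntry(targetTs).getValue();
        // Step 2: the largest indexed offset <= that offset yields a byte
        // position; the containing batch is then scanned from there.
        return offsetIndex.floorEntry(offset).getValue();
    }

    public static void main(String[] args) {
        timeIndex.put(1_600_000_000_000L, 170215L);
        offsetIndex.put(170215L, 4522);
        System.out.println(positionForTimestamp(1_600_000_000_500L)); // 4522
    }
}
```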
4. Log compaction
The log compaction feature targets scenarios like the following: suppose a user's email address is modified three times, with the sequence of updates being:
email=john@gmail.com
email=john@yahoo.com.cn
email=john@163.com
After these changes, clearly only the last one matters, since it is the final state of the record; yet processing the messages sequentially means handling all three. Log compaction exists to address this: for messages sharing the same key, Kafka keeps only the most recent record, and the intermediate ones are cleaned up by the log cleaner. Note, however, that Kafka never compacts records in the currently active log file, i.e. the segment currently being written. The following figure shows an example of Kafka's log compaction:
In the figure, key K1 has values V1, V3, and V4; after compaction only V4 remains. K2 has V2, V6, and V10; only V10 remains. Other keys follow the same pattern. Also note that log compaction is controlled by log.cleanup.policy, whose default value is delete, the policy normally in effect; setting it to compact enables the compaction policy instead. Enabling the compaction policy does not by itself mean Kafka cleans up historical data, though: history is cleaned periodically only when log.cleaner.enable is set to true.
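The keep-the-newest-record-per-key rule can be modeled in a few lines. This is only a toy illustration of the semantics; real compaction rewrites closed log segments in the background.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of compaction: for each key, only the latest value survives.
public class CompactionModel {
    public static void main(String[] args) {
        String[][] log = {
            {"K1", "V1"}, {"K2", "V2"}, {"K1", "V3"},
            {"K1", "V4"}, {"K2", "V6"}, {"K2", "V10"},
        };
        Map<String, String> compacted = new LinkedHashMap<>();
        for (String[] record : log) {
            compacted.put(record[0], record[1]); // later value replaces earlier
        }
        System.out.println(compacted); // {K1=V4, K2=V10}
    }
}
```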
Kafka itself relies on the compaction strategy, most visibly in how consumer offsets are stored. In older versions, Kafka kept each consumer group's current offsets in ZooKeeper; but ZooKeeper is a distributed coordination service that performs well for reads and poorly for writes, while consumers commit offsets very frequently, so ZooKeeper inevitably became a bottleneck for offset commits. In current versions, Kafka therefore stores group offsets in a dedicated topic, __consumer_offsets. Because every group commits its offsets to this topic, Kafka gives it an unusually large default partition count of 50. When a consumer commits an offset, the message key is the combination groupId+topic+partition and the value is the committed offset; thus for each partition of each topic a group consumes, only the latest offset is retained after compaction. To read an offset back, a consumer only needs to assemble a key in that format and read the latest message for it from the topic.
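A toy model of how keying commits by groupId+topic+partition makes compaction retain only the latest offset per group-partition (the string key format below is illustrative, not the broker's actual binary schema):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of __consumer_offsets: latest commit per (group, topic, partition).
public class OffsetsTopicModel {
    static final Map<String, Long> committed = new HashMap<>();

    static void commit(String groupId, String topic, int partition, long offset) {
        committed.put(groupId + "-" + topic + "-" + partition, offset);
    }

    static long fetch(String groupId, String topic, int partition) {
        return committed.get(groupId + "-" + topic + "-" + partition);
    }

    public static void main(String[] args) {
        commit("my-group", "orders", 3, 41L);
        commit("my-group", "orders", 3, 42L); // supersedes the previous commit
        System.out.println(fetch("my-group", "orders", 3)); // 42
    }
}
```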
That concludes the answer to how Kafka implements log storage. Hopefully the content above has been of some help.