
Kafka principles and hands-on analysis

2025-03-31 Update From: SLTechnology News&Howtos (shulou)

Many people who are new to Kafka are unsure how it works or how to use it in practice. This article summarizes the underlying principles and walks through practical usage, in the hope that it helps you get started.

Background

Recently, we decided to migrate our centralized log monitoring system. The original implementation is: Log Agent => Log Server => ElasticSearch => Kibana, in which Thrift RPC is used between Log Agent and Log Server, and we implemented a simple load balancing scheme (WRB) ourselves.

The original scheme actually works well. The asynchronous Agent has almost no impact on application performance, and the system supports our applications, with tens of millions of page views (PV) per day, without strain. It has one disadvantage, however: if error logs surge, the Log Server may be unable to keep up, and messages will be lost. We have not reached that level, and we could also handle it by introducing a buffering queue. All things considered, though, it is simpler to use a message queue directly: RPC, load balancing, and load buffering are all built in. Another option is to read the log files directly, as Logstash or Flume do. For the sake of flexibility, we decided to use a message queue, especially since we had already deployed ZooKeeper. After a survey, Kafka turned out to be the best fit for this kind of data transfer and buffering. So we plan to change the scheme to: Log Agent => Kafka => ElasticSearch => Kibana.

Introduction to Kafka

I. Basic concepts of Kafka

Broker: a Kafka cluster consists of one or more servers, each of which is called a broker.

Topic: every message published to the Kafka cluster has a category, which is called a Topic.

Message:

A message is the basic unit of Kafka communication. It consists of a fixed-length header and a variable-length message body (payload). In the Java client it is also called a Record.

The parts of the message structure are described as follows:

CRC32: CRC32 checksum, 4 bytes.

Magic: the protocol version number of the Kafka service program, used for compatibility. 1 byte.

Attributes: this field occupies 1 byte. The lowest three bits indicate the compression codec, the fourth bit indicates the timestamp type (0 means CreateTime, 1 means LogAppendTime), and the high four bits are reserved and currently have no meaning.

Timestamp: message timestamp, which must be included in the header when magic > 0. 8 bytes.

Key-length: length of the message key, 4 bytes.

Key: the actual data of the message key.

Payload-length: length of the actual message data, 4 bytes.

Payload: the actual data of the message.

Actually storing a message also includes an additional 12-byte overhead (LogOverhead):

Offset of the message: 8 bytes, similar to the ID of the message.

Total length of the message: 4 bytes.
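The layout above can be made concrete with a short sketch. The Python below is illustrative only and is not Kafka's implementation: the names `encode_message` and `decode_message` are hypothetical, only uncompressed magic=1 messages are handled, and it assumes that the CRC covers the bytes from the magic byte through the payload and that a key length of -1 denotes a missing key.

```python
import struct
import zlib

# LogOverhead: message offset (8 bytes) + total message length (4 bytes)
LOG_OVERHEAD = struct.Struct(">qi")


def encode_message(offset, payload, key=None, timestamp=0, attributes=0, magic=1):
    """Pack one message: LogOverhead, CRC32, magic, attributes, timestamp, key, payload."""
    if key is None:
        key_part = struct.pack(">i", -1)  # assumption: -1 marks a missing key
    else:
        key_part = struct.pack(">i", len(key)) + key
    body = (struct.pack(">bbq", magic, attributes, timestamp)
            + key_part
            + struct.pack(">i", len(payload)) + payload)
    crc = zlib.crc32(body) & 0xFFFFFFFF  # assumption: CRC covers magic..payload
    message = struct.pack(">I", crc) + body
    return LOG_OVERHEAD.pack(offset, len(message)) + message


def decode_message(buf):
    """Unpack a message produced by encode_message and verify its checksum."""
    offset, size = LOG_OVERHEAD.unpack_from(buf, 0)
    pos = LOG_OVERHEAD.size
    (crc,) = struct.unpack_from(">I", buf, pos)
    pos += 4
    crc_start = pos
    magic, attributes, timestamp = struct.unpack_from(">bbq", buf, pos)
    pos += 10
    (key_len,) = struct.unpack_from(">i", buf, pos)
    pos += 4
    key = None
    if key_len >= 0:
        key = buf[pos:pos + key_len]
        pos += key_len
    (payload_len,) = struct.unpack_from(">i", buf, pos)
    pos += 4
    payload = buf[pos:pos + payload_len]
    pos += payload_len
    return {
        "offset": offset, "size": size, "magic": magic,
        "attributes": attributes, "timestamp": timestamp,
        "key": key, "payload": payload,
        "crc_ok": (zlib.crc32(buf[crc_start:pos]) & 0xFFFFFFFF) == crc,
    }
```

Under these assumptions, a keyless message with the 11-byte payload `hello world` occupies 45 bytes: 12 bytes of LogOverhead, 22 bytes of header fields, and the payload itself.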

Partition:

A partition is a physical concept, and each Topic contains one or more partitions.

Each partition consists of a series of ordered, immutable messages; it is an ordered queue.

Each partition physically corresponds to a folder named ${topicName}-${partitionId}, such as __consumer_offsets-0.

The partition directory stores the partition's log segments, each consisting of a log data file and two index files.

Each message is appended to its partition and written to disk sequentially, which is very efficient and is an important basis for Kafka's high throughput.

Kafka guarantees message ordering only within a partition, not across partitions.

LogSegment:

The log file is rolled and split into one or more log segments (LogSegment) by size or by time. The segment size is specified by the configuration item log.segment.bytes and defaults to 1 GB; the time limit is set by the log.roll.ms or log.roll.hours configuration item. The currently active log segment is called the activeSegment.

Unlike ordinary log files, a Kafka log segment has two auxiliary index files in addition to the log data file itself. The three files are described below.

Data file

A data file is a message set file (FileMessageSet) with a .log suffix, which stores the actual message data.

The naming rule is: the offset of the first message in the data file, also known as the base offset (BaseOffset), left-padded with zeros to 20 digits.

The base offset of each data file is the offset of the last message in the previous data file plus 1 (the first data file starts at 0).

Offset index file

The file name is the same as the data file's, but with the suffix .index. Its purpose is to quickly locate a message by its offset.

First, Kafka keeps every log segment in a ConcurrentSkipListMap (a skip list) keyed by BaseOffset, so that when looking up a message with a given offset it can quickly locate the data file and index file that contain the message.

Then, a binary search in the index file finds the largest indexed offset that is less than or equal to the target offset. Finally, the data file is scanned sequentially from that position until the message whose offset equals the target offset is found.

Note that not every message has an index entry. The index is sparse: an entry is written for every certain number of bytes of data, and the interval can be set through index.interval.bytes.

Timestamp index file

Starting with version 0.10.1.0, Kafka introduced a timestamp-based index file with the same name as the data file but with the suffix .timeindex. Its purpose is to quickly locate a message by its timestamp.

The Kafka consumer API provides an offsetsForTimes(Map timestampsToSearch) method that returns the offset and timestamp of the first message whose timestamp is greater than or equal to the queried time. This is quite useful. Suppose we want to start consuming from a certain point in time: we can use offsetsForTimes() to find the offset of the first message at or after that time, call seek(TopicPartition, long offset) to move the consumer position there, and then call poll() to long-poll and pull messages.
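The segment naming rule and the two-step offset lookup (find the segment, then search the sparse index) can be sketched as follows. This is a simplified model rather than Kafka's code: the function names are hypothetical, a sorted list with `bisect` stands in for the ConcurrentSkipListMap, and index entries are modeled as absolute `(offset, file_position)` pairs.

```python
import bisect


def segment_file_name(base_offset, suffix=".log"):
    """Base offset left-padded with zeros to 20 digits, plus the file suffix."""
    return f"{base_offset:020d}{suffix}"


def find_segment(base_offsets, target_offset):
    """Return the largest base offset <= target_offset (a floor lookup).

    base_offsets must be sorted in ascending order.
    """
    i = bisect.bisect_right(base_offsets, target_offset) - 1
    if i < 0:
        raise KeyError(f"offset {target_offset} is before the first segment")
    return base_offsets[i]


def find_scan_start(index_entries, target_offset):
    """Binary-search a sparse index for the position to start scanning from.

    index_entries is a sorted list of (offset, file_position) pairs. The
    result is the file position of the largest indexed offset that is
    <= target_offset, or 0 if no such entry exists.
    """
    offsets = [offset for offset, _ in index_entries]
    i = bisect.bisect_right(offsets, target_offset) - 1
    return index_entries[i][1] if i >= 0 else 0
```

For example, with segments starting at offsets 0, 368769 and 737337, a lookup for offset 400000 selects the segment whose files are named 00000000000000368769.log and 00000000000000368769.index; the data file is then scanned forward from the position returned by `find_scan_start`.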

Producer:

The producer is responsible for publishing messages to Kafka brokers.

Some important configuration items for producers:

request.required.acks: Kafka provides three message acknowledgement (ACK) levels for producers. They control when the broker sends an acknowledgement to the producer after receiving a message, so that the producer can react accordingly. The level is set to 0, 1, or -1 through the request.required.acks property. The default is 1.

acks=0: the producer does not wait for the broker to return an acknowledgement and keeps sending messages.

acks=1: the producer waits until the Leader replica has successfully written the message to its log file. This reduces the possibility of data loss to some extent, but it still cannot guarantee that no data is lost, because the producer does not wait for the follower replicas to finish synchronizing.

acks=-1: an acknowledgement is sent to the producer only after the Leader replica and all replicas in the ISR list have stored the data. To ensure that no data is lost, the number of in-sync replicas must be greater than 1; this is set through the min.insync.replicas parameter, and when the number of in-sync replicas falls below that value the producer throws an exception. This level also reduces the producer's sending speed and throughput.

message.send.max.retries: the number of times the producer retries before giving up on a message. The default is 3.

retry.backoff.ms: the time to wait before each retry, in ms. The default is 100.

queue.buffering.max.ms: in asynchronous mode, the maximum time messages are buffered; after this time the buffered messages are sent as a batch. If batch.num.messages (the maximum number of buffered messages) is also configured in asynchronous mode, reaching either of the two thresholds triggers a batch send. The default is 1000 ms.

queue.buffering.max.messages: in asynchronous mode, the maximum number of unsent messages that can be buffered in the queue. The default is 10000.

queue.enqueue.timeout.ms:

= 0: enqueue directly when the queue is not full, and drop the message immediately when it is full.

< 0: block unconditionally when the queue is full and never drop the message.

> 0: block for up to this long; a QueueFullException is thrown when the blocking time reaches this value.

batch.num.messages: Kafka supports sending messages to a specific partition of a broker in batches (Batch). The batch size is set by the batch.num.messages property, which is the maximum number of messages sent in one batch. This setting has no effect when the producer sends messages in synchronous mode. The default is 200.

request.timeout.ms: the timeout for the producer to wait for the broker's reply when an ack is required. The default is 1500 ms.

send.buffer.bytes: socket send buffer size. The default is 100 KB.

topic.metadata.refresh.interval.ms: the interval at which the producer periodically requests updated topic metadata. If set to 0, the metadata is refreshed after every message sent. The default is 5 min.

client.id: the producer id, mainly used by applications to trace calls and locate problems. The default is console-producer.
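To make the three acknowledgement levels easier to compare, here is a small decision sketch. It is a simplified model, not broker code: `ack_decision` and its parameters are hypothetical names, and the return values are illustrative labels only.

```python
def ack_decision(acks, leader_written, isr_size, isr_written, min_insync_replicas=1):
    """Return what a producer would observe under each acks level.

    leader_written: whether the Leader has written the message to its log
    isr_size:       number of replicas currently in the ISR (Leader included)
    isr_written:    number of ISR replicas that have stored the message
    """
    if acks == 0:
        # Fire and forget: the producer never waits for the broker.
        return "no-wait"
    if acks == 1:
        # Only the Leader's write matters; followers may still be behind.
        return "ack" if leader_written else "wait"
    if acks == -1:
        if isr_size < min_insync_replicas:
            # Too few in-sync replicas: the producer gets an exception.
            return "error"
        if leader_written and isr_written >= isr_size:
            return "ack"
        return "wait"
    raise ValueError("acks must be 0, 1 or -1")
```

With `acks=-1` and `min_insync_replicas=2`, a partition whose ISR has shrunk to the Leader alone yields `"error"` rather than a silent single-copy write, which is the trade-off between durability and throughput described above.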

Consumer & Consumer Group & Group Coordinator:

Consumer: the message consumer, the client that reads messages from Kafka brokers. Kafka 0.9 released a new consumer rewritten in Java, which no longer depends on the Scala runtime or on ZooKeeper.

Consumer Group: each consumer belongs to a specific Consumer Group, which can be specified through the group.id configuration item. If no group name is specified, it defaults to test-consumer-group.

Group Coordinator: for each Consumer Group, one broker is selected as the coordinator of the group.

Each consumer also has a globally unique id, which can be specified through the configuration item client.id. If it is not specified, Kafka automatically generates a globally unique id for the consumer in the format ${groupId}-${hostName}-${timestamp}-${first 8 characters of a UUID}.

Kafka provides two ways to commit the consumer offset: automatic commit by Kafka, or manual commit by the client through the corresponding KafkaConsumer API.

Auto commit: the commit does not run on a timer. Instead, when certain events occur, the consumer checks whether the time since the last commit exceeds auto.commit.interval.ms.

enable.auto.commit=true

auto.commit.interval.ms

Manual commit:

enable.auto.commit=false

commitSync(): commit synchronously

commitAsync(): commit asynchronously

Some important configuration items for consumers:

group.id: A unique string that identifies the consumer group this consumer belongs to.

client.id: The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.

bootstrap.servers: A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.

key.deserializer: Deserializer class for key that implements the org.apache.kafka.common.serialization.Deserializer interface.

value.deserializer: Deserializer class for value that implements the org.apache.kafka.common.serialization.Deserializer interface.

fetch.min.bytes: The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request.

fetch.max.bytes: The maximum amount of data the server should return for a fetch request.

max.partition.fetch.bytes: The maximum amount of data per-partition the server will return.

max.poll.records: The maximum number of records returned in a single call to poll().

heartbeat.interval.ms: The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities.

session.timeout.ms: The timeout used to detect consumer failures when using Kafka's group management facility.

enable.auto.commit: If true the consumer's offset will be periodically committed in the background.
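The auto-commit behaviour described in this section, where the commit is checked when an event such as a poll occurs rather than on a timer, can be modeled with a toy class. `AutoCommitSketch` is a hypothetical name and the model ignores networking entirely; it only illustrates when the committed offset moves.

```python
class AutoCommitSketch:
    """Toy model of auto commit: the interval is checked on each poll."""

    def __init__(self, auto_commit_interval_ms):
        self.interval_ms = auto_commit_interval_ms
        self.last_commit_ms = 0
        self.position = 0   # next offset the consumer will read
        self.committed = 0  # last committed offset

    def poll(self, now_ms, record_count):
        # Commit the current position first if the interval has elapsed.
        if now_ms - self.last_commit_ms >= self.interval_ms:
            self.committed = self.position
            self.last_commit_ms = now_ms
        # Then hand out the next batch of records.
        self.position += record_count
        return self.position
```

In this model, if the consumer stops calling `poll`, nothing is committed no matter how much time passes, so records handed out by the last poll remain uncommitted and would be delivered again after a restart.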

ISR: Kafka dynamically maintains an ISR (In-Sync Replicas) list in ZK, that is, the list of replicas that are kept in sync. The list holds the brokerId of every replica that keeps its messages synchronized with the leader replica. If a follower replica goes down or falls too far behind, it is removed from the ISR list.

Zookeeper:

Kafka uses ZK to store its metadata, including broker information, Kafka cluster information, old consumer information and consumption offsets, topic information, partition state information, partition replica assignment information, dynamic configuration information, and so on.

During startup or operation, Kafka creates the corresponding nodes in ZK to store metadata, and registers listeners on these nodes to be notified when the node metadata changes.

Nodes that Kafka registers in ZK:

/consumers: after an old consumer starts, a node for that consumer is created under this node in ZK.

/brokers/seqid: helps generate the brokerId. When the user does not configure broker.id, ZK automatically generates a globally unique id.

/brokers/topics: each time a topic is created, a node with the same name as the topic is created in this directory.

/brokers/ids: every time Kafka starts a KafkaServer, a child node named {broker.id} is created in this directory.

/config/topics: stores dynamically modified topic-level configuration.

/config/clients: stores dynamically modified client-level configuration.

/config/changes: stores the corresponding information when configuration is modified dynamically.

/admin/delete_topics: stores the information of topics to be deleted when a topic is deleted.

/cluster/id: stores the cluster id.

/controller: stores the brokerId of the controller and related information.

/isr_change_notification: stores the paths to be notified when the ISR list of a Kafka replica changes.

TIPS

Compared with ES: a Broker corresponds to a Node, a Topic to an Index, a Message to a Document, and a Partition to a Shard. A LogSegment corresponds to an ES Segment.

How to view message content (Dump Log Segments)

When using Kafka, we sometimes need to inspect the messages we have produced, which are stored in Kafka's log files. Because the log files use a special format, we cannot read their content directly. Kafka provides a command that dumps binary log segment files as text:

    $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments
    Parse a log file and dump its contents to the console. Useful for debugging a seemingly corrupt log segment.
    Option                   Description
    --deep-iteration         Use deep iteration instead of shallow iteration.
    --files                  Required. The log segment files to dump, comma separated.
    --key-decoder-class      Custom key deserializer. Must implement the `kafka.serializer.Decoder` trait. The jar must be placed in the `kafka/libs` directory. (Default: `kafka.serializer.StringDecoder`.)
    --max-message-size       Maximum message size in bytes. (Default: 5242880.)
    --print-data-log         Also print the log messages.
    --value-decoder-class    Custom value deserializer. Must implement the `kafka.serializer.Decoder` trait. The jar must be placed in the `kafka/libs` directory. (Default: `kafka.serializer.StringDecoder`.)
    --verify-index-only      Only verify the index; do not print its content.

    $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log
    Dumping /tmp/kafka-logs/test-0/00000000000000000000.log
    Starting offset: 0
    offset: 0 position: 0 CreateTime: 1498104812192 isvalid: true payloadsize: 11 magic: 1 compresscodec: NONE crc: 3271928089 payload: hello world
    offset: 1 position: 45 CreateTime: 1498104813269 isvalid: true payloadsize: 14 magic: 1 compresscodec: NONE crc: 242183772 payload: hello everyone

Note: the --print-data-log option prints the message content. Without it, you see only the header, not the payload.

The command can also be used to view index files:

    $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.index --print-data-log
    Dumping /tmp/kafka-logs/test-0/00000000000000000000.index
    offset: 0 position: 0

The same works for timeindex files:

    $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.timeindex --print-data-log
    Dumping /tmp/kafka-logs/test-0/00000000000000000000.timeindex
    timestamp: 1498104813269 offset: 1
    Found timestamp mismatch in: /tmp/kafka-logs/test-0/00000000000000000000.timeindex
    Index timestamp: 0, log timestamp: 1498104812192.
    Found out of order timestamp in: /tmp/kafka-logs/test-0/00000000000000000000.timeindex
    Index timestamp: 0, Previously indexed timestamp: 1498104813269

Consumer rebalance process

Consumer rebalance (Consumer Rebalance) is the process in which consumers rejoin the consumer group and partitions are reassigned to the consumers. A rebalance can be triggered in the following circumstances:

A new consumer joins the consumer group.

A current consumer leaves the consumer group (whether by abnormal exit or normal shutdown).

A consumer unsubscribes from a topic.

The number of partitions of a subscribed topic increases (the number of partitions in Kafka can be increased dynamically but not decreased).

A broker goes down and a new coordinator is elected.

A consumer has not sent a heartbeat request within ${session.timeout.ms}, so the group coordinator considers it to have exited.

Automatic rebalancing gives consumers high availability and scalability: when we increase or decrease the number of consumers or partitions, we do not need to manage the underlying assignment of partitions to consumers. Note, however, that during a rebalance consumers cannot pull messages for a short period, because partitions are being reassigned.
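What "reassigning partitions to consumers" means can be illustrated with a deliberately simple round-robin assignment. This is only a sketch: `assign_round_robin` is a hypothetical helper, and Kafka's actual assignment strategies are configurable and more involved.

```python
def assign_round_robin(partitions, consumers):
    """Distribute partitions over the group's members in round-robin order."""
    members = sorted(consumers)
    assignment = {member: [] for member in members}
    if not members:
        return assignment
    for i, partition in enumerate(sorted(partitions)):
        assignment[members[i % len(members)]].append(partition)
    return assignment
```

Calling it again whenever the member list or the partition list changes mimics a rebalance: with four partitions, going from two consumers to three moves partitions between members, and a fifth consumer would receive no partition at all.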

NOTES

Pay special attention to the last case, the so-called slow consumer (Slow Consumers). If no heartbeat request is received within session.timeout.ms, the coordinator can remove the slow consumer from the group. In general, you become a slow consumer when message processing takes longer than session.timeout.ms, so that the interval between two calls to poll() exceeds session.timeout.ms. Because heartbeats are sent only when poll() is called (since version 0.10.1.0, the client sends heartbeats asynchronously in the background), the coordinator marks the slow consumer as dead.

If no heartbeat request is received within session.timeout.ms, the coordinator marks the consumer as dead and disconnects from it. At the same time, it triggers a rebalance by returning an IllegalGeneration error code in the HeartbeatResponse sent to the other consumers in the group.

In manual offset commit mode, pay special attention to this problem; otherwise commits will fail and the same messages will be consumed over and over.
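The coordinator's liveness check amounts to comparing each member's last heartbeat time with the session timeout. A minimal sketch, with hypothetical names and times in milliseconds:

```python
def expired_members(last_heartbeat_ms, now_ms, session_timeout_ms):
    """Return the members whose last heartbeat is older than the session timeout."""
    return sorted(
        member
        for member, seen_ms in last_heartbeat_ms.items()
        if now_ms - seen_ms > session_timeout_ms
    )
```

A consumer that spends 40 seconds processing one batch with a 30-second session timeout, and sends heartbeats only from poll(), would show up in this list and be removed from the group.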

II. Characteristics of Kafka

Message order: order is guaranteed within each partition, but global order across partitions is not. If messages must be globally ordered, the Topic can have only one partition.

Consumers in a consumer group: a consumer group fetches messages concurrently, but to preserve the order of messages within a partition, each partition is consumed by only one consumer in the group. The number of consumers in a consumer group should therefore be less than or equal to the number of partitions of the topic. (For globally ordered messages, there can be only one partition and one consumer.)

A message in a Topic can be consumed by only one Consumer within the same Consumer Group, but multiple Consumer Groups can consume the same message at the same time. This is how Kafka implements both broadcast (to all Consumers) and unicast (to one Consumer) of a Topic's messages. A Topic can correspond to multiple Consumer Groups. To broadcast, give each Consumer its own Group. To unicast, put all Consumers in the same Group.
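The broadcast and unicast behaviour follows directly from the rule that each group receives a message exactly once. The sketch below models that rule; `receivers` is a hypothetical function, and picking the owner by `partition % len(members)` is just a stand-in for whatever assignment the group currently has.

```python
def receivers(groups, partition):
    """Return, for each consumer group, the single member that gets the message.

    groups maps a group id to the list of consumers in that group.
    """
    result = {}
    for group_id, members in groups.items():
        if members:
            ordered = sorted(members)
            # Stand-in for the group's current partition assignment.
            result[group_id] = ordered[partition % len(ordered)]
    return result
```

With `{"g": ["a", "b", "c"]}` only one of the three consumers receives a given message (unicast), while `{"g1": ["a"], "g2": ["b"], "g3": ["c"]}` delivers it to all three (broadcast).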

Producers push messages and consumers pull them: some logging-centric systems, such as Facebook's Scribe and Cloudera's Flume, adopt the push model. In fact, push and pull each have advantages and disadvantages. The push model has difficulty adapting to consumers with different consumption rates, because the delivery rate is determined by the broker. Its goal is to deliver messages as quickly as possible, but this can easily overwhelm a Consumer that cannot keep up, typically resulting in denial of service and network congestion. The pull model, by contrast, lets each Consumer consume messages at a rate that matches its capacity. It also simplifies the design of the broker, and the Consumer controls its own consumption rate. The Consumer also controls how it consumes, in batches or one message at a time, and can choose different commit strategies to achieve different delivery semantics.

One of Kafka's design goals is to support offline processing and real-time processing at the same time. Accordingly, real-time streaming systems such as Storm or Spark Streaming can process messages online in real time, batch systems such as Hadoop can process them offline, and the data can be backed up to another data center in real time, all at once. You only need to make sure that the Consumers used for these three operations belong to different Consumer Groups.

III. HA of Kafka

In versions prior to 0.8, Kafka provided no High Availability mechanism: once one or more Brokers went down, none of the Partitions on them could serve requests during the downtime. If the Broker could never be recovered, or if its disk failed, the data on it was lost. One of Kafka's design goals is data persistence, and in a distributed system, especially as the cluster grows, the likelihood that one or more machines go down increases greatly, so the requirements for failover are high. Kafka has therefore provided a High Availability mechanism since 0.8, mainly in the form of Data Replication and Leader Election.

Data Replication

Kafka has provided partition-level replication since 0.8. The number of replicas can be configured in $KAFKA_HOME/config/server.properties:

default.replication.factor = 1

Replication works together with leader election to provide automatic failover. Replication has some impact on Kafka's throughput, but it greatly improves availability. By default, Kafka's replication factor is 1. Each partition has a unique leader; all reads and writes go through the leader, and followers pull data from the leader in batches. In general, the number of partitions is greater than or equal to the number of brokers, and the leaders of all partitions are evenly distributed across the brokers. The log on a follower is exactly the same as the log on its leader.

Note that the replication factor does not affect consumer throughput tests, because a consumer reads only from the leader of each partition, regardless of the replication factor. Likewise, consumer throughput is independent of whether replication is synchronous or asynchronous.

Leader Election

After Replication is introduced, the same Partition may have multiple replicas (Replica), and one of them must be chosen as the Leader. Producers and Consumers interact only with this Leader replica, and the other replicas replicate data from the Leader as Followers. Note that only the Leader serves reads and writes; Followers just fetch data sequentially from the Leader (N paths) and serve no reads or writes, which makes the system simpler and more efficient.

Why do follower replicas serve neither reads nor writes and act only as cold backups?

It is easy to see why follower replicas do not serve writes: if they did, all replicas would need to synchronize with one another, and n replicas would need n×n paths to synchronize data. With asynchronous synchronization, data consistency and ordering are hard to guarantee; with synchronous synchronization, write latency is effectively magnified n times, which is counterproductive.

So why not let follower replicas serve reads to reduce the read pressure on leader replicas? Besides the data inconsistency caused by replication delay, there is another reason. Unlike other storage services (such as ES or MySQL), reading from Kafka is essentially ordered message consumption, and consumption progress depends on an offset that must be saved. If reads were balanced across multiple replicas, this offset would be hard to determine.

TIPS

Kafka's leader replica is similar to an ES primary shard, and a follower replica to an ES replica shard. In ES, an index likewise has multiple shards (just as a Kafka topic has multiple partitions). Shards are divided into primary shards and replica shards. Primary shards provide read and write services (the sharding is very similar to MySQL: shard = hash(routing) % number_of_primary_shards), although ES introduces the coordinating node role to make this transparent to the client. Replica shards provide read service only (here, as with Kafka, ES waits for the replica shards to report success before finally returning to the client).

Readers with experience of traditional MySQL database and table sharding will find this very familiar: it is a sharding + replication data architecture, simply made transparent to you by the client (SDK) or by a coordinator.

Propagate message

When a Producer publishes a message to a Partition, it first finds the Leader of the Partition through ZooKeeper. Then, regardless of the Topic's Replication Factor (that is, how many Replicas the Partition has), the Producer sends the message only to the Leader of the Partition. The Leader writes the message to its local Log. Each Follower pulls data from the Leader, so the order of the data stored by a Follower is the same as on the Leader. After a Follower receives the message and writes it to its Log, it sends an ACK to the Leader. Once the Leader has received ACKs from all Replicas in the ISR (in-sync replicas), the message is considered committed; the Leader then advances the HW (High Watermark) and sends an ACK to the Producer.

To improve performance, each Follower sends an ACK to the Leader as soon as it receives the data, rather than waiting for the data to be written to the Log. For committed messages, therefore, Kafka can only guarantee that they are stored in the memory of multiple Replicas, not that they have been persisted to disk, so there is no absolute guarantee that a message will be consumed by a Consumer after a failure. Considering that such scenarios are very rare, however, this approach strikes a good balance between performance and data persistence. In future releases, Kafka will consider providing stronger persistence.

Consumers also read messages from the Leader, and only committed messages (messages whose offset is lower than the HW) are exposed to Consumers.

The data flow of Kafka Replication is shown in the following figure:

This topic is extensive and complicated, so we will not expand on it here. The following article covers it well, and interested readers can study it:

Kafka Design Analysis (2): Kafka High Availability (1).

Several offsets (cursors) of Kafka

The following picture (from https://rongxinblog.wordpress.com/2016/07/29/kafka-high-watermark/) shows all of Kafka's offsets simply and clearly:

Here is a brief explanation:

0. ISR

The In-Sync Replicas list, as its name implies, contains the replicas that are "kept in sync" with the leader. The meaning of "kept in sync" is somewhat involved. In version 0.9, the broker parameter replica.lag.time.max.ms defines ISR membership: if the leader has not received a fetch request from a follower for that long, or if the follower has not caught up to the leader's log end offset for that long, the leader removes it from the ISR. The ISR is a very important indicator. The controller uses it when selecting the leader replica of a partition, and the leader has to maintain the ISR list, so after the leader determines the ISR it records the result in ZooKeeper.

In scenarios where a leader must be elected, the controller determines the leader and the ISR. Once a leader has been chosen, the ISR is decided by the leader. If the leader and ISR existed only in ZooKeeper, every broker would have to watch ZooKeeper for changes to the leader and ISR of each partition it hosts, which is inefficient. If the information were not stored in ZooKeeper at all, it would have to be retrieved from all brokers after a controller failure, which is unreliable given the problems that can arise along the way. So the leader and ISR information is stored in ZooKeeper, but when the leader changes, the controller first makes the change in ZooKeeper and then sends a LeaderAndIsrRequest to the relevant brokers. This makes it possible to include all the partition changes for a broker in a single LeaderAndIsrRequest, that is, to send updates to brokers in batches, which is more efficient. Likewise, when the leader changes the ISR, it makes the change in ZooKeeper first and then updates the ISR in local memory.
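The ISR membership rule based on replica.lag.time.max.ms can be sketched as a simple filter. This is a model of the rule as described here, not broker code: `shrink_isr` and the `last_caught_up_ms` bookkeeping are hypothetical names.

```python
def shrink_isr(isr, leader_id, last_caught_up_ms, now_ms, replica_lag_time_max_ms):
    """Return the ISR after removing followers that have lagged for too long.

    last_caught_up_ms maps a follower's broker id to the last time it was
    fully caught up with the leader's log end offset.
    """
    return [
        replica
        for replica in isr
        if replica == leader_id
        or now_ms - last_caught_up_ms[replica] <= replica_lag_time_max_ms
    ]
```

The leader always stays in its own ISR; a follower drops out only after it has been behind for longer than the configured time, not as soon as it is a few messages behind.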

1. Last Committed Offset

The position the Consumer last committed, which is saved in a special topic: __consumer_offsets.

2. Current Position

The position the Consumer has currently read up to but has not yet committed to the broker. After a commit, it becomes the Last Committed Offset.

3. High Watermark (HW)

This offset is the minimum LEO across all replicas in the partition's ISR. A consumer cannot read messages beyond the HW, because that would mean reading messages that are not yet fully replicated (and therefore not fully backed up). In other words, the HW marks the messages that have been replicated by all nodes in the ISR, and it is also the largest offset available to consumers (note that not every replica has to hold these messages, only those in the ISR).

The HW changes constantly as the followers' fetch progress changes. A follower always asks the leader for data starting at the offset following its last message, so when a follower sends a fetch request for offsets from A onward, the leader knows that the follower's log end offset is at least A. At that point the leader can check whether the LEOs of all replicas in the ISR are greater than the HW and, if so, advance the HW. In addition, when the leader reads its local log to answer a follower's fetch, it attaches its own HW to the response, so the follower learns the leader's HW (in the implementation, however, the follower only obtains the HW as of the moment the leader read its local log, so it is not guaranteed to be the latest). The HW of the leader and the followers is therefore not synchronized, and the HW recorded on a follower may lag behind the leader's.
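The relationship between LEO, ISR and HW fits in a few lines. The sketch below uses hypothetical function names and follows the convention used in this article that consumers may read only messages whose offset is lower than the HW.

```python
def high_watermark(leo_by_replica, isr):
    """HW is the minimum log end offset (LEO) across the replicas in the ISR."""
    return min(leo_by_replica[replica] for replica in isr)


def visible_offsets(hw):
    """Offsets a consumer may read: everything strictly below the HW."""
    return range(0, hw)
```

For LEOs of 10, 8 and 5 on replicas 1, 2 and 3, the HW is 5 while all three are in the ISR; if replica 3 is removed from the ISR, the HW can advance to 8 even though replica 3 still lacks those messages.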

High Watermark Checkpoint

Because the HW changes constantly, writing every update to Zookeeper immediately would be inefficient. But the HW is important enough that it must be persisted, so ReplicaManager starts a separate thread that periodically records the HW of every partition in a file, the highwatermark-checkpoint.
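A minimal sketch of such a checkpoint, assuming a simple text layout (a version line, an entry count, then one `topic partition offset` line per partition). The layout and the function names are assumptions for illustration, and the periodic thread is left out; only the write-then-rename step that keeps the file consistent is shown.

```python
import os
import tempfile


def write_hw_checkpoint(path, high_watermarks):
    """Atomically write {(topic, partition): offset} to path."""
    lines = ["0", str(len(high_watermarks))]  # assumed layout: version, then count
    for (topic, partition), offset in sorted(high_watermarks.items()):
        lines.append(f"{topic} {partition} {offset}")
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as handle:
        handle.write("\n".join(lines) + "\n")
        handle.flush()
        os.fsync(handle.fileno())
    # Rename is atomic, so a reader never sees a half-written checkpoint.
    os.replace(tmp_path, path)


def read_hw_checkpoint(path):
    """Read the checkpoint back into a {(topic, partition): offset} dict."""
    with open(path) as handle:
        lines = handle.read().splitlines()
    count = int(lines[1])
    result = {}
    for line in lines[2:2 + count]:
        topic, partition, offset = line.split()
        result[(topic, int(partition))] = int(offset)
    return result


with tempfile.TemporaryDirectory() as directory:
    checkpoint = os.path.join(directory, "highwatermark-checkpoint")
    write_hw_checkpoint(checkpoint, {("logs", 0): 42, ("logs", 1): 7})
    restored = read_hw_checkpoint(checkpoint)
print(restored)
```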

4. Log End Offset (LEO)

This one is easy to understand: it is the position of the latest log write (or replication) on a replica.

IV. Kafka client

Kafka supports the JVM languages (Java, Scala) and also has a high-performance C/C++ client, librdkafka, together with clients for various languages that wrap it, for example the Python client confluent-kafka-python. There is also a pure-Python client: kafka-python.

Here is a Python example (using confluent-kafka-python):

Producer:

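from confluent_kafka import Producer

# Broker names are placeholders; some_data_source is any iterable of strings.
p = Producer({'bootstrap.servers': 'mybroker,mybroker2'})
for data in some_data_source:
    # produce() is asynchronous: it only queues the message for delivery.
    p.produce('mytopic', data.encode('utf-8'))
# Block until every queued message has been delivered.
p.flush()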

Consumer:

from confluent_kafka import Consumer, KafkaError

c = Consumer({
    'bootstrap.servers': 'mybroker',
    'group.id': 'mygroup',
    'default.topic.config': {'auto.offset.reset': 'smallest'},
})
c.subscribe(['mytopic'])
running = True
while running:
    msg = c.poll(1.0)  # wait up to one second for a message
    if msg is None:
        continue  # nothing arrived within the timeout
    if not msg.error():
        print('Received message: %s' % msg.value().decode('utf-8'))
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        # Any error other than "end of partition" stops the loop.
        print(msg.error())
        running = False
c.close()

Usage is basically the same as with an ordinary message queue.

V. Offset management in Kafka

Kafka reads messages by offset, so if an offset is wrong, messages may be read twice or unread messages may be skipped. Before 0.8.2, Kafka stored offsets in Zookeeper, but ZK writes are expensive and do not scale linearly, so frequent writes to ZK become a performance bottleneck. Offset Management was therefore introduced in 0.8.2: offsets are stored in a compacted Kafka topic (__consumer_offsets), and a consumer commits its offsets by sending an OffsetCommitRequest to a designated broker (the offset manager). The request contains a list of partitions and the consumed position (offset) in each of them. The offset manager appends a key-value message to the __consumer_offsets topic, where the key is made up of consumerGroup-topic-partition and the value is the offset. To improve performance, the most recent record for each key is also kept in memory, so that an OffsetFetchRequest for a given key can be answered quickly without scanning the whole offsets topic log. If the offset manager fails for some reason, another broker becomes the new offset manager and rebuilds the offset cache by scanning the offsets topic.
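The append-plus-cache behaviour described here can be modelled in a few lines. This is only a toy: `OffsetManager` and its methods are invented names, and a Python list stands in for the offsets topic.

```python
class OffsetManager:
    """Toy offset manager: an append-only log plus an in-memory cache."""

    def __init__(self):
        self.log = []    # stand-in for the offsets topic
        self.cache = {}  # latest offset per (group, topic, partition)

    def commit(self, group, topic, partition, offset):
        key = (group, topic, partition)
        self.log.append((key, offset))  # durable record
        self.cache[key] = offset        # fast lookup

    def fetch(self, group, topic, partition):
        # Served from memory, no log scan needed.
        return self.cache.get((group, topic, partition))

    @classmethod
    def recover(cls, log):
        """Rebuild the cache on a new offset manager by replaying the log."""
        manager = cls()
        manager.log = list(log)
        for key, offset in manager.log:
            manager.cache[key] = offset  # later records overwrite earlier ones
        return manager


manager = OffsetManager()
manager.commit("mygroup", "mytopic", 0, 5)
manager.commit("mygroup", "mytopic", 0, 9)
replacement = OffsetManager.recover(manager.log)
print(replacement.fetch("mygroup", "mytopic", 0))
```

Because only the latest record per key matters, older records for the same key are exactly what log compaction is allowed to discard.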

How to view consumption offset

Kafka versions before 0.9 provide a kafka-consumer-offset-checker.sh script for viewing the offsets of a consumer group on one or more topics; it calls kafka.tools.ConsumerOffsetChecker. From 0.9 on this script is no longer recommended; use the kafka-consumer-groups.sh script instead, which calls kafka.admin.ConsumerGroupCommand. That script manages consumer groups in general rather than only showing their offsets. Only the newer kafka-consumer-groups.sh usage is covered here.

With the ConsumerGroupCommand tool, we can list, describe, or delete consumer groups.

For example, to list the consumer groups across all topics, use the --list option:

$ bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list
test-consumer-group

To view the current consumption offset of a consumer group, use the describe parameter:

$ bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --describe --group test-consumer-group
GROUP                TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  OWNER
test-consumer-group  test-foo  0          1               3               2    consumer-1_/127.0.0.1
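The LAG column is simply LOG-END-OFFSET minus CURRENT-OFFSET. A small sketch that recomputes it from the describe output shown above; `consumer_lag` is a hypothetical helper, and it assumes whitespace-separated columns with the header row first.

```python
def consumer_lag(describe_output):
    """Return (topic, partition, lag) tuples parsed from describe output."""
    lines = [line for line in describe_output.splitlines() if line.strip()]
    header = lines[0].split()
    result = []
    for line in lines[1:]:
        row = dict(zip(header, line.split()))
        lag = int(row["LOG-END-OFFSET"]) - int(row["CURRENT-OFFSET"])
        result.append((row["TOPIC"], int(row["PARTITION"]), lag))
    return result


sample = """\
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
test-consumer-group test-foo 0 1 3 2 consumer-1_/127.0.0.1
"""
print(consumer_lag(sample))
```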

NOTES

This script can only delete consumer groups that contain no consumers, and only groups that belong to old-style consumers (those whose group metadata is stored in Zookeeper), because the delete operation simply removes the group's node and its child nodes in ZK.

How to manage consumption offset

The above describes querying the Kafka consumption offset through a scripting tool. In fact, we can also query the consumption offset through API.

The Kafka Consumer API provides two methods for querying a consumer's offsets:

committed(TopicPartition partition): returns an OffsetAndMetadata object from which you can get the committed offset of the given partition.

position(TopicPartition partition): returns the offset of the next record that will be fetched.

Besides looking at offsets, we sometimes need to set an offset by hand, for example to skip certain messages or to reprocess them. Before 0.8.2, offsets were stored in ZK, so you could simply manipulate ZK with ZKCli. From 0.8.2 on, offsets are stored by default in Kafka's __consumer_offsets topic and can only be changed through the API:

Class KafkaConsumer: Kafka allows specifying the position using seek(TopicPartition, long) to specify the new position. Special methods for seeking to the earliest and latest offset the server maintains are also available (seekToBeginning(TopicPartition…) and seekToEnd(TopicPartition…) respectively).

Reference document: Kafka Consumer Offset Management

The Kafka Consumer API provides these methods for resetting the consumption offset:

seek(TopicPartition partition, long offset): resets the position to the given offset.

seekToBeginning(): starts consuming from the beginning of the log, which corresponds to the offset reset policy auto.offset.reset=earliest.

seekToEnd(): starts consuming from the end of the log, that is, waits for new messages to be written before pulling anything. This corresponds to the offset reset policy auto.offset.reset=latest.

Of course, you need to know which offset to reset to. One way is to look up the offset that corresponds to a timestamp and then seek to it.
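To make the timestamp-then-seek idea concrete, here is a toy model in plain Python rather than a real client. `offset_for_timestamp` and `ToyConsumer` are hypothetical names; the lookup assumes message timestamps never decrease and returns the earliest offset whose timestamp is at or after the target.

```python
import bisect


def offset_for_timestamp(timestamps, target_ms):
    """timestamps[i] is the timestamp of the message at offset i (non-decreasing)."""
    index = bisect.bisect_left(timestamps, target_ms)
    return index if index < len(timestamps) else None


class ToyConsumer:
    """Tracks the two offsets discussed above for a single partition."""

    def __init__(self, committed=0):
        self._committed = committed
        self._position = committed

    def committed(self):
        return self._committed

    def position(self):
        return self._position

    def seek(self, offset):
        self._position = offset  # changes only where the next read starts

    def poll(self, log):
        if self._position >= len(log):
            return None
        message = log[self._position]
        self._position += 1
        return message

    def commit(self):
        self._committed = self._position


log = ["a", "b", "c", "d"]
timestamps = [1000, 2000, 2000, 3000]

consumer = ToyConsumer()
target = offset_for_timestamp(timestamps, 2000)
consumer.seek(target)
first = consumer.poll(log)
print(target, first, consumer.position(), consumer.committed())
```

Note that seek() moves the position but leaves the committed offset alone until the next commit.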

Deployment and configuration

Kafka is written in Scala, so it runs anywhere a JRE is installed. Download the official pre-built package, unpack it, adjust the configuration, and run it.

I. kafka configuration

The configuration file is located in the server.properties under the config directory. The key configurations are as follows (some attributes are not available by default and need to be added by yourself):

broker.id: each machine in the Kafka cluster (a broker) needs its own unique id.

port: the listen port.

delete.topic.enable: if set to true, topics can be deleted; otherwise they cannot.

message.max.bytes: the maximum allowed message size. The default is 1000012 (about 1 MB); 10000012 (about 10 MB) is recommended.

replica.fetch.max.bytes: as above. The default is 1048576; 10048576 is recommended.

log.dirs: the directory where Kafka stores its data files (note: data, not application logs). For example: /home/work/kafka/data/kafka-logs

log.cleanup.policy: the policy for clearing expired data. The default is delete; it can also be set to compact.

log.retention.hours: how long data is kept, in hours. The default is 168, that is, one week. Expired data is cleared according to log.cleanup.policy. Minute-level granularity is available through log.retention.minutes.

log.segment.bytes: the size at which a data file is rolled into a new segment. The default is 1073741824 (1 GB).

log.retention.check.interval.ms: how often the cleaning thread checks whether data has expired, in ms. The default is 300000, that is, 5 minutes.

zookeeper.connect: the host:port entries of the Zookeeper cluster that manages Kafka, separated by commas.

TIPS: sending and receiving large messages

The following parameters need to be modified:

Broker: message.max.bytes & replica.fetch.max.bytes

Consumer: fetch.message.max.bytes
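These three settings have to agree with each other: a replica or consumer whose fetch size is smaller than the largest allowed message cannot fetch that message. A hedged sketch of a sanity check follows; the helper names are invented, the broker defaults are the ones quoted above, and the consumer default of 1048576 is an assumption.

```python
def parse_properties(text):
    """Parse key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_large_message_settings(broker_props, consumer_props):
    """Return a list of human-readable problems (empty if consistent)."""
    message_max = int(broker_props.get("message.max.bytes", 1000012))
    replica_fetch = int(broker_props.get("replica.fetch.max.bytes", 1048576))
    # Assumed default for the consumer-side setting.
    consumer_fetch = int(consumer_props.get("fetch.message.max.bytes", 1048576))
    problems = []
    if replica_fetch < message_max:
        problems.append("replica.fetch.max.bytes is smaller than message.max.bytes")
    if consumer_fetch < message_max:
        problems.append("fetch.message.max.bytes is smaller than message.max.bytes")
    return problems


broker = parse_properties("""
# broker side
message.max.bytes=10000012
replica.fetch.max.bytes=10048576
""")
print(check_large_message_settings(broker, {}))
print(check_large_message_settings(broker, {"fetch.message.max.bytes": "10048576"}))
```

The first call reports the consumer-side mismatch; the second, with the consumer setting raised, reports nothing.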

For more information on parameters, please see the official documentation:

http://kafka.apache.org/documentation.html#brokerconfigs

II. ZK configuration and startup

Then make sure that ZK is configured and started correctly. Kafka ships with a ZK service whose configuration file is config/zookeeper.properties. The key settings are:

dataDir=/home/work/kafka/data/zookeeper
clientPort=2181
maxClientCnxns=0
tickTime=2000
initLimit=10
syncLimit=5
server.1=nj03-bdg-kg-offline-01.nj03:2888:3888
server.2=nj03-bdg-kg-offline-02.nj03:2888:3888
server.3=nj03-bdg-kg-offline-03.nj03:2888:3888

NOTES: Zookeeper cluster deployment

A ZK cluster deployment involves two things:

Assign a serverId: create a myid file in the dataDir directory containing only a number from 1 to 255. This is the ZK serverId.

Configure the cluster: the format is server.{id}={host}:{port}:{port}, where {id} is the serverId mentioned above.
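The two steps fit together as follows: the id in each server.{id} line must equal the number in that host's myid file. A small sketch that derives both from one host list; `zookeeper_cluster_config` and the example host names are hypothetical, while the two ports are the ones used in the configuration above.

```python
def zookeeper_cluster_config(hosts, peer_port=2888, election_port=3888):
    """Return (server lines for zookeeper.properties, myid content per host)."""
    if not 1 <= len(hosts) <= 255:
        raise ValueError("a serverId must be a number from 1 to 255")
    server_lines = []
    myid_by_host = {}
    for server_id, host in enumerate(hosts, start=1):
        server_lines.append(f"server.{server_id}={host}:{peer_port}:{election_port}")
        myid_by_host[host] = str(server_id)  # goes into dataDir/myid on that host
    return server_lines, myid_by_host


server_lines, myid_by_host = zookeeper_cluster_config(
    ["zk1.example", "zk2.example", "zk3.example"]
)
print("\n".join(server_lines))
print(myid_by_host)
```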

Then start:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

III. Start Kafka

Then you can start Kafka, which is very simple:

JMX_PORT=8999 bin/kafka-server-start.sh -daemon config/server.properties

TIPS

We added the JMX_PORT=8999 environment variable to the startup command, which exposes JMX monitoring items and facilitates monitoring.

Kafka monitoring and management

Unlike RabbitMQ or ActiveMQ, Kafka has no web management interface by default, only command-line tools, which is not very convenient. You can install one, though, such as Yahoo's Kafka Manager: A tool for managing Apache Kafka. It supports many features:

Manage multiple clusters

Easy inspection of cluster state (topics, consumers, offsets, brokers, replica distribution, partition distribution)

Run preferred replica election

Generate partition assignments with option to select brokers to use

Run reassignment of partition (based on generated assignments)

Create a topic with optional topic configs (0.8.1.1 has different configs than 0.8.2 +)

Delete topic (only supported on 0.8.2 + and remember set delete.topic.enable=true in broker config)

Topic list now indicates topics marked for deletion (only supported on 0.8.2 +)

Batch generate partition assignments for multiple topics with option to select brokers to use

Batch run reassignment of partition for multiple topics

Add partitions to existing topic

Update config for existing topic

Optionally enable JMX polling for broker level and topic level metrics.

Optionally filter out consumers that do not have ids/ owners/ & offsets/ directories in zookeeper.

Installation is fairly simple, but a lot has to be downloaded, which takes a long time. For details, see kafka manager installation. Note that none of these management platforms offers permission management.

Also note that kafka-manager.zkhosts in Kafka Manager's conf/application.conf is used for Kafka Manager's own high availability; it does not point to the zkhosts of the Kafka cluster to be managed. So don't forget to add the Kafka cluster you want to manage by hand (mainly its name and ZK address). See: Install and Evaluation of Yahoo's Kafka Manager.

Kafka Manager mainly provides a management interface, and monitoring depends on other applications, such as:

Burrow: Kafka Consumer Lag Checking. LinkedIn's open-source consumer lag monitor, written in Go. It appears to have no UI, only an HTTP API, and it can be configured to send email alerts.

Kafka Offset Monitor: A little app to monitor the progress of kafka consumers and their lag wrt the queue.

Both applications exist to monitor Kafka offsets.

Deleting a topic

Generally, there are two ways to delete a Kafka topic:

1. Manually delete the topic's partition folders under the ${log.dirs} directory on each node, then log in to a ZK client and delete the nodes of the topic. The topic metadata is stored under the /brokers/topics and /config/topics nodes.

2. Run the kafka-topics.sh script. To delete the topic completely this way, make sure delete.topic.enable=true is set in the server.properties file loaded when Kafka starts; the setting defaults to false. Otherwise, running the script does not actually delete the topic. It only creates a node named after the topic under /admin/delete_topics in ZK, which marks the topic for deletion.

kafka-topics.sh --delete --zookeeper server-1:2181,server-2:2181 --topic test

Execution result:

Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

In that case, to delete the topic completely you still have to remove the corresponding files and nodes by hand. When the configuration item is true, all file directories and metadata of the topic are deleted.

Automatic removal of expired data

In a traditional message queue, messages are generally deleted once they have been consumed, whereas a Kafka cluster retains all messages whether or not they have been consumed. Because of disk limits it is of course impossible (and unnecessary) to keep all data forever, so Kafka provides two strategies for deleting old data: one based on time and one based on partition file size. In $KAFKA_HOME/config/server.properties you can tell Kafka to delete data older than one week, or to delete old data once a partition exceeds 1 GB:

## Log Retention Policy ##

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

# By default the log cleaner is disabled and the log retention policy will default to
# just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs
# can then be marked for log compaction.
log.cleaner.enable=false
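How the two criteria combine can be sketched with a toy function. This is an illustration of the rules stated in the comments above, not Kafka's cleaner: `segments_to_delete` and the segment dictionaries are hypothetical, and the newest (active) segment is assumed to be exempt from deletion.

```python
def segments_to_delete(segments, now_ms, retention_ms=None, retention_bytes=None):
    """Return base offsets of the segments to delete, oldest first.

    segments is ordered oldest to newest; each item has 'base_offset',
    'size' (bytes) and 'last_modified_ms'.
    """
    deletable = len(segments) - 1  # never touch the active segment
    cut = 0
    if retention_ms is not None:
        # Time rule: drop leading segments older than the retention period.
        while cut < deletable and now_ms - segments[cut]["last_modified_ms"] > retention_ms:
            cut += 1
    if retention_bytes is not None:
        # Size rule: drop leading segments while what remains stays at or above the limit.
        total = sum(segment["size"] for segment in segments[cut:])
        while cut < deletable and total - segments[cut]["size"] >= retention_bytes:
            total -= segments[cut]["size"]
            cut += 1
    return [segment["base_offset"] for segment in segments[:cut]]


DAY_MS = 24 * 60 * 60 * 1000
segments = [
    {"base_offset": 0, "size": 100, "last_modified_ms": 0},
    {"base_offset": 100, "size": 100, "last_modified_ms": 5 * DAY_MS},
    {"base_offset": 200, "size": 100, "last_modified_ms": 9 * DAY_MS},
    {"base_offset": 300, "size": 50, "last_modified_ms": 10 * DAY_MS},
]
now = 10 * DAY_MS
by_time = segments_to_delete(segments, now, retention_ms=7 * DAY_MS)
by_size = segments_to_delete(segments, now, retention_bytes=150)
print(by_time, by_size)
```

With a one-week limit only the first segment is old enough to go; with a 150-byte limit the two oldest segments can be dropped while 150 bytes still remain.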

Note that because the time for Kafka to read a specific message is O(1), that is, independent of file size, deleting files here has no bearing on Kafka's performance. Which deletion strategy to choose depends only on disk space and your specific requirements.

Some problems of Kafka

1. Ordering is guaranteed only for messages within a single partition of a topic, not across all partitions of a topic. If an application strictly requires globally ordered messages, Kafka may not be a good fit.

2. Consumers track and commit their own offsets, but they do not commit to Kafka very often, because maintaining these updates is expensive for the broker. As a result, a message may be consumed more than once or not at all.

In detail: a message may already have been processed, but the consumer dies before it commits the offset to the broker to confirm this. Another consumer then takes over the same partition and starts from the last committed offset, so some messages are consumed again. Conversely, if the consumer commits the offset before processing a batch of messages and then dies while processing them, those messages are effectively "lost". In general it is hard to make processing a message and committing its offset a single atomic operation, so there is no guarantee that every message is processed exactly once.
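A common way to live with this limit is to give related messages the same key, so that they all land in one partition and keep their relative order. The sketch below shows the idea only; it uses CRC32 as a stand-in hash, which is an assumption and not the hash Kafka's own partitioner uses, and the function names are hypothetical.

```python
import zlib


def partition_for(key, num_partitions):
    """Map a key to a partition; the same key always maps to the same partition."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def route(messages, num_partitions):
    """Append (key, value) pairs to per-partition lists in arrival order."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in messages:
        partitions[partition_for(key, num_partitions)].append((key, value))
    return partitions


messages = [("host-a", 1), ("host-b", 1), ("host-a", 2), ("host-b", 2), ("host-a", 3)]
partitions = route(messages, 3)
for index, content in enumerate(partitions):
    print(index, content)
```

Within each partition the messages of one key stay in the order they were sent, even though there is no ordering between different partitions.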
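The two failure modes can be reproduced with a small simulation. Everything here is a toy (`run_consumer` and `simulate` are made-up names): a consumer handles messages one at a time, crashes between its two steps for one chosen message, and a second consumer resumes from the committed offset.

```python
def run_consumer(messages, start, commit_first, crash_at=None):
    """Consume from start; crash between the two steps of message crash_at.

    Returns (processed messages, committed offset).
    """
    processed = []
    committed = start
    for offset in range(start, len(messages)):
        if commit_first:
            committed = offset + 1              # step 1: commit
            if offset == crash_at:
                return processed, committed     # crash before processing
            processed.append(messages[offset])  # step 2: process
        else:
            processed.append(messages[offset])  # step 1: process
            if offset == crash_at:
                return processed, committed     # crash before committing
            committed = offset + 1              # step 2: commit
    return processed, committed


def simulate(messages, commit_first, crash_at):
    """First consumer crashes, second one resumes from the committed offset."""
    first, committed = run_consumer(messages, 0, commit_first, crash_at)
    second, _ = run_consumer(messages, committed, commit_first)
    return first + second


messages = ["m0", "m1", "m2"]
process_then_commit = simulate(messages, commit_first=False, crash_at=1)
commit_then_process = simulate(messages, commit_first=True, crash_at=1)
print(process_then_commit)
print(commit_then_process)
```

Processing before committing repeats m1 (at-least-once), while committing before processing skips m1 (at-most-once).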

3. The number of topics and partitions is limited

The number of topics that a Kafka cluster can handle is limited, and performance begins to degrade when it reaches about 1000 topics. These problems are basically related to the basic implementation decisions of Kafka. In particular, the amount of random IO on the broker increases dramatically as the number of topics increases, because the write operation of each topic partition is actually a separate file append (append) operation. As the number of partitions increases, the problem becomes more and more serious. If Kafka does not take over IO scheduling, the problem will be difficult to solve.

Of course, general applications do not have such a large number of topics and partitions. But if a single Kafka cluster is used as a multi-tenant resource, this problem will be exposed at this time.

4. Manual load balancing of partitions

Kafka's model is very simple: a topic partition is stored in its entirety on one broker, and several other brokers may hold copies (replicas) of it. A single partition is never split across machines. As the number of partitions grows, some machines in the cluster will be unlucky and end up with several large partitions. Kafka has no mechanism to migrate these partitions automatically, so you have to do it yourself. Monitoring disk space, working out which partition is causing the problem, and then finding a suitable place to move it to are manual administrative tasks that cannot be ignored in a Kafka cluster.

If the cluster is small and the data needs little space, this kind of management barely works. If traffic grows quickly, or there is no first-rate system administrator, the situation can get completely out of control.

Note: if you add new nodes to the cluster, you must also manually migrate data to these new nodes, and Kafka does not automatically migrate partitions to balance load or storage space.
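To give a feel for the manual decision involved, here is a deliberately naive heuristic: pick the partition on the fullest broker whose move to the emptiest broker narrows the gap the most. It is a sketch under strong assumptions (it ignores replica placement, racks, and traffic), `suggest_move` is a hypothetical helper, and the move itself would still have to be carried out with Kafka's partition reassignment tooling.

```python
def suggest_move(broker_partitions):
    """broker_partitions: {broker_id: {partition_name: size_bytes}}.

    Returns (partition_name, source_broker, target_broker) or None.
    """
    usage = {broker: sum(sizes.values()) for broker, sizes in broker_partitions.items()}
    source = max(usage, key=usage.get)
    target = min(usage, key=usage.get)
    gap = usage[source] - usage[target]
    # Moving a partition of size s turns the gap into |gap - 2s|,
    # so only partitions smaller than the gap improve the balance.
    candidates = {
        name: size
        for name, size in broker_partitions[source].items()
        if 0 < size < gap
    }
    if not candidates:
        return None
    best = min(candidates, key=lambda name: abs(gap - 2 * candidates[name]))
    return best, source, target


cluster = {
    1: {"a-0": 80, "b-0": 30},
    2: {"c-0": 20},
    3: {"d-0": 40},
}
print(suggest_move(cluster))
```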

5. Follower replicas serve only as a cold backup (solving the HA problem) and cannot serve reads.

This is unlike ES, where replica shards also serve reads to relieve read pressure on the master. In Kafka, because reads are stateful (the committed offset has to be maintained), follower replicas take no part in serving reads or writes; they only act as a cold standby to eliminate the single point of failure.

6. Messages can only be consumed sequentially, and messages can not be located randomly. When something goes wrong, it is not convenient to locate the problem quickly.

This is actually a common problem whenever a messaging system is used as asynchronous RPC. Suppose the sender says it sent a message but the consumer says it never received it: how do you investigate? Message queues lack a mechanism for random access to messages, such as fetching a message by its key, which makes such problems hard to troubleshoot.
