Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to deeply analyze Kafka

2025-04-10 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/02 Report--

How to deeply analyze Kafka, many novices are not very clear about this, in order to help you solve this problem, the following editor will explain for you in detail, people with this need can come to learn, I hope you can gain something.

Background introduction

Introduction to Kafka

Kafka is a distributed, publish / subscribe based messaging system. The main design objectives are as follows:

Provide message persistence with a time complexity of O (1), and guarantee constant time access performance even for data above TB level.

High throughput. Even on very cheap commercial machines, a single machine can support the transmission of 100K messages per second.

Support message partitioning between Kafka Server and distributed consumption, while ensuring the sequential transmission of messages within each partition

Support both offline data processing and real-time data processing

Why use Message Queue?

Decoupling

It is extremely difficult to predict what requirements the project will encounter in the future at the beginning of the project. The message queue inserts an implicit, data-based interface layer in the middle of the process, and both processes implement this interface. This allows you to extend or modify the processes on both sides independently, as long as you make sure they follow the same interface constraints

redundancy

In some cases, the process of processing data will fail. Unless the data is persisted, it will be lost. Message queues persist data until they have been fully processed, avoiding the risk of data loss. In the "insert-get-delete" paradigm used by many message queues, before deleting a message from the queue, you need to make it clear that the message has been processed and make sure that your data is safely saved until you have finished using it.

Expansibility

Because message queues decouple your processing, it's easy to increase the frequency of queuing and processing of messages; just add additional processing. There is no need to change the code or adjust the parameters. Expanding is as simple as turning up the power button.

Flexibility & peak processing power

In the case of a sharp increase in traffic, applications still need to continue to play a role, but such sudden traffic is not common; it is undoubtedly a huge waste to put resources on standby to be able to handle such peak visits. The use of message queuing enables key components to withstand sudden access pressure without completely collapsing due to sudden overloaded requests.

Recoverability

When some components of the system fail, the whole system will not be affected. Message queuing reduces the coupling between processes, so even if a process that processes messages dies, messages added to the queue can still be processed after the system is restored. This ability to retry or postpone processing requests is usually the difference between a slightly inconvenient user and a frustrated user.

Guarantee of service

The redundancy mechanism provided by the message queue ensures that the message can be actually processed as long as a process reads the queue. On this basis, IronMQ provides a "one-time delivery" guarantee. No matter how many processes are fetching data from the queue, each message can only be processed once. This is possible because getting a message simply "subscribes" the message and temporarily removes it from the queue. Unless the client explicitly indicates that the message has been processed, the message is put back in the queue and can be processed again after a configurable period of time.

Sequence guarantee

In most usage scenarios, the order of data processing is important. Message queues are inherently sorted and guarantee that the data will be processed in a specific order. IronMO ensures that messages are processed in FIFO (first-in, first-out) order, so the position of messages in the queue is to retrieve their location from the queue.

Buffer

In any important system, there will be elements that require different processing times. For example, loading an image takes less time than applying a filter. Message queuing uses a buffer layer to help tasks execute efficiently-the processing of writing to the queue is as fast as possible without being constrained by preparatory processing read from the queue. This buffer helps to control and optimize the speed at which data flows through the system.

Understanding data flow

In a distributed system, it is a huge challenge to get an overall impression of how long a user's operation will take and why. The message series facilitates the identification of underperforming processes or areas by how often the messages are processed, where the data flow is not optimized.

Asynchronous communication

Most of the time, you don't want to and don't need to process the message immediately. Message queuing provides an asynchronous processing mechanism that allows you to put a message on the queue, but does not process it immediately. You can put as many messages in the queue as you want, and then deal with them when you like.

Common Message Queue comparison

RabbitMQ

RabbitMQ is an open source message queue written in Erlang, which supports many protocols: AMQP,XMPP, SMTP, STOMP. Because of this, it is very heavyweight and more suitable for enterprise development. At the same time, the Broker architecture is implemented, which means that messages are queued in the central queue before being sent to the client. Good support for routing, load balancing or data persistence.

Redis

Redis is a NoSQL database based on Key-Value pairs, which is very active in development and maintenance. Although it is a Key-Value database storage system, it supports MQ functionality, so it can be used as a lightweight queuing service. For the entry and exit operations of RabbitMQ and Redis, each performed 1 million times, and the execution time was recorded every 100000 times. The test data are divided into four different sizes: 128Bytes, 512Bytes, 1K and 10K. The experimental results show that when joining the team, the performance of Redis is higher than that of RabbitMQ when the data is compared, but it is unbearably slow if the data size exceeds 10K; when leaving the team, Redis shows very good performance regardless of the size of the data, while the performance of RabbitMQ is much lower than that of Redis.

ZeroMQ

ZeroMQ claims to be the fastest message queuing system, especially for high-throughput demand scenarios. ZMQ can implement advanced / complex queues that RabbitMQ is not good at, but developers need to combine a variety of technical frameworks themselves, and the technical complexity is a challenge to the successful application of this MQ. ZeroMQ has a unique non-middleware mode, you do not need to install and run a message server or middleware, because your application will play this service role. You can simply reference the ZeroMQ library, install it using NuGet, and then you can happily send messages between applications. But ZeroMQ only provides non-persistent queues, which means that if there is an outage, data will be lost. Among them, ZeroMQ is used as the transport of data streams by default in Storm versions of Twitter prior to 0.9.0 (Storm supports both ZeroMQ and Netty as transport modules since version 0.9).

ActiveMQ

ActiveMQ is a sub-project under Apache. Similar to ZeroMQ, it can implement queues in proxy and peer-to-peer technologies. At the same time, similar to RabbitMQ, it can efficiently implement advanced application scenarios with a small amount of code.

Kafka/Jafka

Kafka, a sub-project of Apache, is a high-performance cross-language distributed publish / subscribe message queuing system, while Jafka is hatched on Kafka, that is, an upgraded version of Kafka. It has the following characteristics: fast persistence, which can persist messages under O (1) system overhead; high throughput, which can reach the throughput rate of 10W/s on an ordinary server; complete distributed system, Broker, Producer, Consumer all support distribution automatically and realize complex equilibrium automatically. Support Hadoop data parallel loading, for the same log data and offline analysis system as Hadoop, but require real-time processing limitations, this is a feasible solution. Kafka unifies online and offline message processing through Hadoop's parallel loading mechanism. Apache Kafka is a very lightweight messaging system compared to ActiveMQ. It is not only a very good performance, but also a well-working distributed system.

Kafka parsing

Terminology

Broker

A Kafka cluster contains one or more servers, which are called broker

Topic

Every message posted to the Kafka cluster has a category, which is called topic. (physically, messages with different topic are stored separately. Logically, messages from a topic are stored on one or more broker, but users only need to specify the topic of the message to produce or consume data regardless of where the data is stored.)

Partition

Parition is a physical concept, and each topic contains one or more partition, and the number of parition can be specified when the topic is created. Each partition corresponds to a folder in which the data and index files of the partition are stored

Producer

Responsible for releasing messages to Kafka broker

Consumer

Consumption message. Each consumer belongs to a specific consuer group (you can specify a group name for each consumer, or the default group if you do not specify group name). When using consumer high level API, a message of the same topic can only be consumed by one consumer within the same consumer group, but multiple consumer group can consume this message at the same time.

Kafka architecture

As shown in the figure above, a typical kafka cluster contains several producer (which can be page view generated by the front end of web, or server logs, system CPU, memory, etc.), several broker (Kafka supports horizontal scaling, generally, the higher the number of broker, the higher the cluster throughput), several consumer group, and a Zookeeper cluster. Kafka manages the cluster configuration through Zookeeper, elects leader, and rebalance when the consumer group changes. Producer publishes messages to broker,consumer using the push pattern subscribes and consumes messages from broker using the pull pattern.

Push vs. Pull

As a messaging system,Kafka follows the traditional way, messages are selected from producer to broker push and from consumer to broker pull. Some logging-centric system, such as Facebook's Scribe and Cloudera's Flume, adopt a very different push mode. In fact, push mode and pull mode have their own advantages and disadvantages.

The push model is difficult to adapt to consumers with different consumption rates, because the message delivery rate is determined by broker. The goal of the push pattern is to deliver messages as quickly as possible, but it is easy to cause consumer to be too late to process messages, typically characterized by denial of service and network congestion. On the other hand, pull mode can consume messages at an appropriate rate according to the consumption power of consumer.

Topic & Partition

A Topic can logically be thought of as an existing queue, and each consumer must specify its topic, which can be simply understood as indicating which queue to put the message into. In order to make the throughput of Kafka scale horizontally, the topic is physically divided into one or more partition, and each partition physically corresponds to a folder in which all messages and index files of the partition are stored.

Each log file is a "log entries" sequence, and each log entry contains a 4-byte integer (value N), followed by an N-byte message body. Each message has a unique 64-byte offset under the current partition, which indicates where the message starts. The consumption format for storage on disk is as follows:

Message length: 4 bytes (value: 1 / 4 / n)

"magic" value: 1 byte

Crc: 4 bytes

Payload: n bytes

This "log entries" is not made up of a single file, but is divided into multiple segment, each segment named offset of the segment*** message and ".kafka". There will also be an index file that indicates the offset range of the log entry contained under each segment, as shown in the following figure.

Because each message is append into the partition, it is sequentially written to disk, so it is very efficient. (it has been proved that sequential disk writing is more efficient than random write memory, which is an important guarantee of Kafka's high throughput.)

When each message is sent to the broker, the partition to which it is stored is selected according to the paritition rules. If the partition rules are set properly, all messages can be evenly distributed to different partition, thus achieving horizontal scaling. If a topic corresponds to a file, the machine on which the file is located will become the performance bottleneck of the topic, and partition solves this problem. When you create a topic, you can specify the number of partition in $KAFKA_HOME/config/server.properties (shown below), or you can modify the number of parition after the topic is created.

# The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. Num.partitions=3

When sending a message, you can specify that the key,producer of the message determines which parition to send the message to based on the key and partition mechanism. The paritition mechanism can be implemented by specifying the paritition of producer. The parameter class specifies that the class must implement the kafka.producer.Partitioner interface. In this example, if the key can be parsed to an integer, the corresponding integer is offset by the total number of partition, and the message is sent to the partition corresponding to that number. (each parition will have a serial number.)

Import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; public class JasonPartitioner implements Partitioner {public JasonPartitioner (VerifiableProperties verifiableProperties) {} @ Override public int partition (Object key, int numPartitions) {try {int partitionNum = Integer.parseInt ((String) key); return Math.abs (Integer.parseInt ((String) key)% numPartitions) } catch (Exception e) {return Math.abs (key.hashCode () numPartitions);}

If you use the class in the above example as the partition.class, and send 20 messages (key is 0mem1mem2 and 3 respectively) to topic2 (including 4 partition) with the following code.

Public void sendMessage () throws InterruptedException {for (int I = 1; I {this broker id}). If the creation is successful (only one can be created successfully), the broker will become a controller, and if the creation is not successful, the broker will wait for a command from the new controller.

Consumer group

(all descriptions in this section are based on consumer hight level API rather than low level API).

Each consumer instance belongs to a consumer group, and each message is consumed by only one consumer instance in the same consumer group. (different consumer group can consume the same message simultaneously)

Many traditional message queue will delete the message after the message is consumed, on the one hand, avoid repeated consumption, on the other hand, it can ensure that the length of the queue is relatively small, and improve efficiency. As mentioned above, Kafka does not delete consumed messages. In order to achieve the semantics that traditional message queue messages are consumed only once, Kafka guarantees that only one consumer in the same consumer group will consume one message. Unlike traditional message queue, Kafka also allows different consumer group to consume the same message at the same time, which provides support for diversified processing of messages. In fact, one of the design ideas of Kafka is to provide both offline processing and real-time processing. According to this feature, real-time streaming systems such as Storm can be used for real-time online processing of messages, while batch processing systems such as Hadoop can be used for offline processing, and data can be backed up to another data center in real time at the same time. You only need to make sure that the consumer used in these three operations is in different consumer group. The following figure shows a simplified deployment of Kafka in Linkedin.

In order to show the features of Kafka consumer group more clearly, the author made a test. Create a topic (named topic1), create a consumer instance that belongs to group1, and create three consumer instances that belong to group2, and then send a message with a key of 1Magazine 3r to topic1 via producer. It was found that the consumer belonging to group1 received all these three messages, and the three consumer in group2 received the message that key was 1Jing 2 Jing 3 respectively. This is shown in the following figure.

Consumer Rebalance

(the content described in this section is based on Kafka consumer high level API)

Kafka guarantees that only one consumer in the same consumer group will send a message. In fact, Kafka guarantees that in a steady state, each consumer instance will only consume data from one or more specific partition, while the data from a partition will only be consumed by a specific consumer instance. The disadvantage of this design is that it is impossible for the consumer in the same consumer group to consume data evenly, and the advantage is that each consumer does not have to communicate with a large number of broker, which reduces the communication overhead, reduces the difficulty of allocation, and makes the implementation easier. In addition, because the data in the same partition is ordered, this design ensures that the data in each partition is also consumed in an orderly manner.

If the number of consumer in a consumer group is less than the number of partition, at least one consumer will consume the data of multiple partition. If the number of consumer is the same as the number of partition, then exactly one consumer consumes the data of one partition. If the number of consumer is more than the number of partition, some consumer will not be able to consume any message under the topic.

As shown in the following example, if the topic1 has three partition in total, when the group1 has only one consumer (named consumer1), the consumer can consume all the data of the three partition.

After adding a consumer (consumer2), one of the consumer (consumer1) can consume 2 partition's data, and the other consumer (consumer2) can consume the data of another partition.

After adding another consumer (consumer3), each consumer can consume one partition of data. Consumer1 consumption partition0,consumer2 consumption partition1,consumer3 consumption partition2

After adding another consumer (consumer4), three of the consumer can consume one partition of data, and the other consumer (consumer4) cannot consume any topic1 data.

At this point, turn off consumer1, and the remaining consumer can consume one partition data respectively.

Then turn off consumer2, and the remaining consumer3 can consume 2 partition,consumer4 and 1 partition.

Then turn off consumer3, and the remaining consumer4 can consume three partition of topic1 at the same time.

The consumer rebalance algorithm is as follows:

Sort PT (all partitions in topic T)

Sort CG (all consumers in consumer group G)

Let i be the index position of Ci in CG and let N=size (PT) / size (CG)

Remove current entries owned by Ci from the partition owner registry

Assign partitions from iN to (iTunes 1) Nmuri 1 to consumer Ci

Add newly assigned partitions to the partition owner registry

At present, the control strategy of consumer rebalance is completed by each consumer through Zookeeper. The specific control methods are as follows:

Register itself in the consumer id registry under its group.

Register a watch on changes under the consumer id registry.

Register a watch on changes under the broker id registry.

If the consumer creates a message stream using a topic filter, it also registers a watch on changes under the broker topic registry.

Force itself to rebalance within in its consumer group.

Under this strategy, each addition or decrease of consumer or broker triggers the consumer rebalance. Because each consumer is only responsible for adjusting the partition consumed by itself, in order to ensure the consistency of the entire consumer group, when a consumer triggers a rebalance, all other consumer in that consumer group should also trigger rebalance.

The above method is used in the current (2015-01-19) * * version (0.8.2) Kafka. But this approach has a downside:

Herd effect

Any addition or decrease of broker or consumer will trigger the rebalance of all consumer

Split Brain

Each consumer determines which partition down separately through Zookeeper, so different consumer may "see" different view from Zookeeper, which will result in wrong reblance attempt. And it's possible that all consumer think that rebalance is done, but it may not be so.

According to the official Kafka documentation, Kafka authors are considering using the Central Coordinator (coordinator) in the unreleased 0.9.x version. The general idea is to elect a broker as the coordinator and let it watch Zookeeper to determine whether there is an increase or decrease in partition or consumer, then generate a rebalance command and check whether these rebalance have been successfully executed in all relevant consumer, retry if they are not successful, and consider this rebalance successful if successful (this process is very similar to replication controller, so I wonder why replication controller was not designed in a similar way to solve the consumer rebalance problem). The process is as follows:

Message Deliver guarantee

From the above introduction, the reader must have tomorrow how producer and consumer work, and how Kafka does replication, and then we will discuss how Kafka ensures that messages are transmitted between producer and consumer. There are several possible delivery guarantee:

At most once messages may be lost, but will never be transmitted repeatedly

At least one messages are never lost, but may be transmitted repeatedly

Exactly once each message is sure to be transmitted once and only once, which is what the user wants in many cases.

Kafka's delivery guarantee semantic is very straightforward. When producer sends a message to broker, once the message is commit, the factor replication exists, it will not be lost. However, if the communication is interrupted due to network problems encountered by producer after sending data to broker, then producer cannot determine whether the message has been commit. This is a bit like inserting data into a database table that automatically generates primary key. Although Kafka cannot determine what happened during a network failure, producer can generate something similar to primary key, idempotent retry multiple times in the event of a failure, thus achieving Exactly one. As of now (Kafka version 0.8.2, 2015-01-25), this feature has not yet been implemented and is expected to be implemented in future versions of Kafka. (so by default, a message from producer and broker ensures At least once, but At most once can be achieved by setting producer to send asynchronously.)

The next discussion is the delivery guarantee semantic of messages from broker to consumer. (Kafka consumer high level API only). After the consumer reads the message from the broker, it can select commit, which stores the offset of the message read by the consumer under the partition in the Zookeeper. The next time the consumer reads the partition, it starts with the next entry. If not commit, the start position of the next read will be the same as the start position after the last commit. Of course, you can set consumer to autocommit, that is, consumer automatically commit as soon as it reads the data. If you only discuss the process of reading messages, then Kafka ensures Exactly once. But in fact, consumer does not end after reading the data, but needs further processing, and the order of data processing and commit largely determines the delivery guarantee semantic of messages from broker and consumer.

After reading the message, commit before processing the message. In this mode, if consumer crash before processing the message after commit, the message that has just been submitted but not processed will not be read after the next restart, which corresponds to At most once.

After reading the message, process it first and then commit. In this mode, if the finished message is consumer crash before commit, the message that has just not been commit will be processed the next time you restart the work. In fact, the message has already been processed. This corresponds to At least once. In many usage scenarios, messages have a primary key, so message processing is often idempotent, that is, processing this message multiple times is equivalent to processing it only once, so it can be considered as Exactly once. (this statement feels a bit far-fetched, after all, it is not a mechanism provided by Kafka itself, and primary key itself does not guarantee the idempotence of operations. And in fact, we say delivery guarantee semantic is about how many times it is processed, not what the result is. Because there are so many ways of processing, our system should not regard the characteristics of the process (such as whether idempotent or not, as the feature of Kafka itself).

If you want to achieve Exactly once, you need to coordinate the output of offset and the actual operation. The practice of Classic is to introduce two-phase commit. It would be more concise and versatile if you could keep offset and operation input in the same place. This approach may be better because many output systems may not support two-phase commit. For example, after consumer gets the data, it may put the data to HDFS. If you write the * * offset and the data itself to HDFS, you can ensure that either the output of the data and the update of the offset are completed or not, and Exactly once is realized indirectly. (currently, as far as high level API is concerned, offset is stored in Zookeeper and cannot be stored in HDFS, while low level API's offset is maintained by itself and can be stored in HDFS.)

In summary, Kafka guarantees At least once by default and allows At most once to be implemented by setting producer asynchronous commits. Exactly once requires collaboration with the target storage system, and fortunately the offset provided by Kafka can be used in this way very directly and easily.

Benchmark

The lessons learned from paper are not profound, and you have to put some things into practice. The author hopes to personally test the performance of Kafka, rather than looking for some test data from the Internet. So the author did a detailed Kafka0.8 performance test two months before the release, but unfortunately the test report was accidentally lost. Fortunately, I found the bechmark of Jay Kreps, one of the founders of Kafka, on the Internet. The following descriptions are based on this benchmark. (the benchmark is based on Kafka0.8.1)

Test environment

The benchmark uses six machines, and the machine configuration is as follows

Intel Xeon 2.5 GHz processor with six cores

Six 7200 RPM SATA drives

32GB of RAM

1Gb Ethernet

Three of the six machines are used to build Kafka broker clusters, and the other three are used to install Zookeeper and generate test data. All six drive are mounted directly in a non-RAID manner. In fact, kafka's requirements for machines are similar to those of Hadoop.

Producer throughput

This test only measures the throughput of producer, that is, the data is only persisted and there is no consumer read data.

1 producer thread, no replication

In this test, a topic with six partition and no replication was created. Then a thread generates 50 million short (payload100 bytes long) messages as quickly as possible. The test result is 821557 records/second (78.3MB/second).

Short messages are used because this usage scenario is more difficult for messaging systems. Because if MB/second is used to represent throughput, sending long messages will undoubtedly make the test results better.

Throughout the test, the MB/second is calculated by multiplying the number of delivery messages per second by the length of the payload. The meta-information of the message is not taken into account, so the actual network usage will be larger than this. For this test, an additional 22 bytes need to be transferred each time, including an optional key, message length description, CRC, and so on. In addition, it also includes some request-related overhead, such as topic,partition,acknowledgement and so on. This makes it difficult to judge whether the network card limit has been reached, but it should be more reasonable to count these overhead in the throughput. Therefore, we have basically reached the limit of the network card.

A preliminary look at this result will suggest that it is much higher than expected, especially considering that Kafka persists the data to disk. In fact, if you use a random access data system, such as RDBMS, or key-velue store, the expected frequency of access is about 5000 to 50000 requests per second, which is similar to the number of remote requests that a good RPC layer can accept. There are two reasons why the test is much better than this.

Kafka ensures that the process of writing disks is linear disk I * * O, and the * throughput of the 6 cheap disks used in the test is 822MB/second, which is much higher than that of 1Gb network cards. Many messaging systems treat persisting data to disk as an expensive thing because their operations on disk are not linear.

At each stage, Kafka uses batch processing as much as possible. If you want to understand the importance of batching in Istroke O operations, you can refer to David Patterson's "Latency Lags Bandwidth"

1 producer thread, 3 asynchronous replication

This test is basically the same as the previous test, except that there are three replica per partition (so the total amount of data transferred and written to disk on the network has tripled). Each broker should not only write the partition as leader, but also read (read data from leader) and write (write data to disk) as the partition of follower. The test result is 786980 records/second (75.1MB/second).

In this test, the replication is asynchronous, which means that after the broker receives the data and writes it to the local disk, it acknowledge producer instead of waiting for all the replica to complete the replication. In other words, if you leader crash, you may lose some unbacked-up data. But it will also make message acknowledgement with less latency and better real-time performance.

This test shows that replication can be very fast. The write capacity of the entire cluster may be only 1/3 due to three times the replication, but the throughput is still good enough for each producer.

1 producer thread, 3 synchronous replication

The only difference between this test and the previous test is that the replication is synchronized, and each message is set to committed only after it has been copied by all the replica in the in sync collection (in which case the broker sends an acknowledgement to the producer). In this mode, Kafka can guarantee that even if leader crash, there will be no data loss. The test result is 421823 records/second (40.2MB/second).

There is no fundamental difference between Kafka synchronous replication and asynchronous replication. Leader will always track follower replica to monitor whether they are still alive, and only messages that all replica in the in sync collection acknowledge can be consumed by consumer. The waiting for follower affects the throughput. This situation can be improved by increasing the batch size, but this test does not make this adjustment in order to avoid specific optimizations that affect the comparability of test results.

3 producer,3 and asynchronous replication

This test is equivalent to copying one producer above to three different machines (running multiple instances on one machine will not help the increase in throughput, because the network card is basically saturated), and the three producer send data at the same time. The throughput of the whole cluster is 2024032 records/second.

Producer Throughput Vs. Stored Data

A potential danger of a messaging system is that it performs well when the data can be stored in memory, but when the amount of data is too large to be completely stored in memory (and then many messaging systems delete the data that has been consumed. However, when the consumption speed is slower than the production speed, it will still cause the data to accumulate), the data will be transferred to disk, resulting in a decline in throughput, which in turn makes the system unable to receive data in time. This is very bad, but in fact, the purpose of using queue in many scenarios is to solve the problem of inconsistent data consumption rate and production rate.

However, Kafka does not have this problem, because Kafka always persists data to disk with O (1) time complexity, so its throughput is not affected by the amount of data stored on disk. In order to verify this feature, a long-time test of a large amount of data has been done. The following figure shows the relationship between throughput and the amount of data.

There is some variance in the figure above, and it is clear that throughput is not affected by the amount of data stored on disk. In fact, as you can see from the above figure, when the amount of disk data reaches 1TB, there is no significant difference between the throughput and when the disk data is only a few hundred MB.

This variance is managed by Linux Ibind O, which caches the data and then batches the flush. The test results in the above figure are obtained after doing some tuning on the Kafka cluster in the production environment. These tuning methods can be found here.

Consumer throughput

It is important to note that replication factor does not affect the throughput testing of consumer, because consumer only reads data from the leader of each partition, regardless of replicaiton factor. Similarly, consumer throughput has nothing to do with synchronous or asynchronous replication.

1 consumer

This test consumes 50 million messages from a topic with 6 partition,3 replication. The test result is 940521 records/second (89.7MB/second).

As you can see, Kafkar's consumer is very efficient. It reads file blocks directly from broker's file system. Kafka uses sendfile API to transmit directly through the operating system without copying data to user space. The test actually reads the data from the beginning of the log, so it does the real Imax O. In a production environment, consumer can directly read the data that producer has just written (it may still be in the cache). In fact, if you run I _ stat in a production environment, you can see that there is basically no physical "read". In other words, the throughput of consumer in production environment will be higher than that in this test.

3 consumer

Copy the above consumer to three different machines and run them in parallel (consuming data from the same topic). The test result is 2615968 records/second (249.5MB/second).

As expected, the throughput of consumer increases almost linearly.

Producer and Consumer

The above test simply tests producer and consumer separately, while this test runs both producer and consumer, which is closer to the usage scenario. In fact, in the current replication system, follower is equivalent to consumer working.

This test uses both 1 producer and 1 consumer on a topic with 6 partition and 3 replica, and uses asynchronous replication. The test result is 795064 records/second (75.8MB/second).

As you can see, the result of this test is almost the same as that of testing 1 producer alone. So consumer is very lightweight.

Effect of message length on throughput

All of the above tests are based on short messages (payload 100bytes), which, as mentioned above, are a more difficult way to use Kafka, and you can expect records/second to decrease but MB/second to improve as the length of the message increases. The following figure is a diagram of the relationship between records/second and message length.

As we expected, the number of messages that can be sent per second decreases as the length of the message increases. But if you look at the total size of messages sent per second, it increases as the length of the message increases, as shown in the following figure.

As can be seen from the above figure, when the message length is 10 bytes, because it takes too much time to acquire the lock because of joining the queue frequently, CPU becomes a bottleneck and cannot make full use of the bandwidth. But starting at 100 bytes, we can see that bandwidth usage tends to be saturated (although MB/second still increases with the length of the message, the increase is getting smaller and smaller).

End-to-end Latency

Throughput was discussed above, but what about the latency of message transmission? In other words, how long does it take for a message to travel from producer to consumer? This test creates 1 producer and 1 consumer and repeats the timing. The result is 2 ms (median), 3ms (99th percentile, 14ms (99.9th percentile).

(there is no indication of how many partition there are in topic, nor how many replica,replication are synchronous or asynchronous. In fact, this will greatly affect the messages sent by producer by commit's latency, and only committed messages can be consumed by consumer, so it will eventually affect end-to-end latency)

Reproduce the benchmark

If you want to reproduce this benchmark test on your machine, you can refer to the configuration of this test and the commands used.

In fact, Kafka Distribution provides producer performance testing tools, which can be started through the bin/kafka-producer-perf-test.sh script. The commands used are as follows

Producer Setup bin/kafka-topics.sh-zookeeper esv4-hcl197.grid.linkedin.com:2181-create-topic test-rep-one-partitions 6-replication-factor 1 bin/kafka-topics.sh-zookeeper esv4-hcl197.grid.linkedin.com:2181-create-topic test--partitions 6-replication-factor 3 Single thread No replication bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test7 50000000 100-1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196 Single-thread Async 3x replication bin/kafktopics.sh-zookeeper esv4-hcl197.grid.linkedin.com:2181-create-topic test-partitions 6-replication-factor 3 bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test6 50000000 100-1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196 Single-thread Sync 3x replication bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000 100-1 acks=-1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=64000 Three Producers 3x async replication bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000 100-1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196 Throughput Versus Stored Data bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000000 100-1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196 Effect of message size for i in 10 100 1000 10000 100000 Do echo "" echo $I bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test $((1000 million 1024 million 1024 Universe)) $I-1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=128000 done Consumer Consumer throughput bin/kafka-consumer-perf-test.sh-zookeeper esv4-hcl197.grid.linkedin.com:2181-messages 50000000-topic test-threads 1 3 Consumers On three servers Run: bin/kafka-consumer-perf-test.sh-zookeeper esv4-hcl197.grid.linkedin.com:2181-messages 50000000-topic test-threads 1 End-to-end Latency bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency esv4-hcl198.grid.linkedin.com:9092 esv4-hcl197.grid.linkedin.com:2181 test 5000 Producer and consumer bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000 100-1 acks=1 bootstrap.servers=esv4 -hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196 bin/kafka-consumer-perf-test.sh-- zookeeper esv4-hcl197.grid.linkedin.com:2181-- messages 50000000-- topic test-- threads 1

The broker configuration is as follows

# # Server Basics # The id of the broker. This must be set to a unique integer for each broker. Broker.id=0 # # Socket Server Settings # The port the socket server listens on port=9092 # Hostname the broker will bind to and advertise to producers and consumers. # If not set, the server will bind to all interfaces and advertise the value returned from # from java.net.InetAddress.getCanonicalHostName (). # host.name=localhost # The number of threads handling network requests num.network.threads=4 # The number of threads doing disk Iram O num.io.threads=8 # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=1048576 # The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=1048576 # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 # # Log Basics # The directory under which to store log files log.dirs=/grid/a/dfs-data/kafka-logs / grid/b/dfs-data/kafka-logs,/grid/c/dfs-data/kafka-logs,/grid/d/dfs-data/kafka-logs,/grid/e/dfs-data/kafka-logs,/grid/f/dfs-data/kafka-logs # The number of logical partitions per topic per server. More partitions allow greater parallelism # for consumption, but also mean more files. Num.partitions=8 # # Log Flush Policy # The following configurations control the flush of data to disk. This is the most # important performance knob in kafka. # There are a few important trade-offs here: # 1. Durability: Unflushed data is at greater risk of loss in the event of a crash. # 2. Latency: Data is not made available to consumers until it is flushed (which adds latency) # 3. Throughput: The flush is generally the most expensive operation. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis. # Per-topic overrides for log.flush.interval.ms # log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000 # # Log Retention Policy # The following configurations control the disposal of log segments. The policy can # be set to delete segments after a period of time, or after a given size has accumulated. # A segment will be deleted whenever * either* of these criteria are met. Deletion always happens # from the end of the log. # The minimum age of a log file to be eligible for deletion log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining # segments don't drop below log.retention.bytes. # log.retention.bytes=1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created. Log.segment.bytes=536870912 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.cleanup.interval.mins=1 # # Zookeeper # Zookeeper connection string (see zookeeper docs for details) # This is a comma separated host:port pairs, each corresponding to a zk # server. E.g. "127.0.0.1virtual 3000127.0.0.1" 1127.0.0.1 ". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. Zookeeper.connect=esv4-hcl197.grid.linkedin.com:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=1000000 # metrics reporter properties kafka.metrics.polling.interval.secs=5 kafkakafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter kafka.csv.metrics.dir=/tmp/kafka_metrics # Disable csv reporting by default. Is it helpful for kafka.csv.metrics.reporter.enabled=false replica.lag.max.messages=10000000 to read the above content? If you want to know more about the relevant knowledge or read more related articles, please follow the industry information channel, thank you for your support.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report