In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
Today, the editor will bring you an article about Apache's Kafka. The editor thinks it is very practical, so I will share it for you as a reference. Let's follow the editor and have a look.
1. Kafka Overview 1.1 introduction to kafka
Apache Kafka is a fast, scalable, high-throughput, fault-tolerant distributed publish-subscribe message system. Written in Scala and Java language, Kafka can deliver messages from one endpoint to another. Compared with traditional message intermediaries (such as ActiveMQ, RabbitMQ), Kafka has the characteristics of high throughput, built-in partition, supporting message replicas and high fault tolerance, so it is very suitable for large-scale message processing applications.
Kafka official website: http://kafka.apache.org/
The main design objectives of Kafka are as follows:
The ability of message persistence is provided in the way of time complexity O (1), which can guarantee the access performance of constant time even for data above TB level. High throughput. Even on very cheap commercial machines, it is possible to support the transmission of 100K messages per second on a single machine. Support message partitioning between Kafka Server, and distributed consumption, while ensuring the sequential transmission of messages within each partition. Both offline data processing and real-time data processing are supported. Support for online horizontal scaling
Kafka is commonly used in two broad categories of applications:
Establish a real-time streaming data pipeline to reliably obtain data between systems or applications to build real-time streaming applications to transform or respond to data streams
To understand how Kafka performs these operations, let's delve deeper into the functionality of Kafka from scratch.
First of all, a few concepts:
Kafka runs as a cluster on one or more servers that can span multiple data centers. The Kafka cluster stores the stream of records in categories called topics. Each record consists of a key, a value and a timestamp to form a 1.2 kafka architecture.
1.3Application scenarios of kafka
There are a lot of kafka application scenarios, so let's give a few of our most common scenarios
1.3.1 tracking of user activity
Users' different activity messages on the website are posted to different topic centers, and then these messages can be monitored and processed in real time. Of course, it can also be loaded into the Hadoop or offline processing data warehouse to portray the user. Such as Taobao, Tmall, JD.com these large e-commerce platforms, all the activities of users should be tracked.
1.3.2 Log collection
1.3.3 current limiting and peak clipping
1.3.4 High throughput implementation
Compared with other MQ, the most important feature of Kafka is high throughput. In order to increase storage capacity, Kafka writes all messages to low-speed and high-capacity hard drives. In theory, this will result in performance loss, but in fact, Kafka can still maintain ultra-high throughput, and its performance is not affected. It mainly uses the following ways to achieve high throughput.
Sequential read and write: Kafka writes messages to the partition partition, and messages in the partition are read and written sequentially. Sequential reading and writing is faster than random reading and writing. Zero copy: producers and consumers use zero copy to implement messages in Kafka. Bulk send: Kafka allows batch send mode. Message compression: Kafka allows compression of message collections. 1.4 the advantages of kafka 1. Decoupling:
It is extremely difficult to predict what requirements the project will encounter in the future at the beginning of the project. The message system inserts an implicit data-based interface layer in the middle of the processing process, and both sides of the processing process should implement this interface. This allows you to extend or modify the processes on both sides independently, as long as you make sure they follow the same interface constraints.
2 redundancy: (copy)
In some cases, the process of processing data will fail. Unless the data is persisted, it will be lost. Message queues persist data until they have been fully processed, avoiding the risk of data loss. In the insert-get-delete paradigm used by many message queues, before deleting a message from the queue, your processing system needs to clearly indicate that the message has been processed to ensure that your data is safely saved until you have finished using it.
3 Extensibility
Because message queues decouple your processing, it is easy to increase the frequency of queuing and processing of messages, as long as you add additional processing. There is no need to change the code or adjust the parameters. Expanding is as simple as turning up the power button.
4 flexibility & peak processing power
In the case of a sharp increase in traffic, applications still need to continue to play a role, but such sudden traffic is not common; it is undoubtedly a huge waste to put resources on standby to be able to handle such peak visits. The use of message queuing enables key components to withstand sudden access pressure without completely collapsing due to sudden overloaded requests.
5. Recoverability
When some components of the system fail, the whole system will not be affected. Message queuing reduces the coupling between processes, so even if a process that processes messages dies, messages added to the queue can still be processed after the system is restored.
6. Sequence guarantee
In most usage scenarios, the order of data processing is important. Most message queues are sorted by nature, and it is guaranteed that the data will be processed in a specific order. Kafka guarantees the ordering of messages within a Partition.
7. Buffer
In any important system, there will be elements that require different processing times. For example, loading an image takes less time than applying a filter. Message queuing uses a buffer layer to help tasks perform as efficiently as possible-the processing of writing to the queue is as fast as possible. This buffer helps to control and optimize the speed at which data flows through the system.
8. Asynchronous communication
In many cases, users do not want or need to process messages immediately. Message queuing provides an asynchronous processing mechanism that allows users to put a message on the queue, but does not process it immediately. Put as many messages as you want in the queue, and then process them when needed.
1.5 kafka compared with other MQ 1. RabbitMQ
RabbitMQ is an open source message queue written in Erlang, which supports many protocols: AMQP,XMPP, SMTP, STOMP. Because of this, it is very heavyweight and more suitable for enterprise development. At the same time, the Broker architecture is implemented, which means that messages are queued in the central queue before being sent to the client. Good support for routing, load balancing or data persistence.
2. Redis
Redis is a NoSQL database based on Key-Value pairs, which is very active in development and maintenance. Although it is a Key-Value database storage system, it supports MQ functionality, so it can be used as a lightweight queuing service. For the entry and exit operations of RabbitMQ and Redis, each performed 1 million times, and the execution time was recorded every 100000 times. The test data are divided into four different sizes: 128Bytes, 512Bytes, 1K and 10K. The experimental results show that when joining the team, the performance of Redis is higher than that of RabbitMQ when the data is compared, but it is unbearably slow if the data size exceeds 10K; when leaving the team, Redis shows very good performance regardless of the size of the data, while the performance of RabbitMQ is much lower than that of Redis.
3. ZeroMQ
ZeroMQ claims to be the fastest message queuing system, especially for high-throughput demand scenarios. ZeroMQ can implement advanced / complex queues that RabbitMQ is not good at, but developers need to combine a variety of technical frameworks themselves, and the technical complexity is a challenge to the successful application of this MQ. ZeroMQ has a unique non-middleware mode, you do not need to install and run a message server or middleware, because your application will play this server role. You can simply reference the ZeroMQ library, install it using NuGet, and then you can happily send messages between applications. But ZeroMQ only provides non-persistent queues, which means that if there is an outage, data will be lost. Among them, ZeroMQ is used as the transport of data streams by default in Storm versions of Twitter prior to 0.9.0 (Storm supports both ZeroMQ and Netty as transport modules since version 0.9).
4. ActiveMQ
ActiveMQ is a sub-project under Apache. Similar to ZeroMQ, it can implement queues in proxy and peer-to-peer technologies. At the same time, similar to RabbitMQ, it can efficiently implement advanced application scenarios with a small amount of code.
5. Kafka/Jafka
Kafka, a sub-project of Apache, is a high-performance cross-language distributed publish / subscribe message queuing system, while Jafka is hatched on Kafka, that is, an upgraded version of Kafka. It has the following characteristics: fast persistence, which can persist messages under O (1) system overhead; high throughput, which can reach the throughput rate of 10W/s on an ordinary server; completely distributed system, Broker, Producer, Consumer all support distribution natively and realize load balancing automatically. Support Hadoop data parallel loading, for the same log data and offline analysis system as Hadoop, but require real-time processing limitations, this is a feasible solution. Kafka unifies online and offline message processing through Hadoop's parallel loading mechanism. Apache Kafka is a very lightweight messaging system compared to ActiveMQ. It is not only a very good performance, but also a well-working distributed system.
1.6 several important roles of kafka 1.6.1 kafka as a storage system
Any message queue that allows the publication of unrelated messages effectively acts as a storage system for running messages. What makes Kafka different is that it is a very good storage system.
Data written to Kafka is written to disk and replicated for fault tolerance. Kafka allows the producer to wait for an acknowledgement so that the write will not complete even if the write to the server fails.
Kafka's disk structure is well scalable-whether there is 50 KB or 50 TB of persistent data on the server, Kafka will do the same.
Because you take storage seriously and allow clients to control their read locations, you can think of Kafka as a dedicated distributed file system dedicated to high-performance, low-latency commit log storage, replication, and propagation.
1.6.2 kafka as a messaging system
How does Kafka's flow concept compare with traditional enterprise messaging systems?
Traditionally, there are two models for messaging: queuing and publish-subscribe. In the queue, a group of consumers can read content from the server, and each record will be transferred to one of them. Broadcast to all consumers in the publish-subscribe record. Each of these two models has its advantages and disadvantages. The advantage of queuing is that it allows you to divide data processing into multiple consumer instances, thereby expanding the amount of processing. Unfortunately, the queue is not multi-user-a process reads the lost data. Publish-subscribe allows you to broadcast data to multiple processes, but because each message is delivered to each subscriber, processing cannot be extended.
Kfka's concept of consumer group summarizes these two concepts. Like queues, a consumer group allows you to divide processing into a set of processes (members of the consumer group). Like publish subscriptions, Kafka allows you to broadcast messages to multiple consumer groups.
The advantage of the Kafka model is that each topic has these attributes-- it can extend the scope of processing and is a multi-subscriber-- without having to choose one of them.
Compared with the traditional messaging system, Kafka also has a stronger ordering guarantee.
Traditional queues keep records on the server sequentially, and if multiple users consume from the queue, the server will distribute the records in the order in which the records are stored. However, although the server distributes records sequentially, they are passed asynchronously to consumers, so they may arrive out of order on different users. This actually means that the order of records is lost in the case of parallel use. Messaging systems usually solve this problem through the concept of "proprietary consumers", which allows only one process to be used from the queue, but this of course means that there is no parallelism in processing.
Kafka did better. By having the concept of parallelism (that is, partitioning) within the topic, Kafka can provide ordering assurance and load balancing in the user process pool. This is achieved by assigning partitions in the theme to consumers in the consumer group so that each partition is fully consumed by one consumer in the group. By doing so, we ensure that the consumer is the only reader for the partition and use the data sequentially. Because there are many partitions, you can still balance the load on many consumer instances. Note, however, that the consumer instance in the consumer group cannot exceed the partition.
1.6.3 kafka for stream processing
It is not enough to read, write and store data streams in order to achieve real-time processing of convection.
In Kafka, a stream processor is anything that takes a continuous data stream from an input topic, performs some processing on that input, and generates a continuous data stream to output the topic.
For example, a retail application can accept an input stream of sales and shipments and output a reorder and price adjustment stream calculated based on this data.
You can use producer and consumer API for simple processing directly. However, for more complex transformations, Kafka provides a fully integrated Streams API. This allows you to build applications that perform unimportant processing that calculate the aggregation of flows or connect them together.
This feature helps solve the challenges faced by such applications: dealing with unordered data, reprocessing input when code changes, performing state calculations, and so on.
Stream API is based on the core primitive provided by Kafka: it uses producer and consumer API for input, uses Kafka for state storage, and uses the same group mechanism to implement fault tolerance between stream processor instances.
2. Interpretation of key terms in kafka 2.1 Topic
Theme. In Kafka, a category attribute is used to classify the class of the message, which is called topic. Topic is equivalent to the classification label of messages and is a logical concept.
Physically, messages with different Topic are stored separately. Logically, a Topic message is stored on one or more broker, but users only need to specify the Topic of the message to produce or consume data, regardless of where the data is stored.
# # 2.2 Partition
Partition. Messages in topic are split into one or more partition, which is a physical concept that corresponds to one or more directories on the system. Messages within partition are ordered, but messages between partition are disordered.
2.3 Segment
Duan. The partition is further subdivided into several segment, each segment file of equal size.
2.4 Broker
A Kafka cluster contains one or more servers, and each server node is called a broker.
Broker stores data for topic. If a topic has N partition and the cluster has N broker, then each broker stores one partition of that topic.
If a topic has N partition and the cluster has N broker, then N broker stores one partition of the topic, and the remaining M broker does not store the partition data of the topic.
If a topic has N partition and the number of broker in the cluster is less than N, then one broker stores one or more partition of that topic. In the actual production environment, try to avoid this situation, which can easily lead to Kafka cluster data imbalance.
2.5 Producer
The producer, that is, the publisher of the message. Producers publish data to the topic of their choice. The producer is responsible for choosing which record to assign to which partition in the topic. That is, a message produced by the producer will be written to a certain partition.
# # 2.6 Consumer
Consumers. Messages can be read from broker.
A consumer can consume multiple topic messages
A consumer can consume messages in multiple partition in the same topic
One partiton allows multiple consumer consumption at the same time
2.7 Consumer Group
Consumer group is a scalable and fault-tolerant consumer mechanism provided by kafka. There can be multiple consumers in the group that share a common ID, group ID. All consumers in the group coordinate to consume all partitions of the subscription topic.
Kafka guarantees that only one consumer in the same consumer group will consume a message. In fact, Kafka guarantees that in a steady state, each consumer instance will only consume one or more specific partition, and the data of a partition will only be consumed by a specific consumer instance.
Below, we use a picture on the official website to identify the corresponding relationship between the number of consumer and the number of partition.
A Kafka cluster of two servers with four P0-P3 with two user groups. Consumer group A has two consumer instances and group B has four.
In fact, I didn't understand this consumer group before. My own summary is:
Partitoin to group in topic is a publish-subscribe communication mode, that is, a partition message of topic will be consumed by all group, belonging to one-to-many mode; group to consumer is a point-to-point communication mode, belonging to one-to-one mode.
For example: if you do not use group, start 10 consumer to consume a topic, and these 10 consumer can get all the data of the topic, which is equivalent to any message in this topic being consumed 10 times.
With group, messages with groupid,topic when connecting are distributed to 10 consumer, and each message is consumed only once
2.8 Replizcas of partition
Partition copy. A copy is a backup of a partition, a backup of a partition created to prevent message loss.
2.9 Partition Leader
Each partition has multiple copies, of which one and only one is the partition that is currently responsible for reading and writing messages as Leader,Leader. That is, all read and write operations can only occur on the Leader partition.
2.10 Partition Follower
All Follower need to synchronize messages from Leader, and Follower and Leader always keep messages synchronized. The relationship between Leader and Follower is a master-slave relationship, not a master-slave relationship.
2.11 ISR
ISR,In-Sync Replicas, which refers to the copy synchronization list. The ISR list is maintained by Leader.
AR,Assigned Replicas, which refers to all copies of a partition, that is, a list of assigned copies. OSR,Outof-Sync Replicas, that is, the list of unsynchronized copies. AR = ISR + OSR2. 12 offset
Offset. Each message has a unique 64-byte offset under the current Partition, which is equivalent to the offset of the first message in the current partition.
2.13 Broker Controller
Of the multiple broker in the Kafka cluster, one will be elected controller to manage the status of partition and replicas in the entire cluster.
Only Broker Controller registers Watcher with zookeeper. Other broker and partitions do not need to be registered. That is, zookeeper only needs to listen to the status change of Broker Controller.
2.14 HW and LEO
HW,HighWatermark, high water level, indicates the maximum partition offset that can be consumed by Consumer. HW ensures the consistency of messages in the Kafka cluster. To be exact, it ensures the consistency of data between Follower and Leader of partition.
LEO,Log End Offset, the offset of the last message of the log. The message is written to the log file of Kafka, which is the offset of the last message currently written in Partition.
For newly written messages from leader, consumer cannot be consumed immediately. Leader waits for the message to be synchronized by the partition follower in all ISR before it updates the HW before the message can be consumed by consumer.
I believe you are still confused after reading the above concepts. OK, let's use a picture to express the relationship between the two.
2.15 zookeeper
Zookeeper is responsible for the maintenance and coordination of broker and the election of Broker Controller.
In previous versions of kafka0.9, offset was managed by zk.
Conclusion: zk is responsible for the election of Controller and Controller is responsible for the election of leader.
2.16 Coordinator
Coordinator generally refers to the group Coordinator process that runs on each broker and is used to manage the members of the Consumer Group, mainly for offset displacement management and Rebalance. A Coordinator can manage multiple consumer groups at the same time.
2. 17 Rebalance
When the quantity in the consumer group changes, or the number of partition in the topic changes, the ownership of the partition will be transferred among the consumers, that is, the partition will be redistributed, a process called rebalancing Rebalance.
Rebalancing can bring high performance, high availability and scalability to consumer groups and broker, but consumers cannot read messages during rebalancing, that is, the entire broker cluster is unavailable for a short period of time. Therefore, unnecessary rebalancing should be avoided.
# # 2.18 offset commit
Consumer takes a batch of messages from broker and writes them to buffer for consumption. After consuming messages within a specified period of time, it automatically submits the offset of its consumed messages to broker to record which messages have been consumed. Of course, if the consumption is not finished within the time limit, it will not submit the offset.
3. The working principle and process of kafka 3.1 message writing algorithm
It is a complicated process that the message sender sends the message to broker and forms the final log that can be consumed by consumers.
Producer first finds the leaderproducer of the partition from the zookeeper, sends the message to the leaderleader, connects the message to the local log, and notifies the followers in the followersISR of the ISR to pull the message from the leader, writes the local log, sends the ackleader to leader after receiving the ack of the followers in all ISR, adds HW and sends ack to producer, indicating that the message has been written successfully 3.2 message routing policy
When publishing a message through API, the producer publishes the message with Record. The inclusion of key and value,value in Record is our real message itself, and key is used to route the Partition where the message is to be stored. The Partition to which the message is written is not random, but there is a routing policy.
If partition is specified, it is written directly to the specified partition
If partition is not specified but key is specified, the module is selected by modulating the hash value of key and the number of partition.
The result is the partition index to be selected
If neither partition nor key is specified, a partition is selected using a polling algorithm. 3.3 HW truncation mechanism
If the partition leader receives a new message, the other Follower in the ISR is in the process of synchronization, and the leader goes down before the synchronization is completed. A new leader needs to be elected at this point. If there is no HW truncation mechanism, it will lead to the inconsistency of leader and follower data in partition.
When the original Leader is down and then resumes, roll back its LEO to its downtime HW, and then synchronize the data with the new Leader, so as to ensure that the data in the old Leader is consistent with the new Leader. This mechanism is called HW truncation mechanism.
3.4 Reliability of message transmission
When producers send messages to kafka, they can choose the level of reliability they want. It is set by the value of the request.required.acks parameter.
0 value
Send asynchronously. The producer sends a message to the kafka without the need for kafka feedback on the successful ack. This method has the highest efficiency, but the lowest reliability. There may be a case of message loss.
Message loss occurs during transmission. Message loss occurs within the broker. There is a situation where the order of the messages written to the kafka is not consistent with the production order. 1 valu
Send synchronously. The partition leader that the producer sends the message to the kafka,broker sends the ack as soon as it receives the message (there is no need to wait for the Follower synchronization in the ISR). After receiving the message, the producer knows that the message is sent successfully, and then sends the message again. If the ack of kafka has not been received, the producer will assume that the message has failed and will resend the message.
For Producer, if you do not receive the ACK, you can confirm that the message failed and then resend it; however, even if you receive the ACK, there is no guarantee that the message will be sent successfully. Therefore, in this case, the loss of messages may also occur.
-1 value
Send synchronously. After the producer sends a message to the kafka,kafka and receives the message, it waits until all the copies in the ISR list synchronize the message before sending a successful ack to the producer. If the ack of kafka has not been received, the message is considered to have failed, and the message will be automatically resent. In this way, messages will be received repeatedly.
3.5 Analysis of consumers' consumption process
The producer sends the message to topitc, and the consumer can consume it. The consumption process is as follows:
Consumer submits a connection request to broker, and the broker connected to it will send it the communication URL of broker controller, that is, the listeners address in the configuration file; when consumer specifies the topic to be consumed, it will send a consumption request to broker controller; broker controller will assign one or more partition leader to consumer and send the current offset of that partition to consumer;consumer. It will consume the messages according to the partition assigned by broker controller. When consumer consumes the message, consumer will send a message that has been consumed to broker, that is, the offset; of the message will update the corresponding _ _ consumer_offset after broker receives the offset of consumer; the above process will be repeated until the consumer stops requesting consumption; Consumer can reset offset, so that messages stored on broker can be consumed flexibly. 3.6 Partition Leader Election scope
When the leader goes down, broker controller selects a follower from the ISR to be the new leader. What if there are no other copies in ISR? The leader election range can be set by the value of unclean.leader.election.enable.
False
You must wait until all the copies in the ISR list are alive before holding a new election. The reliability of the strategy is guaranteed, but the availability is low.
True
In the case of no copy in the ISR list, any host without downtime can be selected as the new leader. This policy is highly available, but its reliability is not guaranteed.
3.7 the solution to the problem of repeated consumption is the same consumer.
When Consumer causes consumption timeout due to low consumption power, repeated consumption may be formed.
When a certain data is consumed but is about to be submitted to offset, and the consumption time expires, broker considers that the message has not been consumed successfully. At this time, there will be the problem of repeated consumption.
The solution: extend the offset submission time.
Different consumer repeat consumption
When the Consumer consumes messages but goes down before the offset is submitted, the messages that have already been consumed will be consumed repeatedly.
The solution: change automatic submission to manual submission.
3.8 solve the problem of kafka repeated consumption from the aspect of architecture design
In fact, when we are developing, when we design the program, for example, considering some abnormal situations such as network failure, we will set the number of retries of the message.
There may be other news duplicates, so how should we solve it?
Here are three options:
3.8.1 option 1: save and query
Set a unique uuid for each message. We need to store a uuid for all messages. When we consume messages, we first check the persistence system to see if this has been consumed before. If not, we are spending. If it has been consumed, discard it. The following figure shows this scheme.
3.8.2 option 2: using idempotents
Idempotent (Idempotence) is defined mathematically in this way. If a function f (x) satisfies: F (f (x)) = f (x), then the function f (x) satisfies idempotency.
This concept is extended to the computer field and is used to describe an operation, method, or service. The characteristic of an idempotent operation is that the effect of arbitrary execution is the same as that of one execution. An idempotent method, using the same parameters, makes multiple calls and one call, and has the same impact on the system. So, for idempotent methods, you don't have to worry that repeated execution will cause any changes to the system.
Let's give an example to illustrate. Without considering the concurrency, "set teacher X's account balance to 1 million yuan", the impact on the system after one execution is that teacher X's account balance becomes 1 million yuan. As long as the parameter provided is 1 million yuan unchanged, no matter how many times it is executed, teacher X's account balance will always be 1 million yuan and will not change. This operation is an idempotent operation.
To take another example, "add teacher X's balance to 1 million yuan", this operation is not idempotent. Each time it is executed, the account balance will increase by 1 million yuan. The impact of multiple execution and one execution on the system (that is, the balance of the account) is not the same.
So, through these two examples, we can think that if the business logic of the system consuming messages is idempotent, then we don't have to worry about message repetition, because the same message, consuming once and consuming multiple times have exactly the same impact on the system. In other words, it can be considered that consuming many times is equal to consuming once.
So, how to achieve idempotent operation? The best way is to start from the business logic design, the consumption of business logic is designed to have idempotent operations. However, not all businesses can be designed to be naturally idempotent, so some methods and techniques are needed to achieve idempotency.
Here we introduce a commonly used method: using the unique constraints of the database to achieve idempotency.
For example, we just mentioned the example of a non-idempotent transfer: add the balance of Mr. X's account to 1 million yuan. In this example, we can transform the business logic to make it idempotent.
First of all, we can limit that each transfer bill can only be changed once per account. In distributed systems, there are many ways to implement this restriction. The simplest thing is to create a transfer flow table in the database, which has three fields: transfer bill ID, account ID and change amount, and then combine the transfer bill ID and account ID fields to create a unique constraint. In this way, there can be at most one record in the table for the same transfer bill ID and account ID.
In this way, the logic of our consumption message can be as follows: "add a transfer record to the transfer flow table, and then update the user balance asynchronously according to the transfer record." In the operation of adding a transfer record to the transfer flow table, because we pre-define the unique constraint of "account ID transfer ID" in this table, only one record can be inserted into the same transfer bill and the same account, and subsequent repeated insertions will fail, thus realizing an idempotent operation.
3.8.3 option 3: set prerequisites
Another idempotent way to set preconditions for updated data is to set a precondition for data changes, update the data if the conditions are met, otherwise refuse to update the data, and change the data that needs to be judged in the precondition at the same time.
In this way, when the operation is repeated, because the data to be judged in the pre-condition has been changed when the data is updated for the first time, and the pre-condition is not met, the update data operation will not be repeated.
For example, as we just said, the operation of "increasing the balance of teacher X's account by 1 million yuan" does not satisfy idempotency. We can add a precondition to this operation and change it into: "if the current balance of teacher X's account is 5 million yuan, add the balance to 1 million yuan," this operation has idempotence.
Corresponding to the use in the message queue, you can bring the current balance in the message body when sending the message, and judge whether the current balance in the database is equal to the balance in the message when consuming, and only equal to perform the change operation.
But what if the data we want to update is not a numerical value, or what if we want to do a more complex update operation? What is used as the pre-judgment condition? A more general method is to add a version number attribute to your data. Before changing the data each time, compare whether the version number of the current data is the same as the version number in the message, and refuse to update the data if it is inconsistent. Update the data while the version number + 1, you can also achieve idempotency.
4. Kafka cluster building
In our work, in order to ensure the high availability of the environment and prevent single point, kafka appears as a cluster. Let's build a set of kafka cluster environment.
We download kafka from the official website at http://kafka.apache.org/downloads, to download the version we need. It is recommended to use a stable version.
4.1 set up cluster 1. Download and decompress
Cd / usr/local/src
Wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.11-2.4.0.tgz
Mkdir / data/servers
Tar xzvf kafka_2.11-2.4.0.tgz-C / data/servers/
Cd / data/servers/kafka_2.11-2.4.0
two。 Modify the configuration file
The configuration file $KAFKA_HOME/config/server.properties of kafka mainly modifies the following items
Make sure that the id on each machine is different broker.id=0 configuration server monitoring address listeners=PLAINTEXT://192.168.51.128:9092 kafka log directory log.dirs=/data/servers/kafka_2.11-2.4.0/logs # kafka set the number of partitons connection address num.partitions=1 zookeeper, if you have your own zookeeper cluster, please directly use your own zookeeper cluster zookeeper.connect=192.168.51.128:2181
Because I am doing experiments on this machine, all of them use different ports of the same host, and on-line, they are different machines, you can refer to them.
We use kafka's zookeeper here to start only one node, but in the real production process, we need a zookeeper cluster, just build it ourselves. Later, we will also publish a zookeeper tutorial. Please follow us.
3. Copy 3 configuration files # create the corresponding log directory mkdir-p / data/servers/kafka_2.11-2.4.0/logs/9092mkdir-p / data/servers/kafka_2.11-2.4.0/logs/9093mkdir-p / data/servers/kafka_2.11-2.4.0/logs/9094# copy three configuration files cp server.properties server_9092.properties cp server.properties server_9093.properties cp server.properties server_9094. Properties modifies the id of file # 9092 corresponding to different ports to 0 9093 id is 1, 9094 id is 2 broker.id=0 # configure server monitoring address, write different port listeners=PLAINTEXT://192.168.51.128:9092 # kafka log directory in the impassable configuration file, the directory is also the connection address of the number of partitons set by different port log.dirs=/data/servers/kafka_2.11-2.4.0/logs/9092 # kafka, if you have your own zookeeper cluster Please directly use the self-built zookeeper cluster zookeeper.connect=192.168.51.128:2181 to modify the zookeeper configuration file dataDir=/data/servers/zookeeperserver.1=192.168.51.128:2888:3888 and then create the zookeeper myid file echo "1" > / data/servers/zookeeper/myid5. Start zookeeper
Use kafka's built-in zookeeper
Cd / data/servers/kafka_2.11-2.4.0/binzookeeper-server-start.sh-daemon.. / config/zookeeper.properties netstat-anp | grep 2181 launches the operation topic of kafka./kafka-server-start.sh-daemon. / config/server_9092.properties. / kafka-server-start.sh-daemon. / config/server_9093.properties. / kafka-server-start.sh-daemon. / config/server_9094.properties 4.2 kafka
Let's first take a look at the parameters commonly used to create topic.
-- create creates topic
-- delete deletes topic
-- alter modifies the name of topic or the number of partition
-- list View topic
-- describe views the details of topic
-- topic specifies the name of topic
-- zookeeper specifies the connection address of the zookeeper
The parameter hint does not approve of this use.
DEPRECATED, The connection string for
The zookeeper connection in the form host:port. Multiple hosts can be
Given to allow fail-over.
-- bootstrap-server: specify the connection address of kafka. This is recommended.
Prompt message display for parameters
REQUIRED: The Kafka server to connect
To. In case of providing this, a direct Zookeeper connection won't be required.
-- replication-factor: number of backups per partiton
The replication factor for each
Partition in the topic being
Created. If not supplied, defaults
To the cluster default.
-- partitions: specifies the number of partitions for the topic
Example:
Cd / data/servers/kafka_2.11-2.4.0/bin# create topic test1kafka-topics.sh-- create-- bootstrap-server=192.168.51.128:9092,10.231.128.96:9093192.168.51.128:9094-- replication-factor 1-- partitions 1-- topic test1# create topic test2kafka-topics.sh-- create-- bootstrap-server=192.168.51.128:9092,10.231.128.96:9093192.168.51.128:9094-- replication -factor 1-- partitions 1-- topic test2# view topickafka-topics.sh-- list-- bootstrap-server=192.168.51.128:9092,10.231.128.96:9093192.168.51.128:9094 2. Automatically create topic
In our work, if we do not want to manage topic, we can manage it through the configuration file of kafka. We can let kafka create topic automatically. We need to add the following configuration file to our kafka configuration file.
Auto.create.topics.enable=true
If you want to delete topic for the purpose of physical deletion, you also need to configure it.
Delete.topic.enable=true4. Send a message
They can produce messages through client commands.
Let's first take a look at some parameters commonly used in kafka-console-producer.sh.
-- topic specifies topic
-- timeout timeout
-- sync sends messages asynchronously
-- broker-list official website prompt: REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2. This parameter is required
Kafka-console-producer.sh-- broker-list 192.168.51.128 topic test1 9092192.168.51.128VR 9093192.168.51.128 topic test1 consumption message
Let's first look at the parameters of kafka-console-consumer.sh.
-- topic specifies topic
-- group specifies the consumer group
-- from-beginning: specifies to consume from the beginning, and if not, to consume from the current
-- bootstrap-server: the connection address of kafka
Kafka-console-consumer.sh-- bootstrap-server 192.168.51.128-beginning 9092192.168.51.128-- topic test1
4.3 kafka's log
There are two types of kafka logs:
The first kind of log: it is the startup log of our kafka, that is, we troubleshoot problems and check the log of error messages.
The second kind of log is our data log, kafka is that our data is stored in the form of log, and our second kind of log is our partiton and segment.
So let's talk about backup and partitioning.
If we create a partition and a backup, then test should have only one test-0 on three machines or three data directories (the subscript of the partition starts at 0)
If we create N partitions, we will find that on three servers, test_0-n
If we create M backups, we will find that each of test_0 to test_n is M
5. Kafaka API5.1 uses kafaka native api1. Consumers automatically submit:
Define your own producer
Import org.apache.kafka.clients.producer.Callback;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Properties;/** * @ ClassName MyKafkaProducer * @ Description TODO * @ Author lingxiangxiang * @ Date 3:37 PM * @ Version 1.0 * * / public class MyKafkaProducer {private org.apache.kafka.clients.producer.KafkaProducer producer Public MyKafkaProducer () {Properties properties = new Properties (); properties.put ("bootstrap.servers", "192.168.51.128VR 9092192.168.51.128properties.put"); properties.put ("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); properties.put ("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") / / set properties.put for batch delivery ("batch.size", 16384); / / the waiting time for batch delivery is 50ms, which exceeds 50ms. If the batch size is less than 50ms, send properties.put ("linger.ms", 50); this.producer = new org.apache.kafka.clients.producer.KafkaProducer (properties);} public boolean sendMsg () {boolean result = true Try {/ / send normally, test2 is topic, 0 represents partition, 1 represents key, hello world is the content of the message sent final ProducerRecord record = new ProducerRecord ("test2", 0,1, "hello world"); producer.send (record) / / call producer.send (record, new Callback () {@ Override public void onCompletion (RecordMetadata recordMetadata, Exception e) {System.out.println (recordMetadata.topic ()); System.out.println (callback ()) System.out.println (recordMetadata.offset ());}}); / / define a class producer.send (record, new MyCallback (record));} catch (Exception e) {result = false;} return result }} define the callback function import org.apache.kafka.clients.producer.Callback;import org.apache.kafka.clients.producer.RecordMetadata;/** * @ ClassName MyCallback * @ Description TODO * @ Author lingxiangxiang * @ Date sent successfully by the producer 3:51 PM * @ Version 1.0 * * / public class MyCallback implements Callback {private Object msg; public MyCallback (Object msg) {this.msg = msg } @ Override public void onCompletion (RecordMetadata metadata, Exception e) {System.out.println ("topic =" + metadata.topic ()); System.out.println ("partiton =" + metadata.partition ()); System.out.println ("offset =" + metadata.offset ()); System.out.println (msg);}} producer test class:
In the producer test class, I encountered a pit, that is, in the end, I did not add sleep, that is, there was no problem in how to check my code, but in the end, I couldn't send a successful message, so I added a sleep at last, because the main function main has been executed and exited, but the message has not been sent, so it needs to wait. Of course, you may not encounter such a problem in the production environment, hehe, the code is as follows
Import static java.lang.Thread.sleep;/** * @ ClassName MyKafkaProducerTest * @ Description TODO * @ Author lingxiangxiang * @ Date 3:46 PM * @ Version 1.0 * * / public class MyKafkaProducerTest {public static void main (String [] args) throws InterruptedException {MyKafkaProducer producer = new MyKafkaProducer (); boolean result = producer.sendMsg (); System.out.println ("send msg" + result); sleep (1000) }} Consumer Class: import kafka.utils.ShutdownableThread;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;import java.util.Collections;import java.util.Properties;/** * @ ClassName MyKafkaConsumer * @ Description TODO * @ Author lingxiangxiang * @ Date 4:12 PM * @ Version 1.0 * * / public class MyKafkaConsumer extends ShutdownableThread {private KafkaConsumer consumer Public MyKafkaConsumer () {super ("KafkaConsumerTest", false); Properties properties = new Properties (); properties.put ("bootstrap.servers", "192.168.51.128 properties.put 9092192.168.51.128)); properties.put (" group.id "," mygroup "); properties.put (" enable.auto.commit "," true ") Properties.put ("auto.commit.interval.ms", "1000"); properties.put ("session.timeout.ms", "30000"); properties.put ("heartbeat.interval.ms", "10000"); properties.put ("auto.offset.reset", "earliest"); properties.put ("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer") Properties.put ("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); this.consumer = new KafkaConsumer (properties);} @ Override public void doWork () {consumer.subscribe (Arrays.asList ("test2")); ConsumerRecordsrecords = consumer.poll (1000); for (ConsumerRecord record: records) {System.out.println ("topic =" + record.topic ()) System.out.println ("partition =" + record.partition ()); System.out.println ("key =" + record.key ()); System.out.println ("value =" + record.value ()) The consumer's test class: / * @ ClassName MyConsumerTest * @ Description TODO * @ Author lingxiangxiang * @ Date 4:23 PM * @ Version 1.0 * * / public class MyConsumerTest {public static void main (String [] args) {MyKafkaConsumer consumer = new MyKafkaConsumer (); consumer.start (); System.out.println ("=");}}
two。 Consumers synchronize manual submission
The previous consumers consume messages in broker by automatically submitting offset, but automatic submission may lead to repeated consumption of messages. Therefore, in the production environment, it is often necessary to manually submit the offset to solve the problem of repeated consumption.
Manual submission can be divided into synchronous submission, asynchronous submission, and asynchronous joint submission. These commit methods are simply different from the doWork () method, and their constructors are the same. So let's first modify the constructor based on the previous consumer class, and then implement three different submission methods.
The synchronous submission method is that the consumer submits the offset to the broker and waits for the broker to respond successfully. If no response is received, it will be resubmitted until the response is obtained. In the process of waiting, consumers are blocked. It seriously affects the throughput of consumers.
Modify the previous MyKafkaConsumer.java, mainly to modify the following configuration import kafka.utils.ShutdownableThread;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;import java.util.Collections;import java.util.Properties / * @ ClassName MyKafkaConsumer * @ Description TODO * @ Author lingxiangxiang * @ Date 4:12 PM * @ Version 1.0 * * / public class MyKafkaConsumer extends ShutdownableThread {private KafkaConsumer consumer; public MyKafkaConsumer () {super ("KafkaConsumerTest", false); Properties properties = new Properties (); properties.put ("bootstrap.servers", "192.168.51.128 Date 9092192.168.51.128Switzerland 9093192.168.51.128Switzerland 9094") Properties.put ("group.id", "mygroup"); / / to manually submit properties.put ("enable.auto.commit", "false"); / / properties.put ("auto.commit.interval.ms", "1000"); properties.put ("session.timeout.ms", "30000"); properties.put ("heartbeat.interval.ms", "10000") Properties.put ("auto.offset.reset", "earliest"); properties.put ("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); properties.put ("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); this.consumer = new KafkaConsumer (properties);} @ Override public void doWork () {consumer.subscribe ("test2")) ConsumerRecordsrecords = consumer.poll (1000); for (ConsumerRecord record: records) {System.out.println ("topic =" + record.topic ()); System.out.println ("partition =" + record.partition ()); System.out.println ("key =" + record.key ()); System.out.println ("value =" + record.value ()) / / manually synchronize submission of consumer.commitSync ();} 3. Consumers submit manually asynchronously
Manual synchronous submission requires waiting for a successful response from broker, which is too inefficient and affects the throughput of consumers. Asynchronous commit means that the consumer does not have to wait for a successful response after submitting the offset to the broker, so it increases the consumer's throughput.
Import kafka.utils.ShutdownableThread;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;import java.util.Collections;import java.util.Properties;/** * @ ClassName MyKafkaConsumer * @ Description TODO * @ Author lingxiangxiang * @ Date 4:12 PM * @ Version 1.0 * * / public class MyKafkaConsumer extends ShutdownableThread {private KafkaConsumer consumer Public MyKafkaConsumer () {super ("KafkaConsumerTest", false); Properties properties = new Properties (); properties.put ("bootstrap.servers", "192.168.51.128false 9092192.168.51.128false"); properties.put ("group.id", "mygroup") / / to manually submit properties.put ("enable.auto.commit", "false"); / / properties.put ("auto.commit.interval.ms", "1000"); properties.put ("session.timeout.ms", "30000"); properties.put ("heartbeat.interval.ms", "10000"); properties.put ("auto.offset.reset", "earliest") Properties.put ("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); properties.put ("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); this.consumer = new KafkaConsumer (properties);} @ Override public void doWork () {consumer.subscribe (Arrays.asList ("test2")); ConsumerRecordsrecords = consumer.poll (1000) For (ConsumerRecord record: records) {System.out.println ("topic =" + record.topic ()); System.out.println ("partition =" + record.partition ()); System.out.println ("key =" + record.key ()); System.out.println ("value =" + record.value ()) / / manual synchronous submission / / consumer.commitSync (); / manual asynchronous submission / / consumer.commitAsync (); / / manual asynchronous submission with callback public consumer.commitAsync ((offsets, e)-> {if (e! = null) {System.out.println ("number of submissions, offsets =" + offsets) System.out.println ("exception =" + e);}});} 5.2 springboot uses kafka
Now in everyone's development process, many of them use springboot projects, which have been launched directly. If you still use native API, it is a bit low, ah, so how does kafka and springboot combine?
Maven configuration org.apache.kafka kafka-clients 2.1.1 add profile
Add the following configuration information to application.properties:
Kafka connection address
Spring.kafka.bootstrap-servers = 192.168.51.128Viru 9092 Magi 10.231.128.96Vera 9093192.168.51.128Vera 9094
Producer spring.kafka.producer.acks = 0spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.retries = 3spring.kafka.producer.batch-size = 4096spring.kafka.producer.buffer-memory = 33554432spring.kafka.producer.compression-type = gzip Consumer spring.kafka.consumer.group-id = mygroupspring.kafka.consumer.auto-commit-interval = 5000spring.kafka.consumer .heartbeat-interval = 3000spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.auto-offset-reset = earliestspring.kafka.consumer.enable-auto-commit = true# listenner Identify the number of consumer monitors spring.kafka.listener.concurrency = name of consumer topic kafka.topic1 = topic1 producer import lombok.extern.slf4j.Slf4j Import org.springframework.beans.factory.annotation.Value;import org.springframework.kafka.core.KafkaTemplate;@Service@Slf4jpublic class MyKafkaProducerServiceImpl implements MyKafkaProducerService {@ Resource private KafkaTemplate kafkaTemplate; / / read configuration file @ Value ("${kafka.topic1}") private String topic; @ Override public void sendKafka () {kafkaTemplate.send (topic, "hell world");}} Consumer
@ Component
@ Slf4jpublic class MyKafkaConsumer {@ KafkaListener (topics = "${kafka.topic1}") public void listen (ConsumerRecord record) {Optional kafkaMessage = Optional.ofNullable (record.value ()); if (kafkaMessage.isPresent ()) {log.info ("- record =" + record) Log.info ("- message =" + kafkaMessage.get ());}
After reading the above, do you have a general understanding of Apache's Kafka? If you want to know more about the content of the article, welcome to follow the industry information channel, thank you for reading!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.