In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-27 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly introduces how springboot integrates kafka, which has a certain reference value. Interested friends can refer to it. I hope you can learn a lot after reading this article.
Why use kafka?
Cut peaks and fill valleys. Buffer the upstream and downstream instantaneous burst traffic, protect the "fragile" downstream system from collapse, and avoid causing the full-link service "avalanche".
The system is decoupled. The loose coupling between sender and receiver simplifies the development cost to a certain extent and reduces the unnecessary direct dependence between systems.
Asynchronous communication: message queuing allows users to queue a message without immediately processing it.
Recoverability: even if a process that processes messages dies, messages added to the queue can still be processed after the system is restored.
Business scenario
Some non-core logic of synchronous business processes is not very time-critical and can be executed asynchronously by decoupling.
System log collection, collection and synchronization to kafka, generally using ELK combination play method
Some big data platforms for data transfer between systems
Basic architecture
Kafka runs on a cluster of one or more servers, and partitions can be distributed across cluster nodes
1. Producer production message, sent to Broker
2. The Broker with Leader status receives the message and writes it to the corresponding topic. Within a partition, these messages are indexed and stored with a timestamp
3. After receiving the Broker of the Leader status, send the Broker of the Follow status as a backup copy
4. Consumer consumer processes can subscribe from partitions and consume messages
Common terms
Broker . Responsible for receiving and processing requests sent by the client, as well as persisting messages. Although multiple Broker processes can run on the same machine, it is more common to run different Broker on different machines
Topic: Topic. The topic is the logical container that carries the message, which is often used to distinguish the specific business in practice.
Partition: Partition. An orderly and immutable sequence of messages. There can be multiple partitions under each topic.
Message: the message here refers to the main object processed by Kafka.
Message displacement: Offset. Represents the location information of each message in the partition, which is a monotonously increasing and constant value.
Copy: Replica. The same message in Kafka can be copied to multiple places to provide data redundancy, which are called replicas. Copies are also divided into leader copies and follower copies, each with different roles. Each partition can be configured with multiple copies for high availability. N copies of a partition must be on N different Broker.
Leader: the "master" copy of multiple copies per partition, the object that the producer sends data, and the object of consumer consumption data, are all Leader.
Follower: a "slave" copy of multiple replicas per partition, synchronizing data from Leader in real time and keeping synchronization with Leader data. When a Leader fails, a Follower also becomes the new Leader.
Producer: Producer. An application that publishes new messages to a topic.
Consumer: Consumer. An application that subscribes to new messages from a topic.
Consumer shift: Consumer Offset. It indicates the progress of consumer consumption, and each consumer has his own consumer displacement. Offset is saved in the internal topic on the broker side, not in clients.
Consumer group: Consumer Group. A group of multiple consumer instances that consume multiple partitions at the same time to achieve high throughput.
Rebalance: Rebalance. The process by which other consumer instances automatically reassign subscription topic partitions after one consumer instance in the consumer group dies. Rebalance is an important means for Kafka consumers to achieve high availability.
Code demonstration
External dependencies:
Add Kafka dependencies to pom.xml:
Org.springframework.kafka spring-kafka
Because the version number specified by spring-boot-starter-parent is 2.1.5. Spring boot will uniformly manage the version number of the external framework, and the version introduced by spring-kafka is 2.2.6.RELEASE
Configuration file:
Configure the parameters of Kafka in the configuration file application.yaml, as shown below:
When Spring: kafka: bootstrap-servers: localhost:9092 producer: retries: 3 # producer fails to send Number of retries batch-size: 16384 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer # serialization handling class for producer message key and message value value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: tomge-consumer-group # default consumer group id auto-offset-reset: earliest enable-auto-commit: true auto- Commit-interval: 100 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
The corresponding configuration class org.springframework.boot.autoconfigure.kafka.KafkaProperties is used to initialize kafka-related bean instance objects and register them in the spring container.
Send a message:
As an integrated framework supporting rapid development, Spring Boot also provides a number of template tool classes named-Template for message communication. For Kafka, this utility class is KafkaTemplate.
KafkaTemplate provides a series of send methods to send messages. A typical send method definition is shown in the following code:
Public ListenableFuture send (String topic, @ Nullable V data) {. Omit}
The production side provides a restful interface to simulate sending a message to create a new user.
@ GetMapping ("/ add_user") public Object add () {try {Long id = Long.valueOf (new Random (). NextInt (1000); User user = User.builder (). Id (id) .username ("TomGE"). Age (29). Address ("Shanghai"). Build (); ListenableFuture listenableFuture = kafkaTemplate.send (addUserTopic, JSON.toJSONString (user)) / / A callback method is provided to monitor the subsequent processing of messages (new ListenableFutureCallback () {@ Override public void onFailure (Throwable throwable) {System.out.println ("failed to send messages," + throwable.getMessage ()). } @ Override public void onSuccess (SendResult sendResult) {/ / the topic String topic to which the message is sent = sendResult.getRecordMetadata () .topic (); / / the partition to which the message is sent int partition = sendResult.getRecordMetadata () .partition () / / offset long offset = sendResult.getRecordMetadata () .offset () in the partition; System.out.println ("message sent successfully, topc:%s, partition:% s, offset:%s", topic, partition, offset);}}); return "message sent successfully" } catch (Exception e) {e.printStackTrace (); return "message delivery failed";}}
In fact, the Kafka used by developers allows automatic creation of Topic by default. The default number of partitions when creating Topic is 1, and the default number of partitions can be modified through the num.partitions=1 in the server.properties file. The automatic creation feature is usually turned off in the production environment, and the Topic needs to be created by the operation and maintenance personnel first.
Consumption message:
In Kafka, messages are pushed to consumers through the server, and when consumers of Kafka consume messages, they need to provide a Listener to listen to a Topic to get messages, which is the only way for Kafka to consume messages.
Define a consumer class, add @ KafkaListener annotation to the method that handles the specific message business logic, and configure the topic to be consumed. The code is as follows:
@ Componentpublic class UserConsumer {@ KafkaListener (topics = "add_user") public void receiveMesage (String content) {System.out.println ("consumption message:" + content);}}
Is it very simple to add kafka dependencies, use KafkaTemplate and @ KafkaListener annotations to complete the production and consumption of messages? in fact, SpringBoot has done a lot of work behind it.
Thank you for reading this article carefully. I hope the article "how springboot integrates kafka" shared by the editor will be helpful to everyone. At the same time, I also hope you will support us and pay attention to the industry information channel. More related knowledge is waiting for you to learn!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.