
How to use message queuing



This article introduces the basics of how to use message queues, with Kafka as the example. Many people run into these situations in real projects, so let the editor walk you through how to handle them. I hope you read it carefully and get something out of it!

1 Overview

1.1 Basic Concepts

1.1.1 Broker

Published messages are stored on a set of servers called a Kafka cluster. Each server in the cluster is a Broker.

1.1.2 Topic

Messages are classified through the Topic mechanism; each Topic can be thought of as a queue.

1.1.3 Partition

Each Topic can have multiple partitions, mainly to improve concurrency. Different Partitions under the same Topic can receive messages concurrently, and consumers can also pull from them concurrently: the degree of concurrency equals the number of Partitions.
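To make the partition-level parallelism concrete, here is a minimal sketch using the plain Kafka Java producer client (not part of the original article); it reuses the UserDataQueue topic name and the localhost broker list that appear later in this article. Records with the same key always land in the same partition, while different keys are hashed across the partitions, which is what allows consumers to pull in parallel.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PartitionDemoProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092,localhost:9093,localhost:9094");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // The key determines the partition: the same key always maps to the same
                // partition, while different keys are hashed across the topic's partitions,
                // so consumers can pull from those partitions concurrently.
                String key = "user-" + (i % 5);
                producer.send(new ProducerRecord<>("UserDataQueue", key, "message-" + i),
                        (metadata, ex) -> {
                            if (ex == null) {
                                System.out.printf("key=%s -> partition=%d, offset=%d%n",
                                        key, metadata.partition(), metadata.offset());
                            }
                        });
            }
            producer.flush(); // wait for all sends (and their callbacks) to complete
        }
    }
}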

On the Kafka servers, each partition exists as a file directory. Within each partition directory, Kafka splits the partition into multiple segment files (LogSegment) according to the configured size and time period, and each segment consists of three parts:

- Log file: *.log
- Offset index file: *.index
- Time index file: *.timeindex

*.log stores the message data itself; *.index stores the position of indexed messages within the file (both the logical offset and the physical storage offset); *.timeindex stores the mapping between message creation time and the corresponding logical offset.

Splitting a partition into multiple segments keeps the size of each storage file under control. Segments can then be conveniently mapped into memory through the operating system's mmap mechanism, improving write and read efficiency. Another benefit is that when the system needs to clear expired data, it can simply delete the expired segment files.

If the position of every message were saved in the index, the index file itself would easily become very large. Kafka therefore designed the index as a sparse index to keep the index file small.
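As an illustration of how a sparse index is used (a simplified sketch, not Kafka's actual code): binary-search the index for the largest indexed offset less than or equal to the target offset, jump to the recorded file position, then scan the log forward from there.

public class SparseIndexLookup {

    /**
     * Illustrative sparse-index lookup. offsets[i] is a logical message offset that
     * was indexed; positions[i] is the byte position of that message inside the
     * *.log segment file. Only some messages are indexed, keeping *.index small.
     *
     * Returns the byte position to start scanning from for targetOffset: the entry
     * with the largest indexed offset <= targetOffset.
     */
    static long startPositionFor(long[] offsets, long[] positions, long targetOffset) {
        int lo = 0, hi = offsets.length - 1;
        long start = 0L; // target is before the first indexed entry: scan from segment start
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (offsets[mid] <= targetOffset) {
                start = positions[mid]; // candidate; keep looking to the right
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return start;
    }

    public static void main(String[] args) {
        long[] offsets   = {0, 1000, 2000};      // only every ~1000th message is indexed
        long[] positions = {0, 65_536, 131_072}; // byte positions inside the *.log file
        // 1500 is not indexed itself; start at position 65536 and scan the log forward.
        System.out.println(startPositionFor(offsets, positions, 1500));
    }
}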

1.1.4 Replication

The number of redundant copies kept of each message. It cannot exceed the number of Brokers in the cluster.

1.2 Basic Operations

1.2.1 Topic-related

# Create a Topic
#   - avoid using [_] and [.] in topic names
#   --replication-factor: number of replicas (cannot exceed the number of broker nodes)
#   --partitions: number of partitions (concurrency)
./bin/kafka-topics.sh --create \
  --topic UserDataQueue \
  --replication-factor 3 \
  --partitions 5 \
  --bootstrap-server localhost:9092,localhost:9093,localhost:9094

# List Topics
./bin/kafka-topics.sh --list \
  --bootstrap-server localhost:9092,localhost:9093,localhost:9094

# Modify a Topic

# Delete a Topic

1.2.2 Message-related

# Send messages
#   --topic: the target Topic
./bin/kafka-console-producer.sh \
  --topic UserDataQueue \
  --bootstrap-server localhost:9092,localhost:9093,localhost:9094

# Pull messages
#   --from-beginning: start from the beginning (fetch all existing data)
./bin/kafka-console-consumer.sh \
  --topic UserDataQueue \
  --bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
  --from-beginning
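The same topic administration can also be done from Java with the Kafka AdminClient. The sketch below is an assumed, minimal equivalent of the create and list commands in 1.2.1; it also reads the broker count first, since the replication factor must not exceed it.

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicAdminDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092,localhost:9093,localhost:9094");

        try (AdminClient admin = AdminClient.create(props)) {
            // The replication factor cannot exceed the number of brokers in the cluster.
            int brokers = admin.describeCluster().nodes().get().size();
            System.out.println("brokers in cluster: " + brokers);

            // Equivalent of: kafka-topics.sh --create --topic UserDataQueue --partitions 5 --replication-factor 3
            NewTopic topic = new NewTopic("UserDataQueue", 5, (short) Math.min(3, brokers));
            admin.createTopics(List.of(topic)).all().get();

            // Equivalent of: kafka-topics.sh --list
            System.out.println(admin.listTopics().names().get());
        }
    }
}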

2 Cluster Configuration

Kafka clusters depend on Zookeeper.

2.1 Zookeeper Configuration and Startup

# Parameters to modify
# the directory where the snapshot is stored.
dataDir=/kafka/zkdata
# the port at which the clients will connect
clientPort=2182

# Startup
./bin/zookeeper-server-start.sh -daemon /kafka/zookeeper.properties

2.2 Kafka Configuration and Startup

# Parameters to modify
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1    # must be unique within the cluster

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://localhost:9092    # on the same host, each broker needs a different port

# A comma separated list of directories under which to store log files
log.dirs=/kafka/data01    # log storage directory; must be separate for each broker

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2182    # Zookeeper connection address; see the zk configuration in 2.1

# Kafka startup
# broker-1
./bin/kafka-server-start.sh -daemon /kafka/server01.properties
# broker-2
./bin/kafka-server-start.sh -daemon /kafka/server02.properties
# broker-3
./bin/kafka-server-start.sh -daemon /kafka/server03.properties

2.3 Zookeeper Visualization

PrettyZoo is a Zookeeper graphical management client based on Apache Curator and JavaFX.

As can be seen from the figure below, all three Brokers in the cluster have started up normally.
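A GUI is not strictly required for this check: Kafka registers each broker as a znode under /brokers/ids in Zookeeper, so a short sketch with Apache Curator (the library PrettyZoo itself builds on) can list the registered broker ids. The Zookeeper address below assumes the clientPort=2182 configuration from section 2.1.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class BrokerRegistrationCheck {
    public static void main(String[] args) throws Exception {
        try (CuratorFramework zk = CuratorFrameworkFactory.newClient(
                "localhost:2182", new ExponentialBackoffRetry(1000, 3))) {
            zk.start();
            zk.blockUntilConnected();
            // Kafka brokers register themselves as ephemeral znodes under /brokers/ids,
            // so a healthy 3-broker cluster should list three ids here.
            System.out.println("registered brokers: " + zk.getChildren().forPath("/brokers/ids"));
        }
    }
}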

2.4 Kafka Visualization and Monitoring

2.4.1 AKHQ

A Kafka visualization system for managing Topics, the messages within Topics, consumer groups, and so on. Documentation: https://akhq.io/

2.4.2 Kafka Eagle

A simple and efficient monitoring system. Related documentation: http://www.kafka-eagle.org/index.html

Kafka Eagle also ships with its own large monitoring dashboard.

3 Integration with Spring Boot

Spring Boot version: 2.4.4.

Official example: https://github.com/spring-projects/spring-kafka/tree/main/samples
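The examples in section 3.1 reference a UserData class that the article does not show. Judging from the constructor calls and the getUsername()/getAge() accessors used below, it is a plain POJO along these lines (an assumed sketch, inferred from usage):

// Assumed shape of the UserData class used in the examples below (not shown in the original article).
public class UserData {
    private String username;
    private int age;

    public UserData() {}   // no-args constructor for JSON deserialization

    public UserData(String username, int age) {
        this.username = username;
        this.age = age;
    }

    public String getUsername() { return username; }
    public int getAge() { return age; }

    @Override
    public String toString() {
        return "UserData{username='" + username + "', age=" + age + "}";
    }
}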

3.1 Spring Boot

3.1.1 Add the dependency

implementation 'org.springframework.kafka:spring-kafka'

3.1.2 Configuration file

spring:
  kafka:
    bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
    producer:
      client-id: kfk-demo
      retries: 3

3.1.3 Sending messages

@RestController
public class IndexController {

    @Autowired
    KafkaTemplate kafkaTemplate;

    @GetMapping
    public String index() {
        int rdm = new Random().nextInt(1000);
        kafkaTemplate.send("UserDataQueue", new UserData("", rdm));
        return "hello world";
    }

    @GetMapping("index2")
    public String index2() {
        // send as a string
        kafkaTemplate.send("UserDataTopic", new Gson().toJson(new UserData("apple", 23)));
        return "ok";
    }
}

3.1.4 Receiving messages

@Component
@KafkaListener(id = "kfk-demo-userdata", topics = {"UserDataQueue"}, groupId = "kfk-demo-group", clientIdPrefix = "kfk-demo-client")
public class KfkListener {

    @KafkaHandler
    public void process(@Payload UserData ud,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        System.out.println(String.format("topic: %s, partition: %d, userData: %s", topic, partition, ud));
    }

    @KafkaHandler(isDefault = true)
    public void process(Object obj) {
        System.out.println(obj);
    }
}

// receive as a string
@Slf4j
@Component
@KafkaListener(id = "kfk-demo2", topics = {"UserDataTopic"})
public class KfkUserDataTopicListener {

    @KafkaHandler
    public void process(String userDataStr) {
        UserData userData = new Gson().fromJson(userDataStr, UserData.class);
        log.info("username: {}, age: {}", userData.getUsername(), userData.getAge());
    }
}

3.1.5 Automatic Topic creation

@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic userDataTopic() {
        return new NewTopic("UserDataTopic", 3, (short) 1);
    }
}

This is the end of "How to use message queuing"; thank you for reading. If you want to learn more about the industry, you can follow the site, where the editor will keep publishing practical articles.
