
The Advanced Features of spring-kafka and How to Use Them


This article shows what the advanced features of spring-kafka are and how to use them. The introduction is concise and easy to follow, and I hope you take something away from it.

Preface

Kafka is a message-queue product built around Topic partitions, which lets it achieve very high message-sending performance. Spring created the Spring-kafka project, which wraps Apache's kafka-client for quickly integrating Kafka into Spring projects. Beyond simply sending and receiving messages, Spring-kafka provides many advanced features; let's explore them one by one.

Simple integration

Introduce dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.6.RELEASE</version>
</dependency>

Add configuration

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

Test send and receive

/**
 * @author: kl @kailing.pub
 * @date: 2019-5-30
 */
@SpringBootApplication
@RestController
public class Application {

    private final Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Autowired
    private KafkaTemplate template;

    @GetMapping("/send/{input}")
    public void sendFoo(@PathVariable String input) {
        this.template.send("topic_input", input);
    }

    @KafkaListener(id = "webGroup", topics = "topic_input")
    public void listen(String input) {
        logger.info("input value: {}", input);
    }
}

After launching the application, open http://localhost:8080/send/kl in a browser. You will see the log output on the console: input value: "kl". Basic usage is as simple as that: inject a KafkaTemplate to send messages, and add a @KafkaListener annotation to receive them.

Spring-kafka-test embedded Kafka Server

However, the code above only launches successfully if you already have a Kafka Server environment. Kafka is built with Scala + Zookeeper, and you can download the deployment package from the official website and deploy it locally. However, to simplify verifying Kafka-related features during development, Spring-Kafka-Test wraps kafka-test to provide an annotation-driven, one-click embedded Kafka Server, which is also super easy to use. All later test cases in this article get their Kafka from this embedded service.

Introduce dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>2.2.6.RELEASE</version>
    <scope>test</scope>
</dependency>

Start the service

Let's use a JUnit test case to directly start a Kafka Server service with four Broker nodes.

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 4, ports = {9092, 9093, 9094, 9095})
public class ApplicationTests {

    @Test
    public void contextLoads() throws IOException {
        System.in.read();
    }
}

As above, all you need is the @EmbeddedKafka annotation to start a fully functional Kafka service. Isn't that cool? If the annotation is written with no parameters, a Broker on a random port is created by default, and the specific port and the default configuration items are printed in the startup log.
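A minimal sketch of that zero-parameter form (the test-class name here is hypothetical):

// sketch: with no parameters, @EmbeddedKafka starts one Broker on a random port;
// the port and the default configuration items are printed in the startup log
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DefaultEmbeddedKafkaTests.class)
@EmbeddedKafka
public class DefaultEmbeddedKafkaTests {

    @Test
    public void contextLoads() throws IOException {
        System.in.read(); // keep the Broker alive for manual poking
    }
}

These configuration items, which otherwise live in the Kafka installation package's configuration file, can be set through the annotation's parameters. The parameters configurable through @EmbeddedKafka are described in detail below: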

value: the number of Broker nodes

count: the same as value, also the number of Broker nodes to configure.

controlledShutdown: the controlled-shutdown switch, mainly used to reduce the time Partitions on a Broker are unavailable when that Broker is shut down (a sketch of enabling it on the embedded server follows the three scenarios below)

Kafka is a highly available service with a multi-Broker architecture. One Topic corresponds to multiple Partitions, and one Partition can have multiple Replication replicas, stored across multiple Brokers for high availability. However, although a Partition has multiple replica sets, only one is working at any time; by default the first allocated replica set [the preferred replica] is the Leader, responsible for writing and reading data. When we upgrade a Broker or update its configuration, we need to restart the service, and at that point the Partitions need to be transferred to an available Broker. Three situations are involved:


Directly close the Broker: when the Broker is closed, the Broker cluster re-elects a new Broker as the Partition Leader, and during the election the Partitions on this Broker are temporarily unavailable.

Enable controlledShutdown: when the Broker is closed, the Broker itself first tries to transfer the Leader role to another available Broker.

Use the command-line tool: use bin/kafka-preferred-replica-election.sh to manually trigger a Partition Leader role transfer.
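On the embedded server this switch maps directly to an annotation attribute. A minimal sketch (the test-class name is hypothetical):

// sketch: two embedded Brokers with controlled shutdown enabled, so a stopping
// Broker first hands its Partition Leader roles to the surviving Broker
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ControlledShutdownTests.class)
@EmbeddedKafka(count = 2, controlledShutdown = true)
public class ControlledShutdownTests {

    @Test
    public void contextLoads() {
    }
}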

ports: the list of ports, an array. It corresponds to the count parameter: if there are several Brokers, you need a matching number of port numbers.

brokerProperties: Broker parameter settings, an array structure; it supports setting Broker parameters like this:

@EmbeddedKafka(brokerProperties = {"log.index.interval.bytes = 4096", "num.io.threads = 8"})

brokerPropertiesLocation: Broker parameter file setting

Its function is the same as brokerProperties above, except that Kafka Broker has as many as 182 configurable parameters, and configuring them one by one in the annotation is definitely not the best solution, so the annotation also supports loading a local configuration file, such as:

@EmbeddedKafka(brokerPropertiesLocation = "classpath:application.properties")

Create a new Topic

By default, if the Topic does not exist when a message is sent with KafkaTemplate, a new Topic is created. Its default number of partitions and replicas is taken from the following Broker parameters:

num.partitions=1                # default number of partitions for an auto-created Topic
default.replication.factor=1    # default replication factor for an auto-created Topic

Create a Topic when the program starts

/**
 * @author: kl @kailing.pub
 * @date: 2019-5-31
 */
@Configuration
public class KafkaConfig {

    @Bean
    public KafkaAdmin admin(KafkaProperties properties) {
        KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
        admin.setFatalIfBrokerNotAvailable(true);
        return admin;
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("topic-kl", 1, (short) 1);
    }
}

If the Kafka Broker supports it (version 1.0.0 or later) and an existing Topic is found to have fewer partitions than the configured number, new partitions are added. Several common KafkaAdmin usages are as follows:

setFatalIfBrokerNotAvailable(true): the default value is false, so an unavailable Broker does not affect initialization of the Spring context. If an unavailable Broker would break your normal business, set this value to true.

setAutoCreate(false): the default value is true, meaning the declared NewTopic objects are created automatically when KafkaAdmin is initialized.

initialize(): when setAutoCreate is false, the program needs to call admin's initialize() method explicitly to create the declared NewTopic objects (a combined sketch follows).
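A short sketch combining the last two settings, in the fragment style of the earlier snippets (exactly when initialize() is invoked is an assumption; it simply has to happen once the Broker is reachable):

@Bean
public KafkaAdmin admin(KafkaProperties properties) {
    KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
    // turn off automatic creation of the declared NewTopic beans
    admin.setAutoCreate(false);
    return admin;
}

@Autowired
private KafkaAdmin admin;

// call explicitly later; this creates every declared NewTopic in one go
public void createDeclaredTopics() {
    admin.initialize();
}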

Create a Topic in code

Sometimes we don't know how many Partitions a Topic needs when the program starts, yet we can't simply use the Broker's defaults either; then we need the AdminClient from kafka-client. The Spring KafkaAdmin wrapper above is also implemented with AdminClient. For example:

@Autowired
private KafkaProperties properties;

@Test
public void testCreateTopic() {
    AdminClient client = AdminClient.create(properties.buildAdminProperties());
    if (client != null) {
        try {
            Collection<NewTopic> newTopics = new ArrayList<>(1);
            newTopics.add(new NewTopic("topic-kl", 1, (short) 1));
            client.createTopics(newTopics);
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            client.close();
        }
    }
}

PS: other ways to create a Topic

The ways of creating a Topic above assume your Spring Boot version is 2.x or above, because spring-kafka 2.x only supports Spring Boot 2.x, and these APIs are not available in the 1.x versions. Below is an additional way to create a Topic in a program, via the kafka_2.10 artifact.

Introduce dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.2</version>
</dependency>

Create via the API

@Test
public void testCreateTopic() throws Exception {
    ZkClient zkClient = new ZkClient("127.0.0.1:2181", 3000, 3000, ZKStringSerializer$.MODULE$);
    String topicName = "topic-kl";
    int partitions = 1;
    int replication = 1;
    AdminUtils.createTopic(zkClient, topicName, partitions, replication, new Properties());
}

Note that the last constructor parameter of ZkClient is an implementation of a serialization/deserialization interface. In my tests, if it is left empty, the data of the created Topic on ZK is malformed. The default Kafka implementation is very simple: it just UTF-8 encodes the string. ZKStringSerializer$ is an instance of that interface already implemented in Kafka; it is a Scala companion object, and in Java you can obtain the instance by accessing its MODULE$ field directly.

Create via the command line

@Test
public void testCreateTopic() {
    String[] options = new String[]{
            "--create",
            "--zookeeper", "127.0.0.1:2181",
            "--replication-factor", "3",
            "--partitions", "3",
            "--topic", "topic-kl"
    };
    TopicCommand.main(options);
}

A closer look at message sending with KafkaTemplate

Get the sending result

Asynchronous acquisition

Template.send (",") .addCallback (new ListenableFutureCallback () {@ Override public void onFailure (Throwable throwable) {. } @ Override public void onSuccess (SendResult objectObjectSendResult) {.... })

Synchronous acquisition

ListenableFuture<SendResult> future = template.send("topic-kl", "kl");
try {
    SendResult result = future.get();
} catch (Throwable e) {
    e.printStackTrace();
}
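Since future.get() blocks indefinitely, a variant with a timeout may be safer in practice. A sketch (the 10-second value is an arbitrary assumption):

ListenableFuture<SendResult> future = template.send("topic-kl", "kl");
try {
    // wait at most 10 seconds for the send result instead of blocking forever
    SendResult result = future.get(10, TimeUnit.SECONDS);
} catch (Throwable e) {
    e.printStackTrace();
}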

Kafka transactional messages

By default, the KafkaTemplate instance auto-configured by Spring-kafka cannot send transactional messages. The transaction feature must be activated with the following configuration. Once it is activated, all messages must be sent inside a method in which a transaction is active, otherwise an exception complaining that no transaction is in process is thrown.

spring.kafka.producer.transaction-id-prefix=kafka_tx.

Use transactions when sends have transactional requirements, for example when all messages count as successful only if every send succeeds, as in the following example: if an exception occurs after the first message is sent and before the second is sent, the first, already-sent message is rolled back too. And under normal circumstances, supposing the method sleeps for a while after sending, the consumer receives the messages only after the transactional method has finished executing.

@ GetMapping ("/ send/ {input}") public void sendFoo (@ PathVariable String input) {template.executeInTransaction (t-> {t.send ("topic_input", "kl"); if ("error" .equals (input)) {throw new RuntimeException ("failed");} t.send ("topic_input", "ckl"); return true;});}

Similarly, once the transaction feature is activated, annotating the method with @Transactional achieves the same effect:

@ GetMapping ("/ send/ {input}") @ Transactional (rollbackFor = RuntimeException.class) public void sendFoo (@ PathVariable String input) {template.send ("topic_input", "kl"); if ("error" .equals (input)) {throw new RuntimeException ("failed");} template.send ("topic_input", "ckl");}

Spring-Kafka's transactional messages are based on the transaction feature provided by Kafka itself, whose default Broker configuration targets a highly available service of three or more Brokers. For simplicity and convenience of testing, this article uses the embedded service to create a single-Broker Kafka service, which leads to some problems, such as:

1. If the transaction log replica count is greater than the number of Brokers, the following exception is thrown:

Number of alive brokers '1' does not meet the required replication factor '3' for the transactions state topic (configured via 'transaction.state.log.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet.

The default Broker configuration is transaction.state.log.replication.factor=3; on a single node it must be lowered to 1.

2. If the number of replicas is less than the required size of the in-sync replica set, the following exception is thrown:

Number of insync replicas for partition __transaction_state-13 is [1], below required minimum [2]

The default Broker configuration is transaction.state.log.min.isr=2; on a single node it must be lowered to 1 (a combined sketch follows).
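Putting the two adjustments together, a sketch that reuses the brokerProperties parameter shown earlier so transactions work on the embedded single-Broker service:

// assumption: one embedded Broker, with both transaction-log settings lowered to match
@EmbeddedKafka(count = 1, brokerProperties = {
        "transaction.state.log.replication.factor=1",
        "transaction.state.log.min.isr=1"
})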

Getting a reply to a message with ReplyingKafkaTemplate

ReplyingKafkaTemplate is a subclass of KafkaTemplate. In addition to inheriting the parent's methods, it adds a new method, sendAndReceive, which implements send-and-reply semantics:

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record)

That is, when I send a message, I can get back the result the consumer returns to me, just like a traditional RPC interaction. This API is a great fit when the sender needs to know the consumer's concrete handling of a message, for example when a message carries a batch of data and you need to know which records the consumer processed successfully. The following code demonstrates how to integrate and use ReplyingKafkaTemplate:

/**
 * @author: kl @kailing.pub
 * @date: 2019-5-30
 */
@SpringBootApplication
@RestController
public class Application {

    private final Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public ConcurrentMessageListenerContainer repliesContainer(ConcurrentKafkaListenerContainerFactory containerFactory) {
        ConcurrentMessageListenerContainer repliesContainer = containerFactory.createContainer("replies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public ReplyingKafkaTemplate replyingTemplate(ProducerFactory pf, ConcurrentMessageListenerContainer repliesContainer) {
        return new ReplyingKafkaTemplate(pf, repliesContainer);
    }

    @Bean
    public KafkaTemplate kafkaTemplate(ProducerFactory pf) {
        return new KafkaTemplate(pf);
    }

    @Autowired
    private ReplyingKafkaTemplate template;

    @GetMapping("/send/{input}")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendFoo(@PathVariable String input) throws Exception {
        ProducerRecord record = new ProducerRecord("topic-kl", input);
        RequestReplyFuture replyFuture = template.sendAndReceive(record);
        ConsumerRecord consumerRecord = replyFuture.get();
        System.err.println("Return value: " + consumerRecord.value());
    }

    @KafkaListener(id = "webGroup", topics = "topic-kl")
    @SendTo
    public String listen(String input) {
        logger.info("input value: {}", input);
        return "successful";
    }
}

A closer look at message consumption with Spring-kafka

Using @KafkaListener

@KafkaListener's ability to receive messages was already demonstrated in the simple integration above, but its features don't stop there. Other commonly used features are:

Explicitly specify which Topics and partitions a listener consumes

Set the initial offset per Topic and partition

Set the concurrency of consumer threads

Set a message exception handler

@ KafkaListener (id = "webGroup", topicPartitions = {@ TopicPartition (topic = "topic1", partitions = {"0", "1"}), @ TopicPartition (topic = "topic2", partitions = "0", partitionOffsets = @ PartitionOffset (partition = "1", initialOffset = "100")}, concurrency = "6", errorHandler = "myErrorHandler") public String listen (String input) {logger.info ("input value: {}") Input) Return "successful";}

The other annotation parameters are easy to understand; errorHandler deserves a note. Setting this parameter requires implementing the KafkaListenerErrorHandler interface, and the value in the annotation is the bean name of your custom implementation in the Spring context. For example, with errorHandler = "myErrorHandler" configured above, an instance like this must exist in the Spring context:

/**
 * @author: kl @kailing.pub
 * @date: 2019-5-31
 */
@Service("myErrorHandler")
public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {

    Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public Object handleError(Message message, ListenerExecutionFailedException exception) {
        logger.info(message.getPayload().toString());
        return null;
    }

    @Override
    public Object handleError(Message message, ListenerExecutionFailedException exception, Consumer consumer) {
        logger.info(message.getPayload().toString());
        return null;
    }
}

Manual Ack mode

In manual ACK mode, committing the offset is controlled by the business logic. For example, a program may need the semantics that an unacknowledged message, i.e. one whose offset was not committed, must be consumed again; only manual Ack mode can achieve that. To enable manual mode, first turn off auto-commit, then set the consumer's ack mode:

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual

With the settings above in place, just add an Acknowledgment parameter to the @KafkaListener method; the offset is committed when ack.acknowledge() is executed.

@ KafkaListener (id = "webGroup", topics = "topic-kl") public String listen (String input, Acknowledgment ack) {logger.info ("input value: {}", input); if ("kl" .equals (input)) {ack.acknowledge ();} return "successful";}

Lifecycle of @KafkaListener-annotated listeners

The lifecycle of listeners annotated with @KafkaListener can be controlled. By default, @KafkaListener's parameter autoStartup = "true", i.e. consumption starts automatically, but you can also intervene in a listener's lifecycle through KafkaListenerEndpointRegistry. The registry's listener containers expose three action methods: start(), pause(), and resume(), i.e. start, pause, and resume. The following code demonstrates this feature in detail.

/**
 * @author: kl @kailing.pub
 * @date: 2019-5-30
 */
@SpringBootApplication
@RestController
public class Application {

    private final Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Autowired
    private KafkaTemplate template;

    @GetMapping("/send/{input}")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendFoo(@PathVariable String input) throws Exception {
        ProducerRecord record = new ProducerRecord("topic-kl", input);
        template.send(record);
    }

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @GetMapping("/stop/{listenerID}")
    public void stop(@PathVariable String listenerID) {
        registry.getListenerContainer(listenerID).pause();
    }

    @GetMapping("/resume/{listenerID}")
    public void resume(@PathVariable String listenerID) {
        registry.getListenerContainer(listenerID).resume();
    }

    @GetMapping("/start/{listenerID}")
    public void start(@PathVariable String listenerID) {
        registry.getListenerContainer(listenerID).start();
    }

    @KafkaListener(id = "webGroup", topics = "topic-kl", autoStartup = "false")
    public String listen(String input) {
        logger.info("input value: {}", input);
        return "successful";
    }
}

In the code above, listenerID is the id value "webGroup" from @KafkaListener. With the project started, execute the following URLs in turn and observe the effect.

First send a message: http://localhost:8081/send/ckl. Because autoStartup = "false", you will not see the message reach the listener.

Then start the listener: http://localhost:8081/start/webGroup. You can see the message come in.

The effects of pausing and resuming consumption can be tested the same way.

SendTo message forwarding

We saw @SendTo earlier in the send-and-reply example. Besides reply semantics, the @SendTo annotation can also take a parameter specifying a Topic to forward to. A common scenario is a message that needs multiple rounds of processing, where the rounds consume unequal amounts of CPU and other resources; this can be solved with consumers on different Topics, deployed on different hosts. For example:

KafkaListener (id = "webGroup", topics = "topic-kl") @ SendTo ("topic-ckl") public String listen (String input) {logger.info ("input value: {}", input); return input + "hello!";} @ KafkaListener (id = "webGroup1", topics = "topic-ckl") public void listen2 (String input) {logger.info ("input value: {}", input);}

Message retry and the dead-letter queue

Besides the manual Ack mode above for controlling message offsets, Spring-kafka also encapsulates retryable-consumption semantics: when consuming a record fails with an exception, the consumption can be retried, and you can set a limit on the number of retries, after which the message is routed to a predetermined Topic, the dead-letter queue. The following code demonstrates this effect:

@Autowired
private KafkaTemplate template;

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory kafkaConsumerFactory,
        KafkaTemplate template) {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    configurer.configure(factory, kafkaConsumerFactory);
    // retry at most 3 times in total, then hand the record to the dead-letter recoverer
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));
    return factory;
}

@GetMapping("/send/{input}")
public void sendFoo(@PathVariable String input) {
    template.send("topic-kl", input);
}

@KafkaListener(id = "webGroup", topics = "topic-kl")
public String listen(String input) {
    logger.info("input value: {}", input);
    throw new RuntimeException("dlt");
}

@KafkaListener(id = "dltGroup", topics = "topic-kl.DLT")
public void dltListen(String input) {
    logger.info("Received from DLT: " + input);
}

In the application above, consuming a message on topic-kl triggers a runtime exception, so the listener is invoked three times until the maximum number of retries is reached; the message is then skipped and published to the dead-letter queue. The dead-letter queue's Topic naming rule is the business Topic name plus ".DLT". Since the business Topic above is named "topic-kl", the corresponding dead-letter Topic is "topic-kl.DLT".

The above covers the advanced features of spring-kafka and how to use them. I hope you have picked up some new knowledge or skills from this article.
