In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-25 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)05/31 Report--
This article mainly introduces "how to use kafka consumer". In daily operation, I believe many people have doubts about how to use kafka consumer. The editor consulted all kinds of materials and sorted out simple and easy-to-use operation methods. I hope it will be helpful to answer the doubts about "how to use kafka consumer". Next, please follow the editor to study!
Consumer as an important element of kafka, its common operation is not complex, to put it bluntly, nothing more than 2 points, 1, poll the data out, 2, mark the location. We found kafka's java api doc, found several official examples of consumer operations, and analyzed them one by one to see how many types of operations there are.
Automatic Offset Committing
Automatic Offset submission
This example shows a simple application of consumer api based on offset auto-submission.
Properties props = new Properties (); props.put ("bootstrap.servers", "localhost:9092"); props.put ("group.id", "test"); props.put ("enable.auto.commit", "true"); props.put ("auto.commit.interval.ms", "1000"); props.put ("session.timeout.ms", "30000") Props.put ("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put ("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer (props); consumer.subscribe (Arrays.asList ("foo", "bar")); while (true) {ConsumerRecords records = consumer.poll For (ConsumerRecord record: records) System.out.printf ("offset =% d, key =% s, value =% s", record.offset (), record.key (), record.value ());}
Enable.auto.commit means that offset will be automatically committed, and the interval between autocommits will be controlled by auto.commit.interval.ms.
The client connects to the server through the bootstrap.servers configuration, which can be one or more broker. It should be noted that this configuration is only used to allow the client to find our server cluster, without listing all the server addresses in the cluster.
In this example, the client, as a member of test group, subscribes to foo and bar2 topic.
(this direct translation is very poor, I will try to translate it according to my own understanding.) first of all, suppose that the two topic of foo and bar each have three partitions, and we start three processes on our machine, that is to say, in test group, there are three consumer, generally speaking, these three consumer will get a partitions of foo and bar respectively, this is the premise. The three consumer will periodically perform a poll action (implicitly a heartbeat is sent to tell the cluster that I am alive), so that the three consumer will continue to retain their access to their assigned partition. If a consumer fails, that is, the poll will no longer execute, the cluster will assign the partitions to other consumer after a period of time (session.timeout.ms).
The deserialization setting defines how to convert bytes, here we convert both key and value directly to string.
Manual Offset Control
Manual offset control
In addition to periodic auto-commit offset, users can also submit their offset after the message has been consumed.
In some cases, message consumption is associated with some processing logic, and in this way, we can manually submit the offset after the end of the processing logic.
In a nutshell, in this example, we want to consume at least 200 messages at a time and insert them into the database before submitting the offset. If the previous autocommit method is still used, it is possible that the message has been consumed but failed to insert into the database. This can be seen as a simple transaction encapsulation.
However, is there another possibility that there is an error after successfully inserting the database and before committing the offset, or that there is an error in submitting the offset itself, then some messages may be consumed repeatedly.
Personally, I think this paragraph is inexplicable. To put it simply, in this way, the message will not be lost, but there may be repeated consumption.
Properties props = new Properties (); props.put ("bootstrap.servers", "localhost:9092"); props.put ("group.id", "test"); props.put ("enable.auto.commit", "false"); props.put ("auto.commit.interval.ms", "1000"); props.put ("session.timeout.ms", "30000") Props.put ("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put ("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer (props); consumer.subscribe (Arrays.asList ("foo", "bar")); final int minBatchSize = 200; List buffer = new ArrayList (); while (true) {ConsumerRecords records = consumer.poll For (ConsumerRecord record: records) {buffer.add (record);} if (buffer.size () > = minBatchSize) {insertIntoDb (buffer); consumer.commitSync (); buffer.clear ();}}
In the above example, we use commitSync to mark all messages; in some cases, we may want to have more precise control over offset, so in the following example, we can control the submission of offset separately in each partition.
Try {while (running) {ConsumerRecords records = consumer.poll (Long.MAX_VALUE); for (TopicPartition partition: records.partitions ()) {List partitionRecords = records.records (partition); for (ConsumerRecord record: partitionRecords) {System.out.println (record.offset () + ":" + record.value ()) } long lastOffset = partitionRecords.get (partitionRecords.size ()-1). Offset (); consumer.commitSync (Collections.singletonMap (partition, new OffsetAndMetadata (lastOffset + 1));} finally {consumer.close ();}
Note: the offset submitted should be next message, so the submission needs to be + 1. 1 based on the current last item.
Manual Partition Assignment
Manual partition assignment
In the previous example, we subscribed to a topic and then asked kafka to fairly distribute the different partitions in the topic within a consumer group. So, in some cases, we want to be able to specify the allocation relationship of the partitions.
If a process manages the state related to partition locally, it only needs to get the partition associated with it.
If a process itself has high availability, there is no need for kafka to detect errors and reassign partition, because the consumer process will restart on another device.
To use this mode, you can use the assign method instead of subscribe, specifying a partitions list.
String topic = "foo"; TopicPartition partition0 = new TopicPartition (topic, 0); TopicPartition partition1 = new TopicPartition (topic, 1); consumer.assign (Arrays.asList (partition0, partition1))
Once allocated, you can call poll in the loop to consume the message, as in the previous example. Manual partition allocation does not require group coordination, so when the consumption process fails, it does not trigger the reallocation of partition, and each consumer works independently, even if it belongs to the same group as other consumers. In order to avoid offset submission conflicts, in this case, we usually need to ensure that each consumer uses its own group id.
It should be noted that manual partition allocation and dynamic partition allocation through subscribe cannot be mixed.
At this point, the study on "how to use kafka consumer" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.