How to use the low-level consumer of Kafka

2025-03-03 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)05/31 Report--

This article explains how to use the low-level consumer of Kafka (SimpleConsumer). It covers when this interface is worth using, the steps involved, and a complete Java example.

1. When would you use this interface?

1) Read a message multiple times

2) Consume only a subset of the partitions in a topic in a process

3) Manage transactions to make sure a message is processed once and only once
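
All three cases rest on the same idea: the consumer, not the broker, decides which offset to read next. The following is a minimal sketch of that idea using an in-memory list as a stand-in for one partition. The class ReplaySketch and its fetch method are hypothetical and are not part of the Kafka API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReplaySketch {

    /**
     * Hypothetical stand-in for fetching from one partition: the list index
     * of each entry plays the role of its offset.
     */
    public static List<String> fetch(List<String> partitionLog, long offset, int maxMessages) {
        // Clamp the requested range so out-of-range offsets return an empty list.
        int from = (int) Math.min(Math.max(offset, 0), partitionLog.size());
        int to = Math.min(from + maxMessages, partitionLog.size());
        return new ArrayList<>(partitionLog.subList(from, to));
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("m0", "m1", "m2", "m3");
        long readOffset = 1;

        // Because the caller owns readOffset, asking twice with the same
        // offset returns the same messages twice.
        System.out.println(fetch(log, readOffset, 2)); // prints [m1, m2]
        System.out.println(fetch(log, readOffset, 2)); // prints [m1, m2]
    }
}
```

Processing a message exactly once then comes down to when the caller chooses to advance and persist readOffset, which this sketch deliberately leaves out.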

2. Steps to use SimpleConsumer:

1) Find an active Broker and find out which Broker is the leader for your topic and partition

2) Determine who the replica Brokers are for your topic and partition

3) Build the request defining what data you are interested in

4) Fetch the data

5) Identify and recover from leader changes

First, you must know which topic and which partition you want to read.

Next, find the leader broker responsible for that partition, then find the brokers that hold replicas of the partition.

Then build the fetch request yourself and fetch the data.

Finally, you are responsible for detecting and handling changes of the leader broker.
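
The lookup part of these steps can be sketched without a running cluster. The example below is only an illustration under stated assumptions: PartitionInfo is a hypothetical, simplified stand-in for the partition metadata a broker returns, and the host names hadoop1 and hadoop2 are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LeaderLookupSketch {

    /** Hypothetical, simplified view of one partition's metadata. */
    public static final class PartitionInfo {
        public final int partitionId;
        public final String leaderHost;          // null while no leader is elected
        public final List<String> replicaHosts;

        public PartitionInfo(int partitionId, String leaderHost, List<String> replicaHosts) {
            this.partitionId = partitionId;
            this.leaderHost = leaderHost;
            this.replicaHosts = replicaHosts;
        }
    }

    /** Returns the entry for the requested partition, or null if there is none. */
    public static PartitionInfo findPartition(List<PartitionInfo> topicMetadata, int partition) {
        for (PartitionInfo info : topicMetadata) {
            if (info.partitionId == partition) {
                return info;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Made-up metadata for a two-partition topic.
        List<PartitionInfo> topicMetadata = Arrays.asList(
                new PartitionInfo(0, "hadoop1", Arrays.asList("hadoop1", "hadoop2")),
                new PartitionInfo(1, "hadoop2", Arrays.asList("hadoop2", "hadoop1")));

        PartitionInfo info = findPartition(topicMetadata, 0);
        if (info == null || info.leaderHost == null) {
            System.out.println("No leader available for partition 0");
            return;
        }
        // Remember the replicas: they are the brokers to ask if the leader fails.
        List<String> replicaBrokers = new ArrayList<>(info.replicaHosts);
        System.out.println("leader=" + info.leaderHost + " replicas=" + replicaBrokers);
    }
}
```

Keeping the replica list around matters because, after a leader failure, one of those replicas is the most likely place to learn who the new leader is.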

3. The code is as follows:

package kafkatest.kakfademo;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleExample {

    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        long maxReads = 10;            // stop after this many messages
        String topicName = "test";
        int partition = 0;
        List<String> seeds = new ArrayList<String>();
        seeds.add("hadoop0");          // seed broker used for the first metadata lookup
        int port = 9092;
        try {
            example.run(maxReads, topicName, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

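    // Replica brokers of the partition, refreshed by every successful findLeader() call
    private List<String> m_replicaBrokers = new ArrayList<String>();

    public SimpleExample() {
        m_replicaBrokers = new ArrayList<String>();
    }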

    public void run(long a_maxReads, String a_topic, int a_partition,
            List<String> a_seedBrokers, int a_port) throws Exception {
        // find the meta data about the topic and partition we are interested in
        //
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic,
                a_partition);
        if (metadata == null) {
            System.out
                    .println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out
                    .println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,
                100000, 64 * 1024, clientName);
        // start reading from the earliest offset available in the partition
        long readOffset = getLastOffset(consumer, a_topic, a_partition,
                kafka.api.OffsetRequest.EarliestTime(), clientName);

        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000,
                        64 * 1024, clientName);
            }
            // Note: this fetchSize of 100000 might need to be increased if
            // large batches are written to Kafka
            FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:"
                        + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition,
                            kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                // any other error: assume the leader changed and look for the new one
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition,
                        a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
                    a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset
                            + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset())
                        + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                // nothing new in the partition yet: wait before polling again
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic,
            int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
                partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        // ask for a single offset at the requested point in time
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
                whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
                clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out
                    .println("Error fetching data Offset Data the Broker. Reason: "
                            + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private String findNewLeader(String a_oldLeader, String a_topic,
            int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port,
                    a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())
                    && i == 0) {
                // first time through if the leader hasn't changed give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                //
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out
                .println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception(
                "Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers,
            int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop: for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,
                        "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed
                        + "] to find Leader for [" + a_topic + ", "
                        + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            // remember the replicas so findNewLeader() can query them after a failure
            m_replicaBrokers.clear();
            for (BrokerEndPoint replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }

}
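
The loop in run() skips any message whose offset is lower than readOffset and then advances readOffset with nextOffset(). A fetch can return such older messages, for example when the broker hands back a whole compressed block that begins before the requested offset. The sketch below isolates that bookkeeping in plain Java. OffsetAdvanceSketch and consumeBatch are hypothetical names, no Kafka classes are involved, and currentOffset + 1 is used as an assumed stand-in for nextOffset().

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetAdvanceSketch {

    /**
     * Walks one fetched batch, skips offsets older than readOffset,
     * records the rest in 'processed', and returns the next offset to request.
     */
    public static long consumeBatch(long[] batchOffsets, long readOffset, List<Long> processed) {
        for (long currentOffset : batchOffsets) {
            if (currentOffset < readOffset) {
                continue; // older than what we asked for: skip it
            }
            processed.add(currentOffset);
            readOffset = currentOffset + 1; // stand-in for messageAndOffset.nextOffset()
        }
        return readOffset;
    }

    public static void main(String[] args) {
        List<Long> processed = new ArrayList<>();
        // We asked for offset 5, but the batch starts at offset 3.
        long next = consumeBatch(new long[] {3, 4, 5, 6}, 5, processed);
        System.out.println("processed=" + processed + " next=" + next); // prints processed=[5, 6] next=7
    }
}
```

An empty batch leaves readOffset unchanged, which corresponds to the numRead == 0 branch in run() where the consumer sleeps before polling again.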

4. The consumption results are as follows:

This concludes the walkthrough of how to use the low-level consumer of Kafka. Pairing the explanation with hands-on practice is the best way to learn it, so try running the example yourself.
