2025-03-03 Update From: SLTechnology News&Howtos
This article introduces how to use Kafka's low-level consumer (SimpleConsumer). It explains when this interface is appropriate, walks through the steps involved in using it, and ends with a complete working example.
1. When would you use this interface?
1) Read a message multiple times
2) Consume only a subset of the partitions in a topic in a process
3) Manage transactions to make sure a message is processed once and only once
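All three use cases come down to one thing the low-level consumer gives you: explicit control over the partition and the offset you read from. The following toy sketch (plain Java, no Kafka dependency; the "partition" is just an in-memory list and fetchFrom is a hypothetical stand-in for a fetch request) illustrates why that control lets you re-read messages and consume only the partitions you care about:

```java
import java.util.Arrays;
import java.util.List;

public class OffsetControlDemo {
    // A toy "partition": an append-only sequence of messages indexed by offset.
    static final List<String> PARTITION = Arrays.asList("m0", "m1", "m2", "m3");

    // Fetch all messages starting at an explicit offset, like a low-level fetch request.
    static List<String> fetchFrom(int offset) {
        return PARTITION.subList(offset, PARTITION.size());
    }

    public static void main(String[] args) {
        System.out.println(fetchFrom(2)); // prints [m2, m3]
        // Asking for the same offset again re-reads the same messages --
        // exactly what use case 1) relies on.
        System.out.println(fetchFrom(2)); // prints [m2, m3]
    }
}
```

Because the consumer, not the broker, decides which offset to fetch next, it can also record that offset transactionally together with its own processing results, which is the basis of use case 3).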
2. Steps to use SimpleConsumer:
1) Find an active Broker and find out which Broker is the leader for your topic and partition
2) Determine who the replica Brokers are for your topic and partition
3) Build the request defining what data you are interested in
4) Fetch the data, identifying and recovering from leader changes
First of all, you must know which topic and which partition you want to read.
Then, find the broker that is the leader for that partition, as well as the brokers that hold replicas of it.
Next, build your own fetch request describing the data you are interested in and fetch it.
Finally, note that you must detect and handle changes of the partition leader yourself.
3. The code is as follows:
package kafkatest.kakfademo;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleExample {

    public static void main(String[] args) {
        SimpleExample example = new SimpleExample();
        long maxReads = 10;
        String topicName = "test";
        int partition = 0;
        List<String> seeds = new ArrayList<String>();
        seeds.add("hadoop0");
        int port = 9092;
        try {
            example.run(maxReads, topicName, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public SimpleExample() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public void run(long a_maxReads, String a_topic, int a_partition,
            List<String> a_seedBrokers, int a_port) throws Exception {
        // find the metadata about the topic and partition we are interested in
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic,
                a_partition);
        if (metadata == null) {
            System.out
                    .println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out
                    .println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,
                100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition,
                kafka.api.OffsetRequest.EarliestTime(), clientName);

        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000,
                        64 * 1024, clientName);
            }
            // Note: this fetchSize of 100000 might need to be increased if
            // large batches are written to Kafka
            FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:"
                        + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition,
                            kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition,
                        a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
                    a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset
                            + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset())
                        + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic,
            int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
                partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
                whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
                clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out
                    .println("Error fetching data Offset Data the Broker. Reason: "
                            + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private String findNewLeader(String a_oldLeader, String a_topic,
            int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port,
                    a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())
                    && i == 0) {
                // first time through if the leader hasn't changed give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out
                .println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception(
                "Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers,
            int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop: for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,
                        "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed
                        + "] to find Leader for [" + a_topic + ", "
                        + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (BrokerEndPoint replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
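The payload handling inside the fetch loop can be exercised in isolation. This standalone sketch (plain Java, no Kafka dependency; PayloadDecodeDemo and decode are names invented for illustration) mimics how the loop copies a MessageAndOffset payload ByteBuffer into a byte array and decodes it as UTF-8:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadDecodeDemo {
    // Mirrors the loop body above: copy the buffer's contents up to its
    // limit into a byte array, then decode the bytes as UTF-8.
    static String decode(ByteBuffer payload) {
        byte[] bytes = new byte[payload.limit()];
        payload.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer payload = ByteBuffer
                .wrap("hello kafka".getBytes(StandardCharsets.UTF_8));
        System.out.println(decode(payload)); // prints hello kafka
    }
}
```

Note that decode consumes the buffer (its position advances to the limit), so a second decode of the same ByteBuffer would return an empty string; the real loop never re-reads a payload, so this is harmless there.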
4. The consumption results are as follows:
This concludes the study of how to use Kafka's low-level consumer. Pairing the theory above with hands-on practice is the best way to learn, so give it a try.