Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to manage offset after spark streaming window aggregation operation

2025-01-30 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/02 Report--

This article mainly introduces "how to manage offset after spark streaming window aggregation operation". In daily operation, I believe that many people have doubts about how to manage offset after spark streaming window aggregation operation. Xiaobian consulted all kinds of data and sorted out simple and easy-to-use operation methods. I hope it will be helpful to answer the doubt of "how to manage offset after spark streaming window aggregation operation". Next, please follow the editor to study!

For spark streaming, the offset cannot be managed after the window operation, because the offset is stored in HasOffsetRanges. Only kafkaRDD inherits him, so if we convert KafkaRDD, we will no longer be able to get offset.

There is also the management of offset behind the window, which is also very troublesome, mainly because the window operation will contain several batches of RDD data, so to submit offset we only need to submit the offset of the most recent batch of kafkaRDD. How to get it?

For spark, the code execution location is divided into driver and executor, we want to get the offset on the driver side, submit the offset after the result is processed, or manage the offset directly with the result.

When it comes to driver-side execution, we just need to use transform to get the offset information, and then use commit in the output operation foreachrdd.

Package bigdata.spark.SparkStreaming.kafka010

Import java.util.Properties

Import org.apache.kafka.clients.consumer. {Consumer, ConsumerRecord, KafkaConsumer} import org.apache.kafka.common.TopicPartitionimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.rdd.RDDimport org.apache.spark.streaming.kafka010._import org.apache.spark.streaming. {Seconds, StreamingContext} import org.apache.spark. {SparkConf, TaskContext}

Import scala.collection.JavaConverters._import scala.collection.mutable

Object kafka010NamedRDD {def main (args: Array [String]) {/ / to create a context with a batch time of 2s, add the environment variable val sparkConf = new SparkConf () .setAppName ("DirectKafkaWordCount") .setMaster ("local [*]") val ssc = new StreamingContext (sparkConf, Seconds (5))

Ssc.checkpoint ("/ opt/checkpoint")

/ use broker and topic to create DirectStream val topicsSet = "test" .split (","). ToSet val kafkaParams = Map [String, Object] ("bootstrap.servers"-> "mt-mdh.local:9093", "key.deserializer"-> classOf [StringDeserializer], "value.deserializer"-> classOf [StringDeserializer], "group.id"-> "test4", "auto.offset.reset"-> "latest" "enable.auto.commit"-> (false: java.lang.Boolean)

/ / No API provides offset val messages = KafkaUtils.createDirectStream [String, String] (ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe [String, String] (topicsSet, kafkaParams,getLastOffsets (kafkaParams, topicsSet)) / / var A:mutable.HashMap [String, Array [OffsetRange]] = new mutable.HashMap ()

Val trans = messages.transform (r = > {val offsetRanges = r.asInstanceOf [HasOffsetRanges] .offsetRanges A + = ("rdd1"-> offsetRanges) r}) .countByWindow (Seconds (10), Seconds (5)) trans.foreachRDD (rdd= > {

If (! rdd.isEmpty ()) {val offsetRanges = A.get ("rdd1"). Get// .asInstanceOf [HasOffsetRanges] .offsetRanges

Rdd.foreachPartition {iter = > val o: OffsetRange = offsetRanges (TaskContext.get.partitionId) println (s "${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")}

Println (rdd.count ()) println (offsetRanges) / / manually submit offset, provided that automatic submission of messages.asInstanceOf [CanCommitOffsets] .submission Async (offsetRanges) is prohibited.

} / / A.- ("rdd1")}) / / launch flow ssc.start () ssc.awaitTermination ()} def getLastOffsets (kafkaParams: Map [String, Object], topics:Set [String]): Map [TopicPartition, Long] = {val props = new Properties () props.putAll (kafkaParams.asJava) val consumer = new KafkaConsumer [String String] (props) consumer.subscribe (topics.asJavaCollection) paranoidPoll (consumer) val map = consumer.assignment (). AsScala.map {tp = > println (tp+ "-" + consumer.position (tp)) tp-> (consumer.position (tp))}. ToMap println (map) consumer.close () map} def paranoidPoll (c: Consumer [String) String]: Unit = {val msgs = c.poll (0) if (! msgs.isEmpty) {/ / position should be minimum offset per topicpartition msgs.asScala.foldLeft (Map [TopicPartition, Long] ()) {(acc, m) = > val tp = new TopicPartition (m.topic, m.partition) val off = acc.get (tp) .map (o = > Math.min (o) M.offset) .getOrElse (m.offset) acc + (tp-> off)}. Foreach {case (tp, off) = > c.seek (tp, off)}

At this point, the study on "how to manage offset after spark streaming window aggregation operation" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report