Shulou (Shulou.com), SLTechnology News&Howtos, 06/02 report. Updated 2025-01-19.
This article introduces the ways to create a DStream. Many people have questions about DStream creation in day-to-day work, so the editor has consulted various materials and organized them into simple, easy-to-use steps. Follow along below.
1. RDD queue (for reference)
During testing, you can create a DStream with ssc.queueStream(queueOfRDDs). Each RDD pushed into the queue is treated as one batch of data in the DStream.
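A minimal, complete sketch of the queue-based approach, assuming Spark Streaming is on the classpath and the job runs in local mode. The object name QueueStreamSketch, the five loop iterations, and the sleep interval are illustrative assumptions, not values from the original example.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

// Hypothetical object name, used only for this sketch
object QueueStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("QueueStreamSketch").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(3))

    // The queue that feeds the DStream
    val rddQueue = new mutable.Queue[RDD[Int]]()

    // oneAtATime = false: every RDD queued during an interval goes into the same batch
    val counts = ssc
      .queueStream(rddQueue, oneAtATime = false)
      .map((_, 1))
      .reduceByKey(_ + _)
    counts.print()

    ssc.start()

    // Push a few RDDs into the queue (count and delay are assumptions)
    for (_ <- 1 to 5) {
      rddQueue.synchronized {
        rddQueue += ssc.sparkContext.makeRDD(1 to 5, 10)
      }
      Thread.sleep(2000)
    }

    ssc.stop()
  }
}
```

The queue is shared between the driver loop and the streaming scheduler, which is why the sketch adds to it inside a synchronized block.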
Example
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import scala.collection.mutable

object SparkStreaming02_RDDQueue {
  def main(args: Array[String]): Unit = {
    // Create the configuration object
    val conf: SparkConf = new SparkConf().setAppName("SparkStreaming02_RDDQueue").setMaster("local[*]")
    // Create the SparkStreaming context object
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))
    // Create a queue that holds RDDs
    val rddQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()
    // Collect data from the queue to get a DStream
    val queueDS: InputDStream[Int] = ssc.queueStream(rddQueue, false)
    // Process the collected data
    val resDS: DStream[(Int, Int)] = queueDS.map((_, 1)).reduceByKey(_ + _)
    // Print the result
    resDS.print()
    // Start the collector
    ssc.start()
    // Create RDDs in a loop and put them into the queue
    for (i ...

[The rest of this example and the opening of the next section are missing from the source. The fragment below is the tail of a socket receiver class.]

        ...
        restart(s"Error connecting to $host:$port", e)
        return
    } finally {
      onStop()
    }
  }

  override def onStart(): Unit = {
    // Read from the socket on a separate daemon thread
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = {
    synchronized {
      if (socket != null) {
        socket.close()
        socket = null
        ...

3. Kafka data source (important)
1. Version selection
2. Kafka 0-8 Receiver mode
Requirement: read data from Kafka with SparkStreaming, run a simple calculation on it, and print the result to the console.
Import the dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
Write the code. In 0-8 Receiver mode the offset is maintained in ZooKeeper. If the program stops while data is still being produced, it resumes consuming where it left off when it is started again. The offset can be viewed in the ZooKeeper client with get /consumers/bigdata/offsets/<topic name>/<partition number>
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils

object Spark04_ReceiverAPI {
  def main(args: Array[String]): Unit = {
    // 1. Create SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("Spark04_ReceiverAPI").setMaster("local[*]")
    // 2. Create StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    // 3. Use ReceiverAPI to read Kafka data and create a DStream
    val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(
      ssc,
      "hadoop202:2181,hadoop203:2181,hadoop204:2181",
      "bigdata",
      // The map value is the number of partitions of the topic to consume
      Map("mybak" -> 2)
    )
    // 4. Calculate WordCount and print
    val lineDStream: DStream[String] = kafkaDStream.map(_._2)
    val word: DStream[String] = lineDStream.flatMap(_.split(" "))
    val wordToOneDStream: DStream[(String, Int)] = word.map((_, 1))
    val wordToCountDStream: DStream[(String, Int)] = wordToOneDStream.reduceByKey(_ + _)
    wordToCountDStream.print()
    // 5. Start the task
    ssc.start()
    ssc.awaitTermination()
  }
}
3. Kafka 0-8 Direct mode
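Requirement: read data from Kafka with SparkStreaming, run a simple calculation on it, and print the result to the console.
Import the dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
Write the code (automatic offset maintenance, version 1)
The offset is maintained in the checkpoint. However, this version creates the StreamingContext with new, so the checkpoint is not read back on restart and messages produced while the program was stopped are lost. The way the StreamingContext is obtained needs to change.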
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

object Spark05_DirectAPI_Auto01 {
  def main(args: Array[String]): Unit = {
    // 1. Create SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("Spark05_DirectAPI_Auto01").setMaster("local[*]")
    // 2. Create StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")
    // 3. Prepare the Kafka parameters
    val kafkaParams: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"
    )
    // 4. Use DirectAPI with automatic offset maintenance to read Kafka data and create a DStream
    val kafkaDStream: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc,
        kafkaParams,
        Set("mybak")
      )
    // 5. Calculate WordCount and print
    kafkaDStream.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    // 6. Start the task
    ssc.start()
    ssc.awaitTermination()
  }
}
Write the code (automatic offset maintenance, version 2)
The offset is maintained in the checkpoint, and the StreamingContext is obtained with getActiveOrCreate.
This approach has two disadvantages:
It produces too many small checkpoint files.
The checkpoint records the timestamp of the last run. On restart, a batch is executed for every interval between that timestamp and the current time.
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

object Spark06_DirectAPI_Auto02 {
  def main(args: Array[String]): Unit = {
    // Restore the context from the checkpoint if one exists, otherwise create it
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate(
      "D:\\dev\\workspace\\my-bak\\spark-bak\\cp",
      () => getStreamingContext
    )
    ssc.start()
    ssc.awaitTermination()
  }

  def getStreamingContext: StreamingContext = {
    // 1. Create SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("DirectAPI_Auto01").setMaster("local[*]")
    // 2. Create StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")
    // 3. Prepare the Kafka parameters
    val kafkaParams: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"
    )
    // 4. Use DirectAPI with automatic offset maintenance to read Kafka data and create a DStream
    val kafkaDStream: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc,
        kafkaParams,
        Set("mybak")
      )
    // 5. Calculate WordCount and print
    kafkaDStream.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    // 6. Return the context
    ssc
  }
}
Write the code (manual offset maintenance)
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

object Spark07_DirectAPI_Handler {
  def main(args: Array[String]): Unit = {
    // 1. Create SparkConf
    val conf: SparkConf = new SparkConf().setAppName("DirectAPI_Handler").setMaster("local[*]")
    // 2. Create StreamingContext
    val ssc = new StreamingContext(conf, Seconds(3))
    // 3. Prepare the Kafka parameters
    val kafkaParams: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"
    )
    // 4. Get the position where consumption last stopped
    val fromOffsets: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long](
      TopicAndPartition("mybak", 0) -> 13L,
      TopicAndPartition("mybak", 1) -> 10L
    )
    // 5. Use DirectAPI with manual offset maintenance
    val kafakDStream: InputDStream[String] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
        ssc,
        kafkaParams,
        fromOffsets,
        (m: MessageAndMetadata[String, String]) => m.message()
      )
    // 6. Define an empty collection to hold the offset ranges
    var offsetRanges = Array.empty[OffsetRange]
    // 7. Save the currently consumed offsets
    kafakDStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD { rdd =>
      for (o ...

[The rest of this example and the opening of the next section are missing from the source. The fragment below uses the Kafka 0-10 Direct API.]

      ... "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata191122",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )
    // 4. Consume Kafka data and create the stream
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaParmas)
      )
    // 5. Calculate WordCount and print
    kafkaDStream.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    // 6. Start the task
    ssc.start()
    ssc.awaitTermination()
  }
}
5. Summary of Kafka consumption modes
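0-8 ReceiverAPI:
1) A dedicated Executor reads the data, so the rate at which data is received can differ from the rate at which it is processed.
2) Received data is transferred across machines; a write-ahead log (WAL) guards against data loss.
3) The Executor reads data with multiple threads. To increase parallelism, create several streams and union them.
4) The offset is stored in ZooKeeper.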
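The union of several receiver streams mentioned in point 3 can be sketched as follows. This is an illustration under assumptions rather than the article's own code: the object name ReceiverUnionSketch and the receiver count numReceivers are hypothetical, while the ZooKeeper quorum, group id, and topic are reused from the Receiver example earlier in the article.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical object name, used only for this sketch
object ReceiverUnionSketch {
  def main(args: Array[String]): Unit = {
    // Each receiver occupies one core, so the master must offer more cores than receivers
    val conf = new SparkConf().setAppName("ReceiverUnionSketch").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(3))

    val zkQuorum = "hadoop202:2181,hadoop203:2181,hadoop204:2181"
    val numReceivers = 2 // assumption: chosen only for illustration

    // Every createStream call starts its own receiver
    val streams: Seq[DStream[(String, String)]] = (1 to numReceivers).map { _ =>
      KafkaUtils.createStream(ssc, zkQuorum, "bigdata", Map("mybak" -> 1))
    }

    // Merge the receiver streams into a single DStream before processing
    val unioned: DStream[(String, String)] = ssc.union(streams)

    unioned.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Because all receivers use the same consumer group, Kafka divides the topic's partitions among them, so the union sees each message once.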
0-8 DirectAPI:
1) The Executor both reads the data and computes on it.
2) Add Executors to increase the parallelism of consumption.
3) Offset storage:
a) Checkpoint (create the StreamingContext with getActiveOrCreate)
b) Manual maintenance (in a storage system that supports transactions)
c) The offset must be obtained in the first operator called on the stream: offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
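The last point above can be sketched as follows: the offset ranges are captured in transform, the first operator applied to the direct stream, and then used in foreachRDD. This is a sketch under assumptions. The object name ManualOffsetSketch and the helper describe are hypothetical, the brokers, group, topic, and starting offsets are reused from the manual-offset example earlier in the article, and printing stands in for writing to a transactional store.

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// Hypothetical object name, used only for this sketch
object ManualOffsetSketch {
  // Hypothetical helper: formats one offset range for logging
  def describe(o: OffsetRange): String =
    s"${o.topic}-${o.partition}: ${o.fromOffset} -> ${o.untilOffset}"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ManualOffsetSketch").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(3))

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
      "group.id" -> "bigdata"
    )
    // Starting positions; a real job would load these from its own offset store
    val fromOffsets = Map(
      TopicAndPartition("mybak", 0) -> 13L,
      TopicAndPartition("mybak", 1) -> 10L
    )

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
      ssc, kafkaParams, fromOffsets, (m: MessageAndMetadata[String, String]) => m.message())

    var offsetRanges = Array.empty[OffsetRange]

    stream.transform { rdd =>
      // Only the RDD produced directly by the stream implements HasOffsetRanges
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD { rdd =>
      println(s"records in batch: ${rdd.count()}")
      // Assumption: printing stands in for saving the offsets together with the results
      offsetRanges.foreach(o => println(describe(o)))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

After a map or any other transformation the RDD no longer carries offset information, which is why the cast has to happen in the first operator.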
0-10 DirectAPI:
1) The Executor both reads the data and computes on it.
2) Add Executors to increase the parallelism of consumption.
3) Offset storage:
a) In the __consumer_offsets system topic
b) Manual maintenance (in a storage system that supports transactions)
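For the 0-10 API, one way to get the offset into the __consumer_offsets topic is to commit it explicitly after each batch with commitAsync. The sketch below assumes the spark-streaming-kafka-0-10 dependency is on the classpath. The object name CommitOffsetsSketch and the choice to disable auto commit are assumptions, while the brokers, group id, and topic are reused from the 0-10 fragment earlier in the article.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}

// Hypothetical object name, used only for this sketch
object CommitOffsetsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CommitOffsetsSketch").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(3))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "group.id" -> "bigdata191122",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      // Assumption: auto commit is turned off so that offsets are committed only after processing
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaParams)
    )

    stream.foreachRDD { rdd =>
      // Read the offset ranges from the RDD produced directly by the stream
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Process the batch: a simple word count collected to the driver
      rdd.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        .collect().foreach(println)

      // Commit the offsets to Kafka once the batch has been processed
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Committing after the output step gives at-least-once delivery: if the job fails between processing and committing, the batch is read again on restart.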
That concludes this look at how to create a DStream. Theory is best learned together with practice, so go and try it out.