How to implement a Kafka + Spark Streaming + Hive project

2025-02-26 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/01 Report--

This article walks through one way to implement a Kafka + Spark Streaming + Hive project. The approach is simple, fast, and practical.

In the current project, data arriving on a Kafka topic needs to be written to a Hive table in real time.

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
    import org.apache.spark.streaming.{Durations, Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    object SparkStreamingOnKafkaDirect {

      def main(args: Array[String]): Unit = {
        // val conf = new SparkConf()
        // conf.setMaster("local")
        // conf.setAppName("SparkStreamingOnKafkaDirect")
        val spark = SparkSession.builder().appName("test").master("local").enableHiveSupport().getOrCreate()
        val ssc = new StreamingContext(spark.sparkContext, Durations.seconds(3))
        // Set the log level
        ssc.sparkContext.setLogLevel("ERROR")

        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "MyGroupId",
          /*
           * What to do when there is no initial offset, or the current offset no longer exists:
           *   earliest: automatically reset the offset to the smallest offset
           *   latest:   automatically reset the offset to the largest offset [default]
           *   none:     throw an exception if no previous offset is found
           */
          "auto.offset.reset" -> "earliest",
          /*
           * With enable.auto.commit set to false, consumer offsets are not saved
           * automatically and must be committed manually.
           */
          "enable.auto.commit" -> (false: java.lang.Boolean) // defaults to true
        )

        // Kafka topics to subscribe to
        val topics = Array("test")

        // Create the direct connection to Kafka and receive data
        /*
         * What the received data looks like:
         *   2019-09-26 1569487411604 1235 Kafka Register
         *   2019-09-26 1569487411604 1235497 Kafka Register
         *   2019-09-26 1569487414838390778 Flink View
         */
        val stream: InputDStream[ConsumerRecord[String, String]] =
          KafkaUtils.createDirectStream[String, String](
            ssc,
            PreferConsistent,
            Subscribe[String, String](topics, kafkaParams)
          )

        // Process the received data: print each key and value, and return only the value
        val transStream: DStream[String] = stream.map(record => {
          val key_value = (record.key, record.value)
          println("receive message key = " + key_value._1)
          println("receive message value = " + key_value._2)
          key_value._2
        })

        // The schema is created programmatically here
        val structType: StructType = StructType(Array[StructField](
          StructField("Date_", StringType, nullable = true),
          StructField("Timestamp_", StringType, nullable = true),
          StructField("UserID", StringType, nullable = true),
          StructField("PageID", StringType, nullable = true),
          StructField("Channel", StringType, nullable = true),
          StructField("Action", StringType, nullable = true)
        ))

        /*
         * foreachRDD exposes the RDD wrapped by each batch of the DStream, so the RDD
         * can be processed directly. For each batch:
         *   - map splits every received line on "\t" and puts the six fields into a Row.
         *     The schema is needed because the data is going to be stored in Hive.
         *   - map is a transformation and therefore lazy; rdd.count() is an action that
         *     forces it to run (the write below is also an action, so the count is not
         *     strictly required).
         *   - spark.createDataFrame(rdd, structType): create a DataFrame, which is
         *     required in order to register a temporary view.
         *   - frame.createOrReplaceTempView("T1"): register the temporary view.
         *   - spark.sql("use spark"): switch to the Hive database named spark.
         *   - spark.sql("select * from T1").write.mode(SaveMode.Append).saveAsTable("test_kafka"):
         *     append the data to the Hive table test_kafka.
         */
        transStream.foreachRDD(one => {
          val rdd: RDD[Row] = one.map(a => {
            val arr = a.split("\t")
            Row(arr(0), arr(1), arr(2), arr(3), arr(4), arr(5))
          })
          rdd.count()
          val frame: DataFrame = spark.createDataFrame(rdd, structType)
          // frame.printSchema()
          frame.createOrReplaceTempView("T1")
          // spark.sql("select * from T1").show()
          spark.sql("use spark")
          spark.sql("select * from T1").write.mode(SaveMode.Append).saveAsTable("test_kafka")
        })

        /*
         * After the business processing above has finished, commit the consumer offsets
         * asynchronously. enable.auto.commit is set to false, so the offsets are committed
         * manually and Kafka stores them.
         * Note: the offset ranges of each batch must be taken from the stream that was
         * read directly from the source. They cannot be obtained from a DStream that has
         * already been transformed.
         */
        stream.foreachRDD { rdd =>
          val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          // some time later, after outputs have completed
          stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        }

        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
      }
    }

At this point you should have a clearer picture of how to implement a Kafka + Spark Streaming + Hive project.
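The foreachRDD step in the listing indexes arr(0) through arr(5) directly, so a record with fewer than six tab-separated fields would throw an ArrayIndexOutOfBoundsException and fail the batch. Below is a minimal sketch of a more defensive parser, written in plain Scala so it can be tried without a cluster. The names PageEvent, RecordParser, and parseLine are hypothetical and do not appear in the original code; the field order is assumed to follow the schema in the listing (Date_, Timestamp_, UserID, PageID, Channel, Action).

```scala
// Hypothetical helper, not part of the original article's code.
// Field order mirrors the schema: Date_, Timestamp_, UserID, PageID, Channel, Action.
final case class PageEvent(
    date: String,
    timestamp: String,
    userId: String,
    pageId: String,
    channel: String,
    action: String
)

object RecordParser {
  private val ExpectedFields = 6

  /** Splits one tab-separated Kafka value; returns None for malformed lines. */
  def parseLine(line: String): Option[PageEvent] = {
    // A limit of -1 keeps trailing empty fields, which split("\t") alone would drop.
    val fields = line.split("\t", -1)
    if (fields.length == ExpectedFields)
      Some(PageEvent(fields(0), fields(1), fields(2), fields(3), fields(4), fields(5)))
    else
      None
  }
}
```

Inside foreachRDD, one possible use would be to replace the map with a flatMap over RecordParser.parseLine and then convert each PageEvent to a Row, so malformed records are skipped instead of failing the whole batch. Whether to skip, log, or divert such records to a separate table is a design choice that depends on the project.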

© 2024 shulou.com SLNews company. All rights reserved.
