2025-01-14 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
Summary: This example shows Spark Streaming consuming Kafka messages. It extracts, filters, and transforms the data in real time and stores the result in HDFS.

Example code

    package com.fwmagic.test

    import com.alibaba.fastjson.{JSON, JSONException}
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.slf4j.LoggerFactory

    /**
      * created by fwmagic
      */
    object RealtimeEtl {

      private val logger = LoggerFactory.getLogger(RealtimeEtl.getClass)

      def main(args: Array[String]): Unit = {
        System.setProperty("HADOOP_USER_NAME", "hadoop")

        val conf = new SparkConf().setAppName("RealtimeEtl").setMaster("local[*]")
        val spark = SparkSession.builder().config(conf).getOrCreate()
        val streamContext = new StreamingContext(spark.sparkContext, Seconds(5))

        // Direct approach: Spark reads from the Kafka topic partitions directly.
        // auto.offset.reset only applies when the group has no committed offset:
        //   earliest: start from the earliest available offset
        //   latest:   start from the latest offset
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "hd1:9092,hd2:9092,hd3:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "fwmagic",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        val topics = Array("access")

        val kafkaDStream = KafkaUtils.createDirectStream[String, String](
          streamContext,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
        )

        // With the direct approach, call foreachRDD on the generated kafkaDStream
        // so that the offset ranges can be read from each batch's RDD.
        kafkaDStream.foreachRDD(kafkaRDD => {
          if (!kafkaRDD.isEmpty()) {
            // Get the offsets of the current batch.
            val offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges

            // Take out the message values.
            val lines = kafkaRDD.map(_.value())

            // Convert each JSON string into a LogBean; malformed lines become null.
            val logBeanRDD = lines.map(line => {
              var logBean: LogBean = null
              try {
                logBean = JSON.parseObject(line, classOf[LogBean])
              } catch {
                case e: JSONException =>
                  // Log the line that failed to parse.
                  logger.error("json parsing error! Line: " + line, e)
              }
              logBean
            })

            // Filter out the lines that failed to parse.
            val filteredRDD = logBeanRDD.filter(_ != null)

            // Convert the RDD to a DataFrame; this works because the RDD holds a case class.
            import spark.implicits._
            val df = filteredRDD.toDF()
            df.show()

            // Write the data to HDFS, e.g. hdfs://hd1:9000/360
            df.repartition(1).write.mode(SaveMode.Append).parquet(args(0))

            // Commit the offsets of the current batch; they are stored in Kafka.
            kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
          }
        })

        // Start the streaming job.
        streamContext.start()
        streamContext.awaitTermination()
        streamContext.stop()
      }
    }

    // Field names must match the JSON keys of the incoming messages.
    case class LogBean(time: String,
                       longitude: Double,
                       latitude: Double,
                       openid: String,
                       page: String,
                       evnet_type: Int)

Dependencies (pom.xml)

    Project: com.fwmagic.360:fwmagic-360:1.0 (model version 4.0.0)

    Properties:
      Java 1.8
      scala.version 2.11.7
      spark.version 2.2.2
      hadoop.version 2.7.7
      encoding UTF-8

    Dependencies:
      org.scala-lang:scala-library:${scala.version}
      org.apache.spark:spark-core_2.11:${spark.version}
      org.apache.spark:spark-sql_2.11:${spark.version}
      org.apache.spark:spark-streaming_2.11:${spark.version}
      org.apache.spark:spark-streaming-kafka-0-10_2.11:${spark.version}
      org.apache.hadoop:hadoop-client:${hadoop.version}
      com.alibaba:fastjson:1.2.39

    Build plugins:
      net.alchim31.maven:scala-maven-plugin:3.2.2
        execution scala-compile-first (phase process-resources): goals add-source, compile
        execution scala-test-compile (phase process-test-resources): goal testCompile
      org.apache.maven.plugins:maven-compiler-plugin:3.5.1
        execution (phase compile): goal compile
      org.apache.maven.plugins:maven-shade-plugin:2.4.3
        execution (phase package): goal shade
        filter on *:* excluding META-INF/*.SF, META-INF/*.DSA, META-INF/*.RSA
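
The parsing step in the example returns null for a malformed line and removes the nulls with a separate filter. As a possible alternative, the sketch below expresses the same parse-then-drop idea with Option and flatMap. It is only an illustration under stated assumptions: parseLine and the comma-separated input are hypothetical stand-ins for JSON.parseObject, chosen so the snippet runs on plain Scala collections without fastjson or Spark. Unlike the original, it does not log the lines it drops.

```scala
import scala.util.Try

// Same fields as the article's LogBean.
case class LogBean(time: String, longitude: Double, latitude: Double,
                   openid: String, page: String, evnet_type: Int)

object ParseSketch {
  // Hypothetical stand-in for JSON.parseObject: parses a comma-separated line.
  // Returns None (instead of null) when the line is malformed.
  def parseLine(line: String): Option[LogBean] = Try {
    val f = line.split(",", -1)
    require(f.length == 6, s"expected 6 fields, got ${f.length}")
    LogBean(f(0), f(1).toDouble, f(2).toDouble, f(3), f(4), f(5).toInt)
  }.toOption

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "2019-06-03 10:00:00,116.4,39.9,u1,/index,1", // well-formed
      "not a valid record"                           // malformed, dropped
    )
    // flatMap keeps only the successfully parsed records,
    // replacing the map followed by filter(_ != null).
    val beans = lines.flatMap(line => parseLine(line))
    beans.foreach(println)
  }
}
```

On an RDD the same flatMap call should apply, because an Option can be used where a collection of zero or one elements is expected.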