2025-04-01 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article covers big data development with Spark Streaming: processing data and writing the results to Kafka. It walks through the background, the running process, and a complete worked case.
1. Introduction to Spark Streaming
Spark Streaming reads data from various input sources and groups it into small batches, which are created at uniform time intervals. At the start of each interval a new batch is created, and data received during that interval is added to it. At the end of the interval the batch stops growing. The length of the interval is set by the batch interval parameter, which the developer configures, typically between 500 milliseconds and a few seconds. Each input batch forms an RDD, which is processed as a Spark job and may produce further RDDs. The results can be pushed to external systems batch by batch. The programming abstraction of Spark Streaming is the discretized stream, or DStream: a sequence of RDDs in which each RDD holds the data of one time slice of the stream. Apart from the added window operations and stateful transformations, the API is similar to batch processing.
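The batching described above can be illustrated without Spark. The following is a minimal sketch in plain Scala, not Spark's implementation: the names MicroBatchSketch, Event and toBatches are hypothetical, and each group of payloads merely stands in for the RDD of one time slice.

```scala
object MicroBatchSketch {
  // One received record: arrival time in milliseconds plus its payload.
  final case class Event(receivedAtMs: Long, payload: String)

  // Group events into batches of `batchIntervalMs`. Each group stands in for
  // the RDD that a DStream would hold for that time slice.
  // Returns (batch start time, payloads) pairs ordered by time.
  def toBatches(events: Seq[Event], batchIntervalMs: Long): Seq[(Long, Seq[String])] =
    events
      .groupBy(_.receivedAtMs / batchIntervalMs)
      .toSeq
      .sortBy(_._1)
      .map { case (index, batch) =>
        (index * batchIntervalMs, batch.sortBy(_.receivedAtMs).map(_.payload))
      }

  val events: Seq[Event] =
    Seq(Event(100, "a"), Event(450, "b"), Event(600, "c"), Event(1700, "d"))

  // With a 500 ms interval: [0,500) holds a and b, [500,1000) holds c,
  // and [1500,2000) holds d.
  val batches: Seq[(Long, Seq[String])] = toBatches(events, 500L)
}
```

One simplification to keep in mind: the sketch skips the interval [1000,1500) because nothing arrived in it, whereas Spark Streaming still produces an (empty) RDD for such an interval, which is why streaming code commonly checks rdd.isEmpty before doing work.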
Differences from Structured Streaming
Structured Streaming was introduced in Spark 2.x and is mainly used to process structured data. In addition to the micro-batch processing that Spark Streaming offers, it also supports long-running (continuous) tasks. The key difference is that processing can be based on the time the data was produced (event time) rather than the time it was received (processing time). The following table gives a closer comparison:
Stream processing mode | Spark Streaming | Structured Streaming
---------------------- | --------------- | --------------------
Execution mode | Micro batch | Micro batch / Streaming
API | DStream / StreamingContext | Dataset / DataFrame, SparkSession
Job generation mode | Timer generates jobs at fixed intervals | Trigger
Supported data sources | Socket, fileStream, Kafka, ZeroMQ, Flume, Kinesis | Socket, fileStream, Kafka, rate source
Executed based on | DStream API | Spark SQL
Time basis | Processing time | Processing time & event time
UI | Built-in | No
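To make the "Time basis" row concrete, here is a small plain-Scala sketch (no Spark involved) that counts the same records per 10-second window, once by arrival time and once by production time. The names TimeBasisSketch, Record and countPerWindow are hypothetical and only serve the illustration.

```scala
object TimeBasisSketch {
  // A record with the time it was produced and the time it was received (ms).
  final case class Record(eventTimeMs: Long, receivedAtMs: Long, value: String)

  // Count records per window, using whichever timestamp `timeOf` selects.
  // The map key is the start time of the window.
  def countPerWindow(records: Seq[Record], windowMs: Long)(timeOf: Record => Long): Map[Long, Int] =
    records
      .groupBy(r => timeOf(r) / windowMs * windowMs)
      .map { case (start, rs) => start -> rs.size }

  // The third record was produced at 9 s but arrived late, at 12 s.
  val records: Seq[Record] = Seq(
    Record(1000, 1200, "x"),
    Record(8000, 8100, "y"),
    Record(9000, 12000, "z")
  )

  // Processing time: the late record falls into the second window.
  val byProcessingTime: Map[Long, Int] = countPerWindow(records, 10000L)(_.receivedAtMs)
  // Event time: all three records belong to the first window.
  val byEventTime: Map[Long, Int] = countPerWindow(records, 10000L)(_.eventTimeMs)
}
```

The late record is the interesting case: windowing by processing time attributes it to the interval in which it happened to arrive, while windowing by event time keeps it with the records produced alongside it.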
For stream processing, Flink is often used in production, and the data source is now mainly Kafka. This article therefore focuses on one Spark Streaming scenario: an ETL stream that processes structured logs and writes the results to a Kafka queue.
2. The running process of Spark Streaming
1. After the client submits the Spark Streaming job, the Driver starts. The Driver starts the Receiver, and the Receiver receives data from the data source.
2. Each job contains multiple Executors, and each Executor runs tasks as threads. A Spark Streaming job contains at least one receiver task (in general).
3. After receiving data, the Receiver generates a Block, reports the BlockId to the Driver, and then replicates the Block to another Executor.
4. ReceiverTracker maintains the BlockIds reported by the Receiver.
5. The Driver starts JobGenerator periodically, generates logical RDDs according to the relationships between DStreams, then creates a JobSet and hands it to JobScheduler.
6. JobScheduler schedules the JobSet and hands it to DAGScheduler. DAGScheduler generates the corresponding Stages from the logical RDDs. Each Stage contains one or more Tasks, and the TaskSet is submitted to TaskScheduler.
7. TaskScheduler dispatches Tasks to Executors and maintains the running state of each Task.
How to read common data sources
Constant data stream:
val rdd: RDD[String] = ssc.sparkContext.makeRDD(strArray)
val wordDStream: ConstantInputDStream[String] = new ConstantInputDStream(ssc, rdd)
Socket:
// The source repeats the constant-stream snippet here; socketTextStream is the usual call.
// host and port are placeholders, not values from the original article.
val lines: DStream[String] = ssc.socketTextStream(host, port)
RDD queue:
val queue = new Queue[RDD[Int]]()  // scala.collection.mutable.Queue
val queueDStream: InputDStream[Int] = ssc.queueStream(queue)
Folder:
val lines: DStream[String] = ssc.textFileStream("data/log/")
3. Case description
In production, a common workflow is as follows: take the raw logs in Kafka (for example, request logs), use Spark Streaming to clean each batch into a fixed format, and write the result back to Kafka. To aim for exactly-once semantics, the application saves the offsets itself, here in Redis.
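The idea of saving offsets yourself can be sketched in plain Scala before looking at the real code. This is a simplified model under stated assumptions: an in-memory map stands in for Redis, a buffer stands in for the output topic, and the names OffsetTrackingSketch, offsetStore, output and runBatch are hypothetical.

```scala
import scala.collection.mutable

object OffsetTrackingSketch {
  // In-memory stand-in for the external offset store (Redis in this article).
  // Key: (topic, partition); value: the next offset to read.
  val offsetStore: mutable.Map[(String, Int), Long] = mutable.Map.empty

  // In-memory stand-in for the output topic.
  val output: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty

  // Process up to `maxRecords` records of one partition, starting at the
  // stored offset, and persist the new offset only after the output is written.
  def runBatch(topic: String, partition: Int, log: IndexedSeq[String], maxRecords: Int): Unit = {
    val from = offsetStore.getOrElse((topic, partition), 0L).toInt
    val until = math.min(from + maxRecords, log.length)
    output ++= log.slice(from, until).map(_.toUpperCase) // the "processing" step
    offsetStore((topic, partition)) = until.toLong       // save the offset last
  }
}
```

Because each batch starts from the stored offset, a restarted job resumes where the previous run stopped instead of re-reading the whole topic. Note that if a failure happens after the output is written but before the offset is saved, that batch is processed again, so the write to the output needs to be idempotent for the end result to be exactly-once.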
Data address: link: https://pan.baidu.com/s/1FmFxSrPIynO3udernLU0yQ extraction code: hell
3.1 Raw Kafka log
The sample.log format is as follows:
We first put the data in a file to simulate xx.log in the production environment.
3.2 Create two topics, and create a KafkaProducer to write data to mytopic1
One topic holds the original log data, and the other holds the processed log.
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic mytopic1 --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic mytopic2 --partitions 1 --replication-factor 1
Start the Redis service:
./redis-server redis.conf
View the data in mytopic1:
kafka-console-consumer.sh --bootstrap-server linux121:9092 --topic mytopic1 --from-beginning
3.3 Code implementation
The first part reads the original file data and writes it to mytopic1.
package com.hoult.Streaming.work

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FilerToKafka {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setAppName(this.getClass.getCanonicalName.init)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Read the raw log file
    val lines: RDD[String] = sc.textFile("data/sample.log")

    // Define the Kafka producer parameters
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

    // Send the data that was read to mytopic1
    lines.foreachPartition { iter =>
      // One KafkaProducer per partition
      val producer = new KafkaProducer[String, String](prop)
      iter.foreach { line =>
        val record = new ProducerRecord[String, String]("mytopic1", line)
        producer.send(record)
      }
      producer.close()
    }
  }
}
The second part is the streaming job: it reads the data from mytopic1 and writes it to mytopic2.
package com.hoult.Streaming.work

import java.util.Properties

import com.hoult.Streaming.kafka.OffsetsWithRedisUtils
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Process Kafka data in each batch, generate structured data,
 * and write it to another Kafka topic
 */
object KafkaStreamingETL {
  val log = Logger.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setAppName(this.getClass.getCanonicalName)
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Topics to be consumed
    val topics: Array[String] = Array("mytopic1")
    val groupid = "mygroup1"

    // Define the Kafka-related parameters
    val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupid)

    // Get the saved offsets from Redis
    val fromOffsets = OffsetsWithRedisUtils.getOffsetsFromRedis(topics, groupid)

    // Create the DStream that reads data from Kafka
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
    )

    // Send the converted data to another topic
    dstream.foreachRDD { rdd =>
      if (!rdd.isEmpty) {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.foreachPartition(process)
        // Save the offsets to Redis
        OffsetsWithRedisUtils.saveOffsetsToRedis(offsetRanges, groupid)
      }
    }

    // Start the job
    ssc.start()
    ssc.awaitTermination()
  }

  def process(iter: Iterator[ConsumerRecord[String, String]]) = {
    iter.map(line => parse(line.value))
      .filter(!_.isEmpty)
      // .foreach(println)
      .foreach(line => sendMsg2Topic(line, "mytopic2"))
  }

  def parse(text: String): String = {
    try {
      // The source calls text.replace(...) before splitting, but the arguments
      // of that call were lost in extraction, so the call is left out here.
      val arr = text.split(",")
      if (arr.length != 15) return ""
      arr.mkString("|")
    } catch {
      case e: Exception =>
        log.error("error parsing data!", e)
        ""
    }
  }

  def getKafkaConsumerParameters(groupid: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupid,
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )
  }

  def getKafkaProducerParameters(): Properties = {
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop
  }

  def sendMsg2Topic(msg: String, topic: String): Unit = {
    val producer = new KafkaProducer[String, String](getKafkaProducerParameters())
    val record = new ProducerRecord[String, String](topic, msg)
    producer.send(record)
    // Close the producer so buffered records are flushed and the connection is released
    producer.close()
  }
}
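The field-count check in parse can be tried in isolation. The sketch below is a simplified stand-in, not the article's function: ParseSketch and its expected parameter are hypothetical, and only the split, length check and mkString steps are mirrored.

```scala
object ParseSketch {
  // Split on commas and keep the record only if it has exactly `expected`
  // fields; rejected records become the empty string.
  def parse(text: String, expected: Int = 15): String = {
    val arr = text.split(",")
    if (arr.length != expected) "" else arr.mkString("|")
  }

  // A well-formed record with 15 fields is re-joined with "|".
  val ok: String = parse((1 to 15).mkString(","))

  // A record with too few fields is rejected.
  val tooShort: String = parse("a,b,c")

  // String.split(",") drops trailing empty strings, so a record whose
  // 15th field is empty is seen as 14 fields and rejected as well.
  val trailingEmpty: String = parse((1 to 14).mkString(",") + ",")

  // split(",", -1) keeps trailing empty fields.
  val fieldsKept: Int = ((1 to 14).mkString(",") + ",").split(",", -1).length
}
```

The third case is worth checking against real data: if the last column of the log can legitimately be empty, splitting with a limit of -1 is one way to keep such records, at the cost of deviating from the original code.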
The third part is the utility for reading and writing offsets in Redis.
package com.hoult.Streaming.kafka

import java.util

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

import scala.collection.mutable

object OffsetsWithRedisUtils {
  // Redis parameters
  private val redisHost = "linux121"
  private val redisPort = 6379

  // Redis connection pool
  private val config = new JedisPoolConfig
  // Maximum number of idle connections
  config.setMaxIdle(5)
  // Maximum number of connections
  config.setMaxTotal(10)

  private val pool = new JedisPool(config, redisHost, redisPort, 10000)
  private def getRedisConnection: Jedis = pool.getResource

  private val topicPrefix = "kafka:topic"

  // Key: kafka:topic:TopicName:groupid
  private def getKey(topic: String, groupid: String) = s"$topicPrefix:$topic:$groupid"

  // Get the offsets by key
  def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
    val jedis: Jedis = getRedisConnection

    val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map { topic =>
      val key = getKey(topic, groupId)

      import scala.collection.JavaConverters._
      jedis.hgetAll(key)
        .asScala
        .map { case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong }
    }

    // Return the connection to the pool
    jedis.close()

    offsets.flatten.toMap
  }

  // Save the offsets to Redis
  def saveOffsetsToRedis(offsets: Array[OffsetRange], groupId: String): Unit = {
    // Get a connection
    val jedis: Jedis = getRedisConnection

    // Organize the data by topic
    offsets.map { range => (range.topic, (range.partition.toString, range.untilOffset.toString)) }
      .groupBy(_._1)
      .foreach { case (topic, buffer) =>
        val key: String = getKey(topic, groupId)

        import scala.collection.JavaConverters._
        val maps: util.Map[String, String] = buffer.map(_._2).toMap.asJava

        // Save the data
        jedis.hmset(key, maps)
      }

    jedis.close()
  }

  def main(args: Array[String]): Unit = {
    val topics = Array("mytopic1")
    val groupid = "mygroup1"
    val x: Map[TopicPartition, Long] = getOffsetsFromRedis(topics, groupid)
    x.foreach(println)
  }
}
3.4 Demo
Start Redis: ./redis-server ./redis.conf
Start Kafka and create the topics (see section 3.2): sh scripts/kafka.sh start
Start FilerToKafka and KafkaStreamingETL
4. Considerations for Spark Streaming
Spark Streaming may appear unable to read files. When reading from a local folder, note that it does not read files that already exist in the folder; it only picks up files that are moved into the folder while monitoring is running. In addition, the file's last modification time must be later than the moment monitoring started. So if you want to feed a text file into the monitored folder, first open the file, make a change that does not affect its content (for example, type a few spaces or a newline), and save it. Then move it into the folder so that it can be detected.
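The rule above can be written down as a simple filter. This is only a simplified model of the behaviour described in this section, not Spark's actual file-detection code; FileMonitorSketch, FileInfo and detected are hypothetical names, and the timestamps are made-up values in milliseconds.

```scala
object FileMonitorSketch {
  // A file in the monitored folder, with its last-modified time in ms.
  final case class FileInfo(name: String, lastModifiedMs: Long)

  // Simplified rule: a file is picked up only if it was last modified
  // after monitoring started.
  def detected(files: Seq[FileInfo], monitorStartMs: Long): Seq[String] =
    files.filter(_.lastModifiedMs > monitorStartMs).map(_.name)

  val monitorStartMs: Long = 10000L

  val files: Seq[FileInfo] = Seq(
    FileInfo("old.log", 5000L),               // already present before monitoring
    FileInfo("copied-unchanged.log", 9000L),  // moved in later, but old modification time
    FileInfo("resaved.log", 12000L)           // re-saved after monitoring started
  )

  // Only the re-saved file passes the check.
  val picked: Seq[String] = detected(files, monitorStartMs)
}
```

The second file shows why a file that is moved in without being re-saved can go unnoticed: moving a file typically preserves its old modification time.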
That concludes this walkthrough of processing data with Spark Streaming and writing it to Kafka. If you have run into similar questions, the analysis above may help.