Shulou (Shulou.com) 06/03 report. Updated 2025-04-11, from SLTechnology News&Howtos.
Topics in this installment:
The Direct approach
Kafka
Previous installments walked through the source code of Spark Streaming applications that use a Receiver. More and more Spark Streaming applications, however, are now built without Receivers (the Direct Approach). The no-Receiver approach has two advantages:
1. More freedom of control
2. Semantic consistency
In fact, the no-Receiver approach is closer to how we naturally read and manipulate data. Spark is a computing framework that sits on top of data sources; without Receivers, the application operates on the data source directly, which is the more natural model. To operate on a data source, Spark needs a wrapper around it, and that wrapper must be an RDD. Take direct access to data in Kafka as an example:
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafkaWordCount {
  def main(args: Array[String]) {
    // Expects two arguments: the broker list and a comma-separated topic list
    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
Spark Streaming wraps the Kafka data in a KafkaRDD:
/**
 * A batch-oriented interface for consuming from Kafka.
 * Starting and ending offsets are specified in advance,
 * so that you can control exactly-once semantics.
 * @param kafkaParams Kafka configuration parameters.
 *   Requires "metadata.broker.list" or "bootstrap.servers" to be set
 *   with Kafka broker(s) specified in host1:port1,host2:port2 form.
 * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
 * @param messageHandler function for translating each message into the desired type
 */
private[kafka]
class KafkaRDD[
  K: ClassTag,
  V: ClassTag,
  U ...

  ...
      val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
      new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
    }.toArray
  }

  ...

  override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
    val part = thePart.asInstanceOf[KafkaRDDPartition]
    assert(part.fromOffset ...

    ...
        ... part.untilOffset) {
          assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
          finished = true
          null.asInstanceOf[R]
        } else {
          requestOffset = item.nextOffset
          messageHandler(new MessageAndMetadata(
            part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
        }
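The partition-building fragment in the excerpt above suggests that each offset range becomes exactly one RDD partition, pinned to the leader broker of its Kafka partition. A minimal sketch of that mapping follows, in plain Scala with no Spark or Kafka dependency. The types OffsetRange and SketchPartition and the helper partitionsFor are hypothetical stand-ins written for illustration; only the field names are taken from the excerpt.

```scala
// Hypothetical stand-in for Spark's OffsetRange: one contiguous slice
// [fromOffset, untilOffset) of a single Kafka topic partition.
final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  // Number of messages covered by this range
  def count: Long = untilOffset - fromOffset
}

// Hypothetical stand-in for KafkaRDDPartition, using the field names from the excerpt.
final case class SketchPartition(
    index: Int,
    topic: String,
    partition: Int,
    fromOffset: Long,
    untilOffset: Long,
    host: String,
    port: Int)

object PartitionSketch {
  // Build one partition per offset range and attach the leader's host and port.
  // Assumes `leaders` has an entry for every (topic, partition) in `ranges`.
  def partitionsFor(
      ranges: Seq[OffsetRange],
      leaders: Map[(String, Int), (String, Int)]): Seq[SketchPartition] =
    ranges.zipWithIndex.map { case (o, i) =>
      val (host, port) = leaders((o.topic, o.partition))
      SketchPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
    }
}
```

Because the mapping is one to one, the parallelism of a batch in this sketch equals the number of Kafka partitions being read.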
The connect method of KafkaCluster is called:
org/apache/spark/streaming/kafka/KafkaCluster.scala

def connect(host: String, port: Int): SimpleConsumer =
  new SimpleConsumer(host, port, config.socketTimeoutMs,
    config.socketReceiveBufferBytes, config.clientId)
KafkaCluster's connect method returns a SimpleConsumer. If you want custom control over how Kafka messages are consumed, you can customize the Kafka consumer.
Let's look back and see:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
What is actually generated:
def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD ...

  ...
      val uo = untilOffsets(tp)
      OffsetRange(tp.topic, tp.partition, fo, uo.offset)
    }
    val description = offsetRanges.filter { offsetRange =>
      // Don't display empty ranges.
      offsetRange.fromOffset != offsetRange.untilOffset
    }.map { offsetRange =>
      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
    }.mkString("\n")
    // Copy offsetRanges to immutable.List to prevent from being modified by the user
    val metadata = Map(
      "offsets" -> offsetRanges.toList,
      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
    Some(rdd)
  }
The KafkaRDD instance is generated here.
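The surviving lines of the excerpt pair each partition's current offset with an "until" offset to form that batch's ranges, then advance currentOffsets to the until offsets. A minimal plain-Scala sketch of that bookkeeping follows. BatchOffsetsSketch, BatchRange, clamp and nextBatch are hypothetical names, and the fixed maxPerPartition cap is an assumed simplification of whatever limit the real stream applies.

```scala
object BatchOffsetsSketch {
  // (topic, partition) pair identifying one Kafka partition
  type TopicPartition = (String, Int)

  // The slice of one partition that a single batch will read
  final case class BatchRange(tp: TopicPartition, fromOffset: Long, untilOffset: Long)

  // Cap how far past the current offset one batch may read.
  // Assumes `current` and `latest` cover the same set of partitions.
  def clamp(
      current: Map[TopicPartition, Long],
      latest: Map[TopicPartition, Long],
      maxPerPartition: Long): Map[TopicPartition, Long] =
    latest.map { case (tp, end) =>
      tp -> math.min(end, current(tp) + maxPerPartition)
    }

  // Returns this batch's ranges plus the offsets the next batch should start from.
  def nextBatch(
      current: Map[TopicPartition, Long],
      latest: Map[TopicPartition, Long],
      maxPerPartition: Long): (Seq[BatchRange], Map[TopicPartition, Long]) = {
    val until = clamp(current, latest, maxPerPartition)
    val ranges = current.toSeq.sortBy(_._1).map { case (tp, from) =>
      BatchRange(tp, from, until(tp))
    }
    (ranges, until)
  }
}
```

Calling nextBatch repeatedly, feeding each returned offset map back in as `current`, yields consecutive ranges with no gaps and no overlap, which is the property the Direct approach relies on.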
To recap, compare Spark Streaming applications with and without a Receiver. The Direct approach has these benefits:
1. No caching is needed, so problems such as OOM do not arise (the data stays in Kafka, which acts as the buffer).
2. With a Receiver, the Receiver is bound to a particular Executor and Worker, which makes distribution awkward (it is possible, but takes configuration). With the Direct approach you operate on RDDs directly, and the data is spread across multiple Executors by default, so it is naturally distributed.
3. Data consumption: in practice, with a Receiver, if processing cannot keep up with the incoming data, delays accumulate and the Spark Streaming program may eventually crash. With the Direct approach this does not happen, because data is pulled from Kafka only when a batch is computed.
4. Full semantic consistency: data is not consumed repeatedly, and each record is consumed exactly once.
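Point 4 can be illustrated with a small plain-Scala simulation. It is only a sketch under stated assumptions: the in-memory `log` stands in for one Kafka partition, and `Sink` is a hypothetical store that saves results and the committed offset together. Neither is part of Spark or Kafka. The idea shown is that a fixed offset range can be re-read deterministically, and a sink that tracks offsets can discard a replayed batch.

```scala
object ExactlyOnceSketch {
  // In-memory stand-in for one Kafka partition: the index is the offset.
  val log: Vector[String] = Vector("a", "b", "a", "c", "b", "a")

  // Reading a fixed range [fromOffset, untilOffset) always returns the same
  // records, so a failed task can simply read the range again.
  def read(fromOffset: Int, untilOffset: Int): Vector[String] =
    log.slice(fromOffset, untilOffset)

  // Hypothetical sink that updates word counts and the committed offset together.
  final class Sink {
    private var counts = Map.empty[String, Long]
    private var committed = 0

    // Applies the batch only if it starts where the last commit ended;
    // a replayed or out-of-order batch is rejected and changes nothing.
    def commit(fromOffset: Int, untilOffset: Int, records: Seq[String]): Boolean =
      synchronized {
        if (fromOffset != committed) false
        else {
          records.foreach { w =>
            counts = counts.updated(w, counts.getOrElse(w, 0L) + 1L)
          }
          committed = untilOffset
          true
        }
      }

    // Current counts and the offset up to which data has been applied
    def snapshot: (Map[String, Long], Int) = synchronized((counts, committed))
  }
}
```

In this sketch, committing the range 0 to 3 twice changes the counts only once. A real application would need the same kind of guarantee from its actual output store for the end-to-end result to be exactly-once.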
Notes:
1. WeChat official account of DT Big Data DreamWorks: DT_Spark
2. IMF big data hands-on sessions, 8:00 p.m., YY live channel: 68917580
3. Sina Weibo: http://www.weibo.com/ilovepains