13. Getting started with Spark Streaming

Updated 2025-01-17. From: SLTechnology News&Howtos, Shulou (Shulou.com), 06/03.

Introduction

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant processing of real-time data streams. It supports a variety of data sources, such as Kafka, Flume, Twitter, ZeroMQ, Kinesis, and TCP sockets.

Data can be processed with high-level functions such as map, reduce, join, and window, and Spark's machine learning and graph processing algorithms can also be applied to the streams. Finally, the processed results can be pushed out to file systems, databases, and dashboards.

Architecture and abstraction

Spark Streaming receives a real-time data stream and splits it into batches at a fixed time interval (the batch interval). The Spark engine then processes each batch and produces the results, again as a sequence of batches.
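The splitting step described above can be modeled in a few lines of plain Java. This is only a sketch of the idea, not Spark code: the class name MicroBatcher and the use of millisecond timestamps are assumptions made for illustration, and unlike Spark (which still produces an empty RDD for an interval with no data) this model simply skips empty intervals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Spark: groups event timestamps into fixed-interval batches.
final class MicroBatcher {
    /**
     * Returns the non-empty batches in time order. Batch k holds the timestamps
     * that fall in [k * intervalMs, (k + 1) * intervalMs). Assumes timestamps >= 0.
     */
    static List<List<Long>> batch(List<Long> timestampsMs, long intervalMs) {
        Map<Long, List<Long>> byBatch = new TreeMap<>();
        for (long ts : timestampsMs) {
            long batchIndex = ts / intervalMs;
            byBatch.computeIfAbsent(batchIndex, k -> new ArrayList<>()).add(ts);
        }
        return new ArrayList<>(byBatch.values());
    }

    public static void main(String[] args) {
        List<Long> events = Arrays.asList(100L, 900L, 1200L, 2500L, 2999L);
        // With a 1000 ms batch interval the five events fall into three batches.
        System.out.println(batch(events, 1000L));
    }
}
```

Each inner list plays the role of one batch that would be handed to the engine as a unit.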

Spark Streaming provides an abstraction called a DStream (discretized stream). A DStream represents a continuous data stream as a series of RDDs, one per batch. A DStream can be created from input sources such as Kafka, Flume, and Kinesis, or derived from other DStreams by applying high-level operators.

Every operation on a DStream is in fact applied to each of the RDDs it contains. For example, in the word count case, the flatMap transformation is applied to each RDD of lines to produce the corresponding RDD of words.
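To make the "one operation, applied to every batch" idea concrete, here is a minimal plain-Java model of the word count case. It does not use the Spark API: a stream is represented as a list of batches, each batch as a list of lines, and the class name PerBatchWordCount is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (not Spark API): a "stream" is a list of batches, each batch a list of lines.
final class PerBatchWordCount {
    /** Applies the same split-and-count step to every batch independently. */
    static List<Map<String, Integer>> countWords(List<List<String>> batches) {
        List<Map<String, Integer>> result = new ArrayList<>();
        for (List<String> lines : batches) {
            Map<String, Integer> counts = new TreeMap<>();
            for (String line : lines) {
                // Corresponds to flatMap: one line yields many words.
                for (String word : line.split(" ")) {
                    if (!word.isEmpty()) {
                        // Corresponds to reduceByKey within a single batch.
                        counts.merge(word, 1, Integer::sum);
                    }
                }
            }
            result.add(counts);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
            Arrays.asList("a b a", "b c"),
            Arrays.asList("a"));
        // The second batch is counted on its own; nothing carries over from the first.
        System.out.println(countWords(batches));
    }
}
```

Because each batch gets its own result map, the counts never accumulate across batches, which is exactly the behavior of a stateless transformation.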

Architecture

Receiver: a built-in Spark Streaming receiver, or a custom receiver, that continuously receives data from a data source.

CurrentBuffer: caches the data received by the receiver.

BlockIntervalTimer: a timer that packages the data cached in CurrentBuffer into a Block and puts it into the BlocksForPushing queue.

BlocksForPushing: the queue of Blocks waiting to be processed.

BlockPushingThread: a thread that takes a Block from the BlocksForPushing queue every 100 ms, stores it in the storage system, and caches it in the ReceivedBlockQueue queue.

Block batch: at each batch interval, a batch of Blocks is taken from the ReceivedBlockQueue queue.

JobGenerator: generates a Job for each batch of Blocks.

DStream transformation operations

DStream transformations fall into two types: stateless and stateful.

In a stateless transformation, the processing of each batch does not depend on the data of previous batches.

A stateful transformation uses data or intermediate results from previous batches to compute the result for the current batch.

Stateless transformation operations

A stateless transformation applies a simple RDD transformation to each batch, that is, to every RDD in the DStream.

Commonly used stateless transformations

Each entry gives the function name, its purpose, and a Scala example.

map(): applies the given function to each element of the DStream and returns a DStream of the results. Example: ds.map(x => x + 1)

flatMap(): applies the given function to each element of the DStream and returns a DStream made up of the contents of the iterators returned for each element. Example: ds.flatMap(x => x.split(" "))

filter(): returns a DStream consisting only of the elements that pass the given condition. Example: ds.filter(x => x != 1)

repartition(): changes the number of partitions of the DStream. Example: ds.repartition(10)

reduceByKey(): aggregates the records that share a key within each batch. Example: ds.reduceByKey((x, y) => x + y)

groupByKey(): groups the records in each batch by key. Example: ds.groupByKey()

Use map() and reduceByKey() to count log entries per IP address in each time interval.

Scala:

    // Assumes ApacheAccessLog is a utility class for parsing entries from Apache logs
    val accessLogsDStream = logData.map(line => ApacheAccessLog.parseFromLogLine(line))
    val ipDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), 1))
    val ipCountsDStream = ipDStream.reduceByKey((x, y) => x + y)

Java:

    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import scala.Tuple2;

    // Assumes ApacheAccessLog is a utility class for parsing entries from Apache logs,
    // and that ParseFromLogLine and LongSumReducer are helper function classes defined elsewhere
    static final class IpTuple implements PairFunction<ApacheAccessLog, String, Long> {
        public Tuple2<String, Long> call(ApacheAccessLog log) {
            return new Tuple2<>(log.getIpAddress(), 1L);
        }
    }

    JavaDStream<ApacheAccessLog> accessLogsDStream = logData.map(new ParseFromLogLine());
    JavaPairDStream<String, Long> ipDStream = accessLogsDStream.mapToPair(new IpTuple());
    JavaPairDStream<String, Long> ipCountsDStream = ipDStream.reduceByKey(new LongSumReducer());

Join the request counts with the number of bytes transferred, using the IP address as the key.

Scala:

    val ipBytesDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), entry.getContentSize()))
    val ipBytesSumDStream = ipBytesDStream.reduceByKey((x, y) => x + y)
    val ipBytesRequestCountDStream = ipCountsDStream.join(ipBytesSumDStream)

Java:

    // IpContentTuple is assumed to map a log entry to an (IP address, content size) pair
    JavaPairDStream<String, Long> ipBytesDStream =
        accessLogsDStream.mapToPair(new IpContentTuple());
    JavaPairDStream<String, Long> ipBytesSumDStream =
        ipBytesDStream.reduceByKey(new LongSumReducer());
    JavaPairDStream<String, Tuple2<Long, Long>> ipBytesRequestCountDStream =
        ipCountsDStream.join(ipBytesSumDStream);

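Use the transform() operation to implement a custom transformation that extracts outliers from the log records.

Scala:

    val outlierDStream = accessLogsDStream.transform { rdd => extractOutliers(rdd) }

Java:

    // Assumes extractOutliers returns a JavaPairRDD<String, Long>. Because the result is a
    // pair DStream, the Java API call is transformToPair rather than transform.
    JavaPairDStream<String, Long> ipRawDStream = accessLogsDStream.transformToPair(
        new Function<JavaRDD<ApacheAccessLog>, JavaPairRDD<String, Long>>() {
            public JavaPairRDD<String, Long> call(JavaRDD<ApacheAccessLog> rdd) {
                return extractOutliers(rdd);
            }
        });

Stateful transformation operations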

A stateful DStream transformation tracks data across time intervals: data from previous batches is also used to compute the results for new batches.

There are two main types of stateful transformation: sliding windows and updateStateByKey(). The former operates over a sliding window of time periods, while the latter tracks state changes for each key.

Setting a checkpoint

Stateful transformations require checkpointing to be enabled in the StreamingContext to ensure fault tolerance.

    ssc.checkpoint("hdfs://...")

Introduction to window-based transformations

Window-based operations compute results over a period longer than the StreamingContext batch interval by combining the results of multiple batches.

A window-based transformation takes two parameters, the window duration and the sliding duration. Both must be integer multiples of the batch interval.

Window duration: controls how many of the most recent batches go into each computation, namely windowDuration/batchInterval batches.

Sliding duration: defaults to the batch interval. It controls how often the new DStream computes a result.

Simple examples
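As a worked illustration of the two window parameters, the following plain-Java sketch models a windowed count over per-batch record counts. It is not Spark code; the class name SlidingWindowModel, the sample counts, and the choice of a 10-second batch interval with a 30-second window and 20-second slide are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Spark API) of a sliding window over per-batch counts.
final class SlidingWindowModel {
    /**
     * batchCounts[i] is the record count of batch i. Each window covers the most recent
     * windowBatches batches, and a result is emitted every slideBatches batches.
     */
    static List<Long> windowedCounts(long[] batchCounts, int windowBatches, int slideBatches) {
        List<Long> results = new ArrayList<>();
        for (int end = slideBatches; end <= batchCounts.length; end += slideBatches) {
            // Early windows are only partly filled, so clamp the start at batch 0.
            int start = Math.max(0, end - windowBatches);
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += batchCounts[i];
            }
            results.add(sum);
        }
        return results;
    }

    public static void main(String[] args) {
        long batchIntervalSec = 10, windowSec = 30, slideSec = 20;
        int windowBatches = (int) (windowSec / batchIntervalSec); // 3 batches per window
        int slideBatches = (int) (slideSec / batchIntervalSec);   // one result every 2 batches
        long[] counts = {1, 2, 3, 4, 5, 6};
        System.out.println(windowedCounts(counts, windowBatches, slideBatches)); // [3, 9, 15]
    }
}
```

The integer divisions in main are the reason both durations need to be multiples of the batch interval: a window can only begin and end on batch boundaries.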

Use window() to count the records in each window.

Scala:

    val accessLogsWindow = accessLogsDStream.window(Seconds(30), Seconds(10))
    val windowCounts = accessLogsWindow.count()

Java:

    JavaDStream<ApacheAccessLog> accessLogsWindow =
        accessLogsDStream.window(Durations.seconds(30), Durations.seconds(10));
    JavaDStream<Long> windowCounts = accessLogsWindow.count();

Count the number of visits from each IP address using reduceByKeyAndWindow().

Scala:

    val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1))
    val ipCountDStream = ipDStream.reduceByKeyAndWindow(
      (x: Int, y: Int) => x + y, // add elements from the new batches entering the window
      (x: Int, y: Int) => x - y, // remove elements from the old batches leaving the window
      Seconds(30),               // window duration
      Seconds(10))               // sliding duration

Java:

    class ExtractIp implements PairFunction<ApacheAccessLog, String, Long> {
        public Tuple2<String, Long> call(ApacheAccessLog entry) {
            return new Tuple2<>(entry.getIpAddress(), 1L);
        }
    }
    class AddLongs implements Function2<Long, Long, Long> {
        public Long call(Long v1, Long v2) { return v1 + v2; }
    }
    class SubtractLongs implements Function2<Long, Long, Long> {
        public Long call(Long v1, Long v2) { return v1 - v2; }
    }

    JavaPairDStream<String, Long> ipAddressPairDStream =
        accessLogsDStream.mapToPair(new ExtractIp());
    JavaPairDStream<String, Long> ipCountDStream = ipAddressPairDStream.reduceByKeyAndWindow(
        new AddLongs(),         // add elements from the new batches entering the window
        new SubtractLongs(),    // remove elements from the old batches leaving the window
        Durations.seconds(30),  // window duration
        Durations.seconds(10)); // sliding duration
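The reason for passing both an "add" and a "remove" function can be seen in a small plain-Java model. This is a sketch rather than Spark's implementation: the class name IncrementalWindowSum is hypothetical, the per-batch sums are made-up inputs, and the window slides by one batch at a time for simplicity.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Spark API) of incremental window aggregation with an inverse function.
final class IncrementalWindowSum {
    /** Recomputes every window from scratch: work proportional to the window size per result. */
    static List<Long> recompute(long[] batchSums, int windowBatches) {
        List<Long> out = new ArrayList<>();
        for (int end = 1; end <= batchSums.length; end++) {
            long sum = 0;
            for (int i = Math.max(0, end - windowBatches); i < end; i++) {
                sum += batchSums[i];
            }
            out.add(sum);
        }
        return out;
    }

    /** Updates the previous result: add the entering batch, subtract the leaving one. */
    static List<Long> incremental(long[] batchSums, int windowBatches) {
        List<Long> out = new ArrayList<>();
        long window = 0;
        for (int i = 0; i < batchSums.length; i++) {
            window += batchSums[i];                     // "add" function: batch entering
            if (i >= windowBatches) {
                window -= batchSums[i - windowBatches]; // "remove" function: batch leaving
            }
            out.add(window);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] sums = {4, 1, 7, 2, 5};
        System.out.println(recompute(sums, 3));   // [4, 5, 12, 10, 14]
        System.out.println(incremental(sums, 3)); // same values, constant work per step
    }
}
```

Both methods return the same values; the incremental version only works because addition has an inverse (subtraction), which is why the second function is needed.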

Count the records in each window using countByWindow() and countByValueAndWindow().

Scala:

    val ipDStream = accessLogsDStream.map { entry => entry.getIpAddress() }
    val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10))
    val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))

Java:

    JavaDStream<String> ip = accessLogsDStream.map(
        new Function<ApacheAccessLog, String>() {
            public String call(ApacheAccessLog entry) {
                return entry.getIpAddress();
            }
        });
    JavaDStream<Long> requestCount = accessLogsDStream.countByWindow(
        Durations.seconds(30), Durations.seconds(10));
    JavaPairDStream<String, Long> ipAddressRequestCount = ip.countByValueAndWindow(
        Durations.seconds(30), Durations.seconds(10));

Introduction to the updateStateByKey transformation

updateStateByKey() lets a DStream of key-value pairs maintain state across batches.

To use updateStateByKey(), you supply an update(events, oldState) function that takes the events that arrived for a key together with the key's previous state, and returns the new state for that key.

events: the list of events that arrived for the key in the current batch.

oldState: an optional state object, stored in an Option; it is empty if the key has no previous state.

newState: returned by the function, also as an Option; returning an empty Option deletes the state.

Simple example
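The contract of the update function can be modeled in plain Java before looking at the Spark version. The sketch below is not the Spark API: KeyedStateModel and applyBatch are hypothetical names, java.util.Optional stands in for Scala's Option, and state is kept in an ordinary map.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;

// Plain-Java model (not Spark API) of per-key state carried across batches.
final class KeyedStateModel {
    /** Applies update(events, oldState) to every key in the batch or already holding state. */
    static Map<String, Long> applyBatch(
            Map<String, Long> state,
            Map<String, List<Long>> batch,
            BiFunction<List<Long>, Optional<Long>, Optional<Long>> update) {
        Set<String> keys = new HashSet<>(state.keySet());
        keys.addAll(batch.keySet());
        Map<String, Long> next = new HashMap<>();
        for (String key : keys) {
            // A key with existing state but no new events gets an empty event list.
            List<Long> events = batch.getOrDefault(key, Collections.<Long>emptyList());
            Optional<Long> newState = update.apply(events, Optional.ofNullable(state.get(key)));
            newState.ifPresent(v -> next.put(key, v)); // an empty result drops the key
        }
        return next;
    }

    public static void main(String[] args) {
        // Running count of events per key.
        BiFunction<List<Long>, Optional<Long>, Optional<Long>> runningCount =
            (events, old) -> Optional.of(old.orElse(0L) + events.size());

        Map<String, List<Long>> batch1 = new HashMap<>();
        batch1.put("200", Arrays.asList(1L, 1L));
        batch1.put("404", Arrays.asList(1L));
        Map<String, Long> state = applyBatch(new HashMap<>(), batch1, runningCount);

        Map<String, List<Long>> batch2 = new HashMap<>();
        batch2.put("200", Arrays.asList(1L));
        state = applyBatch(state, batch2, runningCount);

        // "200" has been seen three times in total, "404" once.
        System.out.println(state.get("200") + " " + state.get("404"));
    }
}
```

Note that the key "404" keeps its state through the second batch even though no new events arrive for it; it would only disappear if the update function returned an empty Optional.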

Use updateStateByKey() to keep a running count of the HTTP response codes in the log messages.

Scala:

    def updateRunningSum(values: Seq[Long], state: Option[Long]) = {
      Some(state.getOrElse(0L) + values.size)
    }

    val responseCodeDStream = accessLogsDStream.map(log => (log.getResponseCode(), 1L))
    val responseCodeCountDStream = responseCodeDStream.updateStateByKey(updateRunningSum _)

Java:

    class UpdateRunningSum
            implements Function2<List<Long>, Optional<Long>, Optional<Long>> {
        public Optional<Long> call(List<Long> nums, Optional<Long> current) {
            long sum = current.or(0L);
            return Optional.of(sum + nums.size());
        }
    }

    JavaPairDStream<Integer, Long> responseCodeCountDStream = accessLogsDStream.mapToPair(
        new PairFunction<ApacheAccessLog, Integer, Long>() {
            public Tuple2<Integer, Long> call(ApacheAccessLog log) {
                return new Tuple2<>(log.getResponseCode(), 1L);
            }
        })
        .updateStateByKey(new UpdateRunningSum());

DStream action (output) operations

DStream action operations work the same way as RDD actions. For example, a DStream can be saved as SequenceFile files.

Scala:

    val writableIpAddressRequestCount = ipAddressRequestCount.map { (ip, count) =>
