2025-02-24 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article explains how to program with Spark Streaming. It covers stateful computation, window operations, persistence, checkpointing, and processing stream data with DataFrames and SQL.
Stateful Computing updateStateByKey
The previous article introduced common stateless transformations. In the WordCount example, for instance, the output depends only on the data of the current batch interval and not on the results of earlier batch intervals. Spark Streaming also provides stateful operations such as updateStateByKey, which maintains a state for each key and updates it as new data arrives: it reads the result computed for the previous batch interval and combines it with the data of the current batch interval. The source code is as follows:
def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S]
  ): DStream[(K, S)] = ssc.withScope {
  updateStateByKey(updateFunc, defaultPartitioner())
}
This operator can only be used on a DStream of key-value pairs, and it takes a state update function, updateFunc, as its parameter. An example follows:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StateWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName(StateWordCount.getClass.getSimpleName)
    val ssc = new StreamingContext(conf, Seconds(5))
    // Checkpointing must be enabled, otherwise an error is reported
    ssc.checkpoint("file:///e:/checkpoint")
    val lines = ssc.socketTextStream("localhost", 9999)
    // State update function
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {
      var oldValue = stateValue.getOrElse(0) // get the current state value
      // Traverse the data of the current batch and update the state
      for (newValue <- newValues) {
        oldValue += newValue
      }
      // Return the latest state
      Option(oldValue)
    }
    val count = lines.flatMap(_.split(" "))
      .map(w => (w, 1))
      .updateStateByKey(updateFunc)
    count.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
Important: the code above must enable checkpointing; otherwise it fails with the following error:
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
Shortcomings of updateStateByKey
Running the code above shows that Spark updates the state for every new batch interval even when the source delivers no data; in that case it simply keeps emitting the previously computed state.
updateStateByKey returns the state of every key in each batch interval, whether the key is new, changed, or unchanged. Because updateStateByKey requires checkpointing, a large state makes each checkpoint occupy a lot of storage, which hurts performance.
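The behavior described above can be modeled without Spark. The following is a minimal sketch in plain Scala, not Spark's implementation: the object UpdateStateByKeySketch and the helpers step and run are hypothetical names introduced only for illustration, and the model assumes the update function is applied to every key that is either already in the state or present in the current batch.

```scala
object UpdateStateByKeySketch {
  // Same shape as the update function passed to updateStateByKey.
  def updateFunc(newValues: Seq[Int], state: Option[Int]): Option[Int] =
    Some(state.getOrElse(0) + newValues.sum)

  // Hypothetical helper: applies updateFunc to every key that is already
  // in the state or appears in the current batch.
  def step(state: Map[String, Int], batch: Seq[(String, Int)]): Map[String, Int] = {
    val grouped = batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    val keys = state.keySet ++ grouped.keySet
    keys.flatMap { k =>
      updateFunc(grouped.getOrElse(k, Seq.empty), state.get(k)).map(k -> _)
    }.toMap
  }

  // Hypothetical helper: the full state emitted after each batch.
  def run(batches: Seq[Seq[(String, Int)]]): Seq[Map[String, Int]] =
    batches.scanLeft(Map.empty[String, Int])(step).tail

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      Seq("spark" -> 1, "flink" -> 1, "spark" -> 1),
      Seq.empty[(String, Int)], // a batch interval with no input
      Seq("flink" -> 1)
    )
    run(batches).foreach(println)
  }
}
```

In this model the output for the empty second batch is identical to the output of the first batch, which mirrors the repeated output described above: every key is emitted in every interval, changed or not.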
mapWithState
mapWithState is another stateful operator provided by Spark Streaming. It overcomes the shortcomings of updateStateByKey and has been available since Spark 1.6. The source code is as follows:
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType] = {
  new MapWithStateDStreamImpl[K, V, StateType, MappedType](
    self,
    spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
  )
}
mapWithState returns values only for the keys that changed in the current batch; keys that did not change are not returned. You can therefore focus on the keys that changed, and when there is no input, nothing is returned for the unchanged keys. As a result, even with a large amount of data, checkpointing takes up less storage than it does with updateStateByKey and is more efficient (recommended for production environments).
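To make the contrast with updateStateByKey concrete, here is a minimal plain-Scala model, offered as a sketch rather than Spark's implementation. The object MapWithStateSketch and its step helper are hypothetical names; the model assumes the mapping function runs once per incoming record and emits one output per record, as a word-count mapping function would.

```scala
object MapWithStateSketch {
  // Hypothetical model of one batch: returns the records emitted for this
  // batch together with the updated state.
  def step(
      state: Map[String, Int],
      batch: Seq[(String, Int)]): (Seq[(String, Int)], Map[String, Int]) =
    batch.foldLeft((Vector.empty[(String, Int)], state)) {
      case ((emitted, st), (word, one)) =>
        // Same arithmetic as a word-count mapping function: old count + new value.
        val sum = st.getOrElse(word, 0) + one
        (emitted :+ (word -> sum), st.updated(word, sum))
    }

  def main(args: Array[String]): Unit = {
    val (out1, state1) = step(Map.empty, Seq("spark" -> 1, "flink" -> 1, "spark" -> 1))
    val (out2, state2) = step(state1, Seq.empty) // no input in this interval
    println(out1)
    println(out2)
    println(state2)
  }
}
```

In this model an empty batch emits nothing while the state itself is kept, so only keys touched by the current batch appear in the output.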
An example follows:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext, Time}

object StatefulNetworkWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("StatefulNetworkWordCount")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("file:///e:/checkpoint")
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    /**
     * word: the current key
     * one: the value corresponding to the current key
     * state: the state value
     */
    val mappingFunc = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      println(s"> batchTime = $batchTime")
      println(s"> word = $word")
      println(s"> one = $one")
      println(s"> state = $state")
      val output = (word, sum)
      state.update(sum) // update the state value of the current key
      Some(output) // return the result
    }
    // Build the StateSpec with StateSpec.function
    val spec = StateSpec.function(mappingFunc)
    val stateDstream = wordDstream.mapWithState(spec)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
Time-based window operations
Spark Streaming supports two kinds of time-based windows: tumbling windows and sliding windows. Each is described below.
Tumbling windows
A tumbling window is defined by a single fixed duration, and consecutive windows do not overlap. Note that the single-argument window(windowDuration) shown below slides by the DStream's own slideDuration (the batch interval for an input stream), so its windows are non-overlapping only when windowDuration equals that interval; for a longer tumbling window, pass the same value as both the window length and the sliding interval.
The source code is as follows:
/**
 * @param windowDuration the length of the window; must be an integral multiple of the batch interval
 */
def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
Sliding windows
A sliding window takes two parameters: the window length and the sliding interval. When the sliding interval is shorter than the window length, consecutive windows overlap.
The source code is as follows:
/**
 * @param windowDuration the window length; must be an integral multiple of the batch interval
 *
 * @param slideDuration the sliding interval; must be an integral multiple of the batch interval
 */
def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
  new WindowedDStream(this, windowDuration, slideDuration)
}
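The relationship between window length and sliding interval can be illustrated with a small plain-Scala model. This is a sketch under stated assumptions, not Spark code: WindowSketch and windows are hypothetical names, each inner sequence stands for the records of one batch, and both length and slide are expressed as a number of batch intervals.

```scala
object WindowSketch {
  // Hypothetical helper. batches(i) holds the records of batch i; length and
  // slide are counted in batch intervals.
  def windows[T](batches: Seq[Seq[T]], length: Int, slide: Int): Seq[Seq[T]] = {
    require(length > 0 && slide > 0, "length and slide must be positive")
    // A window is emitted every `slide` batches and covers the last `length` batches.
    (slide to batches.size by slide).map { end =>
      batches.slice(math.max(0, end - length), end).flatten
    }
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
    println(windows(batches, length = 2, slide = 2)) // tumbling: no overlap
    println(windows(batches, length = 3, slide = 1)) // sliding: overlap
  }
}
```

With length equal to slide, every record lands in exactly one window; with a slide shorter than the length, a record appears in several consecutive windows.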
Window operations
window(windowLength, slideInterval)
Explanation
Returns a new DStream computed from windowed batches of the source DStream.
Source code
def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
  new WindowedDStream(this, windowDuration, slideDuration)
}
countByWindow(windowLength, slideInterval)
Explanation
Returns the number of elements in a sliding window.
Source code
/**
 * @param windowDuration the window length; must be a multiple of the batch interval
 * @param slideDuration the sliding interval; must be a multiple of the batch interval
 * The underlying call is reduceByWindow
 */
def countByWindow(
    windowDuration: Duration,
    slideDuration: Duration): DStream[Long] = ssc.withScope {
  this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
}
reduceByWindow(func, windowLength, slideInterval)
Explanation
Returns a new single-element stream, created by aggregating the elements of the stream over a sliding interval with the function func. The function must be associative so that it can be computed correctly in parallel.
Source code
def reduceByWindow(
    reduceFunc: (T, T) => T,
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[T] = ssc.withScope {
  this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
}
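The source above reduces each batch first and then reduces the per-batch results across the window, which is why func needs to be associative. The following plain-Scala sketch illustrates that point; it does not use Spark, and ReduceByWindowSketch, twoLevel, and flat are hypothetical names chosen for the example.

```scala
object ReduceByWindowSketch {
  // Reduce each batch on its own, then reduce the per-batch results,
  // mirroring the reduce / window / reduce shape of the source above.
  def twoLevel[T](window: Seq[Seq[T]])(f: (T, T) => T): Option[T] =
    window.flatMap(_.reduceOption(f)).reduceOption(f)

  // Reference: reduce all records of the window in one left-to-right pass.
  def flat[T](window: Seq[Seq[T]])(f: (T, T) => T): Option[T] =
    window.flatten.reduceOption(f)

  def main(args: Array[String]): Unit = {
    val window = Seq(Seq(10, 1), Seq(5, 2))
    println(twoLevel(window)(_ + _) == flat(window)(_ + _)) // addition is associative
    println(twoLevel(window)(_ - _) == flat(window)(_ - _)) // subtraction is not
  }
}
```

With addition both strategies agree; with subtraction the grouping changes the result, so the two-level evaluation would no longer match a single pass over the window.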
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
Explanation
When applied to a DStream of key-value pairs, returns a new DStream of key-value pairs in which the values of each key are aggregated over the sliding window with the given reduce function func. Note: by default, this operator uses Spark's default number of parallel tasks to do the grouping. You can pass the optional numTasks parameter to set a different number of tasks.
Source code
def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[(K, V)] = ssc.withScope {
  reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
}
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
Explanation
A more efficient version of reduceByKeyAndWindow, in which the reduce value of each window is calculated incrementally from the reduce value of the previous window: it reduces the new data that enters the sliding window and inverse-reduces the old data that leaves the window. However, it can only be used with invertible reduce functions, that is, reduce functions that have a corresponding inverse reduce function (passed in as the invFunc argument). Note: checkpointing must be enabled to use this operation.
Source code
def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    invReduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration,
    partitioner: Partitioner,
    filterFunc: ((K, V)) => Boolean
  ): DStream[(K, V)] = ssc.withScope {
  val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
  val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
  val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
  new ReducedWindowedDStream[K, V](
    self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
    windowDuration, slideDuration, partitioner
  )
}
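The incremental idea can be shown with a few lines of plain Scala. This is a sketch under simplifying assumptions, not the ReducedWindowedDStream logic itself: IncrementalWindowSketch and its two helpers are hypothetical names, each element of batchTotals stands for the already-reduced value of one batch for a single key, and the window slides by one batch at a time.

```scala
object IncrementalWindowSketch {
  // Incremental evaluation: new window value = previous value
  // + batch entering the window - batch leaving the window.
  def incrementalSums(batchTotals: Seq[Int], length: Int): Seq[Int] = {
    val reduceFunc: (Int, Int) => Int = _ + _
    val invReduceFunc: (Int, Int) => Int = _ - _
    batchTotals.indices.scanLeft(0) { (previous, i) =>
      val withNew = reduceFunc(previous, batchTotals(i))
      if (i >= length) invReduceFunc(withNew, batchTotals(i - length)) else withNew
    }.tail
  }

  // Reference: recompute every window from scratch.
  def recomputedSums(batchTotals: Seq[Int], length: Int): Seq[Int] =
    batchTotals.indices.map { i =>
      batchTotals.slice(math.max(0, i - length + 1), i + 1).sum
    }

  def main(args: Array[String]): Unit = {
    val totals = Seq(1, 2, 3, 4)
    println(incrementalSums(totals, length = 2))
    println(recomputedSums(totals, length = 2))
  }
}
```

Both functions return the same window values, but the incremental version touches only two batches per step regardless of the window length, which is the efficiency gain the inverse function buys.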
countByValueAndWindow(windowLength, slideInterval, [numTasks])
Explanation
Returns a new DStream of (value, count) pairs, in which the count for each distinct value is the number of times it appears in the sliding window.
Source code
def countByValueAndWindow(
    windowDuration: Duration,
    slideDuration: Duration,
    numPartitions: Int = ssc.sc.defaultParallelism)
    (implicit ord: Ordering[T] = null)
    : DStream[(T, Long)] = ssc.withScope {
  this.map((_, 1L)).reduceByKeyAndWindow(
    (x: Long, y: Long) => x + y,
    (x: Long, y: Long) => x - y,
    windowDuration,
    slideDuration,
    numPartitions,
    (x: (T, Long)) => x._2 != 0L
  )
}
Use case
val lines = ssc.socketTextStream("localhost", 9999)
lines.flatMap(_.split(" "))
  .map(w => (w, 1))
  .reduceByKeyAndWindow((w1: Int, w2: Int) => w1 + w2, Seconds(30), Seconds(10))
  .print()
// Tumbling window: the sliding interval equals the window length
/* lines.window(Seconds(20), Seconds(20))
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .print() */
Persistence
Persistence is one way to improve the performance of Spark applications. The second article in this series, the Spark core programming guide, explained how to use RDD persistence. DStreams support persistence as well, through the same persist() and cache() methods. Persistence is typically used with stateful operators such as window operations. For these, Spark persists the data by default even when the user does not call a persistence method explicitly, as the following source code shows.
private[streaming]
class WindowedDStream[T: ClassTag](
    parent: DStream[T],
    _windowDuration: Duration,
    _slideDuration: Duration)
  extends DStream[T](parent.ssc) {
  // ... code omitted ...
  // Persist parent level by default, as those RDDs are going to be obviously reused.
  parent.persist(StorageLevel.MEMORY_ONLY_SER)
}
Note: unlike RDDs, whose default persistence level is MEMORY_ONLY, DStreams default to MEMORY_ONLY_SER, which keeps the data serialized in memory, as the following source code shows:
/** Persist the RDDs of this DStream with the given storage level */
def persist(level: StorageLevel): DStream[T] = {
  if (this.isInitialized) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of a DStream after streaming context has started")
  }
  this.storageLevel = level
  this
}
/** The default persistence level is MEMORY_ONLY_SER */
def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)
def cache(): DStream[T] = persist()
From the source code above, you can see how persist() and cache() relate:
cache() simply calls persist().
persist() has two overloads. persist() takes no parameters and defaults to MEMORY_ONLY_SER. persist(level: StorageLevel) lets you choose any of the storage levels available for RDD persistence.
Checkpoint
Checkpoint introduction
Streaming applications usually run 24/7, so they must be resilient to failures unrelated to the application logic, such as system failures and JVM crashes. To make this possible, Spark Streaming needs to checkpoint enough information to a fault-tolerant storage system, such as HDFS, so that it can recover from a failure. There are two types of checkpoints:
Metadata checkpoint
Metadata checkpoints make it possible to recover from failures of the driver program. If the node running the driver fails, the most recent checkpoint data can be read to restore the latest state. Typical application metadata includes:
Configuration: the configuration used to create the streaming application.
DStream operations: the set of DStream operations that define the streaming application.
Incomplete batches: batches whose jobs are queued but whose data has not been fully processed yet.
Data checkpoint
Data checkpoints save the generated RDDs to reliable storage. Some stateful transformations combine data from multiple batches, so they require checkpointing. In such transformations, each generated RDD depends on the RDDs of previous batches, which makes the dependency chain grow over time. To avoid an unbounded increase in recovery time (which is proportional to the length of the dependency chain), the intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (such as HDFS) to cut the dependency chain. This works much like persistence: recovery only needs the latest checkpointed state and does not have to recompute the entire lineage.
In short, metadata checkpoints are needed mainly to recover from driver failures, while data (RDD) checkpoints are required whenever stateful transformations are used.
When to enable checkpoints
Checkpointing must be enabled for applications with either of the following requirements:
Use of stateful transformations
If the application uses updateStateByKey or reduceByKeyAndWindow (with an inverse function), a checkpoint directory must be provided to allow periodic RDD checkpointing.
Recovery from failures of the driver running the application
Metadata checkpoints are used to recover progress information.
Note that a simple streaming application without the stateful transformations mentioned above can run without checkpointing enabled. In that case, recovery from driver failures is only partial (some received but unprocessed data may be lost). This is often acceptable, and many Spark Streaming applications run this way. Support for non-Hadoop environments is expected to improve in the future.
How to configure checkpoints
Checkpointing is enabled by setting a directory in a fault-tolerant, reliable file system (such as HDFS or S3) to which the checkpoint information is saved. Two settings are involved:
streamingContext.checkpoint(checkpointDirectory): sets the directory where checkpoints are stored, such as an HDFS path.
dstream.checkpoint(checkpointInterval): sets the checkpoint frequency.
Setting the checkpoint interval is optional. If it is not set, a default is chosen according to the type of DStream. For MapWithStateDStream, the default checkpoint interval is 10 times the batch interval. For other DStreams that require checkpointing, the default is the smallest multiple of the batch interval that is at least 10 seconds. Note that the checkpoint interval must be an integral multiple of the batch interval; otherwise an error is reported.
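The default-interval rules above reduce to simple arithmetic. The sketch below spells that arithmetic out in plain Scala. It assumes the default for other DStreams is the smallest multiple of the batch interval that is at least 10 seconds; CheckpointIntervalSketch and its methods are hypothetical names, not Spark APIs, and all durations are in milliseconds.

```scala
object CheckpointIntervalSketch {
  // Assumed rule for most stateful DStreams: the smallest multiple of the
  // batch interval that is at least 10 seconds.
  def defaultIntervalMs(batchMs: Long): Long = {
    require(batchMs > 0, "batch interval must be positive")
    batchMs * math.ceil(10000.0 / batchMs).toLong
  }

  // Rule stated above for MapWithStateDStream: 10 times the batch interval.
  def mapWithStateIntervalMs(batchMs: Long): Long = batchMs * 10

  // A user-supplied interval must be a positive integral multiple of the batch interval.
  def isValid(checkpointMs: Long, batchMs: Long): Boolean =
    checkpointMs > 0 && checkpointMs % batchMs == 0

  def main(args: Array[String]): Unit = {
    println(defaultIntervalMs(5000))      // 5 s batches
    println(defaultIntervalMs(3000))      // 3 s batches
    println(mapWithStateIntervalMs(5000)) // 5 s batches with mapWithState
    println(isValid(7000, 5000))          // 7 s is not a multiple of 5 s
  }
}
```

A check like isValid can be useful before passing a custom interval to dstream.checkpoint, since an interval that is not a multiple of the batch interval is rejected when the application starts.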
In addition, if you want the application to recover from driver failures, create the StreamingContext in the following way:
def createStreamingContext(conf: SparkConf, checkpointPath: String): StreamingContext = {
  // The 5-second batch interval is only an example value
  val ssc = new StreamingContext(conf, Seconds(5))
  // ... other code ...
  ssc.checkpoint(checkpointPath)
  ssc
}
// conf and checkpointDirectory are assumed to be defined by the application.
// Create a new StreamingContext, or recover it from the most recent checkpoint
val context = StreamingContext.getOrCreate(
  checkpointDirectory,
  () => createStreamingContext(conf, checkpointDirectory))
// Start
context.start()
context.awaitTermination()
When the program starts for the first time, it creates a new StreamingContext and then calls start (). When you restart the program after a failure, it recreates the StreamingContext based on the checkpoint data in the checkpoint directory.
Note:
RDD checkpointing saves data to reliable storage, which has a cost. It can increase the processing time of the batches in which RDDs are checkpointed, so the checkpoint interval needs to be set carefully. With a small batch interval (for example, 1 second), checkpointing every batch can significantly reduce throughput. Conversely, checkpointing too infrequently lets the lineage and task sizes grow, which also has a detrimental effect. For stateful transformations that require RDD checkpointing, the default interval is a multiple of the batch interval that is at least 10 seconds. It can be configured with dstream.checkpoint(checkpointInterval). In general, a checkpoint interval of 5 to 10 batch intervals of a DStream is a good setting to try.
The difference between checkpoints and persistence
Persistence
When an RDD is persisted at the DISK_ONLY storage level, it is stored in one location, and later uses of that RDD do not recompute its lineage. After persist() is called, Spark still remembers the RDD's lineage, even though it no longer needs to recompute it. When the application finishes, the cache is cleared and the files are destroyed.
Checkpoint
A checkpoint stores the RDD in reliable storage such as HDFS and deletes its lineage. Unlike persisted data, checkpoint files are not deleted when the application finishes. Checkpointing an RDD can cause it to be computed twice, so it is common to persist the RDD before checkpointing it: the RDD is then computed once, cached, and written to the checkpoint directory from the cache.
Using DataFrames & SQL to process stream data
In Spark Streaming applications, you can easily process streaming data with DataFrames and SQL operations. An example follows:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SqlStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(SqlStreaming.getClass.getSimpleName)
      .setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    words.foreachRDD { rdd =>
      // Get the SparkSession singleton; if it already exists, it is returned directly
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._
      // Convert the RDD[String] to a DataFrame with a single column named "word"
      val wordsDataFrame = rdd.toDF("word")
      wordsDataFrame.show()
      // Register the DataFrame as a temporary view so it can be queried with SQL
      wordsDataFrame.createOrReplaceTempView("words")
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      wordCountsDataFrame.show()
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
/** SparkSession singleton */
object SparkSessionSingleton {
  @transient private var instance: SparkSession = _
  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}