(Version Customization) Lesson 18: Handling Empty RDDs in Spark Streaming and Gracefully Stopping the Streaming Application

Updated 2025-01-29. From: SLTechnology News&Howtos, Shulou (Shulou.com), 06/03

In this lesson:

1. Handling empty RDDs in Spark Streaming

2. Ways to stop a StreamingContext program

I. Empty RDDs in Spark Streaming

A Spark Streaming application generates an RDD for every batch interval (Batch Duration) that we configure. The resulting RDD may contain no data, or only empty partitions, yet foreachPartition still runs on it, acquires computing resources, and executes. This wastes cluster computing resources, so empty RDDs and empty partitions should be filtered out while the program is running. Refer to the following code:

package com.dt.spark.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OnlineForeachRDD2DB {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf() // create a SparkConf object
    conf.setAppName("OnlineForeachRDD2DB") // application name, shown in the monitoring UI while the program runs
    conf.setMaster("spark://Master:7077") // run the program on the Spark cluster

    /**
     * Set the batchDuration interval, which controls how often Jobs are generated,
     * and create the entry point for Spark Streaming execution.
     * The interval value is missing in the source text; 5 seconds is assumed here.
     */
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("Master", 9999)
    val words = lines.flatMap(line => line.split(" "))
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

    wordCounts.foreachRDD { rdd =>
      /**
       * What happens if rdd is empty?
       * An rdd without any elements can still run foreachPartition and try to write
       * to a database or to HDFS. It holds no records, but it still acquires and
       * consumes computing resources, which is pure waste. Empty rdds must therefore
       * be handled explicitly.
       *
       * Option 1: rdd.count() > 0, but rdd.count() triggers a Job.
       * Option 2: rdd.isEmpty(), but it also triggers a Job through take(1):
       *   def isEmpty(): Boolean = withScope {
       *     partitions.length == 0 || take(1).length == 0
       *   }
       * Option 3: rdd.partitions.isEmpty, which only checks whether the partition
       * array has length 0, that is, whether any partition exists at all:
       *   def isEmpty: Boolean = { length == 0 }
       *
       * Note: rdd.isEmpty() and rdd.partitions.isEmpty are two different concepts.
       */
      if (rdd.partitions.length > 0) {
        rdd.foreachPartition { partitionOfRecord =>
          if (partitionOfRecord.hasNext) { // only continue if this partition has data
            // ConnectionPool is a helper used by this lesson; its code is not shown in this article
            val connection = ConnectionPool.getConnection()
            partitionOfRecord.foreach(record => {
              val sql = "insert into streaming_itemcount(item,rcount) values('" + record._1 + "'," + record._2 + ")"
              val stmt = connection.createStatement()
              stmt.executeUpdate(sql)
              stmt.close()
            })
            ConnectionPool.returnConnection(connection)
          }
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
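The difference between rdd.isEmpty() and rdd.partitions.isEmpty described in the comments above can be checked in local mode. The following is a minimal sketch, not part of the original lesson: the object name EmptyRddCheck, the helper describe, and the local[2] master are assumptions made for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, not part of the original lesson.
object EmptyRddCheck {
  /** Returns (number of partitions, result of rdd.isEmpty()) for the given RDD. */
  def describe[T](rdd: RDD[T]): (Int, Boolean) =
    (rdd.partitions.length, rdd.isEmpty()) // isEmpty() falls back to take(1) when partitions exist

  def main(args: Array[String]): Unit = {
    // local[2] is assumed so the sketch runs without a cluster
    val conf = new SparkConf().setAppName("EmptyRddCheck").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Four partitions exist, but none of them holds a record
      val emptyPartitions = sc.parallelize(Seq.empty[(String, Int)], numSlices = 4)
      // No partitions at all
      val noPartitions = sc.emptyRDD[(String, Int)]
      println(describe(emptyPartitions))
      println(describe(noPartitions))
    } finally {
      sc.stop()
    }
  }
}
```

The first RDD should report four partitions while still being empty, which is why the lesson's code checks partitionOfRecord.hasNext inside foreachPartition in addition to rdd.partitions.length > 0: the partition-count check alone does not filter out partitions that exist but hold no records.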

II. Ways to stop a Spark Streaming program

The first stops the program immediately, regardless of whether the received data has been processed.

The second stops the program only after all received data has been processed. The second way is generally used.

The first way to stop (from the StreamingContext source):

/**
 * Stop the execution of the streams immediately (does not wait for all received data
 * to be processed). By default, if `stopSparkContext` is not specified, the underlying
 * SparkContext will also be stopped. This implicit behavior can be configured using the
 * SparkConf configuration spark.streaming.stopSparkContextByDefault.
 *
 * @param stopSparkContext If true, stops the associated SparkContext. The underlying SparkContext
 *                         will be stopped regardless of whether this StreamingContext has been
 *                         started.
 */
def stop(
    stopSparkContext: Boolean = conf.getBoolean("spark.streaming.stopSparkContextByDefault", true)
  ): Unit = synchronized {
  stop(stopSparkContext, false)
}

The second way to stop (from the StreamingContext source):

/**
 * Stop the execution of the streams, with option of ensuring all received data
 * has been processed.
 *
 * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
 *                         will be stopped regardless of whether this StreamingContext has been
 *                         started.
 * @param stopGracefully if true, stops gracefully by waiting for the processing of all
 *                       received data to be completed
 */
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
  var shutdownHookRefToRemove: AnyRef = null
  if (AsynchronousListenerBus.withinListenerThread.value) {
    throw new SparkException("Cannot stop StreamingContext within listener thread of" +
      " AsynchronousListenerBus")
  }
  synchronized {
    try {
      state match {
        case INITIALIZED =>
          logWarning("StreamingContext has not been started yet")
        case STOPPED =>
          logWarning("StreamingContext has already been stopped")
        case ACTIVE =>
          scheduler.stop(stopGracefully)
          // Removing the streamingSource to de-register the metrics on stop()
          env.metricsSystem.removeSource(streamingSource)
          uiTab.foreach(_.detach())
          StreamingContext.setActiveContext(null)
          waiter.notifyStop()
          if (shutdownHookRef != null) {
            shutdownHookRefToRemove = shutdownHookRef
            shutdownHookRef = null
          }
          logInfo("StreamingContext stopped successfully")
      }
    } finally {
      // The state should always be Stopped after calling `stop()`, even if we haven't started yet
      state = STOPPED
    }
  }
  if (shutdownHookRefToRemove != null) {
    ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove)
  }
  // Even if we have already stopped, we still need to attempt to stop the SparkContext because
  // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
  if (stopSparkContext) sc.stop()
}
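From application code, the graceful variant is reached by passing stopGracefully = true to stop. The sketch below is one possible way to wire this up, not the lesson's own code: the object name GracefulStopSketch, the helper buildConf, the local[2] master, the localhost:9999 socket source, the 5-second batch interval, and the 60-second timeout are all assumptions for illustration. It also sets spark.streaming.stopGracefullyOnShutdown, a Spark configuration that asks for a graceful stop when the JVM shuts down.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical example object, not part of the original lesson.
object GracefulStopSketch {
  /** Builds a configuration that requests a graceful stop on JVM shutdown. */
  def buildConf(): SparkConf =
    new SparkConf()
      .setAppName("GracefulStopSketch")
      .setMaster("local[2]") // assumed: local mode with one core for the receiver, one for processing
      .set("spark.streaming.stopGracefullyOnShutdown", "true")

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(buildConf(), Seconds(5)) // 5-second batches, assumed
    val lines = ssc.socketTextStream("localhost", 9999)     // assumed source
    lines.count().print()

    ssc.start()
    // Returns true if the context stopped within the timeout, false if the timeout elapsed first
    val stopped = ssc.awaitTerminationOrTimeout(60000L)
    if (!stopped) {
      // Second way to stop: wait for all received data to be processed,
      // then stop the underlying SparkContext as well
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }
  }
}
```

Calling ssc.stop() with no arguments, or with stopGracefully = false, corresponds to the first way to stop and does not wait for received data to be processed.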
