Why Do Spark Broadcast Variables Use the Singleton Pattern?

Updated 2025-02-24. From: SLTechnology News&Howtos (shulou)

Shulou (Shulou.com), 06/01

This article explains why Spark broadcast variables should use the singleton pattern. The content is fairly detailed; interested readers can use it as a reference, and I hope it is helpful.

Many people who use Spark Streaming have used broadcast variables, and in most cases those variables are declared as singletons. Have you ever wondered why? Langjian will walk through the reasons here.

First, broadcast variables do not change in most cases. Using a singleton avoids the overhead of creating the broadcast variable again every time Spark Streaming generates and executes a job.

Second, a singleton also needs synchronization. Many beginners never have to think about this, because they do not change how the Spark application schedules its work, and the default scheduling mode is FIFO, which basically does not cause concurrency problems. Synchronization becomes necessary in two cases: 1) you configure the FAIR scheduling mode and also raise the number of jobs that Spark Streaming runs in parallel (the default is 1); 2) several output streams share the same broadcast variable while FAIR scheduling is configured.

Note: sometimes the broadcast itself has to change, for example when it carries configuration files or rules. When using FAIR scheduling, you can create the broadcast as a local variable inside foreachRDD so that jobs do not interfere with each other.

Let's look at an example first, then work through the internal mechanism step by step.

1. Examples

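Here is how a broadcast variable is declared as a singleton with double-checked locking:

    import org.apache.spark.SparkContext
    import org.apache.spark.broadcast.Broadcast

    object WordBlacklist {

      // @volatile makes the assignment visible to other threads
      @volatile private var instance: Broadcast[Seq[String]] = null

      def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
        if (instance == null) {          // first check, without the lock
          synchronized {
            if (instance == null) {      // second check, under the lock
              val wordBlacklist = Seq("a", "b", "c")
              instance = sc.broadcast(wordBlacklist)
            }
          }
        }
        instance
      }
    }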
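To see what the double check buys you, here is a minimal sketch that runs without Spark. It is only a model: BlacklistHolder, initCount, and DoubleCheckedDemo are made-up names, and a plain Seq stands in for the value that sc.broadcast would return. Several threads call getInstance at the same moment, the way concurrent streaming jobs might, and a counter records how often the initialisation really ran.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the WordBlacklist singleton; no Spark involved.
object BlacklistHolder {
  // Counts how many times the (pretend) expensive initialisation ran.
  val initCount = new AtomicInteger(0)

  @volatile private var instance: Seq[String] = null

  def getInstance(): Seq[String] = {
    if (instance == null) {              // first check, no lock taken
      synchronized {
        if (instance == null) {          // second check, under the lock
          initCount.incrementAndGet()
          instance = Seq("a", "b", "c")  // stands in for sc.broadcast(...)
        }
      }
    }
    instance
  }
}

object DoubleCheckedDemo {
  // Calls getInstance from `threads` threads released at the same moment
  // and returns the total number of initialisations that happened.
  def initialisations(threads: Int): Int = {
    val pool = Executors.newFixedThreadPool(threads)
    val start = new CountDownLatch(1)
    val done = new CountDownLatch(threads)
    (1 to threads).foreach { _ =>
      pool.execute(new Runnable {
        override def run(): Unit = {
          start.await()                  // wait until every thread is ready
          BlacklistHolder.getInstance()
          done.countDown()
        }
      })
    }
    start.countDown()
    done.await(10, TimeUnit.SECONDS)
    pool.shutdown()
    BlacklistHolder.initCount.get()
  }
}
```

Calling DoubleCheckedDemo.initialisations(8) should report a single initialisation, because the second check under the lock turns away every thread that lost the race.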

The broadcast variable is used as follows:

    val lines = ssc.socketTextStream(ip, port)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
      // Get or register the blacklist Broadcast
      val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
      // Get or register the droppedWordsCounter Accumulator
      val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
      // Use blacklist to drop words and use droppedWordsCounter to count them
      val counts = rdd.filter { case (word, count) =>
        if (blacklist.value.contains(word)) {
          droppedWordsCounter.add(count)
          false
        } else {
          true
        }
      }.collect().mkString("[", ", ", "]")
      val output = s"Counts at time $time $counts"
      println(output)
      println(s"Dropped ${droppedWordsCounter.value} word(s) totally")
      println(s"Appending to ${outputFile.getAbsolutePath}")
      Files.append(output + "\n", outputFile, Charset.defaultCharset())
    }

2. Conceptual supplement

First, a basic concept: from initial submission to task execution, a Spark application passes through several layers of scheduling.

Application scheduling. The resource manager, such as standalone or YARN, is responsible for scheduling the whole Spark application and managing cluster resources.

Job scheduling. Spark operators fall into two main categories, transformations and actions, and each action produces a job. The job is scheduled and executed within the resource pool provided by the executors, although the job itself is not what gets scheduled and executed directly.

Stage division and scheduling. A job is divided into several stages. The underlying concept is wide versus narrow dependencies: a stage boundary is placed at each wide dependency. Stages also need to be scheduled; they are divided from back to front and executed from front to back.

Task division and scheduling. A stage is further divided, according to its parallelism, into a task set, and the task is the basic unit scheduled for execution on an executor. By default, each task is allocated one CPU core.

Spark Streaming generates jobs periodically. When the execution time of the current job exceeds the generation interval, jobs pile up, and once enough of them accumulate the application may fail. This is mainly because of the FIFO scheduling mode and Spark Streaming's default single-threaded job execution.
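The accumulation can be illustrated with a little arithmetic. The sketch below is a simplified model, not Spark code: it assumes one job per batch, a constant processing time, and a single job thread, and BatchBacklog and pendingAt are hypothetical names.

```scala
object BatchBacklog {
  // Returns how many generated batches are still unfinished after `elapsedMs`.
  // Batch k is generated at k * batchIntervalMs and can only start once it has
  // been generated and the previous batch has finished (single job thread).
  def pendingAt(batchIntervalMs: Long, processingMs: Long, elapsedMs: Long): Int = {
    var previousFinish = 0L
    var pending = 0
    var generatedAt = batchIntervalMs
    while (generatedAt <= elapsedMs) {
      val start = math.max(generatedAt, previousFinish)
      previousFinish = start + processingMs
      if (previousFinish > elapsedMs) pending += 1 // still queued or running
      generatedAt += batchIntervalMs
    }
    pending
  }
}
```

Under these assumptions, a 1000 ms batch interval with 1500 ms of processing per batch leaves 4 batches unfinished after 10.9 seconds and 21 after 60.9 seconds, so the backlog keeps growing. With 800 ms of processing per batch it stays at 0.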

3. Spark Streaming job generation

The main entry point in the source code is the JobGenerator object (StreamingContext#JobScheduler#JobGenerator). It holds an internal RecurringTimer, which is responsible for posting a GenerateJobs event once per batch interval. With window operations, a given interval may not produce a job, depending on the sliding interval; interested readers can explore that on their own. The relevant code is as follows:

    private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
      longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

Next, look at the event loop that receives these events:

    eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = {
        jobScheduler.reportError("Error in job generator", e)
      }
    }
    eventLoop.start()

The event handler is the processEvent method:

    /** Processes all events */
    private def processEvent(event: JobGeneratorEvent) {
      logDebug("Got event " + event)
      event match {
        case GenerateJobs(time) => generateJobs(time)
        case ClearMetadata(time) => clearMetadata(time)
        case DoCheckpoint(time, clearCheckpointDataLater) =>
          doCheckpoint(time, clearCheckpointDataLater)
        case ClearCheckpointData(time) => clearCheckpointData(time)
      }
    }
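The pattern of a timer posting events to a queue and one thread dispatching them by pattern matching can be sketched without Spark. This is an illustration only: MiniEventLoop, GenerateTick, CheckpointTick, and Shutdown are hypothetical names, and a plain loop stands in for the recurring timer.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical event types, loosely modelled on the ones shown in the article.
sealed trait TimerEvent
final case class GenerateTick(timeMs: Long) extends TimerEvent
final case class CheckpointTick(timeMs: Long) extends TimerEvent
case object Shutdown extends TimerEvent

final class MiniEventLoop {
  private val queue = new LinkedBlockingQueue[TimerEvent]()
  private val log = ArrayBuffer.empty[String]

  // A single worker thread takes events off the queue and dispatches them.
  private val worker = new Thread("mini-event-loop") {
    override def run(): Unit = {
      var running = true
      while (running) {
        queue.take() match {
          case GenerateTick(t)   => log += s"generate@$t"
          case CheckpointTick(t) => log += s"checkpoint@$t"
          case Shutdown          => running = false
        }
      }
    }
  }

  def start(): Unit = worker.start()

  def post(event: TimerEvent): Unit = queue.put(event)

  // Posts Shutdown, waits for the worker, and returns what was handled, in order.
  def stop(): Seq[String] = {
    post(Shutdown)
    worker.join()
    log.toList
  }
}

object MiniEventLoopDemo {
  // Posts one generate and one checkpoint event per batch interval.
  def run(batchIntervalMs: Long, batches: Int): Seq[String] = {
    val loop = new MiniEventLoop
    loop.start()
    (1 to batches).foreach { i =>
      val t = i * batchIntervalMs
      loop.post(GenerateTick(t))
      loop.post(CheckpointTick(t))
    }
    loop.stop()
  }
}
```

Because a single thread drains the queue, events are handled strictly in the order they were posted, which is why one slow handler delays everything queued behind it.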

When a GenerateJobs event is received, the generateJobs method is executed. It generates the jobs and submits them for scheduling.

    /** Generate jobs and perform checkpointing for the given `time`. */
    private def generateJobs(time: Time) {
      // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
      // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
      ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
      Try {
        jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
        graph.generateJobs(time) // generate jobs using allocated block
      } match {
        case Success(jobs) =>
          val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
          jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
        case Failure(e) =>
          jobScheduler.reportError("Error generating jobs for time " + time, e)
          PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
      }
      eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
    }

You can see that the first thing this code does is generate the jobs, by calling graph.generateJobs(time). Its code is as follows:

    def generateJobs(time: Time): Seq[Job] = {
      logDebug("Generating jobs for time " + time)
      val jobs = this.synchronized {
        outputStreams.flatMap { outputStream =>
          val jobOption = outputStream.generateJob(time)
          jobOption.foreach(_.setCallSite(outputStream.creationSite))
          jobOption
        }
      }
      logDebug("Generated " + jobs.length + " jobs for time " + time)
      jobs
    }

Each output stream generates one job. Output streams are created by output operations such as foreachRDD and print; internally, they are all ForEachDStream instances. So what gets generated is a collection of jobs.
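The idea that every output stream contributes at most one job per batch time, and that a windowed stream may contribute none, can be sketched as a small model. It is not Spark's implementation: JobGenerationModel, OutputStream, and slideMs are hypothetical names, and the rule that a job is produced only when the batch time is a multiple of the slide interval is a simplifying assumption.

```scala
object JobGenerationModel {
  final case class Job(stream: String, timeMs: Long)

  // `slideMs` stands in for the stream's sliding interval.
  final case class OutputStream(name: String, slideMs: Long) {
    // Assumption: a job exists only when the batch time falls on the slide interval.
    def generateJob(timeMs: Long): Option[Job] =
      if (timeMs % slideMs == 0) Some(Job(name, timeMs)) else None
  }

  // Mirrors the flatMap over output streams: each Some becomes a job,
  // each None is dropped, and the result is the job collection for `timeMs`.
  def generateJobs(outputStreams: Seq[OutputStream], timeMs: Long): Seq[Job] =
    outputStreams.flatMap(_.generateJob(timeMs))
}
```

With one stream sliding every 1000 ms and another every 3000 ms, batch time 2000 yields a single job while batch time 3000 yields two, so the size of the job collection can vary from batch to batch.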

The job collection is then submitted to the thread pool for execution, which is done on the driver side.

This is done by jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)), whose code is as follows:

    def submitJobSet(jobSet: JobSet) {
      if (jobSet.jobs.isEmpty) {
        logInfo("No jobs added for time " + jobSet.time)
      } else {
        listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
        jobSets.put(jobSet.time, jobSet)
        jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
        logInfo("Added jobs for time " + jobSet.time)
      }
    }

It simply iterates over the generated job collection and submits each job to the jobExecutor thread pool for execution. This also happens on the driver side.

jobExecutor is a thread pool with a fixed number of threads; the default is 1 thread.

    private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
    private val jobExecutor =
      ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
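The effect of the pool size can be observed with a plain java.util.concurrent pool. The sketch below is a model of the driver-side executor only: ConcurrentJobsModel and peakConcurrency are hypothetical names, and a sleep stands in for running a job.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.IntBinaryOperator

object ConcurrentJobsModel {
  private val maxOp = new IntBinaryOperator {
    override def applyAsInt(a: Int, b: Int): Int = math.max(a, b)
  }

  // Runs `jobs` dummy jobs on a fixed pool of `numConcurrentJobs` threads and
  // returns the largest number of jobs observed running at the same time.
  def peakConcurrency(numConcurrentJobs: Int, jobs: Int): Int = {
    val pool = Executors.newFixedThreadPool(numConcurrentJobs)
    val running = new AtomicInteger(0)
    val peak = new AtomicInteger(0)
    (1 to jobs).foreach { _ =>
      pool.execute(new Runnable {
        override def run(): Unit = {
          val now = running.incrementAndGet()
          peak.accumulateAndGet(now, maxOp) // remember the highest value seen
          Thread.sleep(100)                 // pretend to process a batch
          running.decrementAndGet()
        }
      })
    }
    pool.shutdown()
    pool.awaitTermination(30, TimeUnit.SECONDS)
    peak.get()
  }
}
```

With a pool of one thread the peak is always 1, which matches the default behaviour described above; with a larger pool several jobs are typically in flight together, and that is the point at which unsynchronized singleton initialisation becomes a risk.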

If necessary, you can set spark.streaming.concurrentJobs to a larger value so that multiple jobs are submitted for execution at the same time.

Is that alone enough for the jobs to actually run in parallel?

Obviously not! Under the default FIFO scheduler, the earlier job has priority on all available resources, so later jobs only run when resources are left over.

You also need to change the scheduling mode to FAIR.

For simple equal sharing, all you need is:

    conf.set("spark.scheduler.mode", "FAIR")

Then the jobs running at the same time share the resources provided by the executors equally.

That is the whole job generation process.

Because Spark Streaming jobs can run concurrently in FAIR mode, take care to add synchronization when you create a broadcast variable through a singleton.

That concludes this discussion of why Spark broadcast variables use the singleton pattern. I hope the content above is of some help and that you have learned something from it. If you found the article useful, feel free to share it so that more people can see it.
