

Demystifying the spark.streaming.concurrentJobs parameter


This article explains in detail how the spark.streaming.concurrentJobs parameter works. It is shared as a reference; I hope you will have a solid understanding of the relevant internals after reading it.

Recently, while tuning Spark Streaming, I found that spark.streaming.concurrentJobs, the parameter Spark provides to raise job parallelism, defaults to 1. After increasing it to 2 (configured in spark-defaults), the streaming application UI shows two Active Jobs when processing falls behind (by default there is only one), meaning two streaming batch jobs can execute at the same time. This parameter directly affects how streaming jobs are executed.

Parameter introduction

In Spark Streaming, JobScheduler reads the parameter at line 47:

    private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
    private val jobExecutor = ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

The concurrentJobs value is used to initialize the jobExecutor thread pool, so it directly determines the number of threads available for executing jobs.

Job executor

The job executor thread pool runs JobHandler threads. JobScheduler also holds a job container, jobSets:

    private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]

It stores the JobSet generated at each point in time, and each JobSet contains multiple Jobs. The JobSet submission logic:

    def submitJobSet(jobSet: JobSet) {
      if (jobSet.jobs.isEmpty) {
        logInfo("No jobs added for time " + jobSet.time)
      } else {
        listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
        jobSets.put(jobSet.time, jobSet)
        // Each job is wrapped in a JobHandler and handed to the jobExecutor pool
        jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
        logInfo("Added jobs for time " + jobSet.time)
      }
    }

It is not hard to see that the capacity of jobExecutor determines how many JobHandler threads can run in the pool at the same time. Since JobHandler is the thread that actually executes a job, the pool size caps the number of jobs that can be processed concurrently.
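To see this mechanism in isolation, here is a minimal standalone sketch, using plain JDK executors rather than Spark's internal ThreadUtils (the class name and "jobs" are illustrative): a fixed pool of size 2 runs at most two tasks at once while the rest queue, just as pending batches queue behind the jobExecutor pool.

    import java.util.concurrent.Executors

    object ConcurrentJobsDemo {
      def main(args: Array[String]): Unit = {
        // Analogous to spark.streaming.concurrentJobs = 2
        val pool = Executors.newFixedThreadPool(2)
        // Submit four "batch jobs"; only two execute at a time, the rest wait in the queue
        (1 to 4).foreach { i =>
          pool.execute(new Runnable {
            def run(): Unit = {
              println(s"job $i started on ${Thread.currentThread.getName}")
              Thread.sleep(1000) // simulate batch processing time
              println(s"job $i finished")
            }
          })
        }
        pool.shutdown()
      }
    }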

Usage

You can configure this parameter for a streaming job in any of the following ways.

Changing it in spark-defaults: a change there is global, so all streaming jobs will be affected.
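For reference, a minimal sketch of what that line might look like in conf/spark-defaults.conf (the value 5 is illustrative):

    spark.streaming.concurrentJobs    5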

Adding --conf at submit time (recommended): when submitting a job, you can use the --conf flag to add a per-job configuration. For example:

    bin/spark-submit --master yarn --conf spark.streaming.concurrentJobs=5

This sets the job executor thread pool size of the streaming job to 5, so with sufficient resources five batch jobs can execute at the same time.

Setting it in code: configure it through SparkConf, e.g. sparkConf.set("spark.streaming.concurrentJobs", "5"), or System.setProperty("spark.streaming.concurrentJobs", "5").
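A minimal sketch of the in-code approach, assuming the setting is applied before the StreamingContext is created, since JobScheduler reads it when the context starts (app name and batch interval are illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Illustrative values; only the concurrentJobs line is the point here
    val conf = new SparkConf()
      .setAppName("concurrent-jobs-demo")
      .set("spark.streaming.concurrentJobs", "5")
    val ssc = new StreamingContext(conf, Seconds(10))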

Recommendations on scheduler mode

When multiple concurrent jobs are configured, multiple batches are submitted to the cluster at the same time, which demands more computing resources. When no additional computing resources (executors) can be allocated to the streaming job, the scheduler mode can be set to FAIR (fair scheduling) so that the submitted jobs share the available resources fairly.

Under fair scheduling, jobs share computing resources, but they are still submitted in chronological order (even if the interval is tiny). This can easily skew task allocation across executors and lengthen the overall execution time. Under FIFO scheduling, the first job to arrive gets the computing resources first; when executors are scarce, later jobs wait for executors to be released, and task allocation is less prone to skew.

In practice, if enough executors are available, FIFO mode is recommended. For example, if m executors are allocated when concurrentJobs is left at its default, then with concurrentJobs set to n, allocating roughly n × m executors is advisable.
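For the resource-constrained case described above, a sketch of switching the scheduler mode to FAIR alongside concurrentJobs (the values are illustrative, not a recommendation for any particular workload):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("fair-mode-demo")
      .set("spark.streaming.concurrentJobs", "2")
      // The default scheduler mode is FIFO; FAIR lets concurrent batches share resources
      .set("spark.scheduler.mode", "FAIR")
    val ssc = new StreamingContext(conf, Seconds(10))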

That concludes this analysis of the spark.streaming.concurrentJobs parameter. I hope it has been helpful; if you found the article useful, feel free to share it so more people can see it.
