In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-30 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
The contents of this issue:
1. Dynamic allocation of Spark Streaming resources
2. Spark Streaming dynamically controls the consumption rate.
Why do you need dynamic?
A) Spark is coarse-grained by default, and resources are allocated before calculation. For Spark Streaming, there are high peak and low peak, but the resources they need are not the same. From the point of view of high peak, a lot of resources will be wasted.
B) with the continuous operation of Spark Streaming, resource consumption and management are also factors that we have to consider.
There are challenges when Spark Streaming resources are dynamically adjusted:
Spark Streaming runs according to Batch Duration, and Batch Duration needs a lot of resources. Next time, Batch Duration will not need so many resources. The Batch Duration operation has expired before the resources are adjusted. Adjust the time interval at this time.
Dynamic request for Spark Streaming resources
1. Dynamic resource allocation is not enabled by default in SparkContext, but can be configured manually in SparkConf.
/ / Optionally scale number of executors dynamically based on workload. Exposed for testing.val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled (_ conf) if (! dynamicAllocationEnabled & & _ conf.getBoolean ("spark.dynamicAllocation.enabled", false)) {logWarning ("DynamicAllocation and num executors both set, thus dynamic allocation disabled.")} _ executorAllocationManager = if (dynamicAllocationEnabled) {Some (this, listenerBus, _ conf)} else {None} _ executorAllocationManager.foreach (_ .start ())
Set the spark.dynamicAllocation.enabled parameter to true
Here, resources will be dynamically allocated by instantiating ExecutorAllocationManager objects. Inside, timers will constantly scan Executor, and schedule () will be called through thread pool to complete resource dynamic allocation.
/ * * Register for scheduler callbacks to decide when to add and remove executors, and start * the scheduling task. * / def start (): Unit = {listenerBus.addListener (listener) val scheduleTask = new Runnable () {override def run (): Unit = {try {schedule () / / dynamically adjust Executor allocation} catch {case ct: ControlThrowable = > throw ctcase t: Throwable = > logWarning (s "Uncaught exception in thread ${Thread.currentThread (). GetName}", t)}} executor.scheduleAtFixedRate (scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)}
Private def schedule (): Unit = synchronized {val now = clock.getTimeMillis updateAndSyncNumExecutorsTarget (now) / / update the number of Executor removeTimes.retain {case (executorId, expireTime) = > val expired = now > = expireTimeif (expired) {initializing = falseremoveExecutor (executorId)}! expired}} / * * Updates our target number of executors and syncs the result with the cluster manager. * Check to see whether our existing allocation and the requests we've made previously exceed our * current needs. If so, truncate our target and let the cluster manager know so that it can * cancel pending requests that are unneeded. * * If not, and the add time has expired, see if we can request new executors and refresh the add * time. * * @ return the delta in the target number of executors. * / private def updateAndSyncNumExecutorsTarget (now: Long): Int = synchronized {val maxNeeded = maxNumExecutorsNeededif (initializing) {/ / Do not change our target while we are still initializing, / / Otherwise the first job may have to ramp up unnecessarily0} else if (maxNeeded
< numExecutorsTarget) {// The target number exceeds the number we actually need, so stop adding new // executors and inform the cluster manager to cancel the extra pending requestsval oldNumExecutorsTarget = numExecutorsTarget numExecutorsTarget = math.max(maxNeeded, minNumExecutors)numExecutorsToAdd = 1// If the new target has not changed, avoid sending a message to the cluster managerif (numExecutorsTarget < oldNumExecutorsTarget) { client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount) logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +s"$oldNumExecutorsTarget) because not all requested executors are actually needed") }numExecutorsTarget - oldNumExecutorsTarget } else if (addTime != NOT_SET && now >= addTime) {val delta = addExecutors (maxNeeded) logDebug (s "Starting timer to add more executors (to" + s "expire in $sustainedSchedulerBacklogTimeoutS seconds)") addTime + = sustainedSchedulerBacklogTimeoutS * 1000delta} else {0}}
Dynamic control of consumption rate:
Spark Streaming provides a flexible mechanism, the relationship between the speed of flow and processing speed, whether it is time to process data. If it is not in time, it will automatically dynamically control the speed of the data flow in and the spark.streaming.backpressure.enabled parameter setting.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 233
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.