This article explains in detail how to customize an accumulator in Spark. The material is shared here for reference; after reading it, you should have a solid understanding of the relevant concepts.
An accumulator is a distributed shared-variable mechanism provided by Spark. Its principle is similar to MapReduce: updates are made in a distributed fashion on the executors and then aggregated back on the driver. A common use of accumulators is to count events during job execution while debugging.
Basic usage of accumulators
Spark's built-in accumulators support the Long and Double types. The following simple example counts the odd numbers in an RDD while filtering them out, then sums the remaining even integers.
import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf().setAppName("Test").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val accum = sc.longAccumulator("longAccum") // counts the odd numbers
val sum = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)
  .filter { n =>
    if (n % 2 == 1) accum.add(1L) // count each odd number
    n % 2 == 0                    // keep only the even numbers
  }
  .reduce(_ + _)
println("sum: " + sum)
println("accum: " + accum.value)
sc.stop()
The result is:
sum: 20
accum: 5
This is the expected result, but if you do not understand Spark's execution model well, you can run into two typical accumulator errors: under-counting (updates are lost, or never applied because a lazy transformation is never executed) and over-counting (updates are applied more than once because a task is retried or an RDD is recomputed).
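The over-count is easy to reproduce. Below is a minimal sketch reusing the sc and accum values from the example above (before the sc.stop() call): filter is lazy and is recomputed for every action, so running two actions on an uncached RDD applies the accumulator updates twice.

accum.reset() // clear the counts from the previous run
val evens = sc.parallelize(1 to 9, 2).filter { n =>
  if (n % 2 == 1) accum.add(1L)
  n % 2 == 0
}
evens.count()   // first action: accum.value is now 5
evens.collect() // filter is recomputed, so accum.value is now 10

Calling evens.cache() before the first action avoids the recomputation, and Spark only guarantees exactly-once accumulator updates for updates performed inside actions (such as foreach), not inside transformations.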
Custom accumulator
The ability to define custom accumulator types has existed since version 1.x, but it was cumbersome to use. Version 2.0 greatly improved usability, and Spark officially introduced a new abstract class, AccumulatorV2, which provides a friendlier way to implement custom accumulators. Spark also ships an example implementation, the CollectionAccumulator class, which lets you collect information in the form of a collection while a Spark application runs, for example details about the data being processed. Because the accumulator's value ultimately converges on the driver, keep the amount of collected information small to avoid an OutOfMemory error on the driver.
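As a minimal sketch of that built-in class (the accumulator name and sample data here are illustrative), the following collects malformed records on the driver:

val badRecords = sc.collectionAccumulator[String]("badRecords")
sc.parallelize(Seq("1", "2", "x", "4")).foreach { s =>
  if (!s.forall(_.isDigit)) badRecords.add(s) // collect non-numeric records
}
println(badRecords.value) // [x]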
Inherit from the AccumulatorV2 class and override all of its methods:
package spark

import constant.Constant
import org.apache.spark.util.AccumulatorV2
import util.getFieldFromConcatString
import util.setFieldFromConcatString

open class SessionAccmulator : AccumulatorV2<String, String>() {

    private var result = initialValue()

    // All counters start at zero, encoded as "field=0|field=0|...".
    private fun initialValue(): String =
        Constant.SESSION_COUNT + "=0|" +
            Constant.TIME_PERIOD_1s_3s + "=0|" +
            Constant.TIME_PERIOD_4s_6s + "=0|" +
            Constant.TIME_PERIOD_7s_9s + "=0|" +
            Constant.TIME_PERIOD_10s_30s + "=0|" +
            Constant.TIME_PERIOD_30s_60s + "=0|" +
            Constant.TIME_PERIOD_1m_3m + "=0|" +
            Constant.TIME_PERIOD_3m_10m + "=0|" +
            Constant.TIME_PERIOD_10m_30m + "=0|" +
            Constant.TIME_PERIOD_30m + "=0|" +
            Constant.STEP_PERIOD_1_3 + "=0|" +
            Constant.STEP_PERIOD_4_6 + "=0|" +
            Constant.STEP_PERIOD_7_9 + "=0|" +
            Constant.STEP_PERIOD_10_30 + "=0|" +
            Constant.STEP_PERIOD_30_60 + "=0|" +
            Constant.STEP_PERIOD_60 + "=0"

    override fun value(): String = this.result

    /** Merge another task's accumulator into this one, field by field. */
    override fun merge(other: AccumulatorV2<String, String>?) {
        if (other == null || other !is SessionAccmulator) return
        var newResult = this.result
        val fields = arrayOf(
            Constant.SESSION_COUNT,
            Constant.TIME_PERIOD_1s_3s, Constant.TIME_PERIOD_4s_6s,
            Constant.TIME_PERIOD_7s_9s, Constant.TIME_PERIOD_10s_30s,
            Constant.TIME_PERIOD_30s_60s, Constant.TIME_PERIOD_1m_3m,
            Constant.TIME_PERIOD_3m_10m, Constant.TIME_PERIOD_10m_30m,
            Constant.TIME_PERIOD_30m,
            Constant.STEP_PERIOD_1_3, Constant.STEP_PERIOD_4_6,
            Constant.STEP_PERIOD_7_9, Constant.STEP_PERIOD_10_30,
            Constant.STEP_PERIOD_30_60, Constant.STEP_PERIOD_60
        )
        fields.forEach { field ->
            val otherValue = other.result.getFieldFromConcatString("|", field)
            if (otherValue.isNotEmpty()) {
                val merged = newResult.getFieldFromConcatString("|", field).toInt() + otherValue.toInt()
                // Pitfall (this cost 30 minutes of debugging): update newResult,
                // not the original result string, otherwise every field written
                // earlier in this loop is silently lost and the merge fails.
                newResult = newResult.setFieldFromConcatString("|", field, merged.toString())
            }
        }
        this.result = newResult
    }

    override fun copy(): AccumulatorV2<String, String> {
        val acc = SessionAccmulator()
        acc.result = this.result
        return acc
    }

    /** Increment the counter stored under the given field name by 1. */
    override fun add(v: String?) {
        if (v.isNullOrEmpty()) return
        val oldValue = this.result.getFieldFromConcatString("|", v)
        if (oldValue.isNotEmpty()) {
            this.result = this.result.setFieldFromConcatString("|", v, (oldValue.toInt() + 1).toString())
        }
    }

    override fun reset() {
        this.result = initialValue()
    }

    override fun isZero(): Boolean = this.result == initialValue()
}
Method overview
value method: returns the current value of the accumulator
merge method: the most important method, so be sure to implement it correctly; it merges the accumulators of the individual tasks (used in the execution flow below, and illustrated by the sketch after this list)
isZero method: checks whether the accumulator still holds its initial value
reset method: resets the accumulator to its initial value
copy method: creates a copy of the accumulator
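The sketch below is a toy, driver-side walk-through of how these methods interact (it assumes the Kotlin SessionAccmulator class above is compiled onto the classpath and is called here from Scala); it mimics by hand what Spark does during a job.

val registered = new SessionAccmulator()
val taskCopy1 = registered.copy() // one unregistered copy per task
val taskCopy2 = registered.copy()
taskCopy1.add(Constant.SESSION_COUNT) // each task accumulates locally
taskCopy2.add(Constant.SESSION_COUNT)
registered.merge(taskCopy1) // fold the task results back together
registered.merge(taskCopy2)
println(registered.value) // the session count field now reads 2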
The execution flow of an accumulator in Spark:
First, when a job is divided into several tasks, the Spark engine calls the copy method to create one unregistered copy of the accumulator per task, and each task then accumulates into its own copy (note that the value of the originally registered accumulator does not change during this phase). Finally, the engine calls the merge method to fold each task's result accumulator into the registered one, which until then still holds the initial value.
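In a real job you never drive that flow yourself; you only register the accumulator and update it from tasks. A minimal sketch follows (sessionRDD and the accumulator name are illustrative, and it again assumes SessionAccmulator is on the classpath):

val sessionAcc = new SessionAccmulator()
sc.register(sessionAcc, "sessionAggrStat") // register on the driver
sessionRDD.foreach { _ =>
  sessionAcc.add(Constant.SESSION_COUNT) // runs on the executors
}
println(sessionAcc.value) // read the merged result on the driver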
That is all on how to customize an accumulator in Spark. I hope the content above is of some help to you. If you found the article useful, feel free to share it so more people can see it.