
What is the function of the partitioner in Spark


This article explains what the partitioner in Spark does. The content is concise and easy to understand, and I hope you get something out of the detailed walkthrough below.

I recently dug myself into a data skew pit in Spark. While solving it, I also studied how Spark's partitioners work, and this post summarizes the resulting weekend overtime.

Let's talk about data skew first.

Data skew means that when an RDD is computed in Spark, the data is distributed unevenly across its partitions. For example, suppose there are five partitions and one of them holds 90% of the data: five partitions means five tasks could work in parallel, but four of them have almost nothing to do while one does nearly all the work. There are many proposed solutions to this problem online.
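To make this concrete, here is a minimal, hypothetical sketch (not from the original article) that builds a skewed pair RDD, where one invented key "hot" dominates, and prints the record count per partition after a groupByKey. It assumes a SparkContext named sc:

// Hypothetical illustration of data skew: one key ("hot") dominates the data set.
val pairs = sc.parallelize(
  (1 to 90).map(i => ("hot", i)) ++ (1 to 10).map(i => (s"key$i", i)))

val grouped = pairs.groupByKey(5)  // 5 partitions, as in the example above

// Print how many records end up in each partition.
grouped.mapPartitionsWithIndex { (idx, iter) =>
  Iterator((idx, iter.map(_._2.size).sum))
}.collect().foreach { case (idx, count) => println(s"partition $idx -> $count records") }

The partition that receives the "hot" key reports roughly 90 records, while the others share the remaining 10.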

However, if the skew comes from the underlying data itself, no amount of tuning will make it go away.

For example, suppose you groupBy an RDD on some key and then join the result. If the grouping key is unevenly distributed, this case genuinely cannot be optimized away: once you split a key, the join can no longer be done completely, and if you do not split the key, the corresponding partition will inevitably be skewed.

However, it is important to understand why the data is skewed, so read on!

The role of partitioning

For RDDs in (key, value) form (PairRDD), many operations are keyed, so in order to split the work into independent tasks, the data is reorganized by key, as in groupByKey.

Reorganization of course needs a rule. The most common is hash-based; Spark also provides a slightly more complex, sampling-based range partitioning method. Both can be applied explicitly, as in the sketch below.
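As a quick preview (a hedged sketch, not from the article; it assumes a SparkContext sc and arbitrary keys and partition counts), both rules can be applied explicitly through partitionBy:

import org.apache.spark.{HashPartitioner, RangePartitioner}

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4)))

// Hash-based: partition = nonNegativeMod(key.hashCode, numPartitions)
val hashed = pairs.partitionBy(new HashPartitioner(4))

// Range-based: boundaries are estimated by sampling the RDD passed in here
val ranged = pairs.partitionBy(new RangePartitioner(4, pairs))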

Let's first take a look at how the partitioner is used during a Spark computation.

The use of Partitioner

Take groupByKey, for example:

def groupByKey(): JavaPairRDD[K, JIterable[V]] =
  fromRDD(groupByResultToJava(rdd.groupByKey()))

It calls the groupByKey() method of PairRDDFunctions:

def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
  groupByKey(defaultPartitioner(self))
}

A default partitioner is created in this method. It is defined as follows:

def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
  val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
  for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
    return r.partitioner.get
  }
  if (rdd.context.conf.contains("spark.default.parallelism")) {
    new HashPartitioner(rdd.context.defaultParallelism)
  } else {
    new HashPartitioner(bySize.head.partitions.size)
  }
}

First, the RDDs are sorted by partition count; if one of them already has a partitioner (with more than zero partitions), that partitioner is reused. Otherwise, a HashPartitioner is created: with spark.default.parallelism partitions if that property is set, or with the largest upstream partition count if it is not.
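A hedged sketch of what this means in practice (spark.default.parallelism is a real Spark property, but the numbers here are invented and a SparkContext sc is assumed):

// Case 1: spark.default.parallelism is set, e.g. new SparkConf().set("spark.default.parallelism", "10");
// a no-argument groupByKey() would then use new HashPartitioner(10).

// Case 2: the property is not set; the parent RDD's partition count wins.
val pairs = sc.parallelize(1 to 100, 6).map(i => (i % 7, i)) // 6 partitions
val grouped = pairs.groupByKey()                             // uses defaultPartitioner(pairs)
println(grouped.partitions.length)                           // expected: 6 (or 10 in case 1)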

Of course, users can also define their own partitioner, or use one of the other provided partitioners. The API supports this too:

// pass in a partitioner object
def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] =
  fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))

// pass in the number of partitions
def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] =
  fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))

HashPartitioner

The hash partitioner is the simplest one and also the default. Understanding its partitioning rule is very helpful when dealing with data skew or designing grouping keys.

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  // Compute the key's hashCode and take it modulo the number of partitions;
  // if the result is less than 0, add the number of partitions.
  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  // Two partitioners are considered equal if they have the same number of partitions.
  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner => h.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}

The most important piece here is Utils.nonNegativeMod(key.hashCode, numPartitions), which determines which partition a record goes into.
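The implementation follows just below; as a tiny worked illustration of why the extra branch is needed (the hash value here is invented):

val numPartitions = 5
val hash = -7                                                     // pretend key.hashCode returned -7
val rawMod = hash % numPartitions                                 // -2, since % keeps the sign of the dividend
val partition = rawMod + (if (rawMod < 0) numPartitions else 0)   // 3
println(partition)                                                // a valid partition id in [0, numPartitions)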

def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

In plain terms: take the key's hashCode and compute it modulo the number of partitions. Since the hashCode may be negative, there is a simple check: if the result is less than 0, add the number of partitions.

Therefore, with hash-based partitioning, as long as your keys are well spread out, the final data will not end up skewed.

RangePartitioner

This partitioner suits scenarios where you want to spread the data out, but if a single key repeats very often, data skew will still appear.

The core method of every partitioner is getPartition:

def getPartition(key: Any): Int = {
  val k = key.asInstanceOf[K]
  var partition = 0
  if (rangeBounds.length <= 128) {
    // With few boundaries, a simple linear scan is enough.
    while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
      partition += 1
    }
  } else {
    // With many boundaries, binary search over the sorted boundary array.
    partition = binarySearch(rangeBounds, k)
    // binarySearch either returns the match location or -[insertion point] - 1.
    if (partition < 0) {
      partition = -partition - 1
    }
    if (partition > rangeBounds.length) {
      partition = rangeBounds.length
    }
  }
  if (ascending) {
    partition
  } else {
    rangeBounds.length - partition
  }
}

A range partitioner stores an array of boundaries, for example [1, 100, 200, 300, 400]; the incoming key is compared against it and the corresponding partition id is returned.

So how are these boundaries determined?

This is the core algorithm of range partitioning. Roughly: iterate over each partition, sample the data inside it, sort the sampled data, and determine the boundaries according to the corresponding weights.

There are two important parts:

1. Sampling

2. Determining the boundaries

About sampling, there is a classic algorithm problem: without knowing the size of the data in advance, how do you pick one value at random with equal probability?

The naive approach is to traverse the data once to learn its size, then draw a random index and take the corresponding value. That effectively means two passes (the cost of the second pass depends on the storage medium).

Spark uses reservoir sampling instead. Take the first value, then keep iterating: the second value has a 1/2 chance of replacing the chosen value, the third value has a 1/3 chance of replacing it, and so on until the last value. A single pass is enough to obtain a uniformly random sample.

Reference source code for the algorithm:

private var rangeBounds: Array[K] = {
  if (partitions <= 1) {
    Array.empty
  } else {
    // Cap the total sample size, then over-sample each partition a little.
    val sampleSize = math.min(20.0 * partitions, 1e6)
    val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
    val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
    if (numItems == 0L) {
      Array.empty
    } else {
      val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
      val candidates = ArrayBuffer.empty[(K, Float)]
      val imbalancedPartitions = mutable.Set.empty[Int]
      sketched.foreach { case (idx, n, sample) =>
        if (fraction * n > sampleSizePerPartition) {
          // This partition holds far more data than average; re-sample it below.
          imbalancedPartitions += idx
        } else {
          // The weight is 1 over the sampling probability.
          val weight = (n.toDouble / sample.size).toFloat
          for (key <- sample) {
            candidates += ((key, weight))
          }
        }
      }
      if (imbalancedPartitions.nonEmpty) {
        // Re-sample imbalanced partitions with the desired sampling probability.
        val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
        val seed = byteswap32(-rdd.id - 1)
        val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
        val weight = (1.0 / fraction).toFloat
        candidates ++= reSampled.map(x => (x, weight))
      }
      RangePartitioner.determineBounds(candidates, partitions)
    }
  }
}

def sketch[K : ClassTag](
    rdd: RDD[K],
    sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
  val shift = rdd.id
  // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
  val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
    val seed = byteswap32(idx ^ (shift << 16))
    val (sample, n) = SamplingUtils.reservoirSampleAndCount(
      iter, sampleSizePerPartition, seed)
    Iterator((idx, n, sample))
  }.collect()
  val numItems = sketched.map(_._2).sum
  (numItems, sketched)
}

The reservoir sampling itself lives in reservoirSampleAndCount; the core loop below runs when the partition holds more than k elements (the first k elements have already been placed in the reservoir array):

// If the input size is greater than k, continue the sampling process,
// starting from the element index reached so far.
var l = i.toLong
// random number generator
val rand = new XORShiftRandom(seed)
while (input.hasNext) {
  val item = input.next()
  // Draw a random position in [0, l); if it falls below the sample size k, replace that slot.
  val replacementIndex = (rand.nextDouble() * l).toLong
  if (replacementIndex < k) {
    reservoir(replacementIndex.toInt) = item
  }
  l += 1
}
(reservoir, l)

Determining the boundaries

Finally, the boundaries can be determined from the collected samples.

def determineBounds[K : Ordering : ClassTag](
    candidates: ArrayBuffer[(K, Float)],
    partitions: Int): Array[K] = {
  val ordering = implicitly[Ordering[K]]
  // the data format is (key, weight)
  val ordered = candidates.sortBy(_._1)
  val numCandidates = ordered.size
  val sumWeights = ordered.map(_._2.toDouble).sum
  val step = sumWeights / partitions
  var cumWeight = 0.0
  var target = step
  val bounds = ArrayBuffer.empty[K]
  var i = 0
  var j = 0
  var previousBound = Option.empty[K]
  while ((i < numCandidates) && (j < partitions - 1)) {
    val (key, weight) = ordered(i)
    cumWeight += weight
    if (cumWeight >= target) {
      // Skip duplicate values.
      if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
        bounds += key
        target += step
        j += 1
        previousBound = Some(key)
      }
    }
    i += 1
  }
  bounds.toArray
}

Looking directly at the code, it is still a bit obscure. Let's give an example and explain it step by step:

According to the above algorithm flow, it can be roughly understood:

Sampling -> determining boundaries (sorting)

First of all, anyone with some Spark background knows that each RDD can be understood as a group of partitions; these correspond to blocks of memory and are the final carriers of the data. An RDD is therefore made up of several partitions, so operators such as map and filter can be computed in parallel, partition by partition, without coordinating with other partitions until a shuffle is encountered.

Suppose that, with no special settings, an RDD consists of three partitions; when you groupByKey it, the result will also be partitioned into 3.

According to the above algorithm flow, if the number of partitions is 3, then the sample size is:

val sampleSize = math.min(20.0 * partitions, 1e6)

That is, the total sample size is 60. However, considering that the data may be skewed and some partitions may hold much more data than others, the actual per-partition sample size is computed with a 3x over-sampling factor:

val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt

In other words, at most 60 samples will be taken from each partition.
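As a quick check of the arithmetic above, here is the same computation for the assumed 3 partitions (rdd.partitions.size is replaced by a plain value for illustration):

val partitions = 3
val sampleSize = math.min(20.0 * partitions, 1e6)                            // 60.0 samples in total
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / partitions).toInt  // 60 samples per partition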

Then it traverses each partition and takes the corresponding number of samples.

val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
  val seed = byteswap32(idx ^ (shift << 16))
  val (sample, n) = SamplingUtils.reservoirSampleAndCount(iter, sampleSizePerPartition, seed)
  Iterator((idx, n, sample))
}.collect()

sketched.foreach { case (idx, n, sample) =>
  if (fraction * n > sampleSizePerPartition) {
    imbalancedPartitions += idx
  } else {
    // The weight is 1 over the sampling probability.
    val weight = (n.toDouble / sample.size).toFloat
    for (key <- sample) {
      candidates += ((key, weight))
    }
  }
}

After the sample is taken, it is time to determine the boundary.

Note that each sampled key carries a weight, which is [total number of records in the partition / number of samples taken from that partition].
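A quick worked example (the counts are invented):

val recordsInPartition = 10000L   // n: total records in this partition
val samplesTaken = 60             // sample.size
val weight = (recordsInPartition.toDouble / samplesTaken).toFloat
// weight is about 166.67: each sampled key stands in for roughly 167 original records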

RangePartitioner.determineBounds(candidates, partitions)

First the candidates are sorted, val ordered = candidates.sortBy(_._1), and then a weight step size is determined:

val sumWeights = ordered.map(_._2.toDouble).sum
val step = sumWeights / partitions

Based on this step size, the boundaries are determined, forming a set of ranges.
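A tiny worked example of that step logic (keys and weights are invented): if the sorted samples are (10, 1.0), (20, 1.0), (30, 1.0), (40, 3.0), (50, 2.0), then sumWeights = 8.0 and with 4 partitions step = 2.0. The cumulative weight reaches 2.0 at key 20, passes 4.0 at key 40, and passes 6.0 at key 50, so the boundary array becomes [20, 40, 50], which splits the key space into four ranges, one per partition.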

The partitioner then looks up each key against the sorted boundary array (using a binary search when there are many boundaries) to determine the partition id for that key:

partition = binarySearch(rangeBounds, k)
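To tie the range section together, here is a minimal usage sketch (not from the article; it assumes a SparkContext sc and a key type with an Ordering). sortByKey builds a RangePartitioner internally, and you can also apply one explicitly:

import org.apache.spark.RangePartitioner

val pairs = sc.parallelize(Seq((5, "e"), (1, "a"), (9, "i"), (3, "c")), numSlices = 2)

// Explicit use: sample `pairs` to build the boundary array, then repartition by range.
val byRange = pairs.partitionBy(new RangePartitioner(2, pairs))

// Implicit use: sortByKey constructs a RangePartitioner under the hood,
// which is why sorted output is range-partitioned across tasks.
val sorted = pairs.sortByKey()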

Practice: a custom Partitioner

Writing a custom partitioner is also very simple. You only need to extend Partitioner and implement numPartitions and getPartition (overriding equals is also a good idea):

public class MyPartitioner extends Partitioner {

    @Override
    public int numPartitions() {
        return 1000;
    }

    @Override
    public int getPartition(Object key) {
        String k = (String) key;
        int code = k.hashCode() % 1000;
        System.out.println(k + ":" + code);
        // hashCode may be negative, so shift negative results back into [0, 1000)
        return code < 0 ? code + 1000 : code;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof MyPartitioner) {
            return this.numPartitions() == ((MyPartitioner) obj).numPartitions();
        }
        return super.equals(obj);
    }
}

To use it, just new an object and pass it in:

pairRdd.groupByKey(new MyPartitioner())

The above is what the partitioner in Spark does. Have you learned something new? If you want to pick up more skills or enrich your knowledge, you are welcome to follow the industry information channel.
