7. Spark Core: Data Partitioning

Updated 2025-03-13. Source: SLTechnology News&Howtos (shulou)

Brief introduction

One of Spark's most important features is control over how a dataset is partitioned across nodes. Controlling data distribution can reduce network overhead and substantially improve overall performance.

Only pair RDDs have a partitioner; for a non-pair RDD the value of the partitioner is None. If an RDD is scanned only once, there is no need to partition it in advance. Partitioning pays off when the RDD is used multiple times in key-based operations such as joins.

Partitioner

The partitioner determines the number of partitions in the RDD and which partition each record ultimately belongs to.

Spark provides two partitioners, HashPartitioner and RangePartitioner. Both inherit from the org.apache.spark.Partitioner class and implement three methods.

numPartitions: Int: returns the number of partitions.
getPartition(key: Any): Int: returns the partition ID (0 to numPartitions - 1) for a given key.
equals(): checks whether this partitioner is the same as another partitioner instance, which is how Spark determines whether two RDDs are partitioned in the same way.

HashPartitioner partition

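How HashPartitioner works: for a given key, compute its hashCode and take the remainder after dividing by the number of partitions. If the remainder is negative, the number of partitions is added to it (this is what Utils.nonNegativeMod does), and a null key always maps to partition 0. The resulting value is the ID of the partition the key belongs to. The implementation is as follows: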

    class HashPartitioner(partitions: Int) extends Partitioner {
      require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

      def numPartitions: Int = partitions

      // null keys go to partition 0; other keys go to hashCode mod numPartitions (never negative)
      def getPartition(key: Any): Int = key match {
        case null => 0
        case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
      }

      // two HashPartitioners are equal when they have the same number of partitions
      override def equals(other: Any): Boolean = other match {
        case h: HashPartitioner => h.numPartitions == numPartitions
        case _ => false
      }

      override def hashCode: Int = numPartitions
    }

RangePartitioner partition

HashPartitioner can leave partitions with uneven amounts of data, for example when many keys share the same remainder. RangePartitioner instead tries to keep the amount of data in each partition roughly uniform by mapping each contiguous range of keys to one partition. Partitions are ordered relative to one another (every key in one partition sorts before every key in the next), but the elements within a partition are not guaranteed to be in order.
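The contrast between the two assignment rules can be sketched in plain Scala, without a Spark cluster. This is a minimal illustration, not Spark's implementation: the object name, `toyBounds`, and `rangePartition` are hypothetical helpers, and the bounds are taken from an in-memory sorted sample instead of Spark's weighted sampling.

```scala
object PartitionAssignmentSketch {
  // Same idea as Spark's Utils.nonNegativeMod: a remainder that is never negative.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Hash assignment: null goes to partition 0, everything else by hashCode.
  def hashPartition(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case k    => nonNegativeMod(k.hashCode, numPartitions)
  }

  // Toy bounds (assumption: the sample has at least numPartitions elements):
  // pick (numPartitions - 1) evenly spaced upper bounds from the sorted sample.
  def toyBounds(sample: Seq[Int], numPartitions: Int): Array[Int] = {
    val sorted = sample.sorted
    (1 until numPartitions).map(i => sorted(i * sorted.length / numPartitions - 1)).toArray
  }

  // Range assignment: index of the first upper bound that is >= key,
  // or bounds.length when the key is larger than every bound.
  def rangePartition(key: Int, bounds: Array[Int]): Int = {
    var p = 0
    while (p < bounds.length && key > bounds(p)) p += 1
    p
  }

  def main(args: Array[String]): Unit = {
    val keys = Seq(-7, 0, 4, 8, 12, 16, 20, 100)
    val bounds = toyBounds(keys, 4)
    keys.foreach { k =>
      println(s"key=$k hash=${hashPartition(k, 4)} range=${rangePartition(k, bounds)}")
    }
  }
}
```

With these eight keys and four partitions, hash assignment puts seven keys in partition 0 because most keys are multiples of 4, while range assignment places two keys in each partition.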

How RangePartitioner computes its partition boundaries:

1. Compute the overall sample size sampleSize: 20 samples per output partition, capped at 1M (1e6) samples in total.
2. From sampleSize and the number of partitions of the input RDD, compute the number of samples to draw from each partition, sampleSizePerPartition.
3. Call RangePartitioner's sketch function to sample the data and collect the samples of each partition.
4. Compute the overall sampling fraction and identify the partitions that hold too much data, to guard against data skew.
5. For the partitions with a large amount of data, call the RDD sample API to sample them again.
6. Sort the final samples and assign them through RangePartitioner's determineBounds function to compute rangeBounds.

    class RangePartitioner[K: Ordering: ClassTag, V](
        partitions: Int,
        rdd: RDD[_ <: Product2[K, V]],
        private var ascending: Boolean = true)
      extends Partitioner {

      require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

      // get the ordering for keys of type K in the RDD
      private var ordering = implicitly[Ordering[K]]

      // An array of upper bounds for the first (partitions - 1) partitions
      private var rangeBounds: Array[K] = {
        if (partitions <= 1) {
          Array.empty
        } else {
          // overall sample size: 20 per output partition, capped at 1M
          val sampleSize = math.min(20.0 * partitions, 1e6)
          // over-sample a little, assuming the input partitions are roughly balanced
          val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
          val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
          if (numItems == 0L) {
            Array.empty
          } else {
            val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
            val candidates = ArrayBuffer.empty[(K, Float)]
            val imbalancedPartitions = mutable.Set.empty[Int]
            sketched.foreach { case (idx, n, sample) =>
              if (fraction * n > sampleSizePerPartition) {
                // if the fraction multiplied by the amount of data in this partition is greater than
                // the per-partition sample size computed earlier, too little was sampled from this
                // partition and its data is unevenly distributed, so it must be sampled again
                imbalancedPartitions += idx
              } else {
                // the partition is not imbalanced: compute its weight and add its samples
                // to the candidates collection
                // The weight is 1 over the sampling probability.
                val weight = (n.toDouble / sample.size).toFloat
                for (key <- sample) {
                  candidates += ((key, weight))
                }
              }
            }
            if (imbalancedPartitions.nonEmpty) {
              // Re-sample imbalanced partitions with the desired sampling probability.
              val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
              val seed = byteswap32(-rdd.id - 1)
              val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
              val weight = (1.0 / fraction).toFloat
              candidates ++= reSampled.map(x => (x, weight))
            }
            // compute the final rangeBounds from the sampled data
            RangePartitioner.determineBounds(candidates, partitions)
          }
        }
      }

      // the number of partitions of the resulting RDD is the number of elements in rangeBounds + 1
      def numPartitions: Int = rangeBounds.length + 1

      // binary search function; internally uses the binary search provided by java.util.Arrays
      private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

      // returns the partition ID for a key of the RDD
      def getPartition(key: Any): Int = {
        // cast the key to the original key type of the RDD
        val k = key.asInstanceOf[K]
        var partition = 0
        if (rangeBounds.length <= 128) {
          // If we have less than 128 partitions naive search
          while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
            partition += 1
          }
        } else {
          partition = binarySearch(rangeBounds, k)
          // binarySearch either returns the match location or -[insertion point]-1
          if (partition < 0) {
            partition = -partition - 1
          }
          if (partition > rangeBounds.length) {
            partition = rangeBounds.length
          }
        }
        // return the partition according to the sort direction; the default is ascending
        if (ascending) {
          partition
        } else {
          rangeBounds.length - partition
        }
      }
    }

Operator operations that affect partitioning

The operations that affect partitioning are: cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), repartition(), coalesce(), sort(), mapValues() (if the parent RDD has a partitioner), and flatMapValues() (if the parent RDD has a partitioner).

For operations on two RDDs, how the output is partitioned depends on how the parent RDDs are partitioned. By default, the result is hash-partitioned, with as many partitions as the operation's level of parallelism. However, if one of the parents has a partitioner set, the result uses that partitioner; if both parents have one, the result uses the partitioner of the first parent.
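The selection rule just described can be written down as a small toy model. This is only a sketch of the rule as stated in the text: `ToyPartitioner` and `resultPartitioner` are hypothetical names, and the logic Spark actually applies has more conditions and varies between versions.

```scala
object ResultPartitionerSketch {
  // Hypothetical stand-in for a partitioner: just a kind and a partition count.
  final case class ToyPartitioner(kind: String, numPartitions: Int)

  // Rule as stated in the text: the first parent's partitioner wins, then the
  // second parent's; with neither, fall back to hash partitioning with the
  // operation's level of parallelism.
  def resultPartitioner(
      first: Option[ToyPartitioner],
      second: Option[ToyPartitioner],
      parallelism: Int): ToyPartitioner =
    first.orElse(second).getOrElse(ToyPartitioner("hash", parallelism))

  def main(args: Array[String]): Unit = {
    val range = ToyPartitioner("range", 10)
    val hash = ToyPartitioner("hash", 100)
    println(resultPartitioner(None, None, 8))              // neither parent is partitioned
    println(resultPartitioner(None, Some(range), 8))       // only the second parent is
    println(resultPartitioner(Some(hash), Some(range), 8)) // both are: first parent wins
  }
}
```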

The difference between repartition and partitionBy

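Both repartition and partitionBy redistribute the data, and both typically end up using a HashPartitioner (repartition creates one internally; partitionBy takes the partitioner as an argument). The differences between the two are:

partitionBy can only be used on pair RDDs, whereas repartition works on any RDD. When both are applied to a pair RDD, the results differ.

In fact, partitionBy gives the result we would expect: records with the same key land in the same partition. repartition does not use the original key. It pairs each element with a generated integer key, starting from a random position and incrementing by one per element, and hash-partitions on that key.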
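The effect of the two placement rules can be illustrated in plain Scala. This is a minimal sketch under simplifying assumptions: `byKey` and `roundRobin` are hypothetical helpers that only compute target partition IDs for the records of one input partition, and the keys are Ints so that the hash code is the key itself.

```scala
import scala.util.Random

object RepartitionKeySketch {
  // partitionBy-style placement: the target partition depends only on the key.
  def byKey(records: Seq[(Int, String)], numPartitions: Int): Seq[Int] =
    records.map { case (k, _) => ((k % numPartitions) + numPartitions) % numPartitions }

  // repartition-style placement: start at a position seeded by the input
  // partition index and step by one per record, ignoring the record's key.
  def roundRobin(
      records: Seq[(Int, String)],
      inputPartitionIndex: Int,
      numPartitions: Int): Seq[Int] = {
    var position = new Random(inputPartitionIndex).nextInt(numPartitions)
    records.map { _ =>
      position += 1
      position % numPartitions
    }
  }

  def main(args: Array[String]): Unit = {
    // four records that all share key 1
    val records = Seq((1, "a"), (1, "b"), (1, "c"), (1, "d"))
    println(byKey(records, 4))         // every record goes to the same partition
    println(roundRobin(records, 0, 4)) // records are spread over all four partitions
  }
}
```

Records with the same key stay together under the key-based rule, while the position-based rule spreads them evenly regardless of key, which is why repartition balances sizes but does not co-locate keys.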

    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
      coalesce(numPartitions, shuffle = true)
    }

    def coalesce(numPartitions: Int, shuffle: Boolean = false)
        (implicit ord: Ordering[T] = null): RDD[T] = withScope {
      if (shuffle) {
        /** Distributes elements evenly across output partitions, starting from a random partition. */
        val distributePartition = (index: Int, items: Iterator[T]) => {
          var position = (new Random(index)).nextInt(numPartitions)
          items.map { t =>
            // Note that the hash code of the key will just be the key itself. The HashPartitioner
            // will mod it with the number of total partitions.
            position = position + 1
            (position, t)
          }
        } : Iterator[(Int, T)]

        // include a shuffle step so that our upstream tasks are still distributed
        new CoalescedRDD(
          new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
          new HashPartitioner(numPartitions)),
          numPartitions).values
      } else {
        new CoalescedRDD(this, numPartitions)
      }
    }

repartition and coalesce

Both operators change the number of partitions of an RDD. repartition is simply coalesce called with shuffle set to true. Assume the RDD has N partitions and needs to be redistributed into M partitions:

If N < M, shuffle must be set to true. With shuffle set to false, coalesce cannot increase the number of partitions, and the RDD keeps its N partitions.
If N > M and the two are not far apart (for example, N is 1000 and M is 100), shuffle can be set to false: several of the N partitions are merged into each new partition, no shuffle is performed, and there is a narrow dependency between the parent RDD and the child RDD.
If N > M and the two differ greatly, setting shuffle to false puts the parent and child RDDs in a narrow dependency within the same stage, so the Spark program may not have enough parallelism, which hurts performance. For example, when M is 1, set shuffle to true so that the operations before coalesce keep better parallelism.

Case analysis

Requirements

Count how many times users visited pages on topics they are not subscribed to.

User information table: an RDD of (UserID, UserInfo) pairs, where UserInfo contains the list of topics the user is subscribed to.
Event table: an RDD of (UserID, LinkInfo) pairs, which stores the links each user of the site visited during the past five minutes.

Code implementation

    val sc = new SparkContext()
    val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()

    def processNewLogs(logFileName: String) {
      val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
      // RDD of (UserID, (UserInfo, LinkInfo)) pairs
      val joined = userData.join(events)
      val offTopicVisits = joined.filter {
        // Expand the tuple into its components
        case (userId, (userInfo, linkInfo)) =>
          !userInfo.topics.contains(linkInfo.topic)
      }.count()
      println("Number of visits to non-subscribed topics: " + offTopicVisits)
    }

Shortcomings

The join operation does not know how the keys are partitioned, so it computes the hash of every key in both datasets, sends records with the same hash value to the same machine over the network, and then joins the records with the same key on that machine. Because the userData table is large, hashing it and shuffling it across nodes on every call is time-consuming, even though the table itself does not change between calls.

Improved code

    val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
      .partitionBy(new HashPartitioner(100))  // hash-partition into 100 partitions
      .persist()

Advantages

The userData table is partitioned once so that all records with the same key sit in a single partition, and persist is then called to keep the partitioned result. Subsequent joins no longer hash and shuffle userData across nodes on every call; only the events RDD is shuffled. The program runs significantly faster.
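A back-of-the-envelope model shows where the saving comes from. This is a rough sketch, not a measurement: the object and function names are hypothetical, the record counts are made-up illustrative numbers, and it assumes every shuffled record crosses the network.

```scala
object JoinShuffleModel {
  // Without a known partitioner, each call to processNewLogs shuffles
  // both userData and events.
  def withoutPartitioner(userRecords: Long, eventRecords: Long, calls: Int): Long =
    calls * (userRecords + eventRecords)

  // With partitionBy + persist, userData is shuffled once up front;
  // afterwards only events move on each call.
  def withPartitioner(userRecords: Long, eventRecords: Long, calls: Int): Long =
    userRecords + calls * eventRecords

  def main(args: Array[String]): Unit = {
    // hypothetical sizes: 1,000,000 users, 10,000 events per batch, 100 batches
    val users = 1000000L
    val events = 10000L
    val calls = 100
    println(s"without partitioner: ${withoutPartitioner(users, events, calls)} records shuffled")
    println(s"with partitioner:    ${withPartitioner(users, events, calls)} records shuffled")
  }
}
```

Under these assumed numbers the unpartitioned version moves 101,000,000 records in total and the pre-partitioned version moves 2,000,000, because the large table is shuffled once and not on every batch.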
