In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-01 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
Dependencies of 1 RDD and Fault tolerant 1.1 RDD
There are two kinds of RDD dependencies: narrow dependency (Narrow Dependencies) and wide dependency (Wide Dependencies, called Shuffle Dependencies in source code).
Dependency has two functions, one is to solve the efficiency of data fault tolerance, and the other is to divide stage.
Narrow dependency: one Partition of each parent RDD is used by a maximum of one Partition of the quilt RDD (1:1 or nRDD 1). Operations such as map, filter, union, etc., all produce narrow dependencies.
Child RDD partitions usually correspond to several parent RDD partitions (O (1)), regardless of the size of the data.
Wide dependency: the Partition of a parent RDD will be used by the Partition of multiple child RDD, such as groupByKey, reduceByKey, sortByKey, etc.
(child RDD partitions usually correspond to all parent RDD partitions (O (n), depending on the data size)
Compared with wide dependencies, narrow dependencies are good for optimization, mainly based on the following two points:
1. Wide dependencies often correspond to shuffle operations. You need to transfer the partitions of the same parent RDD to different child RDD partitions during operation, which may involve data transfer between multiple nodes; while the partitions of each parent RDD with narrow dependencies are only passed to one child RDD partition, and the conversion can usually be completed in one node.
2. When the RDD partition is lost (a node fails), spark will recalculate the data.
For narrow dependencies, since one partition of the parent RDD corresponds to only one child RDD partition, it only needs to recalculate the parent RDD partition corresponding to the child RDD partition, so the recalculation results in 100% data utilization.
For wide dependencies, the recalculated parent RDD partition corresponds to multiple child RDD partitions, so in fact only part of the data in the parent RDD is used to recover the missing child RDD partition, and the other part corresponds to the other non-lost partition of the child RDD, which results in redundant computation; more generally, the wide dependent sub-RDD partition usually comes from multiple parent RDD partitions, and in extreme cases, all parent RDD partitions have to be recalculated.
As shown in the figure below, if the b1 partition is missing, it is necessary to recalculate a1memery a2 and a3, which results in redundant computation (data corresponding to b2 in a1mema2 a3).
It is useful to distinguish between these two kinds of dependencies. First, narrow dependencies allow all parent partitions to be pipelined (pipeline) on a cluster node. For example, map is performed element by element, followed by filter, while wide dependencies require all the parent partition data to be calculated first, and then Shuffle between nodes, similar to MapReduce. Second, narrow dependency can recover failed nodes more effectively, that is, only the parent partition of the missing RDD partition needs to be recalculated, and different nodes can be calculated in parallel; for a wide dependency Lineage graph, the failure of a single node may cause all ancestors of the RDD to lose some partitions, so it needs to be recalculated as a whole.
[misunderstanding] it has been misunderstood that each child RDD in a narrow dependency may correspond to more than one parent RDD, and when the child RDD is lost, it will cause multiple parent RDD to recalculate, so narrow dependency is not as advantageous as wide dependency. In fact, we should look at this problem at the partition level, and the utility of recalculation lies not in how many calculations are done, but in how many calculations are redundant. Recalculations in narrow dependencies are necessary, so recalculation is not redundant.
Narrowly dependent functions are: map, filter, union, join (parent RDD is hash-partitioned), mapPartitions, mapValues
Widely dependent functions are: groupByKey, join (parent RDD is not hash-partitioned), partitionBy
1.2 dependency exampl
Dependent inheritance relationship:
Val rdd1 = sc.parallelize (1 to 10,1)
Val rdd2 = sc.parallelize (11 to 20,1)
Val rdd3 = rdd1.union (rdd2)
Rdd3.dependencies.size
/ / the length is 2, and the values are rdd1 and rdd2, meaning that rdd3 depends on rdd1 and rdd2
Rdd3.dependencies
/ / result:
Rdd3.dependencies (0). Rdd.collect
/ / print the data of rdd1
Rdd3.dependencies (1). Rdd.collect
/ / print the data of rdd2
Rdd3.dependencies (3). Rdd.collect
/ / the array is out of bounds and an error is reported
Which RDD Actions corresponds to shuffleDependency? The following join (R5) does not seem to have shuffleDependency
Val R1 = sc.parallelize (List ("dog", "salmon", "salmon", "rat", "elephant"))
Val R2 = r1.keyBy (_ .length)
Val R3 = sc.parallelize (List ("dog", "cat", "gnu", "salmon", "rabbit", "turkey", "wolf", "bear", "bee"))
Val R4 = r3.keyBy (_ .length)
Val R5 = r2.join (R4)
Answer: join does not necessarily have shuffleDependency, but not in the above operation.
RedueceByKey produces shuffleDependency.
Note that the keyBy in the above operation is not quite what I expected. Pay attention.
KeyBy: similar to the map operation, an key is added to each element
The following example is interesting:
Val R1 = sc.textFile ("hdfs:///user/hadoop/data/block_test1.csv")
R1
Val R2 = r1.dependencies (0) .rdd
R2.partitions.size
R2.preferredLocations (r2.partitions (0))
R2.preferredLocations (r2.partitions (3))
What's interesting is (finding dependencies, priority locations):
1. The type of R1 is MapPartitionsRDD
2. R1 depends on R2, which cannot be seen without this assignment statement. Type R2 is: HadoopRDD
3. You can retrieve the location of each partition of R2, and the number of copies of the hdfs file system is set to 2.
1.3Fault tolerance of RDD (lineage, checkpoint)
Generally speaking, there are two ways of fault tolerance for distributed datasets: data checkpointing and recording data updates (CheckPoint Data, and Logging The Updates).
For large-scale data analysis, the cost of data checkpoint operation is very high, which requires the replication of large data sets between machines through the network connection of the data center, and the network bandwidth is often much lower than the memory bandwidth. at the same time, it also needs to consume more storage resources.
Therefore, Spark chooses how to record updates. However, if the update granularity is too fine and too much, then the record update cost is not low. Therefore, RDD only supports coarse-grained transformations, that is, recording only a single operation performed on a single block (recording how it is converted from other RDD, that is, lineage), and then creating a series of transformation sequences of RDD (each RDD contains information about how he was transformed from other RDD and how to reconstruct a piece of data. Therefore, the fault-tolerant mechanism of RDD, also known as "Lineage" fault-tolerance, is recorded in order to recover lost partitions.
Lineage is essentially similar to a redo log (Redo Log) in a database, except that the redo log has a large granularity and redoes the same thing to the global data to recover the data.
Lineage fault tolerance principle: in the fault tolerance mechanism, if a node crashes and the operation is narrowly dependent, you only need to recalculate the lost parent RDD partition and do not depend on other nodes. Wide dependencies require all partitions of the parent RDD to exist, which is expensive to recalculate. It can be understood whether the cost is economical or not: in a narrow dependency, when the partition of the child RDD is lost and the parent RDD partition is recalculated, all the data of the corresponding partition of the parent RDD is the data of the child RDD partition, and there is no redundant calculation. In the case of wide dependency, all the data of each parent RDD partition recalculated by a lost child RDD partition is not used by the lost child RDD partition, and some of the data is equivalent to the data needed in the corresponding unlost child RDD partition, which results in redundant computing overhead, which is why the wide dependency cost is higher. Therefore, if we use the Checkpoint operator to do checkpointing, we should consider not only whether the Lineage is long enough, but also whether there is a wide dependency. Adding Checkpoint to the wide dependency is the best value for money.
Checkpoint mechanism. RDD needs to add checkpoints in the following two cases:
The Lineage in @ DAG is too long and too expensive if it is recalculated (e.g. in multiple iterations)
More benefits can be gained by doing Checkpoint on wide dependencies.
Because RDD is read-only, consistency is not the main concern in Spark RDD computing, and memory is relatively easy to manage, which is also the foresight of the designer, which reduces the complexity of the framework, improves performance and scalability, and lays a strong foundation for the enrichment of the upper framework in the future.
In RDD computing, fault tolerance is carried out through the checkpoint mechanism. Traditionally, there are two ways to do checkpointing: through redundant data and log record update operations. The doCheckPoint method in RDD is equivalent to caching data through redundant data, while the lineage described earlier is fault-tolerant through fairly coarse-grained record update operations.
Checkpoint (essentially by writing RDD to Disk to do checkpoint) is to assist fault tolerance through lineage. If the Lineage is too long, the cost of fault tolerance will be too high, so it is better to do checkpoint fault tolerance in the intermediate stage. If a node has a problem and loses its partition, the overhead will be reduced by redoing Lineage from the RDD that does the checkpoint.
1.4 the relationship between checkpoint and cache
1. In essence, checkpoint is a fault-tolerant mechanism and cache is an optimization mechanism.
2. Checkpoint writes data to shared storage (hdfs); cache is usually in memory
3. If the RDD,computing chain is too long or depends on a lot of other RDD, you need to do checkpoint if it takes a long time or too much computation. Will be reused (but not too large) RDD, do cache.
In fact, storing the output of ShuffleMapTask to a local disk is considered a checkpoint, but the main purpose of this checkpoint is to output data to partition.
4. After the completion of the checkpoint operation of RDD, the lineage,cache operation will be cut off and it will not affect the lineage.
Checkpoint is particularly important in Spark Streaming, where stateful operations are needed in spark streaming, which is required in some stateful transformations, in which the generation of RDD relies on the previous batches, causing the dependency chain to grow over time. To avoid this endless lengthening, to periodically save the intermediate generated RDDs to reliable storage to cut off the dependency chain, checkpoint must be performed at regular intervals.
There is a significant difference between cache and checkpoint. The cache calculates the RDD and puts it in memory, but the dependency chain of RDD (equivalent to the redo log in the database) cannot be discarded. When a certain executor goes down at a certain point, the RDD of the above cache will be lost, which needs to be calculated by replaying the dependency chain. The difference is that checkpoint stores the RDD in HDFS, which is a reliable storage of multiple copies, so the dependency chain can be lost. That is to say, it breaks the dependency chain, which is highly fault-tolerant through replication.
Note: checkpoint needs to re-calculate the job from scratch. It is best to cache first so that checkpoint can directly save the RDD in the cache. There is no need to re-calculate it, which greatly improves the performance.
1.5 use and process of checkpoint
Correct use posture of checkpoint
Val data = sc.textFile ("/ tmp/spark/1.data") .cache () / / pay attention to cache
Sc.setCheckpointDir ("/ tmp/spark/checkpoint")
Data.checkpoint
Data.count
/ / question: whether there is a problem between cache and checkpoint; with cache to avoid the second calculation, I can see the relevant instructions in the code!
It is very simple to use: set the checkpoint directory, and then call the checkpoint method on rdd. When action, the data is checkpoint.
Checkpoint writing process
The following states are passed during the RDD checkpoint process
[Initialized-> marked for checkpointing-> checkpointing in progress-> checkpointed]
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.