In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
[TOC]
1. Some basic terms in spark
RDD: resilient distributed dataset, which is the core focus of spark
Operators: some functions that manipulate RDD
Application: spark program written by the user (DriverProgram + ExecutorProgram)
Job: an operation triggered by an action class operator
Stage: a set of tasks that divide job into stage based on dependencies
Task: there are multiple task with the same operation within the same stage (but with different data), which is the smallest execution unit in the cluster.
Maybe after talking about these concepts, I still don't quite understand it. It doesn't matter. It's just an impression.
II. Basic principles and use of RDD 2.1.What is RDD
RDD, full name: Resilient Distributed Dataset, that is, resilient distributed dataset. Is the most basic data abstraction in spark. It represents an immutable, partitioned set of elements that can be computed in parallel. RDD has the characteristics of data flow model: automatic fault tolerance, location-aware scheduling and scalability. Maybe it's not clear. Let me give you an example:
Suppose I use sc.textFile (xxxx) to read a file's data from hdfs, then the file's data is equivalent to a RDD, but in fact the file's data is processed on several different worker nodes, but logically, in this spark cluster, the data belongs to one RDD. This is why RDD is a logical concept, it is an abstract object of the entire cluster, distributed in the cluster. From this, we can see that RDD is the key for spark to realize data distributed computing processing. For example:
figure 2.1 RDD principle
2.2 Properties of RDD
With regard to the attributes of RDD, there is a comment in the source code as follows:
* Internally, each RDD is characterized by five main properties:*-A list of partitions1, is a group of partitions: RDD is composed of partitions, and each partition runs on a different Worker. In this way, distributed computing is realized, that is, the basic constituent unit of a dataset. For RDD, each shard is processed by a computing task and determines the granularity of parallel computing. You can specify the number of RDD shards when creating a RDD. If not, the default value will be used. The default value is the number of CPU Core assigned to the program. *-A function for computing each split2 and split are understood as partitions. In RDD, there are a series of functions for processing and calculating the data in each partition. Here we call a function an operator. RDD in Spark is calculated in fragments, and each RDD implements the compute function to achieve this purpose. The compute function composes iterators, eliminating the need to save the results of each calculation. Operator types: there is a dependency between transformation action*-A list of dependencies on other RDDs3 and RDD. Narrow dependence, wide dependence. Stage needs to be divided by dependencies, and tasks are executed according to Stage. Each transformation of RDD generates a new RDD, so there is a pipeline-like front-and-back dependency between RDD. When some partition data is lost, Spark can recalculate the lost partition data through this dependency instead of recalculating all partitions of RDD. *-Optionally, a Partitioner for key-value RDDs (e.g. To say that the RDD is hash-partitioned) 4. You can automatically create RDD with partition rules. When you create a RDD, you can specify partitions or customize partition rules. Currently, two types of sharding functions are implemented in Spark, one is hash-based HashPartitioner and the other is range-based RangePartitioner. Only for RDD of key-value will there be Partitioner, and the value of Parititioner of non-key-value RDD is None. The Partitioner function determines not only the number of slices for RDD itself, but also the number of slices for parent RDD Shuffle output. *-Optionally, a list of preferred locations to compute each split on (e.g. Block locations for* an HDFS file) 5. Priority is given to the node close to the file location to perform the task. Mobile computing, do not move data this point needs to be explained: generally speaking, spark is built on top of hdfs, reading data from hdfs for processing. Hdfs is a distributed storage, such as A, B, C three datanode, assuming that the data to be processed by spark happens to be stored on the C node. If spark executes the task on node B or node An at this time, it has to read data from node C and then transmit it to node An or B over the network before processing, which is actually very performance-consuming. Here, spark means that priority is given to executing tasks on nodes close to processing data, that is, priority is given to tasks on C nodes. This can save the time and performance of data transmission. That is, mobile computing without moving data. 2.3.Create RDD to create RDD first need to create SparkContext object: / / create spark profile object. Sets the app name, master address, and local as the local mode. / / if it is submitted to the cluster, it is usually not specified. Because it is possible to run on multiple cluster sinks, it is inconvenient to write val conf = new SparkConf (). SetAppName ("wordCount"). SetMaster ("local") / / create a spark context object val sc = new SparkContext (conf)
Create a RDD through sc.parallelize ():
Sc.parallelize (seq,numPartitions) seq: the number of numPartitions: number of partitions, such as list,array, etc., which can be unspecified. Default is 2 examples: val rdd1 = sc.parallelize (Array, 3) rdd1.partitions.length
Create from an external data source
Val rdd1 = sc.textFile ("/ usr/local/tmp_files/test_WordCount.txt") 2.4 operator type
Operators can be divided into transformation and action types.
Transformation:
Delayed calculation, lazy lazy value, will not trigger calculation. The calculation is triggered only if the action operator is encountered. They simply remember the transformation actions that are applied to the underlying dataset, such as a file. These transformations actually run only when an action occurs that requires the result to be returned to Driver. This design allows Spark to run more efficiently
Action:
Similar to transformation, but directly triggers the calculation and does not wait for the 2.5transformation operator
For the convenience of explanation, the implementation of creating a rdd is demonstrated in spark-shell:
Scala > val rdd1 = sc.parallelize (List) rdd1: org.apache.spark.rdd.RDD [Int] = ParallelCollectionRDD [0] at parallelize at: 242.5.1 map (func) map [U] (FRV T = > U) () argument is a function, and the function argument is required to be single, and the return value is also single. Use the function to process the incoming data, and then return the processed data example: / / pass in an anonymous function here, passing each value in rdd1 * 2, and return the new array scala > val rdd2 = rdd1.map (_ * 2) rdd2: org.apache.spark.rdd.RDD [Int] = MapPartitionsRDD [1] at map at: 26max / where collect is an action operator Trigger the calculation and print the result scala > rdd2.collectres0: Array [Int] = Array (2,4,6,8,10,16,68,200,158) 2.5.2 the filterfilter parameter is a judgment function, determines whether the passed parameter is true or false, and is often used to filter data. Finally, the true data return example: / / filter out data greater than 20 scala > rdd2.filter (_ > 20). Collectres4: Array [Int] = Array (68,200,158) 2.5.3 flatMapflatMap (fpurt T = > U) map before flat,flat is to expand and merge multiple lists and other objects into a large list. And return the processed data. This function is generally used to handle multiple cases where a list also contains multiple lists: scala > val rdd4 = sc.parallelize (Array ("a b c", "d e f", "x y z")) rdd4: org.apache.spark.rdd.RDD [String] = ParallelCollectionRDD [18] at parallelize at: 24 at parallelize at / processing logic is to cut each string in array by spaces and then generate multiple array Then merge multiple array deployments into a new arrayscala > val rdd5 = rdd4.flatMap (_ .split (") rdd5: org.apache.spark.rdd.RDD [String] = MapPartitionsRDD [19] at flatMap at: 26scala > rdd5.collectres5: Array [String] = Array (a, b, c, d, e, f, x, y, z) 2.5.4 set Operation union (otherDataset) Union intersection (otherDataset) intersection distinct ([numTasks]) deduplication example: scala > val rdd6 = sc.parallelize (List (5Pol 6)) 7) rdd6: org.apache.spark.rdd.RDD [Int] = ParallelCollectionRDD [20] at parallelize at: 24scala > val rdd7 = sc.parallelize (List) rdd7: org.apache.spark.rdd.RDD [Int] = ParallelCollectionRDD [21] at parallelize at: 24pm / merge scala > val rdd8 = rdd6.union (rdd7) rdd8: org.apache.spark.rdd.RDD [Int] = UnionRDD [22] at union at: 28scala > rdd8.collectres6: Array [Int] = Array 2, 3, 4, 5, 6) / / weight removal scala > rdd8.distinct.collectres7: Array [Int] = Array (4,8,1,9,5,6,10,2,7,3) / / intersection scala > val rdd9 = rdd6.intersection (rdd7) rdd9: org.apache.spark.rdd.RDD [Int] = MapPartitionsRDD [31] at intersection at: 28scala > rdd9.collectres8: Array [Int] = Array (6 5) 2.5.5 grouping operation groupByKey ([numTasks]): only the same key is grouped and aggregated reduceByKey (f: (VMagneV) = > V, [numTasks]) first of all, the KV of the same key is aggregated. Then operate on the value. Scala > val rdd1 = sc.parallelize (List (("Tom", 1000), ("Jerry", 3000), ("Mary", 2000)) rdd1: org.apache.spark.rdd.RDD [(String, Int)] = ParallelCollectionRDD [32] at parallelize at: 24scala > val rdd2 = sc.parallelize (List (("Jerry", 1000), ("Tom", 3000), ("Mike", 2000)) rdd2: org.apache.spark.rdd.RDD [(String) Int)] = ParallelCollectionRDD [33] at parallelize at: 24scala > val rdd3 = rdd1 union rdd2rdd3: org.apache.spark.rdd.RDD [(String, Int)] = UnionRDD [34] at union at: 28scala > rdd3.collectres9: Array [(String, Int)] = Array ((Tom,1000), (Jerry,3000), (Mary,2000), (Jerry,1000), (Tom,3000), (Mike,2000)) scala > val rdd4 = rdd3.groupByKeyrdd4: org.apache.spark.rdd.RDD [(String) At groupByKey at [Int]] = ShuffledRDD [35] at groupByKey at: 30 shock / grouping scala > rdd4.collectres10: Array [(String, Iterable [Int])] = Array ((Tom,CompactBuffer (1000, 3000)), (Jerry,CompactBuffer (3000, 1000)), (Mike,CompactBuffer (2000)), (Mary,CompactBuffer (2000)) Note: when using grouping functions GroupByKey is not recommended. Because of poor performance, it is officially recommended that reduceByKey// group and aggregate scala > rdd3.reduceByKey (_ + _). Collectres11: Array [(String, Int)] = Array ((Tom,4000), (Jerry,4000), (Mike,2000), (Mary,2000)) 2.5.6 the function of cogroup is not well summarized. Look directly at the example: scala > val rdd1 = sc.parallelize (List (("Tom", 1), ("Tom", 2), ("jerry", 1), ("Mike", 2)) rdd1: org.apache.spark.rdd.RDD [(String, Int)] = ParallelCollectionRDD [37] at parallelize at: 24scala > val rdd2 = sc.parallelize (List (("jerry", 2), ("Tom", 1), ("Bob", 2)) rdd2: org.apache.spark.rdd.RDD [(String) Int)] = ParallelCollectionRDD [38] at parallelize at: 24scala > val rdd3 = rdd1.cogroup (rdd2) rdd3: org.apache.spark.rdd.RDD [(String, (Iterable [Int], Iterable [Int]))] = MapPartitionsRDD [40] at cogroup at: 28scala > rdd3.collectres12: Array [(String, (Iterable [Int], Iterable [int]))] = Array (Tom, (CompactBuffer (1, 2), CompactBuffer (1)), (Mike, (CompactBuffer (2), CompactBuffer (), (jerry, (CompactBuffer (1)) CompactBuffer (2), (Bob, (CompactBuffer (), CompactBuffer (2) 2.5.7 sort sortByKey (acsending:true/false) sort according to key in KV (sortBy (fuse T = > UrealsendingGroupe true false) General sort And it sorts the processed data, which can be used for KV. Sort by value example: scala > val rdd1 = sc.parallelize (List): org.apache.spark.rdd.RDD [Int] = ParallelCollectionRDD [0] at parallelize at: 24scala > val rdd2 = rdd1.map (_ * 2) rdd2: org.apache.spark.rdd.RDD [Int] = MapPartitionsRDD [1] at map at: 26scala > rdd2.collectres0: Array [Int] = Array (2,4,6,8,10,16,68,200,158) scala > rdd2.sortBy (x = > x) True) res1: org.apache.spark.rdd.RDD [Int] = MapPartitionsRDD [6] at sortBy at: 29scala > rdd2.sortBy. Collectres2: Array [Int] = Array (2,4,6,8,10,16,68,158,200) scala > rdd2.sortBy. Collectres3: Array [Int] = Array (200,158,68,16,10,8,6,4,2)
Another example:
Requirements: we want to sort by value in KV, but SortByKey is sorted by key. Practice 1: 1. The first step is to exchange key value. Then call sortByKey2 and KV to change positions again scala > val rdd1 = sc.parallelize (List (("tom", 1), ("jerry", 1), ("kitty", 2), ("bob", 1)) rdd1: org.apache.spark.rdd.RDD [(String, Int)] = ParallelCollectionRDD [42] at parallelize at: 24scala > val rdd2 = sc.parallelize (List (("jerry", 2), ("tom", 3), ("kitty", 5), ("bob") ) rdd2: org.apache.spark.rdd.RDD [(String, Int)] = ParallelCollectionRDD [43] at parallelize at: 24scala > val rdd3 = rdd1 union (rdd2) rdd3: org.apache.spark.rdd.RDD [(String, Int)] = UnionRDD [44] at union at: 28scala > val rdd4 = rdd3.reduceByKey (_ + _) rdd4: org.apache.spark.rdd.RDD [(String, Int)] = ShuffledRDD [45] at reduceByKey at: 30scala > rdd4.collectres13: Array [(String, Int)] = Array (bob,3), (tom) 4), (jerry,3), (kitty,7)) / / change positions and reorder Then call back scala > val rdd5 = rdd4.map (t = > (t.false) .map) .rdd5: org.apache.spark.rdd.RDD [(String, Int)] = MapPartitionsRDD [50] at map at: 32scala > rdd5.collectres14: Array [(String, Int)] = Array ((kitty,7), (tom,4), (bob,3), (jerry) 3) practice 2: directly use the sortBy operator The 2.6action operator can be sorted directly by value
Reduce
Similar to the previous reduceByKey, but used for non-KV data merging, and is the action operator scala > val rdd1 = sc.parallelize (List (List)) rdd1: org.apache.spark.rdd.RDD [Int] = ParallelCollectionRDD [41] at parallelize at: 24scala > val rdd2 = rdd1.reduce (_ + _) rdd2: Int = 15
There are also some action operators:
Reduce (func) aggregates all the elements in the RDD through the func function, which must be an interchangeable and parallel collect () that returns all elements of the dataset as an array in the driver. First () returns the first element of RDD (similar to take (1)) take (n) returns an array of the first n elements of the dataset takeSample (withReplacement,num, [seed]), which consists of num elements randomly sampled from the dataset. You can choose whether to replace the insufficient parts with random numbers. Seed is used to specify the random number generator seed takeOrdered (n, [ordering]), return an array of the first n elements of the dataset, and sort saveAsTextFile (path) save the elements of the dataset as textfile to the HDFS file system or other supported file system. For each element, Spark will call the toString method. Replace it with the text in the file saveAsSequenceFile (path) save the elements in the dataset to the specified directory in Hadoop sequencefile format, so that HDFS or other file systems supported by Hadoop. SaveAsObjectFile (path) countByKey () returns a (KMagneInt) map for RDD of type (KMagneV), which represents the number of elements corresponding to each key. Foreach (func) runs the function func on each element of the dataset to update it. 2.7 caching characteristics of RDD
RDD also has a caching mechanism, that is, caching RDD to memory or disk without double counting.
Several operators are involved here:
Cache () indicates that the rdd can be cached, which is cached in memory by default. The bottom layer is actually calling persist () persist () to indicate that the rdd can be cached. The default cache in memory persist (newLevel: org.apache.spark.storage.StorageLevel) is similar to the above. But val NONE = new StorageLevel (false, false) val DISK_ONLY = new StorageLevel (true, false, false, false) val DISK_ONLY_2 = new StorageLevel (true, false, 2) val MEMORY_ONLY = new StorageLevel (false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel (false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel (false, true, false) False) val MEMORY_ONLY_SER_2 = new StorageLevel (false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel (true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel (true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel (true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel (true, true, false, false, 2) val OFF_HEAP = new StorageLevel (true, true 1) basically divided into two categories: pure memory cache pure disk cache disk + memory cache
Generally speaking, the direct default location, that is, caching in memory, performs better, but it consumes a lot of memory, which should be noted, if not necessary, do not cache.
For example:
Read a large file and count the number of lines scala > val rdd1 = sc.textFile ("hdfs://192.168.109.132:8020/tmp_files/test_Cache.txt") rdd1: org.apache.spark.rdd.RDD [String] = hdfs://192.168.109.132:8020/tmp_files/test_Cache.txt MapPartitionsRDD [52] at textFile at: 24scala > rdd1.countres15: Long = 923452 trigger calculation Counting the number of rows scala > rdd1.cacheres16: rdd1.type = hdfs://192.168.109.132:8020/tmp_files/test_Cache.txt MapPartitionsRDD [52] at textFile at: 24 indicates that this RDD can be cached and will not trigger calculation scala > rdd1.countres17: Long = 923452 to trigger calculation, and cache the result scala > rdd1.countres18: Long = 923452 to read data directly from the cache.
One thing to note: when you call the cache method, you only say that when the rdd triggers the calculation later, the result can be cached, not that the current rdd is cached.
2.8Fault-tolerant Mechanism of RDD-checkpoint
During the calculation of spark, multiple conversion processes of RDD are involved. If a partition calculation of RDD fails at this time, the result is lost. The easiest way, of course, is to recalculate from scratch, but it's a waste of time. When checkpoint triggers the calculation, the RDD is saved in the checkpoint state, and if the later calculation is wrong, it can be recalculated from the checkpoint.
checkpoint generally sets a checkpoint path on a fault-tolerant and highly reliable file system (such as HDFS, S3, etc.) to save checkpoint data. Read directly from the checkpoint directory when something goes wrong. There are two modes: local directory and remote directory.
2.8.1 Local directory
In this mode, it is required to run in local mode and cannot be used in cluster mode. It is generally used for test and development.
Sc.setCheckpointDir (local path) set local checkpoint path rdd1.checkpoint set checkpoint rdd1.count encounter action class operator, trigger calculation, checkpoint 2.8.2 remote directory will be generated in the checkpoint directory (hdfs as an example)
This mode is required to run in cluster mode and be used in production environment.
The usage of scala > sc.setCheckpointDir ("hdfs://192.168.109.132:8020/sparkckpt0619") scala > rdd1.checkpointscala > rdd1.countres22: Long = 923452 is similar, but the directory is different.
Note that when using checkpoint, there is a passage in the source code as follows:
This function must be called before any job has been executed on this RDD. It is strongly recommend that this rdd ispersisted in memory,otherwise saving it on a file will require recomputation. The rough meaning is: call this method before you start the calculation, that is, before the action operator. And it's best to set the rdd cache in memory, otherwise you need to recalculate it when you save the checkpoint. 2.9Dependencies in RDD and stage principle 2.9.1 RDD dependence
is a key concept in the operation principle of RDD.
First of all, should talk about dependency, which means that there is a dependency between RDD, because the conversion of multiple RDD is involved in the calculation of spark. There are two different types of relationships between RDD and the parent on which it depends, namely narrow dependency (narrow dependency) and wide dependency (wide dependency). Look at the picture.
figure 2.2 width dependence of RDD
Wide dependency:
The partition of a parent RDD is dependent on the partition of multiple child RDD. In fact, it is the process of shuffle the data of the parent RDD, because a partition of the parent RDD is dependent on the partition of multiple RDD, which means that the data of the parent RDD needs to be scrambled and allocated to multiple RDD, and the process of disruption is actually shuffle. The general reality is that the partition of multiple parent RDD and the partition of multiple child RDD are interlaced dependencies.
Narrow dependence:
The partition of a parent RDD is dependent on at most the partition of one child RDD.
2.9.2 stage partition
figure 2.3 RDD dependency
DAG (Directed Acyclic Graph) is called directed acyclic graph. The original RDD forms DAG through a series of transformations, and DAG is divided into different Stage according to the different dependencies between RDD. The function of wide-narrow dependency is to divide the wide dependency between stage,stage and the narrow dependency within stage.
for narrow dependencies, because the partition of both parent and child RDD are one-to-one dependencies, parent-child transformations can be performed in a task. For example, the above task0,CDF are narrow dependencies, so direct CDF transformations can be performed in a task. There are narrow dependencies within a stage.
for broad dependencies, due to the existence of shuffle, it is required that all the parent RDD are processed before the execution shuffle can be executed, and then the child RDD can be processed. Due to the existence of shuffle, the task task chain must not be continuous, and it is necessary to start planning the task task chain again, so wide dependency is the basis for dividing stage.
goes further, why divide stage?
After divides the stage according to the wide dependency, the task chain is not contiguous because of the wide dependency of shuffle. The division of stage is to make a stage only narrow dependencies, narrow dependencies are one-to-one relationship, then the task chain is continuous, there is no shuffle, such as the above task0, C-> D-> F, in a partition, the conversion process is one-to-one, so it is a continuous task chain, put in a task, while the other partition is similar, put in the task1. Because F-> G is a wide dependency and requires shuffle, task chains are not contiguous. A line like this strands the RDD conversion logic until it encounters a wide dependency, which is a task, while an task actually deals with a partitioned data conversion process. In spark, task is the smallest scheduling unit, and spark assigns task to the worker node close to the partition data for execution. So in fact, spark scheduling is task.
then goes back to the original question, why divide the stage, because after dividing the stage according to the width dependency, it is convenient to divide the task inside the stage, each task processes the data of a partition, and then the spark dispatches the task to the corresponding worker node for execution. Therefore, from partition stage to partition task, the core is to implement parallel computing.
so, in the end, the purpose of dividing stage is to divide task more conveniently.
2.9.3 what is stored in RDD?
said that, in fact, we can think of a question: is there data stored in RDD? In fact, it is not, it actually stores the transformation chain of the data, specifically the transformation chain of the partition, that is, the operator contained in the task. After dividing the stage and then the task, it is clear what operators are inside a task, and then the computing task is sent to the worker node for execution. This kind of calculation is called the pipeline computing model, and the operator is in the pipeline.
therefore, in fact, RDD is called a flexible distributed dataset, which does not mean that it stores data, but stores the method functions that manipulate the data, that is, operators.
2.10 RDD advanced operator
2.10.1 mapPartitionsWithIndex
The parameter def mapPartitionsWithIndex [U] (f: (Int, Iterator [T]) ⇒ Iterator [U]) indicates that f is a function parameter and needs to be customized. F receives two parameters, the first of which is Int, which represents the partition number. The second Iterator [T] represents all the elements in the partition. With these two parameters, you can define functions that handle partitions. Iterator [U]: the result returned after the operation is completed. For example: print out the elements in each partition, including the partition number. / / create a rdd first Specify the number of partitions as 3scala > val rdd1 = sc.parallelize (List), 3) rdd1: org.apache.spark.rdd.RDD [Int] = ParallelCollectionRDD [0] at parallelize at: 24scala > def fun1 (index:Int,iter:Iterator [Int]): Iterator [string] = {| iter.toList.map (x = > "[PartId:" + index + ", value =" + x + "]) .iterator |} fun1: (index:Int Iter: Iterator [string] scala > rdd1.mapPartitionsWithIndex (fun1). Collectres0: Array [String] = Array ([PartId: 0, value = 1], [PartId: 0, value = 2], [PartId: 1, value = 3], [PartId: 1, value = 4], [PartId: 1, value = 5], [PartId: 2, value = 6], [PartId: 2, value = 7], [PartId: 2, value = 8])
2.10.2 aggregate
Aggregation operation, similar to grouping (group by). But aggregate aggregates locally (similar to combine in mr) and then globally. Performance is better than using the reduce operator directly, because reduce is directly globally aggregated. Def aggregate [U] (zeroValue: U) (seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U) Parameter description: zeroValue: initial value, which will be added to each partition Finally, it will also be added to the global operation seqOp: (U, T) ⇒ U: local aggregation operation function combOp: (U, U) ⇒ U: global aggregation operation function = = example 1: the initial value is 10scala > val rdd2 = sc.parallelize (List 2) rdd2: org.apache.spark.rdd.RDD [Int] = ParallelCollectionRDD [6] at parallelize at: 27max / print to see the partition scala > rdd2.mapPartitionsWithIndex (fun1). Collectres7: Array [String] = Array ([PartId: 0, value = 1], [PartId: 0, value = 2], [PartId: 1, value = 3], [PartId: 1, value = 4], [PartId: 1, value = 5]) / / find the maximum value for each partition Finally, the maximum value for each partition is obtained, and then the global maximum is added to scala > rdd2.aggregate (10) (max (_, _), _ + _) res8: Int = 30. Why is it 10 here? The initial value of 10 means there is an extra 10 local operation for each partition, and the maximum value for each partition is 10 global operation. There is also an additional 10, that is, 10 (local maximum) + 10 (local maximum) + 10 (global operation default value) = 30 partial operations = example 2: there are two ways to sum all partition global data using aggregate: 1, reduce (_ + _) 2, aggregate (0) (_ + _, _ + _)
2.10.3 aggregateByKey
Similar to the aggregate operation, the difference: the data of the operation, only operate on the value in the same key. The KV of the same key is grouped locally first, and then the value is aggregated. Then grouped globally, and then aggregated the value. Similar functions implemented by aggregateByKey and reduceByKey But it is more efficient than reduceByKey: val pairRDD = sc.parallelize (("cat", 2), ("cat", 5), ("mouse", 4), ("cat", 12), ("dog", 12), ("mouse", 2)), 2) def fun1 (index:Int,iter:Iterator [(String,Int)]): IteratorString = {iter.toList.map (x = > "[PartId:" + index + ") Value = "+ x +"]) .iterator} pairRDD.mapPartitionsWithIndex (fun1) .SecrettScala > val pairRDD = sc.parallelize (List (("cat", 2), ("cat", 5), ("mouse", 4), ("cat", 12), ("dog", 12), ("mouse", 2)), 2) pairRDD: org.apache.spark.rdd.RDD [(String, Int)] = ParallelCollectionRDD [16] at parallelize at: 27scala > def fun1 (index:Int,iter:Iterator [(String) Int)]): Iterator [string] = {| iter.toList.map (x = > "[PartId:" + index + ", value =" + x + "]) .iterator |} fun1: (index: Int, iter: Iterator [(String, Int)]) Iterator [string] scala > pairRDD.mapPartitionsWithIndex (fun1). Collectres31: Array [String] = Array ([PartId: 0, value = (cat,2)], [PartId: 0, value = (cat,5)], [PartId: 0, value = (mouse)] 4)], [PartId: 1, value = (cat,12)], [PartId: 1, value = (dog,12)], [PartId: 1, value = (mouse,2)]) requirements: find each partition in The animals with the largest number of animals are summed with pairRDD.aggregateByKey (0) (math.max (_, _), _ + _). Roomt0: (cat,2) and (cat,5)-- > (cat,5) (mouse,4) 1: (cat,12) (dog,12) (mouse,2): (cat,17) (mouse,6) (dog,12)
2.10.4 coalesce and repartition
Both of them are used for repartition repartition (numPartition) to specify the number of repartitions. Shuffle coalesce (numPartition,shuffleOrNot) must occur to specify the number of repartitions. Shuffle does not occur by default. You can specify shuffle.
It depends on the usage of more operators.
It's written in detail.
2.11 Subarea
Spark comes with two partition classes:
HashPartitioner: this is the default partitioner, which is used in some operators involving shuffle. Partitions are involved in some operators that can specify a minimum number of partitions. These partitions can only be used for KV pairs
RangePartitioner: partitioned according to the range of key. For example, 1x100101200is divided into different partitions.
You can also customize the partition by yourself, as follows:
1. Inherit the Partitioner class first, and write the partition logic in it to form a new partition class
2. Rdd.partitionBy (new partiotionerClassxxx ())
Example:
The data format is as follows: 192.168.88.1-- [30/Jul/2017:12:55:02 + 0800] "GET / MyDemoWeb/web.jsp HTTP/1.1" 200239192.168.88.1-- [30/Jul/2017:12:55:02 + 0800] "GET / MyDemoWeb/hadoop.jsp HTTP/1.1" 200242 requirements: write the access logs of the same page into a separate file code: package SparkExerimport org.apache.spark. {Partitioner SparkConf, SparkContext} import scala.collection.mutable/** * Custom Partition: * 1. Inherit Partitioner class first Write partition logic inside to form a new partition class * 2, rdd.partitionBy (new partiotionerClassxxx ()) * / object CustomPart {def main (args: Array [String]): Unit = {/ / specify the home directory of hadoop Some packages System.setProperty ("hadoop.home.dir" of hadoop are needed to write files to the local area. "F:\\ hadoop-2.7.2") val conf = new SparkConf (). SetAppName ("Tomcat Log Partitioner"). SetMaster ("local") val sc = new SparkContext (conf) / / cutting file val rdd1 = sc.textFile ("G:\ test\\ tomcat_localhost_access_log.2017-07-30.txt"). Map (line = > {val jspName = line.split (") (6) (jspName) Line)}) / / extract all key That is, the page name val rdd2 = rdd1.map (_. _ 1). Distinct (). Collect () / partition val rdd3 = rdd1.partitionBy (new TomcatWebPartitioner (rdd2)) / / write partition data to a file rdd3.saveAsTextFile ("G:\\ test\\ tomcat_localhost")}} class TomcatWebPartitioner (jspList: array [string]) extends Partitioner {privateval listMap = new mutable.HashMap [String,Int] () var partitionNum = 0 / according to the page name Plan the total number of partitions for (s {(item.mid, item.recs.map (itemRecs= > (itemRecs.mid,itemRecs.socre)) .toMap). CollectAsMap () this is a key step, which is to broadcast the variable out / / broadcast this variable, and then you can call val moviesRecsMapBroadcast = spark.sparkContext.broadcast (moviesRecsMap) / / in any executor because it is lazy loading. So you need to call it manually to actually broadcast the moviesRecsMapBroadcast.id. Spark small case 3.1 Statistics of the page requirements of the top N visitors: according to the site visit log, the data format of the page name of the top N visits is as follows: 192.168.88.1-[30/Jul/2017:12:55:02 + 0800] "GET / MyDemoWeb/web.jsp HTTP/1.1" 200239192.168.88.1-[30/Jul/2017:12:55:02 + 0800] " GET / MyDemoWeb/hadoop.jsp HTTP/1.1 "200 242 Code: package SparkExerimport org.apache.spark. {SparkConf SparkContext} / * analyze tomcat log * log example: * 192.168.88.1-- [30/Jul/2017:12:53:43 + 0800] "GET / MyDemoWeb/ HTTP/1.1" 200259 * * count the number of visits to each page * / object TomcatLog {def main (args: Array [String]): Unit = {val conf = new SparkConf (). SetAppName ("TomcatLog analysis"). SetMaster (" Local ") val sc = new SparkContext (conf) val rdd1 = sc.textFile (" G:\\ test\\ tomcat_localhost_access_log.2017-07-30.txt ") .map (_ .split (") (6)) .map ((_ (1)) .reduceByKey (_ + _) .map (t = > (t = > (t.account2recot.room1)) .sortByKey (false) .map (t = > (t.cur2mct. Room1) .map () / / you can also use sortBy (_. _ 2). False) sorts / / fetches the first N data rdd1.take (2) .foreach (x = > println (x.room1 + ":" + x.room2)) println ("=") / fetches the last N data rdd1.takeRight (2) .foreach (x = > println (x.room1 + ":" + x.foreach 2)) sc.stop ()} 3.2 custom partition examples in rdd.
Take a look at the example of partitions in 2.11 above.
3.3 spark connection mysqlpackage SparkExerimport java.sql. {Connection, DriverManager, PreparedStatement} import org.apache.spark. {SparkConf SparkContext} object SparkConMysql {def main (args: Array [String]): Unit = {val conf = new SparkConf (). SetAppName ("Tomcat Log To Mysql"). SetMaster ("local") val sc = new SparkContext (conf) val rdd1 = sc.textFile ("G:\\ test\\ tomcat_localhost_access_log.2017-07-30.txt") .map (_ .split ("") (6)) rdd1.foreach (l = > {/ The / jdbc operation needs to be included in the rdd to be called by executor on all worker That is, to use rdd to serialize val jdbcUrl = "jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8" var conn:Connection = null / / sql to edit the object var ps:PreparedStatement = null conn = DriverManager.getConnection (jdbcUrl, "root", "wjt86912572") / /? Is a placeholder, followed by a value that needs to be filled in the form of ps1.setxxx (rowkey,value), in the order ps = conn.prepareStatement ("insert into customer values (?)") Ps.setString (1) ps.setInt (2)}} Note: when spark operates jdbc, if you directly use jdbc to operate the database, there will be serialization problems. Because in the spark distributed framework, all objects that operate on RDD should be internal to RDD before they can be used in the entire distributed cluster. That is, serialization is required. Generally speaking: five worker share a jdbc connection object, and five worker each create a separate connection object, so when defining a jdbc connection object, it needs to be defined within the RDD.
The above method is too cumbersome, and a new jdbc connection object is created for each data.
Optimization: use rdd1.foreachPartition () to operate on each partition rather than on each piece of data
This saves database resources by creating only one jdbc connection object for each partition
Package SparkExerimport java.sql. {Connection, DriverManager, PreparedStatement} import org.apache.spark. {SparkConf SparkContext} object SparkConMysql {def main (args: Array [String]): Unit = {val conf = new SparkConf (). SetAppName ("Tomcat Log To Mysql"). SetMaster ("local") val sc = new SparkContext (conf) val rdd1 = sc.textFile ("G:\\ test\\ tomcat_localhost_access_log.2017-07-30.txt") .map (_ .split ("") (6) rdd1.foreachPartition (updateMysql) / * * * the above method is too cumbersome And each data will create a new jdbc connection object * Optimization: use rdd1.foreachPartition () to operate on each partition Instead of operating on each piece of data * this saves database resources by creating only one jdbc connection object for each partition * /} def updateMysql (it: iterator [string]) = {val jdbcUrl = "jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8" var conn:Connection = null / / sql statement edits the object var ps:PreparedStatement = null conn = DriverManager.getConnection (jdbcUrl, "root") "wjt86912572") / / conn.createStatement () / / ps = conn.prepareStatement ("select * from customer") /? Is a placeholder, followed by a value that needs to be filled in the form of ps1.setxxx (rowkey,value), in the order ps = conn.prepareStatement ("insert into customer values (?)") It.foreach (data= > {ps.setString (1) ps.setInt (2) ps.executeUpdate ()}) ps.close () conn.close ()}}
Another way to connect to mysql is through the JdbcRDD object.
Package SparkExerimport java.sql.DriverManagerimport org.apacheimport org.apache.sparkimport org.apache.spark.rdd.JdbcRDDimport org.apache.spark. {SparkConf, SparkContext} object MysqlJDBCRdd {def main (args: Array [String]): Unit = {val conn = () = > {Class.forName ("com.mysql.jdbc.Driver"). NewInstance () DriverManager.getConnection ("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8", "root") "wjt86912572")} val conf = new SparkConf () .setAppName ("Tomcat Log To Mysql") .setMaster ("local") val sc = new SparkContext (conf) / / create jdbcrdd object val mysqlRdd = new JdbcRDD (sc,conn, "select * from customer where id >? And id
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.