The main contents of this section:
- Important concepts of Spark
- Resilient Distributed Dataset (RDD) basics
1. Important concepts of Spark
Part of this section is derived from the official document: http://spark.apache.org/docs/latest/cluster-overview.html
(1) Spark run modes
The most commonly used Spark run modes are:
- local: runs in local threads and is mainly used for developing and debugging Spark applications.
- Standalone: uses Spark's own resource management and scheduler to run a Spark cluster with a Master/Slave structure. To remove the single point of failure, ZooKeeper can be used to provide high availability (HA).
- Apache Mesos: runs on the Mesos resource management framework; resource management is delegated to Mesos, and Spark handles only task scheduling and computation.
- Hadoop YARN: the cluster runs on the YARN resource manager; resource management is delegated to YARN, and Spark handles only task scheduling and computation.
Among these, running on Hadoop YARN is the most common. The first section of this course used Hadoop YARN to build the Spark cluster; in this way Spark integrates with the Hadoop ecosystem to form a powerful, versatile cluster.
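As a hedged illustration that is not part of the original text: in application code, the run mode is ultimately selected by the master URL passed to SparkConf (or to spark-submit via --master). The host names below are placeholders.
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: the master URL selects the run mode (host names are placeholders)
// "local[*]"                      -> local mode, one worker thread per CPU core (development and debugging)
// "spark://master:7077"           -> Standalone mode, using Spark's own cluster manager
// "mesos://master:5050"           -> Apache Mesos mode
// "yarn-client" / "yarn-cluster"  -> Hadoop YARN mode (Spark 1.x master values)
val conf = new SparkConf()
  .setAppName("RunModeExample")
  .setMaster("local[*]")   // replace with the URL of the target cluster manager
val sc = new SparkContext(conf)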
(2) Spark components (Components)
A complete Spark application, such as the SparkWordCount program from the previous section, involves the following components when it is submitted to run on a cluster:
Each Spark application runs as an independent set of processes on the cluster, coordinated by a SparkContext object. The SparkContext can be regarded as the entry point of the Spark application, and the program that creates it is called the driver program. The SparkContext communicates with different kinds of cluster resource managers (Cluster Manager), such as Hadoop YARN or Mesos, to allocate the resources the application needs to run. The SparkContext then obtains Executors on the worker nodes (Worker Node) of the cluster; different Spark applications have different Executors, which are themselves independent processes that provide distributed computing and data storage for the application. Finally, the SparkContext distributes the application code to the Executors and assigns Tasks to them for execution.
The terms involved are explained below:
Application: a user program built on Spark, consisting of a driver program (which contains the SparkContext object) and multiple executor processes on the cluster.
Application jar: the JAR package containing the user's Spark application. When the JAR is submitted to run on the cluster, it does not need to bundle the Spark dependencies, which are supplied at run time.
Driver program: the process that runs the application's main method and is responsible for creating the SparkContext object.
Cluster manager: the cluster resource manager, such as Mesos or Hadoop YARN.
Deploy mode: distinguishes where the driver program runs. In cluster mode the driver starts inside the cluster; in client mode the driver process starts outside the cluster.
Worker node: a node in the cluster that can run Spark application code.
Executor: a process on a worker node that executes the tasks of a specific Spark application and keeps data for them in memory or on disk. Different Spark applications have different Executors.
Task: the unit of work that runs inside an Executor; a Spark application is eventually split into sets of optimized tasks (described in more detail in the next section).
Job: a parallel computation consisting of multiple tasks, spawned by an action operation in Spark such as collect or save.
Stage: each job is split into smaller sets of tasks called stages, which depend on each other (similar to the map and reduce stages in MapReduce). Because a stage consists of a set of tasks, it is also called a TaskSet.
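As a hedged illustration of how job, stage and task relate (not from the original text): in spark-shell, an action submits a job, a shuffle operation such as reduceByKey splits the job into stages, and each stage runs as a set of tasks, one per partition.
scala> val counts = sc.textFile("/README.md").flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
scala> counts.toDebugString   // the ShuffledRDD in the output marks the stage boundary
scala> counts.collect()       // this action submits one job made up of the two stages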
2. Resilient Distributed Dataset (RDD) basics
The Resilient Distributed Dataset (RDD) was proposed by the Berkeley lab in 2011. The original paper, "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing", is worth reading as the first-hand material on RDDs, and most of this section is based on it.
(1) RDD design goals
RDDs are designed to support efficient reuse of intermediate results in parallel computation and to offer a simpler programming model, while retaining the high fault tolerance, efficient scheduling and scalability of parallel computing frameworks such as MapReduce. Fault tolerance in RDDs is achieved by recording the lineage of transformation operations: the lineage records the ancestry of an RDD, and when an error occurs the lost data is recovered directly from the lineage. RDDs are best suited to applications such as data mining, machine learning and graph computation, because these applications involve iterative computation and keeping the data in memory greatly improves their execution efficiency in a distributed environment. RDDs are not suitable for tasks that frequently update shared state, such as a distributed crawler.
Here is how to view the lineage of an RDD in spark-shell:
// textFile reads the README.md file under the HDFS root directory, then filter keeps all lines that contain "Spark"
scala> val rdd2 = sc.textFile("/README.md").filter(line => line.contains("Spark"))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:21
// the toDebugString method prints the lineage of the RDD
// it shows that the textFile method generates two RDDs, a HadoopRDD and a
// MapPartitionsRDD, and that filter generates another new MapPartitionsRDD
scala> rdd2.toDebugString
15/09/20 01:35:27 INFO mapred.FileInputFormat: Total input paths to process : 1
res0: String =
(2) MapPartitionsRDD[2] at filter at <console>:21 []
 |  MapPartitionsRDD[1] at textFile at <console>:21 []
 |  /README.md HadoopRDD[0] at textFile at <console>:21 []
(2) RDD abstraction
An RDD in Spark is a read-only (immutable), partitioned collection of records. There are only two ways to create an RDD in Spark: (1) from data in a storage system, and (2) from another RDD. Creation from storage can use a local file system, a distributed file system, or data already in memory.
The following code demonstrates creating an RDD from HDFS:
scala> sc.textFile("/README.md")
res1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at textFile at <console>:22
The following code demonstrates creating an RDD from memory:
// define an array in memory
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)
// create a ParallelCollectionRDD with the parallelize method
scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:23
The following code demonstrates creating a new RDD from another RDD:
// the filter function transforms distData into a new RDD
scala> val distDataFiltered = distData.filter(e => e > 2)
distDataFiltered: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at filter at <console>:25
// trigger an action operation (covered later) to view the filtered contents
// note that collect is only suitable when the amount of data is small
scala> distDataFiltered.collect
res3: Array[Int] = Array(3, 4, 5)
(3) RDD programming model
The previous examples have already shown how to program with RDDs; recall the code we saw earlier:
// the filter function transforms distData into a new RDD
scala> val distDataFiltered = distData.filter(e => e > 2)
// trigger an action operation (covered later) to view the filtered contents
// note that collect is only suitable when the amount of data is small
scala> distDataFiltered.collect
This code already illustrates the core idea of the RDD programming model: "the filter function transforms distData into a new RDD" and "trigger an action operation". In other words, operations on an RDD fall into two kinds: Transformations and Actions.
A transformation converts an RDD into a new RDD. It is important to note that all transformations are lazy, in the same sense as lazy evaluation in Scala: a transformation is not executed immediately; Spark only remembers the transformation applied to the dataset, and execution happens only when the result is actually needed. For example, distData.filter(e => e > 2) is not executed when it is declared; it only runs when distDataFiltered.collect is executed. In other words, it is the final distDataFiltered.collect call that triggers the execution of the preceding transformations.
As the discussion of transformations suggests, an action is what actually triggers the execution of the program. An action either returns the result of the computation, as collect does, or saves the result, as the saveAsTextFile method in SparkWordCount does.
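To make the lazy behaviour visible, here is a minimal sketch that is not from the original text (in local mode the println output appears in the driver console):
scala> val doubled = sc.parallelize(1 to 3).map { x => println("processing " + x); x * 2 }
// nothing is printed yet: map is a lazy transformation that is only recorded
scala> doubled.collect
// only now do the "processing ..." lines appear, because collect is an action that triggers execution; the result is Array(2, 4, 6)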
The transformations supported by Spark 1.5.0 include:
(1) map
Method signature:
/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U]
// usage example
scala> val rdd1 = sc.parallelize(Array(1, 2, 3, 4)).map(x => 2 * x).collect
rdd1: Array[Int] = Array(2, 4, 6, 8)
(2) filter
Method signature:
/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 */
def filter(f: T => Boolean): RDD[T]
Usage example:
scala> val rdd1 = sc.parallelize(Array(1, 2, 3, 4)).filter(x => x > 1).collect
rdd1: Array[Int] = Array(2, 3, 4)
(3) flatMap
Method signature:
/**
 * Return a new RDD by first applying a function to all elements of this
 * RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
Usage example:
scala> val data = Array(Array(1, 2, 3, 4, 5), Array(4, 5, 6))
data: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5), Array(4, 5, 6))
scala> val rdd1 = sc.parallelize(data)
rdd1: org.apache.spark.rdd.RDD[Array[Int]] = ParallelCollectionRDD[2] at parallelize at <console>:23
scala> val rdd2 = rdd1.flatMap(x => x.map(y => y))
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at flatMap at <console>:25
scala> rdd2.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 4, 5, 6)
(4) mapPartitions (func)
This mapPartitions example comes from: https://www.zybuluo.com/jewes/note/35032
mapPartitions is a variant of map. The input function of map is applied to each element in the RDD, while the input function of mapPartitions is applied to each partition; that is, the contents of each partition are treated as a whole. Its signature is:
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
f is the input function, which processes the contents of one partition. The contents of each partition are passed to f as an Iterator[T], and f returns an Iterator[U]. The final RDD is obtained by merging the results of applying the input function to all partitions.
scala> val a = sc.parallelize(1 to 9, 3)
scala> def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
  var res = List[(T, T)]()
  var pre = iter.next
  while (iter.hasNext) {
    val cur = iter.next
    res = (pre, cur) :: res
    pre = cur
  }
  res.iterator
}
scala> a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
In the example above, the function myfunc pairs each element in a partition with the next element in the same partition. Because the last element of a partition has no next element within that partition, the pairs (3,4) and (6,7) do not appear in the result.
mapPartitions has further variants, such as mapPartitionsWithContext, which can pass state information about the processing to the user-specified input function, and mapPartitionsWithIndex, which passes the index of the partition to the user-specified input function.
(5) mapPartitionsWithIndex
The mapPartitionsWithIndex function is a variant of mapPartitions with the following signature:
def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U]
scala> val a = sc.parallelize(1 to 9, 3)
// the function receives the partition index, and the first element of each returned tuple is that index
scala> def myfunc[T](index: Int, iter: Iterator[T]): Iterator[(Int, T, T)] = {
  var res = List[(Int, T, T)]()
  var pre = iter.next
  while (iter.hasNext) {
    val cur = iter.next
    res = (index, pre, cur) :: res
    pre = cur
  }
  res.iterator
}
scala> a.mapPartitionsWithIndex(myfunc).collect
res11: Array[(Int, Int, Int)] = Array((0,2,3), (0,1,2), (1,5,6), (1,4,5), (2,8,9), (2,7,8))
(6) sample
Method signature:
/**
 * Return a sampled subset of this RDD.
 *
 * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
 * @param fraction expected size of the sample as a fraction of this RDD's size
 *   without replacement: probability that each element is chosen; fraction must be [0, 1]
 *   with replacement: expected number of times each element is chosen; fraction must be >= 0
 * @param seed seed for the random number generator
 */
def sample(
    withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T]
Usage example:
scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:21
scala> val sampledA = a.sample(true, 0.5)
sampledA: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[13] at sample at <console>:23
scala> sampledA.collect
res12: Array[Int] = Array(3, 3, 3, 5, 6, 8, 8)
scala> val sampledA2 = a.sample(false, 0.5).collect
sampledA2: Array[Int] = Array(1, 4)
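Because sample draws elements at random, the arrays shown above will differ from run to run. As a small addition not in the original text: passing an explicit seed as the third argument (see the signature above) makes a sample reproducible on the same data.
scala> val sampledWithSeed = a.sample(false, 0.5, 42L).collect
// with a fixed seed, the same elements are chosen every time this line runs on the same RDD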