The Way of Spark Practice (Advanced): Spark from Introduction to Mastery, Section 4

Updated: 2025-01-15. Source: SLTechnology News&Howtos (Shulou.com)

Main contents of this section

1. Important concepts of Spark

2. Resilient Distributed Dataset (RDD) basics

1. Important concepts of Spark

Part of this section is derived from the official document: http://spark.apache.org/docs/latest/cluster-overview.html

(1) Spark operation mode

At present, the most commonly used Spark operation modes are:

- local: runs Spark with local threads on a single machine; mainly used to develop and debug Spark applications

- Standalone: uses Spark's own resource manager and scheduler to run the Spark cluster, with a Master/Slave architecture. To avoid a single point of failure at the Master, ZooKeeper can be used to achieve high availability (HA).

- Apache Mesos: runs on the Mesos resource management framework. Resource management is handed over to Mesos, and Spark is responsible only for task scheduling and computation.

- Hadoop YARN: the cluster runs on the YARN resource manager. Resource management is handed over to YARN, and Spark is responsible only for task scheduling and computation.

Of these modes, Hadoop YARN is the most commonly used cluster mode. The first section of this course used Hadoop YARN to build a Spark cluster. Running on YARN lets Spark integrate closely with the rest of the Hadoop ecosystem, which makes for a powerful and versatile cluster.
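In practice, the operation mode is selected by the master URL handed to Spark. The following is a minimal sketch in plain Scala (no Spark dependency) that maps the master URL formats used by Spark to the modes listed above. The object and method names (MasterUrls, describe) are hypothetical and exist only for illustration; the "yarn-client" and "yarn-cluster" values apply to Spark 1.x, while later versions use "yarn" together with a separate deploy mode.

```scala
object MasterUrls {
  // Hypothetical helper: maps a Spark master URL to the operation mode it selects.
  def describe(master: String): String = master match {
    case "local"                                 => "local mode, one worker thread"
    case m if m.startsWith("local[")             => "local mode, multiple worker threads"
    case m if m.startsWith("spark://")           => "Standalone cluster"
    case m if m.startsWith("mesos://")           => "Apache Mesos cluster"
    case "yarn-client" | "yarn-cluster" | "yarn" => "Hadoop YARN cluster"
    case other                                   => s"unrecognized master URL: $other"
  }
}
```

For example, MasterUrls.describe("spark://master-host:7077") returns "Standalone cluster"; the host name master-host is a placeholder.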

(2) Spark components (Components)

A complete Spark application, such as the SparkWordCount program in the previous section, involves the components shown in the following figure when it is submitted to a cluster:

Each Spark application runs on the cluster as an independent set of processes, coordinated by the SparkContext object in the application's main program, which is called the driver program. The SparkContext is the entry point of a Spark application. It can connect to several kinds of cluster managers (Cluster Manager), such as Hadoop YARN or Mesos, which allocate the resources the application needs. Once connected, the SparkContext acquires Executors on the worker nodes (Worker Node) of the cluster. Executors are independent processes that run computations and store data for the application, and each Spark application has its own Executors. The SparkContext then sends the application code to the Executors and finally sends them Tasks to run.
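The flow described above can be sketched as a small driver program. This is only an illustration under stated assumptions: it needs the spark-core library on the classpath, it hard-codes the master "local[2]" so that it can run on a single machine (on a real cluster the master is normally supplied at submission time), and the names DriverSketch and countMatching are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DriverSketch {
  // Counts the lines that contain `keyword`. The filter runs as tasks on the
  // executors; count() is the action that makes the driver submit a job.
  def countMatching(sc: SparkContext, lines: Seq[String], keyword: String): Long =
    sc.parallelize(lines, 2)        // distribute the data over 2 partitions
      .filter(_.contains(keyword))  // transformation, executed on the executors
      .count()                      // action, result returned to the driver

  def main(args: Array[String]): Unit = {
    // Assumption: local[2] keeps the sketch self-contained.
    val conf = new SparkConf().setAppName("DriverSketch").setMaster("local[2]")
    val sc = new SparkContext(conf) // the driver program creates the SparkContext
    try {
      val lines = Seq("Apache Spark", "Hadoop YARN", "Spark on YARN")
      println(s"lines containing Spark: ${countMatching(sc, lines, "Spark")}")
    } finally {
      sc.stop()                     // release the executors
    }
  }
}
```

The try/finally block is a design choice: stopping the SparkContext even when the job fails frees the resources held by the application.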

Term: Meaning

Application: a user program built on Spark. It consists of a driver program (which contains the SparkContext object) and multiple executor processes on the cluster.

Application jar: a JAR containing the user's Spark application. A JAR that is submitted to a cluster should not bundle the Hadoop or Spark libraries, because these are added at run time.

Driver program: the process that runs the application's main method and creates the SparkContext object.

Cluster manager: the cluster resource manager, such as Mesos or Hadoop YARN.

Deploy mode: distinguishes where the driver program runs. In cluster mode, the driver is launched inside the cluster; in client mode, the driver process is launched outside the cluster.

Worker node: any node in the cluster that can run Spark application code.

Executor: a process launched for an application on a worker node. It runs the application's tasks and keeps data in memory or on disk across tasks. Each Spark application has its own Executors.

Task: a unit of work that is sent to one Executor. A Spark application is eventually broken down into sets of optimized tasks (described in more detail in the next section).

Job: a parallel computation made up of multiple tasks, spawned in response to a Spark action such as collect or save.

Stage: each job is split into smaller sets of tasks called stages, which depend on each other (similar to the map and reduce stages in MapReduce). Because a stage is a set of tasks, it is also called a TaskSet.

2. Resilient Distributed Dataset (RDD) basics

The Resilient Distributed Dataset (RDD) was proposed by UC Berkeley's AMPLab in 2011. The original paper is titled "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing". It is well worth reading as the first-hand source for studying RDDs, and most of this section is based on it.

(1) RDD design goal

RDDs were designed to support efficient reuse of intermediate results in parallel computations and to offer a simpler programming model, while keeping the fault tolerance, efficient scheduling and scalability of parallel computing frameworks such as MapReduce. An RDD achieves fault tolerance by recording its lineage, that is, the chain of transformations that produced it from its parent RDDs. When a partition is lost, it is recomputed directly from the lineage. RDDs are best suited to data mining, machine learning and graph computation, because these applications involve iterative computation, and keeping data in memory greatly improves their efficiency in a distributed environment. RDDs are not suitable for tasks that frequently update shared state, such as a distributed crawler.

The following spark-shell session shows how to view the lineage of an RDD:

// textFile reads README.md from the HDFS root directory; filter keeps only the lines that contain "Spark"
scala> val rdd2 = sc.textFile("/README.md").filter(line => line.contains("Spark"))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:21

// toDebugString prints the lineage of the RDD.
// The output shows that textFile creates two RDDs, a HadoopRDD and a
// MapPartitionsRDD, and that filter creates another MapPartitionsRDD.
scala> rdd2.toDebugString
15/09/20 01:35:27 INFO mapred.FileInputFormat: Total input paths to process : 1
res0: String =
(2) MapPartitionsRDD[2] at filter at <console>:21 []
 |  MapPartitionsRDD[1] at textFile at <console>:21 []
 |  /README.md HadoopRDD[0] at textFile at <console>:21 []
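To make the idea of lineage-based recovery more concrete, here is a toy model in plain Scala. It is not how Spark implements RDDs: all names (LineageSketch, Dataset, Source, Mapped, Filtered) are hypothetical, and the model ignores partitioning and distribution. It only shows that a dataset which remembers its parent and the function applied to it can be rebuilt at any time, and can print its own derivation much like toDebugString does.

```scala
object LineageSketch {
  // A toy dataset that stores how it was derived rather than its contents.
  sealed trait Dataset[T] {
    def compute(): Seq[T]   // rebuild the contents by replaying the lineage
    def lineage: String     // human-readable derivation chain
    def map[U](f: T => U): Dataset[U] = Mapped(this, f)
    def filter(p: T => Boolean): Dataset[T] = Filtered(this, p)
  }

  // Root of a lineage: data that can always be reloaded from its source.
  case class Source[T](name: String, load: () => Seq[T]) extends Dataset[T] {
    def compute(): Seq[T] = load()
    def lineage: String = s"Source($name)"
  }

  case class Mapped[T, U](parent: Dataset[T], f: T => U) extends Dataset[U] {
    def compute(): Seq[U] = parent.compute().map(f)
    def lineage: String = s"Mapped <- ${parent.lineage}"
  }

  case class Filtered[T](parent: Dataset[T], p: T => Boolean) extends Dataset[T] {
    def compute(): Seq[T] = parent.compute().filter(p)
    def lineage: String = s"Filtered <- ${parent.lineage}"
  }
}
```

Because no derived dataset holds data of its own, calling compute() again after a "loss" simply replays the recorded transformations from the source, which is the essence of recovery through lineage.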

(2) RDD abstraction

An RDD is a read-only (like a val in Scala), partitioned collection of records in Spark. There are only two ways to create an RDD: (1) from a data source, and (2) from another RDD. The data source can be a local file system, a distributed file system such as HDFS, or a collection held in memory.

The following code creates an RDD from HDFS:

scala> sc.textFile("/README.md")
res1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at textFile at <console>:22

The following code creates an RDD from data in memory:

// define an array in memory
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)

// create a ParallelCollectionRDD with the parallelize method
scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:23

The following code creates a new RDD from another RDD:

// filter transforms the distData RDD into a new RDD
scala> val distDataFiltered = distData.filter(e => e > 2)
distDataFiltered: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at filter at <console>:25

// trigger an action (covered later) to view the filtered content
// note that collect is only suitable when the amount of data is small
scala> distDataFiltered.collect
res3: Array[Int] = Array(3, 4, 5)

(3) RDD programming model

The previous examples already showed how to program with RDDs. Recall the code from earlier:

// filter transforms the distData RDD into a new RDD
scala> val distDataFiltered = distData.filter(e => e > 2)

// trigger an action (covered later) to view the filtered content
// note that collect is only suitable when the amount of data is small
scala> distDataFiltered.collect

This code illustrates the core idea of the RDD programming model: the filter function transforms the distData RDD into a new RDD, and collect triggers an action. In other words, RDD operations fall into two kinds: Transformations and Actions.

A transformation converts an RDD into a new RDD. Note that all transformations are lazy, much like a lazy val in Scala: a transformation is not executed when it is declared. Spark only records the transformation applied to the dataset and runs it when a result is actually needed. For example, distData.filter(e => e > 2) is not executed immediately; it runs only when distDataFiltered.collect is called, as shown in the following figure

As the figure above shows, the filter transformation is executed only once distDataFiltered.collect triggers it.

As the description of transformations suggests, an action is what triggers the actual execution of the program. An action either returns a result to the driver program, as collect does, or saves the result, as the saveAsTextFile method does in SparkWordCount.
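The lazy behavior described above can be observed without a cluster. The sketch below is an analogy in plain Scala, not Spark code: a Scala Iterator is also lazy, so filter plays the role of a transformation and toList plays the role of an action. The names LazyAnalogy and run are hypothetical.

```scala
object LazyAnalogy {
  // Returns (predicate calls before the "action", predicate calls after it, result).
  def run(): (Int, Int, List[Int]) = {
    var evaluated = 0                    // how many elements the predicate has seen
    val source = Iterator(1, 2, 3, 4, 5)

    // Like a transformation: filter only records what to do; nothing runs yet.
    val filtered = source.filter { e => evaluated += 1; e > 2 }
    val beforeAction = evaluated

    // Like an action: materializing the result forces the computation.
    val result = filtered.toList
    (beforeAction, evaluated, result)
  }
}
```

Calling LazyAnalogy.run() returns (0, 5, List(3, 4, 5)): the predicate has not been called at all before toList, and it has been called once per element afterwards.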

The transformations supported by Spark 1.5.0 include:

(1) map

Method signature:

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U]

Usage example:

scala> val rdd1 = sc.parallelize(Array(1, 2, 3, 4)).map(x => 2 * x).collect
rdd1: Array[Int] = Array(2, 4, 6, 8)

(2) filter

Method signature:

/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 */
def filter(f: T => Boolean): RDD[T]

Usage example:

scala> val rdd1 = sc.parallelize(Array(1, 2, 3, 4)).filter(x => x > 1).collect
rdd1: Array[Int] = Array(2, 3, 4)

(3) flatMap

Method signature:

/**
 * Return a new RDD by first applying a function to all elements of this
 * RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

Usage example:

scala> val data = Array(Array(1, 2, 3, 4, 5), Array(4, 5, 6))
data: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5), Array(4, 5, 6))

scala> val rdd1 = sc.parallelize(data)
rdd1: org.apache.spark.rdd.RDD[Array[Int]] = ParallelCollectionRDD[2] at parallelize at <console>:23

// each inner array is flattened into the resulting RDD
scala> val rdd2 = rdd1.flatMap(x => x.map(y => y))
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at flatMap at <console>:25

scala> rdd2.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 4, 5, 6)

(4) mapPartitions (func)

This mapPartitions example comes from: https://www.zybuluo.com/jewes/note/35032

mapPartitions is a variant of map. The function passed to map is applied to each element of the RDD, whereas the function passed to mapPartitions is applied to each partition, so the contents of a partition are processed as a whole. It is defined as:

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

f is the input function that processes the contents of each partition. The contents of each partition are passed to f as an Iterator[T], and f returns its output as an Iterator[U]. The final RDD is formed by merging the results that f produces for all partitions.

scala> val a = sc.parallelize(1 to 9, 3)

// pairs each element of a partition with its successor (assumes a non-empty partition)
scala> def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
  var res = List[(T, T)]()
  var pre = iter.next
  while (iter.hasNext) {
    val cur = iter.next
    res = (pre, cur) :: res   // prepend, so the pairs come out in reverse order
    pre = cur
  }
  res.iterator
}

scala> a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

In the example above, myfunc pairs each element of a partition with the next element in the same partition. Because the last element of a partition has no successor within that partition, (3,4) and (6,7) do not appear in the result.
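One caveat of the myfunc shown above is that it calls iter.next unconditionally, which throws on an empty partition. A possible alternative, sketched here in plain Scala under the hypothetical names PartitionPairs and pairs, builds the same successor pairs with Iterator.sliding. It yields nothing for partitions with fewer than two elements and returns the pairs in ascending rather than reversed order.

```scala
object PartitionPairs {
  // Pairs each element with its successor within one partition.
  // withPartial(false) drops a trailing window that has fewer than two
  // elements, so empty and single-element partitions produce no pairs.
  def pairs[T](iter: Iterator[T]): Iterator[(T, T)] =
    iter.sliding(2).withPartial(false).map(w => (w(0), w(1)))
}
```

Since the function works on an ordinary Iterator, it can be checked locally: PartitionPairs.pairs(Iterator(1, 2, 3)).toList gives List((1,2), (2,3)). In spark-shell it would be passed as a.mapPartitions(PartitionPairs.pairs[Int]).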

There are also variants of mapPartitions, such as mapPartitionsWithContext, which can pass some state information during processing to user-specified input functions. There is also mapPartitionsWithIndex, which passes the index of the partition to the user-specified input function.

(5) mapPartitionsWithIndex

The mapPartitionsWithIndex function is a variant of mapPartitions whose input function also receives the index of the partition. Its signature is:

def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U]

scala> val a = sc.parallelize(1 to 9, 3)

// the function receives the partition index, and the first element of each returned tuple is that index
scala> def myfunc[T](index: Int, iter: Iterator[T]): Iterator[(Int, T, T)] = {
  var res = List[(Int, T, T)]()
  var pre = iter.next
  while (iter.hasNext) {
    val cur = iter.next
    res = (index, pre, cur) :: res   // prepend, so the tuples come out in reverse order
    pre = cur
  }
  res.iterator
}

scala> a.mapPartitionsWithIndex(myfunc).collect
res11: Array[(Int, Int, Int)] = Array((0,2,3), (0,1,2), (1,5,6), (1,4,5), (2,8,9), (2,7,8))

(6) sample

Method signature:

/**
 * Return a sampled subset of this RDD.
 *
 * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
 * @param fraction expected size of the sample as a fraction of this RDD's size
 *  without replacement: probability that each element is chosen; fraction must be [0, 1]
 *  with replacement: expected number of times each element is chosen; fraction must be >= 0
 * @param seed seed for the random number generator
 */
def sample(
    withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T]

Usage example (no seed is passed, so the sampled values differ from run to run):

scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:21

// sampling with replacement: an element may appear more than once
scala> val sampledA = a.sample(true, 0.5)
sampledA: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[13] at sample at <console>:23

scala> sampledA.collect
res12: Array[Int] = Array(3, 3, 3, 5, 6, 8, 8)

// sampling without replacement: each element appears at most once
scala> val sampledA2 = a.sample(false, 0.5).collect
sampledA2: Array[Int] = Array(1, 4)
