In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
How to carry out a detailed analysis of Spark Core, in view of this problem, this article introduces the corresponding analysis and solutions in detail, hoping to help more partners who want to solve this problem to find a more simple and feasible method.
Let's start with a question, which is often asked in an interview:
Why is Spark popular?
Reason 1: excellent data model and rich computing abstraction
Before the emergence of Spark, very mature computing systems such as MapReduce already exist, and provide high-level API (map/reduce), which runs computing in the cluster and provides fault tolerance, so as to achieve distributed computing.
Although MapReduce provides the abstraction of data access and computing, the reuse of data is simply to write intermediate data to a stable file system (such as HDFS), so it will produce data replication and backup, disk I bank O and data serialization, so it will be very inefficient when it comes to operations that need to reuse intermediate results between multiple calculations. This kind of operation is very common, such as iterative computing, interactive data mining, graph computing and so on.
Recognizing this problem, the academic AMPLab proposed a new model called RDD. RDD is a fault-tolerant and parallel data structure (in fact, it can be understood as a distributed set, which is as easy to operate as a local set). It allows users to explicitly save the intermediate result data set in memory, and optimize the data storage processing by controlling the partition of the data set. RDD also provides rich API (map, reduce, filter, foreach, redeceByKey...). To manipulate the dataset. RDD was later provided and open source by AMPLab in a framework called Spark.
In short, Spark draws lessons from the idea of MapReduce, retains the advantages of distributed parallel computing and improves its obvious shortcomings. API, which allows intermediate data to be stored in memory, improves the speed of development and provides rich operational data.
Reason 2: perfect biosphere-fullstack
At present, Spark has developed into a collection of multiple sub-projects, including SparkSQL, Spark Streaming, GraphX, MLlib and other sub-projects.
Spark Core: realizes the basic functions of Spark, including RDD, task scheduling, memory management, error recovery, interaction with the storage system and other modules.
A package used by Spark SQL:Spark to manipulate structured data. With Spark SQL, we can manipulate the data using SQL.
A component provided by Spark Streaming:Spark for streaming real-time data. Provides an API for manipulating data streams.
Spark MLlib: a library that provides common machine learning (ML) functions. It includes classification, regression, clustering, collaborative filtering, etc., as well as additional support functions such as model evaluation and data import.
GraphX (Graph Computing): API, which is used for graph computing in Spark, has good performance, has rich functions and operators, and can run complex graph algorithms freely on massive data.
Cluster Manager: Spark is designed to efficiently scale computing from one compute node to thousands of compute nodes.
StructuredStreaming: handles structured flows, unifying offline and real-time API.
Spark VS Hadoop
HadoopSpark type basic platform, including computing, storage, scheduling distributed computing tools scenarios batch iterative computing on large-scale data sets, interactive computing, stream computing prices are low for machines, cheap for memory, relatively expensive programming paradigm Map+Reduce, API is relatively low-level, algorithm adaptability is poor RDD constitutes DAG directed acyclic graph, API is at the top level. Convenient to use data storage structure MapReduce intermediate calculation results are stored on HDFS disk, delay large RDD intermediate calculation results are stored in memory, delayed small operation mode Task is maintained in process mode, task startup slow Task is maintained in thread mode, and task startup is fast.
❣️ Note:
Although Spark has great advantages over Hadoop, Spark can not completely replace Hadoop,Spark, which is mainly used to replace the MapReduce computing model in Hadoop. HDFS can still be used for storage, but intermediate results can be stored in memory; scheduling can use Spark built-in, or a more mature scheduling system, YARN, and so on.
In fact, Spark has been well integrated into the Hadoop ecosystem and has become an important member of it. It can realize resource scheduling and management with the help of YARN and distributed storage with the help of HDFS.
In addition, Hadoop can use cheap, heterogeneous machines to do distributed storage and computing, but Spark has higher requirements for hardware and memory and CPU.
Spark Core 1, RDD detailed explanation 1. Why is there a RDD?
In many iterative algorithms (such as machine learning, graph algorithm, etc.) and interactive data mining, the intermediate results are reused between different computing stages, that is, the output of one stage will be used as the input of the next stage. However, the previous MapReduce framework used an acyclic data flow model to write intermediate results into HDFS, resulting in a lot of data replication, disk IO, and serialization overhead. And these frameworks can only support some specific computing models (map/reduce), and do not provide a general data abstraction.
A paper on RDD published by AMP Lab: "Resilient Distributed Datasets: a Fault-Tolerant Abstraction for In-Memory Cluster Computing" is designed to solve these problems.
RDD provides an abstract data model, so that we do not have to worry about the distributed characteristics of the underlying data, we only need to express the specific application logic as a series of conversion operations (functions), and the conversion operations between different RDD can also form dependencies, thus realizing pipelization, thus avoiding the storage of intermediate results and greatly reducing the overhead of data replication, disk IO and serialization. It also provides more API (map/reduec/filter/groupBy...).
2. What is RDD?
RDD (Resilient Distributed Dataset), called elastic distributed dataset, is the most basic data abstraction in Spark, representing an immutable, partitioned set of elements that can be computed in parallel. Word disassembly:
Resilient: it is flexible, the data in RDD can be stored in memory or disk Distributed: its elements are distributed storage, can be used for distributed computing Dataset: it is a collection, can store a lot of elements 3. RDD main attributes
Go to the source code of RDD and take a look:
RDD source code
You can see the comments on RDD introduction in the source code. Let's translate:
A list of partitions: a list of Partition / Partition, the basic constituent units of a dataset. For RDD, each shard is processed by a computing task, and the number of shards determines the degree of parallelism. You can specify the number of RDD shards when creating a RDD. If not, the default value will be used.
A function for computing each split: a function is applied to each partition. RDD in Spark is calculated in fragments, and the compute function is applied to each partition.
A list of dependencies on other RDDs: one RDD depends on multiple other RDD. Each transformation of RDD generates a new RDD, so there is a pipeline-like front-and-back dependency between RDD. When some partition data is lost, Spark can recalculate the lost partition data through this dependency instead of recalculating all partitions of RDD. (fault tolerance mechanism of Spark)
Optionally, a Partitioner for key-value RDDs (e.g. To say that the RDD is hash-partitioned): optional, for RDD of type KV there will be a Partitioner, the partition function of RDD, which defaults to HashPartitioner.
Optionally, a list of preferred locations to compute each split on (e.g. Block locations for an HDFS file): optional, a list that stores the priority location (preferred location) for each Partition. For a HDFS file, this list holds the location of the block where each Partition is located. According to the concept of "Mobile data is not as good as Mobile Computing", Spark will try its best to select those worker nodes that store data for task computing when scheduling tasks.
Summary
RDD is a representation of a dataset, representing not only the dataset, but also where the dataset comes from and how it is calculated. The main properties include:
Partition list calculates function dependencies the best location for partition functions (default is hash)
Partition list, partition function, and the best location, these three attributes actually talk about where the dataset is, where it is more appropriate to calculate, and how to partition.
Computing functions and dependencies, these two attributes actually talk about where the dataset came from.
2. RDD-API1. How RDD is created
Created by datasets of external storage systems, including local file systems, as well as all datasets supported by Hadoop, such as HDFS, Cassandra, HBase, etc.:
Val rdd1 = sc.textFile ("hdfs://node1:8020/wordcount/input/words.txt")
A new RDD is generated by operator transformation from the existing RDD:
Val rdd2=rdd1.flatMap (_ .split (""))
Created by an existing Scala collection:
Val rdd3 = sc.parallelize (Array (1, 2, 3, 4, 5, 6, 7, 8)) or
Val rdd4 = sc.makeRDD (List (1, 2, 3, 4, 5, 6, 7, 8))
The bottom layer of the makeRDD method calls the parallelize method:
RDD source code 2. Operator classification of RDD
RDD's operators fall into two categories:
Transformation conversion operation: returns a new RDDAction action operation: the return value is not RDD (no return value or other)
❣️ Note:
1. Instead of actually storing the data to be calculated, RDD records where the data is located and the transformation relationship of the data (what methods are called and what functions are passed in).
2. All transformations in RDD are lazily evaluated / delayed, that is, they are not evaluated directly. These transformations actually run only when an Action action occurs that requires the result to be returned to Driver.
3. Lazy evaluation / delayed execution is used because this allows RDD operations to form a DAG directed acyclic graph for Stage partition and parallel optimization during Action. This design allows Spark to run more efficiently.
3. Transformation conversion operator meaning map (func) returns a new RDD. The RDD is composed of each input element transformed by the func function to form a filter (func) to return a new RDD. The RDD is made up of input elements with a return value of true calculated by the func function. FlatMap (func) is similar to map, but each input element can be mapped to 0 or more output elements (so func should return a sequence Instead of a single element) mapPartitions (func) is similar to map, but runs independently on each shard of RDD, so when running on a RDD of type T, the function type of func must be Iterator [T] = > Iterator [U] mapPartitionsWithIndex (func) similar to mapPartitions, but func takes an integer parameter to represent the index value of the shard, so when running on a RDD of type T The function type of func must be (Int, Interator [T]) = > Iterator [U] sample (withReplacement, fraction, seed) samples the data according to the ratio specified by fraction, and you can choose whether to replace it with random numbers. Seed is used to specify the random number generator seed union (otherDataset) to join the source RDD and the parameter RDD and return a new RDDintersection (otherDataset) after the intersection of the source RDD and the parameter RDD return a new RDDdistinct ([numTasks]). After deduplicating the source RDD, a new RDDgroupByKey ([numTasks]) is called on a (KMJ V) RDD, and a (K, Iterator [V]) RDDreduceByKey (func, [numTasks]) is called on a (KMJ V) RDD. Returns a (KQuery V) RDD that aggregates the values of the same key together using the specified reduce function. Similar to groupByKey, the number of reduce tasks can be set by the second optional parameter aggregateByKey (zeroValue) (seqOp, combOp, [numTasks]) to aggregate the same key values in the PairRDD, and a neutral initial value is also used in the aggregation process. Similar to the aggregate function, the type of the aggregateByKey return value does not need to be the same as the type of value in RDD. SortByKey ([ascending], [numTasks]) must be called on a (KMagol V) RDD. K must implement the Ordered interface and return a RDDsortBy (func, [ascending], [numTasks]) sorted by key. (func, [ascending], [numTasks]) is similar to sortByKey, but more flexible join (otherDataset, [numTasks]) calls on RDD of types (KMague V) and (KMagw). Return a RDDcogroup (otherDataset, [numTasks]) of (K, (VMagneW)) of all the elements corresponding to the same key called on the RDD of type (K, (Iterable,Iterable)) and (K, (Iterable,Iterable)), return a RDDcartesian (otherDataset) Cartesian product pipe (command, [envVars]) of type (K, (Iterable,Iterable)) pipe operation coalesce (numPartitions) on rdd to reduce the number of RDD partitions to the specified value. After filtering a large amount of data, you can perform this operation repartition (numPartitions) to re-partition RDD 4. Action Action operator meaning reduce (func) aggregates all elements in the RDD through the func function, which must be an interchangeable and parallel collect () in the driver Return all elements of the dataset as an array count () returns the number of elements of RDD first () returns the first element of RDD (similar to take (1)) take (n) returns an array of the first n elements of the dataset takeSample (withReplacement,num, [seed]) returns an array of num elements randomly sampled from the dataset, you can choose whether to replace the insufficient parts with random numbers Seed is used to specify that the random number generator seed takeOrdered (n, [ordering]) returns the first n elements of the natural or custom order saveAsTextFile (path) saves the elements of the dataset in the form of textfile to the HDFS file system or other supported file system. For each element, Spark will call the toString method, replace it with text in the file saveAsSequenceFile (path), and save the elements in the dataset to the specified directory in Hadoop sequencefile format. You can make HDFS or other file system saveAsObjectFile (path) supported by Hadoop serialize the elements of the dataset to the specified directory in the way of Java serialization. CountByKey () for the RDD of type (KMagi V), returns a map of (KMIT), indicating the number of elements corresponding to each key foreach (func) on each element of the dataset, run the function func to update foreachPartition (func) on each partition of the dataset, run the function func
Statistical operations:
Operator meaning count number mean mean sum sum max maximum min minimum variance variance sampleVariance calculates variance stdev standard deviation from sampling: measure the discreteness of data sampleStdev sampling standard deviation stats view statistical results 3, RDD persistence / cache
In actual development, some RDD calculations or conversions may be time-consuming. If these RDD will be used frequently in the future, you can persist / cache these RDD so that you don't have to recalculate it next time, which improves the efficiency of the program.
Val rdd1 = sc.textFile ("hdfs://node01:8020/words.txt")
Val rdd2 = rdd1.flatMap (x = > x.split ("")). Map ((_, 1)). ReduceByKey (_ + _)
Rdd2.cache / / caching / persistence
Rdd2.sortBy (_. _ 2 _ false) .persist / / triggers action to read the HDFS file, and rdd2 will actually perform persistence
Rdd2.sortBy (_. _ 2 _ false) .trigger action will read the data in the cache and execute faster than before, because the rdd2 has been persisted into memory
Persistence / caching API details ersist method and cache method
RDD can cache the previous calculation results through the persist or cache methods, but instead of caching immediately when these two methods are called, when the subsequent action is triggered, the RDD will be cached in the memory of the compute node and reused later.
By looking at the source code of RDD, it is found that cache finally calls the persist no-parameter method (the default storage is only in memory):
RDD source code storage level
The default storage level is only stored in memory, and there are many other storage levels for Spark, which are defined in object StorageLevel.
The persistence level description MORY_ONLY (default) stores RDD in JVM as a nonserialized Java object. If there is not enough memory to store RDD, some partitions will not be cached and will be recalculated every time they are needed. This is the default level MORY_AND_DISK (which can be used in development) to store RDD in JVM as a nonserialized Java object. If the data cannot fit in memory, it is overwritten to disk. MEMORY_ONLY_SER (Java and Scala) is read from disk when needed. RDD is stored as serialized Java objects (one byte array per partition). This is generally more space efficient than nonserialized objects (deserialized objects), especially when using fast serialization But reading data in this way consumes more CPUMEMORY_AND_DISK_SER (Java and Scala) like MEMORY_ONLY_SER, but if the data cannot be stored in memory, it will be overwritten to disk instead of having to recalculate them every time DISK_ONLY stores RDD partitions on disk MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc., at the same storage level as above, except that persistent data is stored in two copies. Backup each partition is stored on two cluster nodes OFF_HEAP (in the lab) is similar to MEMORY_ONLY_SER, but stores the data in out-of-heap memory. (that is, not directly stored in JVM memory)
Summary:
The purpose of RDD persistence / caching is to improve the speed of subsequent operations. There are many cache levels. By default, there are only in memory. When memory_and_disk is used in development, RDD data will only be persisted / cached when performing action operations. In actual development, if a RDD is frequently used later, the RDD can be persisted / cached. 4. Limitations of RDD fault-tolerant mechanism Checkpoint persistence:
Persistence / caching can put data in memory, which is fast, but it is also the least reliable; you can also put data on disk, which is not completely reliable! For example, the disk will be damaged.
Problem solving:
The generation of Checkpoint is for more reliable data persistence. In Checkpoint, the data is generally placed on HDFS, which naturally relies on the natural high fault tolerance and high reliability of HDFS to achieve maximum data security, and to achieve the fault tolerance and high availability of RDD.
Usage:
Directory of SparkContext.setCheckpointDir ("directory") / / HDFS
RDD.checkpoint
Summary:
How to ensure the security and reading efficiency of data in development: caching / persistence can be done for frequently used and important data, and then checkpint operation can be done.
The difference between persistence and Checkpoint:
Location: Persist and Cache can only be stored on local disk and memory (or out-of-heap memory-in the lab) Checkpoint can save data to reliable storage such as HDFS.
Life cycle: the RDD of Cache and Persist will be cleared after the end of the program, or the RDD that manually calls the unpersist method Checkpoint will still exist after the end of the program and will not be deleted.
5. RDD dependency 1. There are two types of dependencies: there are two different types of dependencies between RDD and its parent RDD, namely wide dependency (wide dependency/shuffle dependency) and narrow dependency (narrow dependency).
Illustration: width dependence how to distinguish width dependence:
Narrow dependency: one partition of the parent RDD will only be dependent on one partition of the child RDD
Wide dependency: a partition of the parent RDD will be covered by multiple partition dependencies of the child RDD (involving shuffle).
two。 Why design width dependence for narrow dependence:
Multiple partitions with narrow dependencies can be calculated in parallel
If the data of a narrow dependent partition is lost, you only need to recalculate the data of the corresponding partition.
For wide dependencies:
The basis for dividing Stage (phases): for wide dependencies, you must wait until the calculation of the previous stage is complete before the next phase can be calculated.
6. The generation and partition of DAG Stage1. DAG describes what DAG is:
DAG (Directed Acyclic Graph directed acyclic graph) refers to the process of data transformation, with direction and no closed loop (actually the process executed by RDD)
The original RDD forms the DAG directed acyclic graph through a series of transformation operations. When the task is executed, the real calculation (a process in which the data is manipulated) can be performed according to the description of DAG.
Boundary of DAG
Start: RDD created with SparkContext
End: Action is triggered, and once Action is triggered, a complete DAG is formed.
2.DAG partition StageDAG partition Stage
A Spark program can have multiple DAG (there are several Action, there are several DAG, at the end of the figure, there is only one Action (not shown in the figure), then it is a DAG).
A DAG can have multiple Stage (divided by wide dependency / shuffle).
Multiple Task can be executed in parallel with the same Stage (number of task = number of partitions, as shown above, there are three partitions P1, P2, P3 in Stage1, and corresponding three Task).
You can see that only the reduceByKey operation in this DAG is a wide dependency, which is used by the Spark kernel as a boundary to divide it into different Stage.
At the same time, we can notice that in the Stage1 in the figure, there are narrow dependencies from textFile to flatMap to map, and these steps can form a pipeline operation. The partition generated by the flatMap operation can proceed with the map operation without waiting for the end of the whole RDD operation, which greatly improves the efficiency of the calculation.
Why divide Stage?-- parallel Computing
If a complex business logic has shuffle, it means that the previous phase produces results before the next phase can be executed, that is, the calculation of the next phase depends on the data of the previous phase. Then we divide according to shuffle (that is, by wide dependency), we can divide a DAG into multiple Stage/ phases. In the same Stage, there will be multiple operators to form a pipeline pipeline, and multiple parallel partitions in the pipeline can be executed in parallel.
How to divide the stage of DAG?
For narrow dependencies, the conversion processing of partition is calculated in stage, not partitioned (the narrow dependencies can be pipelined in the same stage as far as possible).
For wide dependencies, due to the existence of shuffle, the next calculation can only be started after the parent RDD processing is completed, that is, the stage needs to be partitioned.
The answers to the questions on how to conduct a detailed analysis of Spark Core are shared here. I hope the above content can be of some help to you. If you still have a lot of doubts to be solved, you can follow the industry information channel to learn more about it.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.