Source: Shulou (Shulou.com), updated 2025-01-29
A Brief Introduction to Resilient Distributed Datasets (RDDs)
I. Introduction to RDD
RDD, short for Resilient Distributed Dataset, is the most basic data abstraction in Spark. It is a read-only, partitioned collection of records that supports parallel operations and can be created from external datasets or transformed from other RDDs. It has the following characteristics:
- An RDD consists of one or more partitions. Each partition is processed by one computing task. Users can specify the number of partitions when creating an RDD; if they do not, the number of CPU cores allocated to the application is used by default.
- An RDD has a function, compute, that is used to compute each partition.
- An RDD records its dependencies on other RDDs. Each transformation produces a new dependency, so the RDDs are linked like the stages of a pipeline. If the data of some partitions is lost, those partitions can be recomputed through the dependencies instead of recomputing every partition of the RDD.
- A key-value RDD also has a Partitioner, which determines the partition in which each record is stored. Spark currently supports HashPartitioner (partitioning by hash) and RangePartitioner (partitioning by range).
- An optional list of preferred locations stores the preferred location of each partition. For an HDFS file, this list holds the location of each partition's block. Following the principle that "moving computation is cheaper than moving data", Spark tries to schedule each task on the node that stores the data block it will process.
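The idea behind hash partitioning of key-value records can be sketched in plain Scala without a Spark dependency. The object HashPartitionSketch and its methods partitionFor and assign are hypothetical names introduced for illustration; they only imitate the general approach (a non-negative modulo of the key's hash code) and are not Spark's own classes.

```scala
// A minimal sketch of hash partitioning, assuming a non-negative modulo of hashCode.
// HashPartitionSketch, partitionFor and assign are illustrative names, not Spark APIs.
object HashPartitionSketch {
  // Map a key to a partition index in [0, numPartitions).
  def partitionFor(key: Any, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    // floorMod keeps the result non-negative even for negative hash codes.
    if (key == null) 0 else Math.floorMod(key.hashCode, numPartitions)
  }

  // Group key-value records by the partition their key hashes to.
  def assign[K, V](records: Seq[(K, V)], numPartitions: Int): Map[Int, Seq[(K, V)]] =
    records.groupBy { case (k, _) => partitionFor(k, numPartitions) }
}
```

Because the partition index depends only on the key, all records with the same key land in the same partition, which is what key-based operations rely on.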
Some of the code of the RDD[T] abstract class is shown below:

    // Implemented by subclasses to compute a given partition
    def compute(split: Partition, context: TaskContext): Iterator[T]

    // Returns all partitions
    protected def getPartitions: Array[Partition]

    // Returns all dependencies
    protected def getDependencies: Seq[Dependency[_]] = deps

    // Returns the list of preferred locations
    protected def getPreferredLocations(split: Partition): Seq[String] = Nil

    // Overridden by subclasses to specify how they are partitioned
    @transient val partitioner: Option[Partitioner] = None

II. Creating RDDs
There are two ways to create an RDD, described below.
2.1 Creating from an existing collection
spark-shell is used for testing here. The startup command is:

    spark-shell --master local[4]

After spark-shell starts, it automatically creates an application context, which is equivalent to executing the following Scala statements:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf().setAppName("Spark shell").setMaster("local[4]")
    val sc = new SparkContext(conf)

When creating an RDD from an existing collection, you can specify the number of partitions. If you do not, the number of CPU cores allocated to the application is used:

    val data = Array(1, 2, 3, 4, 5)
    // Create an RDD from an existing collection; by default the number of partitions
    // equals the number of CPU cores allocated to the application
    val dataRDD = sc.parallelize(data)
    // Check the number of partitions
    dataRDD.getNumPartitions
    // Explicitly specify the number of partitions
    val dataRDD = sc.parallelize(data, 2)
(The screenshot of the execution results from the original article is not reproduced here.)
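Separately from the shell output, the way a local collection can be cut into a given number of partitions can be sketched in plain Scala. The object SliceSketch and its slice method are hypothetical names. The position arithmetic is an assumption about how contiguous, near-equal chunks can be produced, not a copy of Spark's implementation.

```scala
// A minimal sketch of splitting a local collection into contiguous partitions.
// SliceSketch and slice are illustrative names, not part of the Spark API.
object SliceSketch {
  // Cut `data` into `numSlices` contiguous chunks whose sizes differ by at most one.
  def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    require(numSlices >= 1, "numSlices must be positive")
    (0 until numSlices).map { i =>
      // Long arithmetic avoids overflow for large collections.
      val start = ((i.toLong * data.length) / numSlices).toInt
      val end = (((i + 1).toLong * data.length) / numSlices).toInt
      data.slice(start, end)
    }
  }
}
```

With the five-element collection used above and two slices, this sketch yields the chunks (1, 2) and (3, 4, 5).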
2.2 Referencing datasets in an external storage system
You can reference a dataset in an external storage system, such as the local file system, HDFS, HBase, or any data source that supports Hadoop InputFormat.

    val fileRDD = sc.textFile("/usr/file/emp.txt")
    // Get the first line of text
    fileRDD.take(1)

There are two points to be aware of when using an external storage system:
- If you read data from the local file system in a cluster environment, the file must exist at the same path on every machine in the cluster.
- Directory paths, compressed files, and wildcards are supported.
2.3 textFile & wholeTextFiles
Both can be used to read external files, but they return different formats:
- textFile: returns RDD[String]; each element of the RDD corresponds to one line of the file content.
- wholeTextFiles: returns RDD[(String, String)]; the first element of each tuple is the file path and the second is the file content.
- Both accept a second parameter that controls the minimum number of partitions.
- When reading a file from HDFS, Spark creates one partition per block.

    def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {...}
    def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = {..}

III. Operating on RDDs
RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver after running a computation on the dataset. All transformations are lazy: Spark only records them and does not execute them immediately. The computation actually runs only when an action is encountered, which is similar to lazy evaluation in functional programming.
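The laziness described above can be imitated in plain Scala with an Iterator, whose map is also deferred until the elements are consumed. This is only an analogy under that assumption, not Spark code; LazyDemo and run are hypothetical names.

```scala
// A plain-Scala analogy for lazy transformations, using Iterator instead of an RDD.
// LazyDemo and run are illustrative names.
object LazyDemo {
  // Returns (calls before consumption, calls after consumption, results).
  def run(): (Int, Int, List[Int]) = {
    var calls = 0
    // "Transformation": only recorded, the function body has not run yet.
    val transformed = List(1, 2, 3).iterator.map { x => calls += 1; x * 10 }
    val before = calls
    // "Action": consuming the iterator forces the computation.
    val result = transformed.toList
    (before, calls, result)
  }
}
```

The counter stays at zero until toList consumes the iterator, in the same way that a chain of transformations does no work until an action is called.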
    val list = List(1, 2, 3)
    // map is a transformation, while foreach is an action
    sc.parallelize(list).map(_ * 10).foreach(println)
    // Output: 10 20 30

IV. Caching RDDs
4.1 Cache levels
One reason why Spark is so fast is that RDDs support caching. Once a dataset has been cached, subsequent operations that use it fetch it directly from the cache. Cached data can be lost, but because of the dependencies between RDDs, if the cached data of a partition is lost, only that partition needs to be recomputed.
Spark supports multiple cache levels:
| Storage Level | Meaning |
| --- | --- |
| MEMORY_ONLY | The default cache level. Stores the RDD in the JVM as deserialized Java objects. If there is not enough memory, some partitions are not cached. |
| MEMORY_AND_DISK | Stores the RDD in the JVM as deserialized Java objects. If there is not enough memory, the partitions that do not fit are stored on disk and read from disk when they are needed. |
| MEMORY_ONLY_SER | Stores the RDD as serialized Java objects (each partition is a byte array). This saves space compared with deserialized objects but increases the CPU cost of reading. Java and Scala only. |
| MEMORY_AND_DISK_SER | Similar to MEMORY_ONLY_SER, but partitions that do not fit in memory are spilled to disk instead of being recomputed when they are used. Java and Scala only. |
| DISK_ONLY | Caches the RDD on disk only. |
| MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | Same as the corresponding levels above, but each partition is replicated on two nodes in the cluster. |
| OFF_HEAP | Similar to MEMORY_ONLY_SER, but stores the data in off-heap memory. This requires off-heap memory to be enabled. |
Two parameters must be configured to enable off-heap memory:
- spark.memory.offHeap.enabled: whether to enable off-heap memory. The default is false; set it to true.
- spark.memory.offHeap.size: the size of the off-heap memory space. The default is 0; set it to a positive value.
4.2 Using the cache
There are two methods for caching data: persist and cache. Internally, cache also calls persist; it is a specialized form that is equivalent to persist(StorageLevel.MEMORY_ONLY). For example:

    import org.apache.spark.storage.StorageLevel

    // All storage levels are defined in the StorageLevel object
    fileRDD.persist(StorageLevel.MEMORY_AND_DISK)
    // Alternative: cache() is shorthand for persist(StorageLevel.MEMORY_ONLY).
    // An RDD's storage level cannot be changed once assigned, so use one call or the other.
    fileRDD.cache()

4.3 Removing the cache
Spark automatically monitors cache usage on each node and evicts old data partitions according to the least recently used (LRU) rule. You can also remove an RDD from the cache manually with the RDD.unpersist() method.
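The LRU rule mentioned above can be illustrated with a small plain-Scala cache built on java.util.LinkedHashMap in access order. This is a generic sketch of LRU eviction, not Spark's actual memory manager; LruCache and its methods are hypothetical names, and the capacity is counted in entries rather than bytes for simplicity.

```scala
import java.util.{LinkedHashMap, Map => JMap}

// A generic LRU cache sketch; LruCache is an illustrative name, not a Spark class.
class LruCache[K, V](capacity: Int) {
  // accessOrder = true makes iteration order go from least to most recently used.
  private val store = new LinkedHashMap[K, V](16, 0.75f, true) {
    // Called by LinkedHashMap after each insertion; returning true evicts the eldest entry.
    override protected def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
      size() > capacity
  }

  def put(key: K, value: V): Unit = { store.put(key, value); () }

  // A successful get counts as a use and moves the entry to the most recent position.
  def get(key: K): Option[V] = Option(store.get(key))

  // Keys from least to most recently used.
  def keys: List[K] = {
    val builder = List.newBuilder[K]
    val it = store.keySet().iterator()
    while (it.hasNext) builder += it.next()
    builder.result()
  }
}
```

For example, with a capacity of two, putting entries 1 and 2, reading entry 1, and then putting entry 3 evicts entry 2, because it is the least recently used.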
V. Understanding Shuffle
5.1 Introduction to Shuffle
In Spark, a task corresponds to a partition, and data is usually not manipulated across partitions. But if you encounter operations such as reduceByKey, Spark must read data from all partitions, look for all values of all keys, and then aggregate them to calculate the final result of each key, which is called Shuffle.
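The data movement behind an operation such as reduceByKey can be sketched in plain Scala by modeling partitions as nested sequences. ShuffleSketch and its reduceByKey method are hypothetical names; the three phases (combine inside each partition, route each key to a target partition by hash, then merge) are a simplified model and leave out the files, serialization and network transfer that a real shuffle involves.

```scala
// A simplified, in-memory model of a shuffle for a reduceByKey-style aggregation.
// ShuffleSketch is an illustrative name, not a Spark API.
object ShuffleSketch {
  def reduceByKey(partitions: Seq[Seq[(String, Int)]], numReducers: Int)
                 (f: (Int, Int) => Int): Seq[Map[String, Int]] = {
    // Merge all values that share a key.
    def combine(records: Seq[(String, Int)]): Map[String, Int] =
      records.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).reduce(f) }

    // 1. Map side: pre-aggregate inside each input partition.
    val combined = partitions.map(combine)
    // 2. Shuffle: route every key to one target partition, chosen by its hash.
    val routed = combined.flatMap(_.toSeq)
      .groupBy { case (k, _) => Math.floorMod(k.hashCode, numReducers) }
    // 3. Reduce side: merge the partial results that arrived for each key.
    (0 until numReducers).map(r => combine(routed.getOrElse(r, Seq.empty)))
  }
}
```

Every input partition may contribute values to every output partition, which is why this kind of operation cannot be completed inside a single partition.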
5.2 Impact of Shuffle
Shuffle is an expensive operation because it usually moves data across nodes, which involves disk I/O, network I/O, and data serialization. Some Shuffle operations also consume a lot of heap memory, because they use heap memory to temporarily hold data that must be transferred over the network. Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are retained until the corresponding RDDs are no longer used and are garbage collected, which avoids re-creating the Shuffle files during computation. If the application keeps references to these RDDs for a long time, garbage collection may not happen for a long time, so long-running Spark jobs can take up a lot of disk space. The storage directory for these temporary files is specified with the spark.local.dir parameter.
5.3 Operations that cause Shuffle
Because Shuffle operations have a significant impact on performance, you need to pay special attention to their use. All of the following operations result in a Shuffle:
- Repartitioning operations, such as repartition and coalesce.
- All ByKey operations, such as groupByKey and reduceByKey, except countByKey.
- Join operations, such as cogroup and join.
VI. Wide and Narrow Dependencies
There are two different types of dependencies between an RDD and its parent RDD(s):
- Narrow dependency: each partition of the parent RDD is depended on by at most one partition of the child RDD.
- Wide dependency: a partition of the parent RDD can be depended on by multiple partitions of the child RDD.
In the figure from the original article (not reproduced here), each box represents an RDD and each colored rectangle represents a partition.
Distinguishing between these two types of dependencies is useful for two reasons:
- First, narrow dependencies allow the data of the parent partitions to be computed in a pipelined fashion on one cluster node, for example a map operation followed by a filter operation. A wide dependency, in contrast, requires the data of all parent partitions to be computed first and then shuffled between nodes, which is similar to MapReduce.
- Second, narrow dependencies allow more efficient data recovery, because only the parent partitions of the lost partition need to be recomputed, and different nodes can recompute in parallel. For wide dependencies, if data is lost, the data of all parent partitions must be computed and shuffled again.
VII. DAG Generation
RDDs and their dependencies form a DAG (directed acyclic graph). The DAG defines the lineage between these RDDs; through the lineage, if part or all of the computed results of an RDD are lost, they can be recomputed. So how does Spark generate computing tasks from the DAG? The DAG is divided into computing stages (Stage) according to the type of dependency:
- For narrow dependencies, the partition dependencies are fixed, so the transformations can be executed in the same thread and are therefore placed in the same stage.
- For wide dependencies, because of the Shuffle, the next computation can start only after the parent RDD(s) have been shuffled, so a new stage begins wherever a wide dependency is encountered.
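The stage division described above can be sketched for the simple case of a linear chain of operations. StageSketch, Op, Narrow, Wide and splitIntoStages are hypothetical names introduced for illustration; a real DAG can branch and join, which this sketch does not handle.

```scala
// A minimal sketch of cutting a linear chain of operations into stages.
// All names here are illustrative; this is not Spark's scheduler.
object StageSketch {
  sealed trait Dep
  case object Narrow extends Dep
  case object Wide extends Dep

  // An operation and the kind of dependency it has on the previous operation.
  final case class Op(name: String, dep: Dep)

  def splitIntoStages(ops: List[Op]): List[List[String]] =
    ops.foldLeft(List(List.empty[String])) { (stages, op) =>
      op.dep match {
        // A wide dependency is a Shuffle boundary, so a new stage starts here.
        case Wide if stages.head.nonEmpty => List(op.name) :: stages
        // A narrow dependency is pipelined into the current stage.
        case _ => (op.name :: stages.head) :: stages.tail
      }
    }.map(_.reverse).reverse
}
```

For a chain such as textFile, flatMap, map, reduceByKey, filter, where only reduceByKey has a wide dependency, the sketch produces two stages: the first three operations, then reduceByKey together with filter.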