In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)05/31 Report--
This article mainly explains "what are the knowledge points of Spark programming". The content of the explanation is simple and clear, and it is easy to learn and understand. Please follow the editor's train of thought to study and learn "what are the knowledge points of Spark programming".
# Spark Development Guide #
# # introduction # # in general, every Spark application consists of a driver that runs the user's main function and performs a variety of parallel operations on a cluster. The main abstraction (concept) provided by Spark is a flexible distributed dataset, which is a collection of elements that are divided into different nodes of the cluster and can be operated in parallel. The creation of a RDDs can start with a file on the Hadoop file system (or any file system that supports Hadoop), or by converting a collection of Scala that already exists in the driver. Users can also make Spark persist a RDD to memory so that it can be effectively reused in parallel operations. Finally, RDDs can automatically recover from node failures.
The second abstraction (concept) in Spark is shared variables, which can be used in parallel operations. By default, Spark runs a function in parallel through a series of tasks on different nodes. He passes a copy of the variables used in each function to each task. Sometimes, a variable needs to be shared between different tasks, or between tasks and drivers. Spark supports two types of shared variables: broadcast variables, which can cache a value in the memory of all nodes, an accumulator, and a variable that can only be added, such as counters and sums.
This guide demonstrates each feature of Spark in each language supported by Spark. It is easiest to follow along with if you launch Spark's interactive shell-either bin/spark-shell for the Scala shell or bin/pyspark for the Python one.
# # connecting to Spark##
# Java###
Spark1.0.2 works after Java6 or java6. If you are using Java8,Spark to support lamdba expressions to simplify function writing, otherwise, you can use the classes under the org.apache.spark.api.java.function package.
To write Spark applications with Java, you need to add Spark dependencies. Spark can be used through Maven Central:
GroupId=org.apache.spark artifactId=spark-core_2.10 version=1.0.2
In addition, if you want to access a HDFS cluster, you need to add a hadoop-client dependency based on your version of HDFS. Some commonly used HDFS version tags are displayed on the page.
GroupId=org.apache.hadoop artifactId=hadoop-client version=
Finally, you need to import some Spark classes into your program by adding the following lines:
Import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.api.java.JavaRDD import org.apache.spark.SparkConf
# # initializing Spark##
# the first thing a Java### Spark program needs to do is to create a JavaSparkContext object that tells Spark how to access a cluster. To create a SparkContext, you must first create a SparkConf object that contains information about your application.
SparkConf conf=new SparkConf () .setAppName (appName) .setMaster (master); JavaSparkContext sc=new JavaSparkContext (conf)
The appName parameter is the name of your application and will be displayed on the UI of the cluster. Master is Spark, Mesos, or YARN cluster URL, or a dedicated string "local" to make it run in local mode. In practice, when running on a cluster, you will not want to hard-code master into the program, but by running the program using spark-submit and accepting master. However, in local tests or unit tests, you can pass "local" to run Spark in the process.
# # Elastic distributed dataset # # one of the concepts that Spark repeatedly revolves around is elastic distributed dataset. It is a collection of elements with fault-tolerant mechanism and can be operated in parallel. There are two ways to create a RDDs. Parallelize a collection that already exists in your driver, or reference a dataset of an external storage system, such as a shared file system, HDFS, HBase, or any data source that can provide an Hadoop InputFormat.
# parallel collections # # parallel collections are created on the Collection that already exists in your driver by calling the parallelize method of JavaSparkContext. The elements of the collection will be copied to form a distributed dataset that can be manipulated in parallel. For example, here is how to create a parallel collection containing the numbers 1 to 5:
List data=Arrays.asList (1, 2, 3, 4); JavaRDD distData=sc.parallelize (data)
Once created, distributed datasets (distData) can operate in parallel. For example, we can add the elements in the list by calling distData.reduce ((amemb)-> astatb). We will describe it later in the operation of the distributed dataset.
Note: in this guide, we often use concise Java8 lamdba syntax to define java functions, but in older versions of Java, you can implement interfaces in the org.apache.spark.api.java.function package. We will describe passing functions to Spark in detail below.
Another important parameter of parallel sets is the number of data sets that are sliced (slices). Spark will run a task for each slice in the cluster. Typically, you need 2-4 slice for each CPU in the cluster. Typically, Spark will try to automatically set the number of slice based on your cluster. However, you can set it manually and pass it to parallelize as a second parameter (for example: sc.parallelize (data,10)).
# external datasets # # Spark can create distributed datasets through any storage source supported by Hadoop. Including your local file system, HDFS,Cassandra,HBase,Amazon S3 and so on. Spark supports text files (text file), SequenceFiles (serialization file), and any other Hadoop InputFormat (input format).
Text file can be created by using SparkContext's textFile method. This method takes the URI of a file (or a local path on the machine, or URI such as hdfs://,s3n://) and reads the file as a collection of lines. Here is an example of a call:
JavaRDD distFile=sc.textFile ("data.txt")
Once created, the distFile can be used for dataset manipulation. For example, we can add the lengths of all data rows by using map and reduce. For example: distFile.map (s-> s.length ()). Reduce ((aforme b)-> (aquib)).
Some considerations for Spark when reading files:
If you use a path on the local file system
All file-based input methods of Spark, including textFile, support running directories, and compressing file box wildcards. For example, you can eat textFile ("/ my/directory/"), textFile ("/ my/directory/.txt"), and textFile ("/ my/directory/.gz").
The textFile method can also accept an optional second parameter to control the number of slice in this file. By default, Spark creates a slice for each file (block defaults to 64MB in HDFS). But you can specify a higher slice value by passing a larger value. Note that your slice number cannot be less than the block number.
In addition to text files, Spark's Java API also supports other centralized data formats.
JavaSparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file.
For serialized files (SequenceFiles), use SparkContext's sequenceFile [K _ value V], where K and V are the types of key and value in the file. They must be subclasses of Hadoop's Writeable interface, such as IntWriteable and Text.
For other Hadoop input formats, you can use the JavaSparkContext.hadoopRDD method. It can accept any JobConf and input format class, key class and value class. Just set the input source as you would with Hadoop Job. You can also use JavaSparkContext.newHadoopRDD for InputFormats, based on "new" MapReduce API (org.apache.hadoop.mapreduce).
JavaRDD.saveAsObjectFile and JavaContext.objectFile support saving RDD in a simple format consisting of Java object serialization. Although this is not an efficient specialized format to Avro, it provides a simple way to store RDD.
# RDD operations # RDDs supports two types of operations: transformations, which creates a new dataset from an existing dataset. Action (actions), which returns a value to the driver after running the calculation on the dataset. For example, map is a transformation that passes each element of the dataset to a function and returns a new RDD to represent the result. Reduce, on the other hand, is an action that aggregates all the elements of some RDD by a number of rows and returns the final result to the driver (although there is also a parallel reduceByKey that returns a distributed dataset).
All transformations in Spark are lazy, that is, they don't calculate the results immediately. Instead, they simply remember the transformations applied to these underlying data sets, such as file. These transformations are actually performed only when an action occurs that requires a result to be returned to the driver. This design allows Spark to run more efficiently-for example, we can create a dataset through map, use it in reduce, and eventually return only the results of reduce to the driver, rather than the entire large new dataset.
By default, each converted RDD is recalculated when you run an action on it. However, you can also use the persist method (or cache) to persist a RDD into memory. In this case, the Spark will save the relevant elements in the cluster, and the next time you visit the RDD, it will be able to access it more quickly. It is also supported to persist datasets on disk or to replicate datasets between clusters.
# basic operations # to illustrate the basics of RDD, consider the following simple programs:
JavaDDD lines=sc.textFile ("data.txtt"); JavaRDD lineLengths=lines.map (s-> s.length ()); int totalLength=lineLengths.reduce ((Amenb)-> aquib)
The first line defines a basic RDD through an external file. This dataset is not loaded into memory and no action is performed on it. Lines is just a pointer to this file. The second line defines lineLengths as the result of the map transformation. In addition, lineLengths is not calculated immediately because of inertia. Finally, we run reduce, which is an action. At this point, Spark splits the calculation into different task and makes it run on separate machines, and each machine runs its own map part and local reducation, only returning its results to the driver.
If we want to reuse lineLengths in the future, we can add:
LineLengths.persist ()
Before reduce, this would cause the lineLengths to be saved in memory after the first calculation.
# the API that passes Functions to Spark#### Spark relies heavily on the transfer function to make its driver run on the cluster. In Java, a function has a class representation that implements the interface in the org.apache.spark.api.java.function package. There are two ways to create such a function:
Implement the Function interface in your own class, which can be an anonymous inner class, which names the class, and you want to pass an instance of it to Spark
In Java8, use lamdba expressions to succinctly define an implementation
For brevity, most of this guide uses the lamdba syntax, which is easy to use, and all APIs in long-form. For example, we can write the above code as follows:
JavaRDD lines = sc.textFile ("data.txt"); JavaRDD lineLengths = lines.map (new Function () {public Integer call (String s) {return s.length ();}}); int totalLength = lineLengths.reduce (new Function2 () {public Integer call (Integer a, Integer b) {return a + b;}}))
Or, if writing inline functions is clumsy:
Class GetLength implements Function {public Integer call (String s) {return s.length ();}} class Sum implements Function2 {public Integer call (Integer a, Integer b) {return a + b;}} JavaRDD lines = sc.textFile ("data.txt"); JavaRDD lineLengths = lines.map (new GetLength ()); int totalLength = lineLengths.reduce (new Sum ())
Note that anonymous inner classes in Java can also access variables in the enclosing scope as long as they are marked final. Spark will ship copies of these variables to each worker node as it does for other languages
# Wroking with Key-Value Pairs works with key / value pairs # although most Spark operations work on RDDs containing various types of objects, some special operations can only use RDDs containing key-value pairs. One of the most common operations is distributed "shuffle" operations, such as grouping or aggregating elements through key.
In Java, key-value is represented by the scala Tuple2 class under the scala standard package. You can simply call new Tuple2 (aline b) to create a tuuple and access its fields through tuple._1 () and tuple._2 ().
The RDDs of a key-value pair is represented by JavaPairRDD. You can build JavaPairRDDs through JavaRDDs, using specified versions of map operations, such as mapToPair and flatMapToPair. JavaPair will have not only standard RDD functions, but also special key-value functions.
For example, the following code uses the reduceByKey operation on the key-value pair to calculate the number of times each line of text appears in a file and.
JavaRDD lines=sc.textFile ("data.txt"); JavaPairRDD pairs=lines.mapToPair (s-> new Tuple2 (sMagne1)) JavaPairRDD counts=pairs.reduceByKey ((Arecom b)-> aquib)
We can also use counts.sortByKey (), for example, to sort the key-value pair alphabetically. And finally call counts.collect () to return to the driver as an array of objects.
Note: when using custom objects as key for key-value pair operations, you must make sure that the custom equals () method is accompanied by a matching hashCode () method. For more information, refer to the provisions listed in the outline of the Object.hashCode () document.
# conversion # # the following table lists the common transformations supported by Spark. For more information, please refer to the RDD API documentation and the pair RDD function documentation.
# Action # the table below lists the common actions supported by Spark. For more information, please refer to the RDD API documentation and the pair RDD function documentation.
# RDD persistence # # one of the most important features of Spark is to persist (or cache) a dataset into memory between different operations. When you persist a RDD, each node stores its calculated shard results in memory and reuses them in other actions on the dataset (or derived dataset). This will make the subsequent action faster (by more than 109 times faster). Caching is a key tool for (Spark) iterative algorithms and fast interactive use.
You can use the persist () and cache () methods to mark a RDD that will be persisted. The first time he is calculated by an action, he will remain in the memory of this node. Spark's cache is fault tolerant-if any partition of RDD is lost, it will automatically recalculate it by using its conversion operation that was originally created.
In addition, each persistent RDD can use different storage levels. Allows you, for example, to persist the dataset to disk, persist the dataset to memory as serialized Java objects (saving space), copy across nodes, or store it off-heap in Tachyon. These levels are set by passing a StorageLevel object (Scala,Java,Python) to persist (). The cache () method is a shortcut that uses the default storage level, that is, StorageLevel.MEMORY_ONLY (stores deserialized objects to memory), and the full storage level is set to:
Spark also automatically persists some intermediate data in shuffle operations (for example, reduceByKey). Even when the user does not call the persist method. This is done to prevent the entire input from being recalculated due to a node failure during the shuffle operation. We still recommend that users call persist on the resulting RDD if they want to reuse it.
# Storage level selection #
# remove data # Spark automatically monitors cache usage on each node and deletes old data partitions using LRU. If you want to delete the yige RDD manually instead of waiting for it to be automatically removed from the cache, use the RDD.unpersist () method.
# # shared variables # # in general, when a function passed to a Spark operation (such as map or reduce) runs on a remote cluster node, it actually operates on independent copies of all variables used by the function. These variables are copied to each machine, and all updates to these variables on the remote machine are not passed back to the driver. It is generally inefficient to read and write variables between different tasks. However, Spark still provides two limited shared variables for two common usage patterns: broadcast variables and accumulators.
# # broadcast variable # broadcast variable allows programmers to cache a read-only variable on each machine instead of keeping a copy of each task. They can be used, for example, to give each node a large set of input data in an efficient way. Spark will try to use an efficient broadcast algorithm to allocate broadcast variables to reduce the cost of communication.
The broadcast variable is created from the variable v by calling the SparkContext.broadcast (v) method. The broadcast variable is a wrapper for v. Its value can be accessed by calling the value method. The following code shows these:
Broadcast broadcastVar = sc.broadcast (new int [] {1,2,3}); broadcastVar.value (); / / returns [1,2,3]
After the broadcast variable is created, it should be used instead of v in any function on the cluster so that v is no longer passed to these nodes. In addition, object v cannot be modified after being broadcast, which ensures that the value of the broadcast variable obtained by all nodes is the same (for example, this variable is later passed to a new node).
# # Accumulator # # Accumulator is a variable that can only be "added" through associated operations. Therefore, parallel computing can be supported efficiently. They can be used to implement counters (* for example, in MapReduce) and summation. Spark natively supports accumulators of numeric types. Developers can also add new support types themselves.
An accumulator can be created from an initial value v by calling the SparkContext.accumulator (v) method. Tasks running on the cluster can be added by using the add method or the + = operation (in Scala and Python). However, they cannot read this value. Only the driver can use the value method to read the value of the accumulator.
The following code shows how to use an accumulator to add all the elements in an array:
Accumulator accum = sc.accumulator (0); sc.parallelize (Arrays.asList (1,2,3,4)) .foreach (x-> accum.add (x)); / /... / 18:41:08 on 10-09-29 INFO SparkContext: Tasks finished in 0.317106 saccum.value (); / / returns 10
Although this code uses an accumulator with built-in support for Integer types. But developers can also create their own types by implementing AccumulatorParam. The AccumulatorParam interface has two methods: zero provides a "zero value" for your data type, and addInPlace adds the two values. For example, suppose we have a vector class to represent mathematical vectors, we can write:
Class VectorAccumulatorParam implements AccumulatorParam {public Vector zero (Vector initialValue) {return Vector.zeros (initialValue.size ());} public Vector addInPlace (Vector v1, Vector v2) {v1.addInPlace (v2); return v1;}} / / Then, create an Accumulator of this type:Accumulator vecAccum = sc.accumulator (new Vector (...), new VectorAccumulatorParam ())
In Java, Spark also supports a more generic Accumulable interface to accumulate data, and their result types are different from those of the added elements (for example, collecting the same elements to build a list).
Thank you for your reading, these are the contents of "what are the knowledge points of Spark programming?" after the study of this article, I believe you have a deeper understanding of what are the knowledge points of Spark programming, and the specific use needs to be verified in practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.