2025-02-24 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
Before learning any Spark technology, make sure you understand Spark correctly. See: correct understanding of spark
The following covers the three ways to create an RDD, the basic transformation API of single-type RDDs, the sampling API, and the pipe operation.
1. Three ways to create an RDD
1. Create an RDD from a stable file storage system, such as the local file system or HDFS, as follows:

    // Create from a file on HDFS
    JavaRDD<String> textFileRDD = sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt");

    // When reading from the local file system, note that "file:" must be followed by
    // at least three slashes (four also works), but not two.
    // The optional second parameter is the minimum number of partitions of the created RDD.
    // If the file has more blocks than the specified number of partitions,
    // the number of blocks prevails.
    JavaRDD<String> textFileWithMinPartitionsRDD = sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt", 2);
2. Create a new RDD from an existing RDD through a transformation API. The following uses the map transformation:

    // requires: import org.apache.spark.api.java.function.Function;
    JavaRDD<String> mapRDD = textFileRDD.map(new Function<String, String>() {
        @Override
        public String call(String s) throws Exception {
            return s + "test";
        }
    });
    System.out.println("mapRDD = " + mapRDD.collect());
3. Create an RDD from an in-memory list. You can specify the number of partitions of the RDD. If you do not, the total number of cores across all executors is used.

    // Create an RDD of a single type
    JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 3, 4), 2);
    System.out.println("integerJavaRDD = " + integerJavaRDD.glom().collect());

    // Create an RDD whose single type is Double
    JavaDoubleRDD doubleJavaDoubleRDD = sc.parallelizeDoubles(Arrays.asList(2.0, 3.3, 5.6));
    System.out.println("doubleJavaDoubleRDD = " + doubleJavaDoubleRDD.collect());

    // Create an RDD of key-value type (requires: import scala.Tuple2;)
    JavaPairRDD<String, Integer> javaPairRDD = sc.parallelizePairs(Arrays.asList(
            new Tuple2<String, Integer>("test", 3),
            new Tuple2<String, Integer>("kkk", 3)));
    System.out.println("javaPairRDD = " + javaPairRDD.collect());
Note: for the third case, Scala also provides the makeRDD API, which can specify the machines on which each partition of the created RDD is placed. How this API works is covered in detail in: spark core RDD scala api.
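The effect of the partition count passed to parallelize can be illustrated without Spark. The following is a minimal plain-Java sketch, assuming that a list is cut into contiguous chunks by integer arithmetic on positions. The class name `SliceSketch` and the method `slice` are hypothetical names used only for illustration, and the exact slicing rule inside Spark is an assumption here, not something the article states.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; it does not use or depend on Spark.
final class SliceSketch {

    // Cut `data` into `numSlices` contiguous chunks whose sizes differ by at most one.
    static <T> List<List<T>> slice(List<T> data, int numSlices) {
        if (numSlices < 1) {
            throw new IllegalArgumentException("numSlices must be >= 1");
        }
        List<List<T>> slices = new ArrayList<>();
        long n = data.size();
        for (int i = 0; i < numSlices; i++) {
            // Assumed rule: chunk i covers positions [i*n/numSlices, (i+1)*n/numSlices)
            int start = (int) (i * n / numSlices);
            int end = (int) ((i + 1) * n / numSlices);
            slices.add(new ArrayList<>(data.subList(start, end)));
        }
        return slices;
    }

    public static void main(String[] args) {
        // prints [[1, 2], [3, 3, 4]]
        System.out.println(slice(Arrays.asList(1, 2, 3, 3, 4), 2));
    }
}
```

The nested list printed here has the same shape as what glom().collect() returns: one inner list per partition.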
2. Basic transformation API of single-type RDDs
First, create an RDD from in-memory data:

    JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 3), 2);
1. map operation: applies our custom function to each element of integerJavaRDD. The following adds 1 to each element:

    JavaRDD<Integer> mapRDD = integerJavaRDD.map(new Function<Integer, Integer>() {
        @Override
        public Integer call(Integer element) throws Exception {
            return element + 1;
        }
    });
    // Result: [2, 3, 4, 4]
    System.out.println("mapRDD = " + mapRDD.collect());
Note that the map operation can return data of a different type from the element type of the source RDD. The following returns a custom User object:

    public class User implements Serializable {
        private String userId;
        private Integer amount;

        public User(String userId, Integer amount) {
            this.userId = userId;
            this.amount = amount;
        }

        // getters and setters ...

        @Override
        public String toString() {
            return "User{" +
                    "userId='" + userId + '\'' +
                    ", amount=" + amount +
                    '}';
        }
    }

    JavaRDD<User> userJavaRDD = integerJavaRDD.map(new Function<Integer, User>() {
        @Override
        public User call(Integer element) throws Exception {
            if (element < 3) {
                return new User("less than 3", 22);
            } else {
                return new User("greater than 3", 23);
            }
        }
    });
    // Result: [User{userId='less than 3', amount=22}, User{userId='less than 3', amount=22},
    //          User{userId='greater than 3', amount=23}, User{userId='greater than 3', amount=23}]
    System.out.println("userJavaRDD = " + userJavaRDD.collect());
2. flatMap operation: applies our custom FlatMapFunction to each element of integerJavaRDD. The function outputs a list of data for each element, and flatMap flattens all of these lists into a single RDD.
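The flattening behaviour described above can be illustrated with java.util.stream, which has a flatMap with the same shape. This is a plain-Java analogy, not Spark code. The class name `FlatMapSketch` and the method `expand` are hypothetical, and the inclusive range 0..element is an assumption based on the phrase "0 to element".

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Plain-Java analogy using streams; it does not use or depend on Spark.
final class FlatMapSketch {

    // For each element e, emit 0..e (inclusive, assumed), then flatten all outputs into one list.
    static List<Integer> expand(List<Integer> input) {
        return input.stream()
                .flatMap(e -> IntStream.rangeClosed(0, e).boxed())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [0, 1, 0, 1, 2, 0, 1, 2, 3, 0, 1, 2, 3]
        System.out.println(expand(Arrays.asList(1, 2, 3, 3)));
    }
}
```

Each input element produces its own small list, and the result is one flat list rather than a list of lists.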
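
    JavaRDD<Integer> flatMapJavaRDD = integerJavaRDD.flatMap(new FlatMapFunction<Integer, Integer>() {
        @Override
        public Iterator<Integer> call(Integer element) throws Exception {
            // Output a list whose elements are 0 to element
            List<Integer> list = new ArrayList<>();
            int i = 0;
            // The source text is truncated inside this loop condition;
            // the bound and loop body below are assumed from the comment above.
            while (i <= element) {
                list.add(i);
                i++;
            }
            return list.iterator();
        }
    });

[...]

3. Sampling API
1. sample

    // [...] The definition of listRDD and the description of the first two parameters
    // (withReplacement and fraction) are truncated in the source text.
    // When withReplacement=true, fraction >= 0
    // When withReplacement=false, 0 < fraction < 1
    // The third parameter is the seed used to generate random numbers: a random seed
    // is derived from it for each partition of the RDD.
    JavaRDD<Integer> sampleRDD = listRDD.sample(false, 0.5, 100);
    // Result: [1, 3]
    System.out.println("sampleRDD = " + sampleRDD.collect());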
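For the sample call without replacement, the idea of Bernoulli sampling can be sketched in plain Java: each element is kept independently with probability equal to fraction, so the sample size is not fixed. This is a simplified illustration, not Spark's implementation. The class name `BernoulliSampleSketch` is hypothetical, and a single `java.util.Random` stands in for Spark's per-partition generators.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Plain-Java illustration of Bernoulli sampling; it does not use or depend on Spark.
final class BernoulliSampleSketch {

    // Keep each element independently with probability `fraction` (0 <= fraction <= 1).
    static <T> List<T> sample(List<T> data, double fraction, long seed) {
        if (fraction < 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException("fraction must be in [0, 1]");
        }
        Random rng = new Random(seed);
        List<T> kept = new ArrayList<>();
        for (T item : data) {
            // nextDouble() is uniform in [0, 1), so this test succeeds with probability `fraction`
            if (rng.nextDouble() < fraction) {
                kept.add(item);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // The size of the result varies with the seed; it is not always data.size() * fraction.
        System.out.println(sample(Arrays.asList(1, 2, 3, 3), 0.5, 100L));
    }
}
```

With the same seed the sketch returns the same sample every time, which is also why a seed is useful for reproducible tests.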
2. randomSplit

    // Randomly split the RDD by weight; the number of weights determines the number of resulting RDDs.
    // The random sampling uses the Bernoulli sampling algorithm.
    // With the two weights below, the RDD is split into two RDDs.
    JavaRDD<Integer>[] splitRDDs = listRDD.randomSplit(new double[]{0.4, 0.6});
    // Result: 2
    System.out.println("splitRDDs.length = " + splitRDDs.length);
    // Result: [2, 3] (varies between runs)
    System.out.println("splitRDD(0) = " + splitRDDs[0].collect());
    // Result: [1, 3] (varies between runs)
    System.out.println("splitRDD(1) = " + splitRDDs[1].collect());
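The idea behind a weighted random split can be sketched in plain Java: normalize the weights, turn them into cumulative ranges over [0, 1), and place each element into the range that contains its random draw. This single-pass version is a simplification for illustration, not Spark's implementation. The class name `RandomSplitSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Plain-Java illustration of a weighted random split; it does not use or depend on Spark.
final class RandomSplitSketch {

    static <T> List<List<T>> randomSplit(List<T> data, double[] weights, long seed) {
        double sum = 0.0;
        for (double w : weights) {
            if (w < 0.0) {
                throw new IllegalArgumentException("weights must be non-negative");
            }
            sum += w;
        }
        if (sum <= 0.0) {
            throw new IllegalArgumentException("at least one weight must be positive");
        }
        // upper[k] is the exclusive upper bound of split k after normalizing the weights
        double[] upper = new double[weights.length];
        double acc = 0.0;
        for (int k = 0; k < weights.length; k++) {
            acc += weights[k] / sum;
            upper[k] = acc;
        }
        upper[weights.length - 1] = 1.0; // guard against floating-point rounding

        List<List<T>> splits = new ArrayList<>();
        for (int k = 0; k < weights.length; k++) {
            splits.add(new ArrayList<>());
        }
        Random rng = new Random(seed);
        for (T item : data) {
            double draw = rng.nextDouble(); // uniform in [0, 1)
            int k = 0;
            while (draw >= upper[k]) {
                k++;
            }
            splits.get(k).add(item);
        }
        return splits;
    }

    public static void main(String[] args) {
        // Two weights give two splits; which elements land where depends on the seed.
        System.out.println(randomSplit(Arrays.asList(1, 2, 3, 3), new double[]{0.4, 0.6}, 100L));
    }
}
```

Every element lands in exactly one split, so the split sizes always add up to the input size, while the size of each individual split only follows the weights on average.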
3. takeSample

    // Randomly sample a specified number of elements.
    // The first parameter is withReplacement:
    //   withReplacement=true means sampling with replacement, implemented with the Poisson sampling algorithm
    //   withReplacement=false means sampling without replacement, implemented with the Bernoulli sampling algorithm
    // The second parameter is the number of elements to sample.
    // Result: [2, 3] (varies between runs)
    System.out.println(listRDD.takeSample(false, 2));
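Unlike sample, takeSample returns a fixed number of elements. A minimal plain-Java sketch of fixed-size sampling without replacement is to shuffle a copy of the data and keep the first num elements. This shows the contract only and is a swapped-in technique, not how Spark computes the result. The class name `TakeSampleSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Plain-Java illustration of fixed-size sampling; it does not use or depend on Spark.
final class TakeSampleSketch {

    // Return `num` elements chosen without replacement, or all elements if num exceeds the size.
    static <T> List<T> takeSample(List<T> data, int num, long seed) {
        if (num < 0) {
            throw new IllegalArgumentException("num must be >= 0");
        }
        List<T> copy = new ArrayList<>(data);
        Collections.shuffle(copy, new Random(seed));
        return new ArrayList<>(copy.subList(0, Math.min(num, copy.size())));
    }

    public static void main(String[] args) {
        // Always prints a list of exactly two elements; which two depends on the seed.
        System.out.println(takeSample(Arrays.asList(1, 2, 3, 3), 2, 100L));
    }
}
```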
4. Stratified sampling: sampling an RDD of key-value type

    // Create an RDD of key-value type (requires: import scala.Tuple2;)
    JavaPairRDD<String, Integer> javaPairRDD = sc.parallelizePairs(Arrays.asList(
            new Tuple2<String, Integer>("test", 3),
            new Tuple2<String, Integer>("kkk", 3),
            new Tuple2<String, Integer>("kkk", 3)));
    // Define the sampling fraction for each key
    Map<String, Double> fractions = new HashMap<>();
    fractions.put("test", 0.5);
    fractions.put("kkk", 0.4);
    // Sample each key
    // Result: [(test,3), (kkk,3)]
    // sampleByKey does not make a full pass over the data, so the result is only approximate
    System.out.println(javaPairRDD.sampleByKey(true, fractions).collect());
    // Result: [(test,3), (kkk,3)]
    // sampleByKeyExact samples against the full data, so it consumes more computing resources,
    // but the result is more accurate
    System.out.println(javaPairRDD.sampleByKeyExact(true, fractions).collect());
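Stratified sampling applies a separate sampling fraction to each key. The following plain-Java sketch shows the without-replacement variant, where each pair is kept with the probability assigned to its key. The Spark example above passes withReplacement=true, which uses Poisson sampling instead, so this is a simplified illustration under that stated difference. The class name `StratifiedSampleSketch` is hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Plain-Java illustration of per-key Bernoulli sampling; it does not use or depend on Spark.
final class StratifiedSampleSketch {

    // Keep each (key, value) pair with the probability that `fractions` assigns to its key.
    static <K, V> List<Map.Entry<K, V>> sampleByKey(
            List<Map.Entry<K, V>> pairs, Map<K, Double> fractions, long seed) {
        Random rng = new Random(seed);
        List<Map.Entry<K, V>> kept = new ArrayList<>();
        for (Map.Entry<K, V> pair : pairs) {
            Double fraction = fractions.get(pair.getKey());
            if (fraction == null) {
                throw new IllegalArgumentException("no fraction for key " + pair.getKey());
            }
            if (rng.nextDouble() < fraction) {
                kept.add(pair);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        pairs.add(new AbstractMap.SimpleEntry<>("test", 3));
        pairs.add(new AbstractMap.SimpleEntry<>("kkk", 3));
        pairs.add(new AbstractMap.SimpleEntry<>("kkk", 3));

        Map<String, Double> fractions = new HashMap<>();
        fractions.put("test", 0.5);
        fractions.put("kkk", 0.4);

        // The number of pairs kept per key is only approximately fraction * count.
        System.out.println(sampleByKey(pairs, fractions, 100L));
    }
}
```

Because every pair is decided independently, the per-key sample size is approximate, which matches the trade-off between sampleByKey and sampleByKeyExact noted in the comments above.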
For how sampling works, see: spark core RDD api. These principles are hard to convey in text alone.
4. pipe: runs an external script, such as a Python or shell script, at some step of the RDD execution flow

    JavaRDD<String> dataRDD = sc.parallelize(Arrays.asList("hi", "hello", "how", "are", "you"), 2);
    // Environment variables required to start echo.py
    Map<String, String> env = new HashMap<>();
    env.put("env", "envtest");
    List<String> commands = new ArrayList<>();
    commands.add("python");
    // On a real Spark cluster, echo.py must exist in the same directory on every machine in the cluster
    commands.add("/Users/tangweiqun/spark/source/spark-course/spark-rdd-java/src/main/resources/echo.py");
    JavaRDD<String> result = dataRDD.pipe(commands, env);
    // Result: [slave1-hi-envtest, slave1-hello-envtest, slave1-how-envtest, slave1-are-envtest, slave1-you-envtest]
    System.out.println(result.collect());
The echo.py script is as follows:

    import sys
    import os

    # input = "test"
    input = sys.stdin
    env_keys = os.environ.keys()
    env = ""
    if "env" in env_keys:
        env = os.environ["env"]
    # Each element of the partition arrives as one line on stdin
    for ele in input:
        output = "slave1-" + ele.strip('\n') + "-" + env
        print(output)
    input.close()
For how pipe works and how it is implemented, see: spark core RDD api, which also describes how to avoid manually copying the script to each machine.
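The contract that pipe relies on can be sketched in plain Java with ProcessBuilder: start an external command with extra environment variables, write each element to its standard input as one line, and read each line of its standard output back as one result element. This is a simplified illustration, not Spark's implementation. The class name `PipeSketch` is hypothetical, and the usage in main assumes a POSIX `sh` is available on the PATH in place of the article's echo.py.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of piping data through an external command; it does not use Spark.
final class PipeSketch {

    static List<String> pipe(List<String> elements, List<String> command, Map<String, String> env)
            throws IOException, InterruptedException {
        ProcessBuilder builder = new ProcessBuilder(command);
        builder.environment().putAll(env);
        builder.redirectError(ProcessBuilder.Redirect.INHERIT);
        Process process = builder.start();

        // Write every element as one line, then close stdin so the command sees end of input.
        // Writing everything before reading is only safe for small inputs like this one.
        try (BufferedWriter writer = new BufferedWriter(
                new OutputStreamWriter(process.getOutputStream(), StandardCharsets.UTF_8))) {
            for (String element : elements) {
                writer.write(element);
                writer.newLine();
            }
        }

        // Each line the command prints becomes one output element.
        List<String> output = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                output.add(line);
            }
        }

        int status = process.waitFor();
        if (status != 0) {
            throw new IOException("command exited with status " + status);
        }
        return output;
    }

    public static void main(String[] args) throws Exception {
        // Assumption: a POSIX shell stands in for echo.py and reads the "env" variable.
        List<String> command = Arrays.asList(
                "sh", "-c", "while read line; do echo \"slave1-$line-$env\"; done");
        System.out.println(pipe(
                Arrays.asList("hi", "hello"),
                command,
                Collections.singletonMap("env", "envtest")));
    }
}
```

The script only ever sees lines on standard input and environment variables, which is why the same echo.py can be written without any knowledge of Spark.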
Learn Spark systematically:
1. [Lao Tang] Spark 2.x Spark Core: https://edu.51cto.com/sd/88429
2. [Lao Tang] Spark 2.x Spark SQL in-depth topic: https://edu.51cto.com/sd/16f3d
3. [Lao Tang] Scala internal training series: https://edu.51cto.com/sd/8e85b
4. [Lao Tang] Spark 2.x Spark Streaming: https://edu.51cto.com/sd/8c525
5. [Lao Tang] Spark 2.x intensive package: https://edu.51cto.com/sd/ff9a4
6. From Scala to Spark 2.x topic: https://edu.51cto.com/sd/d72af