
Spark 2.x from Basics to Depth, Part 7: The RDD Python API in Detail (Part 1)

2025-01-18 Update From: SLTechnology News&Howtos

Before learning any Spark technology, make sure you understand Spark itself correctly; see the article "Correct understanding of Spark".

This article covers the three ways to create an RDD, the basic transformation APIs for single-type RDDs, the sampling APIs, and the pipe operation.

1. Three ways to create an RDD

1. Create an RDD from a stable file storage system, such as the local file system or HDFS, as follows:

    # sc is an existing SparkContext
    # Method 1: from a stable storage system, such as an HDFS file
    # or the local file system
    text_file_rdd = sc.textFile("file:////Users/tangweiqun/spark-course/word.txt")
    print("text_file_rdd = {0}".format(",".join(text_file_rdd.collect())))

2. Create a new RDD from an existing RDD through a transformation API. The example below uses the map transformation:

    # Method 2: from an existing RDD, using one of the RDD transformation APIs
    map_rdd = text_file_rdd.map(lambda line: "{0}-{1}".format(line, "test"))
    print("map_rdd = {0}".format(",".join(map_rdd.collect())))

3. Create an RDD from a list in memory. You can specify the number of partitions of the RDD; if you do not, it defaults to the total number of cores of all executors.

    # Method 3: from a list that already exists in memory. You can specify the
    # number of partitions; if you do not, it defaults to the total number of
    # cores of all executors. The call below specifies 2 partitions.
    parallelize_rdd = sc.parallelize([1, 2, 3, 3, 4], 2)
    print("parallelize_rdd = {0}".format(parallelize_rdd.glom().collect()))

Note: for the third case, Scala also provides the makeRDD API, which lets you specify the machines on which each partition of the RDD is created. The principle behind this API is explained in detail in the article "spark core RDD scala api".
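
To make the partition layout concrete, here is a plain-Python sketch of how a five-element list can end up as [[1, 2], [3, 3, 4]] in 2 partitions, which is the layout glom() prints for this list. It assumes the list is cut into contiguous slices at integer index boundaries. slice_into_partitions is a hypothetical helper written for illustration, not a Spark API, and Spark's own slicing code may differ in detail.

```python
def slice_into_partitions(data, num_partitions):
    """Cut a list into contiguous slices at integer index boundaries.

    Hypothetical helper for illustration only; it is not part of Spark.
    """
    n = len(data)
    return [
        data[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]


partitions = slice_into_partitions([1, 2, 3, 3, 4], 2)
print(partitions)  # [[1, 2], [3, 3, 4]]
```

With 5 elements and 2 partitions the boundaries fall at indexes 0, 2 and 5, so the second partition gets the extra element.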

2. Basic transformation APIs for single-type RDDs

First, create an RDD from data in memory:

    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setAppName("appName").setMaster("local")
    sc = SparkContext(conf=conf)
    parallelize_rdd = sc.parallelize([1, 2, 3, 3, 4], 2)

1. The map operation applies our custom function to each element of parallelize_rdd. The following adds 1 to each element:

    map_rdd = parallelize_rdd.map(lambda x: x + 1)
    # Result: [[2, 3], [4, 4, 5]]
    print("map_rdd = {0}".format(map_rdd.glom().collect()))

Note that map can return elements of a different type than the elements of the source RDD. The following returns strings:

    map_string_rdd = parallelize_rdd.map(lambda x: "{0}-{1}".format(x, "test"))
    # Result: [['1-test', '2-test'], ['3-test', '3-test', '4-test']]
    print("map_string_rdd = {0}".format(map_string_rdd.glom().collect()))

2. The flatMap operation applies our custom lambda function to each element of parallelize_rdd. The function returns a list for each element, and flatMap flattens those lists into a single sequence of elements.

    flatmap_rdd = parallelize_rdd.flatMap(lambda x: range(x))
    # Result: [[0, 0, 1], [0, 1, 2, 0, 1, 2, 0, 1, 2, 3]]
    print("flatmap_rdd = {0}".format(flatmap_rdd.glom().collect()))

3. The filter operation applies our custom predicate to each element of parallelize_rdd and keeps only the elements for which it returns True. The following removes the elements equal to 1:

    filter_rdd = parallelize_rdd.filter(lambda x: x != 1)
    # Result: [[2], [3, 3, 4]]
    print("filter_rdd = {0}".format(filter_rdd.glom().collect()))
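
The difference between map, flatMap and filter can also be seen without a cluster. The sketch below mimics the three operations with list comprehensions over the partition layout [[1, 2], [3, 3, 4]]. It only illustrates the semantics under that assumed layout and says nothing about how Spark executes these operations.

```python
# Partition layout of parallelize_rdd, written out as plain lists
partitions = [[1, 2], [3, 3, 4]]

# map: exactly one output element for each input element
mapped = [[x + 1 for x in part] for part in partitions]

# flatMap: each input element yields a sequence, and the sequences
# are concatenated inside the partition
flat_mapped = [[y for x in part for y in range(x)] for part in partitions]

# filter: keep only the elements for which the predicate is true
filtered = [[x for x in part if x != 1] for part in partitions]

print(mapped)       # [[2, 3], [4, 4, 5]]
print(flat_mapped)  # [[0, 0, 1], [0, 1, 2, 0, 1, 2, 0, 1, 2, 3]]
print(filtered)     # [[2], [3, 3, 4]]
```

Note that map keeps the number of elements unchanged, flatMap can grow or shrink it, and filter can only shrink it.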

4. The glom operation shows the elements held by each partition of parallelize_rdd:

    glomRDD = parallelize_rdd.glom()
    # Result: [[1, 2], [3, 3, 4]], which shows that parallelize_rdd has two
    # partitions: the first holds 1 and 2, and the second holds 3, 3 and 4
    print("glomRDD = {0}".format(glomRDD.collect()))

5. The mapPartitions operation applies our custom function to the data of each partition of parallelize_rdd as a whole. Suppose we need to add an initial value to each element, and obtaining that initial value is very time-consuming. In that case mapPartitions has a big advantage, as follows:

    import time

    # Obtains the initial value; this is a time-consuming call
    def get_init_number(source):
        print("get init number from {0}, may take much time.".format(source))
        time.sleep(1)
        return 1

    def map_partition_func(iterator):
        """
        Gets the initial value once per partition. parallelize_rdd has two
        partitions, so get_init_number is called twice. For expensive
        initialization, such as opening a database connection, use
        mapPartitions to initialize once per partition instead of using map.
        :param iterator: iterator over the elements of one partition
        :return: iterator over the transformed elements
        """
        init_number = get_init_number("map_partition_func")
        for x in iterator:
            yield x + init_number

    map_partition_rdd = parallelize_rdd.mapPartitions(map_partition_func)
    # Result: [[2, 3], [4, 4, 5]]
    print("map_partition_rdd = {0}".format(map_partition_rdd.glom().collect()))

    def map_func(x):
        """
        Gets the initial value for every element. parallelize_rdd contains five
        elements, so get_init_number is called five times, which costs far more
        time than the mapPartitions version.
        :param x: one element
        :return: the element plus the initial value
        """
        init_number = get_init_number("map_func")
        return x + init_number

    map_rdd_init_number = parallelize_rdd.map(map_func)
    # Result: [[2, 3], [4, 4, 5]]
    print("map_rdd_init_number = {0}".format(map_rdd_init_number.glom().collect()))
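
The saving can be checked by counting calls. The sketch below runs without Spark: it replaces the slow lookup with a counter and applies a per-partition function and a per-element function to the layout [[1, 2], [3, 3, 4]]. The names calls, per_partition and per_element are made up for this illustration.

```python
calls = {"count": 0}


def get_init_number():
    """Stand-in for the expensive lookup; it only counts how often it runs."""
    calls["count"] += 1
    return 1


def per_partition(iterator):
    init_number = get_init_number()  # runs once for the whole partition
    for x in iterator:
        yield x + init_number


def per_element(x):
    return x + get_init_number()  # runs once for every element


partitions = [[1, 2], [3, 3, 4]]

calls["count"] = 0
by_partition = [list(per_partition(iter(part))) for part in partitions]
partition_calls = calls["count"]

calls["count"] = 0
by_element = [[per_element(x) for x in part] for part in partitions]
element_calls = calls["count"]

print(by_partition, partition_calls)  # [[2, 3], [4, 4, 5]] 2
print(by_element, element_calls)      # [[2, 3], [4, 4, 5]] 5
```

Both variants produce the same data; only the number of expensive calls differs, 2 (one per partition) against 5 (one per element).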

6. The mapPartitionsWithIndex operation also applies our custom function to the data of each partition of parallelize_rdd, and additionally passes the partition index to the function, so you know which partition you are currently processing.

    def map_partition_with_index_func(partition_index, iterator):
        yield (partition_index, sum(iterator))

    map_partition_with_index_rdd = parallelize_rdd.mapPartitionsWithIndex(map_partition_with_index_func)
    # Result: [[(0, 3)], [(1, 10)]]
    print("map_partition_with_index_rdd = {0}".format(map_partition_with_index_rdd.glom().collect()))
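
The same function can be tried locally by feeding it each partition together with its index. This is a plain-Python sketch that assumes the layout [[1, 2], [3, 3, 4]]; enumerate plays the role of the partition index that Spark supplies.

```python
def map_partition_with_index_func(partition_index, iterator):
    # Emit one (index, sum) pair for the whole partition
    yield (partition_index, sum(iterator))


partitions = [[1, 2], [3, 3, 4]]
result = [
    list(map_partition_with_index_func(index, iter(part)))
    for index, part in enumerate(partitions)
]
print(result)  # [[(0, 3)], [(1, 10)]]
```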

3. Sampling APIs

First, create an RDD from data in memory:

    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setAppName("appName").setMaster("local")
    sc = SparkContext(conf=conf)
    parallelize_rdd = sc.parallelize([1, 2, 3, 3, 4], 2)

1. sample

    # The first parameter is withReplacement:
    #   withReplacement=True samples with replacement, implemented with the
    #   Poisson sampling algorithm;
    #   withReplacement=False samples without replacement, implemented with
    #   the Bernoulli sampling algorithm.
    # The second parameter is fraction, the probability that each element is
    # chosen for the sample. It is not a factor that fixes the amount of data
    # extracted. For example, sampling 100 elements with fraction=0.2 does not
    # mean that 100 * 0.2 = 20 elements are extracted, but that each of the
    # 100 elements is chosen with probability 0.2. The sample size is
    # therefore not fixed; it follows a binomial distribution.
    #   With withReplacement=True, fraction >= 0;
    #   with withReplacement=False, 0 <= fraction <= 1.
    # The third parameter is seed, the seed of the random number generator:
    # a random seed for each partition of the RDD is derived from it.
    sample_rdd = parallelize_rdd.sample(False, 0.5, 100)
    # Result: [[1], [3, 4]]
    print("sample_rdd = {0}".format(sample_rdd.glom().collect()))
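
The point that fraction is a per-element probability, not a sample size, is easier to see in code. The following is a plain-Python sketch of the two sampling ideas named above, not Spark's implementation: bernoulli_sample keeps each element independently with probability fraction, and poisson_sample repeats each element a Poisson-distributed number of times. All three function names are made up for this illustration, and the random numbers come from Python's random module, so the samples will not match Spark's.

```python
import math
import random


def bernoulli_sample(data, fraction, seed):
    """Without replacement: keep each element with probability `fraction`."""
    rng = random.Random(seed)
    return [x for x in data if rng.random() < fraction]


def poisson_draw(rng, mean):
    """Draw from a Poisson distribution (Knuth's method, fine for small means)."""
    limit = math.exp(-mean)
    count, product = 0, rng.random()
    while product > limit:
        count += 1
        product *= rng.random()
    return count


def poisson_sample(data, fraction, seed):
    """With replacement: repeat each element Poisson(fraction) times."""
    rng = random.Random(seed)
    sample = []
    for x in data:
        sample.extend([x] * poisson_draw(rng, fraction))
    return sample


data = list(range(100))
# The sample size changes with the seed; it is not pinned to 100 * 0.2 = 20
sizes = [len(bernoulli_sample(data, 0.2, seed)) for seed in range(5)]
print(sizes)
# With replacement, the same element may appear more than once
print(poisson_sample([1, 2, 3, 3, 4], 1.5, 100))
```

Because every element is decided independently, the Bernoulli sample size follows a binomial distribution with mean len(data) * fraction, which is why the printed sizes only hover around 20.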

2. randomSplit

    # Randomly splits the RDD according to the given weights; the number of
    # weights determines the number of resulting RDDs. The random sampling
    # uses the Bernoulli sampling algorithm. Two weights are given here, so
    # the RDD is split into two RDDs.
    split_rdds = parallelize_rdd.randomSplit([0.2, 0.8])
    print(len(split_rdds))
    # Example result (no seed is given, so it varies from run to run): [[], [3, 4]]
    print("split_rdds[0] = {0}".format(split_rdds[0].glom().collect()))
    # Example result: [[1, 2], [3]]
    print("split_rdds[1] = {0}".format(split_rdds[1].glom().collect()))
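
One way to picture a weighted split is the following plain-Python sketch. It assumes the weights are normalized into consecutive ranges of [0, 1) and that each element draws one random number and goes to the split whose range contains it. random_split is a hypothetical helper, and the sketch does not claim to reproduce Spark's internal code or its random numbers.

```python
import random


def random_split(data, weights, seed):
    """Assign every element to exactly one split, chosen by weight."""
    total = float(sum(weights))
    # Cumulative boundaries, for example [0.0, 0.2, 1.0] for weights [0.2, 0.8]
    bounds = [0.0]
    for w in weights:
        bounds.append(bounds[-1] + w / total)
    bounds[-1] = 1.0  # guard against floating-point drift

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for x in data:
        u = rng.random()
        for i in range(len(weights)):
            if bounds[i] <= u < bounds[i + 1]:
                splits[i].append(x)
                break
    return splits


splits = random_split([1, 2, 3, 3, 4], [0.2, 0.8], seed=7)
print(len(splits))
print(splits)
```

Every element lands in exactly one split, so the splits together always contain the original data, while the size of each split only matches its weight on average.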

3. takeSample

    # Randomly samples a specified number of elements.
    # The first parameter is withReplacement:
    #   withReplacement=True samples with replacement, implemented with the
    #   Poisson sampling algorithm;
    #   withReplacement=False samples without replacement, implemented with
    #   the Bernoulli sampling algorithm.
    # The second parameter is the number of samples to return.
    # Example result (no seed is given, so it varies from run to run): [1]
    print(parallelize_rdd.takeSample(False, 1))

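Unlike sample, takeSample fixes the number of returned elements. The plain-Python sketch below shows only that contract, using the standard random module. take_sample is a hypothetical helper and does not reflect how Spark gathers the sample internally.

```python
import random


def take_sample(data, with_replacement, num, seed):
    """Return `num` random elements as a plain list."""
    rng = random.Random(seed)
    if with_replacement:
        # The same element may be picked several times
        return [rng.choice(data) for _ in range(num)] if data else []
    # Without replacement there can never be more results than elements
    return rng.sample(data, min(num, len(data)))


data = [1, 2, 3, 3, 4]
print(take_sample(data, False, 2, seed=1))
print(take_sample(data, True, 8, seed=1))
```

The result is an ordinary list on the caller's side, so this kind of call is only suitable when the requested number of samples is small.
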
4. Stratified sampling, which samples an RDD of key-value type

    # Create an RDD of key-value type
    pair_rdd = sc.parallelize([('A', 1), ('B', 2), ('C', 3), ('B', 4), ('A', 5)])
    sampleByKey_rdd = pair_rdd.sampleByKey(withReplacement=False, fractions={'A': 0.5, 'B': 1, 'C': 0.2})
    # Result: [[('A', 1), ('B', 2), ('B', 4)]]
    print("sampleByKey_rdd = {0}".format(sampleByKey_rdd.glom().collect()))

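Stratified sampling means that each key has its own sampling probability. Here is a plain-Python sketch of that idea for the case without replacement. sample_by_key is a hypothetical helper, the pairs are illustrative data, and in this sketch every key that occurs in the data must have an entry in fractions.

```python
import random


def sample_by_key(pairs, fractions, seed):
    """Keep each (key, value) pair with the probability given for its key."""
    rng = random.Random(seed)
    return [(k, v) for (k, v) in pairs if rng.random() < fractions[k]]


pairs = [('A', 1), ('B', 2), ('C', 3), ('B', 4), ('A', 5)]

# A fraction of 1.0 keeps every pair of that key, 0.0 drops them all
kept = sample_by_key(pairs, {'A': 1.0, 'B': 0.0, 'C': 1.0}, seed=0)
print(kept)  # [('A', 1), ('C', 3), ('A', 5)]

# Intermediate fractions give a result that depends on the seed
print(sample_by_key(pairs, {'A': 0.5, 'B': 0.5, 'C': 0.5}, seed=0))
```
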
For the principles behind sampling, see the article "spark core RDD api". These principles are hard to convey in text alone.

4. Pipe, which runs an external script, such as a Python or shell script, at a given step of the RDD execution flow

    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setAppName("appName").setMaster("local")
    sc = SparkContext(conf=conf)
    parallelize_rdd = sc.parallelize(["test1", "test2", "test3", "test4", "test5"], 2)
    # On a real Spark cluster, echo.py must exist in the same directory on
    # every machine in the cluster. The second parameter holds the
    # environment variables.
    pipe_rdd = parallelize_rdd.pipe(
        "python /Users/tangweiqun/spark/source/spark-course/spark-rdd-java/src/main/resources/echo.py",
        {"env": "env"})
    # Result: slave1-test1-env slave1-test2-env slave1-test3-env slave1-test4-env slave1-test5-env
    print("pipe_rdd = {0}".format(" ".join(pipe_rdd.collect())))

The echo.py script is as follows:

    import sys
    import os

    # input = "test"
    input = sys.stdin
    env_keys = os.environ.keys()
    env = ""
    if "env" in env_keys:
        env = os.environ["env"]
    for ele in input:
        output = "slave1-" + ele.strip('\n') + "-" + env
        print(output)
    input.close()

For the principle of pipe and how it is implemented, see the article "spark core RDD api", which also explains how to avoid manually copying the script to each machine.
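
The basic idea of pipe can be tried locally with the standard subprocess module: the elements of a partition are written to the external program's standard input, one per line, and each line of its standard output becomes an output element. The sketch below assumes exactly that behaviour and the partition layout shown. pipe_partition and ECHO_SCRIPT are hypothetical names, and the script is passed inline as a stand-in for echo.py so that the example is self-contained.

```python
import os
import subprocess
import sys

# Inline stand-in for echo.py (an assumption made to keep the sketch runnable)
ECHO_SCRIPT = (
    "import os, sys\n"
    "env = os.environ.get('env', '')\n"
    "for line in sys.stdin:\n"
    "    print('slave1-' + line.strip('\\n') + '-' + env)\n"
)


def pipe_partition(elements, command, env):
    """Feed one partition to an external command, one element per line."""
    child_env = dict(os.environ)
    child_env.update(env)  # extra variables visible to the child process
    completed = subprocess.run(
        command,
        input="".join(e + "\n" for e in elements),
        stdout=subprocess.PIPE,
        universal_newlines=True,
        env=child_env,
        check=True,
    )
    return completed.stdout.splitlines()


partitions = [["test1", "test2"], ["test3", "test4", "test5"]]
command = [sys.executable, "-c", ECHO_SCRIPT]
piped = [pipe_partition(part, command, {"env": "env"}) for part in partitions]
print(piped)
```

One external process is started per partition, which is why the script has to be reachable on every machine that runs a partition.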
