This article explains how to do Spark programming in Python. The editor finds it quite practical, so it is shared here for your reference; I hope you get something out of it after reading.
Spark application structure
A Spark application can be divided into two parts: the driver part and the executor part.

A: driver part

The driver part mainly configures, initializes and finally shuts down the SparkContext. Initializing the SparkContext builds the running environment for the Spark application; during initialization, some Spark classes and implicit conversions need to be imported. After the executor part has finished, the SparkContext must be closed.
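As a rough sketch of this lifecycle in PySpark (the application name and the local master URL below are placeholder assumptions, not part of the original text):

from pyspark import SparkConf, SparkContext

# configure and initialize the SparkContext: this builds the running
# environment for the application (app name and master are placeholders)
conf = SparkConf().setAppName("DriverSkeleton").setMaster("local[*]")
sc = SparkContext(conf=conf)

# ... the executor part (RDD transformations and actions) goes here ...

# shut down the SparkContext once the executor part has finished
sc.stop()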
B: executor part
The executor part of a Spark application is the processing of data, which falls into three types:

Native data, including input data and output data.

For input native data, Spark currently supports two kinds:

Scala collection datasets, such as Array(1, 2, 3, 4, 5), which Spark converts into an RDD with the parallelize method.

Hadoop datasets: Spark supports files stored on Hadoop and on other file systems that Hadoop supports, such as local files, HBase, SequenceFile and other Hadoop input formats. For example, Spark uses the textFile method to convert a local file or an HDFS file into an RDD.

For output data, Spark supports not only the two kinds above but also Scala scalars:

Scala scalar data, such as count (returns the number of elements in the RDD), reduce and fold/aggregate, or a few elements, such as take (returns the first few elements).

Scala collection datasets, such as collect (pours all elements of the RDD into a Scala collection) and lookup (finds all values for a given key).

Hadoop datasets, such as saveAsTextFile and saveAsSequenceFile.
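A hedged PySpark sketch of the two kinds of input and the three kinds of output described above (the file paths are placeholder assumptions):

from pyspark import SparkContext

sc = SparkContext("local[*]", "NativeDataSketch")

# input: a native collection turned into an RDD with parallelize
nums = sc.parallelize([1, 2, 3, 4, 5])

# input: a local or HDFS file turned into an RDD with textFile (placeholder path)
lines = sc.textFile("/tmp/input.txt")

# output: scalars
print(nums.count())                      # 5
print(nums.reduce(lambda x, y: x + y))   # 15

# output: a collection brought back to the driver
print(nums.collect())                    # [1, 2, 3, 4, 5]

# output: a Hadoop dataset (placeholder path)
nums.map(str).saveAsTextFile("/tmp/output")

sc.stop()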
RDD, the basic unit of parallel computation in Spark (RDD itself is covered in detail elsewhere). RDD provides four kinds of operators:

Input operators, which convert native data into an RDD, such as parallelize and textFile.

Transformation operators, the most important kind, are the objects from which Spark builds the DAG. A transformation operator does not execute immediately; only after an action operator is triggered is the job submitted to the driver, the DAG is generated, and execution proceeds as DAG --> Stage --> Task --> Worker. According to their role in the DAG, transformation operators fall into two kinds (a sketch covering all four kinds of operators follows this list):

Narrow-dependency operators:

One-to-one operators whose resulting RDD keeps the same partition structure, mainly map and flatMap.

One-to-one operators whose resulting RDD has a changed partition structure, such as union and coalesce.

Operators that select some of the elements of the input, such as filter, distinct, subtract and sample.

Wide-dependency operators, which involve a shuffle; when the DAG is parsed, the shuffle is used as the boundary at which Stages are generated:

Operators that regroup and reduce a single RDD by key, such as groupByKey and reduceByKey.

Operators that join and regroup two RDDs by key, such as join and cogroup.

Caching operators: for an RDD that will be used many times, caching speeds up execution, and important data can be cached with multiple replicas.

Action operators, which convert the resulting RDD back into native data, such as count, reduce, collect and saveAsTextFile.
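To make the four kinds of operators and the lazy-evaluation behaviour concrete, here is a minimal sketch (the sample words and the resulting counts are made up for illustration):

from pyspark import SparkContext

sc = SparkContext("local[*]", "OperatorSketch")

# input operator: native data -> RDD
words = sc.parallelize(["spark", "python", "spark", "rdd"])

# narrow-dependency transformation: one-to-one, partition structure preserved
pairs = words.map(lambda w: (w, 1))

# wide-dependency transformation: reduceByKey shuffles by key, and the DAG
# is cut into a new Stage at this shuffle boundary
counts = pairs.reduceByKey(lambda x, y: x + y)

# caching operator: the result is reused by the two actions below
counts.cache()

# nothing has run yet -- transformations only build the DAG; the action
# operators below trigger submission and execution
print(counts.count())      # 3
print(counts.collect())    # e.g. [('spark', 2), ('python', 1), ('rdd', 1)]

sc.stop()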
Shared variables. When a function is passed to a partition operation inside Spark, a separate copy of each variable used by the function is kept on every node, and the nodes do not affect one another. In a Spark application, however, some variables may need to be shared for use by tasks or by the driver. Spark provides two kinds of shared variables:
Broadcast variables, shared variables that are cached on every node and are usually read-only. Usage:
>>> from pyspark.context import SparkContext
>>> sc = SparkContext('local', 'test')
>>> b = sc.broadcast([1, 2, 3, 4, 5])
>>> b.value
[1, 2, 3, 4, 5]
>>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
Accumulators, which only support adding to a variable, can implement counters and sums. The user calls SparkContext.accumulator(v) to create an accumulator with initial value v; tasks running on the cluster can then update it with the += (add) operation, but they cannot read it; only the driver can get the value of the accumulator. Usage:
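The original usage example is missing here; a minimal sketch following the standard PySpark accumulator API would look roughly like this:

>>> from pyspark.context import SparkContext
>>> sc = SparkContext('local', 'test')
>>> acc = sc.accumulator(0)                # accumulator with initial value 0
>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: acc.add(x))
>>> acc.value                              # only the driver can read the value
10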
Python programming
Experimental project
Sogou log data analysis
Source of experimental data: the Sogou compact (mini) version dataset.
Data format description:
Access time \t User ID \t [Query word] \t Rank of the URL in the returned results \t Sequence number of the user's click \t URL clicked by the user

The user ID is assigned automatically based on Cookie information when the user accesses the search engine with a browser; that is, different queries entered in the same browser session correspond to the same user ID.

The format above is the official description. In the actual dataset, however, the rank and the click sequence number are separated by a space rather than by a \t.
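As a small illustration of this layout (the log line below is invented; only the field structure follows the description above):

# hypothetical record with the 5 tab-separated fields described above
line = "00:00:01\t12345678901234567\t[spark]\t1 2\thttp://example.com/"

access_time, user_id, query, rank_and_order, url = line.split('\t')
rank, order = rank_and_order.split(' ')   # rank and click order are space-separated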
Task: find the user session with the most queries and the corresponding number of queries.
import sys
from pyspark import SparkContext

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: SogouC <file>"
        exit(-1)
    sc = SparkContext(appName="SogouC")
    sgRDD = sc.textFile(sys.argv[1])
    # keep only well-formed records, count queries per user ID,
    # sort by count in descending order and take the top 10
    print sgRDD.filter(lambda line: len(line.split('\t')) == 5) \
               .map(lambda line: (line.split('\t')[1], 1)) \
               .reduceByKey(lambda x, y: x + y) \
               .map(lambda pair: (pair[1], pair[0])) \
               .sortByKey(False) \
               .map(lambda pair: (pair[1], pair[0])) \
               .take(10)
    sc.stop()
Run the following command on any node of the virtual cluster:

./bin/spark-submit --master spark://hadoop1:7077 --executor-memory 3G --driver-memory 1g SogouC.py hdfs://hadoop1:8000/dataguru/data/mini.txt
Result: [(u'11579135515147154', 431), (u'6383499980790535', 385), (u'7822241147182134', 370), (u'9007558064074', 335), (u'1238595959371514434', 223), (u'787615177142486', 214), (u'502949445189088', 210), (u'250132071983056', 208), (u'91658294324751543', 223)]
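If you only want to try the script without a cluster, a local-mode submission should also work (this is an assumption about the environment, not part of the original experiment):

./bin/spark-submit --master "local[*]" SogouC.py /path/to/mini.txt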
The above is how to do Spark programming in Python. The editor believes some of these points may come up in your daily work; I hope this article helps you learn more.