
How to do Spark Python programming


This article explains how to do Spark Python programming. The editor finds it quite practical and shares it for your reference; I hope you get something out of it after reading.

Spark application structure

A Spark application can be divided into two parts: the driver part, which initializes the SparkContext and runs the main program, and the executor part.

A: driver part

The driver part mainly configures, initializes, and finally shuts down the SparkContext. The purpose of initializing the SparkContext is to build the running environment for the Spark application; during initialization you import the needed Spark classes (and, in Scala, the implicit conversions). After the executor part has finished, the SparkContext must be closed.
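A minimal driver sketch in PySpark, assuming a local master and an illustrative application name (both are placeholders, not from the article):

# Minimal PySpark driver sketch: configure, initialize, and shut down the SparkContext.
# The app name and master URL are illustrative placeholders.
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("ExampleApp").setMaster("local[2]")
sc = SparkContext(conf=conf)   # builds the running environment for the application

# ... the executor part (RDD processing) goes here ...

sc.stop()                      # close the SparkContext once the executor part is finished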

B: executor part

The executor part of a Spark application does the actual data processing, and the data it works with falls into three categories:

Native data, including input data and output data.

For input native data, Spark currently provides two types:

Collection datasets, such as Array(1, 2, 3, 4, 5) in Scala or a list in Python; Spark converts them to an RDD with the parallelize method.

Hadoop datasets: Spark supports files stored on HDFS as well as other storage systems supported by Hadoop, such as local files, HBase, SequenceFile, and other Hadoop input formats. For example, Spark uses the textFile method to convert a local file or an HDFS file into an RDD.

For output data, Spark supports not only the two kinds above but also scalar data (a short sketch covering input and output follows this list):

Generating scalar data, such as count (returns the number of elements in the RDD), reduce, and fold/aggregate, or returning a few elements, such as take (returns the first few elements of the RDD).

Generating collection datasets, such as collect (pulls all elements of the RDD into a driver-side collection) and lookup (finds all values for a given key).

Generating Hadoop datasets, such as saveAsTextFile and saveAsSequenceFile.
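As a sketch of the above, assuming an illustrative HDFS path (the paths and data are placeholders):

# Sketch of the two input types and the three kinds of output described above.
# The HDFS paths are illustrative placeholders.
from pyspark import SparkContext

sc = SparkContext('local', 'io-demo')

# Input: a collection dataset converted to an RDD with parallelize
rdd1 = sc.parallelize([1, 2, 3, 4, 5])

# Input: a Hadoop dataset (local or HDFS file) converted to an RDD with textFile
rdd2 = sc.textFile('hdfs://hadoop1:8000/path/to/input.txt')

# Output: scalar data (count), a collection (collect), and a Hadoop dataset (saveAsTextFile)
print(rdd1.count())      # 5
print(rdd1.collect())    # [1, 2, 3, 4, 5]
rdd1.saveAsTextFile('hdfs://hadoop1:8000/path/to/output')

sc.stop()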

RDD, the basic unit of parallel computation in Spark (described in detail in the material on RDD), provides four kinds of operators (a combined sketch follows this list):

Input operators, which convert native data into an RDD, such as parallelize and textFile.

Transformation operators, the most important kind: they are what Spark builds the DAG from. Transformation operators do not execute immediately; only after an action operator is triggered are they submitted to the driver for processing, which generates the DAG and then proceeds as DAG --> Stage --> Task --> Worker execution. According to their role in the DAG, transformation operators fall into two types:

Narrow-dependency operators:

Input and output are one-to-one and the partition structure of the resulting RDD is unchanged, mainly map and flatMap.

Input and output are one-to-one but the partition structure of the resulting RDD changes, such as union and coalesce.

Operators that select some elements from the input, such as filter, distinct, subtract, and sample.

Wide-dependency operators, which involve a shuffle; when the DAG is parsed, the shuffle is used as the boundary for generating Stages:

Regrouping and reducing a single RDD by key, such as groupByKey and reduceByKey.

Joining and regrouping two RDDs by key, such as join and cogroup.

Caching operators: for an RDD that will be used many times, caching speeds up the run, and important data can be cached with multiple replicas.

Action operators, which convert the computed RDD into native data, such as count, reduce, collect, and saveAsTextFile.
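A combined sketch of the four operator types (the sample data is made up for illustration):

# Sketch tying the four operator types together; the sample data is made up.
from pyspark import SparkContext

sc = SparkContext('local', 'operator-demo')

words = sc.parallelize(['a', 'b', 'a', 'c', 'b', 'a'])   # input operator

pairs = words.map(lambda w: (w, 1))                      # narrow dependency: map
kept = pairs.filter(lambda kv: kv[0] != 'c')             # narrow dependency: filter
counts = kept.reduceByKey(lambda x, y: x + y)            # wide dependency: shuffle, Stage boundary

counts.cache()                                           # caching operator: reuse without recomputing

print(counts.collect())                                  # action operator: triggers the DAG
print(counts.count())                                    # the second action reuses the cached RDD

sc.stop()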

Shared variables: when a function passed to a Spark operation runs on the partitions, a copy of each variable used by the function is made and kept on every node, and the nodes do not affect each other. In a Spark application, however, you may need variables that are shared, for use by the Tasks or by the driver. Spark provides two kinds of shared variables:

Broadcast variables, which are cached as shared variables on every node and are usually read-only. Usage:

>>> from pyspark.context import SparkContext
>>> sc = SparkContext('local', 'test')
>>> b = sc.broadcast([1, 2, 3, 4, 5])
>>> b.value
[1, 2, 3, 4, 5]
>>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]

Accumulators, which only support adding to the variable, can be used to implement counters and sums. The user calls SparkContext.accumulator(v) to create an accumulator with initial value v; Tasks running on the cluster can then add to it with the "+=" operation but cannot read its value; only the driver can read the accumulator's value. How to use it:
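A minimal sketch of that usage, assuming a local SparkContext (the names are illustrative):

# Accumulator sketch: tasks may only add to it; only the driver reads the value.
from pyspark import SparkContext

sc = SparkContext('local', 'accum-demo')

acc = sc.accumulator(0)      # accumulator with initial value v = 0

def add_to_acc(x):
    acc.add(x)               # performed inside Tasks on the cluster

sc.parallelize([1, 2, 3, 4, 5]).foreach(add_to_acc)

print(acc.value)             # 15, readable only on the driver

sc.stop()

Updating the accumulator inside an action such as foreach keeps the semantics simple; updates made inside transformations may be applied more than once if a stage is recomputed.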

Python programming

Experimental project

Sogou log data analysis

Source of experimental data: the Sogou compact-edition query log dataset.

Data format description:

Access time \t user ID \t [query word] \t rank of the URL in the returned results \t sequence number of the user's click \t URL clicked by the user

The user ID is assigned automatically from the Cookie information when the user accesses the search engine with a browser; that is, different queries entered from the same browser correspond to the same user ID.

The format above is the official description. In the actual dataset, the rank and the click sequence number are separated by a space rather than by a \t.
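Splitting a log line on \t therefore yields 5 fields, with the rank and the click sequence number sharing one field, which is why the job below filters on len(line.split('\t')) == 5. A small check with a made-up line:

# Made-up sample line: the rank and click order share one tab-separated field,
# so splitting on '\t' yields 5 fields rather than 6.
line = '20111230000005\t11579135515147154\t[query]\t1 1\thttp://www.example.com/'
fields = line.split('\t')
print(len(fields))    # 5
print(fields[1])      # the user ID, used as the key in the job below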

Task: the user session with the most queries and the corresponding number of queries.

import sys
from pyspark import SparkContext

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: SogouC"
        exit(-1)
    sc = SparkContext(appName="SogouC")
    sgRDD = sc.textFile(sys.argv[1])
    # keep well-formed lines, count queries per user ID, and sort by count in descending order
    print sgRDD.filter(lambda line: len(line.split('\t')) == 5) \
               .map(lambda line: (line.split('\t')[1], 1)) \
               .reduceByKey(lambda x, y: x + y) \
               .map(lambda pair: (pair[1], pair[0])) \
               .sortByKey(False) \
               .map(lambda pair: (pair[1], pair[0])) \
               .take(10)
    sc.stop()

Run the command on any node of the virtual cluster: ./bin/spark-submit --master spark://hadoop1:7077 --executor-memory 3G --driver-memory 1g SogouC.py hdfs://hadoop1:8000/dataguru/data/mini.txt

Result: [(u'11579135515147154', 431), (u'6383499980790535', 385), (u'7822241147182134', 370), (u'9007558064074', 335), (u'1238595959371514434', 223), (u'787615177142486', 214), (u'502949445189088', 210), (u'250132071983056', 208), (u'91658294324751543', 223)]

The above is how to do Spark Python programming. The editor believes some of these points may come up in everyday work; I hope you have learned more from this article. For more details, please follow the industry information channel.
