In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-09 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
In this article, we unveil various concepts within wordcount.py through in-depth analysis of Spark. Let's review the wordcount.py code again to answer the following questions
For Hello Word examples in most languages, there is the main () function, the main function of wordcount.py, or where is the main () that calls Spark
The reading of data and how to convert each RDD data
The working mechanism of map and flatMap and the difference
The role of reduceByKey
The code for WordCount.py is as follows:
From _ _ future__ import print_functionimport sysfrom operator import add# SparkSession: is a programming entry for Spark, replacing the original SQLContext and HiveContext, easy to call Dataset and DataFrame API# SparkSession can be used to create DataFrame, register DataFrame as a table, execute SQL on the table, cache tables and read parquet files. From pyspark.sql import SparkSessionif _ _ name__ = = "_ main__": # the simple parameters commonly used in Python are passed in if len (sys.argv)! = 2: print ("Usage: wordcount", file=sys.stderr) exit (- 1) # appName sets an application name for the Spark application, and the rename will be displayed on Spark Web UI. If SparkSession already exists, get the existing SparkSession, otherwise create a new one. Spark = SparkSession\ .builder\ .appName ("PythonWordCount")\ .getOrCreate () # reads the contents of the incoming file and writes it to a new RDD instance lines. This statement does a lot of work and is not suitable for beginners. It can be cut into two statements for understanding. # map is a conversion function that transforms each data item of the original RDD into a new element through the user-defined function f mapping in map. The data items in the original RDD are one-to-one corresponding to those in the new RDD. Lines = spark.read.text (sys.argv [1]) .rdd.map (lambda r: r [0]) # flatMap is similar to map, but each element input can be mapped to 0 or more outputs Finally, the result is flattened and the output counts = lines.flatMap (lambda x: x.split (''))\ .map (lambda x: (x, 1))\ .reduceByKey (add) # collect () returns all the elements of the dataset as an array in the driver. This is usually useful after returning a filter or other operation that is small enough to subset of data. Because collect aggregates the entire RDD on one machine, you usually need to estimate the size of the returned dataset to avoid overflow. Output = counts.collect () for (word, count) in output: print ("% s:% I"% (word, count)) spark.stop () Spark entry SparkSession
The concept of SparkSession is introduced into Spark2.0, which provides users with a unified entry point to use the functions of Spark. Here, you might as well compare with Http Session, where Spark is playing the role of Web service. Programs need to establish a Session when calling Spark functions. So it's easy to understand when you see getOrCreate (), indicating that you can create a new session or take advantage of an existing session as appropriate.
Spark = SparkSession\ .builder\ .appName ("PythonWordCount")\ .getOrCreate ()
Since Spark is thought of as a Web server, which means that multiple accesses may be used, it is a good idea to name the application with an appropriate name for ease of monitoring and management. Web UI is not the focus of this article. Interested students can refer to Spark Application's Web Console.
Load data
After the SparkSession is established, the data is read in and written to the Dateset.
Lines = spark.read.text (sys.argv [1]) .rdd.map (lambda r: r [0])
In order to better decompose the execution process, it's time to use PySpark, the API of python calling Spark, which can start an interactive Python Shell. To facilitate script debugging, temporarily switch to Linux execution
# pysparkPython 2.7.6 (default, Jun 22 2015, 17:58:13) [GCC 4.8.2] on linux2Type "help", "copyright", "credits" or "license" for more information.Using Spark's default log4j profile: org/apache/spark/log4j-defaults.propertiesSetting default log level to "WARN" .to adjust logging level use sc.setLogLevel (newLevel). For SparkR, use setLogLevel (newLevel). 08:30:26 on 17-02-23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... Using builtin-java classes where applicable17/02/23 08:30:31 WARN ObjectStore: Version information not found in metastore. Hive.metastore.schema.verification is not enabled so recording the schema version 1.2.017/02/23 08:30:31 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException17/02/23 08:30:32 WARN ObjectStore: Failed to get database global_temp Returning NoSuchObjectExceptionWelcome to _ / _ / / _\ / _ _ `/ _ _ / _ _ /. _ _ /\ _, _ /\ _\ version 2.1.0 / _ / Using Python version 2.7.6 (default) Jun 22 2015 17:58:13) SparkSession available as' spark'. > ds = spark.read.text ('/ home/spark2.1/spark/examples/src/main/python/a.txt') > type (ds) > print dsDataFrame [value: string] > lines = ds.rdd
The advantage of interactive Shell is that it is easy to view the contents and types of variables. The file a.txt, which is now loaded into lines, is an instance of the elastic distributed dataset of RDD (Resilient Distributed Datasets).
RDD operation
You can refer to the paper for the structure of RDD in memory. There are two important points to understand RDD:
First, RDD is a kind of shared memory which is read-only and can only be transformed from the existing RDD, and then all the data is loaded into memory to facilitate multiple reuse.
Second, the data of RDD is stored in the memory of different nodes in the cluster by default, which provides fault tolerance and can automatically recover from node failure. That is, if the RDD partition on a node is lost due to a node failure, the RDD will automatically recalculate the partition through its own data source.
To explore the data content within RDD, you can use the collect () function, which returns all the elements of the RDD dataset as an array.
> lines = ds.rdd > for i in lines.collect ():... Print i... Row (value=u'These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.')
Lines stores Row object types, and we want to deal with String types, so we need to use map api to further convert RDD
> lines_map = lines.map (lambda x: X [0]) > for i in lines_map.collect ():. Print i... These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.
In order to count the frequency of each word, we need to count each word separately, so the first step is to extract the word from the above string with a space as a separator, and set a counter for each word. For example, if the number of These occurrences is 1, the expected data structure is ['There', 1]. But how do you convert a RDD containing a string into an element like ['There', 1] RDD?
> flat_map = lines_map.flatMap (lambda x: x.split ('') > rdd_map = flat_map.map (lambda x: [x, 1]) > for i in rdd_map.collect ():. Print i... [ubiquitous examples, 1] [ubiquitous examples, 1] [ubiquitous giveyards, 1] [ubiquitous quickening, 1]
The following figure briefly describes the conversion process between flatMap and map.
Transfrom.png
It's not hard to see that map api just initializes the counter to 1 for all the words that appear, and doesn't count the same words, and the next task is done by reduceByKey (). In rdd_map, all words are treated as a key, while value with the same key performs operator operations within reduceByKey, because statistics of the same key is a cumulative operation, so you can directly add operations.
> from operator import add > add_map = rdd_map.reduceByKey (add) > for i in add_map.collect ():. Print i... (ubiquitous, 1) (ubiquitous, 2) (upright quickening, 1) (upright quickening, 1) (ubiquitous, 2) (upright ordeal, 1) > > print rdd_map.count () 26 > > print add_map.count () 23
According to the content of a.txt, only the words of and the appear twice, as expected.
Summary
The above decomposition steps can help us understand the operation of RDD. It is important to note that RDD divides the operation into two categories: transformation and action. No matter how many transformation operations are performed, the RDD does not actually perform the operation, and the operation is triggered only when the action operation is performed. That is, all the above RDD is triggered by collect (), so if you put the above transformation into a concise statement, it will appear in the written form of the original wordcount.py.
Counts = lines.flatMap (lambda x: x.split (''))\ .map (lambda x: (x, 1))\ .reduceByKey (add)
The real action is done by collect ().
Output = counts.collect ()
So far, an in-depth analysis of wordcount.py has been completed, but some lower-level execution processes, such as DAG, stage, and Driver programs, have been deliberately ignored.
Author: or ran Zi
Link: https://www.jianshu.com/p/067907b23546
Source: brief Book
The copyright of the brief book belongs to the author. For any form of reprint, please contact the author for authorization and indicate the source.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.