This article shares how to create a pyspark DataFrame. Many people are not very familiar with this topic, so it is shared here for your reference; I hope you learn a lot after reading it.
Creating a pyspark DataFrame
For ease of use, when working with pyspark we usually convert data into a DataFrame to carry out cleaning and analysis.
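The snippets below use a SparkContext sc and a SQL context spark, but the article never shows how they are created. Here is a minimal setup sketch, assuming the old SQLContext-style API that the later examples (registerDataFrameAsTable, registerFunction, getConf) rely on; the app name is arbitrary.

# Minimal setup sketch (assumed, not shown in the original article).
# The later snippets call SQLContext-style methods such as
# registerDataFrameAsTable and registerFunction, so `spark` is a SQLContext here.
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local", appName="dataframe-demo")  # app name is arbitrary
spark = SQLContext(sc)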
RDD and DataFrame
In the earlier article on basic pyspark operations, it was mentioned that the RDD is also a distributed data object used for computation in Spark.
Here's a brief look at the types of RDD and DataFrame.
print(type(rdd))  # <class 'pyspark.rdd.RDD'>
print(type(df))   # <class 'pyspark.sql.dataframe.DataFrame'>
Looking through the definition of the source code, we can see that there is no inheritance relationship between them.
class RDD(object):
    """
    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
    Represents an immutable, partitioned collection of elements that can be
    operated on in parallel.
    """

class DataFrame(object):
    """
    A distributed collection of data grouped into named columns.

    A :class:`DataFrame` is equivalent to a relational table in Spark SQL,
    and can be created using various functions in :class:`SparkSession`::
    ...
    """
An RDD is a resilient distributed dataset, the basic abstraction in Spark. It represents an immutable, partitioned collection of elements that can be operated on in parallel.
A DataFrame is a distributed collection of data grouped into named columns; a DataFrame is equivalent to a relational table in Spark SQL. What they have in common is that both are designed to support distributed computing.
The difference is that an RDD is just a collection of elements, while a DataFrame is grouped into named columns, similar to a MySQL table or a pandas DataFrame.
In practice, we use DataFrame more often.
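As a small illustration of that difference (the sample data here is assumed, matching the examples that follow): an RDD element is accessed by position, while a DataFrame column can be referenced by name.

# Illustrative sketch only, with assumed sample data.
rdd = sc.parallelize([('Alice', 1)])
print(rdd.first()[0])             # 'Alice' -- positional access on an RDD

df = spark.createDataFrame(rdd, ['name', 'age'])
print(df.select('name').first())  # Row(name='Alice') -- column access by name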
Create a DataFrame using tuples
In the first case, only the tuples are passed in, and the resulting DataFrame has no column names.
So in the second case we pass in both the tuples and the column names.
a = [('Alice', 1)]
output = spark.createDataFrame(a).collect()
print(output)
# [Row(_1='Alice', _2=1)]
output = spark.createDataFrame(a, ['name', 'age']).collect()
print(output)
# [Row(name='Alice', age=1)]
Here collect() presents the data row by row; you can also use show() to display it as a table.
spark.createDataFrame(a).show()
# +-----+---+
# |   _1| _2|
# +-----+---+
# |Alice|  1|
# +-----+---+

spark.createDataFrame(a, ['name', 'age']).show()
# +-----+---+
# | name|age|
# +-----+---+
# |Alice|  1|
# +-----+---+

Create a DataFrame using key-value pairs
d = [{'name': 'Alice', 'age': 1}]
output = spark.createDataFrame(d).collect()
print(output)
# [Row(age=1, name='Alice')]

Create a DataFrame using an RDD
a = [('Alice', 1)]
rdd = sc.parallelize(a)
output = spark.createDataFrame(rdd).collect()
print(output)
output = spark.createDataFrame(rdd, ["name", "age"]).collect()
print(output)
# [Row(_1='Alice', _2=1)]
# [Row(name='Alice', age=1)]

Create a DataFrame from an RDD and Row
from pyspark.sql import Row

a = [('Alice', 1)]
rdd = sc.parallelize(a)
Person = Row("name", "age")
person = rdd.map(lambda r: Person(*r))
output = spark.createDataFrame(person).collect()
print(output)
# [Row(name='Alice', age=1)]

Create a DataFrame from an RDD and a StructType
from pyspark.sql.types import *

a = [('Alice', 1)]
rdd = sc.parallelize(a)
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
output = spark.createDataFrame(rdd, schema).collect()
print(output)
# [Row(name='Alice', age=1)]

Create a pyspark DataFrame from a pandas DataFrame
df.toPandas() converts a pyspark DataFrame into a pandas DataFrame.
df = spark.createDataFrame(rdd, ['name', 'age'])
print(df)  # DataFrame[name: string, age: bigint]
print(type(df.toPandas()))  # <class 'pandas.core.frame.DataFrame'>

# pass in a pandas DataFrame
output = spark.createDataFrame(df.toPandas()).collect()
print(output)
# [Row(name='Alice', age=1)]

Create an ordered DataFrame
output = spark.range(1, 7, 2).collect()
print(output)
# [Row(id=1), Row(id=3), Row(id=5)]
output = spark.range(3).collect()
print(output)
# [Row(id=0), Row(id=1), Row(id=2)]
Get a DataFrame from a temporary table
spark.registerDataFrameAsTable(df, "table1")
df2 = spark.table("table1")
b = df.collect() == df2.collect()
print(b)
# True

Configure the DataFrame and temporary tables
Specify the column type when creating the DataFrame
In createDataFrame you can specify the column types; only the columns that match the given data types are kept, and if no column matches, an error is thrown.
a = [('Alice', 1)]
rdd = sc.parallelize(a)

# When the specified types match the data, the DataFrame is created normally.
output = spark.createDataFrame(rdd, "a: string, b: int").collect()
print(output)
# [Row(a='Alice', b=1)]

rdd = rdd.map(lambda row: row[1])
print(rdd)
# PythonRDD[7] at RDD at PythonRDD.scala:53

# Only the column of int type is kept; the other columns are filtered out.
output = spark.createDataFrame(rdd, "int").collect()
print(output)
# [Row(value=1)]

# If no column matches, an error is thrown.
output = spark.createDataFrame(rdd, "boolean").collect()
# TypeError: field value: BooleanType can not accept object 1 in type <class 'int'>

Register a DataFrame as a temporary table
spark.registerDataFrameAsTable(df, "table1")
spark.dropTempTable("table1")

Get and modify the configuration
print(spark.getConf("spark.sql.shuffle.partitions"))  # 200
print(spark.getConf("spark.sql.shuffle.partitions", u"10"))  # 10
print(spark.setConf("spark.sql.shuffle.partitions", u"50"))  # None
print(spark.getConf("spark.sql.shuffle.partitions", u"10"))  # 50

Register custom functions
spark.registerFunction("stringLengthString", lambda x: len(x))
output = spark.sql("SELECT stringLengthString('test')").collect()
print(output)
# [Row(stringLengthString(test)='4')]

spark.registerFunction("stringLengthString", lambda x: len(x), IntegerType())
output = spark.sql("SELECT stringLengthString('test')").collect()
print(output)
# [Row(stringLengthString(test)=4)]

spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
output = spark.sql("SELECT stringLengthInt('test')").collect()
print(output)
# [Row(stringLengthInt(test)=4)]

View the list of temporary tables
You can view all temporary table names and objects.
spark.registerDataFrameAsTable(df, "table1")
print(spark.tableNames())
# ['table1']
print(spark.tables())
# DataFrame[database: string, tableName: string, isTemporary: boolean]
print("table1" in spark.tableNames())  # True
print("table1" in spark.tableNames("default"))  # True

spark.registerDataFrameAsTable(df, "table1")
df2 = spark.tables()
df2.filter("tableName = 'table1'").first()
print(df2)
# DataFrame[database: string, tableName: string, isTemporary: boolean]

Create a DataFrame from other data sources
MySQL
You just need to download the JDBC driver jar package first:
mysql-connector-java.jar
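How the driver jar gets onto Spark's classpath is not shown in the original. One common approach (an assumption on my part, with a hypothetical jar path) is to point the spark.jars configuration at the downloaded file when constructing the SparkContext; a SparkContext built this way can be used in place of the plain one in the snippet below.

# A possible way (not shown in the original) to put the driver jar on the classpath:
# pass a SparkConf whose spark.jars points at the downloaded file.
# The jar path below is hypothetical; adjust it to your own location.
from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.jars", "/path/to/mysql-connector-java.jar")
sc = SparkContext("local", appName="mysqltest", conf=conf)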
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F

sc = SparkContext("local", appName="mysqltest")
sqlContext = SQLContext(sc)
df = sqlContext.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/mydata?user=root&password=mysql&"
        "useUnicode=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&"
        "useLegacyDatetimeCode=false&serverTimezone=UTC",
    dbtable="detail_data").load()
df.show(n=5)
sc.stop()

That is all the content of the article "How to create a pyspark DataFrame". Thank you for reading! I hope what was shared here has been helpful; if you want to learn more, you are welcome to follow the industry information channel!