Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Practice of loading and saving Spark SQL data

2025-03-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/02 Report--

One: detailed explanation of pre-knowledge:

The important thing of Spark SQL is the operation of save and load provided by DataFrame,DataFrame itself.

Load: you can create a DataFrame

Save: save the data in DataFrame to a file or specific format to indicate the type of file we want to read and the specific format to indicate the type of file we want to output.

Second: Spark SQL read and write data code practice:

Import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.sql.*;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructField;import org.apache.spark.sql.types.StructType;import java.util.ArrayList;import java.util.List Public class SparkSQLLoadSaveOps {public static void main (String [] args) {SparkConf conf = new SparkConf () .setMaster ("local") .setAppName ("SparkSQLLoadSaveOps"); JavaSparkContext sc = new JavaSparkContext (conf); SQLContext = new SQLContext (sc) / * read () is a DataFrameReader type, and load can read the data out * / DataFrame peopleDF = sqlContext.read (). Format ("json") .load ("E:\\ Spark\\ Sparkinstanll_package\\ Big_Data_Software\\ spark-1.6.0-bin-hadoop2.6\ examples\\ src\ main\\ resources\\ people.json") / * * operate on DataFrame directly * Json: it is a self-explanatory format. How to determine what format it is when reading Json? * by scanning the entire Json. Only after scanning will you know that the metadata * / / is append that specifies the output file through mode. Create a new file to append file peopleDF.select ("name") .write () .mode (SaveMode.Append) .save ("E:\\ personNames");}}

The source code of the reading process is analyzed as follows:

1. The read method returns DataFrameReader for reading data.

`DataFrameReader` that can be used to read data in as a `DataFrame`. * {{* sqlContext.read.parquet ("/ path/to/file.parquet") * sqlContext.read.schema (schema) .json ("/ path/to/file.json") *}} * * @ group genericdata * @ since 1.4.0 * / @ Experimental// creates a DataFrameReader instance and obtains the DataFrameReader reference def read: DataFrameReader = new DataFrameReader (this)

two。 Then call format in the DataFrameReader class to indicate the format of the read file.

/ * Specifies the input data source format. * * @ since 1.4.0 * / def format (source: String): DataFrameReader = {this.source = source this}

3. Change the incoming input into DataFrame through the path through the load method in DtaFrameReader.

/ * * Loads input in as a `DataFrame`, for data sources that require a path (e.g. Data backed by * a local or distributed file system). * * @ since 1.4.0 * / / TODO: Remove this one in Spark 2.0.def load (path: String): DataFrame = {option ("path", path) .load ()}

At this point, the reading of the data is complete, so let's operate on DataFrame.

The following is the write operation!

1. Call the select function in DataFrame to filter the columns

/ * Selects a set of columns. This is a variant of `select` that can only select * existing columns using column names (i.e. Cannot construct expressions). * * {{* / / The following two are equivalent: * df.select ("colA", "colB") * df.select ($"colA", $"colB") *}} * @ group dfops * @ since 1.3.0 * / @ scala.annotation.varargsdef select (col: String, cols: String*): DataFrame = select ((col +: cols) .map (Column (_): _ *)

two。 The result is then written to the external storage system through write.

/ * *:: Experimental:: * Interface for saving the content of the `DataFrame` out into external storage. * * @ group output * @ since 1.4.0 * / @ Experimentaldef write: DataFrameWriter = new DataFrameWriter (this)

3. Mode specifies how to append the file when keeping the file

/ * Specifies the behavior when data or table already exists. Options include:// Overwrite overrides *-`SaveMode.Overwrite`: overwrite the existing data.// creates a new file, and then appends *-`SaveMode.Append`: append the data. *-`SaveMode.Ignore`: ignore the operation (i.e. No-op). *-`SaveMode.ErrorIfExists`: default option, throw an exception at runtime. * * @ since 1.4.0 * / def mode (saveMode: SaveMode): DataFrameWriter = {this.mode = saveMode this}

4. Finally, the save () method triggers action to output the file to the specified file.

/ * * Saves the content of the `DataFrame` at the specified path. * * @ since 1.4.0 * / def save (path: String): Unit = {this.extraOptions + = ("path"-> path) save ()}

Third: the whole flow chart of Spark SQL reading and writing is as follows:

Fourth: detailed explanation of the source code of some functions in the process:

DataFrameReader.Load ()

1. Load () returns a collection of data of type DataFrame, which is read from the default path.

/ * Returns the dataset stored at path as a DataFrame, * using the default data source configured by spark.sql.sources.default. * * @ group genericdata * @ deprecated As of 1.4.0, replaced by `read () .load (path) `. This will be removed in Spark 2.0. * / @ deprecated ("Use read.load (path). This will be removed in Spark 2.0.", "1.4.0") def load (path: String): DataFrame = {/ / the read at this time is DataFrameReader read.load (path)}

two。 Track the load source code and enter it, which is as follows:

The method in DataFrameReader. Load () passes the input into a DataFrame through the path.

/ * * Loads input in as a `DataFrame`, for data sources that require a path (e.g. Data backed by * a local or distributed file system). * * @ since 1.4.0 * / / TODO: Remove this one in Spark 2.0.def load (path: String): DataFrame = {option ("path", path) .load ()}

3. The source code for tracking load is as follows:

/ * * Loads input in as a `DataFrame`, for data sources that don't require a path (e.g. External * key-value stores). * * @ since 1.4.0 * / def load (): DataFrame = {/ / parsing a pair of incoming Source val resolved = ResolvedDataSource (sqlContext, userSpecifiedSchema = userSpecifiedSchema, partitionColumns = Array.empty [String], provider = source, options = extraOptions.toMap) DataFrame (sqlContext, LogicalRelation (resolved.relation))}

DataFrameReader.format ()

1. Format: specify the file format, which gives you a great lesson: if it is a Json file format, you can keep it as Parquet and so on.

Spark SQL can specify the type of file to read when reading the file. For example, Json,Parquet.

/ * * Specifies the input data source format.Built-in options include "parquet", "json", etc. * * @ since 1.4.0 * / def format (source: String): DataFrameReader = {this.source = source / / FileType this}

DataFrame.write ()

1. Create a DataFrameWriter instance

/ * *:: Experimental:: * Interface for saving the content of the `DataFrame` out into external storage. * * @ group output * @ since 1.4.0 * / @ Experimentaldef write: DataFrameWriter = new DataFrameWriter (this)

two。 The source code for tracking DataFrameWriter is as follows:

Write data to the external storage system as DataFrame.

/ *:: Experimental:: * Interface used to write a `DataFrame` to external storage systems (e.g. File systems, * key-value stores, etc). Use `DataFrame`.`write` to access this. * * @ since 1.4.0 * / @ Experimentalfinal class DataFrameWriter private [sql] (df: DataFrame) {

DataFrameWriter.mode ()

1. Overwrite is overwritten, and all the previously written data are overwritten.

Append: it is appended. For normal files, it is appended in one file, but for files in parquet format, new files are created for append.

* Specifies the behavior when data or table already exists. Options include: *-`SaveMode.Overwrite`: overwrite the existing data. *-`SaveMode.Append`: append the data. *-`SaveMode.Ignore`: ignore the operation (i.e. No-op). / / default operation *-`SaveMode.ErrorIfExists`: default option, throw an exception at runtime. * * @ since 1.4.0 * / def mode (saveMode: SaveMode): DataFrameWriter = {this.mode = saveMode this}

two。 Receive external parameters through pattern matching

/ * Specifies the behavior when data or table already exists. Options include: *-`overwrite`: overwrite the existing data. *-`append`: append the data. *-`roomre`: ignore the operation (i.e. No-op). *-`error`: default option, throw an exception at runtime. * * @ since 1.4.0 * / def mode (saveMode: String): DataFrameWriter = {this.mode = saveMode.toLowerCase match {case "overwrite" = > SaveMode.Overwrite case "append" = > SaveMode.Append case "ignore" = > SaveMode.Ignore case "error" | "default" = > SaveMode.ErrorIfExists case _ = > throw new IllegalArgumentException (s "Unknown save mode: $saveMode." + "Accepted modes are 'overwrite',' append', 'ignore',' error'.")} this}

1. Save saves the result to the incoming path.

/ * * Saves the content of the `DataFrame` at the specified path. * * @ since 1.4.0 * / def save (path: String): Unit = {this.extraOptions + = ("path"-> path) save ()}

two。 Trace the save method.

/ * * Saves the content of the `DataFrame` as the specified table. * * @ since 1.4.0 * / def save (): Unit = {ResolvedDataSource (df.sqlContext, source, partitioningColumns.map (_ .toArray) .getOrElse (Array.empty [string]), mode, extraOptions.toMap, df)}

3. Where source is the defaultDataSourceName of SQLConf

Private var source: String = df.sqlContext.conf.defaultDataSourceName

The default parameter for DEFAULT_DATA_SOURCE_NAME is parquet.

/ This is used to set the default data sourceval DEFAULT_DATA_SOURCE_NAME = stringConf ("spark.sql.sources.default", defaultValue = Some ("org.apache.spark.sql.parquet"), doc = "The default data source to use in input/output.")

Some functions in DataFrame.Scala are explained in detail:

1. The toDF function converts RDD to DataFrame.

* Returns the object itself. * @ group basic * @ since 1.3.0 * / / This is declared with parentheses to prevent the Scala compiler from treating// `rdd.toDF ("1") `as invoking this toDF and then apply on the returned DataFrame.def toDF (): DataFrame = this

2. Show () method: display the results

/ * * Displays the `DataFrame` in a tabular form. For example: * {{year month AVG ('Adj Close) MAX (' Adj Close) * 1980 12 0.503218 0.595103 * 1981 01 0.523289 0.570307 * 1982 02 0.436504 0.475256 * 1983 03 0.410516 0.442194 * 1984 04 0.450090 0.483521 *} * @ param numRows Number of rows to show * @ param truncate Whether truncate long strings. If true, strings more than 20 characters will * be truncated and all cells will be aligned right * * @ group action * @ since 1.5.0 * / / scalastyle:off printlndef show (numRows: Int, truncate: Boolean): Unit = println (showString (numRows, truncate)) / / scalastyle:on println

The source code for tracking showString is as follows: action is triggered in showString to collect data.

/ * Compose the string representing rows for output * @ param _ numRows Number of rows to show * @ param truncate Whether truncate long strings and align cells right * / private [sql] def showString (_ numRows: Int, truncate: Boolean = true): String = {val numRows = _ numRows.max (0) val sb = new StringBuilder val takeResult = take (numRows + 1) val hasMoreData = takeResult.length > numRows val data = takeResult.take (numRows) val numCols = schema.fieldNames.length

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report