In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-23 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Database >
Share
Shulou(Shulou.com)05/31 Report--
Today, I will talk to you about the case analysis of loading and saving Spark SQL data, which may not be well understood by many people. in order to make you understand better, the editor has summarized the following for you. I hope you can get something according to this article.
First, pre-knowledge detailed explanation Spark SQL important is to operate DataFrame,DataFrame itself provides save and load operation, Load: can create DataFrame, Save: save the data in DataFrame to a file or with a specific format to indicate the type of file we want to read and the specific format to indicate what type of file we want to output.
Second, Spark SQL read and write data code practice
Import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.sql.*;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructField;import org.apache.spark.sql.types.StructType;import java.util.ArrayList;import java.util.List Public class SparkSQLLoadSaveOps {public static void main (String [] args) {SparkConf conf = new SparkConf () .setMaster ("local") .setAppName ("SparkSQLLoadSaveOps"); JavaSparkContext sc = new JavaSparkContext (conf); SQLContext = new SQLContext (sc) / * read () is a DataFrameReader type, and load can read the data out * / DataFrame peopleDF = sqlContext.read (). Format ("json") .load ("E:\\ Spark\\ Sparkinstanll_package\\ Big_Data_Software\\ spark-1.6.0-bin-hadoop2.6\ examples\\ src\ main\\ resources\\ people.json") / * * operate on DataFrame directly * Json: it is a self-explanatory format. How to determine what format it is when reading Json? * by scanning the entire Json. Only after scanning will you know that the metadata * / / is append that specifies the output file through mode. Create a new file to append file peopleDF.select ("name") .write () .mode (SaveMode.Append) .save ("E:\\ personNames");}}
The source code of the reading process is analyzed as follows: 1. The read method returns DataFrameReader, which is used to read data.
/ * *: Experimental:: * Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]]. * {{* sqlContext.read.parquet ("/ path/to/file.parquet") * sqlContext.read.schema (schema) .json ("/ path/to/file.json") *}} * * @ group genericdata * @ since 1.4.0 * / @ Experimental// create a DataFrameReader instance and obtain the DataFrameReader reference def read: DataFrameReader = new DataFrameReader (this)
two。 Then call format in the DataFrameReader class to indicate the format of the read file.
/ * Specifies the input data source format. * * @ since 1.4.0 * / def format (source: String): DataFrameReader = {this.source = source this}
3. Change the incoming input into DataFrame through the path through the load method in DtaFrameReader.
/ * * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. Data backed by * a local or distributed file system). * * @ since 1.4.0 * / / TODO: Remove this one in Spark 2.0.def load (path: String): DataFrame = {option ("path", path) .load ()}
At this point, the reading of the data is complete, so let's operate on DataFrame. The following is the write operation!
1. Call the select function in DataFrame to filter the columns
/ * Selects a set of columns. This is a variant of `select` that can only select * existing columns using column names (i.e. Cannot construct expressions). * * {{* / / The following two are equivalent: * df.select ("colA", "colB") * df.select ($"colA", $"colB") *}} * @ group dfops * @ since 1.3.0 * / @ scala.annotation.varargsdef select (col: String, cols: String*): DataFrame = select ((col +: cols) .map (Column (_): _ *)
two。 The result is then written to the external storage system through write.
/ * *: Experimental:: * Interface for saving the content of the [[DataFrame]] out into external storage. * * @ group output * @ since 1.4.0 * / @ Experimentaldef write: DataFrameWriter = new DataFrameWriter (this)
3. Mode specifies how to append the file when keeping the file
/ * Specifies the behavior when data or table already exists. Options include:// Overwrite overrides *-`SaveMode.Overwrite`: overwrite the existing data.// creates a new file, and then appends *-`SaveMode.Append`: append the data. *-`SaveMode.Ignore`: ignore the operation (i.e. No-op). *-`SaveMode.ErrorIfExists`: default option, throw an exception at runtime. * * @ since 1.4.0 * / def mode (saveMode: SaveMode): DataFrameWriter = {this.mode = saveMode this}
4. Finally, the save () method triggers action to output the file to the specified file.
/ * * Saves the content of the [[DataFrame]] at the specified path. * * @ since 1.4.0 * / def save (path: String): Unit = {this.extraOptions + = ("path"-> path) save ()}
3. The whole flow chart of Spark SQL reading and writing is as follows
Fourth, a detailed explanation of the source code of some functions in the process
DataFrameReader.Load ()
1. Load () returns a collection of data of type DataFrame, which is read from the default path.
/ * Returns the dataset stored at path as a DataFrame, * using the default data source configured by spark.sql.sources.default. * * @ group genericdata * @ deprecated As of 1.4.0, replaced by `read () .load (path) `. This will be removed in Spark 2.0. * / @ deprecated ("Use read.load (path). This will be removed in Spark 2.0.", "1.4.0") def load (path: String): DataFrame = {/ / the read at this time is DataFrameReader read.load (path)}
two。 Trace the load source code to enter, the source code is as follows: the method in DataFrameReader. Load () passes the input into a DataFrame through the path.
/ * * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. Data backed by * a local or distributed file system). * * @ since 1.4.0 * / / TODO: Remove this one in Spark 2.0.def load (path: String): DataFrame = {option ("path", path) .load ()}
3. The source code for tracking load is as follows:
/ * * Loads input in as a [[DataFrame]], for data sources that don't require a path (e.g. External * key-value stores). * * @ since 1.4.0 * / def load (): DataFrame = {/ / parsing a pair of incoming Source val resolved = ResolvedDataSource (sqlContext, userSpecifiedSchema = userSpecifiedSchema, partitionColumns = Array.empty [String], provider = source, options = extraOptions.toMap) DataFrame (sqlContext, LogicalRelation (resolved.relation))}
DataFrameReader.format ()
1. Format: specify the file format, which gives you a great lesson: if it is a Json file format, you can keep it as Parquet and so on. Spark SQL can specify the type of file to read when reading the file. For example, Json,Parquet.
/ * * Specifies the input data source format.Built-in options include "parquet", "json", etc. * * @ since 1.4.0 * / def format (source: String): DataFrameReader = {this.source = source / / FileType this}
DataFrame.write ()
1. Create a DataFrameWriter instance
/ * *: Experimental:: * Interface for saving the content of the [[DataFrame]] out into external storage. * * @ group output * @ since 1.4.0 * / @ Experimentaldef write: DataFrameWriter = new DataFrameWriter (this) 1
two。 The tracking DataFrameWriter source code is as follows: write data to the external storage system in the way of DataFrame.
/ * *:: Experimental:: * Interface used to write a [[DataFrame]] to external storage systems (e.g. File systems, * key-value stores, etc). Use [[DataFrame.write]] to access this. * * @ since 1.4.0 * / @ Experimentalfinal class DataFrameWriter private [sql] (df: DataFrame) {
DataFrameWriter.mode ()
1. Overwrite is overwritten, and all the previously written data are overwritten. Append: it is appended. For normal files, it is appended in one file, but for files in parquet format, new files are created for append.
/ * Specifies the behavior when data or table already exists. Options include: *-`SaveMode.Overwrite`: overwrite the existing data. *-`SaveMode.Append`: append the data. *-`SaveMode.Ignore`: ignore the operation (i.e. No-op). / / default operation *-`SaveMode.ErrorIfExists`: default option, throw an exception at runtime. * * @ since 1.4.0 * / def mode (saveMode: SaveMode): DataFrameWriter = {this.mode = saveMode this}
two。 Receive external parameters through pattern matching
/ * Specifies the behavior when data or table already exists. Options include: *-`overwrite`: overwrite the existing data. *-`append`: append the data. *-`roomre`: ignore the operation (i.e. No-op). *-`error`: default option, throw an exception at runtime. * * @ since 1.4.0 * / def mode (saveMode: String): DataFrameWriter = {this.mode = saveMode.toLowerCase match {case "overwrite" = > SaveMode.Overwrite case "append" = > SaveMode.Append case "ignore" = > SaveMode.Ignore case "error" | "default" = > SaveMode.ErrorIfExists case _ = > throw new IllegalArgumentException (s "Unknown save mode: $saveMode." + "Accepted modes are 'overwrite',' append', 'ignore',' error'.")} this}
DataFrameWriter.save ()
1. Save saves the result to the incoming path.
/ * * Saves the content of the [[DataFrame]] at the specified path. * * @ since 1.4.0 * / def save (path: String): Unit = {this.extraOptions + = ("path"-> path) save ()}
two。 Trace the save method.
/ * * Saves the content of the [[DataFrame]] as the specified table. * * @ since 1.4.0 * / def save (): Unit = {ResolvedDataSource (df.sqlContext, source, partitioningColumns.map (_ .toArray) .getOrElse (Array.empty [string]), mode, extraOptions.toMap, df)}
3. Where source is the defaultDataSourceNameprivate var source: String = df.sqlContext.conf.defaultDataSourceName of SQLConf and the default parameter of DEFAULT_DATA_SOURCE_NAME is parquet.
/ This is used to set the default data sourceval DEFAULT_DATA_SOURCE_NAME = stringConf ("spark.sql.sources.default", defaultValue = Some ("org.apache.spark.sql.parquet"), doc = "The default data source to use in input/output.")
Some functions in DataFrame.scala are explained in detail:
1. The toDF function converts RDD to DataFrame.
/ * Returns the object itself. * @ group basic * @ since 1.3.0 * / / This is declared with parentheses to prevent the Scala compiler from treating// `rdd.toDF ("1") `as invoking this toDF and then apply on the returned DataFrame.def toDF (): DataFrame = this
2. Show () method: display the results
/ * * Displays the [[DataFrame]] in a tabular form. For example: * {{year month AVG ('Adj Close) MAX (' Adj Close) * 1980 12 0.503218 0.595103 * 1981 01 0.523289 0.570307 * 1982 02 0.436504 0.475256 * 1983 03 0.410516 0.442194 * 1984 04 0.450090 0.483521 *} * @ param numRows Number of rows to show * @ param truncate Whether truncate long strings. If true, strings more than 20 characters will * be truncated and all cells will be aligned right * * @ group action * @ since 1.5.0 * / / scalastyle:off printlndef show (numRows: Int, truncate: Boolean): Unit = println (showString (numRows, truncate)) / / scalastyle:on println
The source code for tracking showString is as follows: action is triggered in showString to collect data.
/ * Compose the string representing rows for output * @ param _ numRows Number of rows to show * @ param truncate Whether truncate long strings and align cells right * / private [sql] def showString (_ numRows: Int, truncate: Boolean = true): String = {val numRows = _ numRows.max (0) val sb = new StringBuilder val takeResult = take (numRows + 1) val hasMoreData = takeResult.length > numRows val data = takeResult.take (numRows) val numCols = schema.fieldNames.length read the above Do you have any further understanding of the case analysis of loading and saving Spark SQL data? If you want to know more knowledge or related content, please follow the industry information channel, thank you for your support.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.