4 common data sources in Spark SQL (detailed)

2025-01-18 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/03 Report--

Generic load/save methods

Manually specifying options

The DataFrame interface of Spark SQL supports operations on many kinds of data sources. A DataFrame can be manipulated with RDD-style transformations, or it can be registered as a temporary table. Once the DataFrame is registered as a temporary table, you can run SQL queries against it.

The default data source format for Spark SQL is Parquet. When the data source is a Parquet file, Spark SQL can read and write it without the format being specified.

To change the default data source format, set the configuration item spark.sql.sources.default.
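As a minimal sketch of the setting described above, the following returns a session whose default format has been changed. The object name, the app name, and the local master are assumptions made for illustration; only the key spark.sql.sources.default comes from the text.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper object for illustration; it is not part of Spark.
object DefaultSourceSketch {
  // Returns a session whose load()/save() calls fall back to `format`
  // whenever format(...) is not specified explicitly.
  def sessionWithDefaultSource(format: String): SparkSession = {
    val spark = SparkSession.builder()
      .appName("default-source-sketch") // assumed name
      .master("local[*]")               // assumption: local run
      .getOrCreate()
    // The key is a runtime SQL setting, so it can be changed on a live session.
    spark.conf.set("spark.sql.sources.default", format)
    spark
  }
}
```

With a session obtained from sessionWithDefaultSource("json"), a call such as spark.read.load(path) would treat the path as JSON rather than Parquet.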

scala> val df = spark.read.load("hdfs://hadoop001:9000/namesAndAges.parquet")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.select("name").write.save("names.parquet")

When the data source is not a Parquet file, you must specify its format manually. The format is given by its fully qualified name (for example, org.apache.spark.sql.parquet). For built-in formats, the short name is enough: json, parquet, jdbc, orc, libsvm, csv, or text.

In general, you load data with the read.load method provided by SparkSession and save it with write and save.

scala> val peopleDF = spark.read.format("json").load("hdfs://hadoop001:9000/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> peopleDF.write.format("parquet").save("hdfs://hadoop001:9000/namesAndAges.parquet")

In addition, you can run SQL directly on the file:

val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://hadoop001:9000/namesAndAges.parquet`")
sqlDF.show()

File save options

A save operation can take a SaveMode, which defines how existing data at the target is handled. Note that these save modes do not use any locking and are not atomic. In addition, in Overwrite mode the original data is deleted before the new data is written. The save modes are described in the following table:

Scala/Java                          Any language         Meaning
SaveMode.ErrorIfExists (default)    "error" (default)    If the data already exists, report an error
SaveMode.Append                     "append"             Append to the existing data
SaveMode.Overwrite                  "overwrite"          Overwrite the existing data
SaveMode.Ignore                     "ignore"             If the data already exists, do nothing

Parquet files

Reading and writing Parquet

The Parquet format is often used in the Hadoop ecosystem, and it also supports all data types of Spark SQL. Spark SQL provides a way to read and store files in Parquet format directly.

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("hdfs://hadoop001:9000/people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("hdfs://hadoop001:9000/people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

Parsing partition information

Partitioning a table is one way to optimize how data is laid out. In a partitioned table, data is stored in different directories according to the values of the partition columns. The Parquet data source can automatically discover and parse partition information. For example, population data partitioned by gender and country uses the following directory structure:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

When path/to/table is passed to SQLContext.read.parquet or SQLContext.read.load, Spark SQL automatically extracts the partition information from the paths.

The schema of the returned DataFrame is as follows:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

Note that the data types of the partition columns are inferred automatically. Currently, numeric and string types are supported. The parameter that controls this type inference is:

spark.sql.sources.partitionColumnTypeInference.enabled. The default value is true.

To turn the feature off, set the parameter to false. The partition columns are then treated as strings, and no type inference is performed.
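The idea behind partition discovery can be sketched in plain Scala. This is a simplified illustration, not Spark's implementation: the object name, the function names, and the inferTypes flag are hypothetical, and the inference here only distinguishes whole numbers, floating-point numbers, and strings, whereas Spark's actual rules are richer.

```scala
import scala.util.Try

// Simplified illustration only; this is not how Spark implements it.
object PartitionPathSketch {
  // Extracts key=value directory segments from a path.
  // With inferTypes = false every value stays a String, mirroring the
  // effect of disabling partition column type inference.
  def parsePartitions(path: String, inferTypes: Boolean = true): Map[String, Any] =
    path.split("/").toSeq
      .filter(_.contains("="))
      .map { segment =>
        val idx = segment.indexOf('=')
        val key = segment.take(idx)
        val raw = segment.drop(idx + 1)
        key -> (if (inferTypes) inferValue(raw) else raw)
      }
      .toMap

  // Tries a whole number first, then a floating-point number,
  // and falls back to the raw string.
  private def inferValue(raw: String): Any =
    Try[Any](raw.toLong).orElse(Try[Any](raw.toDouble)).getOrElse(raw)
}
```

For the directory layout shown earlier, parsePartitions("path/to/table/gender=male/country=US/data.parquet") yields the two string-valued partition columns gender and country, which is why they appear in the returned schema next to name and age.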

Schema merging

Like Protocol Buffers, Avro, and Thrift, Parquet also supports schema evolution. Users can start with a simple schema and gradually add more columns to it. In this way, users may end up with multiple Parquet files that have different but mutually compatible schemas. The Parquet data source can automatically detect this situation and merge the schemas of these files.
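What merging compatible schemas means can be sketched in plain Scala. This is an illustration under simplifying assumptions, not Spark's algorithm: a schema is modeled as an ordered list of (column name, type name) pairs, and the object and function names are hypothetical.

```scala
// Simplified illustration only; Spark's real merge logic handles
// nested types, nullability, and type widening, which are omitted here.
object SchemaMergeSketch {
  type Schema = Seq[(String, String)] // (column name, type name)

  // Keeps the columns of `left` in order and appends columns that
  // appear only in `right`. A column present in both schemas must
  // have the same type, otherwise the schemas are incompatible.
  def merge(left: Schema, right: Schema): Schema =
    right.foldLeft(left) { case (acc, (name, tpe)) =>
      acc.find(_._1 == name) match {
        case Some((_, existing)) if existing != tpe =>
          throw new IllegalArgumentException(
            s"Conflicting types for column '$name': $existing vs $tpe")
        case Some(_) => acc
        case None    => acc :+ (name -> tpe)
      }
    }
}
```

Merging a file with columns (single, double) and a file with columns (single, triple) gives (single, double, triple), which is the shape of result that schema merging produces for files written with evolving schemas.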

Because schema merging is a costly operation and is not required in most cases, Spark SQL has turned this feature off by default since 1.5.0. You can turn it on in two ways:

When reading Parquet files, set the data source option mergeSchema to true.

Set the global SQL option spark.sql.parquet.mergeSchema to true.

// spark (the SparkSession) and sc (the SparkContext) from the spark-shell are used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._

// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("hdfs://hadoop001:9000/data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("hdfs://hadoop001:9000/data/test_table/key=2")

// Read the partitioned table
val df3 = spark.read.option("mergeSchema", "true").parquet("hdfs://hadoop001:9000/data/test_table")
df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column that appears in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key: int (nullable = true)

Hive data source

Apache Hive is the SQL engine on Hadoop, and Spark SQL can be compiled with or without Hive support. Spark SQL with Hive support can access Hive tables and use UDFs (user-defined functions), the Hive query language (HiveQL/HQL), and so on. Note that including the Hive libraries in Spark SQL does not require Hive to be installed beforehand. In general, it is best to compile Spark SQL with Hive support so that these features are available. If you downloaded a binary release of Spark, it should already have been built with Hive support.

To connect Spark SQL to a deployed Hive, you must copy hive-site.xml to Spark's configuration directory ($SPARK_HOME/conf). Spark SQL can also run when Hive is not deployed.

Note that if Hive is not deployed, Spark SQL creates its own Hive metastore, called metastore_db, in the current working directory. In addition, if you create tables with the HiveQL CREATE TABLE statement (not CREATE EXTERNAL TABLE), the tables are placed in the /user/hive/warehouse directory of your default file system (if there is an hdfs-site.xml on your classpath, the default file system is HDFS; otherwise it is the local file system).

import java.io.File

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |     500|
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// |  5| val_5|  5| val_5|
// ...

Embedded Hive

To use the embedded Hive, no setup is needed; just use it. The warehouse location can be passed with --conf:

spark.sql.warehouse.dir=

Note: with the embedded Hive, from Spark 2.0 onward spark.sql.warehouse.dir specifies the location of the data warehouse. To use an HDFS path, add core-site.xml and hdfs-site.xml to the Spark conf directory; otherwise the warehouse directory is created only on the master node, and queries fail because the files cannot be found. When switching to HDFS, delete the metastore and restart the cluster.

External Hive

To connect to an externally deployed Hive, follow these steps.

A. Copy or symlink the hive-site.xml from Hive into the conf directory under the Spark installation directory.

B. Start the spark shell, making sure to include the JDBC driver used to access the Hive metastore database.

$ bin/spark-shell --master spark://hadoop001:7077 --jars mysql-connector-java-5.1.27-bin.jar

JSON dataset

Spark SQL can automatically infer the schema of a JSON dataset and load it as a Dataset[Row]. You can use SparkSession.read.json() to load a Dataset[String] or a JSON file. Note that this is not a conventional JSON file: each line must be a complete JSON string.

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+

JDBC

Spark SQL can create a DataFrame by reading data from a relational database over JDBC. After a series of computations on the DataFrame, the results can be written back to the relational database.

Note that the relevant database driver must be on Spark's classpath.

$ bin/spark-shell --master spark://hadoop001:7077 --jars mysql-connector-java-5.1.27-bin.jar

// Properties is needed for the connection settings used below
import java.util.Properties

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://hadoop001:3306/rdd")
  .option("dbtable", "rddtable")
  .option("user", "root")
  .option("password", "hive")
  .load()

val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "hive")
val jdbcDF2 = spark.read
  .jdbc("jdbc:mysql://hadoop001:3306/rdd", "rddtable", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:mysql://hadoop001:3306/rdd")
  .option("dbtable", "rddtable2")
  .option("user", "root")
  .option("password", "hive")
  .save()

jdbcDF2.write
  .jdbc("jdbc:mysql://hadoop001:3306/mysql", "db", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:mysql://hadoop001:3306/mysql", "db", connectionProperties)
