How to Configure and Use Spark SQL


This article introduces how to configure and use Spark SQL. The worked examples below cover situations you are likely to run into in practice; read through them carefully and try them yourself.

XY's personal record

SparkSQL is a Spark module whose main entry point is SparkSession; it lets you mix SQL queries seamlessly with Spark programs. DataFrames and SQL provide a common way to access a variety of data sources (also via JDBC or ODBC connections), including Hive, Avro, Parquet, ORC, JSON and JDBC, and you can even join data across these sources: every source is accessed in the same way. Spark SQL also supports HiveQL syntax as well as Hive SerDes and UDFs, so you can access existing Hive warehouses.

Spark SQL includes a cost-based optimizer, columnar storage and code generation to make queries fast. At the same time, it scales to thousands of nodes and multi-hour queries using the Spark engine, which provides full mid-query fault tolerance, so there is no need to use a different engine for historical data.

SparkSQL versions:

Before Spark 2.0

Entry points: SQLContext and HiveContext

SQLContext: the entry point of the SQL module in Spark; it is mainly responsible for constructing DataFrames and executing DataFrame operations.

HiveContext: a subclass of SQLContext designed for integration with Hive, e.g. reading metadata from Hive, storing data into Hive tables, and using Hive's window/analytic functions.

After Spark 2.0

Entry point: SparkSession (the unified entry point of a Spark application), which merges SQLContext and HiveContext.

SparkSQL core abstractions: DataFrame and Dataset, where type DataFrame = Dataset[Row] (a type alias that gives another name to an existing type).
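As a minimal, hedged sketch of these entry points in Spark 2.x (the Record case class and column name are illustrative, not from the original article):

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// SparkSession is the single entry point that merges SQLContext and HiveContext
val spark = SparkSession.builder()
  .appName("sparksql-entry-point")
  .master("local[*]")              // drop this when submitting to a cluster
  .getOrCreate()

import spark.implicits._

// a DataFrame is an untyped Dataset of Row: type DataFrame = Dataset[Row]
val df: DataFrame = spark.range(0, 5).toDF("id")

// the same data viewed as a strongly typed Dataset
case class Record(id: Long)
val ds: Dataset[Record] = df.as[Record]
ds.show()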

SparkSQL DSL syntax

SparkSQL not only supports direct HQL queries, it also supports data operations through DSL statements / the DataFrame API. The main DataFrame API methods are listed below (a short sketch follows the list):

select: similar to SELECT in an HQL statement; retrieves the required fields

where/filter: similar to the WHERE clause in an HQL statement; filters data by the given condition

sort/orderBy: global sorting, similar to ORDER BY in Hive; sorts all data by the given field(s)

sortWithinPartitions: similar to Hive's SORT BY; sorts data within each partition

groupBy: data aggregation

limit: returns the first N records
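A short, hedged sketch of these DSL methods, assuming an emp DataFrame with the empno, ename, sal and deptno columns used later in this article:

import org.apache.spark.sql.functions.{avg, col}

// assumed: emp is a DataFrame with columns empno, ename, sal, deptno
val top10 = emp
  .select("ename", "sal", "deptno")            // select: pick the required fields
  .where(col("sal") > 1000)                    // where/filter: keep rows matching a condition
  .orderBy(col("sal").desc)                    // sort/orderBy: global sort
  .limit(10)                                   // limit: first N records

// groupBy: aggregation, e.g. average salary per department
val avgSal = emp.groupBy("deptno").agg(avg("sal").alias("avg_sal"))

// sortWithinPartitions: sort only inside each partition, like Hive's SORT BY
val partSorted = emp.sortWithinPartitions(col("deptno"))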

Integration of SparkSQL and Hive

Integration steps:

-1. Start the NameNode and DataNode(s)

-2. Soft-link or copy the Hive configuration file into Spark's conf directory

$ ln -s /opt/modules/apache/hive-1.2.1/conf/hive-site.xml
or
$ cp /opt/modules/apache/hive-1.2.1/conf/hive-site.xml ./

-3. Adopt different strategies depending on the configuration items in hive-site.xml

According to the hive.metastore.uris parameter

-a. When the hive.metastore.uris parameter is empty (default)

In this case, the integration of SparkSQL with Hive is completed simply by adding the JDBC driver jar of the Hive metastore database to Spark's classpath.

-b. When hive.metastore.uris is not empty

-1. Start the metastore service for hive

$ ./bin/hive --service metastore &

-2. The integration of SparkSQL and Hive is then complete

-4. You may see an error when starting spark-sql ($ bin/spark-sql):

java.lang.ClassNotFoundException: org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:270)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:228)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:693)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Failed to load main class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.
You need to build Spark with -Phive and -Phive-thriftserver.

Solution: copy sql/hive-thriftserver/target/spark-hive-thriftserver_2.11-2.0.2.jar from the compiled Spark source into Spark's jars directory.

Done. You can verify with spark-sql (default)> show databases; which now operates on Hive.
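Once the integration works from the spark-sql shell, the same Hive access is available from application code. A minimal sketch, assuming hive-site.xml is on the classpath (the app name is illustrative):

import org.apache.spark.sql.SparkSession

// enableHiveSupport() makes the SparkSession use the Hive metastore
// configured in hive-site.xml (embedded metastore DB or remote metastore service)
val spark = SparkSession.builder()
  .appName("sparksql-hive-integration")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("show databases").show()   // should list the databases known to Hive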

Write a couple of simple SQL statements:

spark-sql (default)> select * from emp;

You can also join the two tables:

spark-sql (default)> select a.*, b.* from emp a left join dept b on a.deptno = b.deptno;

You can also perform cache operations on the tables:

> cache table emp;              # cache the table
> uncache table dept;           # clear the cache
> explain select * from emp;    # show the execution plan

On the web UI we can see the corresponding Storage information; the related Stages entries disappear once the cache is cleared.
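The same cache / uncache / explain operations can also be driven from the catalog and DataFrame APIs; a small sketch using the emp table from above:

// cache a table through the catalog (equivalent to: cache table emp)
spark.catalog.cacheTable("emp")

// or cache a DataFrame directly and materialize it with an action
val empDf = spark.table("emp").cache()
empDf.count()

// inspect the physical plan (equivalent to: explain select * from emp)
spark.table("emp").explain()

// clear the cache (equivalent to: uncache table emp)
spark.catalog.uncacheTable("emp")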

Start a spark-shell; SQL statements can be written directly in the shell:

bin/spark-shell
# SQL can be written directly in the shell
scala> spark.sql("show databases").show
scala> spark.sql("use common").show
scala> spark.sql("select * from emp a join dept b on a.deptno = b.deptno").show

The returned DataFrame can also be assigned to a variable.

For example, use registerTempTable to register a temporary table. Note: temporary tables are visible regardless of the current database and do not require a database to be specified.

scala> df.registerTempTable("table_regis01")
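In Spark 2.x, registerTempTable is deprecated in favour of createOrReplaceTempView; a small equivalent sketch (the view name follows the example above):

// preferred in Spark 2.x: register the DataFrame as a temporary view
df.createOrReplaceTempView("table_regis01")

// the view can then be queried with SQL
spark.sql("select * from table_regis01").show()

// a temporary view is only visible within the current SparkSession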

How a Spark application can provide its third-party jar dependencies

The jars the application depends on can be seen under the Classpath Entries node of the Environment tab on the 4040 web UI, e.g. http://hadoop01.com:4040/environment/

1. Add the driver jar directly to ${SPARK_HOME}/jars

2. Add local jar packages with the --jars parameter

./bin/spark-shell --jars jars/mysql-connector-java-5.1.27-bin.jar,/opt/modules/hive-1.2.1/lib/servlet-api-2.5.jar

If you add more than one local jar, separate them with commas

./bin/spark-shell --jars jars/mysql-connector-java-5.1.27-bin.jar,/opt/modules/hive-1.2.1/lib/*

Note: you cannot use * to add jars this way; to add multiple dependency jars you have to list them one by one.

3. Add third-party jars from Maven with the --packages parameter

./bin/spark-shell --packages mysql:mysql-connector-java:5.1.28

Multiple coordinates can be given separated by commas, in the format groupId:artifactId:version.

(Under the hood, third-party jars that are not yet available locally are first downloaded from the Maven central repository into the local /home/ijeffrey/.ivy2/jars directory, and are then added to the classpath via spark.jars.)

--exclude-packages excludes unwanted packages

--repositories specifies additional Maven repository URLs

4. Use the SPARK_CLASSPATH environment variable to point to the jar files

Edit the spark-env.sh file

SPARK_CLASSPATH=/opt/modules/apache/spark-2.0.2/external_jars/*    # path to the external jars

5. Package the third-party jars into the final application jar

In IDEA, bundle the dependency jars into the final jar of the Spark application that will be run.

SparkSQL's ThriftServer service

ThriftServer is built on top of Hive's HiveServer2 service. The following links cover how clients connect to HiveServer2 and how it is configured:

https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-JDBC

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics#hiveserver2

https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2

Configuration:

-1. The Spark environment running the ThriftServer service must already have the SparkSQL and Hive integration completed

-2. Configure the hiveserver2-related parameters in hive-site.xml

hive.server2.thrift.bind.port = 10000
hive.server2.thrift.bind.host = hadoop01.com

-3. Start the metadata service for hive

$ ./bin/hive --service metastore &

-4. Start Spark's thriftserver service, which also runs as a SparkSubmit process

$ sbin/start-thriftserver.sh

You can also open the corresponding web UI, which now has one more JDBC/ODBC Server tab than before.

Note: if you want to start the Spark ThriftServer service, shut down the hiveserver2 service first.

Testing SparkSQL's ThriftServer service

-1. Check whether the process exists

jps -ml | grep HiveThriftServer2

-2. Check whether the web UI is working

It is normal for the JDBC/ODBC Server tab to be present

-3. Use the beeline command that ships with Spark

./bin/beeline

-4. Access Spark's ThriftServer interface through JDBC

$ bin/beeline                    # start the beeline bundled with Spark
beeline> !help                   # view the available commands, such as connect
beeline> !connect
Usage: connect <url> <username> <password> [driver]
# multiple users can connect this way
beeline> !connect jdbc:hive2://hadoop01.com:10000
# exit
beeline> !quit

If the connection succeeds, you can also see the connection on the 4040 page.

Note: if you get the error

No known driver to handle "jdbc:hive2://hadoop01.com:10000"

this indicates that Hive's JDBC driver jar is missing. Find hive-jdbc-1.2.1.spark2.jar in the compiled source and copy it into Spark's jars directory.

Access the ThriftServer interface of spark through jdbc

Just as we connect to MySQL from Java, we can use Scala to connect to the ThriftServer:

package com.jeffrey

import java.sql.DriverManager

object SparkJDBCThriftServerDemo {
  def main(args: Array[String]): Unit = {
    // 1. load the driver
    val driver = "org.apache.hive.jdbc.HiveDriver"
    Class.forName(driver)

    // 2. build the connection
    val url = "jdbc:hive2://hadoop01.com:10000"
    val conn = DriverManager.getConnection(url, "ijeffrey", "123456")

    // 3. execute SQL statements
    conn.prepareStatement("use common").execute()

    var pstmt = conn.prepareStatement("select empno, ename, sal from emp")
    var rs = pstmt.executeQuery()
    while (rs.next()) {
      println(s"empno = ${rs.getInt("empno")} " +
        s"ename = ${rs.getString("ename")} " +
        s"sal = ${rs.getDouble("sal")}")
    }

    println("----------------------------------------")

    pstmt = conn.prepareStatement("select empno, ename, sal from emp where sal > ? and ename = ?")
    pstmt.setDouble(1, 3000)
    pstmt.setString(2, "KING")
    rs = pstmt.executeQuery()
    while (rs.next()) {
      println(s"empno = ${rs.getInt("empno")} " +
        s"ename = ${rs.getString("ename")} " +
        s"sal = ${rs.getDouble("sal")}")
    }

    rs.close()
    pstmt.close()
    conn.close()
  }
}

Execution result:

SparkSQL case 1: reading JSON files from HDFS

1. Upload case data to HDFS

The sample data is at ${SPARK_HOME}/examples/src/main/resources/*

2. Write the SparkSQL program

Start a spark-shell and write:

scala> val path = "/spark/data/people.json"
scala> val df = spark.read.json(path)
scala> df.registerTempTable("tmp04")            // register a temporary table from the DataFrame
scala> spark.sql("show tables").show            // operate through SQL statements
scala> spark.sql("select * from tmp04").show

// saveAsTable saves the result into a (non-temporary) table
scala> spark.sql("select * from tmp04").write.saveAsTable("test01")
// overwrite: overwrite, append: append, ignore: ignore
scala> spark.sql("select * from tmp01").write.mode("overwrite").saveAsTable("test01")
scala> spark.sql("select * from tmp01").write.mode("append").saveAsTable("test01")
scala> spark.sql("select * from tmp01").write.mode("ignore").saveAsTable("test01")

SaveAsTable ("test01") is saved to a table that does not exist by default (test01 is not a temporary table). If the table exists, it will report an error.

SaveMode has four options (a short sketch follows this list):

Append: append to the existing data

Overwrite: overwrite the existing data

ErrorIfExists: raise an error if the table already exists (this is the default)

Ignore: if the table already exists, skip this write
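A small sketch of selecting these modes explicitly through the SaveMode enum instead of strings (table names follow the example above):

import org.apache.spark.sql.SaveMode

val result = spark.sql("select * from tmp04")

result.write.mode(SaveMode.Overwrite).saveAsTable("test01")      // replace any existing data
result.write.mode(SaveMode.Append).saveAsTable("test01")         // add to the existing data
result.write.mode(SaveMode.Ignore).saveAsTable("test01")         // do nothing if the table exists
result.write.mode(SaveMode.ErrorIfExists).saveAsTable("test01")  // the default: fail if it exists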

In addition to reading data with spark.read.json, you can also query files directly with spark.sql:

Scala > spark.sql ("select * from json.` / spark/data/ people.json` where age is not null") .show +-- +-- + | age | name | +-+ | 30 | Andy | | 19 | Justin | +-- +-- + # path usage on json.` (counterticket number) causes case 2: conversion between DataFrame and Dataset and RDD

To integrate Hive in IDEA, put the hive-site.xml file in the resources directory.

package com.jeffrey.sql

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object HiveJoinMySQLDemo {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3")

    // 1. build the SparkSession
    val warehouseLocation = "/user/hive/warehouse"
    val spark = SparkSession
      .builder()
      .master("local")                 // comment this out if you run it on the cluster
      .appName("RDD 2 DataFrame")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._
    import spark.sql

    val url = "jdbc:mysql://hadoop01.com:3306/test"
    val table = "tb_dept"
    val props = new Properties()
    props.put("user", "root")
    props.put("password", "123456")

    // 1. Hive table data can be exported to MySQL (in the shell you can use :paste to write multiple lines)
    spark.read.table("common.dept")
      .write
      .mode(SaveMode.Overwrite)
      .jdbc(url, table, props)

    // 2. join between Hive and MySQL
    // 2.1 read the MySQL data
    val df: DataFrame = spark
      .read
      .jdbc(url, table, props)
    df.createOrReplaceTempView("tmp_tb_dept")

    // 2.2 aggregate the data
    spark.sql(
      """
        |select a.*, b.dname, b.loc
        |from common.emp a
        |join tmp_tb_dept b on a.deptno = b.deptno
      """.stripMargin)
      .createOrReplaceTempView("tmp_emp_join_dept_result")

    spark.sql("select * from tmp_emp_join_dept_result").show()

    // two ways of caching the table
    spark.read.table("tmp_emp_join_dept_result").cache()
    spark.catalog.cacheTable("tmp_emp_join_dept_result")

    // output to HDFS
    // method 1
    /*
    spark
      .read
      .table("tmp_emp_join_dept_result")
      .write.parquet("/spark/sql/hive_join_mysql")
    */
    // method 2
    spark
      .read
      .table("tmp_emp_join_dept_result")
      .write
      .format("parquet")
      .save(s"hdfs://hadoop01.com:8020/spark/sql/hive_join_mysql/${System.currentTimeMillis()}")

    // output to Hive in parquet format, partitioned by deptno
    spark
      .read
      .table("tmp_emp_join_dept_result")
      .write
      .format("parquet")
      .partitionBy("deptno")
      .mode(SaveMode.Overwrite)
      .saveAsTable("hive_emp_dept")

    println("----------------------------------------")
    spark.sql("show tables").show()

    // clear the cache
    spark.catalog.uncacheTable("tmp_emp_join_dept_result")
  }
}

The program can be packaged into a jar file and executed on the cluster:

bin/spark-submit \
  --class com.jeffrey.sql.HiveJoinMySQLDemo \
  --master yarn \
  --deploy-mode client \
  /opt/datas/jar/hivejoinmysql.jar

bin/spark-submit \
  --class com.jeffrey.sql.HiveJoinMySQLDemo \
  --master yarn \
  --deploy-mode cluster \
  /opt/datas/logAnalyze.jar

The above covers the basic use of Spark SQL.

Functions in SparkSQL

SparkSQL supports essentially all of the functions that Hive supports. SparkSQL supports two kinds of custom functions, UDF and UDAF, both registered through the udf attribute of SparkSession (see the sketch after the two definitions below); SparkSQL does not support custom UDTF functions.

☆ UDF: one input value produces one output value; a one-to-one function, i.e. an ordinary function

☆ UDAF: multiple input values produce one output value; a many-to-one function, i.e. an aggregate function
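A minimal sketch of registering and using a UDF through spark.udf (the function name and column are illustrative; a UDAF in Spark 2.x would instead extend org.apache.spark.sql.expressions.UserDefinedAggregateFunction):

import org.apache.spark.sql.functions.{col, udf}

// register a UDF for use in SQL statements: one value in, one value out
spark.udf.register("to_upper", (s: String) => if (s == null) null else s.toUpperCase)
spark.sql("select ename, to_upper(ename) as ename_upper from emp").show()

// the same function used through the DSL API
val toUpper = udf((s: String) => if (s == null) null else s.toUpperCase)
spark.table("emp").select(col("ename"), toUpper(col("ename")).alias("ename_upper")).show()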

This is the end of "How to Configure and Use Spark SQL". Thank you for reading; if you want to learn more, keep following for more practical articles.
