Spark SQL Configuration and Usage

This article introduces how to configure and use Spark SQL, working through the situations that commonly come up in practice. I hope you find it useful.
Spark SQL is a Spark module whose main entry point is SparkSession; it seamlessly mixes SQL queries with Spark programs. DataFrames and SQL provide a common way to access a variety of data sources (including via JDBC or ODBC connections), such as Hive, Avro, Parquet, ORC, JSON, and JDBC, and you can even join data across these sources, connecting to each of them in the same way. Spark SQL also supports HiveQL syntax as well as Hive SerDes and UDFs, allowing you to access existing Hive warehouses.
Spark SQL includes a cost-based optimizer, columnar storage, and code generation to make queries fast. At the same time, it scales to thousands of nodes and multi-hour queries on the Spark engine, which provides full mid-query fault tolerance, so there is no need to use a different engine for historical data.
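As a quick illustration of the uniform data source API described above, the sketch below reads JSON, Parquet, and JDBC sources through the same interface; the paths, table name, and credentials are placeholders, and a SparkSession named spark is assumed:

val jsonDF = spark.read.json("/spark/data/people.json")           // JSON on HDFS
val parquetDF = spark.read.parquet("/spark/data/users.parquet")   // Parquet
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://hadoop01.com:3306/test")           // needs the MySQL driver jar on the classpath
  .option("dbtable", "tb_dept")
  .option("user", "root")
  .option("password", "123456")
  .load()

// All three are DataFrames, so they can be registered and queried uniformly
jsonDF.createOrReplaceTempView("people")
spark.sql("select * from people where age is not null").show()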
Spark SQL versions:
Before Spark 2.0
Entry points: SQLContext and HiveContext
SQLContext: mainly responsible for constructing DataFrames and executing DataFrame operations; SQLContext is the program entry point of the SQL module in Spark.
HiveContext: a subclass of SQLContext, designed for integration with Hive, e.g. reading Hive metadata, writing data to Hive tables, using Hive window/analytic functions, and so on.
After Spark 2.0
Entry point: SparkSession (the overall entry point of a Spark application), which merges SQLContext and HiveContext.
Spark SQL core abstractions: DataFrame/Dataset, where type DataFrame = Dataset[Row] (a type alias, i.e. an individual name given to an existing data type).
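The following is a minimal sketch of the Spark 2.x entry point described above; the app name, master setting, and sample data are placeholders rather than anything from the original article:

import org.apache.spark.sql.SparkSession

object SparkSessionDemo {
  def main(args: Array[String]): Unit = {
    // Build the unified entry point (replaces SQLContext/HiveContext in Spark 2.x)
    val spark = SparkSession.builder()
      .appName("SparkSessionDemo")
      .master("local[*]")       // remove when submitting to a cluster
      .getOrCreate()

    import spark.implicits._

    // A DataFrame is just a Dataset[Row]
    val df = Seq((7839, "KING", 5000.0)).toDF("empno", "ename", "sal")
    df.createOrReplaceTempView("emp_demo")
    spark.sql("select empno, ename, sal from emp_demo").show()

    spark.stop()
  }
}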
SparkSQL DSL syntax
Spark SQL not only supports direct HQL queries, but also supports data operations through DSL statements / the DataFrame API. The main DataFrame API methods are listed below (a short sketch follows the list):
select: similar to select in an HQL statement; retrieves the required field information
where/filter: similar to the where clause in an HQL statement; filters data based on the given condition
sort/orderBy: global sorting, similar to order by in Hive; sorts all data by the given fields
sortWithinPartitions: similar to Hive's sort by; sorts data within each partition
groupBy: data aggregation operation
limit: returns the first N records
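The following illustrative sketch exercises the methods above; it assumes a SparkSession named spark, and the emp data is made up for the example:

import org.apache.spark.sql.functions._
import spark.implicits._   // needed for toDF and the $"col" syntax

val emp = Seq(
  (7369, "SMITH", 800.0, 20),
  (7839, "KING", 5000.0, 10)
).toDF("empno", "ename", "sal", "deptno")

emp.select("empno", "ename", "sal")   // select
  .where($"sal" > 1000)               // where / filter
  .orderBy($"sal".desc)               // sort / orderBy
  .limit(10)                          // limit
  .show()

emp.groupBy($"deptno").agg(avg($"sal").as("avg_sal")).show()   // groupBy
emp.sortWithinPartitions($"deptno").show()                     // sortWithinPartitions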
Integration of SparkSQL and Hive
Integration steps:
-1. Start the HDFS NameNode and DataNodes
-2. Symlink or copy Hive's configuration file into Spark's conf directory
$ ln -s /opt/modules/apache/hive-1.2.1/conf/hive-site.xml   or   $ cp /opt/modules/apache/hive-1.2.1/conf/hive-site.xml ./
-3. Adopt different strategies depending on the configuration items in hive-site.xml
Based on the hive.metastore.uris parameter:
-a. When hive.metastore.uris is empty (the default)
Adding the JDBC driver jar of the Hive metastore database to Spark's classpath is enough to complete the Spark SQL-to-Hive integration.
-b. When hive.metastore.uris is not empty
-1. Start Hive's metastore service
./bin/hive --service metastore &
-2. This completes the integration of Spark SQL and Hive (a sample hive-site.xml entry follows this list)
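For case (b), hive.metastore.uris points at the metastore's Thrift endpoint. A typical hive-site.xml entry looks like the following; the host is a placeholder for your environment and 9083 is the usual default port:

<property>
  <name>hive.metastore.uris</name>
  <value>thrift://hadoop01.com:9083</value>
</property>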
-4. If the following error appears when starting spark-sql ($ bin/spark-sql):
java.lang.ClassNotFoundException: org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:270)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:228)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:693)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Failed to load main class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.
You need to build Spark with -Phive and -Phive-thriftserver.
Solution: copy sql/hive-thriftserver/target/spark-hive-thriftserver_2.11-2.0.2.jar from the compiled Spark source into Spark's jars directory.
Once that is done, spark-sql operates on Hive (check with spark-sql (default)> show databases;).
Write a couple of simple SQL statements:
spark-sql (default)> select * from emp;
You can also join two tables:
spark-sql (default)> select a.*, b.* from emp a left join dept b on a.deptno = b.deptno;
You can also cache a table:
> cache table emp;             # cache the table
> uncache table dept;          # clear the cache
> explain select * from emp;   # show the execution plan
After caching you can see the corresponding Storage information in the web UI, and the related stages disappear once the cache is cleared.
Start a spark-shell; you can write SQL statements directly in the shell:
$ bin/spark-shell
scala> spark.sql("show databases").show
scala> spark.sql("use common").show
scala> spark.sql("select * from emp a join dept b on a.deptno = b.deptno").show
You can also assign the resulting DataFrame to a variable.
For example, use registerTempTable to register a temporary table. Note: temporary tables are visible across all databases and do not require a database to be specified.
scala> df.registerTempTable("table_regis01")
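A quick sketch of querying the temporary table registered above, assuming df already exists in the same spark-shell session; in Spark 2.x, createOrReplaceTempView is the preferred replacement for the deprecated registerTempTable:

scala> spark.sql("select * from table_regis01").show    // query the temporary table by name
scala> df.createOrReplaceTempView("table_regis01")      // Spark 2.x equivalent of registerTempTable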
Ways for a Spark application to use third-party jar files
The jars our application depends on can be seen under Classpath Entries on the Environment tab of the 4040 web UI, e.g. http://hadoop01.com:4040/environment/
1. Add the driver jar directly to ${SPARK_HOME}/jars
2. Add local jar packages using the --jars parameter
./bin/spark-shell --jars jars/mysql-connector-java-5.1.27-bin.jar,/opt/modules/hive-1.2.1/lib/servlet-api-2.5.jar
If you add more than one local jar, separate them with commas.
./bin/spark-shell --jars jars/mysql-connector-java-5.1.27-bin.jar,/opt/modules/hive-1.2.1/lib/*
Note: you cannot use * to add jar packages; to add multiple dependency jars, you have to list them one by one.
3. Add third-party jars from Maven using the --packages parameter
./bin/spark-shell --packages mysql:mysql-connector-java:5.1.28
Multiple packages can be given, separated by commas, in the format groupId:artifactId:version.
(Under the hood, third-party jars that are not available locally are first downloaded from the Maven central repository into the local /home/ijeffrey/.ivy2/jars directory and then added to the classpath via spark.jars.)
--exclude-packages removes unwanted packages
--repositories specifies the URL of an additional Maven repository
4. Use the SPARK_CLASSPATH environment variable to give the path to the jar files
Edit the spark-env.sh file:
SPARK_CLASSPATH=/opt/modules/apache/spark-2.0.2/external_jars/*   # path to the external jars
5. Package the third-party jars into the final application jar
In IDEA, add the dependency jars into the jar of the final Spark application that will be run.
SparkSQL's ThriftServer service
The ThriftServer is built on Hive's HiveServer2 service. The following links describe how clients connect to HiveServer2:
https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-JDBC
hiveserver2 configuration: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics#hiveserver2
https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2
Configuration:
-1. The Spark environment in which the ThriftServer service runs must already have the Spark SQL-Hive integration completed.
-2. Configure the hiveserver2-related parameters in hive-site.xml:
<property>
  <name>hive.server2.thrift.port</name>
  <value>10000</value>
</property>
<property>
  <name>hive.server2.thrift.bind.host</name>
  <value>hadoop01.com</value>
</property>
-3. Start Hive's metastore service
$ ./bin/hive --service metastore &
-4. Start Spark's thriftserver service (it runs as a SparkSubmit process)
$ sbin/start-thriftserver.sh
You can then see the corresponding web UI, which has one more tab, JDBC/ODBC Server, than before.
Note: if you need to start the Spark ThriftServer service, shut down the hiveserver2 service first.
Testing the Spark SQL ThriftServer service
-1. Check whether the process exists:
$ jps -ml | grep HiveThriftServer2
-2. Check whether the web UI is working:
It is normal to see the JDBC/ODBC Server tab.
-3. Use the beeline command that ships with Spark:
./bin/beeline
-4. Access Spark's ThriftServer interface through JDBC
$ bin/beeline                                      # start the beeline shipped with Spark
beeline> !help                                     # view the available commands, such as connect
beeline> !connect                                  # Usage: connect <url> [driver]
beeline> !connect jdbc:hive2://hadoop01.com:10000  # multiple users can connect this way
beeline> !quit                                     # exit
If the connection succeeds, you can also see the connection on the 4040 page.
Note: if you get the error
No known driver to handle "jdbc:hive2://hadoop01.com:10000"
the Hive JDBC driver jar is missing. Find hive-jdbc-1.2.1.spark2.jar in the compiled source and copy it into Spark's jars directory.
Accessing Spark's ThriftServer interface through JDBC programmatically
Just as we connect to MySQL from Java, we can use Scala to connect to the ThriftServer:
package com.jeffrey

import java.sql.DriverManager

object SparkJDBCThriftServerDemo {
  def main(args: Array[String]): Unit = {
    // 1. Load the driver
    val driver = "org.apache.hive.jdbc.HiveDriver"
    Class.forName(driver)

    // 2. Build the connection
    val url = "jdbc:hive2://hadoop01.com:10000"
    val conn = DriverManager.getConnection(url, "ijeffrey", "123456")

    // 3. Execute SQL statements
    conn.prepareStatement("use common").execute()

    var pstmt = conn.prepareStatement("select empno, ename, sal from emp")
    var rs = pstmt.executeQuery()
    while (rs.next()) {
      println(s"empno = ${rs.getInt("empno")} " +
        s"ename = ${rs.getString("ename")} " +
        s"sal = ${rs.getDouble("sal")}")
    }

    println("----------------------------------------")

    pstmt = conn.prepareStatement("select empno, ename, sal from emp where sal > ? and ename = ?")
    pstmt.setDouble(1, 3000)
    pstmt.setString(2, "KING")
    rs = pstmt.executeQuery()
    while (rs.next()) {
      println(s"empno = ${rs.getInt("empno")} " +
        s"ename = ${rs.getString("ename")} " +
        s"sal = ${rs.getDouble("sal")}")
    }

    rs.close()
    pstmt.close()
    conn.close()
  }
}
Running the program prints the query results to the console.
Spark SQL case 1: reading JSON files on HDFS
1. Upload the sample data to HDFS
The sample data is under ${SPARK_HOME}/examples/src/main/resources/*
2. Write the Spark SQL program
Start a spark-shell and write:
scala> val path = "/spark/data/people.json"
scala> val df = spark.read.json(path)
scala> df.registerTempTable("tmp04")              // register a temporary table from the DataFrame
scala> spark.sql("show tables").show              // operate through SQL statements
scala> spark.sql("select * from tmp04").show
// saveAsTable saves the result into a (non-temporary) table
scala> spark.sql("select * from tmp04").write.saveAsTable("test01")
// save modes: overwrite (replace), append (add), ignore (skip if the table exists)
scala> spark.sql("select * from tmp01").write.mode("overwrite").saveAsTable("test01")
scala> spark.sql("select * from tmp01").write.mode("append").saveAsTable("test01")
scala> spark.sql("select * from tmp01").write.mode("ignore").saveAsTable("test01")
saveAsTable("test01") saves into a table that by default must not already exist (test01 is not a temporary table); if the table exists, an error is reported.
SaveMode has four options:
Append: append the data
Overwrite: overwrite the existing data
ErrorIfExists: report an error if the table already exists; this is the default
Ignore: if the table already exists, skip this step
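A brief sketch of the four save modes through the DataFrameWriter API; df and the table name are placeholders:

import org.apache.spark.sql.SaveMode

df.write.mode(SaveMode.Append).saveAsTable("test01")          // append to the existing table
df.write.mode(SaveMode.Overwrite).saveAsTable("test01")       // replace the table contents
df.write.mode(SaveMode.ErrorIfExists).saveAsTable("test01")   // default: fail if the table already exists
df.write.mode(SaveMode.Ignore).saveAsTable("test01")          // do nothing if the table already exists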
In addition to reading the data with spark.read.json, you can also query the file directly with spark.sql:
scala> spark.sql("select * from json.`/spark/data/people.json` where age is not null").show
+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+
Note: the path is wrapped in backticks after json. (json.`path`).
Case 2: conversion between DataFrame, Dataset, and RDD
To integrate Hive in IDEA, you need to put the hive-site.xml file in the resources directory
package com.jeffrey.sql

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object HiveJoinMySQLDemo {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3")

    // 1. Build the SparkSession
    val warehouseLocation = "/user/hive/warehouse"
    val spark = SparkSession
      .builder()
      .master("local")      // comment this out when running on the cluster
      .appName("RDD 2 DataFrame")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._
    import spark.sql

    val url = "jdbc:mysql://hadoop01.com:3306/test"
    val table = "tb_dept"
    val props = new Properties()
    props.put("user", "root")
    props.put("password", "123456")

    // 1. Export Hive table data to MySQL (in spark-shell you can use :paste to write multiple lines)
    spark.read.table("common.dept")
      .write
      .mode(SaveMode.Overwrite)
      .jdbc(url, table, props)

    // 2. Join Hive and MySQL tables
    // 2.1 Read the MySQL data
    val df: DataFrame = spark
      .read
      .jdbc(url, table, props)
    df.createOrReplaceTempView("tmp_tb_dept")

    // 2.2 Aggregate the data
    spark.sql(
      """
        |select a.*, b.dname, b.loc
        |from common.emp a
        |join tmp_tb_dept b on a.deptno = b.deptno
      """.stripMargin)
      .createOrReplaceTempView("tmp_emp_join_dept_result")

    spark.sql("select * from tmp_emp_join_dept_result").show()

    // Two ways to cache a table
    spark.read.table("tmp_emp_join_dept_result").cache()
    spark.catalog.cacheTable("tmp_emp_join_dept_result")

    // Output to HDFS
    // Method 1
    /*
    spark
      .read
      .table("tmp_emp_join_dept_result")
      .write
      .parquet("/spark/sql/hive_join_mysql")
    */
    // Method 2
    spark
      .read
      .table("tmp_emp_join_dept_result")
      .write
      .format("parquet")
      .save(s"hdfs://hadoop01.com:8020/spark/sql/hive_join_mysql/${System.currentTimeMillis()}")

    // Output to Hive as a parquet table partitioned by deptno
    spark
      .read
      .table("tmp_emp_join_dept_result")
      .write
      .format("parquet")
      .partitionBy("deptno")
      .mode(SaveMode.Overwrite)
      .saveAsTable("hive_emp_dept")

    println("------------------------------------------------------------")
    spark.sql("show tables").show()

    // Clear the cache
    spark.catalog.uncacheTable("tmp_emp_join_dept_result")
  }
}
It can be packaged into a jar file and executed on the cluster:
bin/spark-submit \
  --class com.jeffrey.sql.HiveJoinMySQLDemo \
  --master yarn \
  --deploy-mode client \
  /opt/datas/jar/hivejoinmysql.jar

bin/spark-submit \
  --class com.jeffrey.sql.HiveJoinMySQLDemo \
  --master yarn \
  --deploy-mode cluster \
  /opt/datas/logAnalyze.jar
The above covers the basic use of Spark SQL.
Functions in Spark SQL
Spark SQL supports essentially all of the functions that Hive supports. Spark SQL supports two kinds of custom functions, UDF and UDAF, both registered through the udf attribute of SparkSession; Spark SQL does not support custom UDTF functions.
☆ UDF: one row of data in, one value out; a one-to-one function, i.e. an ordinary function
☆ UDAF: multiple rows of data in, one value out; a many-to-one function, i.e. an aggregate function
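A minimal UDF registration sketch, assuming a SparkSession named spark and an emp table with an ename column; the function name is made up for the example. A UDAF in Spark 2.x would extend org.apache.spark.sql.expressions.UserDefinedAggregateFunction and be registered through spark.udf.register in the same way:

// Register a one-in, one-out UDF through the udf attribute of SparkSession
spark.udf.register("to_upper", (s: String) => if (s == null) null else s.toUpperCase)

// Use it in SQL like any built-in function
spark.sql("select ename, to_upper(ename) as ename_upper from emp").show()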
That concludes "Spark SQL configuration and use". Thank you for reading.