2025-01-29 Update From: SLTechnology News&Howtos
1. Spark SQL Overview
1.1 What is Spark SQL
Spark SQL is the Spark module for processing structured data. It provides a programming abstraction called DataFrame and also acts as a distributed SQL query engine, playing a role similar to Hive.
1.2 Features of Spark SQL
1. Easy to integrate: Spark SQL ships with Spark, so it does not need to be installed separately.
2. Unified data access: JDBC, JSON, Hive, and Parquet files (Parquet is a columnar storage format; it is the default data source for Spark SQL and is also supported by Hive).
3. Compatible with Hive: data in Hive can be read directly into Spark SQL for processing.
In production, Hive is generally used as the data warehouse that stores the data, and Spark reads the data from Hive for processing.
4. Standard data connections: JDBC and ODBC.
5. Higher computing efficiency than MapReduce-based Hive. As of Hive 2.x, Hive itself recommends Spark as the execution engine.
2. Basic principles of Spark SQL
2.1 Basic concepts of DataFrame and DataSet
2.1.1 DataFrame
A DataFrame is a dataset organized into named columns. It is conceptually equivalent to a table in a relational database, with both a table structure and data, but with richer optimizations under the hood. DataFrames can be built from a variety of sources.
For example:
Structured data files
Tables in Hive
External databases or existing RDDs
The DataFrame API is available in Scala, Java, Python and R.
Compared with an RDD, a DataFrame carries more structural information about the data, namely the schema. An RDD is a distributed collection of Java objects; a DataFrame is a distributed collection of Row objects. Besides offering richer operators than RDD, DataFrame brings more important benefits: better execution efficiency, less data read, and optimized execution plans.
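The difference between an RDD and a DataFrame can be seen by converting one into the other and printing the schema and the plan. The following is a minimal sketch, not the author's code: the object name SchemaDemo, the case class Stu and the sample rows are made up for illustration, and a local master is assumed.

```scala
import org.apache.spark.sql.SparkSession

object SchemaDemo {
  // Hypothetical record type, used only for this illustration
  case class Stu(id: Int, name: String, age: Int)

  // Builds a small DataFrame from an RDD and returns its column names
  def columnsOf(spark: SparkSession): Seq[String] = {
    import spark.implicits._
    // The RDD itself only knows that it holds Stu objects
    val rdd = spark.sparkContext.parallelize(Seq(Stu(1, "king", 20), Stu(2, "tom", 25)))
    // toDF() derives the schema (column names and types) from the case class
    val df = rdd.toDF()
    df.printSchema()
    // explain() prints the physical plan that Spark SQL produced for the query
    df.filter($"age" > 21).select("name").explain()
    df.columns.toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("schema demo").getOrCreate()
    println(columnsOf(spark))
    spark.stop()
  }
}
```

The schema is what lets Spark SQL plan the query before running it, which an RDD of opaque objects does not allow.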
2.1.2 DataSet
A Dataset is a distributed collection of data. It is a new interface added in Spark 1.6 that combines the advantages of RDDs (strong typing and the ability to use powerful lambda functions) with the efficiency of the Spark SQL execution engine. A DataFrame can therefore be regarded as a special kind of Dataset, namely Dataset[Row].
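As a rough illustration of the relationship between the two types, the sketch below filters a typed Dataset with a lambda and then views the result as a DataFrame. The names DatasetDemo and Person and the sample data are assumptions made for this example only.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object DatasetDemo {
  // Hypothetical record type, used only for this illustration
  case class Person(name: String, age: Int)

  def adultNames(spark: SparkSession): Array[String] = {
    import spark.implicits._
    val ds: Dataset[Person] = Seq(Person("king", 20), Person("tom", 15)).toDS()
    // Typed, RDD-style lambda: the compiler knows that p is a Person
    val adults: Dataset[Person] = ds.filter(p => p.age >= 18)
    // toDF() drops the static element type and yields a DataFrame
    val df: DataFrame = adults.toDF()
    // DataFrame is an alias for Dataset[Row], so this assignment compiles
    val rows: Dataset[Row] = df
    rows.select("name").as[String].collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("dataset demo").getOrCreate()
    adultNames(spark).foreach(println)
    spark.stop()
  }
}
```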
2.2 How to create a DataFrame
2.2.1 SparkSession object
Apache Spark 2.0 introduced SparkSession, which gives users a single entry point to Spark's functionality and lets them write Spark programs with the DataFrame and Dataset APIs. Most importantly, it reduces the number of concepts users need to know, which makes interacting with Spark easier.
Before version 2.0, a SparkConf and a SparkContext had to be created before interacting with Spark. In Spark 2.0 the same can be done through SparkSession without explicitly creating SparkConf, SparkContext, and SQLContext, because these objects are already encapsulated in SparkSession.
Note that in this version of Spark, creating a SQLContext directly with new SQLContext() is flagged as deprecated (IDEA shows it struck through). The recommended way to obtain a SQLContext object is through SparkSession.
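The encapsulation described above can be checked directly. This is a small sketch under the assumption of a local master; the object name SessionDemo and the helper wrapsSameSession are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

object SessionDemo {
  // True when the SQLContext exposed by the session points back to that same session
  def wrapsSameSession(spark: SparkSession): Boolean =
    spark.sqlContext.sparkSession eq spark

  def main(args: Array[String]): Unit = {
    // One builder call replaces the explicit SparkConf / SparkContext / SQLContext setup
    val spark = SparkSession.builder().master("local[1]").appName("session demo").getOrCreate()
    val sc = spark.sparkContext        // the underlying SparkContext
    println(sc.appName)
    println(wrapsSameSession(spark))
    spark.stop()
  }
}
```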
2.2.2 Through a case class
This method is commonly used in Scala, because the case class is a Scala feature.
    import org.apache.spark.sql.SparkSession

    /**
     * The structure of table t_stu is: id name age
     */
    object CreateDF {
      def main(args: Array[String]): Unit = {
        // 2. Create the SparkSession object and set master and appName
        //    (this is the current way to obtain a SQLContext object)
        val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
        // 3. Get the SparkContext object through spark and read the data
        val lines = spark.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))
        // 4. Map the data to the case class
        val tb = lines.map(t => emp(t(0).toInt, t(1), t(2).toInt))
        // The implicit conversions must be imported here, otherwise toDF cannot be called
        import spark.sqlContext.implicits._
        // 5. Generate the df
        val df2 = tb.toDF()
        // Equivalent to: select name from t_stu
        df2.select($"name").show()
        // Stop the spark object
        spark.stop()
      }
    }

    /* 1. Define the case class. Each attribute corresponds to a field name and type in the table.
       For convenience the fields are often all defined as String and converted to the required
       type later as needed. This is equivalent to defining the structure of the table. */
    case class emp(id: Int, name: String, age: Int)
The summary steps are:
1. Define the case class, which serves as the table structure
2. Create the SparkSession object and read the data
3. Map the data in the RDD to the case class
4. Call the toDF function to convert the RDD to a DataFrame
2.2.3 Through the StructType class
This approach is more commonly used from Java.
    package SparkSQLExer

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

    /**
     * Create a DataFrame, method 2:
     * create it through the SparkSession object, defining the table structure with StructType
     */
    object CreateDF02 {
      def main(args: Array[String]): Unit = {
        val sparkS = SparkSession.builder().master("local").appName("create schema").getOrCreate()
        // 1. Create the table structure (schema) with StructType;
        //    each field of the table is defined with a StructField
        val tbSchema = StructType(List(
          StructField("id", DataTypes.IntegerType),
          StructField("name", DataTypes.StringType),
          StructField("age", DataTypes.IntegerType)
        ))
        // 2. Read the data
        val lines = sparkS.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))
        // 3. Map the data to Row objects
        val rdd1 = lines.map(t => Row(t(0).toInt, t(1), t(2).toInt))
        // 4. Combine the table structure with the table data; the result is a df
        val df2 = sparkS.createDataFrame(rdd1, tbSchema)
        // Print the table structure
        df2.printSchema()
        sparkS.stop()
      }
    }
The summary steps are:
1. Create the table structure (schema) with StructType, defining each field of the table with a StructField
2. Read the data through sparkSession.sparkContext
3. Map the data to Row objects
4. Combine the StructType with the Row data, which returns a df
2.2.4 Using file types that carry a table structure, such as JSON
    import org.apache.spark.sql.SparkSession

    /**
     * Create a df, method 3:
     * import data and table structure directly from a formatted file, for example a JSON file.
     * The result is a DataFrame.
     */
    object CreateDF03 {
      def main(args: Array[String]): Unit = {
        val sparkS = SparkSession.builder().master("local").appName("create df through json").getOrCreate()
        // Read JSON, way 1:
        val jsonrdd1 = sparkS.read.json("path")
        // Read JSON, way 2:
        val jsonrdd2 = sparkS.read.format("json").load("path")
        sparkS.stop()
      }
    }
This method is relatively simple: it reads the JSON file directly.
Whatever file sparkS.read.xxxx reads, it returns a DataFrame object.
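To make the point concrete, the sketch below writes a tiny JSON Lines file to a temporary location and reads it back. The object name ReadJsonDemo, the temporary file and its two records are assumptions for illustration and do not come from the article.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadJsonDemo {
  // Writes two JSON records to a temp file and reads them back as a DataFrame
  def readPeople(spark: SparkSession): DataFrame = {
    val path = Files.createTempFile("people", ".json")
    val content = "{\"name\":\"king\",\"age\":20}\n{\"name\":\"tom\",\"age\":25}\n"
    Files.write(path, content.getBytes("UTF-8"))
    // The schema (field names and types) is inferred from the file itself
    spark.read.json(path.toString)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("read json demo").getOrCreate()
    val df = readPeople(spark)
    df.printSchema()
    df.show()
    spark.stop()
  }
}
```

No case class or StructType is needed here, because the field names travel with the data.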
2.3 Manipulating a DataFrame
2.3.1 DSL statements
DSL statements express the operations of a SQL statement as function calls, for example:
    df1.select("name").show
Example:
    # For convenience, work directly in spark-shell
    spark-shell --master spark://bigdata121:7077

    1. Print the table structure
    scala> df1.printSchema
    root
     |-- empno: integer (nullable = true)
     |-- ename: string (nullable = true)
     |-- job: string (nullable = true)
     |-- mgr: integer (nullable = true)
     |-- hiredate: string (nullable = true)
     |-- sal: integer (nullable = true)
     |-- comm: integer (nullable = true)
     |-- deptno: integer (nullable = true)

    2. Display the table data of the current df, or the data of a query result
    scala> df1.show

    3. Run a select, equivalent to: select xxx from xxx where xxx
    scala> df1.select("ename", "sal").where("sal > 2000").show

    4. To compute on a column, prefix its name with the $ symbol; $"col" refers to the
       column itself, which can then be used in an expression.
       Note: $ does not work out of the box in IDEA; the fix is described after this example.
    scala> df1.select($"ename", $"sal", $"sal" + 100).show
    +------+----+-----------+
    | ename| sal|(sal + 100)|
    +------+----+-----------+
    | SMITH| 800|        900|
    | ALLEN|1600|       1700|
    |  WARD|1250|       1350|
    | JONES|2975|       3075|
    |MARTIN|1250|       1350|
    | BLAKE|2850|       2950|
    | CLARK|2450|       2550|
    | SCOTT|3000|       3100|
    |  KING|5000|       5100|
    |TURNER|1500|       1600|
    | ADAMS|1100|       1200|
    | JAMES| 950|       1050|
    |  FORD|3000|       3100|
    |MILLER|1300|       1400|
    +------+----+-----------+

    5. Filter rows
    scala> df1.filter($"sal" > 2000).show

    6. Group and count
    scala> df1.groupBy($"deptno").count.show
    +------+-----+
    |deptno|count|
    +------+-----+
    |    20|    5|
    |    10|    3|
    |    30|    6|
    +------+-----+
As mentioned above, select($"name") does not work out of the box in an IDE. The solution is to add this line before the statement:
    import spark.sqlContext.implicits._
The problem is a type mismatch, and importing the implicit conversions resolves it.
2.3.2 SQL statements
A df object cannot execute SQL directly. You first create a view from it and then run SQL against the view.
You must give the view a name, which plays the role of the table name.
Views are covered in more detail later; for now the concept is enough.
Example:
    val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
    // ...
    // Create a temporary view from the df object; the view name acts as the table name
    df1.createOrReplaceTempView("emp")
    // Run SQL through the SparkSession object
    spark.sql("select * from emp").show
    spark.sql("select * from emp where sal > 2000").show
    spark.sql("select deptno, count(1) from emp group by deptno").show
    // Several views can be created on the same df; they do not conflict
    df1.createOrReplaceTempView("emp12345")
    spark.sql("select e.deptno from emp12345 e").show
2.3.3 Multi-table query
    scala> case class Dept(deptno: Int, dname: String, loc: String)
    defined class Dept

    scala> val lines = sc.textFile("/usr/local/tmp_files/dept.csv").map(_.split(","))

    scala> val allDept = lines.map(x => Dept(x(0).toInt, x(1), x(2)))

    scala> val df2 = allDept.toDF
    df2: org.apache.spark.sql.DataFrame = [deptno: int, dname: string ... 1 more field]

    scala> df2.create
    createGlobalTempView   createOrReplaceTempView   createTempView

    scala> df2.createOrReplaceTempView("dept")

    scala> spark.sql("select dname, ename from emp12345 e join dept d on e.deptno = d.deptno").show
    +----------+------+
    |     dname| ename|
    +----------+------+
    |  RESEARCH| SMITH|
    |  RESEARCH| JONES|
    |  RESEARCH| SCOTT|
    |  RESEARCH| ADAMS|
    |  RESEARCH|  FORD|
    |ACCOUNTING| CLARK|
    |ACCOUNTING|  KING|
    |ACCOUNTING|MILLER|
    |     SALES| ALLEN|
    |     SALES|  WARD|
    |     SALES|MARTIN|
    |     SALES| BLAKE|
    |     SALES|TURNER|
    |     SALES| JAMES|
    +----------+------+
2.4 Creating a DataSet
2.4.1 Via a case class
This is similar to creating a DataFrame, except that toDS is called instead of toDF.
    package SparkSQLExer

    import org.apache.spark.sql.SparkSession

    object CreateDS {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
        val lines = spark.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))
        val tb = lines.map(t => emp1(t(0).toInt, t(1), t(2).toInt))
        import spark.sqlContext.implicits._
        val df1 = tb.toDS()
        df1.select($"name")
      }
    }

    case class emp1(id: Int, name: String, age: Int)
2.4.2 Through a Seq of case class objects
    package SparkSQLExer

    import org.apache.spark.sql.SparkSession

    object CreateDS {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
        // toDS needs the implicit conversions
        import spark.sqlContext.implicits._
        // Create a sequence filled with emp1 objects (the mapped data),
        // then convert it directly to a DataSet with toDS
        val ds1 = Seq(emp1(1, "king", 20)).toDS()
        ds1.printSchema()
      }
    }

    case class emp1(id: Int, name: String, age: Int)
2.4.3 Using a JSON file
    // Define the case class
    case class Person(name: String, age: BigInt)
    // Generate a DataFrame from the JSON data
    val df = spark.read.format("json").load("/usr/local/tmp_files/people.json")
    // Convert the DataFrame to a DataSet
    df.as[Person].show
In df.as[Person], as[T] takes the element type of the DataSet. T must be a case class, which is used to map the column headers.
2.5 Operating on a DataSet
The operators supported by DataSet are essentially the union of the RDD and DataFrame operators.
    // Generate a DataFrame from emp.json
    scala> val empDF = spark.read.json("/usr/local/tmp_files/emp.json")

    scala> empDF.show

    scala> empDF.where($"sal" >= 3000).show
    +----+------+-----+-----+----------+---------+----+----+
    |comm|deptno|empno|ename|  hiredate|      job| mgr| sal|
    +----+------+-----+-----+----------+---------+----+----+
    |    |    20| 7788|SCOTT| 1987-4-19|  ANALYST|7566|3000|
    |    |    10| 7839| KING|1981-11-17|PRESIDENT|    |5000|
    |    |    20| 7902| FORD| 1981-12-3|  ANALYST|7566|3000|
    +----+------+-----+-----+----------+---------+----+----+

    // Converting empDF to a DataSet requires a case class
    scala> case class Emp(empno: BigInt, ename: String, job: String, mgr: String, hiredate: String, sal: BigInt, comm: String, deptno: BigInt)
    defined class Emp

    scala> val empDS = empDF.as[Emp]
    empDS: org.apache.spark.sql.Dataset[Emp] = [comm: string, deptno: bigint ... 6 more fields]

    scala> empDS.filter(_.sal > 3000).show
    +----+------+-----+-----+----------+---------+----+----+
    |comm|deptno|empno|ename|  hiredate|      job| mgr| sal|
    +----+------+-----+-----+----------+---------+----+----+
    |    |    10| 7839| KING|1981-11-17|PRESIDENT|    |5000|
    +----+------+-----+-----+----------+---------+----+----+

    scala> empDS.filter(_.deptno == 10).show
    +----+------+-----+------+----------+---------+----+----+
    |comm|deptno|empno| ename|  hiredate|      job| mgr| sal|
    +----+------+-----+------+----------+---------+----+----+
    |    |    10| 7782| CLARK|  1981-6-9|  MANAGER|7839|2450|
    |    |    10| 7839|  KING|1981-11-17|PRESIDENT|    |5000|
    |    |    10| 7934|MILLER| 1982-1-23|    CLERK|7782|1300|
    +----+------+-----+------+----------+---------+----+----+
Multi-table query:
    1. Create the department table
    scala> val deptRDD = sc.textFile("/usr/local/tmp_files/dept.csv").map(_.split(","))

    scala> case class Dept(deptno: Int, dname: String, loc: String)
    defined class Dept

    scala> val deptDS = deptRDD.map(x => Dept(x(0).toInt, x(1), x(2))).toDS
    deptDS: org.apache.spark.sql.Dataset[Dept] = [deptno: int, dname: string ... 1 more field]

    scala> deptDS.show
    +------+----------+--------+
    |deptno|     dname|     loc|
    +------+----------+--------+
    |    10|ACCOUNTING|NEW YORK|
    |    20|  RESEARCH|  DALLAS|
    |    30|     SALES| CHICAGO|
    |    40|OPERATIONS|  BOSTON|
    +------+----------+--------+

    2. The employee table is the empDS created above

    // Join the two tables on the deptno field
    empDS.join(deptDS, "deptno").where(xxxx)
    // joinWith takes a join condition, so it also works when the field names differ
    empDS.joinWith(deptDS, deptDS("deptno") === empDS("deptno"))
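The practical difference between join and joinWith is the result type. The following sketch uses cut-down Emp and Dept case classes and two sample rows each; these names and values are simplifications for illustration, not the article's full tables.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object JoinDemo {
  // Simplified, hypothetical versions of the article's Emp and Dept records
  case class Emp(ename: String, deptno: Int)
  case class Dept(deptno: Int, dname: String)

  def namePairs(spark: SparkSession): Array[(String, String)] = {
    import spark.implicits._
    val empDS = Seq(Emp("KING", 10), Emp("SMITH", 20)).toDS()
    val deptDS = Seq(Dept(10, "ACCOUNTING"), Dept(20, "RESEARCH")).toDS()

    // join on a shared column name returns an untyped DataFrame
    // in which deptno appears only once
    val flat: DataFrame = empDS.join(deptDS, "deptno")
    flat.printSchema()

    // joinWith keeps both sides typed: each result element is an (Emp, Dept) pair
    val pairs: Dataset[(Emp, Dept)] =
      empDS.joinWith(deptDS, empDS("deptno") === deptDS("deptno"))
    pairs.map { case (e, d) => (e.ename, d.dname) }.collect().sorted
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("join demo").getOrCreate()
    namePairs(spark).foreach(println)
    spark.stop()
  }
}
```

Note that the join condition uses ===, the Column equality operator, rather than = or ==.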
2.6 Views
To manipulate df or ds objects with standard SQL statements, you must first create a view on the df or ds object and then query that view through the sql function of the SparkSession object. So what is a view?
A view is a virtual table that stores no data; it can be thought of as an access link to the underlying table. There are two types of views:
Normal view: also called a local view, valid only in the current session
Global view: valid in all sessions. A global view is created in a dedicated namespace, global_temp, which behaves like a database
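The scoping of the two view types can be observed by opening a second session on the same application. This is a sketch under assumed names (ViewDemo, stu_local, stu_global are hypothetical) with made-up sample rows.

```scala
import org.apache.spark.sql.SparkSession

object ViewDemo {
  // Returns (local view visible in creating session,
  //          local view visible in a second session,
  //          row count read from the global view in the second session)
  def visibility(spark: SparkSession): (Boolean, Boolean, Long) = {
    import spark.implicits._
    val df = Seq((1, "king"), (2, "tom")).toDF("id", "name")
    df.createOrReplaceTempView("stu_local")
    df.createOrReplaceGlobalTempView("stu_global")

    // newSession() shares the SparkContext but has its own temporary views
    val other = spark.newSession()
    val inOriginal = spark.catalog.tableExists("stu_local")
    val inOther = other.catalog.tableExists("stu_local")
    // Global views must be qualified with the global_temp namespace
    val globalRows = other.sql("select * from global_temp.stu_global").count()
    (inOriginal, inOther, globalRows)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("view demo").getOrCreate()
    println(visibility(spark))
    spark.stop()
  }
}
```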
Operation instructions:
    val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
    val empDF = spark.read.json("/usr/local/tmp_files/emp.json")

    // Create a local view:
    empDF.createOrReplaceTempView("view name")   // replaces the view if it already exists
    empDF.createTempView("view name")            // fails with an error if the view already exists

    // Create a global view:
    empDF.createGlobalTempView("view name")

    // Run SQL against the view; the view name is used like a table name
    spark.sql("xxxxx")

    // Example:
    empDF.createOrReplaceTempView("emp")
    spark.sql("select * from emp").show
Note: once a view has been created, it can be queried from any class that uses the same SparkSession object. This is very useful: when we want to work with some tables, we can read them as df objects once, create views on them, and then query the tables from anywhere.
2.7 Data sources
Data sources in different formats can be read through the SparkSession object:
    val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
In what follows, spark refers to the SparkSession created above.
2.7.1 Reading data with SparkSession
1. load
    spark.read.load(path): reads the file at the given path; the file must be stored in Parquet format.
2. format
    spark.read.format("format").load(path): reads files in another format, such as JSON.
    Example: spark.read.format("json").load(path)
3. Reading other formats directly
    spark.read.formatName(path) is shorthand for way 2.
    Example:
    spark.read.json(path)    reads a JSON file
    spark.read.text(path)    reads a text file
Note: all of these methods return a DataFrame object.
2.7.2 Saving data with SparkSession
A DataFrame object can be written to a file in a specified format. Suppose there is a DataFrame object df1.
1. save
    df1.write.save(path)
    The files are saved under this directory with file names generated by Spark, so when reading the data back with the methods above you specify the directory, not a file name. The output format is Parquet. The path can point to HDFS; otherwise the data is stored locally.
    Example:
    df1.write.save("/test")
    spark.read.load("/test")
2. Specifying the format directly
    df1.write.json(path)
    The files are saved in JSON format, with generated file names as above.
3. Specifying the save mode
    If no save mode is specified and the output path already exists, an error is reported.
    df1.write.mode("append").json(path)
    mode("append") appends to the existing data.
    mode("overwrite") overwrites the old data.
4. Saving as a table
    df1.write.saveAsTable(table name)
    The table is saved in the spark-warehouse directory under the current directory.
5. format
    df1.write.format(format).save()
    Writes the data in the specified output format, for example to a MongoDB database.
2.7.3 Parquet format
Parquet is a columnar storage format; see the earlier Hive article for details. It is the default storage format, used by load and save when no format is given, and it is read and written in the same way as described above, so that is not repeated here. What deserves attention is a special feature of Parquet: it supports schema (table structure) merging. Example:
    scala> val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
    df1: org.apache.spark.sql.DataFrame = [single: int, double: int]

    scala> df1.show
    +------+------+
    |single|double|
    +------+------+
    |     1|     2|
    |     2|     4|
    |     3|     6|
    |     4|     8|
    |     5|    10|
    +------+------+

    scala> sc.makeRDD(1 to 5).collect
    res9: Array[Int] = Array(1, 2, 3, 4, 5)

    // Export table 1
    scala> df1.write.parquet("/usr/local/tmp_files/test_table/key=1")

    scala> val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
    df2: org.apache.spark.sql.DataFrame = [single: int, triple: int]

    scala> df2.show
    +------+------+
    |single|triple|
    +------+------+
    |     6|    18|
    |     7|    21|
    |     8|    24|
    |     9|    27|
    |    10|    30|
    +------+------+

    // Export table 2
    scala> df2.write.parquet("/usr/local/tmp_files/test_table/key=2")

    // Reading the directory directly loses a field (triple)
    scala> val df3 = spark.read.parquet("/usr/local/tmp_files/test_table")
    df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 1 more field]

    scala> df3.show
    +------+------+---+
    |single|double|key|
    +------+------+---+
    |     8|  null|  2|
    |     9|  null|  2|
    |    10|  null|  2|
    |     3|     6|  1|
    |     4|     8|  1|
    |     5|    10|  1|
    |     6|  null|  2|
    |     7|  null|  2|
    |     1|     2|  1|
    |     2|     4|  1|
    +------+------+---+

    // With the option "mergeSchema" set to true, the schemas are merged
    scala> val df3 = spark.read.option("mergeSchema", true).parquet("/usr/local/tmp_files/test_table")
    df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 2 more fields]

    scala> df3.show
    +------+------+------+---+
    |single|double|triple|key|
    +------+------+------+---+
    |     8|  null|    24|  2|
    |     9|  null|    27|  2|
    |    10|  null|    30|  2|
    |     3|     6|  null|  1|
    |     4|     8|  null|  1|
    |     5|    10|  null|  1|
    |     6|  null|    18|  2|
    |     7|  null|    21|  2|
    |     1|     2|  null|  1|
    |     2|     4|  null|  1|
    +------+------+------+---+
Supplementary question: what is key, and does it have to be called key? key is the field that distinguishes the different tables. After merging it becomes a column of the merged table, whose value is the one set in the directory name key=xx. The field name itself is arbitrary, but it must be the same for every table under the directory: if one directory is named key=... and another test=..., the two cannot be merged. They must all use key, or all use test.
2.7.4 JSON files
A JSON file carries its own field names, so it can be read directly as a table. For example:
    scala> val peopleDF = spark.read.json("/usr/local/tmp_files/people.json")
    peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

    scala> peopleDF.printSchema()
    root
     |-- age: long (nullable = true)
     |-- name: string (nullable = true)

    scala> peopleDF.createOrReplaceTempView("people")

    scala> spark.sql("select * from people where age=19")
    res25: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

    scala> spark.sql("select * from people where age=19").show
    +---+------+
    |age|  name|
    +---+------+
    | 19|Justin|
    +---+------+

    scala> spark.sql("select age, count(1) from people group by age").show
    +----+--------+
    | age|count(1)|
    +----+--------+
    |  19|       1|
    |null|       1|
    |  30|       1|
    +----+--------+
2.7.5 JDBC connection
DataFrames can connect to a database through JDBC, either to write data to the database or to read data from it.
Example:
1. Read data from MySQL through JDBC:
Use format(xx).option() to specify the connection parameters, such as the user name, the password and the driver to use.
    import java.util.Properties
    import org.apache.spark.sql.SparkSession

    object ConnMysql {
      def main(args: Array[String]): Unit = {
        val sparkS = SparkSession.builder().appName("spark sql conn mysql").master("local").getOrCreate()

        // Connect to mysql, way 1:
        // create a Properties object to hold the mysql connection parameters
        val mysqlConn = new Properties()
        mysqlConn.setProperty("user", "root")
        mysqlConn.setProperty("password", "wjt86912572")
        // Connect using jdbc, passing the connection string, the table name and the
        // other connection parameters; the matching DataFrame is returned
        val mysqlDF1 = sparkS.read.jdbc("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8", "customer", mysqlConn)
        mysqlDF1.printSchema()
        mysqlDF1.show()
        mysqlDF1.createTempView("customer")
        sparkS.sql("select * from customer limit 2").show()

        // Connect to mysql, way 2 (the more commonly used one):
        val mysqlConn2 = sparkS.read.format("jdbc")
          .option("url", "jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8")
          .option("user", "root")
          .option("password", "wjt86912572")
          .option("driver", "com.mysql.jdbc.Driver")
          .option("dbtable", "customer")
          .load()
        mysqlConn2.printSchema()
      }
    }
These are the two ways to read data over a JDBC connection.
2. Write data to MySQL through JDBC
This is similar to reading, except that a write operation is used instead.
    import java.util.Properties
    import org.apache.spark.sql.SparkSession

    object WriteToMysql {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("write to mysql").master("local").getOrCreate()
        val df1 = spark.read.json("G:\\test\\t_stu.json")

        // Way 1:
        df1.write.format("jdbc")
          .option("url", "jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8")
          .option("user", "root")
          .option("password", "wjt86912572")
          .option("driver", "com.mysql.jdbc.Driver")
          .option("dbtable", "customer")
          .save()

        // Way 2:
        val mysqlConn = new Properties()
        mysqlConn.setProperty("user", "root")
        mysqlConn.setProperty("password", "wjt86912572")
        df1.write.jdbc("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8", "customer", mysqlConn)
      }
    }
The table structure of the df must match that of the MySQL table being written to, and the field names must be the same.
2.7.6 Hive
1. Connect to Hive through JDBC
The method is similar to ordinary JDBC, for example:
    import java.util.Properties
    import org.apache.spark.sql.SparkSession

    /**
     * There are two ways to connect to hive:
     * 1. If the Spark program is run directly in IDEA, the program must specify the address of
     *    the hiveserver to connect to over JDBC, and hiveserver must be running as a background
     *    service exposing port 10000. This way connects to hive directly through JDBC.
     *
     * 2. If the program is packaged and run in a Spark cluster, the hive client configuration
     *    file is already present in the conf directory of the Spark cluster, so the hive client
     *    is started directly to connect to hive. The hiveserver service does not need to be
     *    running in this case. This way connects to hive through the hive client.
     */
    object ConnHive {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("spark sql conn mysql").master("local").getOrCreate()
        val properties = new Properties()
        properties.setProperty("user", "")
        properties.setProperty("password", "")
        val hiveDF = spark.read.jdbc("jdbc:hive2://bigdata121:10000/default", "customer", properties)
        hiveDF.printSchema()
        spark.stop()
      }
    }
One thing to note about this way:
hiveserver must be running as a background service exposing port 10000, because the connection to Hive goes directly through JDBC.
2. Connect to Hive through the Hive client
This way is the one generally used in production: tasks are normally submitted to the cluster with spark-submit, so Hive is reached directly through the Hive client instead of JDBC.
Note: the Hive client must be configured on all Spark nodes, and the hive-site.xml configuration file must be copied to Spark's conf directory, together with Hadoop's core-site.xml and hdfs-site.xml. In addition, because the Hive client is used, a metastore server usually has to be configured on the Hive server side. See the article on Hive for the specific configuration.
Programs in the Spark cluster can then use Hive directly.
Operations such as spark.sql("xxxx").show read the corresponding tables from Hive by default. Nothing else needs to be done to connect to Hive.
The same applies in spark-shell, where Hive tables can be queried directly in the same way.
For example:
    import org.apache.spark.sql.SparkSession

    object ConnHive02 {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("spark sql conn hive").enableHiveSupport().getOrCreate()
        spark.sql("select * from customer").show()
      }
    }
The table operated on here is the Hive table.
2.8 Small case: read Hive data, analyze it, and write the results to MySQL
    import java.util.Properties
    import org.apache.spark.sql.SparkSession

    object HiveToMysql {
      def main(args: Array[String]): Unit = {
        // In the Spark cluster, connect to hive directly through the hive client;
        // neither jdbc nor a hive server is needed
        val spark = SparkSession.builder().appName("hive to mysql").enableHiveSupport().getOrCreate()
        val resultDF = spark.sql("select * from default.customer")

        // The processing logic normally goes here: process the data read from hive,
        // then write it to mysql through jdbc once processing is complete
        val mysqlConn = new Properties()
        mysqlConn.setProperty("user", "root")
        mysqlConn.setProperty("password", "wjt86912572")
        // Write to mysql
        resultDF.write.mode("append").jdbc("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8", "customer", mysqlConn)
        spark.stop()
      }
    }
3. Performance tuning
3.1 Caching data in memory
Start a spark-shell first:
    spark-shell --master spark://bigdata121:7077
To work with MySQL in spark-shell, remember to put a mysql-connector jar in Spark's jars directory.
Example:
    // Create a df by reading a table from mysql
    scala> val mysqDF = spark.read.format("jdbc").option("url", "jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8").option("user", "root").option("password", "wjt86912572").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "customer").load()
    mysqDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

    scala> mysqDF.show

    // The df must be registered as a table before it can be cached
    scala> mysqDF.registerTempTable("customer")
    warning: there was one deprecation warning; re-run with -deprecation for details

    // Mark the table as cacheable; no data is cached yet at this point
    scala> spark.sqlContext.cacheTable("customer")

    // The first query reads the data from mysql and caches it in memory
    scala> spark.sql("select * from customer").show
    +---+------+--------------------+
    | id|  name|            last_mod|
    +---+------+--------------------+
    |  1|  neil|2019-07-20 17:09:...|
    |  2|  jack|2019-07-20 17:09:...|
    |  3|martin|2019-07-20 17:09:...|
    |  4|  tony|2019-07-20 17:09:...|
    |  5|  eric|2019-07-20 17:09:...|
    |  6|  king|2019-07-20 17:42:...|
    |  7|   tao|2019-07-20 17:45:...|
    +---+------+--------------------+

    // This query returns the same rows, now served from memory
    scala> spark.sql("select * from customer").show

    // Clear the cache
    scala> spark.sqlContext.clearCache
3.2 Tuning-related parameters
Parameters related to caching data in memory:
spark.sql.inMemoryColumnarStorage.compressed
    Default: true
    Spark SQL automatically selects a compression codec for each column based on statistics.
spark.sql.inMemoryColumnarStorage.batchSize
    Default: 10000
    Batch size for caching. A larger batch size can improve memory utilization and compression when caching data, but it also carries the risk of OOM (Out Of Memory).
Other performance-related configuration options (changing them manually is not recommended, and later versions may adapt them automatically):
spark.sql.files.maxPartitionBytes
    Default: 128 MB
    Maximum number of bytes a single partition can hold when reading files.
spark.sql.files.openCostInBytes
    Default: 4 MB
    Estimated cost of opening a file, measured by the number of bytes that could be scanned in the same time. It is used when packing multiple files into one partition. It is better to overestimate: partitions with small files will then be faster than partitions with large files (which are scheduled first).
spark.sql.autoBroadcastJoinThreshold
    Default: 10 MB
    Maximum size in bytes of a table that will be broadcast to all worker nodes when performing a join. Setting the value to -1 disables broadcasting. Note that statistics are currently only supported for Hive Metastore tables on which the ANALYZE TABLE COMPUTE STATISTICS noscan command has been run.
spark.sql.shuffle.partitions
    Default: 200
    Number of partitions used when shuffling data for joins or aggregations.
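These options can be changed at runtime through the session's configuration. The sketch below is only an illustration: the object name TuningDemo and the chosen values (50 partitions, broadcasting disabled) are arbitrary assumptions, not recommendations from the article.

```scala
import org.apache.spark.sql.SparkSession

object TuningDemo {
  // Applies two SQL options to the session and returns the shuffle setting read back
  def configure(spark: SparkSession): String = {
    // Hypothetical value: fewer shuffle partitions for a small data set
    spark.conf.set("spark.sql.shuffle.partitions", "50")
    // -1 disables broadcast joins, as described in the parameter list
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
    spark.conf.get("spark.sql.shuffle.partitions")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("tuning demo").getOrCreate()
    println(configure(spark))
    spark.stop()
  }
}
```

The same keys can also be passed when the application is submitted, using the --conf option of spark-submit.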