Spark SQL Notes (3): Load and Save Functions and Spark SQL Functions

Updated 2025-04-11. From: SLTechnology News&Howtos


Load and save functions

Data loading (json file, jdbc) and saving (json, jdbc)

The test code is as follows:

    package cn.xpleaf.bigdata.spark.scala.sql.p1

    import java.util.Properties

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{SQLContext, SaveMode}

    /**
      * Various practical SparkSQL operations for loading data and persisting data
      */
    object _03SparkSQLLoadAndSaveOps {
        def main(args: Array[String]): Unit = {
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
            val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkSQLOps.getClass.getSimpleName)
            val sc = new SparkContext(conf)
            val sqlContext = new SQLContext(sc)
    //        readOps(sqlContext)
            writeOps(sqlContext)
            sc.stop()
        }

        /**
          * Watch for this exception when the output directory already exists:
          *     org.apache.spark.sql.AnalysisException: path file:/D:/data/spark/sql/people-1.json already exists
          * To still write to that directory, set a specific SaveMode:
          *     ErrorIfExists   default; throws an exception if the directory exists
          *     Append          append to the existing data
          *     Ignore          ignore, equivalent to not performing the write
          *     Overwrite       overwrite the existing data
          */
        def writeOps(sqlContext:SQLContext): Unit = {
            val df = sqlContext.read.json("D:/data/spark/sql/people.json")
            df.registerTempTable("people")
            val retDF = sqlContext.sql("select * from people where age > 20")
    //        retDF.show()
            // persist the result to a json directory
            //retDF.coalesce(1).write.mode(SaveMode.Overwrite).json("D:/data/spark/sql/people-1.json")
            // persist the result to the database
            val url = "jdbc:mysql://localhost:3306/test"
            val table = "people1"   // a new table will be created
            val properties = new Properties()
            properties.put("user", "root")
            properties.put("password", "root")
            retDF.coalesce(1).write.jdbc(url, table, properties)
        }

        /*
            // sparkSQL reading data
            // java.lang.RuntimeException: file:/D:/data/spark/sql/people.json is not a Parquet file
            The default file format loaded by read.load is parquet (parquet.apache.org).
            To load other file formats, specify the format of the loaded file: .format("json")
         */
        def readOps(sqlContext:SQLContext): Unit = {
            //        val df = sqlContext.read.load("D:/data/spark/sql/users.parquet")
            //        val df = sqlContext.read.format("json").load("D:/data/spark/sql/people.json")
            //        val df = sqlContext.read.json("D:/data/spark/sql/people.json")
            val url = "jdbc:mysql://localhost:3306/test"
            val table = "people"
            val properties = new Properties()
            properties.put("user", "root")
            properties.put("password", "root")
            val df = sqlContext.read.jdbc(url, table, properties)

            df.show()
        }
    }

When performing a read operation, the output is as follows:

    +---+----+---+------+
    | id|name|age|height|
    +---+----+---+------+

When performing a write operation:

1. If you save to a json file, note that there are several write modes; also, what gets saved is a directory, compatible with the HDFS directory format.
2. If you save to jdbc, a table with the DataFrame's columns is created in the database. Note that this table must not already exist.

Integration of Spark SQL and Hive

You need to start Hive before doing the following.

Code writing

The test code is as follows:

    package cn.xpleaf.bigdata.spark.scala.sql.p2

    import cn.xpleaf.bigdata.spark.scala.sql.p1._01SparkSQLOps
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    /**
      * Manipulate the data of tables in Hive by creating a HiveContext
      * Data sources:
      * teacher_info.txt
      *     name(String)    height(double)
      *     zhangsan,175
      *     lisi,180
      *     wangwu,175
      *     zhaoliu,195
      *     zhouqi,165
      *     weiba,185
      *
      *     create table teacher_info(
      *     name string,
      *     height double
      *     ) row format delimited
      *     fields terminated by ',';
      *
      * teacher_basic.txt
      *     name(String)    age(int)    married(boolean)    children(int)
      *     zhangsan,23,false,0
      *     lisi,24,false,0
      *     wangwu,25,false,0
      *     zhaoliu,26,true,1
      *     zhouqi,27,true,2
      *     weiba,28,true,3
      *
      *     create table teacher_basic(
      *     name string,
      *     age int,
      *     married boolean,
      *     children int
      *     ) row format delimited
      *     fields terminated by ',';
      *
      * Requirements:
      * 1. Create the corresponding tables in Hive through sparkSQL and load the data into them
      * 2. Run a sparkSQL job that joins teacher_info and teacher_basic and stores the result in a table teacher
      *
      * When performing Hive operations in the cluster, the following configuration is required:
      * 1. Copy hive-site.xml to the spark/conf directory and copy the mysql connector to the spark/lib directory
      * 2. Add this line to $SPARK_HOME/conf/spark-env.sh:
      *    export SPARK_CLASSPATH=$SPARK_CLASSPATH:$SPARK_HOME/lib/mysql-connector-java-5.1.39.jar
      */
    object _01HiveContextOps {
        def main(args: Array[String]): Unit = {
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
            val conf = new SparkConf()
    //            .setMaster("local[2]")
                .setAppName(_01SparkSQLOps.getClass.getSimpleName)
            val sc = new SparkContext(conf)
            val hiveContext = new HiveContext(sc)

            // create the teacher_info table
            hiveContext.sql("CREATE TABLE teacher_info(" +
                "name string, " +
                "height double) " +
                "ROW FORMAT DELIMITED " +
                "FIELDS TERMINATED BY ','")

            // create the teacher_basic table
            hiveContext.sql("CREATE TABLE teacher_basic(" +
                "name string, " +
                "age int, " +
                "married boolean, " +
                "children int) " +
                "ROW FORMAT DELIMITED " +
                "FIELDS TERMINATED BY ','")

            // load the data into the tables
            hiveContext.sql("LOAD DATA LOCAL INPATH '/home/uplooking/data/hive/sql/teacher_info.txt' INTO TABLE teacher_info")
            hiveContext.sql("LOAD DATA LOCAL INPATH '/home/uplooking/data/hive/sql/teacher_basic.txt' INTO TABLE teacher_basic")

            // step 2: compute the joined data of the two tables
            val joinDF = hiveContext.sql("SELECT " +
                "b.name, " +
                "b.age, " +
                "if(b.married, 'married', 'unmarried') as married, " +
                "b.children, " +
                "i.height " +
                "FROM teacher_info i " +
                "INNER JOIN teacher_basic b ON i.name = b.name")

            joinDF.collect().foreach(println)

            // store the result in the Hive table teacher
            joinDF.write.saveAsTable("teacher")

            sc.stop()
        }
    }

Packaging, upload and configuration

After being packaged, upload it to the cluster environment, and then make the following configuration for Spark:

When performing Hive operations in the cluster, the following configuration is required:

1. Copy hive-site.xml to the spark/conf directory, and copy the mysql connector to the spark/lib directory.
2. Add the following line to $SPARK_HOME/conf/spark-env.sh:

    export SPARK_CLASSPATH=$SPARK_CLASSPATH:$SPARK_HOME/lib/mysql-connector-java-5.1.39.jar

Submitting the Spark job

The script for submitting the job using spark is as follows:

    [uplooking@uplooking01 spark]$ cat spark-submit-standalone.sh
    #export HADOOP_CONF_DIR=/home/uplooking/app/hadoop/etc/hadoop

    /home/uplooking/app/spark/bin/spark-submit \
    --class $2 \
    --master spark://uplooking02:7077 \
    --executor-memory 1G \
    --num-executors 1 \
    $1 \

Execute the following command:

    ./spark-submit-standalone.sh spark-hive.jar cn.xpleaf.bigdata.spark.scala.sql.p2._01HiveContextOps

Verification

You can see the desired output in the output of the job execution, or you can verify it by operating directly in Hive:

    hive> show tables;
    OK
    hpeople
    people
    t1
    teacher
    teacher_basic
    teacher_info
    Time taken: 0.03 seconds, Fetched: 6 row(s)
    hive> select * from teacher;
    OK
    zhangsan    23    unmarried    0    175.0
    lisi        24    unmarried    0    180.0
    wangwu      25    unmarried    0    175.0
    zhaoliu     26    married      1    195.0
    zhouqi      27    married      2    165.0
    weiba       28    married      3    185.0
    Time taken: 0.369 seconds, Fetched: 6 row(s)

Spark and ES integration

You need to make sure that the ElasticSearch environment is in place.

The test code is as follows:

    package cn.xpleaf.bigdata.spark.scala.sql.p2

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}
    import org.elasticsearch.spark.sql._
    import org.elasticsearch.spark._

    /**
      * Integration of Spark and ES
      *     Add the maven dependency for Spark and es:
      *         elasticsearch-hadoop
      *         2.3.0
      *     Load account.json into the es index/type spark/account
      *     See the official documentation: https://www.elastic.co/guide/en/elasticsearch/hadoop/2.3/spark.html
      */
    object _02SparkElasticSearchOps {
        def main(args: Array[String]): Unit = {
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
            val conf = new SparkConf()
                .setAppName(_02SparkElasticSearchOps.getClass().getSimpleName)
                .setMaster("local[2]")
            /**
              * Configuration for integrating Spark with es
              */
            conf.set("es.index.auto.create", "true")
            conf.set("es.nodes", "uplooking01")
            conf.set("es.port", "9200")
            val sc = new SparkContext(conf)
            val sqlContext = new SQLContext(sc)

    //        write2ES(sqlContext)

            readFromES(sc)

            sc.stop()
        }

        /**
          * Read data from es
          * (using the SparkContext)
          */
        def readFromES(sc:SparkContext): Unit = {
            val resources = "spark/account"  // index/type
            val jsonRDD = sc.esJsonRDD(resources)
            jsonRDD.foreach(println)
        }

        /**
          * Write data to es
          * (using the SQLContext)
          */
        def write2ES(sqlContext:SQLContext): Unit = {
            val jsonDF = sqlContext.read.json("D:/data/spark/sql/account.json")
            val resources = "spark/account"  // index/type
            jsonDF.saveToEs(resources)
        }
    }

Overview of Spark SQL functions (built-in functions for Spark 1.5.x ~ 1.6.x)

Built-in functions in Spark SQL can be used to analyze data. Unlike the rest of the Spark SQL API, a built-in function applied to a DataFrame returns a Column object, and a DataFrame is by definition "A distributed collection of data organized into named columns." This lays a solid foundation for complex data analysis and is very convenient: built-in functions can be called wherever they are needed inside DataFrame operations. That greatly reduces the time spent building auxiliary business logic (such as mapping onto the actual model) and lets us focus on the data analysis itself, which is valuable for engineer productivity. Starting with Spark 1.5.x a large number of built-in functions are provided, such as max, mean, min, sum, avg, explode, size, sort_array, day, to_date, abs, acos, asin, atan.

In general, built-in functions fall into the following basic types:

1. Aggregate functions, such as countDistinct, sumDistinct
2. Collection functions, such as sort_array, explode
3. Date and time functions, such as hour, quarter, next_day
4. Mathematical functions, such as asin, atan, sqrt, tan, round
5. Window functions, such as rowNumber
6. String functions, such as concat, format_number, regexp_extract
7. Other functions, such as isNaN, sha, randn, callUDF

The following concepts come from Hive, but Spark SQL clearly has the same ones:

UDF, user-defined function (User Defined Function): one input, one output. a -> A, strlen("adbad") = 5
UDAF, user-defined aggregate function (User Defined Aggregation Function): multiple inputs, one output. sum(a, b, c, d) -> aggregated result
UDTF, user-defined table function (User Defined Table Function): multiple inputs, multiple outputs. "hello you" "hello me" -> transformation -> split(" ") -> Array[] ["hello", "you"] -> "hello" "you" -> row-to-column conversion
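The three kinds of user-defined function can be told apart by their signatures alone. The following is a minimal sketch in plain Scala, with no Spark or Hive involved. The names `FunctionShapes`, `strLen`, `sumAll` and `splitWords` are hypothetical, and the code only models the input/output cardinality of each kind, not how such functions are registered with an engine.

```scala
object FunctionShapes {
  // UDF shape: one input value in, one output value out.
  def strLen(s: String): Int = s.length

  // UDAF shape: many input values in, one aggregated value out.
  def sumAll(values: Seq[Int]): Int = values.sum

  // UDTF shape: an input row expands into several output rows
  // (the effect of split followed by explode).
  def splitWords(line: String): Seq[String] = line.split(" ").toSeq

  def main(args: Array[String]): Unit = {
    println(strLen("adbad"))
    println(sumAll(Seq(1, 2, 3, 4)))
    // Each line yields several words, so the result has more rows than the input.
    println(Seq("hello you", "hello me").flatMap(splitWords))
  }
}
```

Only the first shape is exercised by the UDF example later in this article; the other two are included here for comparison.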

A basic case study is as follows:

    package cn.xpleaf.bigdata.spark.scala.sql.p2

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    /**
      * SparkSQL built-in function operations
      */
    object _03SparkSQLFunctionOps {
        def main(args: Array[String]): Unit = {
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
            val conf = new SparkConf()
                .setAppName(_03SparkSQLFunctionOps.getClass().getSimpleName)
                .setMaster("local[2]")
            val sc = new SparkContext(conf)
            val sqlContext = new SQLContext(sc)

            val pdf = sqlContext.read.json("D:/data/spark/sql/people.json")
            pdf.show()

            pdf.registerTempTable("people")

            // count the number of people
            sqlContext.sql("select count(1) from people").show()
            // max, min and average age, and the count, per age group
            sqlContext.sql("select age, " +
                "max(age) as max_age, " +
                "min(age) as min_age, " +
                "avg(age) as avg_age, " +
                "count(age) as count " +
                "from people group by age order by age desc").show()

            sc.stop()
        }
    }

The output is as follows:

    +---+------+----------+
    |age|height|      name|
    +---+------+----------+
    | 10| 168.8|   Michael|
    | 30| 168.8|      Andy|
    | 19| 169.8|    Justin|
    | 32| 188.8|      Jack|
    | 10| 158.8|      John|
    | 19| 179.8|      Domu|
    | 13| 179.8|Yuan Shuai|
    | 30| 175.8|   Yin Jie|
    | 19| 179.9|   Sun Rui|
    +---+------+----------+

    18/05/09 17:53:23 INFO FileInputFormat: Total input paths to process : 1
    +---+
    |_c0|
    +---+
    |  9|
    +---+

    18/05/09 17:53:24 INFO FileInputFormat: Total input paths to process : 1
    +---+-------+-------+-------+-----+
    |age|max_age|min_age|avg_age|count|
    +---+-------+-------+-------+-----+
    | 32|     32|     32|   32.0|    1|
    | 30|     30|     30|   30.0|    2|
    | 19|     19|     19|   19.0|    3|
    | 13|     13|     13|   13.0|    1|
    | 10|     10|     10|   10.0|    2|
    +---+-------+-------+-------+-----+

Spark SQL window functions

1. From Spark 1.5.x onward, window functions are available in Spark SQL and DataFrame, for example row_number(), which lets us implement grouped top-N logic.

2. As a case study, compute top-N values with Spark's window functions. You may remember that we computed top-N much earlier, and it was quite cumbersome at the time. With Spark SQL it is now very convenient.
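A minimal sketch of grouped top-N follows. The SQL string shows the row_number() form one would pass to a context's sql method; in Spark 1.x, window functions generally require a HiveContext rather than a plain SQLContext. The table and column names (`scores`, `subject`, `name`, `score`) are assumptions made up for illustration, and the plain-Scala `topN` function only models what the query computes. It does not run Spark.

```scala
object TopNSketch {
  // Hypothetical record type; the field names are assumptions for illustration.
  case class Score(subject: String, name: String, score: Int)

  // Window-function form of the query (the table `scores` is hypothetical).
  val topNSql: String =
    "select subject, name, score from (" +
      "select subject, name, score, " +
      "row_number() over (partition by subject order by score desc) as rn " +
      "from scores) t " +
    "where rn <= 2"

  // Plain-Scala model of the same logic: partition, order, number, filter.
  def topN(rows: Seq[Score], n: Int): Seq[(Score, Int)] =
    rows
      .groupBy(_.subject)                          // partition by subject
      .toSeq
      .sortBy(_._1)                                // fixed group order, for readable output
      .flatMap { case (_, group) =>
        group
          .sortBy(-_.score)                        // order by score desc
          .zipWithIndex
          .map { case (row, i) => (row, i + 1) }   // row_number() starts at 1
          .filter(_._2 <= n)                       // keep rn <= n
      }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Score("math", "a", 90),
      Score("math", "b", 95),
      Score("math", "c", 80),
      Score("english", "d", 70),
      Score("english", "e", 85)
    )
    // Prints the two highest scores per subject together with their row number.
    topN(rows, 2).foreach(println)
  }
}
```

The inner query numbers the rows inside each partition, and the outer query filters on that number; the filter cannot go in the inner query's where clause because the row number does not exist until the window has been evaluated.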

UDF operation of Spark SQL

The test code is as follows:

    package cn.xpleaf.bigdata.spark.scala.sql.p2

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * SparkSQL built-in function operations
      */
    object _04SparkSQLFunctionOps {
        def main(args: Array[String]): Unit = {
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
            val conf = new SparkConf()
                .setAppName(_04SparkSQLFunctionOps.getClass().getSimpleName)
                .setMaster("local[2]")
            val sc = new SparkContext(conf)
            val sqlContext = new SQLContext(sc)

            /**
              * User-defined function (UDF) operations, by analogy with Hive
              * (both Hive and SparkSQL are interactive computation tools):
              * 1. Create an ordinary function
              * 2. Register it (with the SQLContext)
              * 3. Use it directly
              *
              * Case: create a UDF that returns the length of a string
              */

            // 1. Create an ordinary function
            def strLen(str:String):Int = str.length

            // 2. Register it (with the SQLContext)
            sqlContext.udf.register[Int, String]("myStrLen", strLen)

            val list = List("Hello you", "Hello he", "Hello me")

            // convert the RDD to a DataFrame, one row per word
            val rowRDD = sqlContext.sparkContext.parallelize(list).flatMap(_.split(" ")).map(word => {
                Row(word)
            })
            val scheme = StructType(List(
                StructField("word", DataTypes.StringType, false)
            ))
            val df = sqlContext.createDataFrame(rowRDD, scheme)

            df.registerTempTable("test")

            // 3. Use it directly
            sqlContext.sql("select word, myStrLen(word) from test").show()

            sc.stop()
        }
    }

The output is as follows:

    +-----+---+
    |Hello|  5|
    |  you|  3|
    |Hello|  5|
    |   he|  2|
    |Hello|  5|
    |   me|  2|
    +-----+---+

Spark SQL wordcount operation

The test code is as follows:

    package cn.xpleaf.bigdata.spark.scala.sql.p2

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{Row, SQLContext}

    /**
      * These two parts are the more important ones:
      * 1. Using SparkSQL to do a word count
      * 2. Using window functions
      */
    object _05SparkSQLFunctionOps2 {
        def main(args: Array[String]): Unit = {
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
            val conf = new SparkConf()
                .setAppName(_05SparkSQLFunctionOps2.getClass().getSimpleName)
                .setMaster("local[2]")
            val sc = new SparkContext(conf)
            val sqlContext = new SQLContext(sc)

            val list = List("Hello you", "Hello he", "Hello me")

            // convert the RDD to a DataFrame, one row per line
            val rowRDD = sqlContext.sparkContext.parallelize(list).map(line => {
                Row(line)
            })
            val scheme = StructType(List(
                StructField("line", DataTypes.StringType, false)
            ))

            val df = sqlContext.createDataFrame(rowRDD, scheme)

            df.registerTempTable("test")

            df.show()

            // run the wordcount: split each line, explode into one word per row, then group and count
            val sql = "select t.word, count(1) as count " +
                "from " +
                    "(select " +
                        "explode(split(line, ' ')) as word " +
                    "from test) as t " +
                "group by t.word order by count desc"
            sqlContext.sql(sql).show()

            sc.stop()
        }
    }

The output is as follows:

    +---------+
    |     line|
    +---------+
    |Hello you|
    | Hello he|
    | Hello me|
    +---------+

    +-----+-----+
    | word|count|
    +-----+-----+
    |Hello|    3|
    |   me|    1|
    |   he|    1|
    |  you|    1|
    +-----+-----+
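To see what the nested query computes, the same steps can be modelled on an ordinary Scala collection. This is only a sketch outside Spark: `WordCountSketch` and `wordCount` are hypothetical names, and ties in the count are broken alphabetically here, which the SQL query above does not guarantee.

```scala
object WordCountSketch {
  def wordCount(lines: Seq[String]): Seq[(String, Int)] =
    lines
      .flatMap(_.split(" ").toSeq)                      // explode(split(line, ' ')): one word per row
      .groupBy(identity)                                // group by word
      .map { case (word, ws) => (word, ws.size) }       // count(1)
      .toSeq
      .sortBy { case (word, count) => (-count, word) }  // count desc, then word (tie-break is an assumption)

  def main(args: Array[String]): Unit = {
    // Same three input lines as the Spark example.
    wordCount(Seq("Hello you", "Hello he", "Hello me")).foreach(println)
  }
}
```

The inner flatMap plays the role of the subquery aliased as t, and the groupBy plus map plays the role of the outer group by with count(1).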
