Updated 2025-01-18. From: SLTechnology News&Howtos (shulou)
==> What is Spark SQL?
--> Spark SQL is the Spark module for processing structured data
--> Function: provides a programming abstraction (DataFrame) and acts as a distributed SQL query engine
--> Working principle: a Spark SQL query is converted into RDD operations, which are then submitted to the cluster for execution
--> Features:
- easy to integrate
- uniform data access
- compatible with Hive
- standard data connections (JDBC/ODBC)
==> SparkSession
--> Features (SparkSession was introduced in Spark 2.0):
- provides users with a unified entry point to the functions of Spark
- allows users to call the DataFrame and Dataset APIs to write programs
- reduces the number of concepts users need to know, making it easier to interact with Spark
- there is no need to explicitly create SparkConf, SparkContext and SQLContext when interacting with Spark; these objects are encapsulated in SparkSession
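As a minimal sketch of the unified entry point described above, the following builds a session and reaches the SparkContext through it. The `local[*]` master, the application name, and the object name `SparkSessionSketch` are assumptions made for illustration; in spark-shell a session named `spark` already exists.

```scala
import org.apache.spark.sql.SparkSession

object SparkSessionSketch {
  // Build (or reuse) a session. The master and app name are illustrative assumptions.
  def buildSession(): SparkSession =
    SparkSession.builder()
      .master("local[*]")
      .appName("spark-session-sketch")
      .getOrCreate()

  def main(args: Array[String]): Unit = {
    val spark = buildSession()
    // The SparkContext is reachable from the session,
    // so it does not have to be created separately.
    val sc = spark.sparkContext
    println(s"Spark version: ${spark.version}, app name: ${sc.appName}")
    spark.stop()
  }
}
```

In an application, `spark.sparkContext` plays the role of the `sc` variable used in the spark-shell examples in this article.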
==> DataFrames
A DataFrame organizes a dataset into named columns and is equivalent to a table in a relational database
--> Compared with RDD:
- an RDD is a distributed collection of Java objects
- a DataFrame is a distributed collection of Row objects
--> Create DataFrames
- create a DataFrame through a case class
    // Define a case class (equivalent to the table structure)
    case class Emp(empno: Int, ename: String, job: String, mgr: String,
                   hiredate: String, sal: Int, comm: String, deptno: Int)

    // Read the data on HDFS into an RDD and map each record to the case class
    val lines = sc.textFile("hdfs://bigdata0:9000/input/emp.csv").map(_.split(","))
    val emp = lines.map(x => Emp(x(0).toInt, x(1), x(2), x(3), x(4), x(5).toInt, x(6), x(7).toInt))

    // Convert the RDD to a DataFrame
    // (outside spark-shell this needs: import spark.implicits._)
    val empDF = emp.toDF

    // Query the data through the DataFrame
    empDF.show
- create a DataFrame through SparkSession
    // Create a StructType to define the structure; this requires the types package
    import org.apache.spark.sql.types._

    val myschema = StructType(List(
      StructField("empno", DataTypes.IntegerType),
      StructField("ename", DataTypes.StringType),
      StructField("job", DataTypes.StringType),
      StructField("mgr", DataTypes.StringType),
      StructField("hiredate", DataTypes.StringType),
      StructField("sal", DataTypes.IntegerType),
      StructField("comm", DataTypes.StringType),
      StructField("deptno", DataTypes.IntegerType)))

    // Read the data and split each CSV line on commas
    val empcsvRDD = sc.textFile("hdfs://bigdata0:9000/input/emp.csv").map(_.split(","))

    // Map the RDD data to Row objects; this requires importing Row
    import org.apache.spark.sql.Row
    val rowRDD = empcsvRDD.map(line => Row(line(0).toInt, line(1), line(2), line(3), line(4), line(5).toInt, line(6), line(7).toInt))

    // Create the DataFrame
    val df = spark.createDataFrame(rowRDD, myschema)

    // View the table
    df.show
- create a DataFrame from a JSON file
    val df = spark.read.json("Json file")

    // View the data
    df.show
--> DataFrame operations
DataFrame operations are also called untyped Dataset operations
- query the names of all employees
    df.select("ename").show
- query the names and salaries of all employees, adding 100 yuan to each salary
    df.select($"ename", $"sal", $"sal" + 100).show
- query employees whose salary is greater than 2000
    // filter keeps the matching rows; select($"sal" > 2000) would only return a boolean column
    df.filter($"sal" > 2000).show
- count the number of employees in each department
    df.groupBy($"deptno").count.show
- use SQL statements on a DataFrame. Note: the DataFrame must first be registered as a table (view)
    df.createOrReplaceTempView("emp")

    // Execute the query
    spark.sql("select * from emp").show
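--> Temporary views (two kinds):
- df.createOrReplaceTempView("emp1") is valid only in the current session
- df.createGlobalTempView("emp2") is valid across all sessions of the same application; the view is registered in the global_temp database, so it is queried as global_temp.emp2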
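The scoping of the two kinds of view can be checked with a second session. The sketch below is a rough illustration under assumptions: the sample rows, the `local[*]` master and the names `TempViewSketch` and `viewVisibility` are all hypothetical, not part of the original walkthrough.

```scala
import org.apache.spark.sql.SparkSession

object TempViewSketch {
  // Returns (emp1 visible in the creating session,
  //          emp1 visible in a new session,
  //          row count read from global_temp.emp2 in the new session).
  def viewVisibility(spark: SparkSession): (Boolean, Boolean, Long) = {
    import spark.implicits._

    // Hypothetical sample rows standing in for the emp data.
    val df = Seq((7369, "SMITH", 800), (7499, "ALLEN", 1600)).toDF("empno", "ename", "sal")

    df.createOrReplaceTempView("emp1") // session-scoped view
    df.createGlobalTempView("emp2")    // application-scoped view in the global_temp database

    // A new session shares the SparkContext but has its own temporary views.
    val other = spark.newSession()

    (spark.catalog.tableExists("emp1"),
     other.catalog.tableExists("emp1"),
     other.sql("select * from global_temp.emp2").count())
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("temp-view-sketch").getOrCreate()
    println(viewVisibility(spark))
    spark.stop()
  }
}
```

Note that `createGlobalTempView` throws if a global view with that name already exists, so the helper is meant to be called once per application.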
==> Datasets
--> A Dataset is a distributed collection of data
--> Features:
- a new interface added in Spark 1.6; a higher-level abstraction built on top of DataFrame
- provides the advantages of RDDs (strong typing, the ability to use lambda functions)
- also benefits from Spark SQL's optimized execution engine
- can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.)
- the Dataset API is available in Scala and Java, not in Python
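To make the "strongly typed, lambda-friendly" point concrete, here is a small sketch in which the lambdas receive case class instances rather than generic Row objects. The `Staff` case class, the sample values and the helper name `wellPaidNames` are assumptions introduced only for this illustration.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical record type used only for this sketch.
case class Staff(name: String, sal: Int)

object TypedDatasetSketch {
  // Typed transformations: filter and map operate on Staff objects,
  // so field access such as _.sal is checked at compile time.
  def wellPaidNames(spark: SparkSession, ds: Dataset[Staff], threshold: Int): Array[String] = {
    import spark.implicits._ // encoder for the String results of map
    ds.filter(_.sal > threshold).map(_.name).collect().sorted
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("typed-dataset-sketch").getOrCreate()
    import spark.implicits._

    val ds = Seq(Staff("Tom", 1500), Staff("Mary", 3000), Staff("Mike", 2500)).toDS()
    println(wellPaidNames(spark, ds, 2000).mkString(", "))
    spark.stop()
  }
}
```

A misspelled field such as `_.salary` would be rejected by the compiler here, whereas the untyped form `$"salary"` on a DataFrame would only fail when the query is analyzed at run time.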
--> Create Datasets
- from a sequence
    // Define a case class
    case class MyData(a: Int, b: String)

    // Generate a sequence and create a Dataset
    val ds = Seq(MyData(1, "Tom"), MyData(2, "Mary")).toDS

    // View the result
    ds.show
- from JSON data
    // Define a case class
    case class Person(name: String, gender: String)

    // Generate a DataFrame from JSON data
    val df = spark.read.json(sc.parallelize("""{"gender": "Male", "name": "Tom"}""" :: Nil))

    // Convert the DataFrame to a Dataset
    df.as[Person].show
    df.as[Person]
- run the WordCount program on data stored in HDFS
    // Read HDFS data and create a Dataset
    val linesDS = spark.read.text("hdfs://bigdata0:9000/input/data.txt").as[String]

    // Operate on the Dataset: split into words and keep the words longer than 3 characters
    val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)

    // View the result
    words.show
    words.collect

    // Run the WordCount program
    val result = linesDS.flatMap(_.split(" ")).map((_, 1)).groupByKey(x => x._1).count
    result.show

    // Sort by word (the key column is named "value" in Spark 2.x and "key" from Spark 3.0)
    result.orderBy($"value").show
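==> Dataset operations
The difference between joinWith and join is the schema of the resulting Dataset: join returns a DataFrame in which the columns of both sides are flattened into a single row, while joinWith returns a Dataset of pairs that keeps each side as its own typed object.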
    // Use emp.json to generate a DataFrame
    val empDF = spark.read.json("/root/resources/emp.json")

    // Query employees whose salary is greater than 3000
    empDF.where($"sal" > 3000).show

    // Create a case class
    case class Emp(empno: Long, ename: String, job: String, hiredate: String, mgr: String, sal: Long, comm: String, deptno: Long)

    // Generate a Dataset and query the data
    val empDS = empDF.as[Emp]

    // Query employees whose salary is greater than 3000
    empDS.filter(_.sal > 3000).show

    // View the employees of department 10
    empDS.filter(_.deptno == 10).show

    // Multi-table query
    // 1. Create the department table
    val deptRDD = sc.textFile("/test/dept.csv").map(_.split(","))
    case class Dept(deptno: Int, dname: String, loc: String)
    val deptDS = deptRDD.map(x => Dept(x(0).toInt, x(1), x(2))).toDS

    // 2. Create the employee table (in spark-shell this redefines Emp and empDS)
    case class Emp(empno: Int, ename: String, job: String, mgr: String, hiredate: String, sal: Int, comm: String, deptno: Int)
    val empRDD = sc.textFile("/test/emp.csv").map(_.split(","))
    val empDS = empRDD.map(x => Emp(x(0).toInt, x(1), x(2), x(3), x(4), x(5).toInt, x(6), x(7).toInt)).toDS

    // 3. Execute the multi-table query: equi-join
    val result = deptDS.join(empDS, "deptno")

    // Another way of writing it: note that there are three equal signs
    val result = deptDS.joinWith(empDS, deptDS("deptno") === empDS("deptno"))

    // View the execution plan
    result.explain
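To see how the schemas of join and joinWith differ, the following sketch compares the top-level column names each one produces on a tiny in-memory dataset. The case classes `DeptRec` and `EmpRec`, the sample rows and the helper `columnNames` are hypothetical stand-ins for the dept and emp tables above.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical, trimmed-down record types used only for this sketch.
case class DeptRec(deptno: Int, dname: String)
case class EmpRec(ename: String, deptno: Int)

object JoinSchemaSketch {
  // Returns the top-level column names produced by join and by joinWith.
  def columnNames(spark: SparkSession): (Seq[String], Seq[String]) = {
    import spark.implicits._

    val deptDS = Seq(DeptRec(10, "ACCOUNTING"), DeptRec(20, "RESEARCH")).toDS()
    val empDS = Seq(EmpRec("CLARK", 10), EmpRec("SMITH", 20)).toDS()

    // join: untyped result (DataFrame) with the columns of both sides in one flat row
    val joined = deptDS.join(empDS, "deptno")

    // joinWith: typed result, Dataset[(DeptRec, EmpRec)], one struct column per side
    val paired = deptDS.joinWith(empDS, deptDS("deptno") === empDS("deptno"))

    (joined.columns.toSeq, paired.columns.toSeq)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("join-schema-sketch").getOrCreate()
    val (flat, nested) = columnNames(spark)
    println(s"join columns:     ${flat.mkString(", ")}")
    println(s"joinWith columns: ${nested.mkString(", ")}")
    spark.stop()
  }
}
```

Because joinWith keeps both objects intact, the pairs can be processed further with typed lambdas, for example `paired.map { case (d, e) => (d.dname, e.ename) }`.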