2025-01-15 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article walks through Spark SQL, one of Spark's components, with examples. Many newcomers are unclear about how it works, so the sections below explain it in detail.
Spark SQL is the Spark component for processing structured data. Its predecessor was Shark, but Shark depended too heavily on Hive (for example, on Hive's syntax parser and query optimizer), which restricted integration with the other Spark components, so Spark SQL was created to replace it. Spark SQL kept many of Shark's strengths, such as in-memory columnar storage and Hive compatibility, but was rebuilt so that it no longer depends on Hive while remaining compatible with it. Besides optimizing in-memory columnar storage, it introduced bytecode generation, cost-based optimization (CBO) and rule-based optimization (RBO) to evaluate queries dynamically and obtain the best logical plan and physical execution plan. With these optimizations, Spark SQL performs noticeably better than the earlier SQL-on-Hadoop technologies.
Spark SQL also supports a variety of data sources, such as JDBC, HDFS and HBase. Its internal components, such as the SQL parser and analyzer, can be redefined and extended to fit different business scenarios. It integrates seamlessly with Spark Core, provides the programmable DataSet/DataFrame data abstraction, and can be regarded as a distributed SQL query engine.
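To see what the optimizer produces for a given query, Spark can print the plans it builds. The following is a minimal sketch, assuming a local SparkSession; the object name PlanDemo, the column names and the in-memory data are made up for illustration and do not come from the article.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object PlanDemo {
  // Builds a small query over hypothetical in-memory data
  def adultNames(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val people = Seq(("Ann", 30), ("Bob", 17)).toDF("name", "age")
    people.filter(col("age") > 18).select(col("name"))
  }

  def main(args: Array[String]): Unit = {
    // Local master is for illustration only
    val spark = SparkSession.builder()
      .appName("plan-demo")
      .master("local[*]")
      .getOrCreate()

    // Prints the parsed, analyzed and optimized logical plans and the physical plan
    adultNames(spark).explain(true)

    spark.stop()
  }
}
```

Comparing the analyzed and optimized logical plans in the output is a simple way to observe which rewrites were applied to a query.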
DataSet/DataFrame
DataSet/DataFrame is the distributed dataset abstraction provided by Spark SQL. Compared with an RDD, it records not only the data but also the table's schema information.
DataSet is a distributed dataset available since Spark 1.6. It combines the advantages of RDDs (strong typing and the ability to use powerful lambda expressions) with Spark SQL's optimized execution engine. The DataSet API supports Scala and Java but not Python. However, because of Python's dynamic nature, Python users already get many of the benefits of the DataSet API (for example, a field of a Row can be accessed by name as row.columnName); the same is true of R.
DataFrame is a DataSet organized into named columns, similar to a table in an RDBMS or a data frame in R and Python. The DataFrame API supports Scala, Java, Python and R. In the Scala API, DataFrame is simply a type alias for a Dataset of Row:
type DataFrame = Dataset[Row]
A DataFrame does not check field types at compile time, only at run time; a DataSet, by contrast, is strongly typed. Both use Catalyst for SQL parsing and optimization. For convenience, the rest of this article uses DataSet as the general name for both.
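The difference between run-time and compile-time checking can be illustrated without Spark at all. The sketch below is only a plain-Scala analogy: UntypedRow and Person are hypothetical stand-ins, not Spark's Row or Dataset classes.

```scala
import scala.util.Try

// Typed record: field names and types are known to the compiler
final case class Person(name: String, age: Int)

// Untyped row: field names and types are only known at run time
final case class UntypedRow(fields: Map[String, Any]) {
  def getAs[T](name: String): T = fields(name).asInstanceOf[T]
}

object TypedVsUntyped {
  val typed: Person = Person("Ann", 30)
  val untyped: UntypedRow = UntypedRow(Map("name" -> "Ann", "age" -> 30))

  // Typed access: a misspelled field such as typed.agee would not compile
  val typedAge: Int = typed.age

  // Untyped access: a misspelled field name compiles but fails at run time
  val misspelled: Try[Int] = Try(untyped.getAs[Int]("agee"))
}
```

Here the misspelled lookup only surfaces as a failure when the code runs, which is the trade-off a DataFrame makes in exchange for working with arbitrary schemas.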
DataSet creation
A DataSet is usually created by loading external data or by converting an RDD.
1. Load external data
Take loading JSON and MySQL as examples:
val jsonDs = sparkSession.read.json("/path/people.json")
val jdbcDs = sparkSession.read.format("jdbc")
  .options(Map(
    "url" -> "jdbc:mysql://ip:port/db",
    "driver" -> "com.mysql.jdbc.Driver",
    "dbtable" -> "tableName",
    "user" -> "root",
    "password" -> "123"))
  .load()
2. Convert an RDD to a DataSet
When a DataSet is created by converting an RDD, the key is to specify a schema for the RDD. There are usually two ways (pseudocode):
1. Define a case class and let reflection infer the schema
1) Load a file from HDFS into an ordinary RDD
val lineRDD = sparkContext.textFile("hdfs://ip:port/person.txt").map(_.split(" "))
2) Define a case class (equivalent to the table's schema)
case class Person(id: Int, name: String, age: Int)
3) Associate the RDD with the case class
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
4) Convert the RDD to a DataFrame (toDF requires import sparkSession.implicits._)
val ds = personRDD.toDF
2. Define a StructType schema manually and apply it to the RDD directly
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
val schemaString = "name age"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// peopleRdd is assumed to be an RDD of string arrays, like lineRDD above
val rowRdd = peopleRdd.map(p => Row(p(0), p(1)))
val ds = sparkSession.createDataFrame(rowRdd, schema)
Two styles of syntax for manipulating DataSet
DSL syntax
1. Query some of the columns of the DataSet
personDS.select(col("name"))
personDS.select(col("name"), col("age"))
2. Query name, age and salary, adding 1000 to salary
personDS.select(col("name"), col("age"), col("salary") + 1000)
personDS.select(personDS("name"), personDS("age"), personDS("salary") + 1000)
3. Filter rows with age greater than 18
personDS.filter(col("age") > 18)
4. Group by age and count the number of people of each age
personDS.groupBy("age").count()
Note: import org.apache.spark.sql.functions._ is required to use the col method directly
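The DSL calls can be chained and tried on a small in-memory dataset. This is a sketch under stated assumptions: the Person case class, its three columns and the sample rows are hypothetical, since the article does not show how personDS is built.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.col

object DslDemo {
  // Hypothetical record type standing in for the rows of personDS
  final case class Person(name: String, age: Int, salary: Long)

  // People older than 18, with salary raised by 1000
  def raisedAdults(personDS: Dataset[Person]): DataFrame =
    personDS
      .filter(col("age") > 18)
      .select(col("name"), (col("salary") + 1000).as("raised_salary"))

  // Number of people per age
  def countByAge(personDS: Dataset[Person]): DataFrame =
    personDS.groupBy("age").count()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("dsl-demo")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val personDS = Seq(
      Person("Ann", 30, 5000L),
      Person("Bob", 17, 0L),
      Person("Cid", 30, 7000L)
    ).toDS()

    raisedAdults(personDS).show()
    countByAge(personDS).show()

    spark.stop()
  }
}
```

Naming the computed column with as keeps the result schema readable; without it Spark generates a column name from the expression.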
SQL syntax
To use SQL-style syntax, first register the DataSet as a table (registerTempTable, used in older code, is deprecated since Spark 2.0):
personDS.createOrReplaceTempView("person")
// Query the two oldest people
val result = sparkSession.sql("select * from person order by age desc limit 2")
// Save the result as a JSON file. Note: if no storage format is specified, the default is parquet
result.write.format("json").save("hdfs://ip:port/res2")
Several ways to use Spark SQL
1. spark-sql shell
Interactive queries use the shell command line provided by Spark to execute SQL.
2. Programming
The first step is to obtain the entry point for Spark SQL programming: SparkSession (in earlier versions you may be more familiar with SQLContext, or HiveContext when working with Hive). Taking reading a parquet file as an example:
val spark = SparkSession.builder()
  .appName("example").master("local[*]").getOrCreate()
val df = spark.read.format("parquet").load("/path/parquet file")
df is then ready for business processing.
3. Thriftserver
Connecting with the beeline client
Start the spark-sql thrift service with sbin/start-thriftserver.sh; the Spark cluster resources, address and other information are configured in the startup script. Then connect to the thrift service through beeline to process data.
Accessing the spark-sql thrift service with the hive-jdbc driver package
Add the relevant driver package to the project's pom file; access then works much like accessing a JDBC data source such as MySQL. Example:
import java.sql.DriverManager
// Load the Hive JDBC driver and connect to the thrift service
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://ip:port", "root", "123")
try {
  val stat = conn.createStatement()
  val res = stat.executeQuery("select * from people limit 1")
  while (res.next()) {
    println(res.getString("name"))
  }
} catch {
  case e: Exception => e.printStackTrace()
} finally {
  // Always release the connection
  if (conn != null) conn.close()
}
Spark SQL acquires Hive data
The key to reading Hive data with Spark SQL is to expose the Hive metadata to Spark as a service. Besides connecting to Hive through the thriftserver and JDBC as above, this can be done as follows:
First, configure $HIVE_HOME/conf/hive-site.xml by adding the following:
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://ip:port</value>
</property>
Then, start the Hive metastore service.
Finally, copy or symlink hive-site.xml into $SPARK_HOME/conf/. If Hive's metadata is stored in MySQL, also put the MySQL JDBC driver jar, such as mysql-connector-java-5.1.12.jar, under $SPARK_HOME/lib/. You can then start spark-sql and work with the databases and tables in Hive. In a program, a SparkSession that uses the Hive metadata is obtained like this:
val spark = SparkSession.builder()
  .config(sparkConf).enableHiveSupport().getOrCreate()
UDF, UDAF, Aggregator
UDF
A UDF is the most basic kind of user-defined function. Take a custom UDF that returns the length of a string as an example:
import org.apache.spark.sql.functions.udf
val udf_str_length = udf { (str: String) => str.length }
spark.udf.register("str_length", udf_str_length)
val ds = spark.read.json("path/people.json")
ds.createOrReplaceTempView("people")
spark.sql("select str_length(address) from people")
UDAF
To define a UDAF, extend the abstract class UserDefinedAggregateFunction, which is weakly typed; the Aggregator described below is strongly typed. Take computing an average as an example:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
object MyAverage extends UserDefinedAggregateFunction {
  // Data types of input arguments of this aggregate function
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  // Data types of values in the aggregation buffer
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  // The data type of the returned value
  def dataType: DataType = DoubleType
  // Whether this function always returns the same output on the identical input
  def deterministic: Boolean = true
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // Calculates the final result
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}
// Register the function to access it
spark.udf.register("myAverage", MyAverage)
val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
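How the four callbacks cooperate is easier to see when they are driven by hand. The sketch below is a plain-Scala model of the life cycle, not Spark code: Buffer and AverageModel are hypothetical names, nulls are modelled as None, and each inner Seq plays the role of one partition.

```scala
object AverageModel {
  // Mirrors bufferSchema: a running sum and a running count
  final case class Buffer(sum: Long, count: Long)

  // initialize: the empty buffer
  val zero: Buffer = Buffer(0L, 0L)

  // update: fold one input value into a buffer, skipping nulls (modelled as None)
  def update(b: Buffer, input: Option[Long]): Buffer = input match {
    case Some(v) => Buffer(b.sum + v, b.count + 1)
    case None    => b
  }

  // merge: combine the partial buffers of two partitions
  def merge(b1: Buffer, b2: Buffer): Buffer =
    Buffer(b1.sum + b2.sum, b1.count + b2.count)

  // evaluate: turn the final buffer into the result
  def evaluate(b: Buffer): Double = b.sum.toDouble / b.count

  // Runs update within each partition, then merge across partitions
  def average(partitions: Seq[Seq[Option[Long]]]): Double = {
    val partials = partitions.map(_.foldLeft(zero)(update))
    evaluate(partials.foldLeft(zero)(merge))
  }
}
```

For example, AverageModel.average(Seq(Seq(Some(3000L), Some(4500L)), Seq(None, Some(3500L), Some(4000L)))) returns 3750.0: the null is skipped, each partition produces a partial (sum, count) pair, and the pairs are merged before the final division.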
Aggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)
object MyAverage extends Aggregator[Employee, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[Average] = Encoders.product
  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
// as[Employee] needs the implicit encoders from the session
import spark.implicits._
val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
Comparison between Spark SQL and Hive