2025-02-25 Update. From: SLTechnology News&Howtos. Shulou (Shulou.com), 06/03 report.
Principle and Analysis of DataFrame
Spark SQL and DataFrame
1. Spark SQL is a Spark module used mainly to process structured data. Its core programming abstraction is the DataFrame. Spark SQL can also act as a distributed SQL query engine, and one of its most important functions is querying data from Hive.
2. DataFrame
In terms of ease of use, it is no exaggeration to say that Spark's RDD API is a qualitative leap over the traditional MapReduce API. However, the RDD API still presents a barrier to newcomers with no experience in MapReduce or functional programming. On the other hand, traditional data frame tools familiar to data scientists, such as R and Pandas, offer intuitive APIs but are limited to single-machine processing and are not suited to big data scenarios. To resolve this tension, Spark SQL provides a DataFrame API in the style of R and Pandas, built on the original SchemaRDD. The new DataFrame API greatly lowers the learning curve for ordinary developers and supports Scala, Java, and Python. More importantly, because it is derived from SchemaRDD, DataFrame is naturally suited to distributed big data scenarios.
How DataFrame works
In Spark, a DataFrame is a distributed dataset built on RDDs, similar to a two-dimensional table in a traditional database. The main difference from an RDD is that a DataFrame carries schema metadata: every column of the two-dimensional table it represents has a name and a type. This gives Spark SQL insight into the structure of the data, so it can optimize both the data sources behind the DataFrame and the transformations applied to it, which can substantially improve runtime efficiency. With an RDD, by contrast, Spark Core cannot see the internal structure of the stored elements and can only apply simple, general pipeline optimizations at the stage level.
Basic DataFrame operations: examples
Spark SQLContext
To use Spark SQL, you first have to create a SQLContext object, or an instance of one of its subclasses, such as HiveContext.
Java version:
    JavaSparkContext sc = ...;
    SQLContext sqlContext = new SQLContext(sc);
Scala version:
    val sc: SparkContext = ...
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
Spark HiveContext
1. Besides the basic SQLContext, you can also use its subclass, HiveContext. On top of everything SQLContext provides, HiveContext adds Hive-specific features: writing and executing SQL in HiveQL syntax, using Hive UDFs, and reading data from Hive tables.
2. To use HiveContext, Hive must be set up beforehand. Every data source supported by SQLContext is also supported by HiveContext, not just Hive. For Spark 1.3.x and later, HiveContext is recommended because its feature set is richer and more complete.
3. Spark SQL also supports choosing the SQL dialect through the spark.sql.dialect parameter, which you can set with setConf() on SQLContext. SQLContext supports only one dialect, "sql". For HiveContext, the default dialect is "hiveql".
Creating a DataFrame
With SQLContext or HiveContext you can create a DataFrame from an RDD, Hive, ZMQ, Kafka, RabbitMQ, or other data sources. The following uses a JSON file as an example.
Java version:
    JavaSparkContext sc = new JavaSparkContext();
    SQLContext sqlContext = new SQLContext(sc);
    DataFrame df = sqlContext.read().json("hdfs://ns1/spark/sql/person.json");
    df.show();
Scala version:
    val sc: SparkContext = new SparkContext()
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.json("hdfs://ns1/spark/sql/person.json")
    df.show()
Example
The JSON data is as follows (one object per line):
    {"name": "Michael", "age": 10, "height": 168.8}
    {"name": "Andy", "age": 30, "height": 168.8}
    {"name": "Justin", "age": 19, "height": 169.8}
    {"name": "Jack", "age": 32, "height": 188.8}
    {"name": "John", "age": 10, "height": 158.8}
    {"name": "Domu", "age": 19, "height": 179.8}
    {"name": "Yuan Shuai", "age": 13, "height": 179.8}
    {"name": "Yin Jie", "age": 30, "height": 175.8}
    {"name": "Sun Rui", "age": 19, "height": 179.9}
The test code is as follows:
    package cn.xpleaf.bigdata.spark.scala.sql.p1

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.{Column, DataFrame, SQLContext}
    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * Spark SQL basic operations.
      * The core of working with Spark SQL is the DataFrame: a two-dimensional table
      * in memory that holds both metadata and table data.
      */
    object _01SparkSQLOps {
        def main(args: Array[String]): Unit = {
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
            val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkSQLOps.getClass.getSimpleName)
            val sc = new SparkContext(conf)
            val sqlContext = new SQLContext(sc)

            val df: DataFrame = sqlContext.read.json("D:/data/spark/sql/people.json")
            // 1. Print all records in DF
            println("1. Print all records in DF")
            df.show()   // shows the first 20 rows by default, like select * from t limit 20 in a database
            // 2. Print all schema information in DF
            println("2. Print all schema information in DF")
            df.printSchema()
            // 3. Query the name column and print it: select name from t
            // df.select("name").show()
            println("3. Query the name column and print it")
            df.select(new Column("name")).show()
            // 4. Filter and print people over 14
            println("4. Filter and print people over 14")
            df.select(new Column("name"), new Column("age")).where("age > 14").show()
            // 5. Add 10 years to everyone's age
            println("5. Add 10 years to everyone's age")
            df.select(new Column("name"), new Column("age").+(10).as("age in 10 years")).show()
            // 6. Group by height: select height, count(1) from t group by height
            println("6. Group by height")
            df.select(new Column("height")).groupBy(new Column("height")).count().show()

            // Register the DataFrame as a temporary table
            df.registerTempTable("people")
            // Run the same aggregation as a SQL query
            var sql = "select height, count(1) from people group by height"
            sqlContext.sql(sql).show()

            sc.stop()
        }
    }
The output is as follows:
    1. Print all records in DF
    18/05/08 16:06:09 INFO FileInputFormat: Total input paths to process : 1
    +---+------+----------+
    |age|height|      name|
    +---+------+----------+
    | 10| 168.8|   Michael|
    | 30| 168.8|      Andy|
    | 19| 169.8|    Justin|
    | 32| 188.8|      Jack|
    | 10| 158.8|      John|
    | 19| 179.8|      Domu|
    | 13| 179.8|Yuan Shuai|
    | 30| 175.8|   Yin Jie|
    | 19| 179.9|   Sun Rui|
    +---+------+----------+

    2. Print all schema information in DF
    root
     |-- age: long (nullable = true)
     |-- height: double (nullable = true)
     |-- name: string (nullable = true)

    3. Query the name column and print it
    18/05/08 16:06:10 INFO FileInputFormat: Total input paths to process : 1
    +----------+
    |      name|
    +----------+
    |   Michael|
    |      Andy|
    |    Justin|
    |      Jack|
    |      John|
    |      Domu|
    |Yuan Shuai|
    |   Yin Jie|
    |   Sun Rui|
    +----------+

    4. Filter and print people over 14
    18/05/08 16:06:10 INFO FileInputFormat: Total input paths to process : 1
    +-------+---+
    |   name|age|
    +-------+---+
    |   Andy| 30|
    | Justin| 19|
    |   Jack| 32|
    |   Domu| 19|
    |Yin Jie| 30|
    |Sun Rui| 19|
    +-------+---+

    5. Add 10 years to everyone's age
    18/05/08 16:06:10 INFO FileInputFormat: Total input paths to process : 1
    +----------+---------------+
    |      name|age in 10 years|
    +----------+---------------+
    |   Michael|             20|
    |      Andy|             40|
    |    Justin|             29|
    |      Jack|             42|
    |      John|             20|
    |      Domu|             29|
    |Yuan Shuai|             23|
    |   Yin Jie|             40|
    |   Sun Rui|             29|
    +----------+---------------+

    6. Group by height
    18/05/08 16:06:11 INFO FileInputFormat: Total input paths to process : 1
    +------+-----+
    |height|count|
    +------+-----+
    | 179.9|    1|
    | 188.8|    1|
    | 158.8|    1|
    | 179.8|    2|
    | 169.8|    1|
    | 168.8|    2|
    | 175.8|    1|
    +------+-----+

    18/05/08 16:06:14 INFO FileInputFormat: Total input paths to process : 1
    +------+---+
    |height|_c1|
    +------+---+
    | 179.9|  1|
    | 188.8|  1|
    | 158.8|  1|
    | 179.8|  2|
    | 169.8|  1|
    | 168.8|  2|
    | 175.8|  1|
    +------+---+
Converting between DataFrame and RDD: examples and analysis (Java, Scala)
Data used
The test code below uses the source data file sql-rdd-source.txt, whose contents are:
    1, zhangsan, 13, 175
    2, lisi, 14, 180
    3, wangwu, 15, 175
    4, zhaoliu, 16, 195
    5, zhouqi, 17, 165
    6, weiba, 18, 155
The Person class used is as follows:
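    public class Person {
        private int id;
        private String name;
        private int age;
        private double height;

        public Person() {
        }

        public Person(int id, String name, int age, double height) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.height = height;
        }

        // Getters: needed for reflection-based schema inference and called by the Java examples below
        public int getId() { return id; }
        public String getName() { return name; }
        public int getAge() { return age; }
        public double getHeight() { return height; }
    }
Converting an RDD to a DataFrame using reflection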
1. An obvious question comes first: why convert an RDD to a DataFrame?
The main reason is to be able to run SQL queries on the data with Spark SQL, which is a very powerful capability.
2. Reflection is used to infer the metadata of an RDD that contains objects of a specific type. The reflection-based approach keeps the code concise, and it works well when you already know the metadata of your RDD in advance, because the metadata is defined by the POJO.
Inferring metadata by reflection
1. Java version:
Spark SQL supports converting an RDD of POJOs to a DataFrame. The POJO's information defines the metadata. Spark SQL currently does not support POJOs that contain complex data, such as nested POJOs or Lists, as metadata; only POJOs whose fields are simple data types are supported.
2. Scala version:
Scala has implicit conversions, so the Scala interface of Spark SQL can automatically convert an RDD of case class instances to a DataFrame. The case class defines the metadata: Spark SQL uses reflection to read the names of the case class parameters and uses them as column names.
3. Difference: unlike the Java version, the Scala version supports case classes that contain nested data structures, such as Array, as metadata.
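The reflection step described above can be illustrated without Spark. The following is a minimal plain-Java sketch, not Spark's actual implementation: it uses the JDK's java.beans.Introspector to read a bean's getter-backed properties and turn them into column names and types. The names BeanSchemaSketch, PersonBean, and inferSchema are hypothetical and exist only for this illustration.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Map;
import java.util.TreeMap;

public class BeanSchemaSketch {

    /** Hypothetical bean with the same four fields as the article's Person class. */
    public static class PersonBean {
        private int id;
        private String name;
        private int age;
        private double height;

        public int getId() { return id; }
        public String getName() { return name; }
        public int getAge() { return age; }
        public double getHeight() { return height; }
    }

    /**
     * Derives "column name -> type name" pairs from the bean's readable properties.
     * A TreeMap keeps the columns sorted by name.
     */
    public static Map<String, String> inferSchema(Class<?> beanClass) {
        Map<String, String> schema = new TreeMap<>();
        try {
            // Stop at Object.class so the built-in "class" property is not reported.
            PropertyDescriptor[] props =
                    Introspector.getBeanInfo(beanClass, Object.class).getPropertyDescriptors();
            for (PropertyDescriptor pd : props) {
                if (pd.getReadMethod() != null) {
                    schema.put(pd.getName(), pd.getPropertyType().getSimpleName());
                }
            }
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Cannot introspect " + beanClass, e);
        }
        return schema;
    }

    public static void main(String[] args) {
        // prints {age=int, height=double, id=int, name=String}
        System.out.println(inferSchema(PersonBean.class));
    }
}
```

Because the sketch discovers columns through getters, a class without getters yields an empty schema. The sorted column order (age, height, id, name) is the same order that appears in the df.show() output of the reflection examples in this article.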
Scala version
The test code is as follows:
    package cn.xpleaf.bigdata.spark.scala.sql.p1

    import java.util
    import java.util.{Arrays, List}

    import cn.xpleaf.bigdata.spark.java.sql.p1.Person
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{DataFrame, SQLContext}

    /**
      * Conversion between Spark RDD and DataFrame
      * 1. Convert an RDD to a DataFrame by reflection
      * 2. Convert an RDD to a DataFrame programmatically
      * This example shows the first approach
      */
    object _02SparkRDD2DataFrame {
        def main(args: Array[String]): Unit = {
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
            val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkRDD2DataFrame.getClass.getSimpleName)
            // Use Kryo serialization
            conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            conf.registerKryoClasses(Array(classOf[Person]))
            val sc = new SparkContext(conf)
            val sqlContext = new SQLContext(sc)

            val lines = sc.textFile("D:/data/spark/sql/sql-rdd-source.txt")
            // Parse each comma-separated line into a Person
            val personRDD: RDD[Person] = lines.map(line => {
                val fields = line.split(",")
                val id = fields(0).trim.toInt
                val name = fields(1).trim
                val age = fields(2).trim.toInt
                val height = fields(3).trim.toDouble
                new Person(id, name, age, height)
            })

            val persons: util.List[Person] = util.Arrays.asList(
                new Person(1, "Sun Talent", 25, 179),
                new Person(2, "Liu Yinpeng", 22, 176),
                new Person(3, "Guo Shaobo", 27, 178),
                new Person(1, "Qi Yanpeng", 24, 175)
            )
            // val df: DataFrame = sqlContext.createDataFrame(persons, classOf[Person])   // this also works
            val df: DataFrame = sqlContext.createDataFrame(personRDD, classOf[Person])
            df.show()

            sc.stop()
        }
    }
The output is as follows:
    +---+------+---+--------+
    |age|height| id|    name|
    +---+------+---+--------+
    | 13| 175.0|  1|zhangsan|
    | 14| 180.0|  2|    lisi|
    | 15| 175.0|  3|  wangwu|
    | 16| 195.0|  4| zhaoliu|
    | 17| 165.0|  5|  zhouqi|
    | 18| 155.0|  6|   weiba|
    +---+------+---+--------+
Java version
The test code is as follows:
    package cn.xpleaf.bigdata.spark.java.sql.p1;

    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Column;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;

    import java.util.Arrays;
    import java.util.List;

    /**
     * Conversion between Spark RDD and DataFrame
     * 1. Convert an RDD to a DataFrame by reflection
     * 2. Convert an RDD to a DataFrame programmatically
     * This example shows the first approach
     */
    public class _01SparkRDD2DataFrame {
        public static void main(String[] args) {
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF);
            SparkConf conf = new SparkConf()
                    .setMaster("local[2]")
                    .setAppName(_01SparkRDD2DataFrame.class.getSimpleName())
                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                    .registerKryoClasses(new Class[]{Person.class});
            JavaSparkContext jsc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(jsc);
            List<Person> persons = Arrays.asList(
                    new Person(1, "Sun Talent", 25, 179),
                    new Person(2, "Liu Yinpeng", 22, 176),
                    new Person(3, "Guo Shaobo", 27, 178),
                    new Person(1, "Qi Yanpeng", 24, 175)
            );
            // createDataFrame has several overloads; an RDD of Person can be used as well
            DataFrame df = sqlContext.createDataFrame(persons, Person.class);
            // where age > 23 and height < 179
            df.select(new Column("id"), new Column("name"), new Column("age"), new Column("height"))
                    .where(new Column("age").gt(23).and(new Column("height").lt(179)))
                    .show();
            df.registerTempTable("person");
            sqlContext.sql("select * from person where age > 23 and height < 179").show();
            jsc.close();
        }
    }
The output is as follows:
    +---+----------+---+------+
    | id|      name|age|height|
    +---+----------+---+------+
    |  3|Guo Shaobo| 27| 178.0|
    |  1|Qi Yanpeng| 24| 175.0|
    +---+----------+---+------+

    +---+------+---+----------+
    |age|height| id|      name|
    +---+------+---+----------+
    | 27| 178.0|  3|Guo Shaobo|
    | 24| 175.0|  1|Qi Yanpeng|
    +---+------+---+----------+
Converting an RDD to a DataFrame programmatically
1. Create the DataFrame through the programmatic interface: build and maintain an up-to-date copy of the metadata while the Spark program is running, then apply that metadata to the RDD.
2. The advantage is that this works when the definition and content of the metadata are unknown at the time the program is written and become available only at run time. The metadata is constructed dynamically while the program runs.
Scala version
The test code is as follows:
    package cn.xpleaf.bigdata.spark.scala.sql.p1

    import cn.xpleaf.bigdata.spark.java.sql.p1.Person
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}
    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * Conversion between Spark RDD and DataFrame
      * 1. Convert an RDD to a DataFrame by reflection
      * 2. Convert an RDD to a DataFrame programmatically
      * This example shows the second approach
      */
    object _03SparkRDD2DataFrame {
        def main(args: Array[String]): Unit = {
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
            val conf = new SparkConf().setMaster("local[2]").setAppName(_03SparkRDD2DataFrame.getClass.getSimpleName)
            // Use Kryo serialization
            conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            conf.registerKryoClasses(Array(classOf[Person]))
            val sc = new SparkContext(conf)
            val sqlContext = new SQLContext(sc)

            val lines = sc.textFile("D:/data/spark/sql/sql-rdd-source.txt")
            // Parse each comma-separated line into an untyped Row
            val rowRDD: RDD[Row] = lines.map(line => {
                val fields = line.split(",")
                val id = fields(0).trim.toInt
                val name = fields(1).trim
                val age = fields(2).trim.toInt
                val height = fields(3).trim.toDouble
                Row(id, name, age, height)
            })
            // Build the schema at run time and apply it to the rows
            val scheme = StructType(List(
                StructField("id", DataTypes.IntegerType, false),
                StructField("name", DataTypes.StringType, false),
                StructField("age", DataTypes.IntegerType, false),
                StructField("height", DataTypes.DoubleType, false)
            ))
            val df = sqlContext.createDataFrame(rowRDD, scheme)

            df.registerTempTable("person")
            sqlContext.sql("select max(age) as max_age, min(age) as min_age from person").show()

            sc.stop()
        }
    }
The output is as follows:
    +-------+-------+
    |max_age|min_age|
    +-------+-------+
    |     18|     13|
    +-------+-------+
Java version
The test code is as follows:
    package cn.xpleaf.bigdata.spark.java.sql.p1;

    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.*;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Function;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class _02SparkRDD2DataFrame {
        public static void main(String[] args) {
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF);
            SparkConf conf = new SparkConf()
                    .setMaster("local[2]")
                    .setAppName(_02SparkRDD2DataFrame.class.getSimpleName())
                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                    .registerKryoClasses(new Class[]{Person.class});
            JavaSparkContext jsc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(jsc);
            List<Person> persons = Arrays.asList(
                    new Person(1, "Sun Talent", 25, 179),
                    new Person(2, "Liu Yinpeng", 22, 176),
                    new Person(3, "Guo Shaobo", 27, 178),
                    new Person(1, "Qi Yanpeng", 24, 175)
            );

            // Turn each Person into an untyped Row
            Stream<Row> rowStream = persons.stream().map(new Function<Person, Row>() {
                @Override
                public Row apply(Person person) {
                    return RowFactory.create(person.getId(), person.getName(), person.getAge(), person.getHeight());
                }
            });
            List<Row> rows = rowStream.collect(Collectors.toList());

            // Build the schema at run time
            StructType schema = new StructType(new StructField[]{
                    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
                    new StructField("name", DataTypes.StringType, false, Metadata.empty()),
                    new StructField("age", DataTypes.IntegerType, false, Metadata.empty()),
                    new StructField("height", DataTypes.DoubleType, false, Metadata.empty())
            });

            DataFrame df = sqlContext.createDataFrame(rows, schema);
            df.registerTempTable("person");
            sqlContext.sql("select * from person where age > 23 and height < 179").show();
            jsc.close();
        }
    }
The output is as follows:
    +---+----------+---+------+
    | id|      name|age|height|
    +---+----------+---+------+
    |  3|Guo Shaobo| 27| 178.0|
    |  1|Qi Yanpeng| 24| 175.0|
    +---+----------+---+------+
Caching tables (columnar storage): examples and analysis
Caching and columnar storage
Spark SQL can cache data in memory. Calling cache table tableName caches a table in memory, which can greatly improve query efficiency.
    sqlContext.cacheTable(tableName)
This raises the question of how the data is laid out in memory. Relational data can be stored in a row-based structure, a column-based structure, or a hybrid of the two: Row Based Storage, Column Based Storage, and PAX Storage.
How does Spark SQL organize data in memory?
Spark SQL loads data into memory in a column-oriented structure, called In-Memory Columnar Storage.
Storing Java objects directly would incur a large memory overhead and would amount to a row-based layout. Queries on specific columns are then somewhat slower: even though the data is already in memory, query efficiency is still lower than with a column-oriented layout.
Row-based Java Object storage
High memory overhead, prone to full GC, and relatively slow when querying by column.
Column-based ByteBuffer storage (Spark SQL)
Low memory overhead, and faster when querying by column.
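The contrast between the two layouts can be sketched in plain Java. This is an illustration only, under stated assumptions: Spark SQL stores columns in ByteBuffers, while the sketch uses primitive arrays to keep the idea visible, and the names ColumnarSketch, PersonRow, and PersonColumns are hypothetical. The sample values are the six records from sql-rdd-source.txt.

```java
public class ColumnarSketch {

    /** Row-based layout: one heap object per record. */
    public static final class PersonRow {
        final int id;
        final String name;
        final int age;
        final double height;

        public PersonRow(int id, String name, int age, double height) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.height = height;
        }
    }

    /** Column-based layout: one array per column. */
    public static final class PersonColumns {
        final int[] ids;
        final String[] names;
        final int[] ages;
        final double[] heights;

        public PersonColumns(PersonRow[] rows) {
            int n = rows.length;
            ids = new int[n];
            names = new String[n];
            ages = new int[n];
            heights = new double[n];
            for (int i = 0; i < n; i++) {
                ids[i] = rows[i].id;
                names[i] = rows[i].name;
                ages[i] = rows[i].age;
                heights[i] = rows[i].height;
            }
        }
    }

    /** Scanning one column in the row layout visits every record object. */
    public static long sumAgesRowBased(PersonRow[] rows) {
        long sum = 0;
        for (PersonRow row : rows) {
            sum += row.age;
        }
        return sum;
    }

    /** Scanning one column in the columnar layout reads a single primitive array. */
    public static long sumAgesColumnar(PersonColumns columns) {
        long sum = 0;
        for (int age : columns.ages) {
            sum += age;
        }
        return sum;
    }

    /** The six records from sql-rdd-source.txt. */
    public static PersonRow[] sampleRows() {
        return new PersonRow[]{
                new PersonRow(1, "zhangsan", 13, 175),
                new PersonRow(2, "lisi", 14, 180),
                new PersonRow(3, "wangwu", 15, 175),
                new PersonRow(4, "zhaoliu", 16, 195),
                new PersonRow(5, "zhouqi", 17, 165),
                new PersonRow(6, "weiba", 18, 155)
        };
    }

    public static void main(String[] args) {
        PersonRow[] rows = sampleRows();
        PersonColumns columns = new PersonColumns(rows);
        System.out.println(sumAgesRowBased(rows));     // prints 93
        System.out.println(sumAgesColumnar(columns));  // prints 93
    }
}
```

Both methods return the same result; the difference is what they touch. The row-based scan dereferences six objects to read one field each, while the columnar scan reads one int array and never looks at names or heights, which is the property that lets a columnar cache read only the required columns.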
In-Memory Columnar Storage code layout
Spark SQL's In-Memory Columnar Storage lives in the org.apache.spark.sql.columnar package.
The core classes are ColumnBuilder, InMemoryColumnarTableScan, ColumnAccessor, and ColumnType.
If columns are compressed, the compression subpackage contains the concrete classes for building and accessing columns.
Performance tuning
For some workloads, you can improve performance by caching data in memory or by turning on some experimental options.
Cache data in memory
Spark SQL can cache a table in columnar format when you call sqlContext.cacheTable("tableName"). Spark then scans only the required columns and automatically compresses the data to reduce memory usage and garbage collection pressure. Call sqlContext.uncacheTable("tableName") to remove the table from memory.
Note that if you call schemaRDD.cache() rather than sqlContext.cacheTable(...), the table will not be cached in columnar format. For this reason, sqlContext.cacheTable(...) is strongly recommended.
You can configure the memory cache by using the setConf method on SQLContext or by running the SET key=value command when using SQL.
Commonly used DataFrame APIs
1. collect and collectAsList: convert the data in df to an Array and a List, respectively
2. count: count the total number of records in df
3. first: get the first record in df; the data type is Row
4. head: get the first few records of df
5. show
6. take: get the first few records in df
7. cache: cache df
8. columns: list the column names of the schema; type Array[String]
9. dtypes: show the schema information of all columns; type Array[(String, String)]
10. explain: show the execution plan of the current df
11. isLocal: whether the current Spark SQL execution is local; true means local, false means non-local
12. printSchema
13. registerTempTable
14. schema
15. toDF. Note: when toDF is given arguments, the number of arguments must match the number of columns of the DataFrame it is called on, similar to insert into t select * from t1 in SQL
16. intersect: return the Rows that two DataFrames have in common