Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Getting started with 10.spark sql

2025-03-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

Past life and present life Hive&Shark

   with the advent of the era of big data, Hadoop is all the rage. In order to enable technicians who are familiar with RDBMS but do not understand MapReduce to develop big data quickly, Hive arises at the historic moment. Hive was the only SQL-on-Hadoop tool running on Hadoop at that time.

However, in the process of MapReduce computing, a large number of intermediate disks landing process consumes a lot of    O, which reduces the running efficiency. In order to improve the efficiency of SQL-on-Hadoop, a large number of SQL-on-Hadoop tools began to emerge, among which the most prominent ones are:

MapR's DrillCloudera's ImpalaShark.

   Shark is one of the components of the Berkeley Lab Spark ecosystem. It modifies the memory management, physical planning and execution modules of Hive Driver to run on the Spark engine, thus increasing the speed of SQL queries by 10-100 times.

Shark&Spark SQL

Too much dependence of    Shark on Hive (such as using Hive syntax parser, query optimizer, etc.) restricts the established policy of Spark's One Stack Rule Them All and restricts the integration of various components of Spark, so the SparkSQL project is proposed.

   SparkSQL abandons the original Shark code, absorbs some advantages of Shark, such as In-Memory Columnar Storage, Hive compatibility and so on, and redevelops SparkSQL code. As a result of getting rid of the dependence on Hive, SparkSQL has been greatly improved in terms of data compatibility, performance optimization and component extension.

Data compatibility

   is not only compatible with Hive, but also can obtain data from RDD, parquet files, JSON files, RDBMS data and NOSQL data such as cassandra.

Performance optimization

   not only adopts In-Memory Columnar Storage, byte-code generation and other optimization techniques, but also introduces Cost Model to dynamically evaluate the query and obtain the best physical plan.

Component extension aspect

  , whether it is SQL's syntax parser, parser, or optimizer, can be redefined and extended.

   Shark stopped development in 2014, and the team put all its resources on the SparkSQL project, which put an end to the development of Shark, but also developed two lines: SparkSQL and Hive on Spark.

   in which SparkSQL continues to develop as a member of Spark ecology, and is no longer limited to Hive, but is compatible with Hive; and Hive on Spark is a Hive development plan, which takes Spark as one of the underlying engines of Hive, that is to say, Hive will no longer be limited to one engine, but can use Map-Reduce, Tez, Spark and other engines.

Brief introduction

   Spark SQL is a module for structured data processing. Spark SQL gives some structured information to the data to be processed, and you can use SQL statements or DataSet API interfaces to interact with Spark SQL.

SQL

   Spark SQL can use sql to read and write data in Hive, or you can use sql in a programming language to return Dataset/DataFrame result sets.

DataSets&DataFrames

   Dataset is a distributed dataset that combines the advantages of RDD and SparkSQL execution engines. Dataset can be constructed from a JVM object and then processed using operator operations. Both Java and Scala have Dataset API;Python and R native support for Dataset features.

   DataFrame is a DataSet with a two-dimensional structure, which is equivalent to a table in RDBMS. DataFrame can be constructed in many ways, such as structured data files, hive tables, external databases, RDD, and so on. DataFrame API is found in Scala, Java, Python and R.

DataFrame and DataSetDataFrame create and manipulate scalaimport org.apache.spark.sql.SparkSession// construction SparkSessionval spark = SparkSession .builder () .appName ("Spark SQL basic example") .config ("spark.some.config.option") "some-value") .getOrCreate () / / create DataFrameval df = spark.read.json ("examples/src/main/resources/people.json") / / Displays the content of the DataFrame to stdoutdf.show () / / +-- +-- +-+ / / | age | name | / / +-- +-+ / / | null | Michael | / / | 30 | Andy | / / | 19 | Justin | / / +-+-- -+ / / DataFrame operation / / This import is needed to use the $- notationimport spark.implicits._// Print the schema in a tree formatdf.printSchema () / / root// |-- age: long (nullable = true) / / |-- name: string (nullable = true) / / Select only the "name" columndf.select ("name"). Show () / / +-+ / / | name | / / +-+ / / | Michael | / / | Andy | / / | Justin | / / +-+ / / Select everybody But increment the age by 1df.select ($"name" $"age" + 1). Show () / / +-+-+ / / | name | (age + 1) | / / +-+-+ / / | Michael | null | / | Andy | 31 | / | Justin | 20 | / / +-+ / / Select people older than 21df.filter ($") Age "> 21). Show () / / +-- +-+ / | age | name | / / +-- +-+ / / | 30 | Andy | / / +-- +-+ / / Count people by agedf.groupBy (" age "). Count (). Show () / / +-+-- + / / | age | count | / / +-+ / / | 19 | 1 | / | null | 1 | / / | 30 | 1 | / / +-+-+ javaimport org.apache.spark.sql.SparkSession / / construct SparkSessionSparkSession spark = SparkSession .builder () .appName ("Java Spark SQL basic example") .config ("spark.some.config.option", "some-value") .getOrCreate (); / / create DataFrameimport org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;Dataset df = spark.read () .appName ("examples/src/main/resources/people.json"); / / Displays the content of the DataFrame to stdoutdf.show () / / +-- +-+ / / | age | name | / / +-- +-+ / / | null | Michael | / / | 30 | Andy | / / | 19 | Justin | / / +-- +-+ / DataFrame operation / / col ("...") Is preferable to df.col ("...") import static org.apache.spark.sql.functions.col;// Print the schema in a tree formatdf.printSchema (); / / root// |-- age: long (nullable = true) / / |-- name: string (nullable = true) / / Select only the "name" columndf.select ("name"). Show () / / +-+ / / | name | / / +-+ / / | Michael | / | Andy | / / | Justin | / / +-+ / / Select everybody, but increment the age by 1df.select (col ("name"), col ("age"). Show (1)) / / +-+-+ / / | name | (age + 1) | / / +-+-+ / / | Michael | null | / / | Andy | 31 | / / | Justin | 20 | / / +-+-+ / / Select people older than 21df.filter (col ("age"). Gt (21). Show () / / +-- +-+ / / | age | name | / / +-- +-+ / / | 30 | Andy | / / +-+ / / Count people by agedf.groupBy ("age"). Count (). Show () / / +-- +-+ / / | age | count | / / +-- +-+ / / | 19 | 1 | / | null | 1 | / | 30 | 1 | / +-+-+ pythonfrom pyspark.sql import SparkSession# construct SparkSessionspark = SparkSession\ .builder\ .appName ("Python Spark SQL basic example")\ .config ("spark.some.config.option") "some-value")\ .getOrCreate () # create DataFrame# spark is an existing SparkSessiondf = spark.read.json ("examples/src/main/resources/people.json") # Displays the content of the DataFrame to stdoutdf.show () # +-- +-+ # | age | name | # +-+-+ # | null | Michael | # | 30 | Andy | # | Justin | # +-+-+ # DataFrame Operation # spark Df are from the previous example# Print the schema in a tree formatdf.printSchema () # root# |-- age: long (nullable = true) # |-- name: string (nullable = true) # Select only the "name" columndf.select ("name"). Show () # +-+ # | name | # +-# | Michael | # | Andy | # | Justin | # +-+ # Select everybody, but increment the age by 1df.select (df ['name'] Df ['age'] + 1). Show () # +-+ # | name | (age + 1) | # +-+ # | Michael | null | # | Andy | 31 | # | Justin | 20 | # +-+ # Select people older than 21df.filter (df [' age'] > 21) Show () # +

   Datasets is similar to RDD, but uses specialized Encoder encoders to serialize data objects that need to be transmitted over the network, instead of the Java serialization or Kryo libraries used by RDD. Encoder encoders are dynamically generated code that allows various operator operations to be performed directly without deserialization.

Scala// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,// you can use custom classes that implement the Product interfacecase class Person (name: String, age: Long) / / Encoders are created for case classesval caseClassDS = Seq (Person ("Andy", 32)). ToDS () caseClassDS.show () / +-- +-+ / / | name | age | / / +-+-/ / | Andy | 32 | / +-+-- / / Encoders for most common types are automatically provided by importing spark.implicits._val primitiveDS = Seq (1,2) 3). ToDS () primitiveDS.map (_ + 1). Collect () / / Returns: Array (2,3,4) / / DataFrames can be converted to a Dataset by providing a class. Mapping will be done by nameval path = "examples/src/main/resources/people.json" val peopleDS = spark.read.json (path). As [person] peopleDS.show () / / +-- +-- +-+ / / | age | name | / / +-- +-+ / / | null | Michael | / / | 30 | Andy | / / | 19 | Justin | / / +-- +-+ javaimport java.util.Arrays;import java.util.Collections;import java.io.Serializable Import org.apache.spark.api.java.function.MapFunction;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.Encoder;import org.apache.spark.sql.Encoders;public static class Person implements Serializable {private String name; private int age; public String getName () {return name;} public void setName (String name) {this.name = name;} public int getAge () {return age } public void setAge (int age) {this.age = age;}} / / Create an instance of aBean classPerson person = new Person (); person.setName ("Andy"); person.setAge (32); / / Encoders are created for Java beansEncoder personEncoder = Encoders.bean (Person.class); Dataset javaBeanDS = spark.createDataset (Collections.singletonList (person), personEncoder); javaBeanDS.show () / / +-- +-+ / / | age | name | / / +-- +-+ / / | 32 | Andy | / / +-+ / / Encoders for most common types are provided in class EncodersEncoder integerEncoder = Encoders.INT (); Dataset primitiveDS = spark.createDataset (Arrays.asList (1,2,3), integerEncoder); Dataset transformedDS = primitiveDS.map ((MapFunction) value-> value + 1, integerEncoder); transformedDS.collect () / / Returns [2, 3, 4] / / DataFrames can be converted to a Dataset by providing a class. Mapping based on nameString path = "examples/src/main/resources/people.json"; Dataset peopleDS = spark.read () .json (path) .as (personEncoder); peopleDS.show () / / +-- +-+ / / | age | name | / / +-- +-+ / / | null | Michael | / / | 30 | Andy | / / | 19 | Justin | / / +-+ SQL operation scala// Register the DataFrame as a SQL temporary viewdf.createOrReplaceTempView ("people") / / df.createGlobalTempView ("people") val sqlDF = spark.sql ("SELECT * FROM people") sqlDF.show () / / + -+-- + / / | age | name | / / +-- +-+ / / | null | Michael | / / | 30 | Andy | / / | 19 | Justin | / / +-- +-+ javaimport org.apache.spark.sql.Dataset Import org.apache.spark.sql.Row;// Register the DataFrame as a SQL temporary viewdf.createOrReplaceTempView ("people"); / / df.createGlobalTempView ("people") Dataset sqlDF = spark.sql ("SELECT * FROM people"); sqlDF.show () / / +-- +-+ / / | age | name | / / +-- +-+ / / | null | Michael | / / | 30 | Andy | / / | 19 | Justin | / / +-- +-- + python# Register the DataFrame as a SQL temporary viewdf.createOrReplaceTempView ("people") # df.createGlobalTempView ("people") sqlDF = spark.sql ("SELECT * FROM people") sqlDF.show () # +-+-- -+ # | age | name | # +-+-+ # | null | Michael | # | 30 | Andy | # | 19 | Justin | # +-+-+

Loyal to technology, love sharing. Welcome to the official account: java big data programming to learn more technical content.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report