In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-25 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
This article mainly explains "what is the structure and principle of Spark-S3-SparkSQL". Interested friends may wish to take a look. The method introduced in this paper is simple, fast and practical. Now let the editor take you to learn "what is the structure and principle of Spark-S3-SparkSQL"?
The development of Spark SQL
HDFS-> HIVE
Due to the extensive use of Hadoop in enterprise production, a large amount of data has been accumulated on HDFS. In order to provide quick tools for technicians who are familiar with RDBMS but do not understand MapReduce, Hive arises at the historic moment. The principle of Hive is to translate SQL statements into MapReduce calculations.
HIVE-> SHARK
In the process of MapReduce computing, the landing process of a large number of intermediate disks consumes a lot of Icano, which reduces the running efficiency. In order to provide the efficiency of SQL-on-Hadoop, Shark appeared.
Shark is one of the components of the Spark ecological environment of Berkeley AMPLab Lab. It modifies the three modules of memory management, physical planning and execution in Hive to make SQL statements run directly on Spark, thus improving the speed of SQL queries by 10-100 times.
SHARK-> SPARK SQL
On June 1st, 2014, Reynold Xin, the host of Shark project and SparkSQL project, announced that the development of Shark would be stopped and the team would put all resources on the sparkSQL project. At this point, the development of Shark came to an end.
With the development of Spark, Shark's too much dependence on Hive restricts the policy of Spark's One Stack rule them all and restricts the integration of various components of Spark. At the same time, Shark can not make use of the characteristics of Spark for deep optimization, so it is decided to give up Shark and put forward the SparkSQL project.
With the end of Shark, two new projects came into being: SparkSQL and Hive on Spark. Among them, SparkSQL continues to develop as a member of Spark ecology, and is no longer limited to Hive, but is compatible with Hive; and Hive on Spark is a Hive development plan, which takes Spark as one of the underlying engines of Hive, that is to say, Hive will no longer be limited to one engine, but can use Map-Reduce, Tez, Spark and other engines.
SparkSQL advantage
SparkSQL gets rid of the dependence on Hive and gets great convenience in terms of data compatibility, performance optimization and component expansion.
1. Data compatibility
It is not only compatible with Hive, but also can obtain data from RDD, parquet files and JSON files. Future versions even support the acquisition of RDBMS data and NOSQL data such as cassandra.
2. Performance optimization
In addition to adopting optimization techniques such as In-Memory Columnar Storage and byte-code generation, Cost Model will be introduced to dynamically evaluate the query, obtain the best physical plan, and so on.
3. Component extension
SQL syntax parsers, parsers, or optimizers can be redefined and extended
Performance Optimization of Spark SQL
Memory column storage (In-Memory Columnar Storage)
For memory column storage, columns of all native data types are stored in native arrays, and complex data types supported by Hive (such as array, map, etc.) are serialized and then connected into a byte array to store.
In this way, one JVM object is created for each column, resulting in fast GC and compact data storage.
In addition, an efficient compression method with low CPU overhead can be used to reduce memory overhead.
More interestingly, the performance of aggregate-specific columns that are frequently used in analysis queries is greatly improved because the data of these columns are put together and are easier to read into memory for calculation.
Bytecode generation technology (bytecode generation, i.e. CG)
An expensive operation in a database query is the expression in the query statement, which is mainly caused by JVM's memory model. For example, SELECT axib FROM table, if the general SQL syntax approach is used in this query, it will form an expression tree and design virtual function calls many times, which will interrupt the normal pipeline processing of CPU and slow down the execution speed.
Spark-1.1.0 adds a codegen module to the expressions of the catalyst module. If dynamic bytecode generation technology is used, Spark SQL will dynamically compile the matching expressions with specific code and then run them when executing the physical plan.
Optimization of Scala Code
When writing code in Scala, Spark SQL should try to avoid inefficient code that is easy to GC. Although it makes it more difficult to write code, it still uses a unified interface for users, making it easier for developers to use.
The operational architecture of Spark SQL
SparkSQL has two branches, sqlContext and hiveContext,sqlContext now only support SQL syntax parser; hiveContext now supports SQL syntax parser and hivesql syntax parser, the default is hiveSQL syntax parser, users can configure to switch to SQL syntax parser to run syntax that hiveSQL does not support.
Spark SQL consists of four parts: Core, Catalyst, Hive and Hive-ThriftServer.
1.Core: responsible for handling data input and output, such as obtaining data, outputting query results to DataFrame, etc.
2.Catalyst: responsible for handling the entire query process, including parsing, binding, optimization, etc.
3.Hive: responsible for processing Hive data
4.Hive-ThriftServer: mainly used for access to hive
SparkSQL has two branches, sqlContext and hiveContext,sqlContext now only support SQL syntax parser; hiveContext now supports SQL syntax parser and hivesql syntax parser, the default is hiveSQL syntax parser, users can configure to switch to SQL syntax parser to run syntax that hiveSQL does not support.
The order in which Spark SQL statements are executed
1. Parse (Parse) the read SQL statements to distinguish which words are keywords (such as SELECT, FROM, WHERE), which are expressions, which are Projection, which are Data Source, etc., in the SQL statement, so as to judge whether the SQL statement is standard or not.
two。 Bind (Bind) the SQL statement to the database data dictionary (columns, tables, views, etc.). If the relevant Projection, Data Source, etc., exist, it means that the SQL statement can be executed.
3. The general database will provide several execution plans, these plans generally have operation statistics, and the database will choose an optimal plan (Optimize) among these plans.
4. Plan execution (Execute), which is carried out in the order of Operation-- > Data Source-- > Result, and sometimes returns the result without even reading the physical table, such as rerunning the just run.
The operation principle of Spark SQL
1. Use SessionCatalog to save metadata
Before parsing the SQL statement, a SparkSession is created, or if a version prior to 2.0 initializes SQLContext,SparkSession, it just encapsulates the creation of SparkContext and SQLContext. The metadata is saved in SessionCatalog, involving table names, field names, and field types. When you create a temporary table or view, you will actually register with SessionCatalog.
2. Parsing SQL uses ANTLR to generate an unbound logical plan
When we call the SQL of SparkSession or the SQL method of SQLContext, we use SparkSqlParser to parse the SQL. The ANTLR is used for lexical and grammatical parsing. It is divided into two steps to generate Unresolved LogicalPlan:
Lexical analysis: Lexical Analysis, responsible for dividing token into symbolic categories.
Build a parse tree or syntax tree AST.
3. Use the parser Analyzer to bind the logical plan
At this stage, Analyzer uses Analyzer Rules, combined with SessionCatalog, to parse unbound logical plans to generate bound logical plans.
4. Optimize the logical plan using the optimizer Optimizer
The optimizer also defines a set of Rules, which is used to iterate the logical plan and Exepression, so that the nodes of the tree are merged and optimized.
5. Use SparkPlanner to generate a physical plan
SparkSpanner uses Planning Strategies to transform the optimized logical plan to generate an executable physical plan SparkPlan.
6. Use QueryExecution to execute the physical plan
When the execute method of SparkPlan is called, the bottom layer actually triggers JOB again, and then returns RDD.
The operational architecture of Spark SQL
TreeNode
Logical plans, expressions, etc., can be expressed in tree, which is only maintained in memory and does not persist the disk, and the tree modifications made by the parser and optimizer just replace existing nodes.
TreeNode has two direct subclasses, QueryPlan and Expression. Under QueryPlan, there are LogicalPlan and SparkPlan. Expression is an expression system, which does not need to perform engine calculations, but can be directly processed or calculated nodes, including projection operations, operator operations, etc.
Rule & RuleExecutor
Rule refers to the rules to be applied to the logical plan to achieve binding and optimization. His implementation class is RuleExecutor. Both the optimizer and the parser need to inherit RuleExecutor.
Batch, Once, FixPoint are defined in each subclass. Each Batch represents a set of rules, Once for one operation on the tree, and FixPoint for multiple iterations on the tree.
RuleExecutor provides a Seq [batch] attribute, which defines the processing logic of RuleExecutor, and the specific processing logic is implemented by a specific Rule subclass.
Catalyst optimizer
In general, SparkSQL1.1 consists of four modules: core, catalyst, hive, and hive-Thriftserver:
Core handles the input and output of data, obtains data from different data sources (RDD, Parquet, json, etc.), and outputs the query results to schemaRDD
Catalyst deals with the whole process of query statements, including parsing, binding, optimization, physical planning, etc., which is said to be an optimizer rather than a query engine.
The processing of hive data by hive
Hive-ThriftServer provides CLI and JDBC/ODBC interfaces
In these four modules, catalyst is the core part, and its performance will affect the overall performance. As the development time is still short, there are still many deficiencies, but its plug-in design leaves a lot of room for future development.
From the figure above, the main implementation components of catalyst are:
1.sqlParse, which completes the syntax parsing of sql statements, and currently provides only a simple sql parser
2.Analyzer, mainly completes the binding work, binds Unresolved LogicalPlan and data metadata from different sources (such as hive metastore, Schema catalog) to generate resolved LogicalPlan
3.optimizer optimizes resolved LogicalPlan to generate optimized LogicalPlan
4.Planner converts LogicalPlan to PhysicalPlan;,
5.CostModel, which selects the best physical execution plan based on past performance statistics
The basic implementation of these components:
1. First, the sql statement is parsed to generate Tree, and then different Rule is applied to the Tree at different stages, and the function of each component is completed through transformation.
2.Analyzer uses Analysis Rules, together with data metadata (such as hive metastore, Schema catalog), to improve the properties of Unresolved LogicalPlan and convert it to resolved LogicalPlan
3.optimizer uses Optimization Rules to convert resolved LogicalPlan to optimized LogicalPlan through optimization operations such as merge, column clipping, filter push-down, etc.
4.Planner uses Planning Strategies, for optimized LogicalPlan
Perform optimization
To illustrate query optimization, let's take a look at the example of population data analysis shown in the following figure. In the figure, two DataFrame are constructed, join them and then do another filter operation. If the implementation plan is carried out intact, the final implementation efficiency is not high. Because join is a costly operation, it may also result in a larger dataset. If we can push down the filter below the join, filter the DataFrame first, and then filter the smaller result set after join filtering, we can effectively reduce the execution time. This is exactly what Spark SQL's query optimizer does. In short, logical query plan optimization is a process of using equivalent transformation based on relational algebra to replace high-cost operations with low-cost operations.
In the process of converting the optimization execution plan into the physical management execution plan, the filtering conditions can also be pushed into the data source according to the characteristics of the specific data source. The Filter in the rightmost physical execution plan disappears because it is integrated into the table scan node used to perform the final read operation.
A preliminary study of DataFrame
In Spark, DataFrame is a distributed data set based on RDD, similar to two-dimensional tables in traditional databases. The main difference between DataFrame and RDD is that the former has schema meta-information, that is, each column of the two-dimensional table dataset represented by DataFrame has a name and type. This enables Spark SQL to gain insight into more structural information, so as to optimize the data sources hidden behind DataFrame and the transformations acting on DataFrame, and finally achieve the goal of greatly improving runtime efficiency. In contrast, RDD, because there is no way to know the specific internal structure of the stored data elements, Spark Core can only do simple and general pipeline optimization at the stage level.
Characteristics of DataFrame
Data from Kilobytes to Petabytes on a single node cluster can be processed as a large cluster.
Supports different data formats (Avro,csv, flexible search and Cassandra) and storage systems (HDFS,HIVE tables, mysql, etc.).
State-of-the-art optimization and code generation through the Spark SQL Catalyst optimizer (tree transformation framework).
You can easily integrate with all big data tools and frameworks through Spark-Core.
Provides API for Python,Java,Scala and R programming.
Create DataFrame
In Spark SQL, developers can easily convert all kinds of internal and external stand-alone and distributed data into DataFrame.
# construct from users table in Hive
DataFrame users = sqlContext.table ("users")
# load the JSON file on S3
Logs = sqlContext.load ("s3n://path/to/data.json", "json")
# load the Parquet file on HDFS
Clicks = sqlContext.load ("hdfs://path/to/data.parquet", "parquet")
# access MySQL comments = sqlContext.jdbc ("jdbc:mysql://localhost/comments", "user") through JDBC
# transform normal RDD into
DataFrame rdd = sparkContext.textFile ("article.txt")\ .flatMap (lambda line: line.split ())\ .map (lambda word: (word, 1))\ .reduceByKey (lambda a, b: a + b)\ wordCounts = sqlContext.createDataFrame (rdd, ["word", "count"])
# transform the local data container into
DataFrame data = [("Alice", 21), ("Bob", 24)] people = sqlContext.createDataFrame (data, ["name", "age"])
Use DataFrame
Like R and Pandas, Spark DataFrame provides a complete set of DSL for manipulating data. These DSL are semantically very similar to SQL relational queries (which is one of the important reasons why Spark SQL can provide seamless support for DataFrame).
# create a DataFrame that contains only "young" users
Df = users.filter (users.age < 21)
# you can also use Pandas-style syntax
Df = users [users.age < 21]
# add everyone's age to 1
Df.select (young.name, young.age + 1)
# Statistics on the number of young users by gender
Df.groupBy (gender) .count ()
# Connect all young users with another DataFrame named logs
Df.join (logs, logs.userId = = users.userId, "left_outer")
Save DataFrame
When the data analysis logic is written, we can save or present the final results.
# append to the Parquet file on HDFS
Df.save (path= "hdfs://path/to/data.parquet", source= "parquet", mode= "append")
# overwrite the JSON file on S3
Df.save (path= "s3n://path/to/data.json", source= "json", mode= "append")
# Save as SQL table
Df.saveAsTable (tableName= "young", source= "parquet" mode= "overwrite")
# convert to Pandas DataFrame (Python API specific feature)
PandasDF = young.toPandas ()
# print output in tabular form
Df.show ()
At this point, I believe you have a deeper understanding of "what is the structure and principle of Spark-S3-SparkSQL". You might as well do it in practice. Here is the website, more related content can enter the relevant channels to inquire, follow us, continue to learn!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.