Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

It's time to learn real spark technology.

2025-02-21 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

Cdn.xitu.io/2018/11/21/1673560dca70a6b7?w=1433&h=534&f=jpeg&s=309760 ">"

Spark sql can be said to be the essence of spark. I feel that the overall complexity is more than five times that of spark streaming. Now spark officially promotes structed streaming, and spark streaming maintenance is not active. We build big data computing tasks based on spark, and the focus also has to shift to DataSet. The original code written based on RDD has been migrated, and the benefits are very great, especially in terms of performance. Various embedded performance optimizations in spark sql are more reliable than naked RDD following various so-called best practices, especially for beginners, for example, some best practices talk about filter operation before map operation, this kind of spark sql will automatically push down predicates, such as try to avoid using shuffle operation, if you open the relevant configuration in spark sql, you will automatically use broadcast join to broadcast small tables, convert shuffle join to map join, and so on. It can really save us a lot of worry. 


The code complexity of spark sql is caused by the essential complexity of the problem. Most of the logic of the Catalyst framework in spark sql is to do all kinds of troubles on a Tree type data structure, and it is elegant to implement based on scala. The partial function of scala and the powerful Case regular match make the whole code look clear. This article briefly describes some mechanisms and concepts in spark sql.

SparkSession is our entry point for writing spark application code. Launching a spark-shell will provide you with a way to create a SparkSession. This object is the starting point of the entire spark application. Let's take a look at some important variables and methods of sparkSession:

Cymbal

The sessionState mentioned above is a key thing that maintains all the state data currently used by session, with the following various things to maintain:

Cymbal

Spark sql internally uses dataFrame and Dataset to represent a data set, and then you can apply various statistical functions and operators to this data set. Some people may not be able to distinguish DataFrame from Dataset. In fact, DataFrame is a type of DataSet called Row.

Type DataFrame = Dataset [Row]

The Row type mentioned here is said at the API level of Spark sql's external exposure, but DataSet does not require the input type to be Row, and it can also be a strongly typed data. The underlying data type handled by DataSet is Catalyst internal InternalRow or UnsafeRow type, and there is an implicit conversion of Encoder behind it to convert the data you enter into internal InternalRow, so inference that DataFrame corresponds to RowEncoder.

Transformations on Dataset will generate a tree structure with elements of type LogicalPlan. For example, if I have a student table and a score table, the requirement is to count the total scores of all students over the age of 11.

Cymbal

Cymbal

This queryExecution is the execution engine of the entire execution plan. There are various intermediate process variables in the execution process. The whole execution process is as follows.

Cymbal

Then the sql statement in our example above will become an abstract syntax tree after parsing by Parser, and the logical plan AST after parsing is

Cymbal

Use the picture to show the image.

Cymbal

We can see that the filter condition becomes the Filter node, this node is of type UnaryNode, that is, there is only one child, the data in the two tables becomes the UnresolvedRelation node, and this node is the LeafNode type, as the name implies, the leaf node, and the JOIN operation represents the Join node, which is a BinaryNode node with two children.

These nodes mentioned above are of LogicalPlan type and can be understood as Operator for various operations, and spark sql defines various Operator corresponding to various operations.

Cymbal

The abstract syntax tree composed of these operator is the basis of the whole Catatyst optimization, and the Catatyst optimizer will do all kinds of twists and turns on the tree, moving the nodes on the tree to optimize.

Now we have an abstract syntax tree after Parser, but we don't know what score,sum is, so we need analyer to locate it. Analyzer will transform all Unresolved on AST into resolved state. Sparksql has a lot of resolve rules, which are easy to understand. For example, ResolverRelations is the basic type of parsing table (column), and ResolveFuncions is the basic information of parsed functions, such as the sum function in the example. ResolveReferences may not be easy to understand. The field we use in the sql statement, such as name in Select name, corresponds to a variable, which exists as a variable (Attribute type) when parsing the table, then the same variable in the Project node corresponding to Select becomes a reference, and they have the same ID, so after ResolveReferences processing, it becomes the AttributeReference type, ensuring that they are given the same value when the data is finally loaded. Just like when we write code to define a variable, these Rule repeatedly act on the node, and the designated tree node tends to be stable, of course, more optimizations will waste performance, so some rule functions as Once and some rule functions as FixedPoint, which is a trade-off. All right, cut the crap, let's do a little experiment.

Cymbal

We use ResolverRelations to parse our AST, and after parsing, we can see that the original UnresolvedRelation has become LocalRelation, which represents a table in local memory, which is registered in catalog when we use createOrReplaceTempView. The relove operation is nothing more than looking up the table in catalog, finding out the schema of the table, and parsing out the corresponding fields, transforming each StructField defined by the outer user into AttibuteReference, and marking it with ID.

Cymbal

If we use ResolveReferences to do this again, you will find that the same fields in the upper node become references with the same ID, and they are all of type AttibuteReference. Eventually, after all the rule is applied, the whole AST becomes 
.

Cymbal

Here's the main point. To optimize the logic, let's take a look at what the logic optimization has:

Cymbal

There are many kinds of logic optimization in sparksql, most of the logic of Catalyst framework in sparksql is based on a Tree type data structure, it is very elegant to achieve based on scala, the partial function of scala and powerful Case regular match, so that the whole code still looks clear, nonsense, let's do a small experiment.

Cymbal

See, I changed my (100 + 10) to 110.

Cymbal

Using PushPredicateThroughJoin to push down a Filter that only filters the stu table to Join, it will load a lot less data, and the performance will be optimized. Let's take a look at the final look.

Cymbal

At least with ColumnPruning,PushPredicateThroughJoin,ConstantFolding,RemoveRedundantAliases logic optimization, now my little tree has become:

After all, the logical optimization is only an abstract logical layer, and it needs to be converted into a physical execution plan first, and the logically feasible execution plan can be transformed into a plan that Spark can really execute.

Cymbal

Spark sql converts logical nodes into corresponding physical nodes, such as Join operator. Spark formulates different algorithm strategies for this operator according to different scenarios, such as BroadcastHashJoin, ShuffleHashJoin, SortMergeJoin and so on. Of course, there are many optimization points. Spark will intelligently select them according to some statistical data during the conversion, which involves cost-based optimization, which is also a big part. Later, you can write an article alone. In our example, because the amount of data is less than 10m, it automatically changes to BroadcastHashJoin. Sharp-eyed students can see that there seem to be some more nodes. Let's explain that the BroadcastExchange node inherits the Exchage class and is used to exchange data between nodes. The BroadcastExchange here will broadcast the LocalTableScan data to each executor node and use it as a map-side join. The final Aggregate operation is divided into two steps. The first step is parallel aggregation, followed by Final aggregation of the aggregated results. This is similar to the combine in the domain name map-reduce and the final reduce, with an Exchange hashpartitioning added in the middle. This is to ensure the same key shuffle to the same partition. When the Distribution of the Child output data of the current physical plan does not meet the requirements, Shuffle is required. This is the exchange data node inserted in the final EnsureRequirement phase. In the database field, there is a saying that those who win the join win the world. We focus on some trade-offs made by spark sql in the join operation.

The Join operation basically divides the two Join tables into large tables and small tables, with the large table as the streaming traversal table and the small table as the lookup table, and then take the same Key record from the lookup table according to the Key for each record in the large table.

Spark supports all types of Join:

Cymbal

The join operation in spark sql chooses different join strategies according to various conditions, which are divided into BroadcastHashJoin, SortMergeJoin and ShuffleHashJoin.

If BroadcastHashJoin:spark judges that the storage space of a table is less than the broadcast threshold (the parameter spark.sql.autoBroadcastJoinThreshold is used in Spark to control the threshold for selecting BroadcastHashJoin, the default is 10MB), it broadcasts the small table to Executor, and then puts the small table in a hash table as a lookup table. The join operation can be completed through a map operation, avoiding the shuffle operation with large performance code. However, note that BroadcastHashJoin does not support full outer join, for right outer join. Broadcast left table, for left outer join,left semi join,left anti join, broadcast right table, for inner join, which table is smaller than broadcast. 



 SortMergeJoin: if the data of both tables are very large, it is more suitable to use SortMergeJoin. SortMergeJoin uses the shuffle operation to shuffle the records of the same key into a partition, and then both tables are sorted, and the sort merge operation is acceptable. 



 ShuffleHashJoin: if you don't sort in the shuffle process and put the lookup table in the hash table to find join, when will ShuffleHashJoin be done? The size of the lookup table cannot exceed the spark.sql.autoBroadcastJoinThreshold value, otherwise BroadcastHashJoin will be used, and the average size of each partition cannot exceed spark.sql.autoBroadcastJoinThreshold, so as to ensure that the lookup table can be placed in memory without OOM, and another condition is that the large table is more than 3 times the size of the small table, so that the benefits of this Join can be brought into full play. 


As mentioned above, the nodes above the AST have been converted into physical nodes, and these physical nodes will eventually call the execute method recursively from the head node. Calling the transform operation on the RDD generated by child will result in a stringed RDD chain, just like the recursive call on DStream in spark stremaing. The final figure is as follows: 


Cymbal

You can see that the final execution is divided into two stage, the broeadcastExechage of the small table is divided into two HashAggregate sum, and there is no evolution of the shuffle operation. Then, in the final step of aggregation, we first perform a HashAggregate sum function in the map segment, and then the Exchage operation shuffle the data of the same key to the same partition according to name, and then do the final HashAggregate sum operation. Here is a strange WholeStageCodegen, what is this, because we are performing Filter? When Project these operator, these operator contain a lot of Expression, such as SELECT sum (v), name, where sum and v are Expression, where v belongs to Attribute variable expression, expression is also a tree data structure, sum (v) is a tree structure composed of sum node and sum child node v, these expressions can be evaluated and generated code, the most basic function of expression is to evaluate. To calculate the input Row, Expression needs to implement the def eval (input: InternalRow = null): Any function to implement its function.

The expression is a process of Row, and the output can be of any type, but the type of Plan output of Project and Filter is def output: Seq [Attribute], which represents a set of variables, such as Filter (age > = 11) in our example, where age > 11 is an expression, this > expression depends on two child nodes, and a Literal constant expression evaluates to 11. The other is the Attribute variable expression age, which is converted to the AttributeReference type in the analyze phase, but it is Unevaluable. In order to get the corresponding value of the attribute in the input Row, you have to bind the index of this variable in a row of data according to the schema association to generate BoundReference, and then the expression BoundReference can get the value in Row according to index when eval. The final output type of the expression age > 11 is of type boolean, but the Plan output type of Filter is type Seq [Attribute].

You can imagine that the data flows in a plan, and then the expressions in each plan will deal with the data, which is equivalent to the call processing of a small function, and there is a lot of function call overhead, so can we inline these small functions as a big function, which is what WholeStageCodegen does?

Cymbal

You can see that there is a * sign in front of each node in the final execution plan, indicating that the whole code generation is enabled. In our example, Filter and Project,BroadcastHashJoin,Project,HashAggregate enable the whole code generation. If you are interested in cascading for two large functions, you can use a.queryExecution.debug.codegen to see what the generated code looks like. However, the Exchange operator does not achieve the entire code generation because it needs to send data over the network.

That's all for today's sharing. In fact, there are a lot of interesting things in spark sql, but because of the complexity of the problem, it takes a high degree of abstraction to straighten it out, which makes it difficult for code readers to understand, but if you really look into it, you will gain a lot. If you have any comments on this article, you are welcome to leave a message at the end of the article to express your thoughts. 


* * the awesome man said * *

The column of "Niu Ren Shuo" is devoted to the discovery of the thoughts of technical people, including technological practice, technical practical information, technical insights, growth experiences, and all the contents worth discovering. We hope to gather the best technical people to dig out unique, sharp, contemporary voices.

Contribution email: marketing@qiniu.combr/ > 


* * the awesome man said * *

The column of "Niu Ren Shuo" is devoted to the discovery of the thoughts of technical people, including technological practice, technical practical information, technical insights, growth experiences, and all the contents worth discovering. We hope to gather the best technical people to dig out unique, sharp, contemporary voices.

Contribution email: marketing@qiniu.com

Cymbal

Cymbal

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report