In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-10 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly explains the "Facebook database query engine Presto in Meituan what applications", the article explains the content is simple and clear, easy to learn and understand, the following please follow the editor's ideas slowly in depth, together to study and learn "Facebook database query engine Presto in Meituan what applications!"
Facebook's data warehouse is stored in a small number of large Hadoop/HDFS clusters. Hive is a data warehouse tool built by Facebook for Hadoop a few years ago. In the past, Facebook scientists and analysts have relied on Hive for data analysis. However, Hive uses MapReduce as the underlying computing framework and is designed for batch processing. However, with more and more data, a simple data query using Hive may take several minutes to several hours, which obviously can not meet the needs of interactive query. Facebook has also investigated other tools that are faster than Hive, but they are either limited in functionality or too simple to operate Facebook's vast data warehouse.
Some of the external projects that were piloted in 2012 were not appropriate, and they decided to develop their own, which is Presto. Development began in the fall of 2012, and the project is currently in use among more than 1000 Facebook employees, running more than 30000 queries and daily data at the 1PB level. Facebook says Presto's performance is 10 times better than Hive's. In 2013, Facebook officially announced open source Presto.
This paper first introduces the process of Presto from user submitting SQL to execution, then tries to analyze and summarize the principle of real-time query by Presto, and finally introduces the use of Presto in Meituan.
Presto architecture
The Presto query engine is a Master-Slave architecture, which consists of a Coordinator node, a Discovery Server node, and multiple Worker nodes, and the Discovery Server is usually embedded in the Coordinator node. Coordinator is responsible for parsing SQL statements, generating execution plans, and distributing execution tasks to Worker nodes for execution. The Worker node is responsible for actually performing the query task. After the Worker node starts, it registers with the Discovery Server service, and Coordinator gets the working Worker node from Discovery Server. If Hive Connector is configured, you need to configure a Hive MetaStore service to provide Hive meta-information to Presto, and the Worker node interacts with HDFS to read data.
A brief introduction to the query execution process of Presto
Since Presto is an interactive query engine, what we are most concerned about is the principle of implementing low-latency queries with Presto. I think it is mainly the following key points, and of course, there are some traditional SQL optimization principles, which are not introduced here.
Completely memory-based parallel computing
Pipeline
Localized computing
Dynamic compilation execution plan
Careful use of memory and data structures
Approximate query like BlinkDB
GC control
In order to introduce the above points, let's first introduce the process of executing the query by Presto.
Submit query
After the user submits a query statement using Presto Cli, Cli communicates with Coordinator using HTTP protocol. After receiving the query request, Coordinator calls SqlParser to parse the SQL statement to get the Statement object, and encapsulates the Statement into a QueryStarter object into a thread pool to wait for execution.
SQL compilation process
Presto, like Hive, uses Antlr to write SQL syntax, and syntax rules are defined in Statement.g and StatementBuilder.g files.
The physical execution plan compiled from SQL to the final physical execution plan shown in the following figure is roughly divided into five parts, and the final LocalExecutionPlan running on each Worker node is generated. The process of parsing SQL to a logical execution plan is not described in detail here, and the calculation process after query plan generation is understood through a SQL statement.
Sample SQL:
The code is as follows:
Select c1.rank, count (*) from dim.city C1 join dim.city c2 on c1.id = c2.id where c1.id > 10 group by c1.rank limit 10
The logical execution plan Plan generated by the SQL statement above is shown in the figure above. So how does Presto split the above logical execution plan to execute it with a high degree of parallelism? let's take a look at the physical execution plan.
Physical execution plan
The dotted line in the logic execution plan diagram is the syncopation point of the Presto to the logic execution plan. The SubPlan generated by the logic plan Plan is divided into four parts, and each SubPlan is submitted to one or more Worker nodes for execution.
SubPlan has several important properties-planDistribution, outputPartitioning, and partitionBy.
PlanDistribution represents the distribution of a query Stage. There are three different PlanDistribution modes for the four SubPlan in the logical execution plan: Source indicates that the SubPlan is a data source, and tasks of Source type determine how many nodes are assigned to execute according to the size of the data source; Fixed indicates that the SubPlan will be assigned a fixed number of nodes for execution (query.initial-hash-partitions parameter configuration in Config configuration, default is 8) None indicates that the SubPlan is assigned to only one node for execution. In the following execution plan, SubPlan1 and SubPlan0 PlanDistribution=Source, these two SubPlan are the nodes that provide the data source, and the read data of all nodes in SubPlan1 will be sent to each node in SubPlan0; SubPlan2 allocates eight nodes to perform the final aggregation operation; and SubPlan3 is only responsible for outputting the final calculated data.
The OutputPartitioning attribute has only two values, HASH and NONE, indicating whether the output of this SubPlan Shuffle the data according to the key value of partitionBy. In the following execution plan, there is only SubPlan0's OutputPartitioning=HASH, so the data received by SubPlan2 is based on the data after Partition in the rank field.
Completely memory-based parallel computing
Parallel execution process of query
The execution flow of Presto SQL is shown in the following figure
After Cli submits the SQL query through the HTTP protocol, the query request is encapsulated into a SqlQueryExecution object and handed to Coordinator's SqlQueryManager# queryExecutor thread pool for execution.
After each SqlQueryExecution thread (QmurX thread in the figure) starts, it parses and optimizes the SQL of the query request, and finally generates multiple Stage SqlStageExecution tasks, and each SqlStageExecution task is still assigned to the same thread pool for execution.
After each SqlStageExecution thread (Smurx thread in the figure) starts, the task of each Stage is constructed according to the PlanDistribution attribute and one or more RemoteTask is assigned to the remote Worker node for execution through the HTTP protocol.
After the Worker node receives the RemoteTask request, it starts a SqlTaskExecution thread (TmurX thread in the figure) and wraps each Split of the task as a PrioritizedSplitRunner task (SR-X in the figure) to the TaskExecutor# executor thread pool of the Worker node for execution.
The actual execution effect of the above execution plan is shown in the following figure.
Coordinator calls the / v1/task interface of the Worker node through the HTTP protocol to assign the execution plan to all Worker nodes (blue arrow in the figure)
Each node of SubPlan1 reads a Split's data, filters it, and distributes the data to each SubPlan0 node for Join and Partial Aggr operations.
After the calculation of each node of SubPlan1 is completed, the data is distributed to different SubPlan2 nodes according to the hash value of GroupBy Key.
Distribute the data to the SubPlan2 node after the calculation of all SubPlan3 nodes
After the calculation of the SubPlan3 node is completed, notify Coordinator to end the query and send the data to Coordinator.
Parallel reading of source data
In the above execution plan, SubPlan1 and SubPlan0 are both Source nodes. In fact, they read HDFS file data by calling HDFS InputSplit API, and then each InputSplit is assigned a Worker node to execute. The upper limit of the number of InputSplit assigned by each Worker node is configurable. The query.max-pending-splits-per-node parameter in Config is configured by default.
Distributed Hash aggregation
The above execution plan performs an aggregate calculation of Partial in SubPlan0, calculating the partial aggregate results of some data read by each Worker node, and then the output of SubPlan0 allocates different computing nodes according to the hash value of the group by field. Finally, SubPlan3 merges all the results and outputs them.
Pipeline
Data model
The smallest data unit processed in Presto is a Page object, and the data structure of the Page object is shown in the following figure. A Page object contains multiple Block objects, and each Block object is a byte array that stores several rows of a field. A row of multiple Block crosscuts is a real row of data. A Page has a maximum 1MB with a maximum of 16024 rows of data.
Internal pipeline calculation of nodes
The following figure is a calculation flow chart inside the Worker node, and on the left is the execution flow chart of the task.
The Worker node encapsulates the finest-grained tasks into a PrioritizedSplitRunner object, which is placed in the pending split priority queue. Every one of them.
The Worker node starts a certain number of threads for calculation, and the number of threads task.shard.max-threads=availableProcessors () * 4 is configured in config.
Each idle thread takes a PrioritizedSplitRunner object from the queue for execution. If the execution completes a cycle, the maximum execution time exceeds 1 second, it determines whether the task is completed, if it is completed, it is removed from the allSplits queue, and if not, it is put back into the pendingSplits queue.
The execution flow of each task is as follows: on the right side of the figure, iterate through all the Operator in turn, and try to fetch a Page object from the previous Operator. If the Page obtained is not empty, give it to the next Operator for execution.
Pipeline calculation between nodes
The following figure is the execution flow chart of ExchangeOperator. ExchangeOperator starts a HttpPageBufferClient object for each Split and actively pulls data to the Worker node of the previous Stage. The smallest unit of data is also a Page object, which is fetched and put into the Pages queue.
Localized computing
When Presto selects the Source task computing node, for each Split, select some minCandidates according to the following strategy
Give priority to the Worker node with the same Host as Split
If the node is not enough to give priority to the Worker node with the same Rack as Split
If the node is not enough to randomly select other Rack nodes
For all Candidate nodes, select the node with the least assignedSplits.
Dynamic compilation execution plan
Presto dynamically compiles the ScanFilterAndProjectOperator and FilterAndProjectOperator in the execution plan into Byte Code, which is handed over to JIT to compile into native code. Presto also uses the Byte Code generated by the LoadingCache cache provided by Google Guava.
Of the two code snippets above, the first is the code without dynamic compilation, and the second is the optimized generation that is restored after Byte Code decompilation generated by dynamic compilation
Code, we can see that the optimization method of loop expansion is used here.
Loop unwrapping is most commonly used to reduce loop overhead and provide instruction-level parallelism for processors with multiple functional units. It is also beneficial to the scheduling of instruction pipeline.
Careful use of memory and data structures
Slice is used for memory operations, and Slice uses Unsafe#copyMemory to achieve efficient memory copying. Slice Repository reference: https://github.com/airlift/slice
In another article introducing ORCFile optimization, Facebook engineers also mentioned the use of Slice to improve the write performance of ORCFile by 20% and 30%. Refer to: https://code.facebook.com/posts/229861827208629/scaling-the-facebook-data-warehouse-to-300-pb/
Approximate query like BlinkDB
In order to speed up the query speed of aggregate functions such as avg, count distinct, percentile, etc., the Presto team cooperated with Sameer Agarwal, one of the authors of BlinkDB, to introduce some approximate query functions approx_avg, approx_distinct, approx_percentile. Approx_distinct is implemented using the HyperLogLog Counting algorithm.
GC control
The Presto team found a BUG for JIT when using hotspot java7, and when the contemporary code cache is about to reach its limit, JIT may stop working, thus unable to dynamically compile frequently used code into native code.
The Presto team used a method of comparing Hack to solve this problem, adding a thread to explicitly GC when the code cache reached more than 70%, so that the loaded Class was removed from the perm, avoiding the BUG where the JIT was not working properly.
How to use Presto by Meituan
Reasons for choosing presto
We also used impala for a period of time in 2013. At that time, impala did not support the online 1.x hadoop community version, so we built a small cluster of CDH to import hot spot data from large clusters into small clusters every day. However, after the hadoop cluster completed the upgrade 2.2 years ago, impala did not support the 2.2 hadoop version at that time. While Presto has just begun to support 2.x hadoop Community Edition, and Presto can be successfully widely used in the environment of large amount of Facebook 300PB data, we believe it can also well support our real-time analysis needs in Meituan, so we decided to launch the test and use it for a period of time.
Deployment and use form
Considering two reasons: 1. Because the Hadoop cluster mainly completes yesterday's computing tasks at night, the computing load of the cluster is low except for log writes during the day. 2. Presto Worker nodes and DataNode nodes are arranged on the same machine and can be calculated locally. So we deployed Presto to all DataNode machines, and stopped the Presto service at night to avoid taking up cluster resources, and there were almost no users querying data at night.
Presto secondary development and BUG repair
It was not until 20 years later that the Presto query engine, version 0.60, was officially launched. It has not been used for a long time, but it has also encountered some problems:
Meituan's Hadoop uses version 2.2, and Security mode is enabled, but Presto does not support Kerberos authentication. We have modified the Presto code to add the function of Kerberos authentication.
Presto does not support implicit type conversion of SQL, but Hive supports it. Many users of self-service query are accustomed to Hive, which leads to the mismatch of left and right variable types in expressions when using Presto. We have added the function of implicit type conversion, which greatly reduces the probability of user SQL error.
Presto does not support querying lzo compressed data, so you need to modify the code of hadoop-lzo.
Solved a BUG that failed when there was a distinct field in the having clause, and gave feedback to the Presto team https://github.com/facebook/presto/pull/1104
For all code changes, please refer to our warehouse https://github.com/MTDATA/presto/commits/mt-0.60 on github.
Actual use effect
Here is a test report of a company's internal query center that is open to analysts, PM, and engineers for self-help queries. 5000 Hive queries are selected here, and the comparison of Presto queries is shown in the table below.
Thank you for your reading, the above is the content of "what are the applications of Facebook's database query engine Presto in Meituan". After the study of this article, I believe you have a deeper understanding of the application of Facebook's database query engine Presto in Meituan, and the specific use still needs to be verified in practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.