2025-01-21 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
How is Delta Lake applied in practice at Soul? This article summarizes the problems encountered, their causes, and the solutions adopted.
I. Background
(1) Business scenarios
In the traditional offline data warehouse model, logs pass through an ETL stage before they are stored. Soul's log volume is huge and the data must be written with dynamic partitioning: on top of the daily partition there are more than 1,200 dynamic partitions per day, and partition sizes are very uneven, ranging from tens of thousands to billions. Our previous ETL process was as follows: logs are reported to Kafka, collected by Flume onto HDFS, and then loaded into Hive by a daily Spark ETL job. The job starts in the early morning; data processing takes about 1 hour, the load phase takes 1 hour, and overall execution takes 2-3 hours.
(2) Existing problems
Under this architecture we faced the following problems:
1. The daily ETL job takes a long time, which delays the output of downstream dependencies.
2. It consumes a lot of resources in the early morning, grabbing a large share of the cluster during the peak period for scheduled jobs.
3. The ETL job is not stable enough, and errors have to be fixed in the early morning, with a wide impact.
II. Why choose Delta?
To solve the increasingly acute problems of the daily ETL, reduce resource costs, and make data available earlier, we decided to convert the T+1 ETL job into T+0 real-time log ingestion, so that data is usable as soon as it lands while remaining consistent.
Previously we had also maintained one copy of the data through both offline and real-time paths under the Lambda architecture, but in practice some thorny problems remained, such as the lack of transactions, cluster pressure from too many small files, and poor query performance, so it never worked as well as we hoped.
So this time we chose the data lake architecture, which has recently been attracting wide attention. I will not repeat the concept of a data lake here; my understanding is that it is a table format that treats metadata itself as big data. The mainstream data lake formats today are Delta Lake (in an open source and a commercial version), Hudi, and Iceberg. All of them support ACID semantics, upsert, schema evolution, time travel, and similar features. Below is a brief summary and comparison of other aspects:
Open source version of Delta
Advantages:
1. Supports streaming as a source
2. SQL operations are supported with Spark 3.0
Disadvantages:
1. Tightly bound to the Spark engine
2. Manual compaction
3. Join-style Merge, which is costly
Hudi
Advantages:
1. Fast Upsert/Delete based on primary key
2. Two merge modes, Copy on Write and Merge on Read, optimized for read-heavy and write-heavy scenarios respectively
3. Automatic compaction
Disadvantages:
1. Writes are bound to Spark/DeltaStreamer
2. The API is more complex
Iceberg
Advantages:
1. Pluggable engine
Disadvantages:
1. At the time of our evaluation it was still under development, and some features were not yet complete
2. Join-style Merge, which is costly
During our evaluation, colleagues at Alibaba Cloud provided the EMR version of Delta, which improves functionality and performance on top of the open source version: SparkSQL / Spark Streaming SQL integration, automatic synchronization of Delta metadata to the Hive Metastore (the MetaSync feature), automatic compaction, support for more query engines such as Tez, Hive, and Presto, and query performance optimizations (Z-Order, data skipping, Merge performance).
III. Practice
During testing we reported a number of EMR Delta bugs, for example: Delta tables could not automatically create their Hive mapping tables, the Tez engine could not read Delta-type Hive tables correctly, and Presto and Tez returned inconsistent data for the same Delta table. Alibaba Cloud responded quickly and fixed them one by one.
With the introduction of Delta, our real-time log ingestion architecture is as follows:
Data is reported to Kafka from each client, then written to HDFS in Delta format by minute-level Spark jobs. A mapping table for the Delta table is created automatically in Hive, so the data can be queried and analyzed directly through Hive MR, Tez, Presto, and other query engines.
On top of Spark we built a general-purpose ETL tool that makes ingestion configuration-driven: users can bring source data all the way into Hive without writing code. To better fit our business scenarios, we implemented several practical features in this wrapper layer:
1. A hidden-partition feature similar to Iceberg's. Users can apply a SparkSQL expression to selected columns to derive a new column, which can serve as a partition column or as an ordinary new column. For example, given a date column date, 'substr(date,1,4) as year' generates a new column that can be used as a partition.
2. Regular-expression checks on dynamic partitions, to avoid partition errors caused by dirty data. For example, Hive does not support Chinese partition names, so users can add a '\w+' check to the dynamic partition, and rows whose partition field does not match are filtered out as dirty data.
3. Custom event-time fields. Users can choose any time field in the data as the event time that determines the target partition, which avoids data drifting into the wrong partition.
4. Configurable parsing depth for nested JSON. Most of our log data is in JSON format, and nested JSON is unavoidable. This feature lets users choose how many levels of nested JSON to parse, and the nested fields are written to the table as individual columns.
5. SQL-based custom configuration of dynamic partitioning, which solves the real-time job performance problems caused by skewed tracking-event data and makes better use of resources. This scenario is described in detail later.
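Features 2 and 4 above can be illustrated with a minimal, plain-Python sketch. It is not the actual Spark-based tool: the function names, the underscore used to join nested keys, and the choice to keep deeper objects as JSON strings are all assumptions made for illustration. Note that Python's \w matches Chinese characters unless re.ASCII is set, so the flag is needed to reproduce the behaviour described.

```python
import json
import re

# Hypothetical rule: a partition value may contain ASCII word characters only.
# re.ASCII stops \w from matching Chinese characters.
PARTITION_PATTERN = re.compile(r"\w+", re.ASCII)


def is_valid_partition(value):
    """Return True if the value is usable as a dynamic-partition name."""
    return isinstance(value, str) and PARTITION_PATTERN.fullmatch(value) is not None


def flatten(record, max_depth, prefix=""):
    """Expand nested dicts into top-level columns, at most max_depth levels deep.

    Objects nested deeper than max_depth are kept as JSON strings.
    """
    flat = {}
    for key, value in record.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            if max_depth > 0:
                flat.update(flatten(value, max_depth - 1, prefix=f"{name}_"))
            else:
                flat[name] = json.dumps(value, sort_keys=True)
        else:
            flat[name] = value
    return flat


def prepare(raw_lines, partition_field, max_depth):
    """Parse JSON log lines, flatten them, and drop rows with dirty partition values."""
    rows = []
    for line in raw_lines:
        row = flatten(json.loads(line), max_depth)
        if is_valid_partition(row.get(partition_field)):
            rows.append(row)
    return rows
```

Calling prepare(lines, "event", 1) expands one level of nesting and silently drops any row whose event value contains non-ASCII or non-word characters.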
Platform integration: we have embedded the whole log-to-Hive ingestion process into the Soul data platform. Users apply for log ingestion through the platform, and once a reviewer approves the request and the parameters are configured, the logs flow into the Hive table in real time. This is easy to use and reduces operating costs.
To solve the problem of too many small files, EMR Delta implements the Optimize and Vacuum statements. Running Optimize on a Delta table periodically merges small files, and running Vacuum cleans up expired files, so that files on HDFS stay at a reasonable size and count. It is worth mentioning that EMR Delta also implements several auto-compaction strategies that can be enabled through configuration. For example, when the number of small files reaches a threshold, a minor compaction is started within the streaming job, merging small files with little impact on the real-time job.
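The threshold-triggered idea can be sketched in a few lines of Python. EMR Delta's real trigger conditions and merge strategy are not described in this article, so everything here (the function name, the parameters, the greedy grouping) is a hypothetical illustration of the general approach only.

```python
def plan_compaction(file_sizes, small_file_limit, target_size, min_small_files):
    """Group small files into merge batches of at most target_size bytes.

    Returns a list of groups (each a list of file sizes). Returns [] when
    fewer than min_small_files files are below small_file_limit, meaning
    compaction is not triggered.
    """
    small = sorted(size for size in file_sizes if size < small_file_limit)
    if len(small) < min_small_files:
        return []

    groups, current, current_size = [], [], 0
    for size in small:
        # Close the current group once adding this file would exceed the target.
        if current and current_size + size > target_size:
            if len(current) > 1:  # merging a single file gains nothing
                groups.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if len(current) > 1:
        groups.append(current)
    return groups
```

Large files are never touched, which is what keeps such a minor compaction cheap enough to run inside the streaming job.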
IV. Problems & solutions
Next, the problems we ran into while putting Delta into production.
(1) Data skew caused by uneven data volumes across the dynamic partitions of tracking-event data
Soul's tracking-event data lands in one wide partitioned table, partitioned by event type, and the data volume of different event types is very uneven. For example, when writing to Delta with Spark using 5-minute batches, most event types produce very little data in 5 minutes (less than 10 MB), but a few produce 1 GB or more. When the data is written, suppose the DataFrame has M partitions and the table has N dynamic partitions, and the data in each DataFrame partition is evenly and randomly mixed. Each DataFrame partition then generates N files, one per dynamic partition, so each batch generates M*N small files.
To solve this, we repartition the DataFrame by the dynamic partition field before writing, so that each DataFrame partition holds the data of distinct dynamic partitions. Each batch then generates only N files, one per dynamic partition, which solves the small-file explosion. But the data of each of the few very large dynamic partitions now ends up in a single DataFrame partition, so some partitions are skewed and the files they generate per batch are too large.
Solution: we made the repartition columns user-configurable through SQL. Put simply, users can use SQL to salt the few event types with excessive data so that they are scattered across multiple partitions, while event types with normal volumes are left untouched. With this scheme, the execution time of the slowest partition in each batch of the Spark job dropped from 3 min to 40 s, which solved both the too-small/too-large file problem and the performance problem caused by data skew.
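The effect of salting can be simulated in plain Python. This is a sketch, not the SQL configuration used in production: the key layout (event type plus a random bucket number), the function names, and the bucket counts are assumptions for illustration.

```python
import random
from collections import Counter


def plain_key(event_type):
    """Repartition key without salt: one task per event type."""
    return (event_type, 0)


def salted_key(event_type, salt_buckets, rng):
    """Repartition key with salt: hot event types are spread over several buckets.

    salt_buckets maps a hot event type to its bucket count. Types not listed
    keep a single bucket, so they still produce one file per batch.
    """
    buckets = salt_buckets.get(event_type, 1)
    return (event_type, rng.randrange(buckets))


def largest_task(rows, key_fn):
    """Number of rows handled by the busiest task after repartitioning."""
    return max(Counter(key_fn(row) for row in rows).values())


if __name__ == "__main__":
    rng = random.Random(0)
    rows = ["hot_event"] * 100_000 + ["rare_a"] * 100 + ["rare_b"] * 50
    print(largest_task(rows, plain_key))
    # With 10 buckets the busiest task holds roughly a tenth of the hot rows.
    print(largest_task(rows, lambda e: salted_key(e, {"hot_event": 10}, rng)))
```

The trade-off is visible in the key count: the hot event type now writes up to 10 files per batch instead of one, while the rare types still write exactly one each.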
(2) Dynamic schema changes driven by application-layer metadata
The data lake supports dynamic schema changes, but Spark needs the data schema when it constructs the DataFrame, before writing. If the schema cannot change dynamically at that point, new fields never reach the Delta table and Delta's dynamic schema becomes a mere decoration. Because tracking events differ by type, their fields are not identical, so the table schema must be the union of all event fields, which requires us to detect new fields while building the DataFrame.
Solution: we designed an additional set of metadata. When Spark builds the DataFrame, it first checks against this metadata whether there are new fields. If so, the new fields are added to the metadata, and the DataFrame is built with this metadata as its schema, which ensures that schema changes are detected dynamically at the application layer. Combined with Delta's dynamic schema changes, new fields are automatically written to the Delta table and synchronized to the corresponding Hive table.
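The metadata check can be sketched as follows in plain Python. How the real metadata is stored and how field types are tracked is not described in this article; the ordered list of column names and both function names are assumptions made for illustration.

```python
def merge_schema(known_fields, records):
    """Return (updated_fields, new_fields) after scanning one batch.

    known_fields is the ordered list of column names held in the external
    metadata. New columns are appended so existing positions stay stable.
    """
    updated = list(known_fields)
    seen = set(known_fields)
    new_fields = []
    for record in records:
        for name in record:
            if name not in seen:
                seen.add(name)
                updated.append(name)
                new_fields.append(name)
    return updated, new_fields


def to_rows(records, fields):
    """Project every record onto the full schema, filling missing columns with None."""
    return [tuple(record.get(name) for name in fields) for record in records]
```

Only when new_fields is non-empty does the metadata need to be written back, so the common case of an unchanged schema costs a single scan of the batch's keys.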
(3) Data duplication caused by the Spark Kafka offset commit mechanism
When using Spark Streaming, we commit the consumer offsets to Kafka after data processing completes, by calling the commitAsync API in spark-streaming-kafka-0-10. I had long been under the misconception that this commits the current batch's offsets as soon as processing finishes. However, when duplicate data appeared in the Delta table, we found that the offsets are actually committed at the start of the next batch, not when the current batch finishes processing. The problem is then: with a 5-minute batch interval, a batch may finish processing in 3 minutes and successfully write to the Delta table, but its offsets are not committed until the 5-minute mark, when the next batch starts. If the job is restarted between minute 3 and minute 5, the current batch is consumed again, resulting in duplicate data.
Solution:
1. Structured Streaming supports exactly-once writes to Delta, so migrating to Structured Streaming solves the problem.
2. Maintain the consumer offsets by other means.
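One way to read the second option is to record the consumed offsets together with the data, so that a replayed batch can be recognised and skipped. The in-memory class below is a toy sketch of that idea under stated assumptions: the class and method names are hypothetical, and a real implementation would have to make the data write and the offset update atomic.

```python
class OffsetTrackingSink:
    """Toy sink that stores rows and the next expected offset side by side."""

    def __init__(self):
        self.rows = []
        self.next_offset = {}  # Kafka partition -> first offset not yet written

    def write_batch(self, partition, start_offset, records):
        """Write records whose offsets begin at start_offset.

        Records that were already written in an earlier attempt are skipped.
        Returns the number of records actually written.
        """
        expected = self.next_offset.get(partition, 0)
        if start_offset > expected:
            raise ValueError(
                f"gap in offsets: expected {expected}, got {start_offset}"
            )
        fresh = records[expected - start_offset:]
        self.rows.extend(fresh)
        self.next_offset[partition] = max(expected, start_offset + len(records))
        return len(fresh)
```

With this layout a restart between the write and the Kafka commit is harmless: the replayed batch starts below the stored offset, so its already written prefix is dropped.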
(4) Metadata parsing takes a long time at query time
Because Delta maintains its own metadata separately, an external query engine has to parse that metadata to obtain the list of data files before it can run a query. As the data in the Delta table grows, so does the metadata, and this step takes longer and longer.
Solution: Alibaba Cloud is continuously optimizing the query path, using caching and other methods to minimize the cost of parsing metadata.
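Why the cost grows can be seen from a simplified replay of a transaction log. The sketch assumes a log in the style of open source Delta Lake, where each commit is a file of JSON lines carrying "add" and "remove" actions; EMR Delta's internals and its caching are not described in this article, and the function name is hypothetical.

```python
import json


def replay(commits):
    """Apply commits in order and return (live_files, actions_read).

    Each commit is a list of JSON lines, one action per line. Only "add" and
    "remove" actions change the set of live data files; others are ignored.
    """
    live = set()
    actions_read = 0
    for commit in commits:
        for line in commit:
            action = json.loads(line)
            actions_read += 1
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return live, actions_read
```

Every action ever written has to be read to learn which files are live now, so a table fed by minute-level batches accumulates work for each reader. Periodic checkpoints and cached snapshots shorten the replay by starting from a precomputed state.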
(5) About the CDC scenario
So far we have implemented the log append scenario on Delta. There is another classic business scenario: CDC. Delta itself supports Update/Delete and can be applied to CDC. However, for business reasons we are not using Delta for CDC for the time being. Delta implements Update/Delete as a Join-style Merge, and our business tables are large, updated frequently, and their updates touch a wide range of partitions, so Merge may run into performance problems.
Colleagues at Alibaba Cloud are also continuously optimizing Merge performance, for example with partition pruning on the join and Bloom filters, which effectively reduce the number of files involved in the join. The improvement is especially significant when updates are concentrated in a few partitions. We will try applying Delta to CDC scenarios later.
V. Follow-up plan
1. Build out and optimize the real-time data warehouse on top of Delta Lake, improve the timeliness of some business metrics, and meet the growing demand for real-time data.
2. Integrate with our internal metadata platform, so that log ingestion -> real-time storage -> metadata and lineage are managed in one standardized flow.
3. Keep observing and optimizing the query performance of Delta tables, and try more Delta features such as Z-Ordering to improve performance in ad hoc query and data analysis scenarios.