Shulou (Shulou.com), SLTechnology News&Howtos. Updated 2025-02-24.
This article describes Z-Order methods for accelerating queries on large-scale Hudi datasets.
1. Background
Multidimensional analysis is a typical big data scenario, and its queries usually carry filter conditions. For such queries, especially those that filter on high-cardinality fields, a well-chosen layout of the underlying data lets the query engine use the filter conditions to skip most of the irrelevant data and read only the small part it needs. For example, if we sort on the relevant fields before writing the data, the min-max ranges of those fields do not overlap from one generated file to the next. The query engine then pushes the filter conditions down to the data source and, using the min-max statistics of each file, skips a large amount of irrelevant data. These techniques are commonly called data clustering and data skipping. Direct sorting works well for a single field, but when several fields are sorted together the benefit drops sharply. Z-Order handles multi-field sorting much better.
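The effect of sorting on min-max pruning can be illustrated with a small in-memory sketch. It is not Hudi or Spark code: `FileStats`, `layout` and `candidates` are hypothetical names, and each "file" is simply a chunk of a sequence of Long values.

```scala
object DataSkippingSketch {
  // Hypothetical per-file statistics for a single Long column.
  final case class FileStats(file: String, min: Long, max: Long)

  // Split the values into at most `numFiles` chunks (optionally sorting first)
  // and record the min/max of each chunk, as a writer would per data file.
  def layout(values: Seq[Long], numFiles: Int, sorted: Boolean): Seq[FileStats] = {
    require(numFiles > 0, "numFiles must be positive")
    val data = if (sorted) values.sorted else values
    val chunkSize = math.max(1, math.ceil(data.size.toDouble / numFiles).toInt)
    data.grouped(chunkSize).zipWithIndex.map { case (chunk, i) =>
      FileStats(s"file-$i", chunk.min, chunk.max)
    }.toSeq
  }

  // Files whose [min, max] range may contain `target`; all others can be skipped.
  def candidates(stats: Seq[FileStats], target: Long): Seq[String] =
    stats.filter(s => s.min <= target && target <= s.max).map(_.file)
}
```

With the values 7, 1, 9, 3, 8, 2, 6, 4 written to four files, a lookup for 4 has to open all four unsorted files, but only one file when the data is sorted first, because the sorted min-max ranges do not overlap.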
2. Z-Order introduction
Z-Order is a technique that maps multi-dimensional data to one dimension. It is widely used in spatio-temporal indexing and image processing. The Z curve is a space-filling curve: a single one-dimensional curve that passes through every point of a space of any dimension. For a record in the database, we can treat the fields to be sorted as the dimensions of the data. The Z curve maps the multi-dimensional data to a one-dimensional z-value by a fixed rule, and the records are then sorted by that z-value. The mapping rule ensures that records that are adjacent in the multi-dimensional space tend to remain close to each other on the one-dimensional curve after sorting.
Wiki definition: given a two-dimensional coordinate pair (x, y) on a plane, Z-ordering maps each coordinate pair to a single one-dimensional value.
Currently, Z-Order-based data clustering is implemented in the commercial version of Delta Lake, while open source Spark/Hive/Presto do not support Z-Order.
3. Implementation
We describe how to use Z-Order in Hudi in two parts:
Generation and sorting of z-value
Integration with Hudi
3.1 Generation and sorting of z-value
This part is the core of the Z-Order strategy. Its logic is generic and applies to other frameworks as well.
The key to Z-Order is the mapping rule for the z-value. Wikipedia describes a technique based on bit interleaving: the bits of each dimension value are interleaved to form the final z-value. For example, to calculate the z-value of the two-dimensional coordinates (x = 97, y = 214), we can follow these steps:
Step 1: represent each dimension value in bits
x value: 01100001
y value: 11010110
Step 2: starting from the leftmost bit of y, interleave the bits of y and x one by one to get the z-value
z-value: 1011011000101001
For data with more dimensions, we interleave the bits of every dimension in the same way to form the z-value. Once the z-values are generated, we sort by them. The z-order curve that results from sorting by z-value clusters the data well on all the dimensions that took part in generating the z-value.
This way of generating z-values looks good, but to use bit interleaving in a real production environment we still need to solve the following problems:
The description above assumes several unsigned int dimensions with incrementing values. Real data comes in many types. How should other data types be handled?
Dimension values of different types have different bit lengths once converted to bits. How should the inconsistent lengths be handled?
Which data type should store the z-value, and which sorting strategy should be used for it?
To solve these problems, we use two strategies to generate z-values.
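The interleaving step can be sketched in a few lines. This is a minimal illustration, not the article's implementation; `interleave` and `toBinary` are hypothetical helper names, and the dimensions are assumed to be small non-negative integers that fit in `bits` bits.

```scala
object InterleaveSketch {
  // Interleave the low `bits` bits of each dimension, most significant bit first.
  // Dimensions are visited in the order given, so y is passed before x to match
  // the worked example, which starts from the leftmost bit of y.
  def interleave(dims: Seq[Int], bits: Int): BigInt = {
    var z = BigInt(0)
    for (bit <- (bits - 1) to 0 by -1; d <- dims) {
      z = (z << 1) | BigInt((d >>> bit) & 1)
    }
    z
  }

  // Render a z-value as a fixed-width binary string (left-padded with zeros).
  def toBinary(z: BigInt, width: Int): String = {
    val s = z.toString(2)
    "0" * (width - s.length) + s
  }
}
```

Calling `InterleaveSketch.toBinary(InterleaveSketch.interleave(Seq(214, 97), 8), 16)` reproduces the z-value 1011011000101001 from the example with x = 97 and y = 214. A `BigInt` is used only to keep the sketch short; it sidesteps the question of how to store the z-value.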
3.1.1 z-value generation method based on mapping strategy
The first problem: use a different conversion strategy for each data type
Unsigned integers: converted directly to their bit representation
Int data: converting directly to the binary representation is problematic, because in Java the highest bit (the sign bit) of a negative number is 1 while the highest bit of a non-negative integer is 0 (see the table below). After a direct conversion, negative numbers would compare greater than positive numbers.
Decimal    Binary
0          0000 0000
1          0000 0001
126        0111 1110
127        0111 1111
-128       1000 0000
-127       1000 0001
-126       1000 0010
-2         1111 1110
-1         1111 1111
To fix this, we invert the highest bit of the binary value, which ensures that the lexicographic order of the converted values is the same as the order of the original values, as the following table shows.
Decimal    Binary       Highest bit inverted    Decimal after inversion
0          0000 0000    1000 0000               128
1          0000 0001    1000 0001               129
2          0000 0010    1000 0010               130
126        0111 1110    1111 1110               254
127        0111 1111    1111 1111               255
-128       1000 0000    0000 0000               0
-126       1000 0010    0000 0010               2
-2         1111 1110    0111 1110               126
-1         1111 1111    0111 1111               127
Long data: converted to binary form with the highest bit inverted, in the same way as Int
Double and Float data: converted to Long, then converted to binary form with the highest bit inverted
Decimal/Date/Timestamp data: converted to Long and then represented directly in binary
UTF-8 String data: the binary representation of a string preserves its natural order, but strings vary in length and cannot be used directly for bit interleaving. We therefore truncate strings longer than 8 bytes to 8 bytes and pad strings shorter than 8 bytes to 8 bytes.
Null value processing:
A null of a numeric type is replaced with the maximum value of that type and then converted as described above
A null of String type is replaced with an empty string before conversion
The second problem: the generated binary values are uniformly aligned to 64 bits.
The third problem: the z-value can be stored as Array[Byte] (following Amazon DynamoDB, the array length is limited to 1024). To sort Array[Byte] data, the HBase rowkey comparator can be used directly.
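The conversion rules and the byte-wise comparison can be sketched as follows. The function names echo those in the article's code, but the bodies are assumptions written for illustration, not the author's `ZOrderingUtil`. In particular, padding short strings with zero bytes is an assumed choice, since the source does not say which fill byte is used.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object OrderPreservingBytesSketch {
  // Signed long: invert the sign bit so that negative values sort before
  // non-negative ones when the bytes are compared as unsigned values.
  def longTo8Byte(v: Long): Array[Byte] =
    ByteBuffer.allocate(8).putLong(v ^ Long.MinValue).array()

  // Int: widen to long first so every dimension is aligned to 64 bits.
  def intTo8Byte(v: Int): Array[Byte] = longTo8Byte(v.toLong)

  // String: keep the first 8 UTF-8 bytes, or right-pad with zero bytes
  // (assumed padding). Truncation may cut a multi-byte character in half,
  // which is acceptable for a sort key.
  def utf8To8Byte(s: String): Array[Byte] =
    java.util.Arrays.copyOf(s.getBytes(StandardCharsets.UTF_8), 8)

  // Unsigned lexicographic comparison of two byte arrays, the ordering a
  // rowkey-style comparator applies.
  def compareUnsigned(a: Array[Byte], b: Array[Byte]): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      val c = (a(i) & 0xff) - (b(i) & 0xff)
      if (c != 0) return c
      i += 1
    }
    a.length - b.length
  }
}
```

Under this encoding `compareUnsigned(intTo8Byte(-128), intTo8Byte(127))` is negative, matching the numeric order. The two URLs "https://www.baidu.com" and "https://www.google.com" encode to identical 8-byte keys, because both start with the 8 bytes of "https://".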
The z-value generation method based on the mapping strategy is convenient, fast and easy to understand, but it has some defects.
The fields used to generate the z-value should in theory be non-negative integers increasing from 0 for a good z curve to form. Real data sets are never this ideal, so the effect of Z-Order is reduced. For example, if field x takes the values (0, 1, 2) and field y takes the values (100, 200, 300), the z-values generated from x and y cover only part of the complete z curve, and sorting by z-value has the same effect as sorting directly by a single field. Likewise, when the cardinality of x is much lower than that of y, sorting with the above strategy is basically the same as sorting by y, and in practice it is worse than sorting by x first and then by y.
String processing: the above strategy uses only the first 8 bytes of a string in the z-value calculation, which loses precision. It cannot handle strings that share the same prefix. For example, the first 8 bytes of the two strings "https://www.baidu.com" and "https://www.google.com" are exactly the same, so taking the first 8 bytes of such data for the z-value calculation is meaningless.
The main reason for these defects is that real data is not always well distributed. There is a simple way to solve them: compute a global Rank for every dimension value involved in the z-value calculation, and use the Rank value instead of the original value. Because Rank values are consecutive integers starting from 0, they fully meet the conditions for building z-values and solve the problems above. In our experiments this Rank-based method is indeed very effective, but generating z-values becomes very slow, because a global Rank is very expensive for the compute engine. The global Rank calculation is the bottleneck of the Rank-based method. So can we sample the original data to reduce the data volume and compute the z-value from the sampled data? The answer is yes.
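The Rank idea can be shown with a small in-memory sketch. `denseRank` is a hypothetical helper that ranks the values of one dimension locally. In a distributed engine the same step requires a global sort of every dimension, which is where the cost described above comes from.

```scala
object RankSketch {
  // Replace each value with its 0-based dense rank among the distinct values
  // of the same dimension, so every dimension becomes 0, 1, 2, ...
  def denseRank(values: Seq[Long]): Seq[Int] = {
    val rankOf: Map[Long, Int] = values.distinct.sorted.zipWithIndex.toMap
    values.map(rankOf)
  }
}
```

For the earlier example, both x = (0, 1, 2) and y = (100, 200, 300) map to the ranks (0, 1, 2), so the two dimensions contribute bits of equal weight when they are interleaved.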
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._

    /** Generates z-value */
    // df, zFields, fieldNum, ZOrderingUtil and ZorderingBinarySort are defined elsewhere.
    val newRDD = df.rdd.map { row =>
      // Convert every z-order field to an 8-byte, order-preserving representation.
      val values = zFields.map { case (index, field) =>
        field.dataType match {
          case LongType => ZOrderingUtil.longTo8Byte(row.getLong(index))
          case DoubleType => ZOrderingUtil.doubleTo8Byte(row.getDouble(index))
          case IntegerType => ZOrderingUtil.intTo8Byte(row.getInt(index))
          case FloatType => ZOrderingUtil.doubleTo8Byte(row.getFloat(index).toDouble)
          case StringType => ZOrderingUtil.utf8To8Byte(row.getString(index))
          case DateType => ZOrderingUtil.longTo8Byte(row.getDate(index).getTime)
          case TimestampType => ZOrderingUtil.longTo8Byte(row.getTimestamp(index).getTime)
          case ByteType => ZOrderingUtil.byteTo8Byte(row.getByte(index))
          case ShortType => ZOrderingUtil.intTo8Byte(row.getShort(index).toInt)
          case d: DecimalType => ZOrderingUtil.longTo8Byte(row.getDecimal(index).longValue())
          case _ => null
        }
      }.filter(v => v != null).toArray
      // Interleave the bits of all fields and append the z-value as an extra column.
      val zValues = ZOrderingUtil.interleaveMulti8Byte(values)
      Row.fromSeq(row.toSeq ++ Seq(zValues))
    }.sortBy(x => ZorderingBinarySort(x.getAs[Array[Byte]](fieldNum)))
3.1.2 z-value generation strategy based on RangeBounds
Before introducing the z-value generation strategy based on RangeBounds, let's look at how Spark sorts. A Spark sort has roughly two steps.
Sample the keys of the input data to estimate their distribution, split them into ranges according to the specified number of partitions, and sort them. The computed rangeBounds is an array of length numPartitions - 1, in which each element is the boundary between the keys of two adjacent partitions.
During shuffle write, the partition that each input key is assigned to is determined by the rangeBounds computed in the first step. The data inside each partition is not yet sorted, but rangeBounds is ordered, so the partitions themselves are ordered relative to each other. Sorting the data inside each partition is then enough to guarantee a global order.
Following the Spark sorting process, we can do the following:
For each field participating in Z-Order, sample a specified number of ranges (analogous to the number of partitions), sort them, and compute the RangeBounds of the field.
In the actual mapping, each field value is mapped to the index of the rangeBounds interval it falls into, and that index takes part in the z-value calculation. Because the interval index is an integer increasing from 0, it fully meets the conditions for generating z-values, and the mapping problem for String fields is solved as well. The RangeBounds-based method therefore fixes the defects of the first method. However, the extra sampling step needed to generate the RangeBounds makes it less efficient than the first method, so we implemented both z-value generation methods and let users choose.
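The mapping from a field value to its interval index can be sketched without Spark. `rangeBounds` and `boundIndex` are hypothetical helpers: the first picks evenly spaced boundaries from a sorted sample (a simplification of the weighted sampling that Spark's range partitioner performs), and the second binary-searches the boundaries.

```scala
object RangeBoundsSketch {
  // Pick up to numRanges - 1 evenly spaced boundaries from a sorted sample.
  def rangeBounds(sample: Seq[String], numRanges: Int): Vector[String] = {
    val sorted = sample.sorted.toVector
    if (sorted.isEmpty || numRanges <= 1) Vector.empty
    else (1 until numRanges)
      .map(i => sorted(math.min(sorted.length - 1, i * sorted.length / numRanges)))
      .distinct
      .toVector
  }

  // Index of the interval a value falls into: the number of boundaries that
  // are strictly smaller than the value (binary search over sorted bounds).
  def boundIndex(value: String, bounds: Vector[String]): Int = {
    var lo = 0
    var hi = bounds.length
    while (lo < hi) {
      val mid = (lo + hi) >>> 1
      if (bounds(mid).compareTo(value) < 0) lo = mid + 1 else hi = mid
    }
    lo
  }
}
```

Because the whole string is compared against the boundaries, two URLs that share the prefix "https://" can still land in different intervals, which the 8-byte truncation of the first method cannot achieve.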
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    import org.apache.spark.util.MutablePair

    /** Generates z-value */
    // internalRdd, boundBroadCast, sortingExpressions, outputAttributes,
    // origin_lazyGeneratedOrderings, sampleRdd, DecisionBound and fileNum are defined elsewhere.
    val indexRdd = internalRdd.mapPartitionsInternal { iter =>
      val bounds = boundBroadCast.value
      val origin_Projections = sortingExpressions.map { se =>
        UnsafeProjection.create(Seq(se), outputAttributes)
      }
      iter.map { unsafeRow =>
        // Map every z-order field to the index of the range it falls into.
        val interleaveValues = origin_Projections.zip(origin_lazyGeneratedOrderings).zipWithIndex.map {
          case ((rowProject, lazyOrdering), index) =>
            val row = rowProject(unsafeRow)
            val decisionBound = new DecisionBound(sampleRdd, lazyOrdering)
            if (row.isNullAt(0)) {
              // Nulls are placed after the last range.
              bounds(index).length + 1
            } else {
              decisionBound.getBound(row, bounds(index).asInstanceOf[Array[InternalRow]])
            }
        }.toArray.map(ZOrderingUtil.toBytes(_))
        val zValues = ZOrderingUtil.interleaveMulti4Byte(interleaveValues)
        val mutablePair = new MutablePair[InternalRow, Array[Byte]]()
        mutablePair.update(unsafeRow, zValues)
      }
    }.sortBy(x => ZorderingBinarySort(x._2), numPartitions = fileNum).map(_._1)
3.2 Combining with Hudi
The integration with Hudi consists of the following parts.
3.2.1 Z-sort reorganization of table data
This part is relatively simple. With Hudi's built-in Clustering mechanism and the z-value generation and sorting strategies above, we can reorganize the data of a Hudi table directly, so we do not describe it in detail here.
3.2.2 collect and save statistics
In fact, RFC-27 is already doing this, so this work is somewhat duplicated. We briefly describe our implementation. After the data is reorganized, we collect the min/max/nullCount statistics of every field that took part in the z-value calculation, for each file produced by the reorganization. The statistics can be collected either by reading the Parquet files or through SparkSQL.
Read Parquet files to collect statistics
    import scala.collection.JavaConverters._
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.ParquetFileReader

    /** collect statistic info */
    // df, conf, inputFiles, cols, FileStats and ColumnFileStats are defined elsewhere.
    val sc = df.sparkSession.sparkContext
    val serializableConfiguration = new SerializableConfiguration(conf)
    // Use at least one partition, even when there are fewer than three input files.
    val numParallelism = math.max(1, inputFiles.size / 3)
    val previousJobDescription = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
    try {
      val description = s"Listing parquet column statistics"
      sc.setJobDescription(description)
      sc.parallelize(inputFiles, numParallelism).mapPartitions { paths =>
        val hadoopConf = serializableConfiguration.value
        paths.map(new Path(_)).flatMap { filePath =>
          // Read the footer of each file and take the column statistics of every row group.
          val blocks = ParquetFileReader.readFooter(hadoopConf, filePath).getBlocks().asScala
          blocks.flatMap(b => b.getColumns().asScala.map(col =>
              (col.getPath().toDotString(),
                FileStats(col.getStatistics().minAsString(), col.getStatistics().maxAsString(), col.getStatistics.getNumNulls.toInt))))
            .groupBy(x => x._1)
            .mapValues(v => v.map(vv => vv._2))
            // Merge the row-group statistics into one entry per column and file.
            .mapValues(value => FileStats(value.map(_.minVal).min, value.map(_.maxVal).max, value.map(_.num_nulls).max))
            .toSeq
            .map(x => ColumnFileStats(filePath.getName(), x._1, x._2.minVal, x._2.maxVal, x._2.num_nulls))
        }.filter(p => cols.contains(p.colName))
      }.collect()
    } finally {
      sc.setJobDescription(previousJobDescription)
    }
Collect statistics through SparkSQL
    import org.apache.spark.sql.functions.{col, count, expr, input_file_name, max, min}

    /** collect statistic info */
    // df and cols are defined elsewhere; $"file" requires the session's implicits in scope.
    val inputFiles = df.inputFiles
    val conf = df.sparkSession.sparkContext.hadoopConfiguration
    // Per-column aggregates: min, max and the number of non-null values.
    val values = cols.flatMap(c => Seq(
      min(col(c)).as(c + "_minValue"),
      max(col(c)).as(c + "_maxValue"),
      count(c).as(c + "_noNullCount")))
    val valueCounts = count("*").as("totalNum")
    // The null count is the total row count minus the non-null count.
    val projectValues = Seq(col("file")) ++ cols.flatMap(c => Seq(
      col(c + "_minValue"),
      col(c + "_maxValue"),
      expr(s"totalNum - ${c + "_noNullCount"}").as(c + "_num_nulls")))
    val result = df.select(input_file_name() as "file", col("*"))
      .groupBy($"file")
      .agg(valueCounts, values: _*)
      .select(projectValues: _*)
    result
These statistics are then saved in the index directory under the hoodie directory of the Hudi table, where Spark queries can use them.
3.2.3 apply to Spark query
To apply the statistics to Spark queries, we modify the file filtering logic of HudiIndex: the DataFilter is converted into a filter on the Index table, which selects the candidate files to read and returns them to the query engine. The steps are:
Load the index table into IndexDataFrame
Build a data filter for IndexDataFrame using the original query filter
Query IndexDataFrame to select candidate files
Use these candidate files to rebuild HudiMemoryIndex
The data filter for IndexDataFrame is built from the min/max values and the null count. After z-sorting, the min/max ranges of each field involved in the z-value calculation mostly do not overlap between files, so filtering the Index table can eliminate a large number of files.
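The idea of rewriting a query filter into a filter over per-file statistics can be sketched without Spark. Every type below (`ColumnStats`, `FileEntry` and the small `Filter` hierarchy) is hypothetical and only mirrors the shape of the real Catalyst expressions. Columns are assumed to be Long-valued, and a file with no statistics for a column is conservatively kept.

```scala
object StatsFilterSketch {
  final case class ColumnStats(min: Long, max: Long, numNulls: Long)
  final case class FileEntry(file: String, stats: Map[String, ColumnStats])

  // A tiny, hypothetical filter language standing in for query predicates.
  sealed trait Filter
  final case class EqualTo(col: String, value: Long) extends Filter
  final case class GreaterThan(col: String, value: Long) extends Filter
  final case class LessThan(col: String, value: Long) extends Filter
  final case class IsNull(col: String) extends Filter
  final case class And(left: Filter, right: Filter) extends Filter

  // True if the file may contain matching rows according to its statistics.
  def mayMatch(f: Filter, e: FileEntry): Boolean = f match {
    case EqualTo(c, v)     => e.stats.get(c).forall(s => s.min <= v && v <= s.max)
    case GreaterThan(c, v) => e.stats.get(c).forall(s => s.max > v)
    case LessThan(c, v)    => e.stats.get(c).forall(s => s.min < v)
    case IsNull(c)         => e.stats.get(c).forall(_.numNulls > 0)
    case And(l, r)         => mayMatch(l, e) && mayMatch(r, e)
  }

  // Candidate files to hand back to the query engine.
  def candidateFiles(f: Filter, index: Seq[FileEntry]): Seq[String] =
    index.filter(mayMatch(f, _)).map(_.file)
}
```

An equality predicate becomes `min <= value && value <= max`, which is the same rewrite that the `EqualTo` cases of `createZindexFilter` perform on the `_minValue` and `_maxValue` columns of the index table.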
    import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.functions.col

    /** convert filter */
    def createZindexFilter(condition: Expression): Expression = {
      // Resolve the statistics columns of the index table for a given data column.
      val minValue = (colName: Seq[String]) => col(UnresolvedAttribute(colName).name + "_minValue").expr
      val maxValue = (colName: Seq[String]) => col(UnresolvedAttribute(colName).name + "_maxValue").expr
      val num_nulls = (colName: Seq[String]) => col(UnresolvedAttribute(colName).name + "_num_nulls").expr
      condition match {
        // col = value  =>  minValue <= value AND maxValue >= value
        case EqualTo(attribute: AttributeReference, value: Literal) =>
          val colName = HudiMergeIntoUtils.getTargetColNameParts(attribute)
          And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value))
        case EqualTo(value: Literal, attribute: AttributeReference) =>
          val colName = HudiMergeIntoUtils.getTargetColNameParts(attribute)
          And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value))
        case equalNullSafe @ EqualNullSafe(_: AttributeReference, _ @ Literal(null, _)) =>
          val colName = HudiMergeIntoUtils.getTargetColNameParts(equalNullSafe.left)
          EqualTo(num_nulls(colName), equalNullSafe.right)
        ...
4. Test results
The test data volume and resource usage are consistent with the Databricks test. The only difference is that we generated only 10,000 files, compared with 1,000,000 files there. The results show that the speedup from Z-Order is still considerable, and the benefit of Z-Order grows as the number of files increases. We will also test at the 1,000,000-file level later.
Table name             Time (s)
conn_random_parquet    89.3
conn_zorder            19.4
conn_zorder_only_ip    18.2