In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly introduces "what is the use of Flink integrated iceberg in the production environment". In daily operation, I believe that many people have doubts about the use of Flink integrated iceberg in the production environment. Xiaobian consulted all kinds of materials and sorted out simple and easy-to-use methods of operation. I hope it will be helpful to answer the doubts of "what is the use of Flink integrated iceberg in the production environment?" Next, please follow the editor to study!
Background
In the field of big data processing, there is a very common but troublesome problem, that is, the problem of hdfs small files, which we have been plagued by for a long time. At the beginning, we wrote a small file compression tool ourselves, which merged regularly. The principle is to write the data to be compressed into a new temporary folder. After the data is compressed, it is checked with the original data. After the data is consistent, the original data is overwritten with the compressed data, but there are a lot of problems because the transaction cannot be guaranteed. For example, when the data is written at the same time, the verification will fail. Causes the merge of small files to fail, and can not be merged in real time, can only be merged according to the partition one day ago. Or an hour ago, the latest data still has small file problems, resulting in poor query performance.
So based on some of the above problems, I investigated the data lake technology. Because our streaming data is mainly flink, the query engine is presto, and hudi is strongly coupled with spark, the support for flink is not very friendly, so I decided to introduce iceberg. After functional testing of iceberg and simple code review, I found that iceberg still needs to be optimized and improved in flink, but I think we should be able to live in hold, imperfections and places that need to be optimized, so we finally introduced iceberg to solve the problem of small files.
In addition, some other problems, such as the access of cdc data and the deletion of data according to query conditions, can also be solved through data lake technology.
Flink streaming data is written to iceberg
Our main usage scenario is to use flink to write the streaming data of kafka to Iceberg. For the sake of simplicity and maintainability of the code, we try to write the program in sql. The sample code is as follows:
/ / create catalog CREATE CATALOG iceberg WITH ('type'='iceberg',' catalog-type'='hive', "+ 'warehouse'='hdfs://localhost/user/hive/warehouse',' uri'='thrift://localhost:9083') / / create table CREATE TABLE iceberg.tmp.iceberg_table (id BIGINT COMMENT 'unique id', data STRING, d int) PARTITIONED BY (d) WITH (' connector'='iceberg' 'write.format.default'='orc') / / insert into insert into iceberg.tmp.iceberg_table select * from kafka_table
Tip: remember to turn on checkpoint
Compress small files
At present, compressing small files is carried out with an additional batch task, Iceberg provides a spark version of action, I found some problems when doing functional testing, in addition, I am not very familiar with spark, worried that the problem is not easy to troubleshoot, so reference to the spark version of their own implementation of a flink version, and repaired some bug, made some functional optimization.
Since the metadata of our iceberg is stored in hive, the logic of the compression program is that I look up all the iceberg tables in hive and compress them in turn. There is no filtering condition for compression, and full table compression is performed for both partitioned and non-partitioned tables. This is done to handle some flink tasks that use eventtime, if there is a delay in the arrival of data. The data is written to the previous partition, and the newly written data for other days will not be compressed if only the current day's partition is not compressed by full table compression.
Code examples refer to:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment (); Actions.forTable (env, table) .rewriteDataFiles () / / .maxParallelism (parallelism) / / .filter (Expressions.equal ("day", day)) / / .targetSizeInBytes (targetSizeInBytes). Execute ()
For specific information about compressed small files, please refer to this article [merging small files in Flink integrated iceberg data lake].
Snapshot expiration processing
Our snapshot expiration policy is written together with the batch task of compressing small files. After compressing small files, we perform snapshot expiration processing of tables. The current retention time is one hour. This is because for some larger tables, there are more partitions, and the checkpoint is relatively short. If the snapshot is too long, we will retain too many small files. We do not need to query historical snapshots for the time being. So I set the snapshot retention time for one hour.
Long olderThanTimestamp = System.currentTimeMillis ()-TimeUnit.HOURS.toMillis (1); table.expireSnapshots () / / .retainLast (20) .commitreOlderThan (olderThanTimestamp) .commit (); data management
After writing the data, sometimes I want to see how many data files are under the corresponding snapshot and directly query hdfs you don't know which is useful and which is useless. Therefore, we need to have corresponding management tools. At present, flink is not very mature, we can use the tools provided by spark3 to check it out.
Ddl
At present, we do these operations through create table through flink sql client. Other related ddl operations can be done using spark:
Https://iceberg.apache.org/spark/#ddl-commands
Dml
Some related data operations, such as deleting data, can be realized through spark. Currently, presto only supports deletion at the partition level.
Remove orphaned files timing task deletion
In the process of using iceberg, there is sometimes a situation where I submit a flink task, and for various reasons, I stop it, and at this time iceberg has not yet submitted the corresponding snapshot. In addition, due to some exceptions that cause the program to fail, there will be some isolated data files that are not in the iceberg metadata, which are unreachable and useless to iceberg. So we need to clean up these files like jvm garbage collection.
At present, iceberg provides a spark version of action to deal with these useless files, and we adopt the same strategy as compressing small files to get all the iceberg tables in hive. Perform scheduled tasks every other hour to delete these useless files.
SparkSession spark =. Actions.forTable (spark, table) .removeOrphanFiles () / / .deleteWith (...) .execute (); step on the pit
During the running of the program, there was a problem that the normal data files were deleted. After investigation, because my snapshot retention setting is one hour, the cleanup time of this cleanup program is also set to one hour. Through the log, it is found that the cleanup program deleted the normal data. Check the code, think that they should be set the same time, when cleaning isolated files, there are other programs are reading and writing tables, because this cleanup program has no transactions, resulting in the deletion of normal data. Finally, the cleanup time of the cleanup program was changed to the default three days, and there was no problem of deleting data files. Of course, to be on the safe side, we can overwrite the original method of deleting files, change the files to a backup folder, check that there is no problem, and delete them manually.
Use presto for query
The version we are currently using is prestosql 346.It is relatively easy to install this version by querying iceberg with jdk11,presto. The official provides the corresponding conncter, we just need to configure it.
/ / iceberg.propertiesconnector.name=iceberghive.metastore.uri=thrift://localhost:9083 batch task processing manual execution of sql batch task
To query the batch task of iceberg and the client of flink, we first start a yarn session-based flink cluster, and then submit the task to the cluster through the sql client.
The main configuration is that we need to set the parallelism of sql task execution according to the size of the data, which can be set by the following parameters.
Set table.exec.resource.default-parallelism = 100
In addition, I have configured the corresponding catalog for hive and iceberg in the configuration file of the sql client, so that there is no need to build catalog every time the client starts.
Catalogs: # empty list-name: iceberg type: iceberg warehouse: hdfs://localhost/user/hive2/warehouse uri: thrift://localhost:9083 catalog-type: hive cache-enabled: false-name: hive type: hive hive-conf-dir: / Users/user/work/hive/conf default-database: default scheduled task
Currently, flink's sql client is not as good as hive for scheduled batch tasks, such as executing hive-f to execute a file. And different tasks require different resources, parallelism and so on. So I encapsulated a flinK program by calling this program to process, read the sql in a specified file, and submit the batch task. Control the resources and parallelism of tasks on the command line.
/ home/flink/bin/flink run-p 10-m yarn-cluster / home/work/iceberg-scheduler.jar my.sql optimization
In the query section of batch tasks, some optimizations have been made, such as limit push-down, filter push-down, query parallelism optimization, etc., which can greatly improve the speed of the query, and these optimizations have been pushed back to the community.
Data migration
At present, all our data is stored in the hive table. After verifying iceberg, we decided to migrate the data from hive to iceberg, so I wrote a tool that can use the data from hive, and then create a new iceberg table to create the corresponding metadata for it, but during the test, we found that if we use this method, we need to stop the program that writes to hive, because if iceberg and hive use the same data file. The compression program will constantly compress the small files of the iceberg table, and after the compression, the old data will not be deleted immediately, so the hive table will find double data. In view of the fact that there is still some instability during the iceberg test, we adopt the double-write strategy, the original program that writes hive remains unchanged, and a new set of programs is started to write to iceberg, so that we can observe the iceberg table for a period of time. It can also compare with the data in the original hive to verify the correctness of the program.
After a period of observation, every day nearly 2 billion data hive table and iceberg table, a piece of data is not bad. So after you finally have no problem comparing the data, stop writing the hive table, use the new iceberg table, and then import the old data from hive into iceberg.
At this point, the study on "what is the use of Flink integrated iceberg in the production environment" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.