Flink 1.11.0 was officially released on July 7. As one of the release managers for this version, I would like to share my experience and my interpretation of some of its representative features. Before diving in, let's briefly go over the community's general release process, which should help you better understand and participate in the work of the Flink community.

First, at the start of planning for each version, one or two volunteers are chosen as release managers. For 1.11.0 I served as the release manager on the Chinese side, with Piotr Nowojski from Ververica as the release manager on the German side, which to some extent reflects how significant a share of the community Chinese developers and their contributions now make up.

Next comes the feature kickoff for the release. For some major directions, the community's planning cycle can be fairly long and is carried out in stages across multiple versions to ensure quality. Each version also has its own focus; for example, the previous two versions concentrated on batch enhancements, while this one focuses more on improving the ease of use of streaming. The planned feature list is discussed on the mailing list to collect more comments and feedback from users and developers.

The development cycle is generally two to three months, with the approximate feature-freeze date clearly planned in advance, followed by release-candidate builds, testing, and bug fixing. After several iterations, a relatively stable candidate is put to a formal vote and then officially released.

In the roughly four months from functional planning in early March to the official release in early July, Flink 1.11.0 enhanced and improved Flink's ecosystem, ease of use, production readiness, and stability. I will walk through these areas below.
(1) Overview
After the feature freeze, four release candidates of Flink 1.11.0 were published before one finally passed the vote. According to the statistics, a total of 236 contributors took part in this release, resolving 1,474 Jira issues across more than 30 FLIPs and merging 2,325 commits.

Looking back over the last five releases, we can see that Flink entered a stage of rapid development starting with 1.9.0, with the metrics in every dimension roughly doubling compared with earlier versions. It was also from 1.9.0 onward that Alibaba's internal Blink project began to be merged into open-source Flink; by 1.10.0, after two major versions, the merge was complete, and Flink's ecosystem, functionality, performance, and production stability were all greatly strengthened.

The initial positioning of Flink 1.11.0 was to focus on ease-of-use problems and improve users' production experience, without large architectural adjustments or major new functionality, leaning toward a small, rapidly iterated release. Judging from the statistics above, however, this so-called "small version" is not inferior to the previous two major versions in any dimension, and the numbers of resolved issues and participating contributors have kept growing, with contributors from China accounting for 62%.

Below we take an in-depth look at the long-awaited features that Flink 1.11.0 brings, from the API layer that users work with directly down to the execution engine layer, choosing some representative features to interpret from different angles. For the complete feature list, please refer to the official release blog.
(2) Ecosystem and ease-of-use improvements
To some extent these two dimensions complement each other and are hard to separate strictly: gaps in the ecosystem often cause inconvenience in use, and improving ease of use is largely a matter of continually filling in the surrounding ecosystem. The place where users feel this most directly is probably the Table & SQL API.
1 Table & SQL supports Change Data Capture (CDC)
CDC is widely used in scenarios such as replicating data, updating caches, synchronizing data between microservices, and audit logging. Many companies already use open-source CDC tools, for example for MySQL. Being able to ingest and parse CDC data through Flink's Table & SQL has been a strong requirement raised in many earlier discussions: it lets users process changelog streams in real time and further broadens Flink's application scenarios, for example synchronizing data from MySQL to PostgreSQL or Elasticsearch, or running low-latency temporal joins against a changelog.

Beyond these practical requirements, the "dynamic table" concept defined in Flink has two modes on a stream: append mode and update mode. Converting a stream into a dynamic table in append mode was already supported in earlier versions, so additionally supporting update mode in 1.11.0 completes the dynamic-table concept.

To support parsing and emitting changelogs, the first problem to solve is how to encode and decode these update operations between external systems and Flink. Since sources and sinks are the bridge to external systems, FLIP-95 addresses this while defining the new table source and table sink interfaces.

In public CDC surveys, Debezium and Canal are the most popular tools users rely on to synchronize changelogs to other systems such as message queues. Accordingly, FLIP-105 supports the Debezium and Canal formats first; the Kafka source can already parse these formats and emit update events, and Avro (Debezium) and Protobuf (Canal) will follow in subsequent releases.
CREATE TABLE my_table (
  ...
) WITH (
  'connector' = '...',  -- e.g. 'kafka'
  'format' = 'debezium-json',
  'debezium-json.schema-include' = 'true',  -- default: false (Debezium can be configured to include or exclude the message schema)
  'debezium-json.ignore-parse-errors' = 'true'  -- default: false
);
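To make this more concrete, here is a minimal PyFlink sketch of consuming such a Debezium-encoded Kafka topic and maintaining a continuously updated aggregate over the changelog. The topic, schema, column names, and the print sink are illustrative assumptions rather than part of the release itself, and it assumes the Kafka connector and debezium-json format JARs are on the classpath.

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=settings)

# A Kafka topic carrying Debezium-encoded changes from an upstream database
# (topic, servers and columns are hypothetical).
t_env.execute_sql("""
    CREATE TABLE my_table (
        user_id BIGINT,
        amount  DOUBLE
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders',
        'properties.bootstrap.servers' = 'localhost:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'debezium-json'
    )
""")

# The aggregate below reflects inserts, updates and deletes from the source
# database; the print connector just makes the resulting update stream visible.
t_env.execute_sql("""
    CREATE TABLE order_totals (
        user_id BIGINT,
        total   DOUBLE
    ) WITH ('connector' = 'print')
""")
t_env.execute_sql(
    "INSERT INTO order_totals SELECT user_id, SUM(amount) FROM my_table GROUP BY user_id")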
2 Table & SQL supports JDBC Catalog
Before 1.11.0, users who relied on Flink's source/sink to read or write a relational database, or to read a changelog, had to create the corresponding schema manually. Whenever the schema in the database changed, they also had to update the Flink job by hand to keep it consistent and correctly typed; any mismatch caused the job to fail at runtime. Users often complained about this seemingly redundant, tedious process and the poor experience it brings.

In fact, a similar problem exists for any external system connected to Flink; 1.11.0 focuses on relational databases. FLIP-93 provides the base JDBC catalog interface together with a Postgres catalog implementation, making it straightforward to add implementations for other relational databases later.

Since 1.11.0, users of Flink SQL can obtain a table's schema automatically, with no need to write DDL. In addition, any schema mismatch is detected at compile time, avoiding the runtime job failures seen before. This is a typical example of improving ease of use and user experience.
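As a rough sketch of the workflow, the PyFlink snippet below registers a Postgres-backed JDBC catalog and then prints the schema of an existing table without any CREATE TABLE statement. The database name, credentials, and table name are placeholders, and it assumes the flink-connector-jdbc and PostgreSQL driver JARs are available.

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=settings)

# Register a JDBC catalog backed by Postgres (FLIP-93).
t_env.execute_sql("""
    CREATE CATALOG my_pg WITH (
        'type' = 'jdbc',
        'default-database' = 'mydb',
        'username' = 'flink',
        'password' = 'secret',
        'base-url' = 'jdbc:postgresql://localhost:5432/'
    )
""")
t_env.use_catalog("my_pg")

# The schema is fetched from Postgres itself: no DDL, no manual type mapping,
# and mismatches surface when the query is compiled.
t_env.from_path("orders").print_schema()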
3 Hive real-time data warehouse
Starting with 1.9.0, Flink has worked on integrating Hive from an ecosystem perspective, with the goal of a batch-and-stream unified Hive data warehouse. After two versions of iteration it has reached batch compatibility and production readiness, and its performance is more than 7x that of Hive 3.0 on the 10 TB TPC-DS benchmark.

1.11.0 delivers a real-time data warehouse solution on the Hive ecosystem, improving the end-to-end streaming ETL experience and reaching the goal of a batch-and-stream unified Hive warehouse, while further strengthening compatibility, performance, and ease of use.

In the real-time data warehouse solution, Flink's streaming strengths make it possible to read and write Hive in real time:

Hive write: FLIP-115 improves and extends the basic capabilities and implementation of the FileSystem connector, so that the Table/SQL-layer sink supports a variety of formats (CSV, JSON, Avro, Parquet, ORC) as well as all Hive table formats.

Partition support: writing data into Hive introduces a partition-commit mechanism that controls visibility; sink.partition-commit.trigger controls when a partition is committed, sink.partition-commit.policy.kind selects the commit policy, and both SUCCESS files and Metastore commits are supported (see the sketch after this list).

Hive read: Hive can be read as a real-time stream, consuming new data incrementally either by monitoring the appearance of new partitions or by monitoring new files appearing in a directory.
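As a small illustration of the partition-commit options, here is a PyFlink sketch that declares a partitioned streaming sink using the FLIP-115 filesystem connector. The path, schema, and chosen trigger are illustrative, and the same sink.partition-commit.* options also apply to Hive tables.

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=settings)

# A partitioned, streaming filesystem sink. A partition is committed after a
# processing-time delay, and its visibility is signalled with a _SUCCESS file.
t_env.execute_sql("""
    CREATE TABLE fs_sink (
        user_id BIGINT,
        amount  DOUBLE,
        dt      STRING,
        hr      STRING
    ) PARTITIONED BY (dt, hr) WITH (
        'connector' = 'filesystem',
        'path' = 'file:///tmp/warehouse/fs_sink',
        'format' = 'parquet',
        'sink.partition-commit.trigger' = 'process-time',
        'sink.partition-commit.delay' = '1 h',
        'sink.partition-commit.policy.kind' = 'success-file'
    )
""")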
Improvements in Hive usability:
FLIP-123 provides syntax compatibility through a Hive dialect, so users no longer need to switch between the Flink and Hive CLIs and can migrate Hive scripts directly to Flink for execution.

Built-in support for Hive-related dependencies is provided, so users no longer have to track down the required dependencies themselves; downloading a single package and configuring HADOOP_CLASSPATH is enough to run.

On the performance side, vectorized reading of ORC (Hive 2+) was already supported in 1.10.0; 1.11.0 completes vectorized reading for Parquet and for ORC across all Hive versions, further improving performance.
4 New Source API
As mentioned earlier, sources and sinks are the bridge between Flink and external systems and are crucial for the ecosystem, usability, and the end-to-end user experience. The community planned a thorough refactoring of the source side as early as a year ago; the low FLIP-27 number shows that it is an early proposal. However, because it touches many complex internal mechanisms and has to accommodate all kinds of source connector implementations, the design had to be very thorough. A POC implementation was started during the 1.10.0 cycle, and it finally made it into the 1.11.0 release.

Let's briefly review the main problems with sources before this change:

For users, adapting an existing source or implementing a production-grade source connector for Flink is not easy: there is no common code to reuse, and it requires understanding many Flink internals and hand-implementing event-time assignment, watermark emission, idleness detection, the threading model, and so on.

Batch and streaming scenarios require different source implementations.

The concepts of partitions/splits/shards are not expressed explicitly in the interface; for example, split discovery and data consumption are coupled inside the SourceFunction implementation, which adds complexity to sources such as Kafka or Kinesis.

At the runtime layer, the way the source function contends for the checkpoint lock causes a series of problems and leaves the framework little room to optimize.
FLIP-27 was designed with these pain points in mind:

First, two separate components are introduced: a Split Enumerator on the Job Manager and a Source Reader on the Task Manager. This decouples split discovery from split consumption and makes it easy to combine different strategies. For example, the existing Kafka connector couples many different partition-discovery strategies with the reading logic; under the new architecture, a single Source Reader can be paired with several Split Enumerator implementations to cover the different partition-discovery strategies.

A source connector implemented under the new architecture unifies batch and streaming; the only small difference is bounded versus unbounded input. For bounded batch input, the Split Enumerator produces a fixed set of splits and each split is a finite data set; for unbounded streaming input, either the Split Enumerator produces an infinite number of splits or the splits themselves are infinite.

Complex timestamp assigners and watermark generators are built in transparently and run inside the Source Reader module, invisible to users, so anyone implementing a new source connector generally no longer needs to re-implement this part.

Flink's existing source connectors will be re-implemented on top of the new architecture in subsequent versions, and the legacy sources will continue to be maintained for several versions for compatibility. Users can also follow the instructions in the release documentation to try developing against the new Source API.
5 PyFlink ecosystem
As we all know, Python is widely used in machine learning and data analysis. Flink has been integrating with the Python ecosystem since 1.9.0; together they form PyFlink, which brings Flink's real-time distributed processing capabilities to Python users. The previous two versions of PyFlink already supported the Python Table API and UDFs; 1.11.0 extends this with support for the Pandas ecosystem and integration with SQL DDL and the SQL Client, and greatly improves Python UDF performance.

Specifically, an ordinary Python UDF processes only one row per call and requires serialization/deserialization on both the Java and Python sides, which is expensive. In 1.11.0, Flink supports defining and using vectorized Python UDFs in Table & SQL jobs; users only need to add the parameter udf_type="pandas" to the UDF declaration (see the sketch after this list). The benefits are:

Each call can process N rows at a time.

The data format is based on Apache Arrow, which greatly reduces the serialization/deserialization overhead between the Java and Python processes.

Python users can conveniently build high-performance UDFs on top of the libraries commonly used for data analysis, such as NumPy and Pandas.
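Here is a minimal sketch of declaring and using such a vectorized UDF in PyFlink 1.11; the function, column names, and sample data are made up for illustration, and it assumes pandas and pyarrow are installed (they ship with the apache-flink Python package).

from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
from pyflink.table.udf import udf

settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=settings)

# udf_type="pandas" makes this a vectorized UDF: i and j arrive as
# pandas.Series batches instead of single values, and a Series is returned.
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
     result_type=DataTypes.BIGINT(),
     udf_type="pandas")
def add(i, j):
    return i + j

t_env.register_function("add", add)

t = t_env.from_elements([(1, 2), (3, 4)], ['a', 'b'])
print(t.select("add(a, b)").to_pandas())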
In addition, PyFlink in 1.11.0 also supports:
Seamless conversion between PyFlink Tables and Pandas DataFrames (FLIP-120), improving ease of use and compatibility with the Pandas ecosystem (see the sketch after this list).

Python UDTFs (FLINK-14500) can be defined and used directly in Table & SQL, with no need to fall back to Java/Scala UDTFs.

Cython-optimized execution of Python UDFs (FLIP-121), roughly 30x the performance of 1.10.0.

User-defined metrics in Python UDFs (FLIP-112), making it easier to monitor and debug UDF execution.
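The FLIP-120 conversion might look roughly like the following; the DataFrame contents are invented, and to_pandas() collects the result back to the client, so it is meant for small or bounded results.

import pandas as pd
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=settings)

pdf = pd.DataFrame({"name": ["a", "b"], "score": [1.5, 2.5]})

# Pandas DataFrame -> PyFlink Table
table = t_env.from_pandas(pdf, ["name", "score"])

# PyFlink Table -> Pandas DataFrame
result_pdf = table.select("name, score * 2 as score").to_pandas()
print(result_pdf)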
The interpretation above focused on the API layer, where users developing jobs can directly feel the ease-of-use improvements. Next, let's look at some interesting changes in the execution engine layer in 1.11.0.
(3) Production availability and stability improvements
1 Application mode and Kubernetes enhancements

Before 1.11.0, Flink mainly supported the following two deployment modes:

Session mode: a cluster is started in advance, and all jobs share its resources. The advantage is avoiding the overhead of starting a separate cluster per job; the disadvantage is weaker isolation. If one job brings down a Task Manager (TM) container, all jobs running in that container restart with it. And although each job has its own Job Manager (JM), these JMs run inside the same process, which easily becomes a load bottleneck.

Per-job mode: to address the weak isolation of session mode, each job starts its own cluster sized to its resource requirements, and each job's JM runs in a separate process, so the load on each is much smaller.

The problem shared by both modes is that user code runs on the client side, which compiles the job graph and submits it to the cluster. Along the way, the relevant JAR packages have to be downloaded and then uploaded to the cluster, so client and network load easily become a bottleneck, especially when one client is shared by many users.

1.11.0 introduces application mode (FLIP-85) to solve this. A cluster is started per application, and all jobs belonging to that application run in it. The key point is that job-graph generation and job submission no longer happen on the client but are moved to the JM side, so the download and upload load is spread across the cluster and the client is no longer the single-point bottleneck described above.

Users can use application mode via bin/flink run-application, and both Yarn and Kubernetes (K8s) support it. Yarn application mode ships all the dependencies needed to run the job to the JM via Yarn Local Resources on the client side. K8s application mode lets users build an image containing the user JAR and its dependencies, automatically creates TMs for the job, and tears down the whole cluster when it finishes, giving better isolation than session mode. K8s no longer has a strict per-job mode; application mode is effectively per-job with the submission happening inside the cluster.

Beyond application mode, Flink's native K8s integration gained a number of basic features (FLINK-14460) in 1.11.0 to meet production-readiness standards, such as Node Selector, Label, Annotation, and Toleration. To make integration with Hadoop more convenient, it can also mount the Hadoop configuration automatically based on environment variables.
2 Checkpoint & Savepoint optimization
The checkpoint and savepoint mechanisms have always been among the core capabilities that keep Flink ahead, and the community has been cautious about changing them; there have been few major functional or architectural adjustments in recent versions. On the user mailing list, however, we often see feedback and complaints about related problems, such as checkpoints failing for long periods or a savepoint being unusable after a job restart. 1.11.0 selectively addresses some of the common pain points in this area to improve production availability and stability.

Before 1.11.0, the metadata and the state data of a savepoint were stored in two separate directories. Migrating the state directory made it hard to keep track of the mapping between the two, could lead to directories being deleted by mistake, and made cleanup troublesome. 1.11.0 consolidates the two into a single directory, which makes it easy to move and reuse a savepoint as a whole. In addition, the metadata used to reference state by absolute path, so the state became unusable once the directory was moved; 1.11.0 changes the reference to a relative path (FLINK-5763), making savepoint management and maintenance much more flexible and convenient.

In production, users frequently run into checkpoints timing out or failing for long stretches. Once a failover happens, a large amount of historical data has to be replayed, the job makes no progress for a long time, and end-to-end latency grows. 1.11.0 optimizes and speeds up checkpointing along several dimensions, with the goal of lightweight checkpoints that complete in minutes or even seconds.
First, a mechanism was added for the Checkpoint Coordinator to notify tasks that a checkpoint has been canceled (FLINK-8871), avoiding unnecessary pressure on the system while tasks are still executing an already-canceled checkpoint. A task that abandons a canceled checkpoint can join the coordinator's newly triggered checkpoint sooner, which in turn helps the new checkpoint avoid timing out again. This optimization also paves the way for enabling local recovery by default later, since tasks can promptly clean up the resources of invalid checkpoints.

Second, under backpressure a large number of buffers accumulates along the data path, so checkpoint barriers queue up behind the data buffers and cannot be processed and aligned by tasks in time, which keeps checkpoints from completing for long periods. 1.11.0 tackles this from two directions:

1) Reduce the total number of buffers in the data path (FLINK-16428) so that checkpoint barriers can be processed and aligned as soon as possible:

The upstream output now caps the number of buffers piled up per sub-partition (the backlog), avoiding large accumulations on a single channel under uneven load.

The default upstream and downstream buffer configuration was adjusted sensibly without hurting network throughput.

The basic upstream/downstream transfer protocol was adjusted so that a single channel can be configured with zero exclusive buffers without risking deadlock, decoupling the total number of buffers from the job's parallelism. The buffer ratio can then be tuned to balance throughput against checkpoint speed as needed.

Part of this optimization landed in 1.11.0; the rest will be completed in the next version.
2) A new unaligned checkpoint mechanism (FLIP-76) was implemented to fundamentally solve the barrier-alignment problem under backpressure. The idea had actually been brewing since before 1.10.0, but because it requires major changes across many modules, the implementation mechanism and threading model are complex. We built prototype POCs of two different designs, tested and compared their performance, and only then settled on the final design, so the MVP was not finished until 1.11.0. It is the only heavyweight feature in the execution engine layer in this release. Its basic ideas can be summarized as follows:

Checkpoint barriers overtake the data buffers rather than queueing behind them in the input and output queues, decoupling barrier propagation from the operator's processing capacity; the barrier then only incurs network latency between nodes, which is negligible.

An operator no longer waits for the barriers on all of its input channels to align before checkpointing; the first barrier to arrive can trigger the checkpoint early, further speeding up checkpoints without being held back by slow individual channels.

To stay consistent with the semantics of the previous aligned checkpoints, all unprocessed input and output buffers are persisted as channel state when the checkpoint is taken and restored together with the operator state on failover. In other words, the aligned mechanism guarantees that all data before the barrier has been processed and its effect is already reflected in the operator state, while the unaligned mechanism replays the channel state on restart to account for the data before the barrier that had not yet been processed; from the point of view of state recovery the two are ultimately equivalent. Note that although this introduces extra persistence of in-flight buffers, it happens during the asynchronous phase of the checkpoint; the synchronous phase only takes lightweight buffer references, so it does not eat into the operator's processing time or hurt throughput.

Unaligned checkpoints can dramatically shorten checkpoint completion time under severe backpressure, because completion no longer depends on the overall processing capacity but mainly on the storage performance of the system, effectively decoupling computation from storage. They do have limitations: they increase the overall state size and add I/O overhead for storage, so they are not a good fit when I/O is already the bottleneck.

Unaligned checkpoints are not the default in 1.11.0. Users must enable them explicitly, and they only take effect in exactly-once mode. Savepoints are not yet supported, because savepoints involve rescaling the job and channel state does not yet support splitting; this will come in later versions. Savepoints therefore still use the aligned mode and may take a long time to complete under backpressure.
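For reference, opting in might look like the sketch below, which sets the checkpointing options through the TableConfig configuration. This assumes the execution.checkpointing.* keys are honored when the Table API program is translated (the same keys can also be set in flink-conf.yaml), and the interval value is only illustrative.

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=settings)

conf = t_env.get_config().get_configuration()
# Checkpoint every minute in exactly-once mode, with unaligned checkpoints on.
conf.set_string("execution.checkpointing.interval", "1 min")
conf.set_string("execution.checkpointing.mode", "EXACTLY_ONCE")
conf.set_string("execution.checkpointing.unaligned", "true")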
(4) Summary
During the development of Flink 1.11.0, we saw more and more contributors from China take part in the development of core features, a sign of the growing vitality of the Flink ecosystem in China. For example, contributors from Tencent worked on features such as K8s support and checkpointing, and contributors from ByteDance took part in work on the Table & SQL layer and the engine's network stack. We hope more companies will join the Flink open-source community, share their experience from different domains, and help keep Flink's open-source technology at the leading edge so it can reach a wider audience.