What are the features of Apache Flink 1.11?

2025-04-17 Update From: SLTechnology News&Howtos

Shulou (Shulou.com) 06/01 Report

This article describes the features of Apache Flink 1.11 from a technical point of view.

Flink 1.11 improves on 1.10 in many areas, with a focus on further improving Flink's availability and performance.

The new features, improvements, major changes and future development plans of version 1.11 are described in detail below.

Cluster deployment

1. [FLIP-85] Flink supports Application Mode

In the existing deployment modes, Flink uses a separate client to create the JobGraph and submit the job. In practice, downloading the job's jar packages consumes a lot of bandwidth on the client machine, and a separate process (occupying resources that are not managed by the cluster) has to be started as the client. To solve these problems, Flink 1.11 provides a new Application mode, which moves JobGraph generation and job submission to the Master node.

Users can use Application mode through bin/flink run-application. Currently, Application mode supports Yarn and K8s deployments. In Yarn Application mode, the client ships all the dependencies needed to run the job to the Flink Master through Yarn Local Resources, and the job is then submitted on the Master side. In K8s Application mode, the user builds an image that contains the user's Jar and dependencies; Flink automatically creates TaskManagers based on the job and destroys the entire cluster when the job finishes.

2. [FLINK-13938] [FLINK-17632] Flink on Yarn supports caching remote Flink lib Jars and creating jobs from remote Jars

Before 1.11, every job submitted to Flink on Yarn had to upload the Jars under Flink lib, which consumed additional storage space and network bandwidth. Flink 1.11 allows users to provide multiple remote lib directories. Files under these directories are cached on the Yarn nodes, which avoids unnecessary uploads and downloads of Jar packages and makes submission and startup faster:

./bin/flink run -m yarn-cluster -d \
  -yD yarn.provided.lib.dirs=hdfs://myhdfs/flink/lib,hdfs://myhdfs/flink/plugins \
  examples/streaming/WindowJoin.jar

In addition, 1.11 allows users to create jobs directly from Jar packages on a remote file system, which further reduces the overhead of downloading Jar packages:

./bin/flink run-application -p 10 -t yarn-application \
  -yD yarn.provided.lib.dirs="hdfs://myhdfs/flink/lib" \
  hdfs://myhdfs/jars/WindowJoin.jar

3. [FLINK-14460] Flink K8s feature enhancements

In 1.11, Flink supports the Application mode proposed by FLIP-85 on K8s, which provides better isolation than Session mode.

In addition, Flink adds several new K8s features, such as Node Selector, Label, Annotation and Toleration. To integrate more conveniently with Hadoop, it can also mount the Hadoop configuration automatically based on environment variables.

4. [FLIP-111] Unified Docker image

Previously, the Flink project provided several different Dockerfiles for building Flink Docker images. They are now unified in the apache/flink-docker [1] project.

5. [FLINK-15911] [FLINK-15154] Support configuring the local listening (bind) network interface separately from the address and port used for external access

In some scenarios (such as Docker or NAT port mapping), the local network address and port seen by the JM/TM process may differ from the address and port that other processes use to reach it from outside. Previously, Flink did not allow users to set different local and remote addresses for TM/JM, which caused problems when running Flink in NAT networks such as those used by Docker, and made it impossible to restrict the exposure of listening ports.

1.11 introduces separate options for the local and remote listening addresses and ports. The following options configure the remote (externally visible) addresses and ports:

* jobmanager.rpc.address

* jobmanager.rpc.port

* taskmanager.host

* taskmanager.rpc.port

* taskmanager.data.port

The following options configure the local listening (bind) addresses and ports:

* jobmanager.bind-host

* jobmanager.rpc.bind-port

* taskmanager.bind-host

* taskmanager.rpc.bind-port

* taskmanager.data.bind-port
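To make the split between the two groups of options concrete, here is a minimal Python sketch that models a configuration as a dictionary and derives the endpoint that other processes connect to and the endpoint the JM process actually listens on. The option keys come from the list above; the host names, the port values and the fallback rules (bind to 0.0.0.0 and reuse the external port when no bind option is set) are assumptions made for illustration, not a statement of Flink's exact behaviour.

```python
# Toy model of advertised (remote) vs. bind (local) endpoints for the JobManager.
# The fallback rules and the default port are assumptions for illustration only.

def resolve_jobmanager_endpoints(conf):
    """Return (advertised, bind) endpoints as (host, port) tuples."""
    advertised = (
        conf["jobmanager.rpc.address"],
        int(conf.get("jobmanager.rpc.port", 6123)),
    )
    bind_host = conf.get("jobmanager.bind-host", "0.0.0.0")
    bind_port = int(conf.get("jobmanager.rpc.bind-port", advertised[1]))
    return advertised, (bind_host, bind_port)


conf = {
    "jobmanager.rpc.address": "flink-jm.example.com",  # hypothetical external name
    "jobmanager.rpc.port": "30123",                    # hypothetical NAT-mapped port
    "jobmanager.bind-host": "172.17.0.2",              # hypothetical container address
    "jobmanager.rpc.bind-port": "6123",
}

advertised, bind = resolve_jobmanager_endpoints(conf)
print("other processes connect to:", advertised)
print("the JM process listens on:", bind)
```

The same pattern would apply to the taskmanager.* options; the point is only that the two endpoints can now differ.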

Resource management

1. [FLINK-16614] Unified memory resource configuration for JM

A major change in Flink 1.10 was the redefinition of the TM memory model and configuration rules [2]. Flink 1.11 further adjusts the JM memory model and configuration rules so that JM memory configuration is consistent with that of TM.

For specific memory configuration, please refer to the corresponding user documentation [3].

2. [FLIP-108] Add scheduling support for extended resources such as GPU

With the development of machine learning and deep learning, more and more Flink jobs embed machine learning or deep learning models, which creates a demand for GPU resources. Before 1.11, Flink did not support managing extended resources such as GPUs. To address this, Flink 1.11 provides a unified management framework for extended resources, with built-in support for GPU resources based on this framework.

For further configuration of the extended resource management framework and GPU resource management, refer to the Public Interfaces section of the corresponding FLIP page: https://cwiki.apache.org/confluence/display/FLINK/FLIP-108%3A+Add+GPU+support+in+Flink (the community is still preparing the corresponding user documentation, which can be consulted once it is available).

3. [FLINK-16605] Allow users to limit the maximum number of slots for Batch jobs

To prevent Flink Batch jobs from taking up too many resources, Flink 1.11 introduces a new configuration option, slotmanager.number-of-slots.max, which limits the maximum number of slots in the entire Flink cluster. This option is recommended only for Batch Table / SQL jobs that use the Blink Planner.
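The effect of such a cap can be sketched with a toy slot manager in Python. This is only an illustration of the idea of a cluster-wide upper bound; the class and method names are hypothetical and do not correspond to Flink's internal SlotManager.

```python
# Toy slot manager with a cluster-wide cap, in the spirit of
# slotmanager.number-of-slots.max. Names are hypothetical.

class CappedSlotManager:
    def __init__(self, max_slots):
        self.max_slots = max_slots
        self.allocated = 0   # slots currently handed out
        self.pending = 0     # requested slots that must wait

    def request_slots(self, count):
        """Grant as many slots as the cap allows; the rest stay pending."""
        granted = max(0, min(count, self.max_slots - self.allocated))
        self.allocated += granted
        self.pending += count - granted
        return granted


manager = CappedSlotManager(max_slots=4)
first = manager.request_slots(3)    # fits under the cap
second = manager.request_slots(3)   # only one slot is left
print(first, second, manager.pending)
```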

Enhancements to the Flink 1.11 Web UI

1. [FLIP-103] Improve the display of JM/TM logs in the Web UI

Previously, users could only read the .log and .out files through the Web UI, although the log directory may contain other files, such as GC logs. The new interface lets users access all files under the log directory. Log reload, download and full-screen display have also been added.

2. [FLIP-99] Allow more historical Failover exceptions to be displayed

Previously, the Web UI could only show 20 historical Failover exceptions for a single job. When a job fails over frequently, the earliest exceptions (which are more likely to be the root cause) are quickly pushed out, which makes troubleshooting harder. The new Web UI supports paging to show more historical exceptions.

3. [FLINK-14816] Allow users to take a Thread Dump directly on the page

Thread Dumps are very helpful for locating problems in some jobs. Before 1.11, users had to log in to the machine where the TM was running to take a Thread Dump. The 1.11 Web UI integrates this feature by adding a Thread Dump tab that lets users obtain a TM's Thread Dump directly from the Web UI.

Source & Sink

1. [FLIP-27] New Source API

FLIP-27 is one of the larger features in 1.11. Flink's traditional Source interface has several problems: different Sources have to be implemented for stream jobs and batch jobs, there is no unified logic for discovering data partitions, Source implementers have to handle locking themselves, and there is no common architecture, so Source developers have to deal with multithreading manually. These problems make it harder to implement a Source in Flink.

FLIP-27 introduces a completely new set of Source interfaces. They provide unified functionality such as data partition discovery and management, so users only need to focus on logic such as reading partition information and reading data, without dealing with complex thread synchronization. This greatly reduces the burden of implementing a Source and lays the groundwork for adding more built-in functionality to Sources.
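The separation of responsibilities described above can be sketched in plain Python: one component discovers and hands out partitions ("splits"), and readers only pull records from the split they were given. This is a single-threaded toy model, not the Flink API; the class and method names are hypothetical, loosely inspired by the enumerator/reader split in FLIP-27.

```python
from collections import deque

# Toy model: split discovery and assignment live in one place,
# record reading lives in another. Names are hypothetical.

class SplitEnumerator:
    """Owns the discovered splits and hands them out on request."""
    def __init__(self, splits):
        self._unassigned = deque(splits)

    def request_split(self):
        return self._unassigned.popleft() if self._unassigned else None


class SourceReader:
    """Reads records from its current split and asks for more work when done."""
    def __init__(self, enumerator):
        self._enumerator = enumerator
        self._current = iter(())

    def poll_next(self):
        while True:
            for record in self._current:
                return record                      # next record of the current split
            split = self._enumerator.request_split()
            if split is None:
                return None                        # no more work anywhere
            self._current = iter(split["records"])


splits = [
    {"id": "partition-0", "records": [1, 2]},
    {"id": "partition-1", "records": [3]},
    {"id": "partition-2", "records": [4, 5]},
]
enumerator = SplitEnumerator(splits)
active = [SourceReader(enumerator), SourceReader(enumerator)]
output = []
while active:
    for reader in list(active):
        record = reader.poll_next()
        if record is None:
            active.remove(reader)
        else:
            output.append(record)
print(output)
```

Because the readers never discover partitions themselves, the same reader code could serve a bounded (batch) or an unbounded (streaming) enumerator, which is the kind of reuse the new interface aims for.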

2. [FLINK-11395] [FLINK-10114] Streaming File Sink adds support for Avro and ORC formats

For the commonly used StreamingFileSink, 1.11 adds support for two common file formats, Avro and ORC.

Avro:

// Write Address records as Avro files under the given folder
stream.addSink(
    StreamingFileSink.forBulkFormat(
        Path.fromLocalFile(folder),
        AvroWriters.forSpecificRecord(Address.class))
    .build());

ORC:

// Bulk-encode records as ORC using the given vectorizer and writer properties
OrcBulkWriterFactory factory = new OrcBulkWriterFactory(
    new RecordVectorizer(schema), writerProps, new Configuration());

stream.addSink(
    StreamingFileSink.forBulkFormat(new Path(outDir.toURI()), factory)
    .build());

State management

1. [FLINK-5763] Change the file structure of Savepoints so that they are self-contained and relocatable

Flink 1.11 replaces the absolute file paths in a Savepoint with relative paths, so users can move a Savepoint directly without manually modifying the paths in its metadata (note: this feature is not supported when Entropy Injection is enabled in the S3 file system).
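Why relative paths make a Savepoint movable can be shown with a small Python sketch. It assumes a simplified metadata model in which each state file is recorded either as an absolute path or as a path relative to the Savepoint directory; the directory and file names are hypothetical.

```python
import posixpath

# Simplified model: a file reference in the metadata is resolved against the
# directory the savepoint currently lives in, unless it is absolute.

def resolve_state_file(savepoint_dir, recorded_path):
    if posixpath.isabs(recorded_path):
        return recorded_path                      # still points at the old location
    return posixpath.join(savepoint_dir, recorded_path)


# Relative reference: follows the savepoint wherever it is moved.
before_move = resolve_state_file("/savepoints/savepoint-abc", "state-file-1")
after_move = resolve_state_file("/backup/savepoint-abc", "state-file-1")

# Absolute reference: breaks after the move unless the metadata is edited.
stale = resolve_state_file("/backup/savepoint-abc", "/savepoints/savepoint-abc/state-file-1")

print(before_move)
print(after_move)
print(stale)
```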

2. [FLINK-8871] Add a callback for Checkpoint failure and notify the TM side

Before Flink 1.11, only notification of Checkpoint success was provided. 1.11 adds a mechanism that notifies the TM side when a Checkpoint fails. On the one hand, this allows an ongoing Checkpoint to be cancelled; on the other hand, users can receive the corresponding notification through the new notifyCheckpointAborted method of CheckpointListener.
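One way such a failure callback could be used is sketched below in Python: a sink stages temporary output per checkpoint, commits it on the success notification, and cleans it up on the abort notification. This is a toy model under that assumption; the class is hypothetical, and the method names only mirror the Java notifyCheckpointComplete / notifyCheckpointAborted callbacks.

```python
# Toy listener that reacts to both checkpoint outcomes. Names are hypothetical.

class StagingSink:
    def __init__(self):
        self.pending = {}      # checkpoint id -> staged temporary file
        self.committed = []
        self.cleaned_up = []

    def stage(self, checkpoint_id, temp_file):
        self.pending[checkpoint_id] = temp_file

    def notify_checkpoint_complete(self, checkpoint_id):
        # The checkpoint succeeded: make the staged output visible.
        temp_file = self.pending.pop(checkpoint_id, None)
        if temp_file is not None:
            self.committed.append(temp_file)

    def notify_checkpoint_aborted(self, checkpoint_id):
        # The checkpoint failed: release whatever was prepared for it.
        temp_file = self.pending.pop(checkpoint_id, None)
        if temp_file is not None:
            self.cleaned_up.append(temp_file)


sink = StagingSink()
sink.stage(1, "part-1.tmp")
sink.stage(2, "part-2.tmp")
sink.notify_checkpoint_complete(1)
sink.notify_checkpoint_aborted(2)
print(sink.committed, sink.cleaned_up)
```

Without the abort notification, the resources staged for checkpoint 2 would linger until some later success or a timeout.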

3. [FLINK-12692] Heap keyed StateBackend supports spilling data to disk

(This feature was not actually merged into the Flink 1.11 code, but users can download and try it from https://flink-packages.org/packages/spillable-state-backend-for-flink.)

The Heap StateBackend can achieve better performance because it maintains state directly as Java objects. However, the memory it occupies was previously uncontrollable, which could cause serious GC problems.

To solve this problem, SpillableKeyedStateBackend supports spilling data to disk, which allows the StateBackend to limit the amount of memory it uses. For more information about SpillableKeyedStateBackend, refer to https://flink-packages.org/packages/spillable-state-backend-for-flink.

4. [FLINK-15507] Enable Local Recovery by default for the RocksDB StateBackend

Enabling Local Recovery by default speeds up Failover.

5. Change the default value of the state.backend.fs.memory-threshold option to 20k

(This work is still in progress, but should be included in 1.11.)

state.backend.fs.memory-threshold determines when state data in the FS StateBackend has to be written out of memory to a file. The previous default of 1k led to a large number of small files in many cases and hurt state access performance, so the value is raised to 20k in 1.11. Note that this change may increase JM memory usage, especially when operator parallelism is high or UnionState is used. [4]
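The trade-off can be illustrated with a small Python calculation. It assumes the simplified rule that a state chunk smaller than the threshold is kept inline with the checkpoint metadata (and therefore held by the JM), while larger chunks are written to separate files; the chunk sizes are made up for the example.

```python
# Simplified model of the threshold: small chunks stay inline, large ones
# become files. Chunk sizes are invented for illustration.

def plan_state_storage(chunk_sizes, threshold_bytes):
    inline = [size for size in chunk_sizes if size < threshold_bytes]
    files = [size for size in chunk_sizes if size >= threshold_bytes]
    return {"files": len(files), "inline_bytes": sum(inline)}


chunk_sizes = [512, 2048, 4096, 16384, 65536]   # bytes per state chunk

old_plan = plan_state_storage(chunk_sizes, 1 * 1024)    # previous default, 1k
new_plan = plan_state_storage(chunk_sizes, 20 * 1024)   # new default, 20k
print(old_plan)
print(new_plan)
```

With the higher threshold, fewer small files are created, but more bytes travel inline, which is where the extra JM memory usage mentioned above comes from.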

Table & SQL

1. [FLIP-65] Optimize the type inference mechanism for Table API UDFs

Compared with the previous mechanism, the new type inference mechanism provides more type information about input arguments, which allows users to implement more flexible processing logic. This feature currently supports UDF and UDTF, but not yet UDAF.

2. [FLIP-84] Optimize the TableEnvironment interface

Flink 1.11 enhances TableEnvironment in the following areas:

* In the past, sqlUpdate behaved differently for DDL and DML: the former was executed immediately, while the latter had to wait until env.execute was called. 1.11 unifies the two so that both are executed at the same point, when the new executeSql method introduced by FLIP-84 is called.

* Queries that need to return results, such as SHOW TABLES and EXPLAIN, are now supported.

* Multiple SQL statements can be buffered and executed together.

* A new collect method lets users obtain the results of a query.

3. [FLIP-93] Support Catalogs based on JDBC and Postgres

Before 1.11, users who used Flink to read from or write to relational databases, or to read a changelog, had to copy the database's table schemas into Flink by hand. This process is tedious and error-prone, and it greatly increases the cost of using Flink. 1.11 provides Catalog management based on JDBC and Postgres, so that Flink can read table schemas automatically and users have less manual work to do.

4. [FLIP-105] Add support for changelog sources

Users have long wanted to import changing data from external systems (such as the MySQL binlog or Kafka compacted topics) into Flink through Change Data Capture (CDC), and to write Flink's Update/Retract streams out to external systems. Flink 1.11 implements support for reading and writing CDC data. At present, Flink supports two CDC formats, Debezium and Canal.
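To show what consuming a changelog means in practice, here is a minimal Python sketch that applies Debezium-style change events to an in-memory table. It assumes the common Debezium envelope with before, after and op fields (c for create, r for snapshot read, u for update, d for delete); the table contents and the id key are made up, and real messages carry more metadata than shown here.

```python
import json

# Apply Debezium-style change events to a dictionary keyed by primary key.
# The envelope shape is simplified and the data is invented for illustration.

def apply_change(table, event, key="id"):
    op = event["op"]
    if op in ("c", "r", "u"):          # insert, snapshot read, or update
        row = event["after"]
        table[row[key]] = row
    elif op == "d":                    # delete
        table.pop(event["before"][key], None)
    else:
        raise ValueError("unknown op: " + op)


messages = [
    '{"before": null, "after": {"id": 1, "name": "alice"}, "op": "c"}',
    '{"before": null, "after": {"id": 2, "name": "bob"}, "op": "c"}',
    '{"before": {"id": 1, "name": "alice"}, "after": {"id": 1, "name": "alicia"}, "op": "u"}',
    '{"before": {"id": 2, "name": "bob"}, "after": null, "op": "d"}',
]

table = {}
for message in messages:
    apply_change(table, json.loads(message))
print(table)
```

The final table is the materialized view of the changelog, which is what a CDC-aware source lets Flink maintain continuously.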

5. [FLIP-95] New TableSource and TableSink interfaces

The new interfaces simplify the structure of the current Table Source/Sink interfaces, provide the basis for supporting CDC, remove the dependence on the DataStream API, and solve the problem that only the Blink Planner could support efficient Source/Sink implementations.

For more specific API changes, please see:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces

6. [FLIP-122] Revise the Connector configuration options

FLIP-122 reorganizes the "WITH" options of Table/SQL Connectors. For historical reasons, the WITH options contain some redundancy and inconsistencies; for example, all option keys are prefixed with connector. and different connectors use different naming patterns. The revised option keys resolve these redundancies and inconsistencies. (Note that the existing option keys can still be used.)

For a list of the new configuration options, please refer to:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory

7. [FLIP-113] Flink SQL supports dynamic Table attributes

Dynamic table options allow users to modify a table's configuration options at the point where the table is used, which avoids having to redeclare the table's DDL when an option changes. As shown below, users can override the option values from the DDL with the /*+ OPTIONS('k1'='v1') */ syntax when executing a query.

SELECT * FROM EMP /*+ OPTIONS('k1'='v1', 'k2'='v2') */ JOIN DEPT /*+ OPTIONS('a.b.c'='v3', 'd.e.f'='v4') */ ON EMP.deptno = DEPT.deptno
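The override semantics of such a hint can be sketched as a simple dictionary merge in Python: options given in the hint take precedence over those declared in the DDL, and everything else is left untouched. The option keys and values below are examples chosen for illustration; this models only the precedence rule, not how Flink parses hints.

```python
# Hint options override DDL options for one query; the DDL itself is unchanged.
# Keys and values are illustrative.

def effective_options(ddl_options, hint_options):
    merged = dict(ddl_options)      # copy, so the declared options stay intact
    merged.update(hint_options)     # the hint wins on conflicting keys
    return merged


ddl_options = {
    "connector": "kafka",
    "topic": "emp",
    "scan.startup.mode": "group-offsets",
}
hint_options = {"scan.startup.mode": "earliest-offset"}

options_for_query = effective_options(ddl_options, hint_options)
print(options_for_query)
```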

8. [FLIP-115] Extend Flink SQL's support for Hive

The FileSystem Connector now supports five formats (csv, orc, parquet, json and avro) and is fully supported in both Batch and Streaming modes.

Support for a Hive Streaming Sink is provided.

9. [FLIP-123] Support Hive-compatible DDL and DML statements

FLIP-123 provides support for the Hive dialect, which allows users to work with Hive's DDL and DML.

DataStream API

1. [FLIP-126] Optimize the WatermarkAssigner interface of Source

(Note: this work has been completed, but whether it should be included in 1.11 is still under discussion.)

The new WatermarkAssigner interface unifies the two previous kinds of Watermark interfaces, AssignerWithPunctuatedWatermarks and AssignerWithPeriodicWatermarks, which simplifies the implementation of Sources that support pluggable Watermarks in later development.
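The idea of folding both styles into one interface can be sketched in Python as a generator with two hooks: one called for every event and one called periodically. A periodic generator does its work in the second hook, a punctuated one in the first. The class and method names are hypothetical and only loosely follow the per-event / periodic-emit split; the formula for the bounded out-of-orderness watermark (largest timestamp seen, minus the allowed delay, minus one) is a common convention assumed here.

```python
# One interface, two hooks. Names are hypothetical.

class BoundedOutOfOrdernessWatermarks:
    """Periodic style: track the max timestamp, emit on the periodic hook."""
    def __init__(self, max_out_of_orderness_ms):
        self._delay = max_out_of_orderness_ms
        self._max_timestamp = None

    def on_event(self, event, timestamp_ms):
        if self._max_timestamp is None or timestamp_ms > self._max_timestamp:
            self._max_timestamp = timestamp_ms
        return None                                  # nothing emitted per event

    def on_periodic_emit(self):
        if self._max_timestamp is None:
            return None
        return self._max_timestamp - self._delay - 1


class PunctuatedWatermarks:
    """Punctuated style: emit a watermark when an event carries a marker."""
    def on_event(self, event, timestamp_ms):
        return timestamp_ms if event.get("is_marker") else None

    def on_periodic_emit(self):
        return None                                  # nothing emitted periodically


periodic = BoundedOutOfOrdernessWatermarks(max_out_of_orderness_ms=200)
for ts in (1000, 1500, 1200):
    periodic.on_event({"value": ts}, ts)
periodic_watermark = periodic.on_periodic_emit()

punctuated = PunctuatedWatermarks()
no_watermark = punctuated.on_event({"value": 1}, 1000)
marker_watermark = punctuated.on_event({"value": 2, "is_marker": True}, 1100)
print(periodic_watermark, no_watermark, marker_watermark)
```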

2. [FLIP-92] Support Operators with more than two inputs

Flink 1.11 provides support for multi-input Operators.

// Describe an operator with several inputs that produces Long values
MultipleInputTransformation<Long> transform = new MultipleInputTransformation<>(
    "My Operator",
    new SumAllInputOperatorFactory(),
    BasicTypeInfo.LONG_TYPE_INFO,
    1);

// Register the operator and attach the three inputs
env.addOperator(transform
    .addInput(source1.getTransformation())
    .addInput(source2.getTransformation())
    .addInput(source3.getTransformation()));

// Expose the operator's output as a stream and attach a sink
new MultipleConnectedStreams(env)
    .transform(transform)
    .addSink(resultSink);

PyFlink & ML

1. [FLINK-15636] Support running Python UDFs in the batch mode of the Flink Planner

Previously, Python UDFs could run in the stream and batch modes of the Blink Planner and in the stream mode of the Flink Planner. With this change, both the stream and batch modes of both Planners support Python UDFs.

2. [FLINK-14500] Support for Python UDTF

A UDTF can produce multiple output rows for a single input row. Both the stream and batch modes of both Planners support Python UDTFs.

3. [FLIP-121] Optimize the execution efficiency of Python UDFs through Cython

The computing logic of Coder (serialization and deserialization) and Operation is optimized with Cython; end-to-end performance is tens of times higher than in version 1.10.

4. [FLIP-97] support for Pandas UDF

Pandas UDF uses pandas.Series as the input and output type and supports batch processing of data. Generally speaking, Pandas UDF performs better than normal UDF because it reduces the serialization and deserialization overhead of data interaction between Java and Python processes, and reduces the number of Python UDF calls and call overhead because the data can be processed in batches. In addition, when you use Pandas UDF, you can use the Pandas-related Python library more easily and naturally.

5. [FLIP-120] Support conversion between PyFlink Table and Pandas DataFrame

import pandas as pd
import numpy as np

# Create a PyFlink Table from a Pandas DataFrame
# (t_env is an existing TableEnvironment; rand() yields values in [0, 1))
pdf = pd.DataFrame(np.random.rand(1000, 2))
table = t_env.from_pandas(pdf, ["a", "b"]).filter("a > 0.5")

# Convert the PyFlink Table to a Pandas DataFrame
pdf = table.to_pandas()

6. [FLIP-112] Support user-defined Metrics in Python UDFs

Currently, four custom Metric types are supported: Counter, Gauge, Meter and Distribution. Both the User Scope and the User Variables of a Metric can be defined.

7. [FLIP-106] [FLIP-114] Support using Python UDFs in SQL DDL and the SQL Client

Previously, Python UDFs could only be used in the Python Table API. With support for registering Python UDFs through DDL, SQL users can also use Python UDFs easily. In addition, the SQL Client now supports Python UDFs, including Python UDF registration and dependency management.

8. [FLIP-96] supports Python Pipeline API

A new set of ML Pipeline API has been introduced in Flink 1.9 to enhance the ease of use and extensibility of Flink ML. Due to the wide use of Python in the field of ML, FLIP-96 provides a set of corresponding Python Pipeline API for the convenience of Python users.

Runtime optimization

1. [FLIP-76] Support Unaligned Checkpoint

Under Flink's existing Checkpoint mechanism, each operator has to wait until the Barriers from all of its upstream inputs have arrived and been aligned before it can take its snapshot and send the Barrier on downstream. Under backpressure, it can take a long time for a Barrier to travel from an upstream operator to a downstream one, which leads to Checkpoint timeouts.

To solve this problem, Flink 1.11 adds the Unaligned Checkpoint mechanism. With Unaligned Checkpoint enabled, an operator can take its checkpoint as soon as it receives the first Barrier, and the data in transit between upstream and downstream is saved in the snapshot as state. This greatly shortens the completion time of a checkpoint, which no longer depends on the processing capacity of the operators, and solves the problem of checkpoints failing to complete for a long time under backpressure.

You can turn on Unaligned Checkpoint with env.getCheckpointConfig().enableUnalignedCheckpoints();
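The difference between the two mechanisms can be made concrete with a deliberately simplified Python model of one operator with two input channels. Each channel is a list of queued items with one barrier somewhere in it. In the aligned case every record queued ahead of a barrier has to be processed before the snapshot; in the unaligned case the snapshot is taken right away and those records are stored as part of it. The model ignores timing, buffers and output channels, and all names are hypothetical.

```python
# Simplified comparison of aligned vs. unaligned checkpoints for one operator.

BARRIER = "barrier"

def aligned_checkpoint(channels):
    """Records ahead of each barrier must be processed before the snapshot."""
    to_process = sum(channel.index(BARRIER) for channel in channels)
    return {"processed_before_snapshot": to_process, "persisted_in_flight": []}

def unaligned_checkpoint(channels):
    """Snapshot immediately; records ahead of the barriers become channel state."""
    in_flight = [
        record
        for channel in channels
        for record in channel[:channel.index(BARRIER)]
    ]
    return {"processed_before_snapshot": 0, "persisted_in_flight": in_flight}


channels = [
    ["a1", BARRIER, "a2"],            # fast channel: barrier is near the front
    ["b1", "b2", "b3", BARRIER],      # backpressured channel: barrier is far back
]

aligned = aligned_checkpoint(channels)
unaligned = unaligned_checkpoint(channels)
print(aligned)
print(unaligned)
```

The trade-off visible even in this toy model is that the unaligned snapshot completes without waiting on processing, at the cost of a larger snapshot that includes the in-flight records.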

2. [FLINK-13417] Support ZooKeeper 3.5

Flink can now be integrated with ZooKeeper 3.5, which lets users take advantage of newer ZooKeeper features such as SSL.

3. [FLINK-16408] Support ClassLoader reuse at Slot level

Flink 1.11 changes the ClassLoader loading logic on the TM side: instead of creating a new ClassLoader after every Failover, the corresponding ClassLoader is cached as long as the job still occupies at least one Slot. This change affects the semantics of job Failover to some extent, because static fields are not reloaded after a Failover, but it avoids running out of JVM Metaspace memory when a large number of ClassLoaders are created.
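The caching rule can be sketched in Python as a cache keyed by job with a count of occupied slots: the loader is created on first use, reused while any slot of the job is still held, and dropped when the last slot is released. This is a toy model under that reading of the paragraph above; the names are hypothetical and a plain object stands in for the ClassLoader.

```python
# Toy per-job loader cache with slot reference counting. Names are hypothetical.

class ClassLoaderCache:
    def __init__(self):
        self._loaders = {}     # job id -> cached loader
        self._slots = {}       # job id -> number of slots still held by the job
        self.created = 0       # how many loaders were ever created

    def acquire(self, job_id):
        if job_id not in self._loaders:
            self.created += 1
            self._loaders[job_id] = object()   # stand-in for a user-code ClassLoader
            self._slots[job_id] = 0
        self._slots[job_id] += 1
        return self._loaders[job_id]

    def release(self, job_id):
        self._slots[job_id] -= 1
        if self._slots[job_id] == 0:           # last slot gone: drop the loader
            del self._loaders[job_id]
            del self._slots[job_id]


cache = ClassLoaderCache()
loader_slot_1 = cache.acquire("job-1")
loader_slot_2 = cache.acquire("job-1")

# A task in slot 1 fails over while slot 2 is still occupied by the job.
cache.release("job-1")
loader_after_failover = cache.acquire("job-1")
print(loader_after_failover is loader_slot_2, cache.created)
```

Because the same loader object survives the failover, anything held in static fields of user classes survives too, which is the semantic change the paragraph above warns about.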

4. [FLINK-15672] Upgrade the logging system to Log4j 2

5. [FLINK-10742] Reduce the number of data copies and the memory footprint on the TM receiver side

When receiving data on the downstream side of the network, Flink 1.11 reuses Flink's own buffer memory management. This removes the memory copy from the Netty layer to Flink buffers and the extra direct memory overhead, which lowers the chance that online jobs hit a Direct Memory OOM or have their container killed for exceeding its memory limit.

The above summarizes the main features of Apache Flink 1.11.
