
Flink Batch SQL 1.10 Practice


As a unified stream/batch computing framework, Flink completed a large number of batch-related enhancements and improvements in 1.10. 1.10 can be considered the first mature, production-ready Flink Batch SQL release: it sweeps away the weaknesses of the old Dataset API and brings significant improvements in both functionality and performance. Below I will elaborate from three angles: architecture, external system integration, and practice.

Architecture

Stack

First, let's take a look at the stack. In the new Blink planner, batch is also built on top of Transformation, which means batch no longer has anything to do with Dataset at all.

We can reuse components and code with streaming as much as possible and keep a single set of behaviors. Table/SQL's toDataSet and fromDataSet are completely off the table; deal with everything at the Table level as much as possible. In the future, we are considering building BoundedStream on top of DataStream to bring batch processing capabilities to DataStream.

Network model

In Batch mode, intermediate results are written to disk, which is consistent with typical batch processing systems such as MapReduce/Spark/Tez.

Flink's earlier network models were likewise divided into Batch and Pipeline, but Batch mode only supported staged execution of upstream and downstream, meaning the available resources do not have to cover the full parallelism of upstream and downstream at the same time. The other key point is that failover was poorly integrated; 1.9 and 1.10 improved this to support single-point failover.

It is recommended to enable it for Batch jobs:

jobmanager.execution.failover-strategy = region

To avoid restarting too frequently and keeping the JobMaster too busy, you can increase the restart interval:

restart-strategy.fixed-delay.delay = 30s

The trade-offs of Batch mode are:

Good fault tolerance: single-point recovery and rescheduling are possible, and the job can run no matter how many resources are available. Poor performance: intermediate data has to be spilled to disk, so it is strongly recommended to enable compression:

taskmanager.network.blocking-shuffle.compression.enabled = true

Batch mode is relatively stable and well suited to traditional batch workloads and large jobs.

Pipeline mode is Flink's traditional mode, sharing the same code path as streaming jobs (Impala and Presto in the community follow a similar model). Data moves only through the network without touching disk, and backpressure has to be handled. Its main advantages and disadvantages are:

Poor fault tolerance: only global rescheduling is possible. Demanding scheduling: you have to ensure sufficient resources are available. Good performance: pipelined execution fully reuses the streaming stack, including flow control, backpressure, and other features.

If possible, you can consider turning on Pipeline mode.
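If you want to try it, here is a minimal sketch, assuming the 1.10 Blink planner's table.exec.shuffle-mode option and a TableEnvironment tEnv like the one created in the programming-model section below (verify the key against your version's docs):

// Switch shuffles from the default blocking ("batch") mode to pipelined execution.
tEnv.getConfig().getConfiguration().setString("table.exec.shuffle-mode", "pipelined");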

Scheduling model

Flink on Yarn supports two modes, Session mode and Per Job mode, and they are now highly unified at the scheduling level.

Session mode no longer has a fixed cap on processes: when a job needs resources, it requests new ones from Yarn, and when the session has idle resources, jobs reuse them, so its model is basically the same as Per Job. The only difference is that Session mode can reuse processes across jobs.

In addition, if you want better process reuse, consider increasing the TaskManager idle timeout before release:

resourcemanager.taskmanager-timeout = 900000

Resource model

Let's start with parallelism:

For sources: at present a Hive table determines its required parallelism from the number of InputSplits, and the operators that can be chained after the source naturally share the source's parallelism. For operators (tasks) downstream of a network shuffle: apart from tasks that must run with a parallelism of one, all other tasks are uniformly controlled by table.exec.resource.default-parallelism.

Within Blink we have implemented parallelism inference based on statistics, but the strategy above is sufficient in most scenarios.
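As a minimal sketch of setting that option programmatically, assuming a TableEnvironment tEnv as created in the programming-model section below:

// All tasks downstream of a shuffle (other than forced single-parallelism tasks) will use this parallelism.
tEnv.getConfig().getConfiguration().setInteger("table.exec.resource.default-parallelism", 100);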

Managed memory

Currently a TaskManager contains multiple slots. In a batch job, a slot runs only one task at a time (slot sharing is turned off).

For memory, a single TM splits its managed memory at slot granularity: if a TM has n slots, each task gets 1/n of the managed memory.

A major improvement in 1.10 is that the operators chained into a task now divide memory proportionally: the configured operator memory is a relative weight, and what an operator actually gets is its share of the slot's managed memory.

The important benefits of this are:

Jobs can run no matter how much memory the current slot has, which greatly improves the out-of-the-box experience. And however much memory the slot has, the operators divide all of it among themselves, so none is wasted.

Of course, for efficiency, we generally recommend that a single slot have more than 500 MB of managed memory.
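For illustration, a hedged sketch of tuning those per-operator weights, assuming the 1.10 Blink planner's table.exec.resource.* keys (treat the exact key names and values as assumptions to verify against your version):

// Give hash aggregations a larger weight relative to other operators in the slot.
tEnv.getConfig().getConfiguration().setString("table.exec.resource.hash-agg.memory", "512 mb");
// Sort operators get a smaller share.
tEnv.getConfig().getConfiguration().setString("table.exec.resource.sort.memory", "256 mb");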

Another change: as of 1.10, on-heap managed memory has been removed, leaving only off-heap managed memory.
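For reference, a sketch of the related 1.10 (FLIP-49) options as they would appear in the Flink configuration; the values are illustrative assumptions:

taskmanager.numberOfTaskSlots = 4
# Either a fraction of total Flink memory...
taskmanager.memory.managed.fraction = 0.4
# ...or an absolute size (set one or the other).
taskmanager.memory.managed.size = 4g

With 4 GB of managed memory and 4 slots, each slot, and thus each batch task, gets 1 GB, comfortably above the 500 MB recommendation.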

External system integration

Hive

HiveCatalog + Hive is highly recommended; it is by far the most mature architecture for batch processing. In 1.10, in addition to improving existing functionality, several other things were done:

Multi-version support: Hive 1.x, 2.x, and 3.x are supported.

Improved partition support, including partition reads, dynamic and static partition writes, and partition statistics.

Integration of Hive built-in functions, which can be loaded as follows:

TableEnvironment.loadModule("hiveModule", new HiveModule("hiveVersion"))

Optimized ORC read performance using vectorized reads. This currently only supports Hive 2+ and requires that columns have no complex types; the measured improvement is on the order of 5x.

Compatible with Streaming Connectors

Thanks to the unified stream/batch architecture, the current streaming connectors can also be used in batch mode. For example, HBase's Lookup and Sink, JDBC's Lookup and Sink, and Elasticsearch's Sink can all be used seamlessly in batch jobs.
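As an illustrative sketch, a batch job could declare a JDBC sink using 1.10's connector descriptor properties; the URL, table name, and credentials below are hypothetical placeholders:

tEnv.sqlUpdate("CREATE TABLE jdbcSink (" +
    "  cnt BIGINT" +
    ") WITH (" +
    "  'connector.type' = 'jdbc'," +
    "  'connector.url' = 'jdbc:mysql://localhost:3306/mydb'," +
    "  'connector.table' = 'my_sink_table'," +
    "  'connector.username' = 'user'," +
    "  'connector.password' = 'pass'" +
    ")");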

Practice

SQL-CLI

In 1.10, SQL-CLI also received many changes: it is now stateful and supports DDL, with a large number of DDL commands that expose much of TableEnvironment's capability to SQL-CLI, making it far more convenient for users. In the future we also need to interface with JDBC clients so that users can connect external tools more easily. But SQL-CLI still needs improvement; for example, it currently supports only Session mode, not Per Job mode.
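For a feel of the workflow, a hedged sketch of a 1.10 SQL-CLI session (commands per the Flink distribution; MyTable is a hypothetical registered table):

./bin/sql-client.sh embedded
Flink SQL> SET execution.type=batch;
Flink SQL> SHOW TABLES;
Flink SQL> SELECT COUNT(*) FROM MyTable;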

Programming model

TableEnvironment tEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build());

The old BatchTableEnv was a dirty design: it was bound to Dataset and split between Java and Scala, so the Blink planner only supports the new TableEnv.

Sources, sinks, connectors, and functions registered on the TableEnv are all temporary and are lost after a restart. If you need persistent objects, consider using HiveCatalog:

tEnv.registerCatalog("hive", hiveCatalog);
tEnv.useCatalog("hive");

You can execute a query through tEnv.sqlQuery to get a Table, and use collect to fetch a small amount of data:

Table table = tEnv.sqlQuery("SELECT COUNT(*) FROM MyTable");
List<Row> results = TableUtils.collectToList(table);
System.out.println(results);

You can execute DDL through tEnv.sqlUpdate, but creating Hive tables is not supported yet; you can only create Flink-type tables:

TEnv.sqlUpdate ("CREATE TABLE myResult (" + "cnt BIGINT") WITH ("+" 'connector.type'='jdbc', "... ())

You can execute INSERT statements through tEnv.sqlUpdate, inserting into a temporary table or a catalog table, for example into the temporary JDBC table created above:

TEnv.sqlUpdate ("INSERT INTO myResult SELECT COUNT (*) FROM MyTable"); tEnv.execute ("MyJob")

When the result table is a Hive table, you can use OVERWRITE syntax or static partition syntax, which requires switching on the Hive dialect:

tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
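For illustration, a hedged sketch of overwriting a static partition; the Hive table myHiveTable and its dt partition column are hypothetical:

tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// Overwrite a single static partition of the Hive result table.
tEnv.sqlUpdate("INSERT OVERWRITE myHiveTable PARTITION (dt='2020-01-01') " +
    "SELECT cnt FROM MyTable");
tEnv.execute("HiveInsertJob");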

Flink Batch SQL is still developing rapidly, but 1.10 is already a usable release: it brings major improvements in functionality and performance, and there are plenty of interesting features waiting to be explored.


This article is the original content of Aliyun and may not be reproduced without permission.
