Exploration and practice on how to optimize Flink Real-time Task Resources

2025-04-05 Update From: SLTechnology News&Howtos



In this issue, the editor brings you an exploration and practice of optimizing Flink real-time task resources. The article is rich in content and analyzes the topic from a professional point of view; I hope you get something out of it.

With the completion of the Flink K8s migration of our real-time clusters, more and more Flink real-time tasks run on K8s. Flink on K8s improves the elasticity of real-time clusters and reduces the cost of scaling machines up and down during promotion periods. At the same time, because K8s is maintained by a dedicated team within the company, Flink on K8s also lowers our operation and maintenance costs.

However, at present Flink K8s task resources are configured by users on the real-time platform, and users have little experience judging how many resources a real-time task needs, so tasks are often configured with more resources than they can actually use. For example, a Flink task that only needs a parallelism of 4 to meet the business's processing needs may end up configured with a parallelism of 16. This wastes real-time computing resources and affects both the cluster's resource utilization and the underlying machine cost. Against this background, we explore and practice optimizing Flink task resources from two angles: task memory and message processing capability.

1. Types of Flink Computing Resources and Optimization Ideas

1.1 Flink Compute Resource Type

I think the resources required for a Flink task to run can be divided into five categories:

Memory resources

Local disk (or cloud disk) storage

External storage resources the task depends on, such as HDFS, S3 (task state / data), HBase, MySQL, Redis, etc. (data)

CPU resources

Network card resources

At present, Flink tasks mainly consume memory and CPU; local disk, dependent external storage, and network card are generally not bottlenecks. Therefore, in this article we optimize Flink real-time task resources from two aspects: task memory and CPU.

1.2 Ideas for optimizing Flink real-time task resources

We think the resource analysis of a Flink real-time task mainly covers two points:

First, from the task memory perspective, analyze the real-time task's heap memory usage.

Second, from the task message processing perspective, ensure the business side's data processing needs are met while using CPU resources as reasonably as possible.

After that, combining the metrics obtained from the memory analysis with the reasonableness of the task's parallelism, we derive a suggested resource configuration for the task. After full communication with the business side, the task's resources are adjusted accordingly, rationalizing the resource allocation and better reducing machine cost.

1.2.1 Task memory perspective

So how do we analyze the heap memory of a Flink task? We analyze it with the task's GC log. The GC log records the change and usage of each region of the heap at every collection; from it you can also obtain the amount of old-generation space a TaskManager has left after each Full GC. Obtaining the GC logs of a real-time task is therefore the prerequisite for analyzing its memory.

For the GC log content analysis we use the open-source GC Viewer tool. Each analysis yields GC-related metrics. Below are partial results of a GC log analyzed with GC Viewer:

From the GC log above we obtain the total heap size of a single Flink TaskManager, the memory allocated to the young and old generations, the old-generation space remaining after Full GC, and so on. There are many other metrics; their definitions can be checked on GitHub.

The most important metric here is the old-generation space remaining after a Full GC. According to the Java heap sizing rule in the book "Java Performance: The Definitive Guide", if the old-generation space remaining after Full GC is M, then the suggested total heap size is 3 to 4 times M, the young generation 1 to 1.5 times M, and the old generation 2 to 3 times M. Of course, for the real memory configuration you can increase these ratios as needed to guard against traffic spikes.
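As a quick illustration, the sizing rule above can be turned into a small calculation. This is a sketch: the function name is ours, and taking the upper bound of each range (plus an optional safety factor) is our assumption, not a prescription from the book.

```python
def recommend_heap_sizes(full_gc_live_mb: float, safety: float = 1.0) -> dict:
    """Suggest JVM sizes from M, the old-generation space still live
    after a Full GC. Rule of thumb: total heap 3-4x M, young generation
    1-1.5x M, old generation 2-3x M; we take the upper bounds here and
    allow an extra safety factor to absorb traffic spikes."""
    m = full_gc_live_mb * safety
    return {
        "heap_mb": 4 * m,     # total heap: 3-4x M
        "young_mb": 1.5 * m,  # young generation: 1-1.5x M
        "old_mb": 3 * m,      # old generation: 2-3x M
    }

# A TaskManager with 512 MB live in the old generation after Full GC:
sizes = recommend_heap_sizes(512)
```

If the task is actually configured with, say, an 8 GB heap while the recommendation comes out around 2 GB, that gap is exactly what flags the task for memory optimization.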

Therefore, from the GC log of a Flink task we can calculate its recommended total heap size. When the recommended heap size differs too much from the actually configured heap size, we conclude that the business side's memory configuration can be reduced, lowering the machine memory used.

1.2.2 Perspective of task message processing capability

For the analysis of a Flink task's message processing capability, we mainly look at whether the per-unit-time input of the data source the task consumes matches the message processing capability of the task's Operators / Tasks. An Operator is a single operator of the Flink task, while a Task is the physical unit in which one or more chained Operators execute.

Internally the data source is generally Kafka, and the per-unit-time input of a Kafka Topic can be obtained by calling the Kafka Broker JMX metric API. You could also call the Flink REST monitoring API to obtain the unit-time input of every Kafka Source Task of the real-time task and sum them, but since backpressure may affect the input on the Source side, we use the Kafka Broker JMX metric API directly.

After obtaining the Kafka Topic unit-time input of the real-time task, the next step is to determine whether the task's message processing capability matches it. The overall message processing capability of a real-time task is limited by its slowest Operator / Task. For example, suppose the Kafka Topic a Flink task consumes has an input of 20000 records/s, and the task has a Map operator with a parallelism of 10 that calls Dubbo, where one Dubbo call takes 10 ms from request to response. The capacity of that Map operator is then 1000 records/s (1000 ms / 10 ms * 10), and the whole task's processing capability drops to 1000 records/s.
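The capacity arithmetic in the example can be written down directly (a minimal sketch; the function name is illustrative):

```python
def operator_capacity_rps(per_record_ms: float, parallelism: int) -> float:
    """Records/second an operator can handle: each parallel subtask
    processes 1000 ms / per_record_ms records per second."""
    return 1000.0 / per_record_ms * parallelism

# The Map operator above: 10 ms per Dubbo call, parallelism 10.
capacity = operator_capacity_rps(10, 10)  # 1000.0 records/s
```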

Since a message record flows through a Task, we try to find the slowest Task in a real-time task; if everything is chained from Source to Sink, we look for the slowest Operator instead. At the source-code level we added a custom metric for the single-record processing time of Flink Tasks and Operators, which can then be read through the Flink REST API. We traverse all Tasks of a Flink job, find the JobVertex (a node of the JobGraph) containing the slowest Task, obtain the total output of all Tasks of that JobVertex, and finally compare it with the Kafka Topic's unit-time input to judge whether the task's message processing capability is reasonable.

Let the unit-time input of the task's Kafka Topic be S, the parallelism of the JobVertex containing the slowest Task be P, the unit-time output of that JobVertex be O, and the maximum single-record processing time of the slowest Task be T. We then reason as follows:

1. When O is approximately equal to S and 1 second / T * P is much greater than S, we consider reducing the task's parallelism.

2. When O is approximately equal to S and 1 second / T * P is approximately equal to S, we do not adjust the task's parallelism.

3. When O is much less than S and 1 second / T * P is much less than S, we consider increasing the task's parallelism.

Case 1 indicates unreasonable CPU usage. Of course, since the traffic of a real-time task differs across time periods, we run this detection periodically; if a task matches case 1 many times in a row, an alarm is raised automatically, prompting the platform administrator to make resource optimization adjustments.
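The three cases can be sketched as a small decision function. The "approximately equal" and "much greater/less" thresholds below (rel_tol, ratio) are illustrative assumptions; production values would be tuned.

```python
def concurrency_advice(S, O, T_ms, P, rel_tol=0.1, ratio=2.0):
    """S: Kafka Topic input (records/s); O: output of the slowest
    JobVertex (records/s); T_ms: max single-record processing time of
    its slowest Task (ms); P: that JobVertex's parallelism."""
    capacity = 1000.0 / T_ms * P           # 1 second / T * P
    approx_os = abs(O - S) <= rel_tol * S
    if approx_os and capacity > ratio * S:
        return "decrease parallelism"      # case 1: CPU over-provisioned
    if approx_os and abs(capacity - S) <= rel_tol * S:
        return "keep parallelism"          # case 2: well matched
    if O < S / ratio and capacity < S / ratio:
        return "increase parallelism"      # case 3: task cannot keep up
    return "inconclusive"
```

The periodic detector described above would run this check repeatedly and only alert after case 1 holds for several consecutive rounds, to avoid reacting to momentary traffic dips.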

The following figure shows the resource analysis logic from the perspectives of Flink task memory and message processing capability:

2. Analysis and Practice from the Perspective of Flink Task Memory

2.1 Flink task garbage collector selection

A Flink task is essentially a Java task, so it also involves choosing a garbage collector. A garbage collector is generally chosen from two angles:

Throughput, that is, task execution time / (task execution time + garbage collection time) per unit time. Note that reducing GC pause time does not necessarily increase throughput, because shorter pauses usually mean more frequent collections.

Latency. If your Java program interacts with external systems, GC pauses affect the experience of those external requests.
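The throughput definition above is easy to make concrete (an illustrative helper, not from the original article):

```python
def gc_throughput(exec_seconds: float, gc_pause_seconds: float) -> float:
    """Fraction of wall-clock time spent on useful work rather than GC."""
    return exec_seconds / (exec_seconds + gc_pause_seconds)

# One hour of task time with 36 s of cumulative GC pauses:
t = gc_throughput(3600, 36)  # about 0.99
```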

I think Flink tasks are throughput-oriented Java workloads, so we weigh throughput more heavily. That is not to say latency is ignored: JobManager, TaskManager and ResourceManager exchange heartbeats, and if pauses are too long, heartbeats may time out.

At present our JDK is an internal build of JDK 1.8, and the young-generation collector is Parallel Scavenge, so the old-generation collector can only be chosen from Serial Old or Parallel Old. Because each Pod of a Flink K8s task is limited to 0.6-1 CPU core (at most 1 core), we chose Serial Old for the old generation: on a single core, multithreaded garbage collection may incur thread-switching overhead.
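As a concrete illustration, the collector choice (plus the GC logging needed for the analysis in section 2.2) could be set through the TaskManager JVM options. This is a sketch only: the flag combination and log path are our assumptions and should be verified against your Flink and JDK 8 versions.

```yaml
# flink-conf.yaml fragment (sketch): Parallel Scavenge for the young
# generation, Serial Old for the old generation, plus GC logging.
# On JDK 8, -XX:+UseParallelGC combined with -XX:-UseParallelOldGC
# selects PS Scavenge + Serial Old (PSMarkSweep).
env.java.opts.taskmanager: >-
  -XX:+UseParallelGC
  -XX:-UseParallelOldGC
  -XX:+PrintGCDetails
  -XX:+PrintGCDateStamps
  -Xloggc:/opt/flink/log/taskmanager-gc.log
```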

2.2 Real-time task GC log acquisition

After the garbage collector is set, the next step is to obtain the GC logs of the Flink task. A Flink task generally consists of a single JobManager plus multiple TaskManagers, and it is the TaskManager GC logs we need to analyze. Do we need all of them? No: we sort the TaskManagers by their Young GC count and take the top 16 for analysis. The Young GC count can be obtained through the Flink REST API.
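The top-16 selection reduces to a sort over per-TaskManager Young GC counts. A sketch over data already fetched from the REST API; the Flink metric name in the docstring assumes the Parallel Scavenge collector and should be verified against your metrics setup.

```python
def top_taskmanagers_by_young_gc(gc_counts: dict, n: int = 16) -> list:
    """gc_counts maps a TaskManager id to its Young GC count, e.g. the
    Flink metric Status.JVM.GarbageCollector.PS_Scavenge.Count fetched
    per TaskManager. Returns the n ids with the most Young GCs."""
    return sorted(gc_counts, key=gc_counts.get, reverse=True)[:n]

counts = {"tm-a": 120, "tm-b": 540, "tm-c": 300}
worst = top_taskmanagers_by_young_gc(counts, n=2)  # ['tm-b', 'tm-c']
```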

For Flink on Yarn tasks, the GC log can be viewed directly via the TaskManager's log link and downloaded locally over HTTP. For Flink on K8s tasks, the GC log is first written to the cloud disk mounted to the Pod, via a K8s hostPath volume. Internally we use Filebeat to watch and collect the log files and ship them to a downstream Kafka Topic. A custom internal log server consumes the Kafka log records, stores and manages them on disk, and exposes a log-download API. Through that download interface we can fetch the GC log of any TaskManager that needs analysis.

2.3 Analysis of Flink task memory based on GC Viewer

GC Viewer is an open-source GC log analysis tool. Before using it, clone the GC Viewer project code locally, then compile and package it.

When analyzing the heap memory of a real-time task, we download the Flink TaskManager logs locally and process them with GC Viewer. If analyzing the GC logs of many TaskManagers is slow, multithreading can be used. All of the above operations can be coded to produce the analysis results automatically. The following is the GC Viewer command line:

java -jar gcviewer-1.37-SNAPSHOT.jar gc.log summary.csv

In the command above, gc.log is the name of a TaskManager's GC log file and summary.csv is the result of the log analysis. The following is our platform's memory analysis result for one real-time task:

The following is the description of some of the parameters in the screenshot above:

RunHours: Flink task running hours

YGSize: maximum young-generation heap memory allocated to one TaskManager, in MB

YGUsePC: maximum young-generation heap utilization of one TaskManager

OGSize: maximum old-generation heap memory allocated to one TaskManager, in MB

OGUsePC: maximum old-generation heap utilization of one TaskManager

YGCount: Young GC count of one TaskManager

YGPerTime: Young GC pause time per second of one TaskManager

FGCount: Full GC count of one TaskManager

FGAllTime: total Full GC time of one TaskManager, in seconds

Throughput: TaskManager throughput

AVG PT (the avgPromotion parameter in the analysis result): average size of objects promoted to the old generation per Young GC

Rec Heap: recommended heap size

RecNewHeap: recommended young-generation heap size

RecOldHeap: recommended old-generation heap size

Most of the above metrics come directly from the GC Viewer analysis; the recommended heap, young-generation, and old-generation sizes are computed according to the memory optimization rule in section 1.2.1.

3. Analysis and Practice from the Perspective of Message Processing Capability

3.1 Obtaining the unit-time input of a real-time task's Kafka Topic

To analyze a Flink task's message processing capability, the first step is to obtain its Kafka source Topic; currently, if the data source is not Kafka, we skip the analysis. Flink tasks fall into two categories: Flink Jar tasks and Flink SQL tasks. For a Flink SQL task, obtaining the Kafka source is relatively simple: parse the Flink SQL code and take the parameters in each table's WITH clause. After filtering out the Sink tables, for each remaining SqlCreateTable whose connector type is Kafka, the specific Kafka Topic can be read from its WITH parameters.
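A much-simplified sketch of the SQL path. Regular expressions stand in for the real SQL parser here, the option names assume a newer Kafka connector, and the sink-table filtering mentioned above is reduced to a comment:

```python
import re

def kafka_source_topics(flink_sql: str) -> list:
    """Scan CREATE TABLE ... WITH (...) options and collect the 'topic'
    of tables whose connector is kafka. A real implementation would use
    the Flink SQL parser and would exclude tables only used as sinks."""
    topics = []
    for options in re.findall(r"WITH\s*\((.*?)\)", flink_sql, re.S | re.I):
        props = dict(re.findall(r"'([^']+)'\s*=\s*'([^']+)'", options))
        if props.get("connector", "") == "kafka" and "topic" in props:
            topics.append(props["topic"])
    return topics
```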

Obtaining the Kafka Topic source of a Flink Jar task is more complicated. We have an internal real-time task lineage service: it automatically builds the PackagedProgram (a class inside Flink) of the Flink Jar task, obtains the StreamGraph from it, and the StreamGraph's StreamNodes contain all Sources and Sinks. Via reflection we extract the concrete SourceFunction from each StreamNode, and if it is a Kafka SourceFunction we read its Kafka Topic. The following is a screenshot of the StreamGraph class:

After the Kafka Topic source of the Flink task is obtained, the next step is to get the number of message records entering that Topic per unit time. This can be obtained through the Kafka Broker JMX metric API, exposed via an external interface provided by our internal Kafka management platform.

3.2 Automatic detection of the slowest message-processing Task

First of all, at the source-code level we added a metric for the single-record processing time of a Flink Task, readable through the Flink REST API. The next step is to traverse all Tasks of the Flink job to be analyzed, again with the help of the Flink REST API, which has the following endpoint:

base_flink_web_ui_url/jobs/:jobid

This API returns all Vertices of a job. A Vertex can be simply understood as a JobVertex in the Flink JobGraph; a JobVertex represents one segment of execution logic in the real-time task.

After obtaining all the Vertices of the Flink task, the next step is to fetch, for each Vertex, the single-record processing metric of its Tasks. You can use the following API:

You need to append ?get=<specific metric> to the metrics endpoint of the REST API above. For example, in metrics?get=0.Filter.numRecordsOut, 0 is the id of the Vertex's subtask (Task) and Filter.numRecordsOut is the metric name. Internally we use taskOneRecordDealTime to denote a Task's single-record processing time, so 0.taskOneRecordDealTime fetches the single-record processing time of one Task. The API supports querying multiple metrics at once, separated by commas in get.

The overall steps for automatic detection of the slowest Flink message processing Task are as follows:

Get all the Vertices of the real-time task

Iterate over each Vertex, fetch the taskOneRecordDealTime of all its parallel Tasks, and record the maximum value

Compare the per-Vertex maxima of the single-record processing metric and find the Vertex with the slowest processing time
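The steps above can be sketched over metric values already fetched from the REST API. taskOneRecordDealTime is the custom metric described above; the payload shape here is a simplified assumption:

```python
def slowest_vertex(vertex_metrics: dict) -> tuple:
    """vertex_metrics maps a JobVertex id to the taskOneRecordDealTime
    values (ms per record) of all its parallel Tasks. Keep each vertex's
    slowest Task, then return the slowest vertex overall."""
    worst = {vid: max(times) for vid, times in vertex_metrics.items()}
    vid = max(worst, key=worst.get)
    return vid, worst[vid]

sample = {
    "source-vertex": [0.05, 0.07],
    "map-dubbo-vertex": [9.8, 10.2, 9.9],
    "sink-vertex": [0.4, 0.3],
}
slowest = slowest_vertex(sample)  # ('map-dubbo-vertex', 10.2)
```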

Here are the results of our real-time platform's analysis of a Flink real-time task:

4. Our Practice of Optimizing Flink Real-time Task Resources

With methods in place for analyzing both the memory and the message processing capability of Flink tasks, the next step is applying them on the real-time platform. Our real-time platform periodically scans all running Flink tasks every day. On the memory side, we combine the task's GC logs with the memory optimization rule to compute a recommended heap size and compare it with the heap actually allocated to the task. If the gap between the two is too large, we consider the memory configuration wasteful and raise an alarm prompting the platform administrator to optimize it.

After receiving the alarm, the platform administrator also checks whether the task's message processing capability is reasonable. If, for the slowest Vertex (one segment of the real-time logic), the total unit-time output of all its Tasks roughly equals the unit-time input of the Kafka Topic the task consumes, yet the capacity computed from the Vertex's parallelism and single-record processing metric is much larger than that input, the Flink task's parallelism can be reduced appropriately. The exact adjustment is decided after communicating with the business side. The overall resource optimization flow for Flink tasks is as follows:

The above is the exploration and practice of optimizing Flink real-time task resources shared by the editor. If you happen to have similar questions, hopefully the analysis above helps you understand them.
