Today I will talk about how to analyze and tune Flink jobs, a topic that many people may not understand well. To help you understand it better, I have summarized the following content, and I hope you can gain something from this article.
Checkpoint mechanism
1. What is a checkpoint? Simply put, in order to achieve fault tolerance and exactly-once semantics, Flink periodically persists its state. This persistence process is called a checkpoint: a snapshot of the global state of a Flink job at a certain point in time.
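As a minimal sketch (the interval and mode below are assumptions to adjust per job), this is how periodic checkpointing is switched on in the DataStream API:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a global snapshot every 60 seconds with exactly-once semantics
        // (interval is an assumption; tune it for the job at hand).
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        // ... build the job topology and call env.execute(...) here
    }
}
```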
When we want to take global state snapshots in a distributed system, the traditional approach introduces a unified clock: the master node broadcasts the clock to every slave node, and when a node receives it, it records its current state.
However, the unified clock has problems of its own. If the GC pause on a node is long, or the network between master and slaves fluctuates at that moment, the clock may arrive late or not at all, so that slave's data becomes inconsistent with the other machines, eventually leading to split brain. To solve this we would need HA (High Availability) for master and slaves, but the more complex a system is, the less stable it becomes and the higher the maintenance cost.
Instead, Flink injects markers called Barriers into the data stream.
The figure above shows an example of a Barrier. From the first upstream task to the last downstream task, every time a task encounters the blue barrier in the figure, the snapshot function is triggered. Let's illustrate this briefly with an example.
2. Case analysis
This is a simple ETL pipeline. We first read data from Kafka, apply a transformation, and then write the result to a downstream Kafka topic.
There is no chaining tuning in this example, so the forward strategy is used, that is, "the output of a task is sent to exactly one task as input." This approach also has the advantage that, if both tasks run in the same JVM, unnecessary network overhead can be avoided.
Set the parallelism to 2, and the DAG looks like this:
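A minimal sketch of such a pipeline, assuming the Kafka connector's KafkaSource/KafkaSink API, a hypothetical broker address and topic names, and a placeholder transformation:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaEtlJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);            // parallelism 2, as in the example
        env.enableCheckpointing(60_000L); // periodic checkpoints

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")   // hypothetical broker address
                .setTopics("input-topic")            // hypothetical topic
                .setGroupId("flink-etl")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("output-topic")    // hypothetical topic
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
           .map(String::toUpperCase)                 // placeholder for the "trans" step
           .sinkTo(sink);

        env.execute("kafka-etl");
    }
}
```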
■ The checkpoint analysis process
Every Flink job has a JobManager, and inside the JobManager a checkpoint coordinator manages the whole checkpoint process. We can configure a time interval at which the checkpoint coordinator sends a checkpoint event to the source task in each container, that is, the first task (task1 and task2 in the parallel diagram above).
When a source operator receives a Barrier, it pauses its own data processing, takes a snapshot of its current state, saves it to the configured persistent storage, asynchronously sends an ack (acknowledgement) to the CheckpointCoordinator, and at the same time broadcasts the Barrier to all of its downstream operators before resuming its own data processing.
Each operator takes a snapshot in the same way and broadcasts the Barrier downstream, until the Barrier finally reaches the sink operator and the snapshot is complete. Note that an operator may have multiple upstream inputs, and the corresponding Barriers must all be aligned before the checkpoint is triggered; so when a checkpoint takes a long time, the cause may be that aligning the Barriers takes a long time.
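When Barrier alignment turns out to be the bottleneck, a hedged option on newer Flink versions (1.11+) is unaligned checkpoints, sketched below together with a checkpoint timeout; both values are assumptions:

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AlignmentTuning {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);

        CheckpointConfig cfg = env.getCheckpointConfig();
        // Fail checkpoints that take longer than 10 minutes instead of letting them hang.
        cfg.setCheckpointTimeout(10 * 60_000L);
        // Let Barriers overtake in-flight records so slow alignment does not block the checkpoint,
        // at the price of storing those in-flight records in the checkpoint (Flink 1.11+).
        cfg.enableUnalignedCheckpoints();
    }
}
```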
■ Snapshot & Recover
As shown in the figure, this is the initialization phase of our container. e1 and e2 are records just consumed from Kafka, and at the same time the CheckpointCoordinator sends a Barrier to the source.
At this point Task1 completes its checkpoint; the effect is to record that the offset is 2 (e1 and e2 have been consumed), and it then broadcasts the Barrier to its downstream operators. The input of Task3 is the output of Task1. Suppose the job counts the number of records; then the checkpoint of Task3 records that the count is 2 (because it has received e1 and e2 from Task1), and it broadcasts the Barrier further downstream. When the Barrier reaches the sink operator, the snapshot is complete.
After that, the source keeps producing data and new checkpoints keep being created. If the container now crashes and is restarted, the data has to be recovered. The offset in the last completed checkpoint is 2 and the count state is 2, so we restore from that state: Task1 resumes consumption from e3. This is the Recover operation.
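To make the counting example concrete, here is a hedged sketch of a counting function whose count lives in Flink keyed state (the stream is assumed to be keyed with keyBy beforehand, and the class and state names are made up). That state is written into every checkpoint and restored, together with the source's Kafka offsets, during Recover:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Counts records per key; the count is kept in keyed state, so it is snapshotted
// on every checkpoint and restored from the last completed checkpoint after a failure.
public class CountingFunction extends RichFlatMapFunction<String, Long> {
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(String value, Collector<Long> out) throws Exception {
        Long current = count.value();
        long next = (current == null ? 0L : current) + 1;
        count.update(next);
        out.collect(next);
    }
}
```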
■ Checkpoint considerations
The three key points listed below affect the throughput of the system and deserve attention during actual development:
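As a hedged illustration of checkpoint settings that commonly influence throughput (the concrete values below are assumptions and need to be tuned per job):

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointThroughputKnobs {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);            // interval: how often the job pays the snapshot cost

        CheckpointConfig cfg = env.getCheckpointConfig();
        cfg.setMinPauseBetweenCheckpoints(30_000L);  // guarantee some processing time between two checkpoints
        cfg.setMaxConcurrentCheckpoints(1);          // do not let checkpoints overlap and compete for resources
    }
}
```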
3. How backpressure arises in Flink and how it is handled
In distributed systems, data often has to be exchanged between multiple tasks and multiple JVMs. We use producers and consumers to illustrate this.
Suppose the producer uses an unbounded buffer for storage. When the producer produces much faster than the consumer consumes, data piles up on the producer side because of the consumer's limited capacity, which eventually leads to an OOM.
Even with a bounded buffer, if the consumer's capacity is low, the producer simply stops producing once the buffer is full. That does not completely solve the problem either, so we need to adjust according to the circumstances.
Flink also exchanges data between different TaskManagers through bounded buffers, and the approach is divided into static flow control and dynamic flow control.
Static flow control, simply put, means that when the producer's TPS exceeds the consumer's, we batch the data: records are encapsulated into batches and sent batch by batch, and after each send the producer sleeps for a period of time, calculated as left (the remaining data) / tps. The drawback is that the behavior of the system is hard to predict.
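A toy, non-Flink sketch of this static flow control idea (batch, send, sleep); the batch size and the assumed consumer TPS are made up, and the sleep is computed from the batch size and that assumed rate, a simplification of the article's "remaining / tps":

```java
import java.util.ArrayList;
import java.util.List;

// Static flow control: send in fixed-size batches, then sleep long enough that the
// average send rate does not exceed the consumer's assumed capacity (TARGET_TPS).
public class StaticFlowControl {
    static final int BATCH_SIZE = 100;
    static final double TARGET_TPS = 500.0;   // assumed consumer capacity

    public static void main(String[] args) throws InterruptedException {
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            batch.add("record-" + i);
            if (batch.size() == BATCH_SIZE) {
                send(batch);
                batch.clear();
                // BATCH_SIZE records should take at least BATCH_SIZE / TARGET_TPS seconds.
                long sleepMs = (long) (BATCH_SIZE / TARGET_TPS * 1000);
                Thread.sleep(sleepMs);
            }
        }
    }

    static void send(List<String> batch) {
        // placeholder for the real network send
        System.out.println("sent " + batch.size() + " records");
    }
}
```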
Flow control before Flink 1.5 was based on TCP's sliding window, which was covered in earlier lessons; Flink has abandoned this mechanism since 1.5, so it is not explained here. In that network model, the producing node can only decide whether to send data by checking whether the current channel is writable; it knows nothing about the real capacity of the downstream consumer. As a result, by the time the producing node finds the channel no longer writable, the downstream consumer may already have a large backlog of data.
For Credit-Based flow control, we use the following data exchange examples to illustrate it:
Data exchange in Flink can be divided into three kinds: data exchange within the same task, data exchange between different tasks in the same JVM, and data exchange between different tasks in different JVMs.
Data exchange within the same task uses the forward strategy we just mentioned, which mainly avoids serialization and network overhead.
In the second kind of data exchange, the data is first serialized by a RecordWriter and handed to the ResultPartition, then passed through a local channel to the InputGate of the other task, where it is deserialized and pushed to the RecordReader for processing.
The third kind of data exchange involves different JVMs, so there is some network overhead. The difference from the second kind is that the data is first pushed to Netty, and Netty pushes it to the remote task.
■ Credit-Based
At this point we can see that event1 has already been pushed to TaskB, with backlog = 1. The backlog really only exists to make the consumer aware of the state of the producer side.
After TaskB receives event1, it returns an ack to TaskA together with credit = 3, which tells TaskA how many more records it can accept. Through this mutual exchange, producers and consumers keep each other aware of their status.
After a while, the bounded buffer in TaskB is full, and TaskB replies to TaskA with credit = 0. The channel then stops working and TaskA no longer sends data to TaskB.
A bit later, data also piles up in the bounded buffer in TaskA. The drop in throughput and the processing delay we usually observe occur because the whole system is effectively stalled at this point. As shown in figure 2, all processes are marked with an "X", indicating that they have stopped working.
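The following is only a toy simulation of the credit/backlog handshake described above, not Flink's actual implementation; buffer sizes and event names are made up:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy illustration of credit-based flow control: the receiver grants credits equal to its
// free buffer slots; the sender consumes one credit per event and advertises its backlog.
public class CreditBasedDemo {
    public static void main(String[] args) {
        Deque<String> senderBacklog = new ArrayDeque<>();
        for (int i = 1; i <= 10; i++) senderBacklog.add("event" + i);

        int receiverFreeSlots = 3;          // bounded buffer on the consumer side
        int credit = receiverFreeSlots;     // initial credit announced to the sender

        while (!senderBacklog.isEmpty() && credit > 0) {
            String event = senderBacklog.poll();
            credit--;                        // one credit consumed per event sent
            System.out.printf("sent %s, backlog=%d, credit left=%d%n",
                    event, senderBacklog.size(), credit);
        }
        // credit == 0: the channel stops until the receiver drains its buffer and
        // grants new credit, which is exactly the stalled state described above.
        System.out.println("credit exhausted, sender paused");
    }
}
```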
The JVM is a very complex system; when it runs out of memory an OOM occurs and the system crashes. After receiving its allocated memory, Flink first sets aside a cutoff of reserved memory to keep the system safe. The network buffers are the memory pool behind the bounded buffers we just mentioned; the memory manager is another pool that can be configured as on-heap or off-heap memory (in streaming jobs it is usually configured off-heap); and the Free part is the memory left for user code.
Now let's assume that the memory allocated to this TaskManager is 8 GB.
First the cutoff is taken off, 0.25 by default, so the available memory is 8 GB x 0.75 = 6144 MB. The network buffers take 0.1 of the available memory, that is 6144 x 0.1 MB. The in-heap/off-heap managed memory is the available memory minus the network buffers, multiplied by 0.8, and the remaining 0.2 is the heap memory left to the user.
In reality it works the other way around: Flink first determines the size of the heap memory and then derives the sizes of the other memory areas from it.
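A small worked example of this split, using the article's numbers (8 GB container, 0.25 cutoff, 0.1 network fraction, 0.8 managed fraction); this mirrors the pre-1.10 style memory model described above:

```java
// Worked example of the TaskManager memory breakdown described in the text.
public class MemoryBreakdown {
    public static void main(String[] args) {
        double totalMb   = 8 * 1024;                    // 8192 MB given to the TaskManager
        double cutoffMb  = totalMb * 0.25;              // 2048 MB reserved as cutoff
        double availMb   = totalMb - cutoffMb;          // 6144 MB left
        double networkMb = availMb * 0.1;               // ~614 MB network buffers
        double managedMb = (availMb - networkMb) * 0.8; // ~4424 MB managed (in/off-heap) memory
        double userMb    = (availMb - networkMb) * 0.2; // ~1106 MB heap left for user code

        System.out.printf("cutoff=%.0f, network=%.0f, managed=%.0f, user heap=%.0f MB%n",
                cutoffMb, networkMb, managedMb, userMb);
    }
}
```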
Locating problems in a Flink job
1. A formula for locating problems
"one pressure, two checks and three indicators, delayed throughput is the core. Always pay attention to the amount of resources, first look at GC."
The first pressure refers to the back pressure, and the second check refers to checkpoint. Whether it takes a long time to align the data and whether the state is very large. These are closely related to the system throughput. The three indicators refer to some displays of the Flink UI block. Our main concerns are actually delay and throughput, system resources, and GC logs.
Look at the backpressure: usually the subtask immediately downstream of the last subtask showing high backpressure is one of the job's bottlenecks.
Look at the checkpoint duration: the checkpoint duration affects the overall throughput of the job to a certain extent.
Look at the core metrics: metrics are the basis for an accurate judgment of a task's performance, and latency and throughput are the most critical ones.
Look at the resource utilization: improving resource utilization is the ultimate goal.
■ Common performance problems
A brief explanation:
When focusing on backpressure, people often overlook the performance cost of serializing and deserializing the data.
For data structures such as HashMap and HashSet, whose keys must be hashed, keyBy operations on large volumes of data can hurt performance significantly.
Data skew is a classic problem and is expanded on later.
If the downstream is MySQL or HBase, writes should be batched: records are held in a buffer and sent once certain conditions are met, which reduces interaction with the external system and cuts network overhead (a minimal sketch of such a batching sink follows below).
Frequent GC, whether CMS or G1, stops the whole job while it runs; long GC pauses can also prevent the JobManager and TaskManager from sending heartbeats in time, in which case the JobManager considers the TaskManager lost and starts a new one.
Windows are the means of cutting unbounded data into bounded blocks. With sliding windows, for example, there is data overlap: size = 5 min is not a large window by itself, but step = 1 s means the data is processed once per second, which leads to heavy overlap and a large data volume.
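A hedged sketch of the batching idea for external sinks mentioned above; the flush threshold is an assumption, and a production sink would also flush on checkpoints and on a timer:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Buffer records and flush them to the external store (e.g. MySQL/HBase) in batches,
// so that every record does not pay a full round trip.
public class BatchingSink extends RichSinkFunction<String> {
    private static final int FLUSH_SIZE = 500;   // assumed threshold
    private transient List<String> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = new ArrayList<>();
    }

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value);
        if (buffer.size() >= FLUSH_SIZE) {
            flush();
        }
    }

    @Override
    public void close() {
        flush();   // do not lose the tail of the buffer
    }

    private void flush() {
        if (buffer.isEmpty()) return;
        // placeholder for the real batched write to MySQL/HBase
        System.out.println("flushing " + buffer.size() + " records");
        buffer.clear();
    }
}
```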
2. Flink job tuning
We can deduplicate through data structures such as Set or Map, combined with Flink state. However, these deduplication schemes cause performance to drop sharply as the data volume grows, for example write performance problems caused by hash collisions, GC problems caused by excessive memory usage, and lost TaskManagers.
Scheme 2 and scheme 3 also rely on certain data structures; interested readers can look into them on their own, and they are not expanded here.
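A minimal sketch of deduplication with Flink keyed state, assuming the stream has already been keyed by the deduplication key; the class and state names are made up, and in practice a state TTL should be added so the state does not grow forever (which is exactly the memory/GC risk mentioned above):

```java
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;

// Keeps one boolean per key and only lets the first occurrence of each key through.
public class DedupFilter extends RichFilterFunction<String> {
    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public boolean filter(String value) throws Exception {
        if (Boolean.TRUE.equals(seen.value())) {
            return false;        // duplicate, drop it
        }
        seen.update(true);
        return true;             // first time we see this key
    }
}
```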
■ Data skew
Data skew is a high-frequency problem that everyone encounters, and there are many ways to deal with it.
The first scenario: when the parallelism is set lower than the number of partitions, it causes the uneven consumption mentioned above.
The second is an uneven distribution of keys, which can be broken up by adding random prefixes so that the data is no longer concentrated in a few tasks.
Another technique is to perform a local aggregation per key on each node first, similar to the local combiner in MapReduce. After this map-side pre-aggregation each node holds only one record per key, because multiple records with the same key have been aggregated. When other nodes then pull the same key from all nodes, the amount of data to transfer is greatly reduced, which cuts down disk IO and network transmission overhead (a sketch of this two-stage approach follows below).
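A rough sketch of this two-stage (salted) aggregation on a windowed stream; the salt range, the window size, and the Tuple2&lt;String, Long&gt; shape are assumptions for illustration, and the window/trigger details would need care in a real job:

```java
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SkewMitigation {
    // Stage 1 spreads one hot key over 16 salted keys and pre-aggregates them;
    // stage 2 strips the salt and merges the per-salt partial sums.
    public static DataStream<Tuple2<String, Long>> aggregateSkewed(
            DataStream<Tuple2<String, Long>> input) {
        return input
                // stage 1: salt the key so a hot key is handled by many subtasks
                .map(t -> Tuple2.of(ThreadLocalRandom.current().nextInt(16) + "#" + t.f0, t.f1))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(t -> t.f0)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .sum(1)
                // stage 2: strip the salt and combine the partial aggregates per real key
                .map(t -> Tuple2.of(t.f0.substring(t.f0.indexOf('#') + 1), t.f1))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(t -> t.f0)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .sum(1);
    }
}
```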
■ Memory tuning
The memory structure of Flink was described just above, so we know that tuning mainly targets the non-heap memory, that is the network buffers and the manager pool, plus the heap memory; these are essentially controlled by parameters.
These parameters need to be adjusted to your own situation, and only general suggestions are given here. As for the ManagerBuffer, Flink streaming jobs currently do not use much of this memory, so it is usually set relatively small, no more than 0.3.
Heap memory tuning is about the JVM, mainly changing the default garbage collector to G1: the old-generation collector paired with the default Parallel Scavenge runs serially, so its Full GC takes a long time. There is plenty of material about G1 online, so it is not introduced here.
After reading the above, do you have a better understanding of how to analyze and tune Flink jobs? If you want to learn more, follow the industry information channel. Thank you for your support.