This article presents an example-based analysis of stream-batch unification in the Flink execution engine. The content is organized to be easy to follow and should help resolve common doubts; let's walk through it together.
I. Background
With the continued growth of the Internet and mobile Internet, every industry has accumulated enormous volumes of business data. To improve user experience and stay competitive, enterprises increasingly process this big data in real time. Real-time dashboards for social media, real-time recommendations in e-commerce, real-time traffic prediction for city brains, and real-time anti-fraud in the financial industry all show that real-time processing of big data has become an irreversible trend.
Riding this real-time wave, Flink has become the de facto standard of the real-time computing industry. Not only Alibaba, but leading companies across many fields at home and abroad use Flink as the technical base for real-time computing, including ByteDance, Tencent and Huawei in China, and Netflix and Uber abroad.
Real-time business is only a starting point; one of Flink's goals is to give users a unified real-time/offline experience. In practice, many users need more than real-time statistics: to confirm the effectiveness of an operational or product strategy, they also need to compare against historical data (yesterday, or even the same period last year). From the user's point of view, the traditional approach of separate streaming and batch systems has several pain points:
- High labor cost. Since streaming and batch are two separate systems, the same logic has to be developed twice, often by two teams.
- Redundant data links. In many scenarios the stream computation and the batch computation are actually the same, but because they live in two systems the same logic still has to run twice, wasting resources.
- Inconsistent data semantics. This is the most serious problem users encounter. Two systems, two sets of operators and two sets of UDFs inevitably produce discrepancies of varying degrees, which cause real trouble for the business side and cannot be fixed simply by throwing more people or resources at them.
On Singles' Day 2020, while the real-time traffic peak hit an all-time high of 4 billion, the Flink team, together with the DT team, launched a stream-batch unified data warehouse architecture based on Flink, which solved a series of problems caused by the Lambda architecture: the same SQL serves both stream and batch, improving development efficiency by three to four times; a single engine guarantees consistent data semantics by construction; and stream and batch jobs run in the same cluster, so peak shaving and valley filling greatly improve resource efficiency.
Flink's success is inseparable from the healthy and vigorous growth of its open source community. The Apache Software Foundation's annual report shows Flink at the top of three key indicators of community health: first in user mailing-list activity, second in number of developer commits, and second in GitHub visitor traffic. These rankings cover all projects under the Apache Software Foundation, not just the big data field.
2020 was also the second year of Blink contributing back to the community. Over these two years we have gradually contributed the experience Blink accumulated inside the group to the community, making Flink a genuinely unified stream-batch platform. Through this article I would like to share what Flink has done over the past two years to unify stream and batch in the execution engine, and I hope both long-time users and newcomers can learn more about the history and current state of Flink's stream-batch unified architecture.
II. The layered architecture of stream-batch unification
Overall, the core engine of Flink is mainly divided into the following three layers:
- SDK layer. Flink's SDKs fall into two main categories: the relational SDK, i.e. SQL/Table, and the physical SDK, i.e. DataStream. Both are stream-batch unified: whether you use SQL or DataStream, the same business logic, written once, can be used in both streaming and batch scenarios.
- Execution engine layer. The execution engine provides a unified DAG to describe the data processing pipeline (Logical Plan). Whether the job is a stream job or a batch job, the user's business logic is converted into this DAG before execution. The engine then turns the logical DAG into Tasks running in a distributed environment through the Unified DAG Scheduler. Tasks exchange data through Shuffle, and a pluggable Unified Shuffle architecture supports both the streaming and the batch Shuffle modes.
- State storage layer. The state storage layer is responsible for storing the execution state of operators. For streaming jobs there are the open-source RocksDBStateBackend and MemoryStateBackend, as well as the commercial GeminiStateBackend; for batch jobs we have introduced the BatchStateBackend in the community version.

This article mainly covers the following aspects:
- Stream-batch unified DataStream: how to address the challenges currently facing the Flink SDKs with a unified DataStream;
- Stream-batch unified DAG Scheduler: how to fully exploit the performance advantages of the streaming engine through a unified Pipeline Region mechanism, and how to improve the engine's ease of use and the system's resource utilization by adjusting the execution plan dynamically;
- Stream-batch unified Shuffle architecture: how to meet the customization needs of different Shuffle strategies through one unified Shuffle architecture while avoiding duplicate development of the common parts;
- Stream-batch unified fault-tolerance strategy: how to provide fault tolerance in batch scenarios and at the same time improve the fault-tolerance behavior in streaming scenarios.
III. Stream-batch unified DataStream
SDK Analysis and challenges
As shown in the figure above, there are three main types of SDK provided by Flink:
- Table/SQL is a high-level relational SDK, mainly used in data analysis scenarios, and it supports both Bounded and Unbounded input. Because Table/SQL is declarative, the system can apply many optimizations based on the schema provided by the user, such as predicate push-down (Filter Push Down) and deserializing binary data only on demand. Table/SQL already supports both Batch and Streaming execution modes. [1]
- DataStream is a physical SDK. The relational SDK is powerful but has some limitations: it does not expose operations on State and Timer, and an Optimizer upgrade may make the physical execution plans of the same SQL incompatible across versions. DataStream, being an imperative SDK, gives low-level control over State and Timer as well as good control over the physical execution plan, so there is no incompatibility caused by version upgrades. DataStream still has a large user base in the community; for example, there are still nearly 500 unclosed DataStream issues. Although an application written with DataStream can accept both Bounded and Unbounded input, before Flink 1.12 DataStream only supported the Streaming execution mode.
- DataSet is a physical SDK that supports only Bounded input. It optimizes some operators based on boundedness, but does not support operations such as EventTime and State. Although DataSet is the earliest SDK Flink provided, with the continued growth of real-time and data-analysis scenarios and the rise of DataStream and SQL, DataSet's influence in the community is gradually declining.

At present, Table/SQL's support for stream-batch unification is relatively mature, but the physical SDKs still face challenges, mainly in two respects:
- With the existing physical SDKs it is impossible to write a stream-batch application that is truly usable in production. For example, a user who writes a program to process real-time data in Kafka would naturally want to use the same program to process historical data stored on OSS/S3/HDFS, but today neither DataSet nor DataStream can satisfy this seemingly "simple" demand. You may wonder why this is a problem when DataStream supports both Bounded and Unbounded input; the devil is in the details, and I will elaborate in the Unified DataStream section.
- The cost of learning and understanding is high. As Flink grows, more and more new users join the community, but they have to learn two physical SDKs. Compared with other engines, the barrier to entry is relatively high. The two SDKs also differ semantically; for example, Watermark and EventTime exist in DataStream but not in DataSet, which is not a small hurdle for users to understand. And because the two SDKs are incompatible, a new user who makes the wrong initial choice faces a high switching cost.

Unified Physical SDK
To address the challenges facing the physical SDKs described above, we make the Unified DataStream SDK Flink's single unified physical SDK. This part mainly answers two questions:
Why choose DataStream as the Unified Physical SDK?
What capabilities does Unified DataStream provide beyond the "old" DataStream, so that users can write batch applications that are truly usable in production?
Why not Unified DataSet?
To reduce the cost of learning and understanding, the most natural and simple solution is to pick one of DataStream and DataSet as Flink's only physical SDK. So why did we choose DataStream over DataSet? There are two main reasons:
- User benefit. As analyzed earlier, DataSet's influence in the community is gradually declining. If DataSet were chosen as the unified physical SDK, the large "investment" many users have already made in DataStream would be wasted; choosing DataStream lets users continue to profit from that existing investment.
- Development cost. DataSet is too old and lacks the basic concepts of a modern real-time compute engine, such as EventTime, Watermark, State and Unbounded Source. A deeper reason is that the existing DataSet operator implementations, for example Join, cannot be reused at all in streaming scenarios, whereas with DataStream a great deal of reuse is possible. So how can DataStream operators be reused in both streaming and batch scenarios?

Unified DataStream
Many users with some knowledge of Flink may ask: DataStream already supports both Bounded and Unbounded input, so why do we say it is impossible to write a production-ready stream-batch application with it? In short, DataStream was originally designed for Unbounded scenarios, so on Bounded input there is still a gap compared with traditional batch engines in terms of efficiency, availability and ease of use. Specifically, this shows up in the following two aspects:
Efficiency
Let's start with an example. The chart below compares DataStream and DataSet running a WordCount job of the same size: the DataSet version is nearly five times faster than the DataStream version.
Obviously, in order for DataStream to support both streaming and batch scenarios in production, it is necessary to greatly improve the efficiency of DataStream in Bounded scenarios. So why is DataStream less efficient than DataSet?
As mentioned earlier, DataStream was originally designed for Unbounded scenarios, and one of the main characteristics of Unbounded input is disorder: a DataStream operator cannot assume any order in which records are processed, so many operators buffer out-of-order data in a K/V state store and extract it for processing and output only when the time is right. In general, operator access to the K/V state store involves a lot of serialization and deserialization as well as random disk I/O. In DataSet, by contrast, the data is assumed to be bounded, so random disk I/O can be avoided by optimization, and serialization and deserialization are also optimized. This is the main reason why the WordCount written in DataSet is five times faster than the one written in DataStream.
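To make this access pattern concrete, here is a minimal keyed operator (purely illustrative, not taken from the original article) in which every incoming record triggers a read and a write on the keyed state backend, which is exactly where the serialization/deserialization and random disk I/O come from when the backend is, for example, RocksDB:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Counts occurrences per key. In Streaming mode every record touches the keyed K/V state.
public class PerKeyCounter extends KeyedProcessFunction<String, String, Long> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
        Long current = count.value();                  // state read: deserialize, possibly hit disk
        long updated = (current == null ? 0L : current) + 1;
        count.update(updated);                         // state write: serialize, possibly hit disk
        out.collect(updated);
    }
}
```

With bounded, key-sorted input, the backend only ever needs to hold the state of the current key in memory, which is the idea behind the single-key BatchStateBackend described next.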
Knowing the cause, do we need to rewrite all DataStream operators? In theory this is possible, but DataStream has a huge number of operators, and some of them, such as the family of Window operators, are quite complex; rewriting everything would be an enormous effort. Through the single-key BatchStateBackend we avoided rewriting almost all operators while still getting very good results.
Readers familiar with Flink know that an application written with DataStream originally runs in the Streaming execution mode, in which end-to-end Exactly Once semantics are maintained through Checkpoints. Specifically, the Sink of a job commits data to the external system only after every operator in the graph (including the Sink itself) has completed its own snapshot; this is a typical 2PC protocol that relies on Flink's Checkpoint mechanism. Streaming mode can also be used in Bounded scenarios, but it may cause problems for users:
- High resource consumption. In Streaming mode all resources have to be acquired at the same time, and in some cases users simply do not have that many resources.
- High fault-tolerance cost. In Bounded scenarios some operators may not support snapshots for efficiency reasons, so the entire job may have to be re-executed when an error occurs.

In Bounded scenarios users therefore want to run the application in the Batch execution mode, which naturally solves both problems. Supporting Batch execution in Bounded scenarios is relatively simple, but it introduces a thorny problem: with the existing Sink API, end-to-end Exactly Once semantics cannot be guaranteed, because there are no Checkpoints in the Bounded Batch scenario and the original Sink relies on Checkpoints for end-to-end Exactly Once. At the same time, we do not want developers to write two different Sink implementations for the two modes, because that would not leverage Flink's existing ecosystem integrations.
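As a side note on how the mode is chosen in practice: since Flink 1.12 the execution mode of a DataStream program can be selected explicitly. A minimal, purely illustrative sketch (the source and pipeline here are not from the original article):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecutionModeExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The same DataStream program can be executed in either mode (Flink 1.12+):
        //   STREAMING - continuous execution with checkpoints (works for unbounded input)
        //   BATCH     - staged execution over bounded input, using blocking shuffles
        //   AUTOMATIC - derive the mode from the boundedness of the sources
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromElements("flink", "unifies", "stream", "and", "batch")  // a bounded source
           .map(String::toUpperCase)
           .print();

        env.execute("datastream-in-batch-mode");
    }
}
```

The mode can also be supplied at submission time via the `execution.runtime-mode` option instead of being hard-coded in the program.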
In fact, a transactional Sink mainly has to answer the following four questions:
What to commit? How to commit? Where to commit? When to commit? Flink should only ask Sink developers to provide "what to commit" and "how to commit", while the system itself chooses "where to commit" and "when to commit" according to the execution mode, so as to guarantee end-to-end Exactly Once. In the end we proposed a new Unified Sink API, which lets developers write a single Sink that runs in both the Streaming and the Batch execution mode. What is described here is only the main idea; guaranteeing end-to-end consistency in the bounded streaming scenario and connecting to external ecosystems such as Hive and Iceberg still involve some challenges.
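To make the division of responsibilities concrete, here is a simplified sketch of the "what to commit" / "how to commit" split. The interface and method names are illustrative only and are not the exact Flink Sink API:

```java
import java.util.List;

// Illustrative only: the developer implements the two interfaces below, while the framework
// decides "where" and "when" to commit depending on the execution mode.
final class UnifiedSinkSketch {

    /** Writes elements and answers "what to commit". */
    interface ExampleSinkWriter<IN, CommT> {
        void write(IN element) throws Exception;

        // Called by the framework at a mode-dependent point:
        // on checkpoint in Streaming mode, at end of input in Batch mode.
        List<CommT> prepareCommit() throws Exception;
    }

    /** Answers "how to commit" the prepared committables. */
    interface ExampleCommitter<CommT> {
        void commit(List<CommT> committables) throws Exception;
    }
}
```

Because the commit point is owned by the framework rather than by the Sink, a single implementation can be reused unchanged in both execution modes.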
IV. Stream-batch unified DAG Scheduler
What problem is Unified DAG Scheduler trying to solve?
Originally, Flink had two scheduling modes:
- The streaming scheduling mode. The Scheduler applies for all the resources a job needs and then schedules all of the job's Tasks at once; the Tasks communicate with each other in Pipeline fashion. Batch jobs can also be run this way, with a large performance gain, but for long-running batch jobs this mode has problems: when the job is large it consumes a lot of resources at the same time, which some users may not have, and the fault-tolerance cost is high, because once an error occurs the whole job has to be rerun.
- The batch scheduling mode. This mode is similar to traditional batch engines: each Task applies for resources independently, and Tasks communicate through Batch Shuffle. Its advantage is a low fault-tolerance cost, but it has shortcomings too: data is exchanged between Tasks via disk, resulting in a large amount of disk I/O.

Generally speaking, these two scheduling modes already cover the basic needs of unified stream-batch scenarios, but there is still a lot of room for improvement, in three respects:
- Inconsistent architecture and high maintenance cost. The essence of scheduling is resource allocation, in other words deciding when to deploy which tasks where. The original two scheduling modes differ in the timing and granularity of resource allocation, so the scheduling architecture is not fully unified and developers have to maintain two sets of logic: in the streaming scheduling mode the granularity of resource allocation is all the Tasks of the entire physical execution plan, while in the batch scheduling mode it is a single Task. When the Scheduler receives a resource, it has to follow two different code paths depending on the job type.
- Performance. The traditional batch scheduling mode has a low fault-tolerance cost but introduces a large amount of disk I/O, so its performance is not optimal and it cannot exploit the advantages of Flink's streaming engine. In fact, when resources are relatively plentiful we can run batch jobs in the "streaming" scheduling mode, avoiding the extra disk I/O and improving execution efficiency; at night, in particular, streaming jobs release some resources, which makes it possible for batch jobs to run in this "Streaming" way.
- Adaptivity. The physical execution plans of both scheduling modes are static. Statically generated plans have problems such as high tuning cost and low resource utilization.

Unified scheduling based on Pipeline Region
To fully exploit the advantages of the streaming engine while avoiding the shortcomings of whole-graph scheduling, we introduce the concept of the Pipeline Region. The Unified DAG Scheduler allows Tasks in a DAG to communicate either through Pipeline or through Blocking data exchanges; the set of Tasks connected by Pipeline data exchanges is called a Pipeline Region. Based on this concept, Flink requests resources and schedules tasks at the granularity of Pipeline Regions, for stream jobs and batch jobs alike. A careful reader will notice that the two original modes are in fact special cases of Pipeline Region scheduling.
Even when resources are sufficient for the "streaming" scheduling mode, which tasks can actually be scheduled in that way?
Some readers may still worry that the fault-tolerance cost of the "streaming" scheduling mode is higher, because when one Task fails, all Tasks connected to it will also fail and have to be rerun.
In Flink there are two connection patterns between Tasks [2]: All-to-All, in which an upstream Task is connected to every downstream Task, and PointWise, in which an upstream Task is connected to only one or a few downstream Tasks.
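To make the two patterns concrete, here is a small DataStream sketch (the data and keys are purely illustrative): operations such as map keep a PointWise connection between subtasks, while keyBy introduces an All-to-All connection because the key decides the target subtask.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EdgeTypesExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> lines = env.fromElements("a b", "b c", "c a");

        // map keeps a PointWise connection: each upstream subtask feeds
        // only one (or a few) downstream subtasks.
        DataStream<String> cleaned = lines.map(String::trim);

        // keyBy creates an All-to-All connection: every upstream subtask may
        // send records to every downstream subtask, depending on the key.
        cleaned.keyBy(line -> line.isEmpty() ? "" : line.substring(0, 1))
               .reduce((a, b) -> a + "|" + b)
               .print();

        env.execute("edge-types");
    }
}
```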
If all the Tasks of a job were connected through All-to-All edges, then adopting the "streaming" scheduling mode would indeed mean scheduling the entire physical topology at once, and the FailOver cost would be high [3]. In the topology of real batch jobs, however, not all Tasks are connected through All-to-All edges; a large number of Tasks are connected through PointWise edges, and scheduling the graph of PointWise-connected Tasks in "streaming" fashion both reduces the job's fault-tolerance cost and improves its execution efficiency. As shown in the figure below, in the full 10 TB TPC-DS benchmark, pipelining all PointWise edges improves overall performance by more than 20%.
The above is only one of the four Pipeline Region partitioning strategies provided by the Scheduler [4]; in fact, the Planner can decide, based on the actual scenario, which Task connections use the Pipeline transfer mode and which use the Batch transfer mode.
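In the open-source releases a similar choice is exposed as a configuration option. A hedged sketch follows; the option name and values are version-dependent and should be treated as assumptions rather than a definitive reference:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ShuffleModeExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Recent Flink releases let batch jobs choose how exchanges behave via
        // `execution.batch-shuffle-mode`, for example:
        //   ALL_EXCHANGES_BLOCKING  - every edge uses blocking (batch) shuffle
        //   ALL_EXCHANGES_PIPELINED - every edge is pipelined, i.e. "stream-style" scheduling
        // (exact option names and values vary by version).
        conf.setString("execution.batch-shuffle-mode", "ALL_EXCHANGES_PIPELINED");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the job as usual ...
    }
}
```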
Adaptive scheduling is, at its core, the decision process that maps the physical execution plan onto resources. With Pipeline Region scheduling, once the physical execution plan is determined, both stream and batch jobs can be scheduled at Pipeline Region granularity. For batch jobs, however, statically generated physical execution plans still have several problems [5]:
- High configuration cost. For batch jobs it is theoretically possible to infer the parallelism of each stage of the physical execution plan from statistics, but with many UDFs, or with statistics missing, the static decision can be badly wrong. To guarantee the SLA of business jobs, during big promotions business engineers have to manually adjust the parallelism of high-priority batch jobs based on traffic estimates, and because the business changes quickly, this process has to be repeated whenever the business logic changes. The whole tuning process is manual and labor-intensive, and even then misjudgments still happen, leading to missed SLAs.
- Low resource utilization. Because manually configuring parallelism is expensive, it cannot be done for every job. For medium- and low-priority jobs, business engineers fall back on default parallelism values, which in most cases are too large and waste resources. High-priority jobs can be configured manually, but because configuration is tedious, the promotion-time settings are often kept after the big promotion even though traffic has dropped, again wasting a lot of resources.
- Poor stability. The waste of resources ultimately leads to over-application of resources. Most batch jobs today run mixed with streaming-job clusters, using non-guaranteed resources; when resources become tight or machine hotspots appear, these non-guaranteed resources are the first to be reclaimed.
To solve these problems of statically generated physical execution plans, we introduced an adaptive scheduling capability for batch jobs [6]. Compared with static physical execution plans, this feature can greatly improve users' resource utilization. The Adaptive Scheduler can dynamically decide the parallelism of a JobVertex based on how its upstream JobVertices actually executed; in the future it could also dynamically decide which kind of operator to use downstream based on the data produced upstream.
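The parallelism decision itself can be as simple as sizing each task to a target amount of upstream output. The following is a purely conceptual sketch of that idea, not Flink's actual implementation or API:

```java
// Conceptual sketch only: derive the parallelism of a JobVertex from the amount of data
// its upstream vertices actually produced.
public final class AdaptiveParallelismSketch {

    static int decideParallelism(long upstreamOutputBytes,
                                 long targetBytesPerTask,
                                 int minParallelism,
                                 int maxParallelism) {
        // One task per targetBytesPerTask of input, clamped to the allowed range.
        long tasks = (upstreamOutputBytes + targetBytesPerTask - 1) / targetBytesPerTask;
        return (int) Math.max(minParallelism, Math.min(maxParallelism, tasks));
    }

    public static void main(String[] args) {
        // Example: 96 GiB of upstream output, ~1 GiB per task, bounded between 1 and 500.
        System.out.println(decideParallelism(96L << 30, 1L << 30, 1, 500)); // prints 96
    }
}
```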
V. Stream-batch unified Shuffle architecture
Flink is a stream-batch unified platform, so the engine provides two types of Shuffle, Streaming and Batch, for the different execution modes. Although Streaming Shuffle and Batch Shuffle differ in their specific strategies, they are essentially both about re-partitioning the data, so there is still much in common between them. Our goal is therefore a unified Shuffle architecture that can both accommodate the strategy customization of different Shuffles and avoid duplicate development of the common parts.
Overall, the Shuffle architecture can be divided into four layers, as shown in the figure below. The Shuffle requirements of stream and batch jobs differ somewhat at each layer, but they also have a lot in common; a brief analysis follows.
Differences between streaming and batch Shuffle
As we all know, batch jobs and stream jobs have different requirements for Shuffle, which can be reflected in the following three aspects:
- Life cycle of Shuffle data. For a stream job, Shuffle data has essentially the same life cycle as the Tasks; for a batch job, Shuffle data is decoupled from the Task life cycle.
- Storage medium of Shuffle data. Because the Shuffle data of a stream job is short-lived, it can be kept in memory; the Shuffle data of a batch job has an uncertain life cycle, so it needs to be stored on disk.
- Deployment of the Shuffle service [7]. Deploying the Shuffle service together with the compute nodes benefits stream jobs because it avoids unnecessary network overhead and therefore reduces latency; for batch jobs, however, this deployment has problems with resource utilization, performance and stability. [8]
Commonalities between streaming and batch Shuffle
There are both differences and similarities between the Shuffle of batch jobs and that of stream jobs. The commonalities mainly show up in:
- Meta management of data. Shuffle Meta refers to the mapping from a logical data partition to the physical location of the data. In both streaming and batch scenarios, the physical location for reading or writing data is looked up from the Meta in the normal case; in the abnormal case, Shuffle Meta is usually persisted to reduce the cost of fault tolerance.
- Data transfer. Logically, the Shuffle of both stream and batch jobs re-partitions / re-distributes the data, and in a distributed system re-partitioning involves transferring data across threads, processes and machines.
Shuffle architecture

The Unified Shuffle architecture abstracts three components [9]: Shuffle Master, Shuffle Reader and Shuffle Writer. Flink re-partitions data between operators by interacting with these three components, and through them the different Shuffle plugins can implement their own specific strategies:
- Shuffle Master: resource request and release. The plugin tells the framework how to request and release resources, while Flink decides when to do so.
- Shuffle Writer: the upstream operator uses the Writer to write data into the Shuffle service. A Streaming Shuffle writes data into memory; an External/Remote Batch Shuffle can write data to external storage.
- Shuffle Reader: the downstream operator reads the Shuffle data through the Reader.

At the same time, the architecture provides shared support for what stream and batch Shuffle have in common, namely Meta management, data transfer and service deployment [10], so that complex components do not have to be developed repeatedly. Efficient and stable data transfer is one of the most complex subsystems in a distributed system; problems such as upstream/downstream backpressure, data compression and zero-copy memory transfer all have to be solved in it. Developed once in the new architecture, the same implementation is used in both streaming and batch scenarios, greatly reducing the cost of development and maintenance.
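For illustration, the three roles can be sketched roughly as follows. The interface and method names here are simplified and are not the exact Flink plugin API:

```java
import java.util.concurrent.CompletableFuture;

// Simplified, illustrative sketch of the three roles in the unified Shuffle architecture.
final class ShufflePluginSketch {

    /** Answers "how" to request/release resources for shuffle data; Flink decides "when". */
    interface ShuffleMaster<DescriptorT> {
        CompletableFuture<DescriptorT> registerPartition(String resultPartitionId);
        void releasePartition(DescriptorT descriptor);
    }

    /** Used by the upstream operator to write re-partitioned data. */
    interface ShuffleWriter<RecordT> {
        void write(RecordT record, int targetSubpartition) throws Exception;
        void finish() throws Exception;
    }

    /** Used by the downstream operator to read the data back. */
    interface ShuffleReader<RecordT> {
        RecordT next() throws Exception;   // returns null when the partition is exhausted
    }
}
```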
VI. Stream-batch unified fault-tolerance strategy
Flink's original fault-tolerance strategy is based on checkpoints: no matter whether a single Task or the JobMaster fails, Flink restarts the entire job from the most recent checkpoint. Although this strategy leaves some room for optimization, it is basically acceptable for streaming scenarios. Currently, checkpoints are not enabled in Flink Batch mode [11], which means that if any error occurs, the entire job has to be executed from scratch.
Although the original strategy can in theory guarantee that the right results are eventually produced, obviously most customers cannot accept the cost of such a fault-tolerance strategy. To solve these problems, we improved fault tolerance for Tasks and for the JM respectively.
Pipeline Region Failover
Although there are no periodic Checkpoints in Batch execution mode, Tasks in Batch mode can communicate through Blocking Shuffle. When a Task that reads from a Blocking Shuffle fails, all the data it needs is still stored in the Blocking Shuffle, so only that Task and the downstream tasks connected to it through Pipeline Shuffle need to be restarted, instead of the entire job.
In general, the Pipeline Region Failover strategy works the same way as normal scheduling: it divides the DAG into Pipeline Regions whose internal Tasks are connected by Pipeline shuffles, and whenever a Task fails over, only the Region containing that Task is restarted.
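In open-source Flink this behavior corresponds to the region failover strategy, which is normally configured in flink-conf.yaml. A minimal sketch using the Configuration API, intended for local experimentation and to be treated as an assumption rather than the only way to set it:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FailoverStrategyExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // "region": restart only the Pipeline Region(s) affected by a failed Task.
        // "full":   restart the entire ExecutionGraph on any failure.
        conf.setString("jobmanager.execution.failover-strategy", "region");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the job as usual ...
    }
}
```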
JM Failover
The JM is the control center of a job and holds all of the job's execution state, which Flink uses to schedule and deploy tasks. Once the JM fails, all of this state is lost, and without it the new JM cannot continue to schedule the original job even if no worker node has failed. For example, because the information that a task has finished is lost, after that task ends the new JM cannot determine whether the condition for scheduling the downstream task, namely that all of its input data has been produced, is already satisfied.
From the above analysis, the key to JM Failover is how to let a new JM "recover its memory". In VVR [12] we restore the JM's critical state based on an Operation Log mechanism.
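The following is a purely conceptual sketch of the Operation Log idea, not the actual VVR or Flink implementation: critical state transitions are appended to a durable log during normal execution and replayed after a JM failover to rebuild the lost state.

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual sketch only.
public final class OperationLogSketch {

    /** One logged operation: a task finished and its result partition became consumable. */
    static final class TaskFinished {
        final String taskId;
        final String resultPartitionId;
        TaskFinished(String taskId, String resultPartitionId) {
            this.taskId = taskId;
            this.resultPartitionId = resultPartitionId;
        }
    }

    // In a real system this log would be appended to durable, external storage.
    private final List<TaskFinished> log = new ArrayList<>();

    /** Called on every critical state transition during normal execution. */
    void append(String taskId, String partitionId) {
        log.add(new TaskFinished(taskId, partitionId));
    }

    /** After a JM failover, the new JM replays the log to rebuild its "memory",
     *  e.g. which input partitions are complete, so downstream tasks can be
     *  scheduled without rerunning finished upstream tasks. */
    List<String> replayFinishedPartitions() {
        List<String> finished = new ArrayList<>();
        for (TaskFinished op : log) {
            finished.add(op.resultPartitionId);
        }
        return finished;
    }
}
```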
Careful readers may have noticed that although these two improvements start from batch scenarios, they are equally effective for the fault tolerance of streaming jobs. The above is only a brief introduction to the two fault-tolerance strategies; in fact there is much more to think about, for example: what do we do if the upstream Blocking data is lost, and which key states in the JM need to be restored?