Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to realize the Join of data flow by Flink SQL

2025-10-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)06/01 Report--

This article mainly introduces "how Flink SQL realizes the Join of data flow". In the daily operation, I believe that many people have doubts about how to realize the Join of data flow in Flink SQL. The editor consulted all kinds of materials and sorted out simple and easy-to-use operation methods. I hope it will be helpful to answer the doubt of "how to realize the Join of data flow by Flink SQL". Next, please follow the editor to study!

No matter in the field of OLAP or OLTP, Join is a SQL statement which is often involved in business and has complex optimization rules. For offline computing, after years of accumulation in the database field, the semantics and implementation of Join has been very mature, but for Streaming SQL, which has just sprung up in recent years, Join is in its infancy.

The most critical problem is that the implementation of Join depends on caching the entire data set, while the object of Streaming SQL Join is infinite data flow. Memory pressure and computational efficiency are inevitable problems in long-term operation. The following article will analyze how Flink SQL solves these problems and implements the Join of two data streams combined with the development of SQL.

Implementation of offline Batch SQL Join

There are three basic implementation methods of traditional offline Batch SQL (SQL for bounded data sets), which are Nested-loop Join, Sort-Merge Join and Hash Join.

Nested-loop Join is the most simple and straightforward, loading two datasets into memory and comparing the elements in the two datasets one by one to see if they meet the Join condition by inline traversal. Although Nested-loop Join has the lowest time efficiency and space efficiency, it is more flexible and has a wide range of applications, so its variant BNL is often used by traditional databases as the default basic option of Join.

As the name implies, Sort-Merge Join is divided into two Sort and Merge phases. First, the two datasets are sorted separately, and then the two ordered datasets are traversed and matched respectively, similar to the merge of merge sorting. It is worth noting that Sort-Merge only applies to Equi-Join (the Join condition uses equals as the comparison operator). Sort-Merge Join requires sorting of two datasets, which is costly and is usually used as an optimization solution in the case of ordered datasets as input.

Hash Join is also divided into two phases, first converting one dataset to Hash Table, and then traversing another dataset element and matching the elements within the Hash Table. The first phase and the first data set are called the build phase and build table, respectively, and the second phase and the second data set are called the probe phase and the probe table, respectively. Hash Join is more efficient but requires more space, which is usually used as an optimization scheme when one of the tables of Join is a small table suitable for memory. Like Sort-Merge Join, Hash Join is only available for Equi-Join.

Real-time Streaming SQL Join

Compared with offline Join, real-time Streaming SQL (SQL for unbounded data sets) can not cache all data, so the sorting of data sets required by Sort-Merge Join is basically impossible, while Nested-loop Join and Hash Join can meet the requirements of real-time SQL after some improvements.

Let's look at the basic implementation of basic Nested Join in real-time Streaming SQL through examples. (the case and diagram come from Piotr Nowojski's sharing in Flink Forward San Francisco [2]).

Figure 1. Join-in-continuous-query-1

Table A has 1 and 42 elements, and Table B has 42 elements, so the Join result at this time will output 42.

Figure 2. Join-in-continuous-query-2

Then Table B receives three new elements in turn, 7, 3, and 1. Because 1 matches the element of Table A, the result table outputs another element 1.

Figure 3. Join-in-continuous-query-3

Then Table A has new inputs 2, 3, and 6 match the elements of Table B, so output 3 to the result table.

We can see that we need to save the contents of two input tables in Nested-Loop Join, while the historical data that Table An and Table B need to save increases endlessly over time, resulting in unreasonable memory and disk resources, and the matching efficiency of a single element will become less and less efficient. A similar problem exists in Hash Join.

So is it possible to set a cache culling strategy to clean up unnecessary historical data in a timely manner? The answer is yes, and the key lies in how the cache culling strategy is implemented, which is the main difference between the three Join provided by Flink SQL.

Join of Flink SQL

Regular Join

Regular Join is the most basic Join without cache culling strategy. Inputs and updates to both tables in Regular Join are globally visible, affecting all subsequent Join results. For example, in the following Join query, the new record of the Orders table will be matched with all historical and future records of the Product table.

SELECT * FROM OrdersINNER JOIN ProductON Orders.productId = Product.id

Because historical data is not cleaned up, Regular Join allows any kind of update operation (insert, update, delete) to the input table. However, because of the resource problem, Regular Join is usually not sustainable, so it is generally only used as the Join of bounded data streams.

Time-Windowed Join

Time-Windowed Join uses the window to set a Join time limit for the two input tables, and the data outside the time range is invisible to the JOIN and can be cleaned up. It is worth noting that one of the issues involved here is the semantics of time, which can refer to the system time at which the calculation occurred (that is, Processing Time) or to the Event Time extracted from the time field of the data itself. If the Processing Time,Flink automatically divides the Join time window according to the system time and cleans the data regularly; if the Event Time,Flink assigns the Event Time window and cleans the data according to the Watermark.

Taking the more commonly used Event Time Windowed Join as an example, a query that compares the Orders order table and the Shipments shipping order table based on the order time and shipping time Join is as follows:

SELECT * FROM Orders o, Shipments sWHERE o.id = s.orderId AND s.shiptime BETWEEN o.ordertime AND o.ordertime + INTERVAL'4' HOUR

This query sets the time lower bound of o.ordertime > s.shiptime-INTERVAL'4' HOUR for the Orders table (figure 4).

Figure 4. Time lower bound of Time-Windowed Join-Orders table

The time lower bound of s.shiptime > = o.ordertime is set for the Shipmenets table (figure 5).

Figure 5. Lower time bound of Time-Windowed Join-Shipment table

Therefore, both input tables only need to cache the data above the lower bound of time to maintain the space occupation in a reasonable range.

However, although there is no problem with the underlying implementation, how to define time through SQL syntax is still difficult. Although the concepts of Event Time, Processing Time and Watermark have become the consensus of the industry in the field of real-time computing, the support for time data types in the field of SQL is still weak. Therefore, the definition of Watermark and time semantics need to be done by programming API, such as the conversion from DataStream to Table, not by SQL alone. This support for the Flink community program is accomplished by expanding the SQL dialect, and interested readers can track progress through FLIP-66 [7].

Temporal Table Join

Although Timed-Windowed Join solves the resource problem, it also limits the usage scenario: both input streams of Join must have a lower bound of time, after which they are inaccessible. This is not applicable to many Join dimensional table businesses, because in many cases dimensional tables do not have a time limit. To solve this problem, Flink provides Temporal Table Join to meet the needs of users.

Temporal Table Join is similar to Hash Join, dividing input into Build Table and Probe Table. The former is generally the changelog of the latitude table, while the latter is generally the business data stream. Typically, the amount of data of the latter should be much larger than that of the former. In Temporal Table Join, Build Table is a view with a time version based on append-only data streams, so it is also called Temporal Table. Temporal Table requires that you define a primary key and a field for versioning (usually the Event Time time field) to reflect the content of the record at different times.

For example, a typical example is the exchange rate conversion of the amount of a business order. Suppose you have an Orders stream that records the order amount, and you need to Join with the RatesHistory exchange rate stream. RatesHistory represents the exchange rate at which different currencies are converted into yen, and there is an updated record whenever the exchange rate changes. The contents of the two tables at a certain time node are as follows:

Figure 6. Temporal Table Join Example]

We register RatesHistory as a Temporal Table named Rates, set the primary key to currency and the version field to time.

Figure 7. Temporal Table Registration]

After that, Rates is assigned a time version, and Rates calculates the exchange rate conversion content that matches the time version based on RatesHistory.

Figure 8. Temporal Table Content]

With the help of Rates, we can express the business logic in the following query:

SELECT o.amount * r.rateFROM Orders o, LATERAL Table (Rates (o.time)) rWHERE o.currency = r.currency

It is worth noting that, unlike the equality of the two tables in Regular Join and Time-Windowed Join, the new record of either table can be matched with the history of the other table, and in Temporal Table Join, updates to Temoparal Table are not visible to records of the other table prior to that time node. This means that we only need to save the Build Side record until the Watermark exceeds the version field of the record. Because the input of Probe Side is theoretically no longer recorded before Watermark, these versions of the data can be safely erased.

At this point, the study of "how to realize the Join of data flow by Flink SQL" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report