In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-26 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article shows you how to achieve a typical ETL scenario based on Flink, the content is concise and easy to understand, it will definitely brighten your eyes. I hope you can get something through the detailed introduction of this article.
The following will start with the background of the birth of digital warehouse, the architecture of digital warehouse, and the comparison between offline and real-time data warehouse, summarize the development and evolution of digital warehouse, and then share several schemes to implement typical ETL scenarios based on Flink.
1. An overview of real-time data warehouse 1.1 background of real-time data warehouse generation
Let's first review the concept of data warehouse.
The concept of data warehouse was put forward by Bill Inmon in the 1990s, and the background at that time was that the traditional OLTP database could not well support the long-period analysis and decision-making scenario, so we should compare and understand the four core points of the concept of data warehouse combined with the state of OLTP database at that time.
Topic-oriented: data warehouse data organization is different from OLTP transaction-oriented processing. Because the data warehouse is analysis-oriented, the data is often organized in the form of topics such as analysis scenarios or analysis objects.
Integrated: for data warehouse, it is often necessary to collect multiple scattered and heterogeneous data sources, do some ETL processing such as data cleaning, and integrate into a data warehouse, while OLTP does not need to do similar integration operations.
Relatively stable: OLTP database is generally business-oriented, its main role is to accurately reflect the current business status, so OLTP database needs to support a large number of add, delete, change operations. But for the data warehouse, as long as it is the data stored in the warehouse, the general usage scenario is query, so the data is relatively stable.
Reflect historical changes: a data warehouse is a collection of data that reflects historical changes, which can be understood as storing snapshots of some historical data. For OLTP databases, it is only necessary to reflect the latest status at that time.
The above four points are a core definition of the data warehouse. We can also see that for real-time data warehouses, some definitions of traditional data warehouses, that is, offline data warehouses, will be weakened, such as reflecting historical changes. After introducing the basic concepts of data warehouse, some classical modeling methods will be used in data warehouse modeling, such as paradigm modeling, dimensional modeling and Data Vault. In the Internet big data scene, the most commonly used is the dimensional modeling method.
Then take a look at the classic architecture of the offline warehouse.
This data warehouse architecture is mainly biased towards the scenario of Internet big data, which can be seen from the above picture that there are three core links.
1. The first link is the part of data sources. In general, there are two main types of data sources for Internet companies:
The first category is the log type data source of the user's behavior log and some back-end logs by reporting at the client burial point. For embedding behavior logs, it usually goes through such a process that the data is first reported to Nginx and then collected by Flume, then stored in message queues such as Kafka, and then pulled to our offline data warehouse HDFS by some real-time or offline pulling tasks.
The second type of data source is the business database, and for the business database, its binlog is usually collected through Canal, then it is also collected into the message queue, and finally pulled to HDFS by Camus.
Both parts of the data source will eventually land on the ODS layer in HDFS, also known as the source data layer, which is consistent with the original data source.
two。 The second link is the offline data warehouse, which is shown in the blue box in the figure. We can see that it is a hierarchical structure, in which the model design is based on the idea of dimensional modeling.
The lowest layer is the ODS layer, which keeps the data stored in HDFS without information loss and basically keeps the original log data unchanged.
Above the ODS layer, unified data cleaning and normalization are generally carried out, and the DWD detail data layer is obtained. This layer also contains unified dimensional data.
Then based on the DWD detail data layer, we will organize our data according to some analysis scenarios, analysis entities, etc., and organize into some sub-topic summary data layer DWS.
On top of DWS, we will do some APP application data layer closer to the application scenario, which should be highly aggregated and can be directly imported into our application services.
In the middle of the production process of offline data warehouse, some offline production architecture engines are generally used, such as MapReduce, Hive, Spark and so on. The data is generally stored on HDFS.
3. After the first two links, some of our application layer data will be stored in data services, such as HBase, Redis, Kylin and other KV storage. And it will encapsulate the corresponding service interface and provide services for some of the data stored in these data stores. In the outermost layer we will produce some business-oriented reports, analysis-oriented data products, and some business products that will support online, and so on. This layer is called the data application part that is closer to the business side.
The above is an introduction to a basic offline data warehouse classic architecture.
As we all know, with the popularity of mobile devices, we are gradually transitioning from the manufacturing era to the Internet era. In the era of manufacturing, the traditional warehouse is mainly to support the business decision-makers and managers of enterprises in some traditional industries to make some business decisions. At that time, the business decision-making cycle was relatively long, at the same time, the amount of data was relatively small, and databases such as Oracle and DB2 were sufficient.
However, with the development of distributed computing technology, intelligent technology, the improvement of overall computing power, the development of the Internet and other factors, the amount of data we collect on the Internet has increased exponentially. And the business no longer only depends on people to make decisions, a large part of the main body of decision-making has been transformed into computer algorithms, such as some intelligent recommendation scenarios and so on. Therefore, at this time, the decision-making cycle is raised from the original sky-level requirements to seconds, and the decision-making time is very short. In terms of scenarios, they will also face more scenarios that require real-time data processing, such as real-time personalized recommendations, advertising scenarios, and even some traditional enterprises have begun to monitor the quality of processed products in real time. And the financial industry relies heavily on anti-cheating and so on. Therefore, under such a background, the real-time data warehouse must be proposed.
1.2 Real-time data warehouse architecture
First of all, let's introduce the classic architecture of real-time data warehouse-Lambda architecture:
This architecture was proposed by the author of Storm. In fact, the main idea of Lambda architecture is to add the part of real-time data warehouse on the basis of the original offline data warehouse architecture, and then make a merge between the offline stock data and our real-time data of tweak 0, which can produce the result of real-time update of data status.
Compared with the above 1.1 offline data warehouse architecture diagram, it is obvious that the increase in the real-time warehouse is the yellow area in the image above. We usually put real-time warehouse data on message queues such as Kafka, and there are also some layers of dimensional modeling, but in the part of summarizing the data, we will not put some data in the APP layer in the real-time warehouse, but more will move to the data service side to do some calculations.
Then in the part of real-time computing, we often use computing engines such as Flink, Spark-streaming and Storm. In terms of timeliness, the timeliness can be improved from the original sky level and hour level to the second level and minute level.
You can also see that in this architecture diagram, there are two parts in the middle data warehouse, one is the offline data warehouse, the other is the real-time data warehouse. We have to operate and maintain two sets of engines (real-time computing and offline computing), and at the code level, we also need to implement real-time and offline business code. Then when merging, we need to ensure data consistency between implementation and offline, so whenever our code changes, we also need to do a lot of comparison and verification of this real-time offline data. In fact, this is relatively high for both resources and operation and maintenance costs. This is an obvious and prominent problem in Lamda architecture. Therefore, the Kappa structure is generated.
One of the main ideas of the Kappa architecture is to remove the offline data warehouse in the data warehouse part, and all the data warehouse production uses real-time data warehouse. From the picture above, you can see that the offline data warehouse module is no longer in the middle.
With regard to Kappa architecture, students who are familiar with real-time warehouse production may have a question. Because we are often faced with business changes, a lot of business logic needs to be iterated. If the caliber of some of the previously produced data has changed, it will need to be recalculated or even re-brushed historical data. For real-time data warehouse, how to solve the problem of data recalculation?
The idea of the Kappa architecture in this area is to first prepare a message queue that can store historical data, such as Kafka, and this message alignment allows you to restart consumption from a historical node. Then you need to start a new task to consume the data on the Kafka from an earlier time node, and then when the progress of the new task is on a par with the current running task, you can switch the downstream of the current task to the new task, the old task can be stopped, and the result table of the original output can also be deleted.
With some improvements in our real-time OLAP technology, a new real-time architecture has been proposed, which is temporarily called the real-time OLAP variant.
This idea is that a large number of aggregation, analysis and calculation are carried out by the real-time OLAP engine. In the part of real-time warehouse computing, we do not need to do particularly heavy, especially some logic related to aggregation, and then we can ensure that we can flexibly face all kinds of business analysis requirements changes in the data application layer, and the whole architecture is more flexible.
Finally, let's make an overall comparison of these architectures of real-time data warehouses.
This is a comparison of the three real-time warehouse architectures as a whole:
From the perspective of computing engine: Lamda architecture it needs to maintain two sets of batch computing engines, Kappa architecture and real-time OLAP variants only need to maintain streaming computing engine.
Development cost: for Lamda architecture, because it needs to maintain two sets of real-time offline code, its development cost will be higher. Kappa architecture and real-time OLAP variants only need to maintain a set of code.
Analytical flexibility: real-time OLAP variants are relatively flexible.
In terms of real-time OLAP engine dependency: real-time OLAP variants are strongly dependent on the capabilities of real-time OLAP variant engines, while the first two are not.
Computing resources: Lamda architecture needs to batch two sets of computing resources, Kappa architecture only needs streaming computing resources, and real-time OLAP variants need additional OLAP resources.
Logical change recalculation: Lamda architecture is recalculated through batch processing, Kappa architecture needs to re-consume message queue recalculation as described earlier, real-time OLAP variants also need to re-consume message queue, and this data has to be re-imported into the OLAP engine for calculation.
1.3 traditional data warehouse vs real-time data warehouse
Then let's take a look at the overall difference between traditional data warehouse and real-time data warehouse.
First of all, from the perspective of timeliness: offline data warehouse is to support hours and days, real-time data warehouse to seconds-level minutes, so the timeliness of real-time data warehouse is very high.
In terms of data storage mode: offline data warehouse needs to be stored on HDFS and RDS. Real-time data warehouse generally contains message queues and some kv storage, such as dimensional data will be more stored on kv storage.
In the production and processing process, the offline data warehouse needs to rely on offline computing engine and offline scheduling. But for the real-time data warehouse, it mainly depends on the real-time computing engine.
two。 Implementation of typical ETL scenarios based on Flink
Here we mainly introduce two real-time ETL scenarios: dimension table join and dual-stream join.
Dimension table join
Preloaded dimension table
Thermal storage association
Broadcast dimension table
Temporal table function join
Dual-stream join
Offline join vs. Real-time join
Regular join
Interval join
Window join
2.1D table join2.1.1 preload dimension table
Option 1:
The dimension table is fully preloaded into memory for association. The specific implementation is that we define a class to implement RichFlatMapFunction, then read the dimension database in the open function, load all the data into memory, and then use operators on the probe stream to associate with memory dimension data at runtime.
The advantage of this scheme is that it is relatively simple to implement, and the disadvantages are obvious, because we have to load each dimension data into memory, so it only supports a small amount of dimension data. At the same time, if we want to update the dimension table, we also need to restart the job, so it is a bit expensive to update the dimension data, and it will cause some delay. For the preloaded dimension table, it is applicable to the small dimension table, where the frequency of change is not very high, and the requirement for the timeliness of change is relatively low.
A DimFlatMapFunction is defined here to implement RichFlatMapFunction. There is a dim of Map type, in fact, after reading the dimension data of DB, it can be used to store our dimension data, and then in the open function, we need to connect to our DB to obtain the data in DB. Then you can see in the code below that our scene is to extract the ID and the name of the product from a list of goods. Then after we get the dimensional data in DB, we will store it in dim.
Next, we will use dim in the flatMap function. After we have obtained the data of the probe stream, we will compare it in dim. Whether it contains the data of the same commodity ID, if so, append the relevant product name to the data tuple, and then make an output. This is a basic process.
In fact, this is a basic initial version of the implementation of the scheme. But this solution also has an improved way, that is, in the open function, you can create a new thread to load the dimension table regularly. In this way, there is no need to manually restart job to update the dimension data, and a periodic update of dimension data can be achieved.
Option 2:
Through the mechanism of Distributed cash to distribute local dimension files to task manager and then load them into memory to do correlation. The implementation can be divided into three steps:
The first step is to register the file through env.registerCached.
The second step is to implement RichFunction, which gets the cache file through RuntimeContext in the open function.
Step 3 is to parse and use this part of the file data.
One of the advantages of this approach is that you do not need to prepare or rely on an external database, but the disadvantage is that the data is also loaded into memory, so the amount of data supported in the dimension table is relatively small. And if the dimension data needs to be updated, the job needs to be restarted. Therefore, it is not recommended to use this scheme in the formal production process, because in fact, from the warehouse point of view, we hope that all the data can be managed by schema. Putting the data in the file to do such an operation is not conducive to our overall data management and standardization. So in this way, you can use it when doing some small demo, or when testing.
Then it applies to some scenarios where the dimensional data is in the form of a file, the amount of data is small, and the update frequency is relatively low, for example, we read a static code table, configuration file, and so on.
2.1.2 Hot Storage Association
The second big implementation idea in dimension table join is thermal storage association. Specifically, we import the dimensional data into some hot storage such as Redis, Tair, HBase, and then query it through asynchronous IO, and overlay using the Cache mechanism. We can also add some obsolete mechanism, and finally cache the dimensional data in memory to reduce the overall access pressure to the hot storage.
Such a process is shown in the figure above. In the Cache section, Google's Guava Cache is recommended, which encapsulates some asynchronous interactions about Cache, as well as some mechanisms for Cache elimination, which is more convenient to use.
There are two important points in the experimental solution just now. One is that we need to use asynchronous IO to access storage. Here, let's review the difference between synchronous IO and asynchronous IO:
For synchronous IO, after a request is made, you must wait for the request to return before you can continue to send a new request. So the overall throughput is relatively small. Because real-time data processing pays special attention to latency, this way of synchronizing IO is unacceptable in many scenarios.
Asynchronous IO can issue multiple requests in parallel, the overall throughput is relatively high, and the latency will be much lower. If we use asynchronous IO, after the throughput of external storage increases, it will put a lot of pressure on external storage, and sometimes it will become a bottleneck of delay in our entire data processing. So the introduction of Cache mechanism is to reduce our access to external storage through Cache.
Just mentioned Cuava Cache, its use is very simple, you can try to use. For the hot storage association scheme, its advantage is that because the dimensional data does not have to be fully loaded in memory, it is not limited to memory size, and the amount of dimensional data can be more. In Meituan Dianping's traffic scenario, our dimensional data can be supported to the order of 1 billion. On the other hand, the disadvantage of this scheme is obvious, we need to rely on hot storage resources, and the dimension update feedback to the result is delayed to a certain extent. Because we first need to import the data into hot storage, and then there will be a loss in the time when the Cache expires.
Generally speaking, this method is suitable for scenarios where the amount of dimension data is relatively large and it can accept that there is a certain delay in dimension update.
2.1.3 broadcast dimension table
The third big idea is to broadcast dimension tables, which mainly uses broadcast State to broadcast dimension data streams to downstream task to do join.
Implementation method:
Send dimension data to Kafka as broadcast raw stream S1
Define the state descriptor MapStateDescriptor. Call S1.broadcast () to get broadCastStream S2
Call non-broadcast stream S3.connect (S2) to get BroadcastConnectedStream S4
Implement the association processing logic in KeyedBroadcastProcessFunction/BroadcastProcessFunction and call S4.process () as a parameter
The advantage of this scheme is that the dimension changes can be updated to the results in a timely manner. Then the disadvantage is that the data still needs to be stored in memory, because it is stored in state, so the amount of data supporting dimensional tables is still not very large. The applicable scenario is that we need to perceive the changes of dimensions from time to time, and the dimensional data can be transformed into real-time streams.
Here is a small demo:
The broadcast stream pageStream we use here actually defines a page ID and the name of the page. For non-broadcast stream probeStream, it is a string in json format, which contains the device ID, the ID of the page, and the timestamp, which can be understood as the behavior record of the user's PV access on the device.
From the point of view of the whole implementation, follow the above four steps:
The first step is to define the state descriptor for the broadcast.
Step 2 let's go here to generate broadCastStream.
In step 3, we need to connect the two stream.
The most important part of step 4 is the need to implement BroadcastProcessFunction. The first parameter is our probeStream, the second parameter is the data of the broadcast stream, and the third parameter is the data we want to output. You can see that the main data processing logic is in processElement.
In the process of data processing, we first obtain our broadcastStateDesc through context, then parse the data of the probe stream, and finally get the corresponding pageid. Then we just got the state to query whether there is the same pageid, and if we can find the corresponding pageid, then add the corresponding pagename to our entire json stream for output.
2.1.4 Temporal table function join
After introducing the above method, another more important method is to use Temporal table function join. First of all, what is Temporal table? It is actually a concept: it can return the view of the data content of the continuously changing table at a certain time, and the continuously changing table is changingtable, which can be a real-time changelog data or a materialized dimension table placed on external storage.
Its implementation is to do the join of probe stream and Temporal table through UDTF, called Temporal table function join. This join approach is suitable for scenarios where dimensional data is in the form of changelog streams, and we have demands that we need to correlate by time version.
First of all, let's take a look at an example. Here is an example of exchange rate and currency transactions on the official website. For our dimensional data, which is the changelog stream just mentioned, it is RateHistory. It reflects the exchange rates of different currencies at different times relative to the yen.
The first field is the time and the second field is the currency currency. The third field is the exchange rate against the yen, and then from our probe table point of view, it defines the purchase of orders in different currencies. For example, two euros were bought at 10:15, and the table records a currency transaction. In this example, what we are asking for is the total yen trading volume for the purchase of currency. How can we achieve this goal through Temporal table function join?
Step 1 first we need to define TemporalTableFunction on top of the changelog stream, where two key parameters are necessary. The first parameter is a time attribute that can help us identify the version information, the second parameter is the component that needs to be associated, here we choose currency.
Then we register TemporalTableFunction's name in tableEnv.
Then let's take a look at the TemporalTableFunction we registered and what effect it can have.
For example, if we use the rates function to get the state at 11:50. You can see that for the dollar, its state at 11:50 actually falls in the range of 11, 49, 49, 11, and 56, so the choice is 99. Then for the euro, 11:50 falls between 11:15 and 12:10, so we would choose a figure like 119th. What it actually implements is the concept of TemporalTable that we defined at the beginning, which can obtain the valid data of changelog at some point. Once the TemporalTableFunction is defined, we need to use this Function to implement the business logic.
Note that we need to specify the specific join key we need to use here. For example, because both streams are constantly updated, for our 11:00 record in order table, the correlation is the euro's status at 10:45, and then it's 116, so the final result is 232.
What I just introduced is the use of Temporal table function join.
Comparison of 2.1.5 dimensional table join
Then we will review some differences of join in each dimension in the dimension table join, so that we can better understand the scenarios in which each method is applicable.
In terms of implementation complexity: except that the hot storage association is a little more complex, the complexity of other implementations is basically low.
In terms of the amount of data in the dimensional table: hot storage association and Temporal table function join can support a relatively large amount of data. In other ways, the size of the memory is limited because the dimension table is loaded into memory.
On the update frequency of dimension tables: because preloading DB data into memory and Distributed Cache need to restart when updating dimension table data, they are not suitable for scenarios where dimension tables need to be changed frequently. For broadcast dimension tables and Temporal table function join, the dimension table data can be updated in real time and reflected to the results, so they can support scenarios where dimension tables are updated frequently.
For the real-time update of dimension table: in broadcast dimension table and Temporal table function join, they can achieve the effect of fast real-time update. Hot storage associations can also meet business needs in most scenarios.
In the form of dimensional tables: you can see that the first way is to support access to DB to store a small amount of data, Distributed Cache to support files, and hot storage associations to access hot storage such as HBase and Tair. Both broadcast dimension tables and Temporal table function join require dimensional data to be transformed into real-time streams.
On external storage: both the first way and the hot storage association need to rely on external storage.
In the dimension table join, let's first introduce these basic methods. Some students may have some other programs, and then you can feedback and communicate. Here are some more commonly used programs, but not limited to these programs.
2.2 dual-stream join
First of all, let's review, how does batch process two tables join? In general, when implementing a batch engine, two ideas are adopted.
One is a sort-based Sort-Merge join. The other is to convert it into Hash table and load it into memory to do Hash join. Are these two ideas equally applicable to dual-stream join scenarios? In the double-stream join scenario, the objects to be processed are no longer such batch data and limited data, but infinite data sets. For infinite data sets, we have no way to sort them and then process them, and there is no way to convert all infinite data sets into Cache and load them into memory for processing. Therefore, these two methods are basically not applicable. At the same time, in the dual-stream join scenario, our join object is two streams, and the data is constantly entering, so the results of our join also need to be constantly updated.
So what kind of solution should we have to solve the implementation problem of dual-stream join? One of the basic ideas of Flink is to persist the data of the two streams into state and then use it. Because of the need to constantly update the results of join, the previous data can not be discarded theoretically without any strings attached. However, in terms of implementation, state can not keep all the data permanently, so it is necessary to localize the global scope of join in some ways, that is, to split an infinite data stream into wired data sets as far as possible to do join.
In fact, it is basically such a big idea. Next, let's take a look at the specific implementation.
2.2.1 offline join vs. Real-time join
Next, let's take inner join as an example.
The left stream is marked in black and the right stream is marked in blue. These two streams need to be inner join. First of all, after the left and right streams are entered, the relevant elements need to be stored on the corresponding state. In addition to storing the data elements on the state, the data elements in the left stream need to be compared with the Right State on the right to see if they can match. Similarly, after the flow element on the right arrives, you also need to compare it with the Left State on the left to see if it can be match. If you can match, it will be output as the result of inner join. This picture is a rough display of the general details of an inner join. It is also a way to realize the dual-stream join.
2.2.2 Regular join
Let's first take a look at the type 1 dual-stream join approach, Regular join. This join approach needs to retain the state of both streams, keep it continuously and do not purge. The data on both sides is all visible to each other's streams, so the data needs to be continuously stored in the state, so the state cannot be stored too large, so this scenario is only suitable for bounded data streams. Its syntax, as you can see, is more like offline batch SQL:
In the above page are some of the current Flink support Regular join writing, you can see that and our ordinary SQL is basically the same.
2.2.3 Interval join
The second type of join supported by Flink in dual-stream join is Interval join, also known as interval join. What does it mean? It adds a time window limit, which requires that when two streams do join, one of the streams must fall within a certain time range of the timestamp of the other stream, and their join key is the same in order to complete the join. Adding the time window limit allows us to clean up the data that is out of the time range, so that we don't need to retain the full amount of State.
Interval join supports both processing time and even time to define time. If you are using processing time,Flink internal will use the system time to divide the window, and to do the relevant state cleaning. If you use even time, you will use the mechanism of Watermark to divide windows and do State cleaning.
The author of Flink previously had a very intuitive sharing of content, so here is an example of reusing his part directly:
We can see that for Interval join: it defines a lower limit of time that allows us to clean up data outside the lower limit of time. For example, in the SQL, we actually limited the join condition that ordertime must be greater than shiptime minus 4 hours. For Shipments streams, if you receive the 12:00 Watermark, it means that the data for the Orders stream can be discarded when the data timestamp before 8:00 is less than the time stamp, and is no longer retained in the state.
At the same time, for shiptime, it also sets a lower limit of time, that is, it must be greater than ordertime. For Orders streams, if a 10:15 Watermark is received, the Shipments's state data before 10:15 can be discarded. So Interval join allows us to clean up part of the history of state.
2.2.4 Window join
Finally, let's talk about the third Window join of dual-stream join: its concept is to join elements in two streams that have the same key and are in the same window. Its execution logic is similar to Inner join, it must satisfy the same join key at the same time, and the elements can only be output in the final result in the same window. The specific way to use it is as follows:
Currently, Window join only supports Datastream's API, so the way it is used here is also a form of Datastream. We can see that we first join the two streams, then define the conditions of join key in where and equalTo, then specify the window partition method WindowAssigner in window, and finally define JoinFunction or FlatJoinFunction to implement the specific processing logic of our matching elements.
Because window is actually divided into three categories, our Window join here will also be divided into three categories:
Type 1 Tumbling Window join: it is a window divided by time interval.
You can see that there are two streams (the green one and the yellow one) in this picture. In this example, we define a window of two milliseconds, each circle is a single element on each stream, and the timestamp above represents the corresponding time of the element, so we can see that it is divided by an interval of two milliseconds, and there is no overlap between window and window. For the first window, we can see that the green stream has two elements that match, and then the yellow flow has two elements that match, which will be combined in pair, and finally input into JoinFunction or FlatJoinFunction to do specific processing.
The second type of window is Sliding Window Join: Sliding Window is used here.
Sliding window first defines a window size, and then defines a sliding time window size. If the size of the sliding time window is less than the defined window size, there will be overlap between the window and the window. As shown in this figure, the red window and the yellow window overlap, where the 0 element of the green flow is in both the red window and the yellow window, indicating that an element can be in both windows at the same time. Then in the specific Sliding Window Join, you can see that for the red window, there are two elements, green 0 and yellow 0, their two elements meet the window join condition, so they will form a pair of 0Pol 0. Then, for the yellow window, the two digits of green 0 and yellow 0 and 1 are qualified, and they will be combined into two pair of 0Magi 1, 0Magi 0 and 1Magi 0, and finally they will go into our defined JoinFunction to do processing.
The third category is SessionWindow join: the window used here is session window.
Session window defines a time interval during which a stream will reopen a new window if no elements arrive. In the above picture, we can see that there is no overlap between windows. The Gap we define here is 1, and for the first window, you can see that the green 0 element and the yellow 1 and 2 elements are all in the same window, so it will form a pair like 1, 0 and 2. The rest is similar, qualified pair will go into the final JoinFunction to do processing.
The above is how a typical ETL scenario based on Flink is implemented. Have you learned any knowledge or skills? If you want to learn more skills or enrich your knowledge reserve, you are welcome to follow the industry information channel.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.