This article introduces how continuous queries on dynamic tables are realized in Flink; it should serve as a useful reference for interested readers.
Using SQL to analyze data streams
More and more companies are adopting stream processing and are migrating existing batch applications to streaming or designing streaming solutions for new use cases. Many of these applications focus on analyzing streaming data. The analyzed data streams come from a wide variety of sources, such as database transactions, clicks, sensor measurements, or IoT devices.
Apache Flink is very well suited for streaming analytics because it provides support for event-time semantics, exactly-once processing, and high throughput with low latency at the same time. Because of these features, Flink can compute exact and deterministic results from large volumes of input data in near real time, while offering exactly-once semantics in case of failures.
Flink's DataStream API, the core of its stream processing capabilities, is very expressive and provides primitives for many common operations. Among other features, it offers highly customizable windowing logic, different state primitives with varying performance characteristics, hooks to register and react to timers, and tooling for efficient asynchronous requests to external systems. On the other hand, many stream analytics applications follow similar patterns and do not require the level of expressiveness provided by the DataStream API. They could be expressed in a more natural and concise way using a domain-specific language. As we all know, SQL is the de facto standard for data analysis. For streaming analytics, SQL would allow many more people to develop applications on data streams in much less time. However, so far no open source stream processor offers comprehensive, solid SQL support.
Why is SQL on streams a big deal?
For many reasons, SQL is the most widely used language in data analysis:
SQL is declarative: you specify what you want, not how to compute it.
SQL can be effectively optimized: an optimizer figures out an efficient plan to compute your result.
SQL can be efficiently evaluated: the processing engine knows exactly what to compute and how to do so efficiently.
Finally, everybody knows SQL and many tools speak SQL.
Therefore, being able to process and analyze data streams with SQL makes stream processing technology available to many more users. Moreover, due to SQL's declarative nature and its potential for automatic optimization, it significantly reduces the time and effort required to define efficient stream analytics applications.
However, SQL (as well as the relational data model and algebra) was not designed with streaming data in mind. A relation is a set, not an infinite sequence of tuples. When executing a SQL query, traditional database systems and query engines read and process a data set that is completely available and produce a result of fixed size. In contrast, data streams continuously provide new records, so that data arrives over time. Hence, streaming queries have to continuously process the arriving data and never see "complete" data.
That being said, processing streams with SQL is not impossible. Some relational database systems feature eager maintenance of materialized views, which is similar to evaluating SQL queries on streams of data. A materialized view is defined as a SQL query, just like a regular (virtual) view. However, the result of the query is actually stored (or materialized) in memory or on disk, so that the view does not need to be computed on the fly when it is queried. To prevent a materialized view from becoming stale, the database system needs to update the view whenever its base relations (the tables referenced in its defining query) are modified. If we consider the changes to a view's base relations as a stream of modifications (or as a changelog stream), it becomes clear that materialized view maintenance and SQL on streams are somehow related.
Flink's relational APIs: Table API and SQL
Since version 1.1.0, released in August 2016, Flink features two semantically equivalent relational APIs: the language-embedded Table API (for Java and Scala) and standard SQL. Both APIs are designed as unified APIs for online streaming and historical batch data: a query produces exactly the same result regardless of whether its input is static batch data or streaming data.
A unified API for stream and batch processing is important for several reasons. First, users only need to learn a single API to process both static and streaming data. Moreover, the same query can be used to analyze batch and streaming data, which allows historical and live data to be analyzed jointly in the same query. At the current state we have not yet achieved complete unification of batch and streaming semantics, but the community is making very good progress toward this goal.
The following code snippets show two equivalent Table API and SQL queries that compute a simple windowed aggregate on a stream of temperature sensor measurements. The syntax of the SQL query is based on Apache Calcite's syntax for grouping window functions and will be supported in version 1.3.0 of Flink.
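The original snippets are not reproduced in this republication. The following sketch, written against the Scala Table API of roughly that era (method names follow the 1.3-era API and should be treated as illustrative), shows what such a pair of equivalent queries looks like; the table name sensors and its fields room, temp, and rowtime are assumptions.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

val env  = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// Assume a table "sensors" with fields (room, temp, rowtime) has been
// registered, where rowtime is the event-time attribute of the stream.

// Table API: average temperature per room over 1-hour tumbling windows.
val tableResult = tEnv.scan("sensors")
  .window(Tumble over 1.hours on 'rowtime as 'w)
  .groupBy('w, 'room)
  .select('room, 'w.end, 'temp.avg as 'avgTemp)

// Equivalent SQL query using Calcite's group window functions (Flink 1.3.0+).
val sqlResult = tEnv.sql(
  """
    |SELECT room, TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS wEnd, AVG(temp) AS avgTemp
    |FROM sensors
    |GROUP BY room, TUMBLE(rowtime, INTERVAL '1' HOUR)
  """.stripMargin)
```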
As you can see, both APIs are tightly integrated with each other and with Flink's primary DataStream and DataSet APIs. A Table can be generated from and converted to a DataSet or DataStream. Hence, it is easily possible to scan an external table source such as a database or a Parquet file, do some preprocessing with a Table API query, convert the result into a DataSet, and run a Gelly graph algorithm on it. The query defined in the example above can also be used to process batch data by simply changing the execution environment.
Internally, both APIs are translated into the same logical representation, optimized by Apache Calcite, and compiled into DataStream or DataSet programs. In fact, the optimization and translation process does not know whether a query was defined using the Table API or SQL. Since the Table API and SQL are semantically equivalent and only differ in syntax, we refer to both APIs whenever we talk about SQL in this post.
In its current state (version 1.2.0), Flink's relational APIs support a limited set of relational operators on data streams, including projections, filters, and windowed aggregates. All supported operators have in common that they never update result records that have been emitted. This is clearly not an issue for record-at-a-time operators such as projection and filter. However, it affects operators that collect and process multiple records, such as windowed aggregates. Since emitted results cannot be updated, input records that arrive after a result has been emitted have to be discarded in Flink 1.2.0.
For applications that emit data to storage systems such as Kafka topics, message queues, or files that only support append operations and no updates or deletes, the restrictions of the current version are acceptable. Common use cases that follow this pattern are, for example, continuous ETL and stream archiving applications that persist a stream into an archive or prepare data for further online (streaming) analysis or later offline analysis. Since previously emitted results cannot be updated, these kinds of applications have to make sure that the emitted results are correct and will not need to be corrected in the future. The following figure illustrates such applications.
While append-only queries are useful for certain kinds of applications and certain types of storage systems, there are many streaming analytics use cases that need to update results. This includes streaming applications that cannot discard late-arriving records, need early results for (long-running) windowed aggregates, or require non-windowed aggregates. In each of these cases, previously emitted result records need to be updated. Result-updating queries often materialize their result to an external database or key-value store so that external applications can access and query it. Applications that implement this pattern are dashboards, reporting applications, or other applications that require up-to-date results. The following figure illustrates these types of applications.
Continuous queries on dynamic tables
Support for queries that update previously emitted results is the next big step for Flink's relational APIs. This feature is important because it vastly increases the scope of the APIs and the range of supported use cases.
When adding support for result-updating queries, we must of course preserve the unified semantics for stream and batch inputs. We achieve this by means of the concept of dynamic tables. A dynamic table is a table that is constantly changing and can be queried like a regular, static table. However, in contrast to a query on a batch table, which terminates and returns a static table as its result, a query on a dynamic table runs continuously and produces a table that is continuously updated depending on the changes of the input table. Hence, the resulting table is a dynamic table as well. This concept is very similar to the materialized view maintenance we discussed earlier.
Assuming we can run queries on a dynamic table that produce a new dynamic table, the next question is: how do streams and dynamic tables relate to each other? The answer is that streams can be converted into dynamic tables and dynamic tables can be converted into streams. The following figure shows the conceptual model of processing relational queries on streams.
First, a stream is converted into a dynamic table. The dynamic table is queried with a continuous query, which produces a new dynamic table. Finally, the resulting table is converted back into a stream. It is important to note that this is only the logical model and does not imply how the query is actually executed. In fact, a continuous query is internally translated into a conventional DataStream program.
Below, we describe the different steps of this model:
1. Defining a dynamic table on a stream.
2. Querying a dynamic table.
3. Emitting a dynamic table.
Defining a dynamic table on a stream
The first step in evaluating a SQL query on a dynamic table is to define a dynamic table on a stream. This means we have to specify how the records of a stream modify the dynamic table. The stream must carry records with a schema that can be mapped to the relational schema of the table. There are two modes to define a dynamic table on a stream: append mode and update mode.
In append mode, each stream record is an insert modification to the dynamic table. Hence, all records of a stream are appended to the dynamic table such that it is ever-growing and infinite in size. The following figure illustrates the append mode.
In update mode, a stream record can represent an insert, update, or delete modification on the dynamic table (append mode is in fact a special case of update mode). When defining a dynamic table on a stream via update mode, we can specify a unique key attribute on the table. In that case, update and delete operations are performed with respect to the key attribute. The update mode is visualized in the following figure.
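To make the append mode concrete, here is a minimal sketch (Scala Table API of the Flink 1.2/1.3 era) that registers a DataStream as a dynamic table. The stream of (rowtime, k) records and the table name A are assumptions borrowed from the examples below; update mode has no user-facing definition API at this point, so only append mode is shown.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

val env  = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// A stream of (rowtime, k) records, e.g. read from a message queue; a finite
// collection is used here only to keep the sketch self-contained.
val records: DataStream[(Long, String)] =
  env.fromElements((1L, "A"), (2L, "B"), (4L, "A"))

// Append mode: every stream record becomes an insert into the dynamic table,
// so table A keeps growing as new records arrive. For the windowed query
// further below, rowtime would additionally have to be declared as an
// event-time attribute; those details are version-specific and omitted here.
tEnv.registerDataStream("A", records, 'rowtime, 'k)
```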
Querying a dynamic table
Once we have defined a dynamic table, we can run queries on it. Since dynamic tables change over time, we have to define what it means to query a dynamic table. Let's imagine we take a snapshot of a dynamic table at a specific point in time. Such a snapshot can be treated as a regular static batch table. We denote the snapshot of a dynamic table A at a point in time t as A[t]. A snapshot can be queried with any SQL query, and the query produces a regular static table as result. We denote the result of a query Q on a dynamic table A at time t as Q(A[t]). If we repeatedly compute the result of the query on snapshots of the dynamic table for progressing points in time, we obtain many static result tables that change over time and effectively constitute a dynamic table. We define the semantics of a query on a dynamic table as follows.
A query Q on a dynamic table A produces a dynamic table R, which at each point in time t is equal to the result of applying Q to A[t], i.e., R[t] = Q(A[t]). This definition implies that running the same query Q on a batch table and on a streaming table produces the same result. In the following, we show two examples to illustrate the semantics of queries on dynamic tables.
In the figure below, we see a dynamic input table A on the left, which is defined in append mode. At time t = 8, A consists of six rows (colored blue). At times t = 9 and t = 12, one row is appended to A (colored green and orange, respectively). We run the simple query shown in the center of the figure on table A. The query groups records by attribute k and counts the records per group. On the right, we see the result of query Q at times t = 8 (blue), t = 9 (green), and t = 12 (orange). At each point in time t, the result table is equivalent to a batch query on dynamic table A at time t.
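The query itself only appears in the omitted figure; it is presumably a continuous grouped count along the following lines, here issued against the table A registered in the sketch above. Note that in Flink 1.2 such a result-updating query is not yet supported, so this sketch only illustrates the query shape.

```scala
// Continuous grouped count over the dynamic table A: the result is itself a
// dynamic table whose row for a key is updated whenever a new record with
// that key arrives.
val counts = tEnv.sql(
  """
    |SELECT k, COUNT(*) AS cnt
    |FROM A
    |GROUP BY k
  """.stripMargin)
```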
The query in this example is a simple grouped (but non-windowed) aggregation query. Hence, the size of the result table depends on the number of distinct grouping keys of the input table. Moreover, it is worth noticing that the query continuously updates result rows that it has previously emitted instead of merely adding new rows.
The second example shows a similar query that differs in one important aspect: in addition to grouping records on the key attribute k, the query also groups records into tumbling windows of five seconds, which means that it computes a count for each value of k every five seconds. Again, we use Calcite's group window functions to specify this query. On the left side of the figure we see the input table A and how it changes over time in append mode. On the right we see the result table and how it evolves over time.
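The concrete query again lives in the omitted figure; using the TUMBLE group window functions mentioned earlier it would presumably look as follows, assuming A additionally exposes an event-time attribute rowtime (which the minimal registration sketch above glosses over). Again a sketch, not the original snippet.

```scala
// Per-key count over tumbling 5-second event-time windows. Each window that
// closes only appends new rows to the result table; previously emitted rows
// are never updated.
val windowedCounts = tEnv.sql(
  """
    |SELECT k, TUMBLE_END(rowtime, INTERVAL '5' SECOND) AS wEnd, COUNT(*) AS cnt
    |FROM A
    |GROUP BY k, TUMBLE(rowtime, INTERVAL '5' SECOND)
  """.stripMargin)
```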
In contrast to the result of the first example, the result table grows over time, i.e., new result rows are computed every 5 seconds (given that the input table received more records in the last 5 seconds). While the non-windowed query (mostly) updates rows of the result table, the windowed aggregation query only appends new rows to the result table.
Although this post focuses on the semantics of SQL queries on dynamic tables and not on how to efficiently process such queries, we'd like to point out that it is not possible to compute the complete result of a query from scratch whenever an input table is updated. Instead, the query is compiled into a streaming program that continuously updates its result based on the changes of its input. This implies that not all valid SQL queries are supported, but only those that can be continuously, incrementally, and efficiently computed. We plan to discuss the details of evaluating SQL queries on dynamic tables in a follow-up blog post.
Emitting a dynamic table
Querying a dynamic table yields another dynamic table, which represents the result of the query. Depending on the query and its input tables, the result table is continuously modified by insert, update, and delete changes, just like a regular database table. It might be a table with a single row that is constantly updated, an insert-only table without update modifications, or anything in between.
Traditional database systems use logs to rebuild tables in case of failures and for replication. There are different logging techniques, such as UNDO, REDO, and UNDO/REDO logging. In a nutshell, UNDO logs record the previous value of a modified element to revert incomplete transactions, REDO logs record the new value of a modified element to redo lost changes of completed transactions, and UNDO/REDO logs record both the old and the new value of a changed element to undo incomplete transactions and redo lost changes of completed transactions. Based on the principles of these logging techniques, a dynamic table can be converted into two kinds of changelog streams: a REDO stream and a REDO+UNDO stream.
A dynamic table is converted into a redo+undo stream by converting the changes of the table into stream messages. An insert modification is emitted as an insert message with the new row, a delete modification is emitted as a delete message with the removed row, and an update modification is emitted as a delete message with the old row and an insert message with the new row. The following figure illustrates this behavior.
The left-hand side shows a dynamic table that is maintained in append mode and serves as input to the query in the center of the figure. The result of the query is converted into the redo+undo stream shown at the bottom. The first record of the input table, (1, A), produces a new record in the result table and hence an insert message +(A, 1) in the stream. The second input record with k = 'A', (4, A), produces an update of the record in the result table and hence yields a delete message -(A, 1) and an insert message +(A, 2). All downstream operators or data sinks need to be able to correctly handle both types of messages.
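To make the message encoding concrete, here is a minimal conceptual sketch of a redo+undo (retract) stream for this running example, in plain Scala; the type and field names are invented for illustration and are not part of Flink's API.

```scala
// Conceptual encoding of a redo+undo (retract) stream: every change to the
// result table is shipped either as an insert of a new row or as a delete of
// a previously emitted row. (Illustrative types, not a Flink API.)
sealed trait RetractMessage[T]
case class Insert[T](row: T) extends RetractMessage[T] // "+" message
case class Delete[T](row: T) extends RetractMessage[T] // "-" message

// The running example: input rows (1, A) and (4, A) arriving for key A.
val retractStream: Seq[RetractMessage[(String, Long)]] = Seq(
  Insert(("A", 1L)), // first record with k = 'A' -> count becomes 1
  Delete(("A", 1L)), // second record with k = 'A' -> retract the old count ...
  Insert(("A", 2L))  // ... and emit the updated count 2
)
```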
A dynamic table can be converted into a redo stream in two cases: either it is an append-only table (i.e., it has only insert modifications) or it has a unique key attribute. Each insert modification on the dynamic table results in an insert message with the new row in the redo stream. Due to the restriction of redo streams, only tables with a unique key can have update and delete modifications. If a key is removed from a keyed dynamic table, either because a row is deleted or because the key attribute of a row was modified, a delete message with the removed key is emitted to the redo stream. An update modification yields an update message with the updating, i.e., new, row. Since delete and update modifications are defined with respect to the unique key, downstream operators need to be able to access previous values by key. The figure below shows how the result table of the same query as above is converted into a redo stream.
The row (1, A), which results in an insert into the dynamic table, yields the insert message +(A, 1). The row (4, A), which produces an update, yields the update message *(A, 2).
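For comparison, a minimal sketch of the redo (upsert) encoding for a result table keyed on k, again in plain Scala with invented type names rather than a Flink API.

```scala
// Conceptual encoding of a redo (upsert) stream for a result table with
// unique key k: inserts and updates are shipped as upserts by key, deletes
// as a key-only message. Consumers resolve previous values via the key.
sealed trait UpsertMessage[K, V]
case class Upsert[K, V](key: K, value: V) extends UpsertMessage[K, V] // "+" / "*"
case class DeleteKey[K, V](key: K) extends UpsertMessage[K, V]        // "-"

// The running example: (1, A) inserts count 1, (4, A) updates it to 2.
val redoStream: Seq[UpsertMessage[String, Long]] = Seq(
  Upsert("A", 1L), // insert message +(A, 1)
  Upsert("A", 2L)  // update message *(A, 2)
)
```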
Common use cases for redo streams are to write the result of a query to an append-only storage system, such as rolling files or a Kafka topic, or to a data store with keyed access, such as Cassandra, a relational DBMS, or a compacted Kafka topic. It is also possible to maintain a dynamic table as keyed state inside the streaming application that evaluates the continuous query and to make it queryable from external systems. With this design, Flink itself maintains the result of a continuous SQL query in the stream and offers key lookups on the result table, for instance from a dashboard application.
What will change when switching to dynamic tables?
In version 1.2, all streaming operators of Flink's relational APIs (such as filter, project, and group window aggregates) only emit new rows and cannot update previously emitted results. In contrast, dynamic tables are able to handle update and delete modifications. Now you might ask yourself: how does the processing model of the current version relate to the new dynamic table model? Will the semantics of the APIs completely change? Do we need to reimplement the APIs from scratch to achieve the desired semantics?
The answer to all these questions is simple. The current processing model is a subset of the dynamic table model. Using the terminology we introduced in this post, the current model converts a stream into a dynamic table in append mode, i.e., an infinitely growing table. Since all operators only accept insert changes and produce insert changes on their result table (i.e., emit new rows), all supported queries result in dynamic append tables, which are converted back into DataStreams using the redo model for append-only tables. Consequently, the semantics of the current model are fully covered and preserved by the new dynamic table model.
Conclusion and outlook
Flink's relational APIs make it possible to implement stream analytics applications very quickly, and they are used in a variety of production settings. In this blog post we discussed the future of the Table API and SQL. This effort will make Flink and stream processing accessible to more people. Moreover, the unified semantics for querying historical and real-time data as well as the concepts of querying and maintaining dynamic tables will make the implementation of many exciting use cases and applications considerably easier. Since this post focused on the semantics of relational queries on streams and dynamic tables, we did not discuss the details of how queries will be executed, which include internal retraction, handling of late events, support for early results, and bounded space requirements.
In recent months, many members of the Flink community have been discussing and contributing to the relational APIs. We have made great progress so far. While most of the work has focused on processing streams in append mode, the next item on the agenda is to work on dynamic tables to support queries that update their results. If you are excited about the idea of processing streams with SQL and would like to contribute to this effort, please give feedback, join the discussions on the mailing list, or grab a JIRA issue to work on.