How is the Flink-SQL extension implemented? Many newcomers are unclear about this, so this article explains it in detail; readers who need it are welcome to follow along and will hopefully come away with something useful.
Everything described here is based on Flink 1.5.4.
Why do we extend Flink-SQL?
Flink's own SQL syntax does not cover connecting to input sources or output destinations. In practice, data developers have to write Source and Sink code against the APIs Flink provides, which is extremely tedious: it requires knowing the APIs of Flink's various operators as well as the client libraries of every external component involved (Kafka, Redis, MongoDB, HBase, and so on), and there is no SQL-level support for joining against external data sources. As a result, doing real-time data analysis by writing SQL directly on Flink still demands a large amount of extra work.
Our goal is that users of Flink-SQL only need to care about what to do, not how to do it: they should not have to worry about the implementation details and can focus on the business logic.
Next, let's look at how the Flink-SQL extension is implemented.
01 Which Flink-related SQL has been extended
(1) CREATE statements for source tables
(2) CREATE statements for output (sink) tables
(3) CREATE statements for custom functions
(4) Dimension table joins
02 How each module is translated into a Flink implementation
(1) How the SQL statement that creates a source table is converted into Flink operators
In Flink, every table is mapped to a Table object, which is then registered with the environment by calling the registration method:
StreamTableEnvironment.registerTable(tableName, table)
Currently only Kafka data sources are supported. Flink already ships a class that reads from Kafka, FlinkKafkaConsumer09, so we only need to instantiate it with the parameters given in the CREATE statement and then register the resulting table.
Note also that rowtime and proctime are frequently needed in Flink SQL, so we additionally add rowtime and proctime attributes to the registered table schema.
When rowtime is used, an extra DataStream.assignTimestampsAndWatermarks call is required; the custom watermark assigner mainly does two things: 1. extract the time field from the Row; 2. set the maximum allowed delay.
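As a minimal sketch (not the project's actual code), the registration described above could look roughly like this with Flink 1.5-era Java APIs; the table name, topic, connection properties, field layout, and the user-supplied DeserializationSchema<Row> are all illustrative assumptions, and exact package names can vary slightly between versions:

import java.util.Properties;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class KafkaSourceRegistration {

    // deserializer: a user-supplied DeserializationSchema<Row> whose RowTypeInfo
    // declares the fields (id, name, ts) parsed from the CREATE TABLE statement.
    public static void registerKafkaSource(StreamExecutionEnvironment env,
                                           StreamTableEnvironment tableEnv,
                                           DeserializationSchema<Row> deserializer) {
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Connection info taken from the parsed CREATE TABLE statement (illustrative values).
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "flink-sql-demo");

        // Flink already ships a Kafka reader, so we only instantiate it with the parsed parameters.
        FlinkKafkaConsumer09<Row> consumer =
                new FlinkKafkaConsumer09<>("demo_topic", deserializer, props);

        // Custom watermark assigner: extract the time field from the Row, allow 10s of lateness.
        DataStream<Row> stream = env.addSource(consumer)
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.seconds(10)) {
                            @Override
                            public long extractTimestamp(Row row) {
                                return (long) row.getField(2); // field 2 is assumed to hold the event time
                            }
                        });

        // Expose rowtime/proctime attributes and register the Table under the parsed name.
        Table table = tableEnv.fromDataStream(stream, "id, name, ts.rowtime, procTime.proctime");
        tableEnv.registerTable("myKafkaSource", table);
    }
}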
(2) How the SQL statement that creates an output table is converted into Flink operators
The base class for Flink output operators is OutputFormat; here we extend RichOutputFormat, an abstract class that implements OutputFormat and additionally exposes getRuntimeContext() for access to the runtime context, which makes it easy to add custom metrics and similar features later.
Take the MySQL output plug-in, mysql-sink, as an example. It consists of two parts:
1. Parse the CREATE TABLE statement into the table name, field information, and MySQL connection information.
This step uses regular expressions to convert the CREATE TABLE statement into an internal representation class that stores the table name, field information, plug-in type, and plug-in connection information; a sketch of this step follows.
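A minimal, hypothetical sketch of the parsing step (the DDL shape with a WITH (...) property list, the class name, and the field names are illustrative, not the project's actual syntax):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CreateTableParser {

    /** Internal representation of a parsed CREATE TABLE statement. */
    public static class TableInfo {
        public String tableName;
        public String fieldsInfo;   // raw column list, split further in a later step
        public String propsInfo;    // plug-in type and connection properties
    }

    // Matches e.g. "CREATE TABLE sink_tbl (id INT, name VARCHAR) WITH (type='mysql', url='...')"
    private static final Pattern CREATE_TABLE = Pattern.compile(
            "(?i)CREATE\\s+TABLE\\s+(\\S+)\\s*\\((.+)\\)\\s*WITH\\s*\\((.+)\\)",
            Pattern.DOTALL);

    public static TableInfo parse(String sql) {
        Matcher m = CREATE_TABLE.matcher(sql.trim());
        if (!m.find()) {
            throw new IllegalArgumentException("not a CREATE TABLE statement: " + sql);
        }
        TableInfo info = new TableInfo();
        info.tableName = m.group(1);
        info.fieldsInfo = m.group(2).trim();
        info.propsInfo = m.group(3).trim();
        return info;
    }
}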
2. Extend RichOutputFormat to write data to the corresponding external data source.
The main work is implementing the writeRecord method, which in the MySQL plug-in simply calls the JDBC-based insert or update logic.
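A trimmed-down sketch of such a sink, assuming a plain JDBC connection and a pre-built INSERT statement (the class name and constructor arguments are illustrative):

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;

public class MysqlOutputFormat extends RichOutputFormat<Row> {

    private final String jdbcUrl;
    private final String insertSql; // built from the parsed table name and field list
    private transient Connection connection;
    private transient PreparedStatement statement;

    public MysqlOutputFormat(String jdbcUrl, String insertSql) {
        this.jdbcUrl = jdbcUrl;
        this.insertSql = insertSql;
    }

    @Override
    public void configure(Configuration parameters) {
        // no-op: connection info is passed through the constructor in this sketch
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        try {
            connection = DriverManager.getConnection(jdbcUrl);
            statement = connection.prepareStatement(insertSql);
            // getRuntimeContext() is available here, e.g. to register custom metrics.
        } catch (SQLException e) {
            throw new IOException("could not open JDBC connection", e);
        }
    }

    @Override
    public void writeRecord(Row record) throws IOException {
        try {
            for (int i = 0; i < record.getArity(); i++) {
                statement.setObject(i + 1, record.getField(i));
            }
            statement.executeUpdate(); // insert (or update, for upsert-style sinks)
        } catch (SQLException e) {
            throw new IOException("could not write record", e);
        }
    }

    @Override
    public void close() throws IOException {
        try {
            if (statement != null) statement.close();
            if (connection != null) connection.close();
        } catch (SQLException e) {
            throw new IOException("could not close JDBC resources", e);
        }
    }
}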
(3) How custom function statements are converted into Flink operators
Flink provides two base classes for implementing a UDF:
(1) extend ScalarFunction
(2) extend TableFunction
All that is needed is to add the user-provided jar to a URLClassLoader, load the specified class (the one that extends one of the base classes above), and then call TableEnvironment.registerFunction(funcName, udfFunc); this completes the registration of the UDF, and the newly defined UDF can then be used in SQL.
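A sketch of that loading-and-registration step for a ScalarFunction UDF (the jar path, class name, and function name are placeholders; TableFunction registration works analogously):

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class UdfRegistrar {

    public static void registerScalarUdf(StreamTableEnvironment tableEnv,
                                         String jarPath,
                                         String className,
                                         String funcName) throws Exception {
        URL jarUrl = new File(jarPath).toURI().toURL();

        // Add the user-provided jar to a URLClassLoader and load the UDF class from it.
        URLClassLoader classLoader =
                new URLClassLoader(new URL[]{jarUrl}, Thread.currentThread().getContextClassLoader());
        Class<?> udfClass = classLoader.loadClass(className);

        // The loaded class is expected to extend ScalarFunction.
        ScalarFunction udf = (ScalarFunction) udfClass.getDeclaredConstructor().newInstance();
        tableEnv.registerFunction(funcName, udf);
    }
}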
(4) How is the dimension table join implemented?
A common requirement in stream processing is to enrich the data stream with additional fields. The data collected at the ingestion side is often limited, so the required dimension information has to be filled in before analysis, yet Flink currently does not provide SQL support for joining against external data sources.
Several issues need attention when implementing this feature:
(1) The data in the dimension table changes continuously.
The implementation therefore needs to support periodically refreshing the in-memory cache of the external data, for example by using an eviction strategy such as LRU.
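One simple way to bound such a cache is an LRU map; the sketch below only limits the size, while a real implementation would also expire entries by time so that changes in the dimension table are eventually picked up:

import java.util.LinkedHashMap;
import java.util.Map;

public class LruDimensionCache<K, V> extends LinkedHashMap<K, V> {

    private final int maxSize;

    public LruDimensionCache(int maxSize) {
        // accessOrder = true turns this LinkedHashMap into an LRU map.
        super(16, 0.75f, true);
        this.maxSize = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Evict the least recently used entry once the cache is full.
        return size() > maxSize;
    }
}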
(2) IO throughput
If every incoming record triggered a synchronous round trip to the external data source to fetch the associated record, network latency would become the system's biggest bottleneck. We therefore use RichAsyncFunction, the operator Alibaba contributed to the Flink community; it fetches data from external sources asynchronously, which greatly reduces the time spent waiting on network requests.
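A sketch of what an asynchronous dimension lookup built on RichAsyncFunction can look like (Flink 1.5-style signatures; the thread-pool-based lookup, the join-key position, and the appended field are illustrative, and a real plug-in would use an asynchronous client for the specific store):

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.types.Row;

public class DimensionJoinFunction extends RichAsyncFunction<Row, Row> {

    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        executor = Executors.newFixedThreadPool(4);
    }

    @Override
    public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) {
        Object joinKey = input.getField(0); // assume the join key is the first field
        CompletableFuture
                .supplyAsync(() -> lookupDimension(joinKey), executor)
                .thenAccept(dimValue -> {
                    // Append the dimension field to the input row and emit the joined row.
                    Row joined = new Row(input.getArity() + 1);
                    for (int i = 0; i < input.getArity(); i++) {
                        joined.setField(i, input.getField(i));
                    }
                    joined.setField(input.getArity(), dimValue);
                    resultFuture.complete(Collections.singleton(joined));
                });
    }

    private Object lookupDimension(Object key) {
        // Placeholder: consult the LRU cache first, then the external store on a miss.
        return "dim-value-for-" + key;
    }

    @Override
    public void close() {
        if (executor != null) {
            executor.shutdown();
        }
    }
}

The enriched stream would then be produced with something like AsyncDataStream.unorderedWait(stream, new DimensionJoinFunction(), 10000, TimeUnit.MILLISECONDS).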
(3) How the dimension tables referenced in the SQL are parsed into Flink operators
Extracting the specified dimension tables and their filter conditions from the SQL with regular expressions is clearly not the right approach: you would have to cover endless syntactic variations. Looking at how Flink itself handles SQL, it uses Calcite for parsing. So we parse the SQL into a syntax tree, search it iteratively for the dimension tables, and then separate the dimension tables from the non-dimension-table parts of the query.
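As an illustration of that idea, a minimal walk over the Calcite syntax tree that collects the table names appearing in the FROM clause might look like the following (only a few node kinds are handled; a real implementation also has to deal with subqueries, filter conditions, and splitting out the dimension tables):

import java.util.ArrayList;
import java.util.List;

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlJoin;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;

public class TableNameCollector {

    public static List<String> tableNames(String sql) throws SqlParseException {
        SqlNode stmt = SqlParser.create(sql).parseStmt();
        List<String> names = new ArrayList<>();
        collect(stmt, names);
        return names;
    }

    private static void collect(SqlNode node, List<String> names) {
        if (node == null) {
            return;
        }
        switch (node.getKind()) {
            case SELECT:
                collect(((SqlSelect) node).getFrom(), names);
                break;
            case JOIN:
                SqlJoin join = (SqlJoin) node;
                collect(join.getLeft(), names);
                collect(join.getRight(), names);
                break;
            case AS:
                // "t AS alias": the first operand is the underlying table reference.
                collect(((SqlCall) node).getOperandList().get(0), names);
                break;
            case IDENTIFIER:
                names.add(node.toString());
                break;
            default:
                break;
        }
    }
}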
With the steps above, SQL alone is enough to read the commonly used Kafka source table, join it with external data sources, and write the result to the specified external destination.