In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-27 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly explains "how to use Look up dimension table in flink". Interested friends may wish to have a look at it. The method introduced in this paper is simple, fast and practical. Let's let the editor take you to learn how to use Look up dimension tables in flink.
Background
Dimension table is a very common concept in streaming computing. It is generally used in the join of sql to complete the streaming data. For example, our source stream is the order data from the log, but we only record the id of the order goods in the log, and there is no other information. But when we store the data in the warehouse for data analysis, we need other information such as product name, price and so on. In this kind of problem, we can complete the data by querying the dimension table when processing the stream.
Dimension tables are generally stored in external storage, such as mysql, hbase, redis, and so on. Today we take mysql as an example to talk about the use of dimension tables in flink.
LookupableTableSource
A LookupableTableSource is provided in flink, which can be used to implement dimension tables, that is, we can query external storage through several key columns to obtain relevant information to complete stream data.
Public interface LookupableTableSource extends TableSource {
TableFunction getLookupFunction (String [] lookupKeys)
AsyncTableFunction getAsyncLookupFunction (String [] lookupKeys)
Boolean isAsyncEnabled ()
}
As we can see, there are three ways to LookupableTableSource
GetLookupFunction: used to synchronize the data of the query dimension table and return a TableFunction, so it is essentially implemented through a user-defined UDTF.
GetAsyncLookupFunction: used to asynchronously query the data of the dimension table. This method returns an object.
IsAsyncEnabled: synchronous query by default. If you want to enable asynchronous query, this method needs to return true
In flink, we see that there are four main classes that implement this interface, JdbcTableSource,HBaseTableSource,CsvTableSource,HiveTableSource. Today we mainly take jdbc as an example to talk about how to query dimension tables.
Example explanation
Next, let's give a small example. First, we define stream source. We use datagen provided by flink 1.11 to generate data.
Let's simulate the generation of user data. Here, only the user's id is generated, ranging from 1 to 100.
CREATE TABLE datagen (
Userid int
Proctime as PROCTIME ()
) WITH (
'connector' = 'datagen'
'rows-per-second'='100'
'fields.userid.kind'='random'
'fields.userid.min'='1'
'fields.userid.max'='100'
)
For more information on how to use datagen, please see:
Talk about the random data generator in flink 1.11-DataGen connector
Then create an mysql dimension table information:
CREATE TABLE dim_mysql (
Id int
Name STRING
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc'
'url' = 'jdbc:mysql://localhost:3306/test'
'table-name' = 'userinfo'
'username' = 'root'
'password' = 'root'
)
The sample data in our mysql table is as follows:
Finally, the sql query is executed, and the flow table is associated with the dimension table:
SELECT * FROM datagen LEFT JOIN dim_mysql FOR SYSTEM_TIME AS OF datagen.proctime ON datagen.userid = dim_mysql.id
An example of the result is as follows:
3 > 53Magnet 2020-09-03T07Rose 19purl 34.565 recorder nullpenol null
3 > 73Magic 2020-09-03T07Rose 19purs 34.566 recordings nullpenol null
1 > 14 Magazine 2020-09-03T07PUR 19purl 34.566 Jol 14recoveraaddda
2 > 11T07T032020-09-03T07GRAPHY 34.566 recorder nullpenol null
4 > 8Jing 2020-09-03T07GRAPHY 34.566JET 8 name 8
1 > 61Magazine 2020-09-03T07Rose 19purl 34.567 recorder nullpenol null
3 > 12Magne2020-09-03T07GRAPHY 34.567JELER 12MAA
2 > 99tech 2020-09-03T07GRAPHY 34.567 recorder nullpenol null
4 > 37 Magazine 2020-09-03T07GRAPHY 19GRAPHY 34.568MAL nullpenol null
2 > 13T07T032020-09-03T07GRAPHY 34.569 13JINGOTADADA
3 > 6T07mm 2020-09-03T07GRAPHY 34.568pr 6
We see that the data that exists in the dimension table has been associated, and the data that is not in the dimension table is displayed as null.
Source code parsing JdbcTableSource
Taking jdbc as an example, let's take a look at what the bottom layer of flink does.
The JdbcTableSource#isAsyncEnabled method returns false, that is, asynchronous queries are not supported, so enter the JdbcTableSource#getLookupFunction method.
@ Override
Public TableFunction getLookupFunction (String [] lookupKeys) {
Final RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo (producedDataType)
Return JdbcLookupFunction.builder ()
.setOptions (options)
.setLookupOptions (lookupOptions)
.setFieldTypes (rowTypeInfo.getFieldTypes ())
.setFieldNames (rowTypeInfo.getFieldNames ())
.setKeyNames (lookupKeys)
.build ()
}
Finally, a JdbcLookupFunction object is constructed.
Options are parameters that connect to jdbc, such as user, pass, url, and so on.
LookupOptions is a number of parameters related to dimension tables, mainly cache size, timeout, and so on.
LookupKeys means to associate the fields of the query dimension table.
JdbcLookupFunction
So let's take a look at the JdbcLookupFunction class, which is a subclass of TableFunction. You can refer to this article for the specific use of TableFunction:
Flink practical tutorial-TableFunction of Custom function
The core of a TableFunction is the eval method. In this method, the main work is to query the data through the splicing of multiple keys into sql. The first thing to query is the cache, and the data in the cache will be returned directly. If there is no cache, then query the database, and then return the query results and put them into the cache. The next query will directly query the cache.
Why add a cache? Cache is not enabled by default, and each query will send a request to the dimension table to query. If the amount of data is relatively large, it is bound to cause some pressure on the system that stores the dimension table, so flink provides a LRU cache. When querying the dimension table, the cache is queried first, and the cache does not query the external system, but if a data query frequency is relatively high, it has been hit all the time. You won't be able to get new data. So the cache also needs to add a timeout, after which the data will be forcibly deleted and go to the external system to query the new data.
How to turn on the cache exactly? Let's take a look at the JdbcLookupFunction#open method
@ Override
Public void open (FunctionContext context) throws Exception {
Try {
EstablishConnectionAndStatement ()
This.cache = cacheMaxSize =-1 | | cacheExpireMs = =-1? Null: CacheBuilder.newBuilder ()
.accounreAfterWrite (cacheExpireMs, TimeUnit.MILLISECONDS)
.maximumSize (cacheMaxSize)
.build ()
} catch (SQLException sqe) {
Throw new IllegalArgumentException ("open () failed.", sqe)
} catch (ClassNotFoundException cnfe) {
Throw new IllegalArgumentException ("JDBC driver class not found.", cnfe)
}
}
That is to say, if cacheMaxSize and cacheExpireMs need to be set at the same time, a cache object cache will be constructed to cache data. The properties of the DDL corresponding to these two parameters are lookup.cache.max-rows and lookup.cache.ttl
For the specific cache size and timeout settings, users need to define themselves according to their own situation, and make a tradeoff between the accuracy of the data and the throughput of the system.
At this point, I believe you have a deeper understanding of "how to use Look up dimension tables in flink". You might as well do it in practice. Here is the website, more related content can enter the relevant channels to inquire, follow us, continue to learn!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.