
How to use FlinkStreamSQL correctly


This article shows you how to use FlinkStreamSQL correctly. The content is concise and easy to follow; I hope you get something out of this detailed introduction.

I. Preliminary preparation

Project path: https://github.com/DTStack/flinkStreamSQL

Official document: https://github.com/DTStack/flinkStreamSQL/blob/1.11_release/docs/quickStart.md

Git clone the project

First, import the project from GitHub into IDEA (there are many ways to do this; here is a common one).

From the IDEA menu bar: Git -> Clone -> enter the FlinkStreamSQL project address, then click Clone to get a copy of the FlinkStreamSQL source code.

After the project is downloaded, the default branch is 1.11_release, which corresponds to Flink 1.11.x (each FlinkStreamSQL release branch corresponds to a Flink release). Switch branches if you need a different version; 1.10_release is recommended.
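
If you prefer the command line to IDEA's Clone dialog, the same clone and branch switch can be done as below (a minimal sketch, using only the project URL and branch name given above):

# clone the repository and switch to the recommended branch
git clone https://github.com/DTStack/flinkStreamSQL.git
cd flinkStreamSQL
git checkout 1.10_release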

Project compilation

After downloading the project and before compiling for the first time, let Maven reimport the whole project once.

If a JAR package is missing, you can search for it on Baidu or Google (the project does not depend on any hard-to-find JAR packages; after all, it is an open source project), or look through the files shared in the official DingTalk group, where you may find something useful.

Once the above is in order, you can start compiling.

Compile command:

mvn clean package -DskipTests

After packaging, a folder of plug-in packages is generated: version 1.8 produces plugins, while 1.10 and later produce sqlplugins.
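
If you have only changed a single plug-in, you do not have to rebuild everything: Maven's standard reactor options can narrow the build. A sketch, assuming the module directory name matches the one used in the root pom (check yours):

# build only the kafka-base module plus the modules it depends on
mvn clean package -DskipTests -pl kafka-base -am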

If there are plug-ins you do not need, you can comment them out in the pom under the project root (see the sketch after this list).

[! Attention! ] [! Attention! ] [! Attention! ]

Some plug-ins have dependencies, so when commenting, please be careful not to comment out the related dependent plug-ins.

The rdb module is depended on by all relational database plug-ins, including the impala module (Impala is not a relational database, but its plug-in uses JDBC).

The core module is depended on by all modules and cannot be commented out!

The launcher module is required for task submission and cannot be commented out!

The kafka-base module is the foundation of all kafka plug-ins. If you use any kafka plug-in (whatever the version), it cannot be commented out!

In version 1.10 and later, the new dirtyData module provides configurable storage for dirty data (for example, writing dirty records to a specified MySQL database) and cannot be commented out!

[! Attention! ] [! Attention! ] [! Attention! ]
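
As a rough illustration, commenting out an unused plug-in in the root pom looks like this (the module names below are illustrative; match them against the actual <modules> section of your branch):

<modules>
    <module>core</module>        <!-- depended on by every module: keep -->
    <module>launcher</module>    <!-- required for task submission: keep -->
    <module>kafka-base</module>  <!-- base of all kafka plug-ins: keep if any kafka plug-in is used -->
    <module>rdb</module>         <!-- depended on by the relational (and impala) plug-ins -->
    <module>kafka11</module>
    <!-- <module>elasticsearch6</module>  unused plug-in, safely commented out -->
</modules>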

Task submission

After the project has been compiled, you can submit tasks. Tasks can be submitted in local, standalone, yarn-session, and yarn-per-job modes; yarn-application mode will be supported later (it has to wait for Flink 1.12).

Submit tasks from IDEA

If any of the following concepts are unfamiliar, look them up yourself (learning to find information is more efficient than asking others).

The IDEA version used here is the 2020.3 Community edition; adapt the steps wherever your version differs.

Here we take yarn-per-job mode as an example; the other modes are similar. The task submission parameters for each mode can be configured according to the official documentation.

1. Configure an IDEA Application run configuration

A quick way: find LauncherMain and run it once, then edit the Application configuration that IDEA generates automatically (or simply use "Modify Run Configuration").

The task submission parameters I use daily are posted here; modify them for your own environment. The exact meaning of each parameter is described in detail in the official parameter documentation.

-name Test
-mode yarnPer
-sql /dtstack/sql/test/JoinDemoFour.sql
-localSqlPluginPath /IdeaProjects/StreamSQLOne/sqlplugins
-flinkconf /dtstack/conf/flink
-yarnconf /dtstack/conf/yarn
-flinkJarPath /dtstack/flink-1.10.1/lib
-confProp {"metrics.latency.interval":"30000","metrics.latency.granularity":"operator","time.characteristic":"ProcessingTime","disableChain":"true"}
-pluginLoadMode shipfile
-queue b

How do you write the task SQL? Consult the documentation for the plug-ins you use. The most basic task SQL framework is:

CREATE Source (source table) -> CREATE Side (dimension table; whether you need one depends on your business) -> CREATE Sink (result table) -> INSERT INTO Sink ... (the SQL of the business logic actually executed; this part is mandatory, otherwise the task has nothing to execute).
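
In schematic form (table names, fields, and connector types below are placeholders; a full, concrete version follows):

CREATE TABLE MySource (...) WITH (type = '...', ...);
CREATE TABLE MySide (..., PRIMARY KEY (...), PERIOD FOR SYSTEM_TIME) WITH (type = '...', ...);
CREATE TABLE MySink (...) WITH (type = '...', ...);
INSERT INTO MySink SELECT ... FROM MySource S LEFT JOIN MySide D ON S.id = D.id;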

The SQL I use daily is also posted here; modify it for your own environment.

CREATE TABLE SourceOne (
    id int,
    name varchar,
    age bigint,
    phone bigint,
    birth timestamp,
    todayTime time,
    todayDate date,
    money decimal,
    price double,
    wechat varchar,
    proName varchar
) WITH (
    type = 'kafka11',
    bootstrapServers = 'kudu1:9092',
    zookeeperQuorum = 'kudu1:2181/kafka',
    offsetReset = 'latest',
    topic = 'tiezhu_in_one',
    enableKeyPartitions = 'false',
    topicIsPattern = 'false',
    parallelism = '1'
);

CREATE TABLE DimOne (
    id int,
    age bigint,
    name varchar,
    birth timestamp,
    PRIMARY KEY (id, age, name),
    PERIOD FOR SYSTEM_TIME
) WITH (
    type = 'mysql',
    url = 'jdbc:mysql://k3:3306/tiezhu?useSSL=false',
    userName = 'root',
    password = 'admin123',
    tableName = 'TestOne',
    parallelism = '1',
    cache = 'LRU',
    asyncCapacity = '100',
    asyncTimeout = '1000',
    errorLimit = '10',
    cacheTTLMs = '1000'
);

CREATE VIEW ViewOne AS
SELECT DO.age       as age,
       SO.todayTime as todayTime,
       SO.todayDate as todayDate,
       SO.name      as name,
       DO.id        as id,
       DO.birth     as birth,
       SO.proName   as proName
FROM SourceOne SO
LEFT JOIN DimOne DO ON SO.id = DO.id;

CREATE TABLE DimTwo (
    id int,
    proName varchar,
    createDate date,
    createTime time,
    PRIMARY KEY (id),
    PERIOD FOR SYSTEM_TIME
) WITH (
    type = 'mysql',
    url = 'jdbc:mysql://k3:3306/tiezhu?useSSL=false',
    userName = 'root',
    password = 'admin123',
    tableName = 'TestDemoTwo',
    parallelism = '1',
    cache = 'LRU',
    asyncCapacity = '100',
    errorLimit = '10'
);

CREATE VIEW ViewTwo AS
SELECT DimTwo.proName    as proName,
       DimTwo.createDate as createDate,
       DimTwo.createTime as createTime,
       ViewOne.todayTime as todayTime,
       ViewOne.todayDate as todayDate,
       ViewOne.name      as name,
       ViewOne.birth     as birth,
       ViewOne.age       as age,
       DimTwo.id         as id
FROM ViewOne
LEFT JOIN DimTwo DimTwo ON ViewOne.id = DimTwo.id
    AND '2020-10-28' = DimTwo.createDate AND DimTwo.id >= 2;

CREATE TABLE SinkOne (
    id int,
    name varchar,
    age bigint,
    birth timestamp,
    todayTime time,
    todayDate date,
    createTime time,
    createDate date,
    proName varchar
) WITH (
    type = 'kafka11',
    bootstrapServers = 'kudu1:9092',
    topic = 'tiezhu_out',
    parallelism = '1',
    updateMode = 'upsert'
);

INSERT INTO SinkOne
SELECT *
FROM ViewTwo;

If you need remote debugging, add Flink's remote debugging configuration to flink-conf.yaml, then configure a "Remote JVM Debug" run configuration in IDEA and set breakpoints in the code (this method can also be used to debug the code of Flink itself).

env.java.opts.jobmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
env.java.opts.taskmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006

Only these two entries need to be added. On an HA cluster you will need to adjust them according to the logs (how to read the logs and what to change is left for you to research).

At this point, that's all for the remote task submission process.

Local debugging

If you dislike remote debugging, you can try FlinkStreamSQL's local debugging via the LocalTest module (this module is commented out by default; enable it in the pom if needed). The method is simple: modify the corresponding parameters, then run it.

But [attention!] The LocalTest module's pom already includes the most commonly used plug-in modules; if you still hit a ClassNotFoundException, it is likely that the corresponding plug-in module is missing from that pom. Also note that, because of class conflicts, the LocalTest module can include only one kafka module at a time.
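
For example, adding a missing plug-in module to the LocalTest pom looks roughly like this (the groupId/artifactId below are hypothetical placeholders; copy the real coordinates from the plug-in module's own pom):

<!-- hypothetical coordinates: take the real ones from the plug-in module's pom -->
<dependency>
    <groupId>com.dtstack.flink</groupId>
    <artifactId>sql.side.mysql</artifactId>
    <version>${project.version}</version>
</dependency>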

The above is how to use FlinkStreamSQL correctly: clone the project, compile it, submit tasks, and debug them remotely or locally. Hopefully you have picked up some useful knowledge or skills along the way.
