2025-02-25 Update From: SLTechnology News&Howtos (shulou)
Shulou(Shulou.com)06/01 Report--
This article explains how to build a FlinkSQL platform: a jar that takes a SQL file and runs the statements in it as a Flink job. The approach is simple, quick to set up, and practical.
1. background
The company has many internal requirements, and writing a new streaming program for each one is not practical. We therefore started building a FlinkSQL platform on JDK 1.8 and Flink 1.12.x.
2. effect
Pass a SQL file to the jar, and the SQL statements in that file are executed automatically.
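The article does not show how the jar turns the file into individual statements. As a minimal sketch of that first step, assuming statements are separated by `;` and comments use `--`, the hypothetical helper `SqlScriptSplitter` below (not part of the original project) drops line comments and splits on semicolons, leaving both untouched inside single-quoted literals. It does not handle `/* ... */` comments or backtick-quoted identifiers.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (an assumption, not the author's code): splits a SQL script into statements. */
final class SqlScriptSplitter {

    private SqlScriptSplitter() {
    }

    /** Drops "--" line comments and splits on ';', ignoring both inside single-quoted literals. */
    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuote = false;
        int i = 0;
        while (i < script.length()) {
            char c = script.charAt(i);
            if (inQuote) {
                // Inside a literal: copy everything, a quote closes the literal.
                current.append(c);
                if (c == '\'') {
                    inQuote = false;
                }
                i++;
            } else if (c == '\'') {
                inQuote = true;
                current.append(c);
                i++;
            } else if (c == '-' && i + 1 < script.length() && script.charAt(i + 1) == '-') {
                // Line comment: skip up to (not including) the end of the line.
                while (i < script.length() && script.charAt(i) != '\n') {
                    i++;
                }
            } else if (c == ';') {
                // Statement terminator: emit what has been collected so far.
                addIfNotBlank(statements, current);
                current.setLength(0);
                i++;
            } else {
                current.append(c);
                i++;
            }
        }
        // A final statement without a trailing ';' is still returned.
        addIfNotBlank(statements, current);
        return statements;
    }

    private static void addIfNotBlank(List<String> out, StringBuilder sb) {
        String statement = sb.toString().trim();
        if (!statement.isEmpty()) {
            out.add(statement);
        }
    }
}
```

Each returned string could then be handed to Flink, for example through `TableEnvironment.executeSql`; how the actual platform does this is not stated in the article.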
3. jar package vs web interface
We evaluated the web-based Apache Zeppelin and found three problems:
Zeppelin was originally designed for interactive analysis.
The Zeppelin REST API is incompatible with our existing monitoring, so the monitoring code would have to be modified.
A web interface is user-friendly and a good choice for analysts, but for long-running production applications the developers would have to build a highly available (HA) server.
Based on these three points, we chose the jar approach.
4. use
Write the SQL to an xxx.sql file, for example:
CREATE TEMPORARY FUNCTION MillisecondsToDateStr AS 'io.github.shengjk.udf.MillisecondsToDateStr' LANGUAGE JAVA;

-- ExecutionCheckpointingOptions
set execution.checkpointing.mode=EXACTLY_ONCE;
set execution.checkpointing.timeout=30 min; -- 30min
set execution.checkpointing.interval=1 min; -- 1min
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;

-- ExecutionConfigOptions
set table.exec.state.ttl=1 day; -- 1 day
set table.exec.mini-batch.enabled=true; -- enable mini-batch optimization
set table.exec.mini-batch.allow-latency=1 s; -- 1s
set table.exec.mini-batch.size=1000;
set table.exec.sink.not-null-enforcer=drop;

-- -- dadadadadada
CREATE TABLE orders (
    status int,
    courier_id bigint,
    id bigint,
    finish_time BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'canal_monitor_order',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'ss-canal-json',
    'ss-canal-json.table.include' = 'orders',
    'scan.startup.mode' = 'earliest-offset'
);

-- flink.partition-discovery.interval-millis;
CREATE TABLE infos (
    info_index int,
    order_id bigint
) WITH (
    'connector' = 'kafka',
    'topic' = 'canal_monitor_order',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'ss-canal-json',
    'ss-canal-json.table.include' = 'infos',
    'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE redisCache (
    finishOrders BIGINT,
    courier_id BIGINT,
    dayStr String
) WITH (
    'connector' = 'redis',
    'hostPort'='localhost:6400',
    'keyType'='hash',
    'keyTemplate'='test2_${courier_id}',
    'fieldTemplate'='${dayStr}',
    'valueNames'='finishOrders',
    'expireTime'='259200'
);

create view temp as
select o.courier_id,
       (CASE WHEN sum(infosMaxIndex.info_index) is null then 0 else sum(infosMaxIndex.info_index) end) finishOrders,
       o.status,
       dayStr
from ((select courier_id,
              id,
              last_value(status) status,
              MillisecondsToDateStr(finish_time, 'yyyyMMdd') dayStr
       from orders
       where status = 60
       group by courier_id, id, MillisecondsToDateStr(finish_time, 'yyyyMMdd'))) o
left join (select max(info_index) info_index, order_id
           from infos
           group by order_id) infosMaxIndex
on o.id = infosMaxIndex.order_id
group by o.courier_id, o.status, dayStr;

INSERT INTO redisCache SELECT finishOrders,courier_id,dayStr FROM temp;
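The script mixes `set key=value` lines with DDL and DML, and the article does not say how the platform treats the `set` lines. One plausible approach, sketched here under that assumption, is to pick them out before execution and collect them as configuration options. The class `SetStatementParser` is hypothetical and takes statements that have already been separated and stripped of comments.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (an assumption, not the author's code): extracts "set key=value" options. */
final class SetStatementParser {

    // "set <key> = <value>": keyword is case-insensitive, value may contain spaces (e.g. "30 min").
    private static final Pattern SET_PATTERN =
            Pattern.compile("(?is)^\\s*set\\s+([^=\\s]+)\\s*=\\s*(.*?)\\s*$");

    private SetStatementParser() {
    }

    /** Returns true if the statement has the form "set key=value". */
    static boolean isSet(String statement) {
        return SET_PATTERN.matcher(statement).matches();
    }

    /** Collects all set statements into an ordered key/value map; other statements are ignored. */
    static Map<String, String> parseAll(Iterable<String> statements) {
        Map<String, String> options = new LinkedHashMap<>();
        for (String statement : statements) {
            Matcher matcher = SET_PATTERN.matcher(statement);
            if (matcher.matches()) {
                options.put(matcher.group(1), matcher.group(2));
            }
        }
        return options;
    }
}
```

The resulting map could be copied into the job configuration, for instance with `tableEnv.getConfig().getConfiguration().setString(key, value)`, while the remaining statements are executed as SQL. Whether the original platform works this way is an assumption.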
Package flinksql-platform and upload it to the server.
Put the required connector jars in the appropriate directory.
Run the job, for example:
flink-1.12.0/bin/flink run -p 3 \
    -yt ./flinkjar/ \
    -C file:///home/shengjk/flinkjar/test-udf.jar \
    -C file:///home/shengjk/flinkjar/jedis-2.10.2.jar \
    -m yarn-cluster -ynm sqlDemo \
    -c io.github.shengjk.Main \
    ./flinksql-platform-1.0-SNAPSHOT.jar --sqlPath ./xxx.sql
where:
-C adds UDF jars and other third-party jars to the classpath. The -C parameters apply to the JobGraph generated on the client, which is then submitted for execution.
-yt <directory> ships the third-party jars in that directory, such as UDF jars, to the TaskManagers.
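The `--sqlPath ./xxx.sql` pair at the end of the command is passed to the main class named by `-c`. The source of `io.github.shengjk.Main` is not shown in the article, so the following is only a sketch of how such an entry point might locate and read the script. The class `SqlPathArgs` and its methods are hypothetical, and UTF-8 encoding is an assumption.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

/** Hypothetical helper (an assumption, not the author's code): resolves and reads the --sqlPath file. */
final class SqlPathArgs {

    private SqlPathArgs() {
    }

    /** Returns the value following "--sqlPath", or fails if the argument is missing. */
    static String sqlPath(String[] args) {
        for (int i = 0; i < args.length - 1; i++) {
            if ("--sqlPath".equals(args[i])) {
                return args[i + 1];
            }
        }
        throw new IllegalArgumentException("missing required argument: --sqlPath <file>");
    }

    /** Reads the whole SQL script as UTF-8 text (encoding is an assumption). */
    static String readScript(String[] args) throws IOException {
        byte[] bytes = Files.readAllBytes(Paths.get(sqlPath(args)));
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Failing fast on a missing `--sqlPath` keeps a misconfigured submission from reaching the cluster with nothing to run.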
© 2024 shulou.com SLNews company. All rights reserved.