
How to build FlinkSQL


This article explains how to build a FlinkSQL platform. The approach described here is simple, fast, and practical, so let's walk through it.

1. Background

The company has many internal streaming requirements, and we did not want to write a new streaming program for each of them, so we started building a FlinkSQL platform based on JDK 1.8 and Flink 1.12.x.

2. Effect

Pass an SQL file to the jar package, and the SQL statements in the file are executed automatically.
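For a concrete picture of what this means, here is a minimal sketch of such an entry point, written against the Flink 1.12 Table API. The class name SqlRunner and the parsing logic are illustrative assumptions, not the actual flinksql-platform code; only the --sqlPath argument and the Flink 1.12 APIs are taken from this article.

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

// Illustrative sketch only: reads the file given by --sqlPath and executes its statements.
public class SqlRunner {

    public static void main(String[] args) throws Exception {
        // --sqlPath points at the xxx.sql file described below
        String sqlPath = ParameterTool.fromArgs(args).getRequired("sqlPath");
        String content = new String(Files.readAllBytes(Paths.get(sqlPath)), StandardCharsets.UTF_8);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        StatementSet statementSet = tableEnv.createStatementSet();

        boolean hasInsert = false;
        // Naive statement splitting: strip "--" line comments, then split on ';'.
        // Assumes no semicolons or "--" inside string literals.
        String withoutComments = content.replaceAll("--[^\n]*", "");
        for (String raw : withoutComments.split(";")) {
            String statement = raw.trim();
            if (statement.isEmpty()) {
                continue;
            }
            if (statement.toLowerCase().startsWith("set ")) {
                // "set key=value" -> apply to the table/execution configuration
                String[] kv = statement.substring(4).split("=", 2);
                tableEnv.getConfig().getConfiguration().setString(kv[0].trim(), kv[1].trim());
            } else if (statement.toLowerCase().startsWith("insert")) {
                // Collect all INSERTs so they are submitted as a single job
                statementSet.addInsertSql(statement);
                hasInsert = true;
            } else {
                // DDL, CREATE FUNCTION, CREATE VIEW, SELECT-free statements, etc.
                tableEnv.executeSql(statement);
            }
        }
        if (hasInsert) {
            statementSet.execute();
        }
    }
}

In this sketch, set statements are mapped onto the table configuration before any DDL runs, and all INSERT statements are grouped into one StatementSet so the whole SQL file produces a single Flink job.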

3. Jar package vs. web interface

We investigated the web-based Zeppelin.

Zeppelin was originally designed for interactive analysis.

The Zeppelin REST API is incompatible with our existing monitoring, so the existing monitoring code would have to be modified.

Although a web interface is user-friendly and a good choice for analysts, for real online long-running applications developers would still need to build a highly available (HA) server around it.

Based on the above three points, the jar package was chosen as the final approach.

4. Usage

Write the SQL into an xxx.sql file, for example:

CREATE TEMPORARY FUNCTION MillisecondsToDateStr AS 'io.github.shengjk.udf.MillisecondsToDateStr' LANGUAGE JAVA;

-- ExecutionCheckpointingOptions
set execution.checkpointing.mode=EXACTLY_ONCE;
set execution.checkpointing.timeout=30 min;  -- 30min
set execution.checkpointing.interval=1 min;  -- 1min
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;

-- ExecutionConfigOptions
set table.exec.state.ttl=1 day;              -- 1 day
set table.exec.mini-batch.enabled=true;      -- enable mini-batch optimization
set table.exec.mini-batch.allow-latency=1 s; -- 1s
set table.exec.mini-batch.size=1000;
set table.exec.sink.not-null-enforcer=drop;

CREATE TABLE orders (
  status      int,
  courier_id  bigint,
  id          bigint,
  finish_time BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'canal_monitor_order',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'ss-canal-json',
  'ss-canal-json.table.include' = 'orders',
  'scan.startup.mode' = 'earliest-offset'
);

-- flink.partition-discovery.interval-millis;
CREATE TABLE infos (
  info_index int,
  order_id   bigint
) WITH (
  'connector' = 'kafka',
  'topic' = 'canal_monitor_order',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'ss-canal-json',
  'ss-canal-json.table.include' = 'infos',
  'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE redisCache (
  finishOrders BIGINT,
  courier_id   BIGINT,
  dayStr       String
) WITH (
  'connector' = 'redis',
  'hostPort' = 'localhost:6400',
  'keyType' = 'hash',
  'keyTemplate' = 'test2_${courier_id}',
  'fieldTemplate' = '${dayStr}',
  'valueNames' = 'finishOrders',
  'expireTime' = '259200'
);

create view temp as
select
  o.courier_id,
  (CASE WHEN sum(infosMaxIndex.info_index) is null then 0 else sum(infosMaxIndex.info_index) end) finishOrders,
  o.status,
  dayStr
from (
  select
    courier_id,
    id,
    last_value(status) status,
    MillisecondsToDateStr(finish_time, 'yyyyMMdd') dayStr
  from orders
  where status = 60
  group by courier_id, id, MillisecondsToDateStr(finish_time, 'yyyyMMdd')
) o
left join (
  select max(info_index) info_index, order_id
  from infos
  group by order_id
) infosMaxIndex on o.id = infosMaxIndex.order_id
group by o.courier_id, o.status, dayStr;

INSERT INTO redisCache
SELECT finishOrders, courier_id, dayStr
FROM temp;
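The first statement above registers the UDF MillisecondsToDateStr from io.github.shengjk.udf, which is shipped in test-udf.jar in the command below. As an assumption about its behavior (the real implementation may differ), a scalar function that formats an epoch-millisecond timestamp into a date string could look roughly like this:

package io.github.shengjk.udf;

import org.apache.flink.table.functions.ScalarFunction;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch of the UDF registered in the SQL file above.
public class MillisecondsToDateStr extends ScalarFunction {

    // Converts an epoch-millisecond timestamp to a date string in the given pattern,
    // e.g. pattern 'yyyyMMdd' for a per-day key.
    public String eval(Long milliseconds, String pattern) {
        if (milliseconds == null) {
            return null;
        }
        return DateTimeFormatter.ofPattern(pattern)
                .withZone(ZoneId.systemDefault())
                .format(Instant.ofEpochMilli(milliseconds));
    }
}

With such a function registered, MillisecondsToDateStr(finish_time, 'yyyyMMdd') turns the order finish time into a per-day string that is used both for grouping and as the Redis hash field.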

Package flinksql-platform and upload it to the server.

Put the necessary connector jars into the appropriate directory

Run it, for example:

flink-1.12.0/bin/flink run -p 3 -yt ./flinkjar/ -C file:///home/shengjk/flinkjar/test-udf.jar -C file:///home/shengjk/flinkjar/jedis-2.10.2.jar -m yarn-cluster -ynm sqlDemo -c io.github.shengjk.Main ./flinksql-platform-1.0-SNAPSHOT.jar --sqlPath ./xxx.sql

where:

-C adds the UDF jar and other third-party jar packages. The -C parameter applies to the JobGraph generated on the client, which is then submitted to run.

-yt <directory> ships third-party jar packages such as the UDF jar to the TaskManagers.

At this point you should have a clearer picture of how to build a FlinkSQL platform, so go ahead and try it yourself!
