Flink sql-client MATCH_RECOGNIZE Kafka example

2025-10-26 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/02 Report--

Environment: Flink 1.7.2

Add the following jars to the lib directory of Flink 1.7.2; otherwise the classes will not be found:

    avro-1.8.2.jar
    flink-avro-1.7.2.jar
    flink-cep_2.12-1.7.2.jar
    flink-connector-kafka-0.9_2.12-1.7.2.jar
    flink-connector-kafka-0.10_2.12-1.7.2.jar
    flink-connector-kafka-0.11_2.12-1.7.2.jar
    flink-connector-kafka-base_2.12-1.7.2.jar
    flink-core-1.7.2.jar
    flink-dist_2.12-1.7.2.jar
    flink-json-1.7.2.jar
    flink-python_2.12-1.7.2.jar
    flink-table_2.12-1.7.2.jar
    kafka-clients-0.11.0.0.jar
    log4j-1.2.17.jar
    slf4j-log4j12-1.7.15.jar

Modify the tables section in sql-client-defaults.yaml:

    tables:
      - name: myTable
        type: source
        update-mode: append
        connector:
          property-version: 1
          type: kafka
          version: 0.11
          topic: im-message-topic2
          startup-mode: earliest-offset
          properties:
            - key: bootstrap.servers
              value: kafkaip:9092
            - key: group.id
              value: testGroup
        format:
          property-version: 1
          type: json
          schema: "ROW(sessionId STRING, fromUid STRING, toUid STRING, chatType STRING, type STRING, msgId STRING, msg STRING, timestampSend STRING)"
        schema:
          - name: sessionId
            type: STRING
          - name: fromUid
            type: STRING
          - name: toUid
            type: STRING
          - name: chatType
            type: STRING
          - name: type
            type: STRING
          - name: msgId
            type: STRING
          - name: msg
            type: STRING
          - name: rowTime
            type: TIMESTAMP
            rowtime:
              timestamps:
                type: "from-field"
                from: "timestampSend"
              watermarks:
                type: "periodic-bounded"
                delay: "60"
          - name: procTime
            type: TIMESTAMP
            proctime: true

Then start the SQL client and query the table:

    ./bin/sql-client.sh embedded
    select * from myTable
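For the source to return rows, every message on im-message-topic2 has to be a JSON object whose field names match the ROW(...) format schema above. The plain-Java sketch below builds such messages; the class and method names and the field values are hypothetical, and the assumption that timestampSend should be SQL timestamp text such as 2019-06-02 10:00:00 (so that the STRING field can be turned into the rowtime) is mine, not the author's.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: builds one JSON record matching the format schema of myTable.
final class SampleMessage {
    // Assumption: timestampSend is sent as SQL timestamp text, e.g. "2019-06-02 10:00:00".
    static final DateTimeFormatter SQL_TS = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // No JSON escaping is done: values must not contain quotes or backslashes.
    static String toJson(String sessionId, String fromUid, String toUid, String chatType,
                         String type, String msgId, String msg, LocalDateTime sentAt) {
        // Field names must match the ROW(...) declared under format.schema.
        return String.format(
            "{\"sessionId\":\"%s\",\"fromUid\":\"%s\",\"toUid\":\"%s\",\"chatType\":\"%s\","
                + "\"type\":\"%s\",\"msgId\":\"%s\",\"msg\":\"%s\",\"timestampSend\":\"%s\"}",
            sessionId, fromUid, toUid, chatType, type, msgId, msg, sentAt.format(SQL_TS));
    }

    public static void main(String[] args) {
        LocalDateTime t = LocalDateTime.of(2019, 6, 2, 10, 0, 0);
        // A customer ('yonghu') message followed five seconds later by an agent ('guanjia') reply.
        System.out.println(toJson("s1", "user1", "emp1", "single", "yonghu", "m1", "hello", t));
        System.out.println(toJson("s1", "emp1", "user1", "single", "guanjia", "m2", "hi", t.plusSeconds(5)));
    }
}
```

Each printed line could then be sent to the topic with any Kafka producer, for example the console producer shipped with Kafka.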

Then run the MATCH_RECOGNIZE query:

    SELECT *
    FROM myTable
    MATCH_RECOGNIZE (
      PARTITION BY sessionId
      ORDER BY rowTime
      MEASURES
        e2.procTime as answerTime,
        LAST(e1.procTime) as customer_event_time,
        e2.fromUid as empUid,
        e1.procTime as askTime,
        1 as total_talk
      ONE ROW PER MATCH
      AFTER MATCH SKIP TO LAST e2
      PATTERN (e1 e2)
      DEFINE
        e1 as e1.type = 'yonghu',
        e2 as e2.type = 'guanjia'
    )
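What the pattern matches can be illustrated without Flink. The following plain-Java sketch (hypothetical class and field names, not Flink's implementation) groups rows by sessionId, sorts each group by time, and emits one row whenever a 'yonghu' message is immediately followed by a 'guanjia' message, which is what PATTERN (e1 e2) with strict contiguity and AFTER MATCH SKIP TO LAST e2 should produce for this kind of data.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (not Flink) of PATTERN (e1 e2) ... AFTER MATCH SKIP TO LAST e2.
final class PatternSketch {
    static final class Msg {
        final String sessionId, fromUid, type;
        final long time; // stands in for the ORDER BY time attribute
        Msg(String sessionId, String fromUid, String type, long time) {
            this.sessionId = sessionId; this.fromUid = fromUid; this.type = type; this.time = time;
        }
    }

    static final class Match {
        final String sessionId, empUid;
        final long askTime, answerTime;
        Match(String sessionId, String empUid, long askTime, long answerTime) {
            this.sessionId = sessionId; this.empUid = empUid;
            this.askTime = askTime; this.answerTime = answerTime;
        }
        @Override public String toString() {
            return sessionId + ": " + empUid + " answered at " + answerTime + " (asked at " + askTime + ")";
        }
    }

    static List<Match> match(List<Msg> rows) {
        // PARTITION BY sessionId
        Map<String, List<Msg>> partitions = new LinkedHashMap<>();
        for (Msg m : rows) partitions.computeIfAbsent(m.sessionId, k -> new ArrayList<>()).add(m);

        List<Match> out = new ArrayList<>();
        for (List<Msg> p : partitions.values()) {
            p.sort(Comparator.comparingLong(m -> m.time)); // ORDER BY rowTime
            int i = 0;
            while (i + 1 < p.size()) {
                Msg e1 = p.get(i), e2 = p.get(i + 1);
                // DEFINE: a 'yonghu' row directly followed by a 'guanjia' row (no rows in between)
                if (e1.type.equals("yonghu") && e2.type.equals("guanjia")) {
                    // ONE ROW PER MATCH: emit the MEASURES (askTime, answerTime, empUid)
                    out.add(new Match(e1.sessionId, e2.fromUid, e1.time, e2.time));
                    i = i + 1; // SKIP TO LAST e2: the next attempt starts at e2's row
                } else {
                    i++; // no match starting at this row: try the next one
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Msg> rows = new ArrayList<>();
        rows.add(new Msg("s1", "user1", "yonghu", 1000));
        rows.add(new Msg("s1", "user1", "yonghu", 2000));
        rows.add(new Msg("s1", "emp1", "guanjia", 3000));
        rows.add(new Msg("s2", "user2", "yonghu", 1500));
        rows.add(new Msg("s1", "emp1", "guanjia", 4000));
        for (Match m : match(rows)) System.out.println(m);
    }
}
```

With the rows in main, only one match is produced, for s1 (asked at 2000, answered at 3000): the first 'yonghu' row is followed by another 'yonghu' row, the second 'guanjia' row has no 'yonghu' row directly before it, and s2 never gets a reply.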

The sql-client approach above requires no code at all. Of course, you can also write the job in code; here is the corresponding program.

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.core.fs.FileSystem.WriteMode;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.table.descriptors.Json;
    import org.apache.flink.table.descriptors.Kafka;
    import org.apache.flink.table.descriptors.Rowtime;
    import org.apache.flink.table.descriptors.Schema;
    import org.apache.flink.types.Row;

    // main() lives in the job class; `logger` is assumed to be an SLF4J Logger field of that class (not shown).
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Register the Kafka topic as the table source "myTable".
        tableEnv.connect(new Kafka()
                .version("0.11")
                .topic("im-message-topic3")
                // .property("zookeeper.connect", "")
                .property("bootstrap.servers", "kafkaip:9092")
                .startFromEarliest()
                .sinkPartitionerRoundRobin()) // Flink partitions are mapped round-robin to Kafka partitions (sink side)
            .withFormat(new Json()
                .failOnMissingField(false)
                .deriveSchema())
            .withSchema(new Schema()
                .field("sessionId", Types.STRING).from("sessionId")
                .field("fromUid", Types.STRING).from("fromUid")
                .field("toUid", Types.STRING).from("toUid")
                .field("chatType", Types.STRING).from("chatType")
                .field("type", Types.STRING).from("type")
                .field("msgId", Types.STRING).from("msgId")
                .field("msg", Types.STRING).from("msg")
                // .field("timestampSend", Types.SQL_TIMESTAMP)
                .field("rowtime", Types.SQL_TIMESTAMP)
                    .rowtime(new Rowtime()
                        .timestampsFromField("timestampSend")
                        .watermarksPeriodicBounded(1000))
                .field("proctime", Types.SQL_TIMESTAMP).proctime())
            .inAppendMode()
            .registerTableSource("myTable");

        Table tb2 = tableEnv.sqlQuery(
            "SELECT answerTime, customer_event_time, empUid, noreply_counts, total_talk " +
            "FROM myTable " +
            "MATCH_RECOGNIZE ( " +
            "  PARTITION BY sessionId " +
            "  ORDER BY rowtime " +
            "  MEASURES " +
            "    e2.rowtime as answerTime, " +
            "    LAST(e1.rowtime) as customer_event_time, " +
            "    e2.fromUid as empUid, " +
            "    1 as noreply_counts, " +
            "    e1.rowtime as askTime, " +
            "    1 as total_talk " +
            "  ONE ROW PER MATCH " +
            "  AFTER MATCH SKIP TO LAST e2 " +
            "  PATTERN (e1 e2) " +
            "  DEFINE " +
            "    e1 as e1.type = 'yonghu', " +
            "    e2 as e2.type = 'guanjia' " +
            ")");

        DataStream<Row> appendStream = tableEnv.toAppendStream(tb2, Row.class);
        System.out.println("schema is:");
        tb2.printSchema();
        appendStream.writeAsText("/usr/local/whk", WriteMode.OVERWRITE);
        logger.info("stream end");

        Table tb3 = tableEnv.sqlQuery("select sessionId, type from myTable");
        DataStream<Row> temp = tableEnv.toAppendStream(tb3, Row.class);
        tb3.printSchema();
        temp.writeAsText("/usr/local/whk2", WriteMode.OVERWRITE);

        env.execute("msg test");
    }
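Both the YAML (periodic-bounded with delay "60") and the program (watermarksPeriodicBounded(1000)) use a bounded-out-of-orderness watermark: the watermark trails the largest timestamp seen so far by the configured delay in milliseconds. The plain-Java sketch below imitates that idea; the class name is hypothetical, it recomputes the watermark after every event instead of periodically, and it assumes (to my understanding of Flink's MATCH_RECOGNIZE operator) that rows whose timestamp is not greater than the current watermark are treated as late and dropped.

```java
// Plain-Java sketch (not Flink's implementation) of a bounded-out-of-orderness watermark.
final class WatermarkSketch {
    private final long delayMs;
    private long maxTimestamp = Long.MIN_VALUE; // largest event timestamp seen so far

    WatermarkSketch(long delayMs) { this.delayMs = delayMs; }

    // Watermark = max timestamp - delay (guarded so the initial value does not overflow).
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - delayMs;
    }

    // Returns true if the event is on time, false if it is late (at or behind the watermark).
    // Simplification: the watermark advances immediately after each event.
    boolean onEvent(long timestamp) {
        boolean onTime = timestamp > currentWatermark();
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        return onTime;
    }

    public static void main(String[] args) {
        long[] arrivals = {10_000, 10_500, 9_200, 12_000, 10_900}; // event times in arrival order
        WatermarkSketch wm = new WatermarkSketch(1000);
        for (long ts : arrivals) {
            boolean onTime = wm.onEvent(ts);
            System.out.println(ts + (onTime ? " on time" : " LATE") + ", watermark now " + wm.currentWatermark());
        }
    }
}
```

With a 1000 ms delay, the arrivals 9200 and 10900 are reported as late in this sketch; with a delay as small as the YAML's 60 ms, even less disorder would be tolerated, which is one way rows can go missing from the MATCH_RECOGNIZE output.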

That's it, although there are quite a few pitfalls along the way.

Note: if you use TimeCharacteristic.EventTime, do not also use procTime in the query; reference the rowtime attribute instead, as the program above does.
