In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-09-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
Environment flink1.7.2
Add the jar in the lib of flink1.7.2, otherwise the class will not be found
Avro-1.8.2.jar flink-connector-kafka-0.10_2.12-1.7.2.jar flink-connector-kafka-base_2.12-1.7.2.jar flink-json-1.7.2.jar kafka-clients-0.11.0.0.jarflink-avro-1.7.2.jar flink-connector-kafka-0.11_2.12-1.7.2.jar flink-core-1 .7.2.jar flink-python_2.12-1.7.2.jar log4j-1.2.17.jarflink-cep_2.12-1.7.2.jar flink-connector-kafka-0.9_2.12-1.7.2.jar flink-dist_2.12-1.7.2.jar flink-table_2.12-1.7.2.jar slf4j-log4j12- 1.7.15.jar modifies the table value in sql-client-defaults.yaml tables:-name: myTable type: source update-mode: append connector: property-version: 1 type: kafka version: 0.11 topic: im-message-topic2 startup-mode: earliest-offset properties:-key: bootstrap.servers value: kafkaip:9092-key: group.id value: testGroup format : property-version: 1 type: json schema: "ROW (sessionId STRING FromUid STRING, toUid STRING, chatType STRING, type STRING,msgId STRING,msg STRING TimestampSend STRING) "schema:-name: sessionId type: STRING-name: fromUid type: STRING-name: toUid type: STRING-name: chatType type: STRING-name: type type: STRING-name: msgId type: STRING-name: msg type: STRING-name: rowTime type : TIMESTAMP rowtime: timestamps: type: "from-field" from: "timestampSend" watermarks: type: "periodic-bounded" delay: "60"-name: procTime type: TIMESTAMP proctime: true run. / bin/sql-client.sh embedded select * from myTable
Then use MATCH_RECOGNIZE 's sql
SELECT * FROM myTable MATCH_RECOGNIZE (PARTITION BY sessionId ORDER BY rowTime MEASURES e2.procTime as answerTime, LAST (e1.procTime) as customer_event_time, e2.fromUid as empUid, e1.procTime as askTime, 1 as total_talk ONE ROW PER MATCH AFTER MATCH SKIP TO LAST e2 PATTERN (E1 e2) DEFINE E1 as e1.type = 'yonghu', e2 as e2.type =' guanjia')
There is no need to thank the code for using sql-client above, of course, you can also write the code, and here is the corresponding program.
Public static void main (String [] arg) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment (); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment (env); env.setStreamTimeCharacteristic (TimeCharacteristic.EventTime) TableEnv.connect (new Kafka () .version ("0.11") .topic ("im-message-topic3") / / .property ("zookeeper.connect", "") .property ("bootstrap.servers") "kafkaip:9092") .startFromEarliest () .sinkPartitionerRoundRobin () / Flink partition is randomly mapped to the kafka partition) .withFormat (new Json () .failOnMissingField (false) .deriveSchema ()) .withSchema (new Schema () .field ("sessionId") Types.STRING) .from ("sessionId") .field ("fromUid", Types.STRING) .from ("fromUid") .field ("toUid", Types.STRING) .from ("toUid") .field ("chatType", Types.STRING) .from ("chatType") .field ("type") Types.STRING) .from ("type") .field ("msgId", Types.STRING) .from ("msgId") .field ("msg", Types.STRING) .from ("msg") / / .field ("timestampSend", Types.SQL_TIMESTAMP) .field ("rowtime") Types.SQL_TIMESTAMP) .rowtime (new Rowtime () .timestampsFromField ("timestampSend") .watermarksPeripheral Balls (1000)) .field ("proctime", Types.SQL_TIMESTAMP) .proctime () .inAppendMode () .timestampsFromField ("myTable") Table tb2 = tableEnv.sqlQuery ("SELECT" + "answerTime, customer_event_time, empUid, noreply_counts Total_talk "+" FROM myTable "+" + "MATCH_RECOGNIZE (" + "PARTITION BY sessionId" + "ORDER BY rowtime" + "MEASURES" + "e2.rowtime as answerTime" "+" LAST (e1.rowtime) as customer_event_time, "+" e2.fromUid as empUid, "+" 1 as noreply_counts, "+" e1.rowtime as askTime "+" 1 as total_talk "+" ONE ROW PER MATCH "+" AFTER MATCH SKIP TO LAST e2 "+" PATTERN (E1 e2) "+" DEFINE "+ "E1 as e1.type = 'yonghu' "+" e2 as e2.type = 'guanjia' "+") "+") DataStream appendStream = tableEnv.toAppendStream (tb2, Row.class); System.out.println ("schema is:"); tb2.printSchema (); appendStream.writeAsText ("/ usr/local/whk", WriteMode.OVERWRITE); logger.info ("stream end"); Table tb3 = tableEnv.sqlQuery ("select sessionId, type from myTable"); DataStream temp = tableEnv.toAppendStream (tb3, Row.class) Tb3.printSchema (); temp.writeAsText ("/ usr/local/whk2", WriteMode.OVERWRITE); env.execute ("msg test");}
It's done, but there are a lot of holes in it.
Note: if you are using TimeCharacteristic.EventTime, please no longer use procTime.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
The market share of Chrome browser on the desktop has exceeded 70%, and users are complaining about
The world's first 2nm mobile chip: Samsung Exynos 2600 is ready for mass production.According to a r
A US federal judge has ruled that Google can keep its Chrome browser, but it will be prohibited from
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
About us Contact us Product review car news thenatureplanet
More Form oMedia: AutoTimes. Bestcoffee. SL News. Jarebook. Coffee Hunters. Sundaily. Modezone. NNB. Coffee. Game News. FrontStreet. GGAMEN
© 2024 shulou.com SLNews company. All rights reserved.