2025-04-04 Update From: SLTechnology News&Howtos > Database
Shulou(Shulou.com)06/01 Report--
Preface
A colleague has been looking into MLSQL Stack's streaming support, and remarked that debugging a stream is genuinely hard. Through practice, we hoped to achieve the following three points:
1. Sample and view the latest N records of a Kafka topic at any time.
2. Debugging results (the sink) can be printed to the web console.
3. The stream program can automatically infer the JSON schema (plain Spark cannot do this yet).
After implementing these three points, I found that debugging really became much easier.
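Point 3 deserves a note: Spark Structured Streaming will not infer a JSON schema from a Kafka source on its own, which is why MLSQL samples a few records and infers one. The rough idea can be sketched in Python; this is a toy flat-schema inferrer for illustration only, not MLSQL's actual implementation:

```python
import json

def infer_schema(sample_lines):
    """Infer a flat field -> type mapping from sampled JSON records,
    the way a schema-registration tool might (toy version)."""
    schema = {}
    for line in sample_lines:
        for field, value in json.loads(line).items():
            kind = {bool: "boolean", int: "long", float: "double"}.get(type(value), "string")
            # On conflicting types across records, widen the field to string.
            if schema.setdefault(field, kind) != kind:
                schema[field] = "string"
    return schema

sample = ['{"x": 100, "dataType": "A group"}', '{"x": 120.5, "dataType": "B group"}']
print(infer_schema(sample))  # x appears as both long and double, so it widens to string
```

A real implementation would also handle nested objects and arrays; sampling only a few records means the inferred schema can be wrong if later records add fields.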
Process flow
First, I created a new script, kaf_write.mlsql, which makes it easy for me to write data to Kafka:
set abc='''
{"x": 100, "y": 200, "z": 200, "dataType": "A group"}
{"x": 120, "y": 100, "z": 260, "dataType": "B group"}
{"x": 120, "y": 100, "z": 260, "dataType": "B group"}
{"x": 120, "y": 100, "z": 260, "dataType": "B group"}
{"x": 120, "y": 100, "z": 260, "dataType": "B group"}
''';
load jsonStr.`abc` as table1;

select to_json(struct(*)) as value from table1 as table2;

save append table2 as kafka.`wow` where
kafka.bootstrap.servers="127.0.0.1:9092";
So that every time I run it, the data can be written to Kafka.
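For intuition, here is what that script does, sketched in Python: to_json(struct(*)) serializes each row into a JSON string, and the save step appends those strings as message values to the wow topic. The kafka-python producer call below is an unexecuted sketch against the same broker address, an assumption for illustration rather than MLSQL's own mechanism:

```python
import json

# Rows equivalent to the jsonStr table in the script above.
rows = [
    {"x": 100, "y": 200, "z": 200, "dataType": "A group"},
    {"x": 120, "y": 100, "z": 260, "dataType": "B group"},
]

def to_json_values(rows):
    """Mimic `select to_json(struct(*)) as value`: one JSON string per row."""
    return [json.dumps(r) for r in rows]

def write_to_kafka(values, topic="wow", servers="127.0.0.1:9092"):
    """Sketch of the save-append step using kafka-python (requires a broker)."""
    from kafka import KafkaProducer  # pip install kafka-python
    producer = KafkaProducer(bootstrap_servers=servers)
    for v in values:
        producer.send(topic, v.encode("utf-8"))
    producer.flush()
```

Usage would simply be `write_to_kafka(to_json_values(rows))` with a broker running at 127.0.0.1:9092.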
Then, after running it, I need to check whether the data was actually written and what it looks like:
!kafkaTool sampleData 10 records from "127.0.0.1:9092" wow;
This command samples 10 records from Kafka, where the broker address is 127.0.0.1:9092 and the topic is wow. The running results are as follows:
There's no problem. Then I wrote a very simple streaming program:
-- the stream name, should be uniq.
set streamName="streamExample";

-- use kafkaTool to infer the schema from kafka
!kafkaTool registerSchema 2 records from "127.0.0.1:9092" wow;

load kafka.`wow` options
kafka.bootstrap.servers="127.0.0.1:9092"
as newkafkatable1;

select * from newkafkatable1
as table21;

-- print in webConsole instead of terminal console.
save append table21
as webConsole.``
options mode="Append"
and duration="15"
and checkpointLocation="/tmp/s-cpl4";
The running results are as follows:
We can also see the real-time effect on the terminal.
Supplement
Of course, MLSQL Stack has two particular strengths for streaming. First, you can register an HTTP callback for stream events (started, progress, terminated). Second, you can use batch SQL to post-process the stream's output and finally store the result. See the following script:
-- the stream name, should be uniq.
set streamName="streamExample";

-- mock some data.
set data='''
{"key":"yes","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":1,"timestamp":"2008-01-24 18:01:01.002","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":2,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":3,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":4,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":5,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
''';

-- load data as table
load jsonStr.`data` as datasource;

-- convert table as stream source
load mockStream.`datasource` options
stepSizeRange="0-3"
as newkafkatable1;

-- aggregation
select cast(value as string) as k from newkafkatable1
as table21;

-- register an http callback for stream events
!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";

-- output the result to console.
save append table21
as custom.``
options mode="append"
and duration="15"
and sourceTable="jack"
and code='''
select count(*) as c from jack as newjack;
save append newjack as parquet.`/tmp/jack`;
'''
and checkpointLocation="/tmp/cpl15";
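The !callback line above asks the engine to POST stream lifecycle events (started, progress, terminated) to http://127.0.0.1:9002/api_v1/test. A minimal receiver for such callbacks might be sketched in Python as follows; the eventName field is an assumed payload shape for illustration, not a documented MLSQL contract:

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

def classify_event(payload):
    """Pick the event type out of a callback payload.
    The 'eventName' field name is an assumption for illustration."""
    return payload.get("eventName", "unknown")

class CallbackHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read the POSTed JSON body and acknowledge with 200.
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length) or b"{}")
        print("stream event:", classify_event(payload))
        self.send_response(200)
        self.end_headers()

# To run the receiver on the port the !callback directive points at (blocks):
# HTTPServer(("127.0.0.1", 9002), CallbackHandler).serve_forever()
```

In practice such a receiver would route started/terminated events to alerting and progress events to a metrics store.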
Summary
That is the whole of this article. I hope it has some reference and learning value for your study or work. Thank you for your support.