Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to realize SideOutputSplit shunt of Flink

2025-01-29 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

This article mainly explains "how to achieve SideOutputSplit diversion of Flink". The content of the explanation is simple and clear, and it is easy to learn and understand. Please follow the editor's train of thought to study and learn "how to achieve SideOutputSplit diversion of Flink".

Version Notes:

Environment: Windiws

Scala: 2.11.8

Flink: 1.10.1

The output of most DataStream API operators is a single output, that is, a stream of some data type.

In addition to the split operator, a stream can be divided into multiple streams, all of which have the same data type.

Process function's side outputs feature can generate multiple streams (recommended after Flink version 1.9), and these streams can have different data types. A side output can be defined as an OutputTag [X] object, where X is the data type of the output stream. Process function can emit an event to one or more side outputs through the Context object.

Import org.apache.flink.streaming.api.scala. {DataStream, OutputTag, StreamExecutionEnvironment} import org.apache.flink.api.scala._import org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.ProcessFunctionimport org.apache.flink.util.Collector/** @ param deviceNo device number * @ param timestamp timestamp * @ param temperature temperature * / case class SensorReading (deviceNo: String, timestamp: Long Temperature: Double) object SensorReadingSplitStreaming {def main (args: Array [String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment / / set time semantic time env.setStreamTimeCharacteristic (TimeCharacteristic.EventTime) env.setParallelism (1) val socketSource: DataStream [String] = env.readTextFile ("D:\\ tmp\\ file1.txt") val mapStream: DataStream [SensorReading] = socketSource .map (data = > {val split: Array [String] = data.split (" ") SensorReading (split (0). Trim, split (1). Trim.toLong, split (2) .trim.toDouble)}) / / A pair of data streams are split val tmpStageStream: DataStream [SensorReading] = mapStream.process (new TempStageProcess ()) tmpStageStream.print (" main ") Val lowStream: DataStream [(String, Double)] = tmpStageStream.getSideOutput (new OutputTag [(String, Double)] ("low-tmp")) val highStream: DataStream [(String, Double)] = tmpStageStream.getSideOutput (new OutputTag [(String, Double)] ("high-tmp") lowStream.print ("low") highStream.print ("high") env.execute ()} class TempStageProcess () extends ProcessFunction [SensorReading SensorReading] {/ / define side output stream lazy val lowTmp: OutputTag [(String, Double)] = new OutputTag [(String, Double)] ("low-tmp") Lazy val HighTmp: OutputTag [(String, Double)] = new OutputTag [(String, Double)] ("high-tmp"); / / data processing override def processElement (value: SensorReading, context: ProcessFunction [SensorReading, SensorReading] # Context, collector: Collector [SensorReading]): Unit = {if (value.temperature)

< 10) { context.output(lowTmp, (value.deviceNo, value.temperature)) } else if (value.temperature >

70) {context.output (HighTmp, (value.deviceNo) (value.temperature)} else {collector.collect (value)} / / the contents of the test file are as follows: ↓ / * equipment 8Reg 1610035289736, 54.3 device 5jis 1610035371758, 53.8 device 50.88, 1610035458637, 70.2, 50.2, 1610035543127, 10.2, 51.6, 161003570530227, 51.6, equipment 5161003578738712.9, equipment 71610035877019,88.2, 161003596053, 33.5, equipment 7161003604304040, 63.0, 63.0, 1610036125179364. .5设备6,1610036214972,30.2设备5,1610036296542,56.5设备7,1610036377999,29.7设备6,1610036467523,59.4设备4,1610036557446,71.1设备5,1610036641100,28.2设备2,1610036725803,88.8设备8,1610036808041,73.5设备1,1610036897060,18.0设备7,1610036980127,14.9设备2,1610037069523,47.4设备4,1610037154507,59.5设备5,1610037235099,35.0设备6,1610037317868,76.4设备2,1610037403367,10.0设备2,1610037484177,18.5设备4,1610037571384,98.7设备5,1610037653666,95.6设备6,1610037735520,32.6设备6,1610037823906,83.3设备3,1610037913756,29.1设备7,1610037994980,74.6设备6,1610038081606,22.2设备3,1610038163043,10 .4设备5,1610038244717,56.9设备3,1610038326227,64.8设备4,1610038411053,65.0设备8,1610038500538,93.2设备8,1610038583924,76.2设备1,1610038670150,42.1设备5,1610038756839,35.1设备3,1610038840180,75.9设备3,1610038929751,83.4设备7,1610039019422,24.1设备3,1610039101778,85.0设备8,1610039183077,45.6设备3,1610039264498,79.5设备1,1610039351600,44.4设备8,1610039434187,73.3设备3,1610039518048,77.9设备7,1610039598556,9.79设备4,1610039679144,19.0设备2,1610039761967,56.1设备3,1610039847823,88.2设备6,1610039933024,77.4设备7,1610040014212,14.4设备4,1610040101627,98.2设备8,1610040182379,85 .0设备6,1610040265210,61.8设备2,1610040345769,48.0设备3,1610040432855,19.9设备4,1610040515943,30.9设备4,1610040601373,51.7设备1,1610040681803,29.7设备8,1610040770779,31.6设备3,1610040851986,67.1设备4,1610040941421,93.2设备7,1610041022836,37.2设备8,1610041105401,84.6设备6,1610041189301,19.2设备4,1610041270735,99.0设备4,1610041354109,77.0设备5,1610041435113,49.7设备1,1610041521773,74.2设备8,1610041603035,42.2设备3,1610041687230,87.1设备1,1610041767985,82.7设备3,1610041848130,0.59设备4,1610041933021,7.38设备2,1610042016080,28.9设备2,1610042103229,99.2设备2,1610042190222,42 .2 equipment 3mem1610042277841Nie12.0 equipment 71610042364076 device 73.5 device 710042444652 10.5 device 41610042530461 68.5 device 1mem1610042615421head 78.2 equipment 3mei 161004278747879 64.8 equipment 51610042874301line 6.34 equipment 21610042956073335.6 equipment 81610043038793310.6 equipment 816100431229711030.3 device 716100432038101017.5 device 88paper 16100432915610833.8 equipment 516100433431880.30.5 equipment 21610043445610744.7 equipment 16364364374374 / 17497 /

Output result:

SLF4J: Class path contains multiple SLF4J bindings.SLF4J: Found binding in [jar:file:/D:/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jarbank SLF4J] SLF4J: Found binding in [jarrigslf4jimplexStaticLoggerBinder.class]: Found binding in [jarbank filebank] .m2Universe repositorybank qosbacklog class classiclly 1.2.0lgslf4jandandStaticLoggerBinder .class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 17 See 19group 42659 WARN org.apache.flink.runtime.taskmanager.TaskManagerLocation-No hostname could be resolved for the IP address 127.0.0.1 Using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.17:19:42725 WARN org.apache.flink.runtime.taskmanager.TaskManagerLocation-No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.17:19:43088 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils-Log file environment variable 'log.file' is not set.17:19:43089 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils-JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key' Key: 'web.log.path', default: null (fallback keys: [{key=jobmanager.web.log.path IsDeprecated=true}]) '.high > (equipment 1610035371758) main > SensorReading (equipment 5meme 1610035371758) main > SensorReading (equipment 1610035458637) main > SensorReading (equipment 1610035543127) main > SensorReading (equipment 1610035623427) main > SensorReading (equipment 1610035705302) main > SensorReading (equipment 161003578738712.9) high > SensorReading (equipment 6161003560553) main > SensorReading (equipment 6161003604304033.0) main > SensorReading (equipment 5161003612517964.5) main > SensorReading equipment (16100362149230.2 main SensorReading) > 510036294566.5) ) main > SensorReading (equipment 1610036377999) main > SensorReading (equipment 1610036467523) high > (equipment 4pm 71.1) main > SensorReading (equipment 161003664110028.2) high > (equipment 2mei 88.8) high > (equipment 161003689706018.0) main > SensorReading (equipment 161003698012714.9) main > SensorReading (equipment 161003706952347.4) main > SensorReading (equipment 51610037509955.0) high > main (676.4) > SensorReading equipment (216100374033710.0) main SensorReading > equipment (21003748717718) .5) high > (equipment 1610037735520) high > (equipment 6mei 83.3) main > SensorReading (equipment 1610037913756) high > (equipment 74.6) main > SensorReading (equipment 161003808160622.2) main > SensorReading (equipment 161003816304 10.4) main > SensorReading (equipment 51610038244717author66.9) main > SensorReading (equipment 1610038326227) main > SensorReading (equipment 16100384104105pm 65.0) high > (equipment 163841033.2) high (876.2) > main SensorReading (1161003867042) SensorReading (1161003867042) . 1) main > SensorReading (equipment 16100387568399) high > (equipment 3meme 75.9) high > (equipment 33.43.4) main > SensorReading (equipment 710039019422) high > (equipment 3Power85.0) main > SensorReading (equipment 161003918307pm 45.6) high > (equipment 3mei 79.5) main > SensorReading (equipment 1610039351600 44.4) high > (equipment 73.3) high > (equipment 377.9) low > (equipment 77.79) main > SensorReading (equipment 1610039679419.0) main > SensorReading (equipment 16100397619666.1) high > (equipment 388.2) > ) high > (equipment 1610040014212) high > high > (equipment 85.0) main > SensorReading (equipment 1610040265210) main > SensorReading (equipment 1610040345769) main > SensorReading (equipment 161004043285) main > SensorReading (equipment 161004051594 30.9) main > SensorReading > SensorReading (equipment 161004068188329.7) > SensorReading > main SensorReading (equipment 316100407709ml 31.6) > main SensorReading (equipment 31610040458519867.1 high > equipment (43.2) main SensorReading equipment (710040628337) > SensorReading (equipment 1610040601373j 51.7) > main > SensorReading (equipment 16100407709ml 31.6) > main SensorReading (31610040407709hand31.6) > main SensorReading (31610040408519867.1 high) > equipment (4143.2 main SensorReading) > equipment (710040,10283337) .2) high > (equipment 84.6) main > SensorReading (equipment 1610041189301) high > (device 4) high > (device 4) main > SensorReading (equipment 1610041435113 49.7) high > (equipment 1m74.2) main > SensorReading (equipment 1610041603035 42.2) high > (equipment 3m87.1) high > (equipment 82.7) low > (equipment 3grin 0.59) low > (equipment 40.38) main > SensorReading (equipment 21410042016080) high > (equipment 29999) main > SensorReading (equipment 1610041902222) Main > SensorReading (equipment 3mei 1610042277841) high > (equipment 7meme 93.5) main > SensorReading (equipment 1610042444652) main > SensorReading (equipment 1610042530461) high > (equipment 1mei 78.2) main > SensorReading (equipment 3mei 1610042702219mei 18.5) main > SensorReading (equipment 6mei 1610042787778) low > (equipment 5mai 6.34) main > SensorReading (equipment 161004295607356) main > SensorReading (equipment 16100410043079310 10.6) > main SensorReading (81610043122971017.5) > main SensorReading (7161004320381017.5) > equipment (883.8) main SensorReading > equipment (5100433433718830). 5) high > (equipment 2p84.7) main > SensorReading (equipment 1610043545998 Process finished with exit code 53.4) high > (equipment 3Power97.4) Thank you for your reading The above is the content of "how to achieve SideOutputSplit diversion of Flink". After the study of this article, I believe you have a deeper understanding of how to achieve SideOutputSplit diversion of Flink, and the specific use needs to be verified in practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report