
How to Implement Data Stream Transformations in Apache Flink

2025-04-06 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/02 Report--

This article shares how to implement data stream transformations in Apache Flink. The editor thinks it is very practical and shares it here for reference; I hope you get something out of it.

Operators transform one or more DataStreams into a new DataStream.

Filter function

Scala:

object DataStreamTransformationApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    filterFunction(env)
    env.execute("DataStreamTransformationApp")
  }

  def filterFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomNonParallelSourceFunction)
    data.map(x => {
      println("received: " + x)
      x
    }).filter(_ % 2 == 0).print().setParallelism(1)
  }
}

Any of the custom source functions defined earlier can be used as the data source here.

The map here does nothing substantive beyond logging each element; the filter then keeps only the numbers divisible by 2. The printed result is as follows:

received: 1
received: 2
2
received: 3
received: 4
4
received: 5
received: 6
6
received: 7
received: 8
8

This shows that map sees every element from the source, while the actual filtering happens in filter.
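The map-then-filter shape can be seen without Flink at all. Below is a plain java.util.stream sketch of the same pipeline; the class name and sample data are illustrative, not part of the Flink example:

```java
import java.util.List;
import java.util.stream.Collectors;

public class FilterSketch {
    // Same shape as the Flink pipeline: a map step that logs and passes
    // every element through, then a filter step that keeps even values.
    static List<Long> evens(List<Long> source) {
        return source.stream()
                .peek(x -> System.out.println("received: " + x)) // "map": log, pass through
                .filter(x -> x % 2 == 0)                          // "filter": keep evens
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical sample of what the custom source might emit.
        System.out.println(evens(List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L))); // [2, 4, 6, 8]
    }
}
```

As in the Flink output above, the logging step fires for every element, but only the even values survive to the end of the pipeline.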

Java:

public static void filterFunction(StreamExecutionEnvironment env) {
    DataStreamSource<Long> data = env.addSource(new JavaCustomParallelSourceFunction());
    data.setParallelism(1)
        .map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("received: " + value);
                return value;
            }
        })
        .filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value % 2 == 0;
            }
        })
        .print()
        .setParallelism(1);
}

Note the data.setParallelism(1) call before the map operation: JavaCustomParallelSourceFunction is a parallel source, so without it each value would be emitted and printed multiple times. With JavaCustomNonParallelSourceFunction the parallelism is fixed at 1 and cannot be changed, so the call is unnecessary.

Union function

Scala:

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // filterFunction(env)
  unionFunction(env)
  env.execute("DataStreamTransformationApp")
}

def unionFunction(env: StreamExecutionEnvironment): Unit = {
  val data01 = env.addSource(new CustomNonParallelSourceFunction)
  val data02 = env.addSource(new CustomNonParallelSourceFunction)
  data01.union(data02).print().setParallelism(1)
}

The union operation merges the two streams so that they can be processed together.
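The essential guarantee of union is that every element of both inputs appears in the merged stream. A plain java.util.stream analogy, independent of Flink (class name and sample data are illustrative; note that Flink interleaves elements from its running sources rather than strictly concatenating them):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class UnionSketch {
    // Merge two streams into one: every element of both inputs appears once.
    static List<Long> union(List<Long> a, List<Long> b) {
        return Stream.concat(a.stream(), b.stream()).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Two identical sources, as in the Flink example: each value appears twice.
        List<Long> merged = union(List.of(1L, 2L, 3L, 4L), List.of(1L, 2L, 3L, 4L));
        System.out.println(merged); // [1, 2, 3, 4, 1, 2, 3, 4]
    }
}
```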

The printed output of the above is:

1
1
2
2
3
3
4
4

Java:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    // filterFunction(environment);
    unionFunction(environment);
    environment.execute("JavaDataStreamTransformationApp");
}

public static void unionFunction(StreamExecutionEnvironment env) {
    DataStreamSource<Long> data1 = env.addSource(new JavaCustomNonParallelSourceFunction());
    DataStreamSource<Long> data2 = env.addSource(new JavaCustomNonParallelSourceFunction());
    data1.union(data2).print().setParallelism(1);
}

Split / select function

Split divides one stream into multiple named streams, and select chooses which of those named streams to process.

Scala:

def splitSelectFunction(env: StreamExecutionEnvironment): Unit = {
  val data = env.addSource(new CustomNonParallelSourceFunction)
  val split = data.split(new OutputSelector[Long] {
    override def select(value: Long): lang.Iterable[String] = {
      val list = new util.ArrayList[String]()
      if (value % 2 == 0) {
        list.add("even")
      } else {
        list.add("odd")
      }
      list
    }
  })
  split.select("odd", "even").print().setParallelism(1)
}

You can then select one or more streams by the names you assigned and process them accordingly.
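The tag-then-select logic itself can be sketched without Flink: group elements under the names an OutputSelector would assign, then pick groups by name. (As an aside, split/select was deprecated in later Flink releases in favor of side outputs.) Class name and sample data below are illustrative:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SplitSelectSketch {
    // Tag each element with a name, as OutputSelector does in the Flink example.
    static Map<String, List<Long>> split(List<Long> source) {
        return source.stream()
                .collect(Collectors.groupingBy(x -> x % 2 == 0 ? "even" : "odd"));
    }

    public static void main(String[] args) {
        Map<String, List<Long>> named = split(List.of(1L, 2L, 3L, 4L, 5L, 6L));
        // select("odd"): process only the stream tagged "odd"
        System.out.println(named.get("odd"));  // [1, 3, 5]
        // select("odd", "even") would cover both named streams
        System.out.println(named.get("even")); // [2, 4, 6]
    }
}
```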

Java:

public static void splitSelectFunction(StreamExecutionEnvironment env) {
    DataStreamSource<Long> data = env.addSource(new JavaCustomNonParallelSourceFunction());
    SplitStream<Long> split = data.split(new OutputSelector<Long>() {
        @Override
        public Iterable<String> select(Long value) {
            List<String> output = new ArrayList<>();
            if (value % 2 == 0) {
                output.add("even");
            } else {
                output.add("odd");
            }
            return output;
        }
    });
    split.select("odd").print().setParallelism(1);
}

The above is how to implement data stream transformations in Apache Flink. These are knowledge points you may well see or use in everyday work; I hope this article has taught you something new. For more details, please follow the industry information channel.
