
What is Flink data flow programming in Apache Flink

2025-02-14 Update From: SLTechnology News&Howtos


This article explains how data sources are created in Flink data flow programming, with worked examples in Scala and Java.

Data sources can be created with StreamExecutionEnvironment.addSource(sourceFunction). Flink also provides built-in data sources for convenience, such as readTextFile(path) and readFile(). You can also write a custom data source in one of three ways: implement the SourceFunction interface, which gives a source that cannot run in parallel; implement the ParallelSourceFunction interface, which gives a source that can run in parallel; or extend RichParallelSourceFunction.
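The difference between a non-parallel and a parallel source can be pictured as a type check made when the parallelism is set. The following is a minimal plain-Java sketch of that idea, not Flink code: Source, ParallelSource and checkParallelism are hypothetical stand-ins for SourceFunction, ParallelSourceFunction and the validation that happens in setParallelism, and the error text is made up for illustration.

```java
class SourceParallelismSketch {

    // Hypothetical stand-ins for illustration only; these are NOT the Flink types.
    interface Source {}                          // plays the role of SourceFunction
    interface ParallelSource extends Source {}   // plays the role of ParallelSourceFunction

    // Returns the accepted parallelism, or throws if a source that is not
    // marked as parallel is asked to run with more than one instance.
    static int checkParallelism(Source source, int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        boolean isParallel = source instanceof ParallelSource;
        if (parallelism != 1 && !isParallel) {
            throw new IllegalArgumentException("not a parallel source");
        }
        return parallelism;
    }

    public static void main(String[] args) {
        Source plain = new Source() {};
        Source parallel = new ParallelSource() {};

        System.out.println(checkParallelism(plain, 1));
        System.out.println(checkParallelism(parallel, 3));
        try {
            checkParallelism(plain, 3);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this sketch the only thing that separates the two kinds of source is the interface they implement, which is why the custom sources later in the article can share the same run and cancel bodies.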

Introduction

Start with a simple example by creating a DataStreamSourceApp.

Scala

    import org.apache.flink.streaming.api.scala._

    object DataStreamSourceApp {

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        socketFunction(env)
        env.execute("DataStreamSourceApp")
      }

      // Reads lines of text from a socket and prints each one.
      def socketFunction(env: StreamExecutionEnvironment): Unit = {
        val data = env.socketTextStream("192.168.152.45", 9999)
        data.print()
      }
    }

This method reads data from a socket, so a service must be listening on port 9999 of 192.168.152.45. Start one with netcat:

    nc -lk 9999

Then run DataStreamSourceApp and type some input on the server:

    iie4bu@swarm-manager:~$ nc -lk 9999
    apache
    flink
    spark

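The same lines appear in the console:

    3> apache
    4> flink
    1> spark

The leading numbers 3, 4 and 1 identify the parallel subtask that printed each record, so they reflect the parallelism of the print operator. To print from a single subtask, set its parallelism to 1 with setParallelism: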

    data.print().setParallelism(1)

Java

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class JavaDataStreamSourceApp {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            socketFunction(environment);
            environment.execute("JavaDataStreamSourceApp");
        }

        // Reads lines of text from a socket and prints them from a single subtask.
        public static void socketFunction(StreamExecutionEnvironment executionEnvironment) {
            DataStreamSource<String> data = executionEnvironment.socketTextStream("192.168.152.45", 9999);
            data.print().setParallelism(1);
        }
    }

Custom data sources

Scala

Implementing the SourceFunction interface

A source implemented this way cannot run in parallel.

Create a new custom data source:

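    import org.apache.flink.streaming.api.functions.source.SourceFunction

    class CustomNonParallelSourceFunction extends SourceFunction[Long] {

      var count = 1L
      // volatile because cancel() may be called from a different thread than run()
      @volatile var isRunning = true

      // Emits the current count once per second until the source is cancelled.
      override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
        while (isRunning) {
          ctx.collect(count)
          count += 1
          Thread.sleep(1000)
        }
      }

      override def cancel(): Unit = {
        isRunning = false
      }
    }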

The class starts with count = 1L. The run method emits count, increments it by one, sleeps for a second, and repeats until cancel is called. It is used as follows:

    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      // socketFunction(env)
      nonParallelSourceFunction(env)
      env.execute("DataStreamSourceApp")
    }

    // Adds the custom source and prints every value it emits.
    def nonParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
      val data = env.addSource(new CustomNonParallelSourceFunction())
      data.print()
    }

The console prints the count value continuously, one value per second.
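The run and cancel pair used by the source above can be tried out without a Flink cluster. The sketch below is plain Java under stated assumptions: CountingSourceSketch and collectFirst are hypothetical names, a java.util.function.Consumer stands in for Flink's SourceContext, and the pause is shortened to 10 ms. For a deterministic result the collector itself requests cancellation after a fixed number of values; in a real job, cancel is typically invoked by the framework from another thread, which is why the flag is volatile.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class CountingSourceSketch {

    // volatile so that a cancel() from another thread is seen by the run() loop
    private volatile boolean isRunning = true;
    private long count = 1L;

    // Emits 1, 2, 3, ... to the collector until cancel() has been called.
    void run(Consumer<Long> collector, long pauseMillis) throws InterruptedException {
        while (isRunning) {
            collector.accept(count);
            count += 1;
            Thread.sleep(pauseMillis);
        }
    }

    void cancel() {
        isRunning = false;
    }

    // Runs the source on a worker thread; the collector cancels the source
    // as soon as `limit` values have arrived.
    static List<Long> collectFirst(int limit) {
        CountingSourceSketch source = new CountingSourceSketch();
        List<Long> seen = new ArrayList<>();
        Thread worker = new Thread(() -> {
            try {
                source.run(value -> {
                    seen.add(value);
                    if (seen.size() >= limit) {
                        source.cancel();
                    }
                }, 10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            worker.join(); // wait until the run() loop has ended
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collectFirst(3)); // prints [1, 2, 3]
    }
}
```

The loop only stops at the next check of isRunning, so a source written this way finishes the current iteration, including the sleep, before run returns.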

The parallelism of this source can only be 1. If it is set to a larger value, for example:

    val data = env.addSource(new CustomNonParallelSourceFunction()).setParallelism(3)

the console reports an error:

    Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
        at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:55)
        at com.vincent.course05.DataStreamSourceApp$.nonParallelSourceFunction(DataStreamSourceApp.scala:16)
        at com.vincent.course05.DataStreamSourceApp$.main(DataStreamSourceApp.scala:11)
        at com.vincent.course05.DataStreamSourceApp.main(DataStreamSourceApp.scala)

Implementing the ParallelSourceFunction interface

    import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

    class CustomParallelSourceFunction extends ParallelSourceFunction[Long] {

      // volatile because cancel() may be called from a different thread than run()
      @volatile var isRunning = true
      var count = 1L

      // Each parallel instance emits its own count once per second.
      override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
        while (isRunning) {
          ctx.collect(count)
          count += 1
          Thread.sleep(1000)
        }
      }

      override def cancel(): Unit = {
        isRunning = false
      }
    }

This source behaves the same as the previous one. The main method is as follows:

    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      // socketFunction(env)
      // nonParallelSourceFunction(env)
      parallelSourceFunction(env)
      env.execute("DataStreamSourceApp")
    }

    def parallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
      val data = env.addSource(new CustomParallelSourceFunction()).setParallelism(3)
      data.print()
    }

The parallelism can now be set to 3, and the output is as follows:

    2> 1
    1> 1
    2> 1
    2> 2
    3> 2
    3> 2
    3> 3
    4> 3
    4> 3

Extending RichParallelSourceFunction

    import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

    class CustomRichParallelSourceFunction extends RichParallelSourceFunction[Long] {

      // volatile because cancel() may be called from a different thread than run()
      @volatile var isRunning = true
      var count = 1L

      override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
        while (isRunning) {
          ctx.collect(count)
          count += 1
          Thread.sleep(1000)
        }
      }

      override def cancel(): Unit = {
        isRunning = false
      }
    }

    // In DataStreamSourceApp:
    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      // socketFunction(env)
      // nonParallelSourceFunction(env)
      // parallelSourceFunction(env)
      richParallelSourceFunction(env)
      env.execute("DataStreamSourceApp")
    }

    def richParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
      val data = env.addSource(new CustomRichParallelSourceFunction()).setParallelism(3)
      data.print()
    }

Java

Implementing the SourceFunction interface

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class JavaCustomNonParallelSourceFunction implements SourceFunction<Long> {

        // volatile because cancel() may be called from a different thread than run()
        volatile boolean isRunning = true;
        long count = 1;

        @Override
        public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
            while (isRunning) {
                ctx.collect(count);
                count += 1;
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    // In JavaDataStreamSourceApp:
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // socketFunction(environment);
        nonParallelSourceFunction(environment);
        environment.execute("JavaDataStreamSourceApp");
    }

    public static void nonParallelSourceFunction(StreamExecutionEnvironment executionEnvironment) {
        DataStreamSource<Long> data = executionEnvironment.addSource(new JavaCustomNonParallelSourceFunction());
        data.print().setParallelism(1);
    }

If the parallelism of this source is set to 2:

    DataStreamSource<Long> data = executionEnvironment.addSource(new JavaCustomNonParallelSourceFunction()).setParallelism(2);

an exception is thrown:

    Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
        at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:55)
        at com.vincent.course05.JavaDataStreamSourceApp.nonParallelSourceFunction(JavaDataStreamSourceApp.java:16)
        at com.vincent.course05.JavaDataStreamSourceApp.main(JavaDataStreamSourceApp.java:10)

Implementing the ParallelSourceFunction interface

    import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

    public class JavaCustomParallelSourceFunction implements ParallelSourceFunction<Long> {

        // volatile because cancel() may be called from a different thread than run()
        volatile boolean isRunning = true;
        long count = 1;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (isRunning) {
                ctx.collect(count);
                count += 1;
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    // In JavaDataStreamSourceApp:
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // socketFunction(environment);
        // nonParallelSourceFunction(environment);
        parallelSourceFunction(environment);
        environment.execute("JavaDataStreamSourceApp");
    }

    public static void parallelSourceFunction(StreamExecutionEnvironment executionEnvironment) {
        DataStreamSource<Long> data = executionEnvironment.addSource(new JavaCustomParallelSourceFunction()).setParallelism(2);
        data.print().setParallelism(1);
    }

The parallelism can now be set, and the output is:

    1
    1
    2
    2
    3
    3
    4
    4
    5
    5

Extending the abstract class RichParallelSourceFunction

    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

    public class JavaCustomRichParallelSourceFunction extends RichParallelSourceFunction<Long> {

        // volatile because cancel() may be called from a different thread than run()
        volatile boolean isRunning = true;
        long count = 1;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (isRunning) {
                ctx.collect(count);
                count += 1;
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    // In JavaDataStreamSourceApp:
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // socketFunction(environment);
        // nonParallelSourceFunction(environment);
        // parallelSourceFunction(environment);
        richParallelSourceFunction(environment);
        environment.execute("JavaDataStreamSourceApp");
    }

    public static void richParallelSourceFunction(StreamExecutionEnvironment executionEnvironment) {
        DataStreamSource<Long> data = executionEnvironment.addSource(new JavaCustomRichParallelSourceFunction()).setParallelism(2);
        data.print().setParallelism(1);
    }

Output:

    1
    1
    2
    2
    3
    3
    4
    4
    5
    5
    6
    6

The relationship between SourceFunction, ParallelSourceFunction and RichParallelSourceFunction

ParallelSourceFunction is an interface that extends SourceFunction. RichParallelSourceFunction is an abstract class that extends AbstractRichFunction and implements ParallelSourceFunction.

That is how data sources work in Flink data flow programming. If you have similar questions, the analysis above may help.
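The doubled values in the parallel outputs above follow from each source instance keeping its own counter. Here is a minimal plain-Java sketch of that effect, with no Flink involved: ParallelCounterSketch and emit are hypothetical names, and the instances are assumed to advance in lockstep, whereas in a real job the interleaving depends on timing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ParallelCounterSketch {

    // Simulates `parallelism` independent source instances, each with its own
    // counter starting at 1, emitting one value per instance in every round.
    static List<Long> emit(int parallelism, int rounds) {
        long[] counts = new long[parallelism];
        Arrays.fill(counts, 1L);

        List<Long> out = new ArrayList<>();
        for (int round = 0; round < rounds; round++) {
            for (int instance = 0; instance < parallelism; instance++) {
                out.add(counts[instance]);
                counts[instance] += 1;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(emit(2, 3)); // prints [1, 1, 2, 2, 3, 3]
        System.out.println(emit(1, 3)); // prints [1, 2, 3]
    }
}
```

With a parallelism of 2 every value appears twice, which matches the shape of the Java outputs shown earlier; with a parallelism of 1 the sequence is the plain count of the non-parallel source.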
