
What are the ways to use Flink technology?


This article introduces the basic ways to use Flink. It walks through batch and streaming word-count examples in Java and Scala, and then the common DataSet transformation operators, each with runnable code and its output.

First, generate the Flink quick-start sample project:

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.7.2 \
  -DarchetypeCatalog=local

Implement batch processing to read from a file

Create a hello.txt with the following file content

Hello world welcome

Hello welcome

Count the word frequencies:

public class BatchJavaApp {
    public static void main(String[] args) throws Exception {
        String input = "/Users/admin/Downloads/flink/data/hello.txt";
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> text = env.readTextFile(input);
        text.print();
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new Tuple2<>(token, 1));
                }
            }
        }).groupBy(0).sum(1).print();
    }
}

Run result (log omitted)

Hello welcome
Hello world welcome
(world,1)
(hello,2)
(welcome,2)

Pure Java implementation

File read class

public class FileOperation {

    /**
     * Reads the file named filename and puts all the words it contains into words.
     */
    public static boolean readFile(String filename, List<String> words) {
        if (filename == null || words == null) {
            System.out.println("filename is empty or words is empty");
            return false;
        }

        Scanner scanner;
        try {
            File file = new File(filename);
            if (file.exists()) {
                FileInputStream fis = new FileInputStream(file);
                scanner = new Scanner(new BufferedInputStream(fis), "UTF-8");
                scanner.useLocale(Locale.ENGLISH);
            } else {
                return false;
            }
        } catch (FileNotFoundException e) {
            System.out.println("cannot open " + filename);
            return false;
        }

        // simple tokenization: collect consecutive letters as words
        if (scanner.hasNextLine()) {
            String contents = scanner.useDelimiter("\\A").next();
            int start = firstCharacterIndex(contents, 0);
            for (int i = start + 1; i <= contents.length(); ) {
                if (i == contents.length() || !Character.isLetter(contents.charAt(i))) {
                    words.add(contents.substring(start, i).toLowerCase());
                    start = firstCharacterIndex(contents, i);
                    i = start + 1;
                } else {
                    i++;
                }
            }
        }
        return true;
    }

    // index of the first letter at or after position start
    // (the body of this helper is truncated in the original; this is the usual implementation)
    private static int firstCharacterIndex(String s, int start) {
        for (int i = start; i < s.length(); i++) {
            if (Character.isLetter(s.charAt(i))) {
                return i;
            }
        }
        return s.length();
    }
}

Counting with a plain HashMap (the surrounding class is not shown in full in the original, so the class name here is illustrative):

public class PureJavaWordCount {
    public static void main(String[] args) {
        String input = "/Users/admin/Downloads/flink/data/hello.txt";
        List<String> words = new ArrayList<>();
        FileOperation.readFile(input, words);
        System.out.println(words);

        Map<String, Integer> map = new HashMap<>();
        words.forEach(token -> {
            if (map.containsKey(token)) {
                map.put(token, map.get(token) + 1);
            } else {
                map.put(token, 1);
            }
        });
        map.entrySet().stream()
                .map(entry -> new Tuple2<>(entry.getKey(), entry.getValue()))
                .forEach(System.out::println);
    }
}

Running result

[hello, world, welcome, hello, welcome] (world,1) (hello,2) (welcome,2)

Scala code

Pull the Scala sample code

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-scala \
  -DarchetypeVersion=1.7.2 \
  -DarchetypeCatalog=local

After installing the Scala plugin and configuring the Scala SDK:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object BatchScalaApp {
  def main(args: Array[String]): Unit = {
    val input = "/Users/admin/Downloads/flink/data/hello.txt"
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile(input)
    text.flatMap(_.toLowerCase.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      .groupBy(0)
      .sum(1)
      .print()
  }
}

Run result (omit log)

(world,1) (hello,2) (welcome,2)

For Scala basics, refer to the earlier articles on getting started with Scala and on object-oriented Scala.

Streaming: process data sent over the network

public class StreamingJavaApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new Tuple2<>(token, 1));
                }
            }
        }).keyBy(0)
          .timeWindow(Time.seconds(5))
          .sum(1)
          .print();
        env.execute("StreamingJavaApp");
    }
}

Before running, open the port with netcat:

nc -lk 9999

Run the code and type a a c d b c e e f a into the nc session.

Run result (log omitted)

1> (e,2)
9> (d,1)
11> (a,3)
3> (b,1)
4> (f,1)
8> (c,2)

The number before ">" is the index of the parallel subtask that produced the record.

Scala code

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._

object StreamScalaApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("127.0.0.1", 9999)
    text.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
      .print()
      .setParallelism(1)
    env.execute("StreamScalaApp")
  }
}

Run result (omit log)

(c,2)
(b,1)
(d,1)
(f,1)
(e,2)
(a,3)

Now let's change the tuple to an entity class

public class StreamObjJavaApp {

    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class WordCount {
        private String word;
        private int count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);
        text.flatMap(new FlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String value, Collector<WordCount> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new WordCount(token, 1));
                }
            }
        }).keyBy("word")
          .timeWindow(Time.seconds(5))
          .sum("count")
          .print();
        env.execute("StreamingJavaApp");
    }
}

Running result

4 > StreamObjJavaApp.WordCount (word=f, count=1) 11 > StreamObjJavaApp.WordCount (word=a, count=3) 8 > StreamObjJavaApp.WordCount (word=c, count=2) 1 > StreamObjJavaApp.WordCount (word=e, count=2) 9 > StreamObjJavaApp.WordCount (word=d, count=1) 3 > StreamObjJavaApp.WordCount (word=b, count=1)

Of course, we can also write it this way:

public class StreamObjJavaApp {

    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class WordCount {
        private String word;
        private int count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);
        text.flatMap(new FlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String value, Collector<WordCount> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new WordCount(token, 1));
                }
            }
        }).keyBy(WordCount::getWord)
          .timeWindow(Time.seconds(5))
          .sum("count")
          .print()
          .setParallelism(1);
        env.execute("StreamingJavaApp");
    }
}

The argument accepted by keyBy here is the functional interface KeySelector:

@Public
@FunctionalInterface
public interface KeySelector<IN, KEY> extends Function, Serializable {
    KEY getKey(IN value) throws Exception;
}
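To make that concrete, here is a small sketch (the wrapper class is ours, not part of the original project; it assumes the Lombok-generated getWord on the WordCount POJO above): the method reference WordCount::getWord passed to keyBy is just shorthand for an explicit KeySelector like this one.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;

public class KeySelectorSketch {

    // Explicit KeySelector: extracts the grouping key (the word) from each element.
    public static class WordKey implements KeySelector<StreamObjJavaApp.WordCount, String> {
        @Override
        public String getKey(StreamObjJavaApp.WordCount value) throws Exception {
            return value.getWord();
        }
    }

    // keyBy(new WordKey()) behaves the same as keyBy(WordCount::getWord)
    // or the lambda keyBy(wc -> wc.getWord()).
    public static KeyedStream<StreamObjJavaApp.WordCount, String> keyByWord(
            DataStream<StreamObjJavaApp.WordCount> stream) {
        return stream.keyBy(new WordKey());
    }
}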

Writing flatMap as a lambda is actually more tedious than the anonymous class: because of type erasure, Flink cannot infer the output type from the lambda, so it has to be declared explicitly with returns().

public class StreamObjJavaApp {

    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class WordCount {
        private String word;
        private int count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);
        text.flatMap((FlatMapFunction<String, WordCount>) (value, collector) -> {
            String[] tokens = value.toLowerCase().split(" ");
            for (String token : tokens) {
                collector.collect(new WordCount(token, 1));
            }
        }).returns(WordCount.class)
          .keyBy(WordCount::getWord)
          .timeWindow(Time.seconds(5))
          .sum("count")
          .print()
          .setParallelism(1);
        env.execute("StreamingJavaApp");
    }
}

flatMap can also take the RichFlatMapFunction abstract class:

public class StreamObjJavaApp {

    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class WordCount {
        private String word;
        private int count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);
        text.flatMap(new RichFlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String value, Collector<WordCount> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new WordCount(token, 1));
                }
            }
        }).keyBy(WordCount::getWord)
          .timeWindow(Time.seconds(5))
          .sum("count")
          .print()
          .setParallelism(1);
        env.execute("StreamingJavaApp");
    }
}

Scala code

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._

object StreamObjScalaApp {

  case class WordCount(word: String, count: Int)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("127.0.0.1", 9999)
    text.flatMap(_.split(" "))
      .map(WordCount(_, 1))
      .keyBy("word")
      .timeWindow(Time.seconds(5))
      .sum("count")
      .print()
      .setParallelism(1)
    env.execute("StreamScalaApp")
  }
}

Run result (omit log)

WordCount(b,1)
WordCount(d,1)
WordCount(e,2)
WordCount(f,1)
WordCount(a,3)
WordCount(c,2)

Data source

Get data from a collection

public class DataSetDataSourceApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        fromCollection(env);
    }

    public static void fromCollection(ExecutionEnvironment env) throws Exception {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            list.add(i);
        }
        env.fromCollection(list).print();
    }
}

Map and Filter operators

The Java versions appear as mapFunction and filterFunction in the DataSetTransformationApp class used throughout the rest of this section; the Scala versions look like this:

def mapFunction(env: ExecutionEnvironment): Unit = {
  val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
  data.map(_ + 1).print()
}

def filterFunction(env: ExecutionEnvironment): Unit = {
  val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
  data.map(_ + 1).filter(_ > 5).print()
}

Run result of filterFunction (log omitted)

6
7
8
9
10
11

MapPartition operator

It transforms the data one partition at a time, where the number of partitions follows the parallelism, instead of calling the function once per element the way map does.

A tool class that simulates a database connection

public class DBUntils {
    public static int getConnection() {
        return new Random().nextInt(10);
    }

    public static void returnConnection(int connection) {
    }
}

public class DataSetTransformationApp {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // mapFunction(env);
        // filterFunction(env);
        mapPartitionFunction(env);
    }

    public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
        List<String> students = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            students.add("student: " + i);
        }
        DataSource<String> data = env.fromCollection(students).setParallelism(4);
        // map is invoked once per element, i.e. once for every student
        data.map(student -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            return connection;
        }).print();
        System.out.println("-----------------------------");
        // mapPartition is invoked once per partition, i.e. according to the parallelism
        data.mapPartition((MapPartitionFunction<String, Integer>) (student, collector) -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            collector.collect(connection);
        }).returns(Integer.class).print();
    }

    public static void filterFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).filter(x -> x > 5).print();
    }

    public static void mapFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).print();
    }
}

Run result (log omitted; the ellipsis stands for the rest of the output: there are 100 numbers above the separator line, one per element, and only a few below it, one per partition)

...
5
4
0
3
-----------------------------
5
5
0
3
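To make the contrast even clearer, here is a minimal, self-contained sketch (the class name and the per-partition element counting are ours, not from the original project; it reuses the DBUntils helper above): map fetches a connection once per element, 100 times in total, while mapPartition fetches it once per partition and can report how many elements that single connection served.

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

import java.util.ArrayList;
import java.util.List;

public class MapVsMapPartitionSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<String> students = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            students.add("student: " + i);
        }
        DataSource<String> data = env.fromCollection(students).setParallelism(4);

        // map: the body runs once per element, so a connection is fetched 100 times
        data.map(student -> {
            int connection = DBUntils.getConnection();
            DBUntils.returnConnection(connection);
            return connection;
        }).print();

        System.out.println("-----------------------------");

        // mapPartition: the body runs once per partition, so only a handful of
        // connections are fetched for the whole data set
        data.mapPartition((MapPartitionFunction<String, Integer>) (partition, out) -> {
            int connection = DBUntils.getConnection();
            int handled = 0;
            for (String ignored : partition) {
                handled++;          // every element of this partition shares one connection
            }
            DBUntils.returnConnection(connection);
            out.collect(handled);   // emit how many elements this single connection served
        }).returns(Integer.class).print();
    }
}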

Scala code

import scala.util.Random

object DBUntils {
  def getConnection(): Int = {
    new Random().nextInt(10)
  }

  def returnConnection(connection: Int): Unit = {
  }
}

import com.guanjian.flink.scala.until.DBUntils
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer

object DataSetTransformationApp {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // mapFunction(env)
    // filterFunction(env)
    mapPartitionFunction(env)
  }

  def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
    val students = new ListBuffer[String]
    for (i <- 0 until 100) {
      students.append("student: " + i)
    }
    val data = env.fromCollection(students).setParallelism(4)
    // map is invoked once per element
    data.map(student => {
      val connection = DBUntils.getConnection()
      // TODO database operation
      DBUntils.returnConnection(connection)
      connection
    }).print()
    println("-----------------------------")
    // mapPartition is invoked once per partition
    data.mapPartition((student, collector: Collector[Int]) => {
      val connection = DBUntils.getConnection()
      // TODO database operation
      DBUntils.returnConnection(connection)
      collector.collect(connection)
    }).print()
  }

  def filterFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).filter(_ > 5).print()
  }

  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).print()
  }
}

Running result

...
5
4
0
3
-----------------------------
5
5
0
3

First operator

The new method added to DataSetTransformationApp (main now calls firstFunction(env); the other methods are unchanged from the previous example):

public static void firstFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info = Arrays.asList(
            new Tuple2<>(1, "Hadoop"), new Tuple2<>(1, "Spark"), new Tuple2<>(1, "Flink"),
            new Tuple2<>(2, "Java"), new Tuple2<>(2, "Spring boot"),
            new Tuple2<>(3, "Linux"), new Tuple2<>(4, "VUE"));
    DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
    data.first(3).print();
}

Run result (omit log)

(1,Hadoop)
(1,Spark)
(1,Flink)

Scala code

The Scala version of firstFunction (main calls firstFunction(env); the rest of the object is unchanged):

def firstFunction(env: ExecutionEnvironment): Unit = {
  val info = List((1, "Hadoop"), (1, "Spark"), (1, "Flink"), (2, "Java"),
    (2, "Spring boot"), (3, "Linux"), (4, "VUE"))
  val data = env.fromCollection(info)
  data.first(3).print()
}

Run result (omit log)

(1,Hadoop)
(1,Spark)
(1,Flink)

Take the first two items of each group

Only the last line of firstFunction changes:

public static void firstFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info = Arrays.asList(
            new Tuple2<>(1, "Hadoop"), new Tuple2<>(1, "Spark"), new Tuple2<>(1, "Flink"),
            new Tuple2<>(2, "Java"), new Tuple2<>(2, "Spring boot"),
            new Tuple2<>(3, "Linux"), new Tuple2<>(4, "VUE"));
    DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
    data.groupBy(0).first(2).print();
}

Run result (omit log)

(3,Linux)
(1,Hadoop)
(1,Spark)
(4,VUE)
(2,Java)
(2,Spring boot)

Scala code

The Scala version:

def firstFunction(env: ExecutionEnvironment): Unit = {
  val info = List((1, "Hadoop"), (1, "Spark"), (1, "Flink"), (2, "Java"),
    (2, "Spring boot"), (3, "Linux"), (4, "VUE"))
  val data = env.fromCollection(info)
  data.groupBy(0).first(2).print()
}

Run result (omit log)

(3,Linux)
(1,Hadoop)
(1,Spark)
(4,VUE)
(2,Java)
(2,Spring boot)

After grouping, take the first two items in ascending order

firstFunction now sorts each group by the second field in ascending order (this requires importing org.apache.flink.api.common.operators.Order):

public static void firstFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info = Arrays.asList(
            new Tuple2<>(1, "Hadoop"), new Tuple2<>(1, "Spark"), new Tuple2<>(1, "Flink"),
            new Tuple2<>(2, "Java"), new Tuple2<>(2, "Spring boot"),
            new Tuple2<>(3, "Linux"), new Tuple2<>(4, "VUE"));
    DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
    data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
}

Run result (omit log)

(3,Linux)
(1,Flink)
(1,Hadoop)
(4,VUE)
(2,Java)
(2,Spring boot)

Scala code

The Scala version (it also needs import org.apache.flink.api.common.operators.Order):

def firstFunction(env: ExecutionEnvironment): Unit = {
  val info = List((1, "Hadoop"), (1, "Spark"), (1, "Flink"), (2, "Java"),
    (2, "Spring boot"), (3, "Linux"), (4, "VUE"))
  val data = env.fromCollection(info)
  data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print()
}

Run result (omit log)

(3,Linux)
(1,Flink)
(1,Hadoop)
(4,VUE)
(2,Java)
(2,Spring boot)

After grouping, take the first two items in descending order

The only change is the sort order:

public static void firstFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info = Arrays.asList(
            new Tuple2<>(1, "Hadoop"), new Tuple2<>(1, "Spark"), new Tuple2<>(1, "Flink"),
            new Tuple2<>(2, "Java"), new Tuple2<>(2, "Spring boot"),
            new Tuple2<>(3, "Linux"), new Tuple2<>(4, "VUE"));
    DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
    data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();
}

Run result (omit log)

(3,Linux)
(1,Spark)
(1,Hadoop)
(4,VUE)
(2,Spring boot)
(2,Java)

Scala code

The Scala version:

def firstFunction(env: ExecutionEnvironment): Unit = {
  val info = List((1, "Hadoop"), (1, "Spark"), (1, "Flink"), (2, "Java"),
    (2, "Spring boot"), (3, "Linux"), (4, "VUE"))
  val data = env.fromCollection(info)
  data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print()
}

Run result (omit log)

(3,Linux)
(1,Spark)
(1,Hadoop)
(4,VUE)
(2,Spring boot)
(2,Java)

FlatMap operator

The new method, flatMapFunction (main now calls flatMapFunction(env); the other methods are unchanged):

public static void flatMapFunction(ExecutionEnvironment env) throws Exception {
    List<String> info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");
    DataSource<String> data = env.fromCollection(info);
    data.flatMap((FlatMapFunction<String, String>) (value, collector) -> {
        String[] tokens = value.split(",");
        Stream.of(tokens).forEach(collector::collect);
    }).returns(String.class).print();
}

Run result (omit log)

hadoop
spark
hadoop
flink
flink
flink

Scala code

The Scala version:

def flatMapFunction(env: ExecutionEnvironment): Unit = {
  val info = List("hadoop,spark", "hadoop,flink", "flink,flink")
  val data = env.fromCollection(info)
  data.flatMap(_.split(",")).print()
}

Run result (omit log)

hadoop
spark
hadoop
flink
flink
flink

Of course, Scala also supports the same style of writing as the Java version.

def flatMapFunction(env: ExecutionEnvironment): Unit = {
  val info = List("hadoop,spark", "hadoop,flink", "flink,flink")
  val data = env.fromCollection(info)
  data.flatMap((value, collector: Collector[String]) => {
    val tokens = value.split(",")
    tokens.foreach(collector.collect(_))
  }).print()
}

Count the number of words

flatMapFunction is extended with a map, groupBy and sum to count the words:

public static void flatMapFunction(ExecutionEnvironment env) throws Exception {
    List<String> info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");
    DataSource<String> data = env.fromCollection(info);
    data.flatMap((FlatMapFunction<String, String>) (value, collector) -> {
        String[] tokens = value.split(",");
        Stream.of(tokens).forEach(collector::collect);
    }).returns(String.class)
      .map(new MapFunction<String, Tuple2<String, Integer>>() {
          @Override
          public Tuple2<String, Integer> map(String value) throws Exception {
              return new Tuple2<>(value, 1);
          }
      })
      .groupBy(0)
      .sum(1)
      .print();
}

Run result (omit log)

(hadoop,2) (flink,3) (spark,1)
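A side note related to the type-erasure point made earlier: if the map step is also written as a lambda, Flink cannot infer Tuple2's type parameters either, so they have to be supplied explicitly, for example with a TypeHint. A minimal sketch (the class name is illustrative):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class LambdaWordCountSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        List<String> info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");
        DataSource<String> data = env.fromCollection(info);
        data.flatMap((FlatMapFunction<String, String>) (value, collector) -> {
                String[] tokens = value.split(",");
                Stream.of(tokens).forEach(collector::collect);
            })
            .returns(String.class)
            .map(value -> new Tuple2<>(value, 1))
            // the lambda hides Tuple2's type parameters, so declare them with a TypeHint
            .returns(new TypeHint<Tuple2<String, Integer>>() {})
            .groupBy(0)
            .sum(1)
            .print();
    }
}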

Scala code

The Scala version:

def flatMapFunction(env: ExecutionEnvironment): Unit = {
  val info = List("hadoop,spark", "hadoop,flink", "flink,flink")
  val data = env.fromCollection(info)
  data.flatMap(_.split(","))
    .map((_, 1))
    .groupBy(0)
    .sum(1)
    .print()
}

Run result (omit log)

(hadoop,2) (flink,3) (spark,1)

Distinct operator

The new method, distinctFunction (main now calls distinctFunction(env)):

public static void distinctFunction(ExecutionEnvironment env) throws Exception {
    List<String> info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");
    DataSource<String> data = env.fromCollection(info);
    data.flatMap((FlatMapFunction<String, String>) (value, collector) -> {
        String[] tokens = value.split(",");
        Stream.of(tokens).forEach(collector::collect);
    }).returns(String.class).distinct().print();
}

Run result (omit log)

hadoop
flink
spark

Scala code

The Scala version:

def distinctFunction(env: ExecutionEnvironment): Unit = {
  val info = List("hadoop,spark", "hadoop,flink", "flink,flink")
  val data = env.fromCollection(info)
  data.flatMap(_.split(",")).distinct().print()
}

Run result (omit log)

hadoop
flink
spark

Join operator

The new method, joinFunction (main now calls joinFunction(env)):

public static void joinFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info1 = Arrays.asList(
            new Tuple2<>(1, "PK Brother"), new Tuple2<>(2, "J Brother"),
            new Tuple2<>(3, "Little Leader"), new Tuple2<>(4, "Sky Blue"));
    List<Tuple2<Integer, String>> info2 = Arrays.asList(
            new Tuple2<>(1, "Beijing"), new Tuple2<>(2, "Shanghai"),
            new Tuple2<>(3, "Chengdu"), new Tuple2<>(4, "Hangzhou"));
    DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
    DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);
    data1.join(data2).where(0).equalTo(0)
         .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
             @Override
             public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first,
                                                         Tuple2<Integer, String> second) throws Exception {
                 return new Tuple3<>(first.getField(0), first.getField(1), second.getField(1));
             }
         }).print();
}

Run result (omit log)

(3,Little Leader,Chengdu)
(1,PK Brother,Beijing)
(4,Sky Blue,Hangzhou)
(2,J Brother,Shanghai)

Scala code

The Scala version:

def joinFunction(env: ExecutionEnvironment): Unit = {
  val info1 = List((1, "PK Brother"), (2, "J Brother"), (3, "Little Leader"), (4, "Sky Blue"))
  val info2 = List((1, "Beijing"), (2, "Shanghai"), (3, "Chengdu"), (4, "Hangzhou"))
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)
  data1.join(data2).where(0).equalTo(0)
    .apply((first, second) => (first._1, first._2, second._2))
    .print()
}

Run result (omit log)

(3,Little Leader,Chengdu)
(1,PK Brother,Beijing)
(4,Sky Blue,Hangzhou)
(2,J Brother,Shanghai)

OuterJoin operator

Left outer join

The new method, outJoinFunction (main now calls outJoinFunction(env)); note that info2 now contains key 5 instead of 4, so the left outer join must handle a null second value:

public static void outJoinFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info1 = Arrays.asList(
            new Tuple2<>(1, "PK Brother"), new Tuple2<>(2, "J Brother"),
            new Tuple2<>(3, "Little Leader"), new Tuple2<>(4, "Sky Blue"));
    List<Tuple2<Integer, String>> info2 = Arrays.asList(
            new Tuple2<>(1, "Beijing"), new Tuple2<>(2, "Shanghai"),
            new Tuple2<>(3, "Chengdu"), new Tuple2<>(5, "Hangzhou"));
    DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
    DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);
    data1.leftOuterJoin(data2).where(0).equalTo(0)
         .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
             @Override
             public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first,
                                                         Tuple2<Integer, String> second) throws Exception {
                 if (second == null) {
                     return new Tuple3<>(first.getField(0), first.getField(1), "-");
                 }
                 return new Tuple3<>(first.getField(0), first.getField(1), second.getField(1));
             }
         }).print();
}

Run result (omit log)

(3,Little Leader,Chengdu)
(1,PK Brother,Beijing)
(4,Sky Blue,-)
(2,J Brother,Shanghai)

Scala code

The Scala version (the method keeps the name joinFunction, as in the original project):

def joinFunction(env: ExecutionEnvironment): Unit = {
  val info1 = List((1, "PK Brother"), (2, "J Brother"), (3, "Little Leader"), (4, "Sky Blue"))
  val info2 = List((1, "Beijing"), (2, "Shanghai"), (3, "Chengdu"), (5, "Hangzhou"))
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)
  data1.leftOuterJoin(data2).where(0).equalTo(0)
    .apply((first, second) => {
      if (second == null) {
        (first._1, first._2, "-")
      } else {
        (first._1, first._2, second._2)
      }
    })
    .print()
}

Run result (omit log)

(3,Little Leader,Chengdu)
(1,PK Brother,Beijing)
(4,Sky Blue,-)
(2,J Brother,Shanghai)

Right outer join

Public class DataSetTransformationApp {public static void main (String [] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment (); / mapFunction (env); / / filterFunction (env); / / mapPartitionFunction (env); / / firstFunction (env); / / flatMapFunction (env); / / distinctFunction (env); / / joinFunction (env); outJoinFunction (env) } public static void outJoinFunction (ExecutionEnvironment env) throws Exception {List info1 = Arrays.asList (new Tuple2 (1, "competitive Brother"), new Tuple2 (2, "Brother J"), new Tuple2 (3, "Leader"), new Tuple2 (4, "Sky Blue") List info2 = Arrays.asList (new Tuple2 (1, "Beijing"), new Tuple2 (2, "Shanghai"), new Tuple2 (3, "Chengdu"), new Tuple2 (5, "Hangzhou"); DataSource data1 = env.fromCollection (info1); DataSource data2 = env.fromCollection (info2) Data1.rightOuterJoin (data2) .where (0) .equalTo (0) .with (new JoinFunction () {@ Override public Tuple3 join (Tuple2 first, Tuple2 second) throws Exception {if (first = = null) {return new Tuple3 (second.getField (0), "-", second.getField (1));} return new Tuple3 (first.getField (0), first.getField (1), second.getField (1)) }}). Print ();} public static void joinFunction (ExecutionEnvironment env) throws Exception {List info1 = Arrays.asList (new Tuple2 (1, "competitor"), new Tuple2 (2, "J"), new Tuple2 (3, "Leader"), new Tuple2 (4, "Sky Blue")) List info2 = Arrays.asList (new Tuple2 (1, "Beijing"), new Tuple2 (2, "Shanghai"), new Tuple2 (3, "Chengdu"), new Tuple2 (4, "Hangzhou"); DataSource data1 = env.fromCollection (info1); DataSource data2 = env.fromCollection (info2) Data1.join (data2) .where (0) .equalTo (0) .with (new JoinFunction () {@ Override public Tuple3 join (Tuple2 first, Tuple2 second) throws Exception {return new Tuple3 (first.getField (0), first.getField (1), second.getField (1));}) .print () } public static void distinctFunction (ExecutionEnvironment env) throws Exception {List info = Arrays.asList ("hadoop,spark", "hadoop,flink", "flink,flink"); DataSource data = env.fromCollection (info); data.flatMap ((FlatMapFunction) (value, collector)-> {String tokens [] = value.split (","); Stream.of (tokens) .forEach (collector::collect) ) .returns (String.class) .returns () .print ();} public static void flatMapFunction (ExecutionEnvironment env) throws Exception {List info = Arrays.asList ("hadoop,spark", "hadoop,flink", "flink,flink"); DataSource data = env.fromCollection (info); data.flatMap ((FlatMapFunction) (value, collector)-> {String tokens [] = value.split (",") Stream.of (tokens) .forEach (collector::collect);}) .returns (String.class) .map (new MapFunction () {@ Override public Tuple2 map (String value) throws Exception {return new Tuple2 (value,1);}}) .groupBy (0) .sum (1) .map () } public static void firstFunction (ExecutionEnvironment env) throws Exception {List info = Arrays.asList (new Tuple2 (1, "Hadoop"), new Tuple2 (1, "Spark"), new Tuple2 (1, "Flink"), new Tuple2 (2, "Java"), new Tuple2 (2, "Spring boot"), new Tuple2 (3, "Linux") New Tuple2 (4, "VUE")) DataSource data = env.fromCollection (info); data.groupBy (0) .sortGroup (1, Order.DESCENDING) .first (2). Print ();} public static void mapPartitionFunction (ExecutionEnvironment env) throws Exception {List students = new ArrayList (); for (int I = 0; I

< 100; i++) { students.add("student: " + i); } DataSource data = env.fromCollection(students).setParallelism(4); // here the transformation runs once per element of students data.map(student ->

{int connection = DBUntils.getConnection (); / / TODO database operation DBUntils.returnConnection (connection); return connection;}) .print (); System.out.println ("-"); / / data.mapPartition ((MapPartitionFunction) (student, collector)-> {int connection = DBUntils.getConnection () according to the number of parallelism / / TODO database operation DBUntils.returnConnection (connection); collector.collect (connection);}) .returns (Integer.class). Print ();} public static void filterFunction (ExecutionEnvironment env) throws Exception {DataSource data = env.fromCollection (Arrays.asList (Arrays.asList (Arrays.asList)); data.map (x-> x + 1) .filter (x-> x > 5). Print () } public static void mapFunction (ExecutionEnvironment env) throws Exception {DataSource data = env.fromCollection (Arrays.asList); data.map (x-> x + 1). Print ();}}

Run result (omit log)

(3,Little Leader,Chengdu) (1,PK Brother,Beijing) (5,-,Hangzhou) (2,J Brother,Shanghai)

Scala code

Import com.guanjian.flink.scala.until.DBUntilsimport org.apache.flink.api.common.operators.Orderimport org.apache.flink.api.scala.ExecutionEnvironmentimport org.apache.flink.api.scala._import org.apache.flink.util.Collectorimport scala.collection.mutable.ListBufferobject DataSetTransformationApp {def main (args: Array [String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironment// mapFunction (env) / / filterFunction (env) / / mapPartitionFunction (env) / / firstFunction (env) / / flatMapFunction (env) / / distinctFunction (env) joinFunction (env)} def joinFunction (env: ExecutionEnvironment): Unit = {val info1 = List ((1 Val info2 = List ((1, "Beijing"), (2, "Shanghai"), (3, "Chengdu"), (5, "Hangzhou")) val data1 = env.fromCollection (info1) val data2 = env.fromCollection (info2) data1.rightOuterJoin (data2). Where (0) .equalTo (0). Second) = > {if (first = = null) {(second._1, "-", second._2)} else {(first._1,first._2,second._2)}) .print ()} def distinctFunction (env: ExecutionEnvironment): Unit = {val info = List ("hadoop,spark", "hadoop,flink", "flink,flink") val data = env.fromCollection (info) data.flatMap (_ .split (" Distinct (). Print ()} def flatMapFunction (env: ExecutionEnvironment): Unit = {val info = List ("hadoop,spark", "hadoop,flink", "flink,flink") val data = env.fromCollection (info) data.flatMap (_ .split (","). Map ((_, 1)) .groupBy (0). Print ()} def firstFunction (env: ExecutionEnvironment): Unit = {val info = List ((1, "Hadoop"), (1) "Spark"), (1, "Flink"), (2, "Java"), (2, "Spring boot"), (3, "Linux"), (4, "VUE")) val data = env.fromCollection (info) data.groupBy (0) .sortGroup (1) Order.DESCENDING) .first (2) .print ()} def mapPartitionFunction (env: ExecutionEnvironment): Unit = {val students = new ListBuffer [string] for (I {val connection = DBUntils.getConnection () / / TODO database operation DBUntils.returnConnection (connection) connection}) .print () println ("-") data.mapPartition ((student) Collector: Collector [Int]) = > {val connection = DBUntils.getConnection () / / TODO database operation DBUntils.returnConnection (connection) collector.collect (connection)}). Print ()} def filterFunction (env: ExecutionEnvironment): Unit = {val data = env.fromCollection (List ) data.map (_ + 1). Filter (_ > 5). Print ()} def mapFunction (env: ExecutionEnvironment): Unit = {val data = env.fromCollection (List (1), 2, 4, 5)) data.map (_ + 1). Print ()}}

Run result (omit log)

(3,Little Leader,Chengdu) (1,PK Brother,Beijing) (5,-,Hangzhou) (2,J Brother,Shanghai)

Full outer join

Public class DataSetTransformationApp {public static void main (String [] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment (); / mapFunction (env); / / filterFunction (env); / / mapPartitionFunction (env); / / firstFunction (env); / / flatMapFunction (env); / / distinctFunction (env); / / joinFunction (env); outJoinFunction (env) } public static void outJoinFunction (ExecutionEnvironment env) throws Exception {List info1 = Arrays.asList (new Tuple2 (1, "competitive Brother"), new Tuple2 (2, "Brother J"), new Tuple2 (3, "Leader"), new Tuple2 (4, "Sky Blue") List info2 = Arrays.asList (new Tuple2 (1, "Beijing"), new Tuple2 (2, "Shanghai"), new Tuple2 (3, "Chengdu"), new Tuple2 (5, "Hangzhou"); DataSource data1 = env.fromCollection (info1); DataSource data2 = env.fromCollection (info2) Data1.fullOuterJoin (data2) .where (0) .equalTo (0) .with (new JoinFunction () {@ Override public Tuple3 join (Tuple2 first, Tuple2 second) throws Exception {if (first = = null) {return new Tuple3 (second.getField (0), "-", second.getField (1)) } else if (second = = null) {return new Tuple3 (first.getField (0), first.getField (1), "-");} return new Tuple3 (first.getField (0), first.getField (1), second.getField (1));}}) .print () } public static void joinFunction (ExecutionEnvironment env) throws Exception {List info1 = Arrays.asList (new Tuple2 (1, "competitive Brother"), new Tuple2 (2, "Brother J"), new Tuple2 (3, "Leader"), new Tuple2 (4, "Sky Blue") List info2 = Arrays.asList (new Tuple2 (1, "Beijing"), new Tuple2 (2, "Shanghai"), new Tuple2 (3, "Chengdu"), new Tuple2 (4, "Hangzhou"); DataSource data1 = env.fromCollection (info1); DataSource data2 = env.fromCollection (info2) Data1.join (data2) .where (0) .equalTo (0) .with (new JoinFunction () {@ Override public Tuple3 join (Tuple2 first, Tuple2 second) throws Exception {return new Tuple3 (first.getField (0), first.getField (1), second.getField (1));}) .print () } public static void distinctFunction (ExecutionEnvironment env) throws Exception {List info = Arrays.asList ("hadoop,spark", "hadoop,flink", "flink,flink"); DataSource data = env.fromCollection (info); data.flatMap ((FlatMapFunction) (value, collector)-> {String tokens [] = value.split (","); Stream.of (tokens) .forEach (collector::collect) ) .returns (String.class) .returns () .print ();} public static void flatMapFunction (ExecutionEnvironment env) throws Exception {List info = Arrays.asList ("hadoop,spark", "hadoop,flink", "flink,flink"); DataSource data = env.fromCollection (info); data.flatMap ((FlatMapFunction) (value, collector)-> {String tokens [] = value.split (",") Stream.of (tokens) .forEach (collector::collect);}) .returns (String.class) .map (new MapFunction () {@ Override public Tuple2 map (String value) throws Exception {return new Tuple2 (value,1);}}) .groupBy (0) .sum (1) .map () } public static void firstFunction (ExecutionEnvironment env) throws Exception {List info = Arrays.asList (new Tuple2 (1, "Hadoop"), new Tuple2 (1, "Spark"), new Tuple2 (1, "Flink"), new Tuple2 (2, "Java"), new Tuple2 (2, "Spring boot"), new Tuple2 (3, "Linux") New Tuple2 (4, "VUE")) DataSource data = env.fromCollection (info); data.groupBy (0) .sortGroup (1, Order.DESCENDING) .first (2). Print ();} public static void mapPartitionFunction (ExecutionEnvironment env) throws Exception {List students = new ArrayList (); for (int I = 0; I

< 100; i++) { students.add("student: " + i); } DataSource data = env.fromCollection(students).setParallelism(4); // here the transformation runs once per element of students data.map(student ->

{int connection = DBUntils.getConnection (); / / TODO database operation DBUntils.returnConnection (connection); return connection;}) .print (); System.out.println ("-"); / / data.mapPartition ((MapPartitionFunction) (student, collector)-> {int connection = DBUntils.getConnection () according to the number of parallelism / / TODO database operation DBUntils.returnConnection (connection); collector.collect (connection);}) .returns (Integer.class). Print ();} public static void filterFunction (ExecutionEnvironment env) throws Exception {DataSource data = env.fromCollection (Arrays.asList (Arrays.asList (Arrays.asList)); data.map (x-> x + 1) .filter (x-> x > 5). Print () } public static void mapFunction (ExecutionEnvironment env) throws Exception {DataSource data = env.fromCollection (Arrays.asList); data.map (x-> x + 1). Print ();}}

Run result (omit log)

(3,Little Leader,Chengdu) (1,PK Brother,Beijing) (4,Sky Blue,-) (5,-,Hangzhou) (2,J Brother,Shanghai)

Scala code

Import com.guanjian.flink.scala.until.DBUntilsimport org.apache.flink.api.common.operators.Orderimport org.apache.flink.api.scala.ExecutionEnvironmentimport org.apache.flink.api.scala._import org.apache.flink.util.Collectorimport scala.collection.mutable.ListBufferobject DataSetTransformationApp {def main (args: Array [String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironment// mapFunction (env) / / filterFunction (env) / / mapPartitionFunction (env) / / firstFunction (env) / / flatMapFunction (env) / / distinctFunction (env) joinFunction (env)} def joinFunction (env: ExecutionEnvironment): Unit = {val info1 = List ((1 Val info2 = List ((1, "Beijing"), (2, "Shanghai"), (3, "Chengdu"), (5, "Hangzhou")) val data1 = env.fromCollection (info1) val data2 = env.fromCollection (info2) data1.fullOuterJoin (data2). Where (0) .equalTo (0). Second) = > {if (first = = null) {(second._1, "-", second._2)} else if (second = = null) {(first._1,first._2, "-")} else {(first._1,first._2,second._2)}}) .print ()} def distinctFunction (env: ExecutionEnvironment): Unit = {val info = List ("hadoop,spark", "hadoop") Flink "," flink,flink ") val data = env.fromCollection (info) data.flatMap (_ .split (", "). Distinct () .print ()} def flatMapFunction (env: ExecutionEnvironment): Unit = {val info = List (" hadoop,spark "," hadoop,flink "," flink,flink ") val data = env.fromCollection (info) data.flatMap (_ .split (", ")) .map ((_ ) .groupBy (0) .sum (1). Print ()} def firstFunction (env: ExecutionEnvironment): Unit = {val info = List ((1, "Hadoop"), (1, "Spark"), (1, "Flink"), (2, "Java"), (2, "Spring boot"), (3, "Linux"), (4, "VUE") val data = env.fromCollection (info) data.groupBy (0) .sortGroup (1) Order.DESCENDING) .first (2) .print ()} def mapPartitionFunction (env: ExecutionEnvironment): Unit = {val students = new ListBuffer [string] for (I {val connection = DBUntils.getConnection () / / TODO database operation DBUntils.returnConnection (connection) connection}) .print () println ("-") data.mapPartition ((student) Collector: Collector [Int]) = > {val connection = DBUntils.getConnection () / / TODO database operation DBUntils.returnConnection (connection) collector.collect (connection)}). Print ()} def filterFunction (env: ExecutionEnvironment): Unit = {val data = env.fromCollection (List ) data.map (_ + 1). Filter (_ > 5). Print ()} def mapFunction (env: ExecutionEnvironment): Unit = {val data = env.fromCollection (List (1), 2, 4, 5)) data.map (_ + 1). Print ()}}

Run result (omit log)

(3,Little Leader,Chengdu) (1,PK Brother,Beijing) (4,Sky Blue,-) (5,-,Hangzhou) (2,J Brother,Shanghai)

Cross operator

Cartesian product

Public class DataSetTransformationApp {public static void main (String [] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment (); / mapFunction (env); / / filterFunction (env); / / mapPartitionFunction (env); / / firstFunction (env); / / flatMapFunction (env); / / distinctFunction (env); / / joinFunction (env); / / outJoinFunction (env); crossFunction (env) } public static void crossFunction (ExecutionEnvironment env) throws Exception {List info1 = Arrays.asList; List info2 = Arrays.asList; DataSource data1 = env.fromCollection (info1); DataSource data2 = env.fromCollection (info2); data1.cross (data2). Print () } public static void outJoinFunction (ExecutionEnvironment env) throws Exception {List info1 = Arrays.asList (new Tuple2 (1, "competitive Brother"), new Tuple2 (2, "Brother J"), new Tuple2 (3, "Leader"), new Tuple2 (4, "Sky Blue") List info2 = Arrays.asList (new Tuple2 (1, "Beijing"), new Tuple2 (2, "Shanghai"), new Tuple2 (3, "Chengdu"), new Tuple2 (5, "Hangzhou"); DataSource data1 = env.fromCollection (info1); DataSource data2 = env.fromCollection (info2) Data1.fullOuterJoin (data2) .where (0) .equalTo (0) .with (new JoinFunction () {@ Override public Tuple3 join (Tuple2 first, Tuple2 second) throws Exception {if (first = = null) {return new Tuple3 (second.getField (0), "-", second.getField (1)) } else if (second = = null) {return new Tuple3 (first.getField (0), first.getField (1), "-");} return new Tuple3 (first.getField (0), first.getField (1), second.getField (1));}}) .print () } public static void joinFunction (ExecutionEnvironment env) throws Exception {List info1 = Arrays.asList (new Tuple2 (1, "competitive Brother"), new Tuple2 (2, "Brother J"), new Tuple2 (3, "Leader"), new Tuple2 (4, "Sky Blue") List info2 = Arrays.asList (new Tuple2 (1, "Beijing"), new Tuple2 (2, "Shanghai"), new Tuple2 (3, "Chengdu"), new Tuple2 (4, "Hangzhou"); DataSource data1 = env.fromCollection (info1); DataSource data2 = env.fromCollection (info2) Data1.join (data2) .where (0) .equalTo (0) .with (new JoinFunction () {@ Override public Tuple3 join (Tuple2 first, Tuple2 second) throws Exception {return new Tuple3 (first.getField (0), first.getField (1), second.getField (1));}) .print () } public static void distinctFunction (ExecutionEnvironment env) throws Exception {List info = Arrays.asList ("hadoop,spark", "hadoop,flink", "flink,flink"); DataSource data = env.fromCollection (info); data.flatMap ((FlatMapFunction) (value, collector)-> {String tokens [] = value.split (","); Stream.of (tokens) .forEach (collector::collect) ) .returns (String.class) .returns () .print ();} public static void flatMapFunction (ExecutionEnvironment env) throws Exception {List info = Arrays.asList ("hadoop,spark", "hadoop,flink", "flink,flink"); DataSource data = env.fromCollection (info); data.flatMap ((FlatMapFunction) (value, collector)-> {String tokens [] = value.split (",") Stream.of (tokens) .forEach (collector::collect);}) .returns (String.class) .map (new MapFunction () {@ Override public Tuple2 map (String value) throws Exception {return new Tuple2 (value,1);}}) .groupBy (0) .sum (1) .map () } public static void firstFunction (ExecutionEnvironment env) throws Exception {List info = Arrays.asList (new Tuple2 (1, "Hadoop"), new Tuple2 (1, "Spark"), new Tuple2 (1, "Flink"), new Tuple2 (2, "Java"), new Tuple2 (2, "Spring boot"), new Tuple2 (3, "Linux") New Tuple2 (4, "VUE")) DataSource data = env.fromCollection (info); data.groupBy (0) .sortGroup (1, Order.DESCENDING) 
.first (2). Print ();} public static void mapPartitionFunction (ExecutionEnvironment env) throws Exception {List students = new ArrayList (); for (int I = 0; I

< 100; i++) { students.add("student: " + i); } DataSource data = env.fromCollection(students).setParallelism(4); // here the transformation runs once per element of students data.map(student ->

{int connection = DBUntils.getConnection (); / / TODO database operation DBUntils.returnConnection (connection); return connection;}) .print (); System.out.println ("-"); / / data.mapPartition ((MapPartitionFunction) (student, collector)-> {int connection = DBUntils.getConnection () according to the number of parallelism / / TODO database operation DBUntils.returnConnection (connection); collector.collect (connection);}) .returns (Integer.class). Print ();} public static void filterFunction (ExecutionEnvironment env) throws Exception {DataSource data = env.fromCollection (Arrays.asList (Arrays.asList (Arrays.asList)); data.map (x-> x + 1) .filter (x-> x > 5). Print () } public static void mapFunction (ExecutionEnvironment env) throws Exception {DataSource data = env.fromCollection (Arrays.asList); data.map (x-> x + 1). Print ();}}

Run result (omit log)

(Manchester United,3) (Manchester United,1) (Manchester United,0) (Manchester City,3) (Manchester City,1) (Manchester City,0)

Scala code

Import com.guanjian.flink.scala.until.DBUntilsimport org.apache.flink.api.common.operators.Orderimport org.apache.flink.api.scala.ExecutionEnvironmentimport org.apache.flink.api.scala._import org.apache.flink.util.Collectorimport scala.collection.mutable.ListBufferobject DataSetTransformationApp {def main (args: Array [String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironment// mapFunction (env) / / filterFunction (env) / / mapPartitionFunction (env) / / firstFunction (env) / / flatMapFunction (env) / / distinctFunction (env) / / joinFunction (env) crossFunction (env)} def crossFunction (env: ExecutionEnvironment): Unit = {val info1 = List ("Manchester United" Manchester City) val info2 = List (3Power1) val data1 = env.fromCollection (info1) val data2 = env.fromCollection (info2) data1.cross (data2). Print ()} def joinFunction (env: ExecutionEnvironment): Unit = {val info1 = List ((1, "competing Brother"), (2, "J Brother"), (3, "Leader"), (4, "Sky Blue") val info2 = List ((1, "Beijing"), (2, "Shanghai"), (3) Chengdu), (5, "Hangzhou") val data1 = env.fromCollection (info1) val data2 = env.fromCollection (info2) data1.fullOuterJoin (data2) .where (0) .equalTo (0). Apply ((first,second) = > {if (first = = null) {(second._1, "-", second._2)} else if (second = = null) {(first._1,first._2) Else {(first._1,first._2,second._2)}) .print ()} def distinctFunction (env: ExecutionEnvironment): Unit = {val info = List ("hadoop,spark", "hadoop,flink", "flink,flink") val data = env.fromCollection (info) data.flatMap (_ .split (" Distinct (). Print ()} def flatMapFunction (env: ExecutionEnvironment): Unit = {val info = List ("hadoop,spark", "hadoop,flink", "flink,flink") val data = env.fromCollection (info) data.flatMap (_ .split (","). Map ((_, 1)) .groupBy (0). Print ()} def firstFunction (env: ExecutionEnvironment): Unit = {val info = List ((1, "Hadoop"), (1) "Spark"), (1, "Flink"), (2, "Java"), (2, "Spring boot"), (3, "Linux"), (4, "VUE")) val data = env.fromCollection (info) data.groupBy (0) .sortGroup (1) Order.DESCENDING) .first (2) .print ()} def mapPartitionFunction (env: ExecutionEnvironment): Unit = {val students = new ListBuffer [string] for (I {val connection = DBUntils.getConnection () / / TODO database operation DBUntils.returnConnection (connection) connection}) .print () println ("-") data.mapPartition ((student) Collector: Collector [Int]) = > {val connection = DBUntils.getConnection () / / TODO database operation DBUntils.returnConnection (connection) collector.collect (connection)}). Print ()} def filterFunction (env: ExecutionEnvironment): Unit = {val data = env.fromCollection (List ) data.map (_ + 1). Filter (_ > 5). Print ()} def mapFunction (env: ExecutionEnvironment): Unit = {val data = env.fromCollection (List (1), 2, 4, 5)) data.map (_ + 1). Print ()}}

Run result (omit log)

(Manchester United,3) (Manchester United,1) (Manchester United,0) (Manchester City,3) (Manchester City,1) (Manchester City,0)

Sink (output)

We add an empty sink-out folder under the flink folder.

Export to a text file
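A minimal sketch of writing a DataSet out as text into the sink-out folder (the numbers, the exact path, and the parallelism here are illustrative assumptions):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;

public class DataSetSinkSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        List<Integer> data = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            data.add(i);
        }
        // write the elements into the sink-out folder, overwriting any previous run
        env.fromCollection(data)
                .writeAsText("file:///Users/admin/Downloads/flink/sink-out/", FileSystem.WriteMode.OVERWRITE)
                .setParallelism(1);
        // DataSet sinks are lazy; execute() triggers the job
        env.execute("DataSetSinkSketch");
    }
}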

Public class DataSetSinkApp {public static void main (String [] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment (); List data = new ArrayList (); for (int I = 1; I {println ("received:" + x) x}). Filter (_% 2 = = 0). SetParallelism (1)}}

Run result (omit log)

received: 1 received: 2 2 received: 3 received: 4 4 received: 5 received: 6 6 ...

Union

Public class DataStreamTransformationApp {public static void main (String [] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment (); / / filterFunction (env); unionFunction (env); env.execute ("DataStreamTransformationApp");} public static void unionFunction (StreamExecutionEnvironment env) {DataStreamSource data1 = env.addSource (new CustomNonParallelSourceFunction ()); DataStreamSource data2 = env.addSource (new CustomNonParallelSourceFunction ()); data1.union (data2). Print (). SetParallelism (1) } public static void filterFunction (StreamExecutionEnvironment env) {DataStreamSource data = env.addSource (new CustomNonParallelSourceFunction ()); data.map (x-> {System.out.println ("received:" + x); return x;}) .filter (x-> x% 2 = = 0). Print () .setParallelism (1);}}

Run result (omit log)

1 1 2 2 3 3 4 4 5 5 ...

Scala code

Import com.guanjian.flink.scala.until.CustomNonParallelSourceFunctionimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.api.scala._object DataStreamTransformationApp {def main (args: Array [String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// filterFunction (env) unionFunction (env) env.execute ("DataStreamTransformationApp")} def unionFunction (env: StreamExecutionEnvironment): Unit = {val data1 = env.addSource (new CustomNonParallelSourceFunction) val data2 = env.addSource (new CustomNonParallelSourceFunction) Data1.union (data2). Print (). SetParallelism (1)} def filterFunction (env: StreamExecutionEnvironment): Unit = {val data = env.addSource (new CustomNonParallelSourceFunction) data.map (x = > {println ("received:" + x) x}). Filter (_% 2 = = 0). Print (). SetParallelism (1)}}

Run result (omit log)

1 1 2 2 3 3 4 4 5 5 ...

Split and select

Split a stream into multiple streams and pick one of them

Public class DataStreamTransformationApp {public static void main (String [] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment (); / / filterFunction (env); / / unionFunction (env); splitSelectFunction (env); env.execute ("DataStreamTransformationApp");} public static void splitSelectFunction (StreamExecutionEnvironment env) {DataStreamSource data = env.addSource (new CustomNonParallelSourceFunction ()) SplitStream splits = data.split (value-> {List list = new ArrayList (); if (value% 2 = = 0) {list.add ("even");} else {list.add ("odd");} return list;}); splits.select ("odd"). Print (). SetParallelism (1) } public static void unionFunction (StreamExecutionEnvironment env) {DataStreamSource data1 = env.addSource (new CustomNonParallelSourceFunction ()); DataStreamSource data2 = env.addSource (new CustomNonParallelSourceFunction ()); data1.union (data2). Print (). SetParallelism (1);} public static void filterFunction (StreamExecutionEnvironment env) {DataStreamSource data = env.addSource (new CustomNonParallelSourceFunction ()); data.map (x-> {System.out.println ("received:" + x) Return x;}) .filter (x-> x% 2 = = 0) .print () .setParallelism (1);}}

Run result (omit log)

1 3 5 7 9 11 ...

Scala code

Import com.guanjian.flink.scala.until.CustomNonParallelSourceFunctionimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.api.scala._import org.apache.flink.streaming.api.collector.selector.OutputSelectorimport java.util.ArrayListimport java.lang.Iterableobject DataStreamTransformationApp {def main (args: Array [String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// filterFunction (env) / / unionFunction (env) splitSelectFunction (env) env.execute ("DataStreamTransformationApp")} Def splitSelectFunction (env: StreamExecutionEnvironment): Unit = {val data = env.addSource (new CustomNonParallelSourceFunction) val splits = data.split (new OutputSelector [Long] {override def select (value: Long): Iterable [String] = {val list = new ArrayList [string] if (value% 2 = = 0) {list.add ("even")} else {list.add ("odd")} list} }) splits.select ("odd"). Print (). SetParallelism (1)} def unionFunction (env: StreamExecutionEnvironment): Unit = {val data1 = env.addSource (new CustomNonParallelSourceFunction) val data2 = env.addSource (new CustomNonParallelSourceFunction) data1.union (data2). Print (). SetParallelism (1)} def filterFunction (env: StreamExecutionEnvironment): Unit = {val data = env.addSource (new CustomNonParallelSourceFunction) data.map (x = > {println ("received:" + x) x}) .filter (_% 2 = = 0) .print () .setParallelism (1)}

Run result (omit log)

1 3 5 7 9 11 ...

Note that split has been marked as a deprecated method.

@deprecated def split (selector: OutputSelector[T]): SplitStream[T] = asScalaStream(stream.split(selector))

Because the return type of the OutputSelector functional interface is a Java-specific type that is unfriendly to Scala, a lambda expression cannot be used here in Scala either.

Of course, select can also select multiple streams.

Public class DataStreamTransformationApp {public static void main (String [] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment (); / / filterFunction (env); / / unionFunction (env); splitSelectFunction (env); env.execute ("DataStreamTransformationApp");} public static void splitSelectFunction (StreamExecutionEnvironment env) {DataStreamSource data = env.addSource (new CustomNonParallelSourceFunction ()) SplitStream splits = data.split (value-> {List list = new ArrayList (); if (value% 2 = = 0) {list.add ("even");} else {list.add ("odd");} return list;}) Splits.select ("odd", "even"). Print (). SetParallelism (1);} public static void unionFunction (StreamExecutionEnvironment env) {DataStreamSource data1 = env.addSource (new CustomNonParallelSourceFunction ()); DataStreamSource data2 = env.addSource (new CustomNonParallelSourceFunction ()); data1.union (data2). Print (). SetParallelism (1);} public static void filterFunction (StreamExecutionEnvironment env) {DataStreamSource data = env.addSource (new CustomNonParallelSourceFunction ()) Data.map (x-> {System.out.println ("received:" + x); return x;}) .filter (x-> x% 2 = = 0) .print () .setParallelism (1);}}

Run result (omit log)

1 2 3 4 5 6 ...

The Scala code modification is the same, so I won't repeat it here.
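Since split/select is deprecated, side outputs are the recommended replacement in newer Flink versions. A minimal Java sketch (the class and tag names here are illustrative, not from the original code) that routes odd numbers to a side output and keeps even numbers on the main stream:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // tag identifying the side output stream for odd numbers
        OutputTag<Long> oddTag = new OutputTag<Long>("odd") {};
        SingleOutputStreamOperator<Long> mainStream = env
                .fromElements(1L, 2L, 3L, 4L, 5L, 6L)
                .process(new ProcessFunction<Long, Long>() {
                    @Override
                    public void processElement(Long value, Context ctx, Collector<Long> out) {
                        if (value % 2 == 0) {
                            out.collect(value);        // even numbers stay on the main stream
                        } else {
                            ctx.output(oddTag, value); // odd numbers go to the side output
                        }
                    }
                });
        DataStream<Long> odd = mainStream.getSideOutput(oddTag);
        odd.print().setParallelism(1);
        env.execute("SideOutputSketch");
    }
}

Unlike split, a side output can also carry a different element type from the main stream.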

Stream Sink

Custom Sink

Transfer data from a socket to MySQL

@ Data@ToString@AllArgsConstructor@NoArgsConstructorpublic class Student {private int id; private String name; private int age;} public class SinkToMySQL extends RichSinkFunction {private Connection connection; private PreparedStatement pstmt; private Connection getConnection () {Connection conn = null; try {Class.forName ("com.mysql.cj.jdbc.Driver"); String url = "jdbc:mysql://127.0.0.1:3306/flink" Conn = DriverManager.getConnection (url, "root", "abcd123");} catch (Exception e) {e.printStackTrace ();} return conn;} @ Override public void open (Configuration parameters) throws Exception {super.open (parameters); connection = getConnection (); String sql = "insert into student (id,name,age) values (?,?)"; pstmt = connection.prepareStatement (sql) } @ Override public void invoke (Student value) throws Exception {System.out.println ("invoke-"); pstmt.setInt (1meme value.getId ()); pstmt.setString (2meme value.getName ()); pstmt.setInt (3value.getAge ()); pstmt.executeUpdate ();} @ Override public void close () throws Exception {super.close () If (pstmt! = null) {pstmt.close ();} if (connection! = null) {connection.close ();} public class CustomSinkToMySQL {public static void main (String [] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment (); DataStreamSource source = env.socketTextStream ("127.0.0.1", 9999) SingleOutputStreamOperator studentStream = source.map (value-> {String [] splits = value.split (","); Student stu = new Student (Integer.parseInt (splits [0]), splits [1], Integer.parseInt (splits [2]); return stu;}) .returns (Student.class); studentStream.addSink (new SinkToMySQL ()); env.execute ("CustomSinkToMySQL");}}

Before running the code, execute

nc -lk 9999

After running the code, enter data in the nc console
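For example, a line in the id,name,age format expected by the map function (the values here are just an illustration):

1,Tom,20

Each such line is parsed into a Student object and inserted into the student table.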

Execution result

Scala code

Class Student (var id: Int,var name: String,var age: Int) {} import java.sql. {Connection, DriverManager PreparedStatement} import com.guanjian.flink.scala.test.Studentimport org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.functions.sink.RichSinkFunctionclass SinkToMySQL extends RichSinkFunction [Student] {var connection: Connection = null var pstmt: PreparedStatement = null def getConnection:Connection = {var conn: Connection = null Class.forName ("com.mysql.cj.jdbc.Driver") val url = "jdbc:mysql://127.0.0.1:3306/flink" conn = DriverManager.getConnection (url, "root") "abcd123") conn} override def open (parameters: Configuration): Unit = {connection = getConnectionval sql = "insert into student (id,name,age) values" Pstmt = connection.prepareStatement (sql)} override def invoke (value: Student): Unit = {println ("invoke-") pstmt.setInt (1memvalue.id) pstmt.setString (2memvalue.name) pstmt.setInt (3 Value.age) pstmt.executeUpdate ()} override def close (): Unit = {if (pstmt! = null) {pstmt.close ()} if (connection! = null) {connection.close ()}} import com.guanjian.flink.scala.until.SinkToMySQLimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.api.scala._object CustomSinkToMySQL {def main (args: Array [String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment val source = env.socketTextStream ("127.0.0.1" 9999) val studentStream = source.map (value = > {val splits = value.split (",") val stu = new Student (splits (0). ToInt, splits (1), splits (2) .toInt) stu}) studentStream.addSink (new SinkToMySQL) env.execute ("CustomSinkToMySQL")}}

Console input

Running result

Table API and SQL

To use Flink's Table API in a Java project, you need to add the Scala dependency libraries.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

Add a sales.csv file to the data directory with the following contents

transactionId,customerId,itemId,amountPaid
111,1,1,100.0
112,2,3,505.0
113,3,3,510.0
114,4,4,600.0
115,1,2,500.0
116,1,2,500.0
117,1,2,500.0
118,1,2,600.0
119,2,3,400.0
120,1,2,500.0
121,1,4,500.0
122,1,2,500.0
123,1,4,500.0
124,1,2,600.0

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class TableSQLAPI {
    @Data
    @ToString
    @AllArgsConstructor
    @NoArgsConstructor
    public static class SalesLog {
        private String transactionId;
        private String customerId;
        private String itemId;
        private Double amountPaid;
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
        String filePath = "/Users/admin/Downloads/flink/data/sales.csv";
        DataSource csv = env.readCsvFile(filePath).ignoreFirstLine()
                .pojoType(SalesLog.class, "transactionId", "customerId", "itemId", "amountPaid");
        Table sales = tableEnv.fromDataSet(csv);
        tableEnv.registerTable("sales", sales);
        Table resultTable = tableEnv.sqlQuery("select customerId,sum(amountPaid) money from sales "
                + "group by customerId");
        DataSet result = tableEnv.toDataSet(resultTable, Row.class);
        result.print();
    }
}

Run result (omit log)

3,510.0 4,600.0 1,4800.0 2,905.0

Scala code

The Scala project also needs to add the dependency

Org.apache.flink flink-table_2.11 ${flink.version} import org.apache.flink.api.scala.ExecutionEnvironmentimport org.apache.flink.table.api.TableEnvironmentimport org.apache.flink.types.Rowimport org.apache.flink.api.scala._object TableSQLAPI {case class SalesLog (transactionId: String,customerId: String,itemId: String AmountPaid: Double) def main (args: Array [String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment (env) val filePath = "/ Users/admin/Downloads/flink/data/sales.csv" val csv = env.readCsvFile [SalesLog] (filePath,ignoreFirstLine = true,includedFields = Array ("sales", sales) val resultTable = tableEnv.sqlQuery ("select customerId") Sum (amountPaid) money from sales "+" group by customerId ") tableEnv.toDataSet [Row] (resultTable) .print ()}}

Run result (omit log)

3,510.0 4,600.0 1,4800.0 2,905.0

Time and window

There are three important notions of time in Flink: event time (Event Time), processing time (Processing Time), and the time an event enters the Flink system (Ingestion Time).

Usually we use the event time as the benchmark.

The time characteristic is set with the following code

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

The event time is usually carried as a timestamp in a field of the incoming data; it is extracted and used to decide when a window fires.
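For example, a minimal sketch of extracting the event-time timestamp from a field of the record (the Tuple2 sample data, the field position, and the 3-second out-of-orderness bound are illustrative assumptions):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeExtractorSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // assumed sample data: (word, event-time timestamp in milliseconds)
        DataStream<Tuple2<String, Long>> stream = env.fromElements(
                Tuple2.of("a", 1000L), Tuple2.of("b", 2000L));
        stream.assignTimestampsAndWatermarks(
                // allow events to arrive up to 3 seconds out of order
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(3)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element) {
                        return element.f1; // the timestamp field carried in the data
                    }
                })
                .print();
        env.execute("EventTimeExtractorSketch");
    }
}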

Windows are mainly used in stream processing (unbounded streams); they divide the stream data into buckets by time period or by size. Windows come in two kinds: keyed windows (statistics per key) and non-keyed (all) windows. The processing flow looks like this
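For example, a minimal keyed-window sketch (the socket source, the 5-second window size, and the field positions are illustrative assumptions) that counts words per key over tumbling windows:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.socketTextStream("127.0.0.1", 9999)
                // split each line into (word, 1) pairs
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                        for (String token : line.split(",")) {
                            out.collect(Tuple2.of(token, 1));
                        }
                    }
                })
                .keyBy(0)                     // keyed window: one bucket per word
                .timeWindow(Time.seconds(5))  // 5-second tumbling window
                .sum(1)                       // aggregate within each window bucket
                .print()
                .setParallelism(1);
        env.execute("WindowSketch");
    }
}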

Keyed Windowsstream .keyby (...) Ele.getField (0) .flatMap (new RichFlatMapFunction () {private transient ValueState state; @ Override public void open (Configuration parameters) throws Exception {super.open (parameters)) State = getRuntimeContext () .getState (new ValueStateDescriptor ("avg", TypeInformation.of (new TypeHint () {}));} @ Override public void flatMap (Tuple2 value, Collector out) throws Exception {Tuple2 tmpState = state.value () Tuple2 currentState = tmpState = = null? Tuple2.of (0): tmpState; Tuple2 newState = new Tuple2 ((int) currentState.getField (0) + 1, (int) currentState.getField (1) + (int) value.getField (1)); state.update (newState) If ((int) newState.getField (0) > = 2) {out.collect (new Tuple2 (value.getField (0), (int) newState.getField (1) / (int) newState.getField (0)); state.clear () ) .print () .setParallelism (1); env.execute ("KeyedStateApp");}}

Running result

(1,4) (1,5)

Scala code

Import org.apache.flink.api.common.functions.RichFlatMapFunctionimport org.apache.flink.api.common.state. {ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.util.Collectorimport org.apache.flink.api.scala.createTypeInformationobject KeyedStateApp {def main (args: Array [String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment env.fromCollection (List ((1m 3), (1m 5), (1m 7), (1m 4) (1getRuntimeContext.getState 2)) .keyby (_ 1) .flatMap (new RichFlatMapFunction [(Int,Int), (Int,Int)] {var state: ValueState [(Int,Int)] = _ override def open (parameters: Configuration): Unit = {state = getRuntimeContext.getState (new ValueStateDescriptor [(Int,Int)] ("avg", createTypeInformation [(Int,Int)} override def flatMap (value: (Int,Int), out: Collector [(Int) Int)]) = {val tmpState = state.value () val currentState = if (tmpState! = null) {tmpState} else {(0memo)} val newState = (currentState._1 + 1) CurrentState._2 + value._2) state.update (newState) if (newState._1 > = 2) {out.collect ((value._1,newState._2 / newState._1)) state.clear ()}) .print () .setParallelism (1) env.execute ("KeyedStateApp")}}

Running result

(1,4) (1,5)

Reducing State

/ * count the number of data entries and add up * / public class ReducingStateApp {public static void main (String [] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment () Env.fromCollection (Arrays.asList (new Tuple2), new Tuple2), new Tuple2, new Tuple2, new Tuple2) .keyby (ele-> ele.getField (0)) .flatMap (new RichFlatMapFunction () {private transient ReducingState state) @ Override public void open (Configuration parameters) throws Exception {super.open (parameters) State = getRuntimeContext () .getReducingState (new ReducingStateDescriptor ("sum", new ReduceFunction () {@ Override public Tuple2 reduce (Tuple2 value1, Tuple2 value2) throws Exception {Tuple2 tuple2 = new Tuple2 ((int) value1.getField (0) + 1) (int) value1.getField (1) + (int) value2.getField (1)) Return tuple2;}}, TypeInformation.of (new TypeHint () {})) } @ Override public void flatMap (Tuple2 value, Collector out) throws Exception {Tuple2 tuple2 = new Tuple2 (value.getField (0), value.getField (1)); state.add (tuple2) Out.collect (new Tuple2 (state.get (). GetField (0), state.get (). GetField (1));}}) .print () .setParallelism (1); env.execute ("ReducingStateApp");}}

Running result

(2,8) (3,15) (4,19) (5,21)

Scala code

Import org.apache.flink.api.common.functions. {ReduceFunction, RichFlatMapFunction} import org.apache.flink.api.common.state. {ReducingState, ReducingStateDescriptor} import org.apache.flink.api.scala.createTypeInformationimport org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.util.Collectorobject ReducingStateApp {def main (args: Array [String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment env.fromCollection (List ((1m 3), (1m 5), (1m 7), (1m 4) (1getRuntimeContext.getReducingState 2)) .keyby (_ 1) .flatMap (new RichFlatMapFunction [(Int,Int), (Int,Int)] {var state: ReducingState [(Int,Int)] = _ override def open (parameters: Configuration): Unit = {state = getRuntimeContext.getReducingState (new ReducingStateDescriptor [(Int,Int)] ("sum", new ReduceFunction [(Int,Int)] {override def reduce (value1: (Int,Int), value2: (Int) Int): (Int,Int) = {(value1._1 + 1 value1.value2 + value2._2)}}, createTypeInformation [(Int,Int)])} override def flatMap (value: (Int,Int), out: Collector [(Int,Int)]) = {val tuple2 = (value._1) Value._2) state.add (tuple2) out.collect ((state.get (). _ 1) state.get () _ 2)}) .print () .setParallelism (1) env.execute ("ReducingStateApp")}}

Running result

(2,8) (3,15) (4,19) (5,21)

List State

/ * get the location of each entry * / public class ListStateApp {public static void main (String [] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment () Env.fromCollection (Arrays.asList (new Tuple2), new Tuple2), new Tuple2, new Tuple2, new Tuple2) .keyby (ele-> ele.getField (0)) .flatMap (new RichFlatMapFunction () {private transient ListState state) @ Override public void open (Configuration parameters) throws Exception {super.open (parameters); state = getRuntimeContext () .getListState (new ListStateDescriptor ("list", TypeInformation.of (new TypeHint () {}) } @ Override public void flatMap (Tuple2 value, Collector out) throws Exception {state.add (value.getField (0)); Iterator iterator = state.get () .iterator (); Integer l = 0 While (iterator.hasNext ()) {l + = iterator.next ();} Tuple3 tuple3 = new Tuple3 (value.getField (0), value.getField (1), l); out.collect (tuple3) ). Print (). SetParallelism (1); env.execute ("ListStateApp");}}

Running result

(1,3,1) (1,5,2) (1,7,3) (1,4,4) (1,2,5)

Scala code

Import org.apache.flink.api.common.functions.RichFlatMapFunctionimport org.apache.flink.api.common.state. {ListState, ListStateDescriptor} import org.apache.flink.api.scala.createTypeInformationimport org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.util.Collectorobject ListStateApp {def main (args: Array [String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment env.fromCollection (List ((1m 3), (1m 5), (1m 7), (1m 4) (1Unit 2)) .keyby (_. _ 1) .flatMap (new RichFlatMapFunction [(Int,Int), (Int,Int,Int)] {var state: ListState [Int] = _ override def open (parameters: Configuration): Unit = {state = getRuntimeContext.getListState (new ListStateDescriptor [Int] ("list", createTypeInformation [Int]) } override def flatMap (value: (Int, Int), out: Collector [(Int, Int, Int)]) = {state.add (value._1) val iterator = state.get () .iterator () var l: Int = 0 while (iterator.hasNext) {l + = iterator.next ()} val tuple3 = (value._1,value._2) L) out.collect (tuple3)}) .print () .setParallelism (1) env.execute ("ListStateApp")}}

Running result

(1,3,1) (1,5,2) (1,7,3) (1,4,4) (1,2,5)

Fold State

/ * * count the number of entries from an initial value * / public class FoldStateApp {public static void main (String [] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment () Env.fromCollection (Arrays.asList (new Tuple2), new Tuple2), new Tuple2, new Tuple2, new Tuple2) .keyby (ele-> ele.getField (0)) .flatMap (new RichFlatMapFunction () {private transient FoldingState state) @ Override public void open (Configuration parameters) throws Exception {super.open (parameters) State = getRuntimeContext () .getFoldingState (new FoldingStateDescriptor ("fold", 1, (accumulator, value)-> accumulator + value, TypeInformation.of (new TypeHint () {}) } @ Override public void flatMap (Tuple2 value, Collector out) throws Exception {state.add (value.getField (0)); out.collect (new Tuple3 (value.getField (0), value.getField (1), state.get ());}}) .print () .setParallelism (1); env.execute ("FoldStateApp") }}

Running result

(1,3,2) (1,5,3) (1,7,4) (1,4,5) (1,2,6)

Scala code

Import org.apache.flink.api.common.functions. {FoldFunction, RichFlatMapFunction} import org.apache.flink.api.common.state. {FoldingState, FoldingStateDescriptor} import org.apache.flink.api.scala.createTypeInformationimport org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.util.Collectorobject FoldStateApp {def main (args: Array [String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment env.fromCollection (List ((1m 3), (1m 5), (1m 7), (1m 4) (1Unit 2)) .keyBy (_. _ 1) .flatMap (new RichFlatMapFunction [(Int,Int), (Int,Int,Int)] {var state: FoldingState [Int,Int] = _ override def open (parameters: Configuration): Unit = {state = getRuntimeContext.getFoldingState (new FoldingStateDescriptor [Int,Int] ("fold", 1) new FoldFunction [Int,Int] {override def fold (accumulator: Int) Value: Int) = {accumulator + value}}, createTypeInformation [Int])} override def flatMap (value: (Int, Int), out: Collector [(Int, Int, Int)]) = {state.add (value._1) out.collect ((value._1,value._2) State.get ())}) .print () .setParallelism (1) env.execute ("FoldStateApp")}}

Running result

(1,3,2) (1,5,3) (1,7,4) (1,4,5) (1,2,6)

Map State

/ * add each piece of data to the previous one, and the first item keeps itself * / public class MapStateApp {public static void main (String [] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment () Env.fromCollection (Arrays.asList (new Tuple2), new Tuple2), new Tuple2, new Tuple2, new Tuple2) .keyby (ele-> ele.getField (0)) .flatMap (new RichFlatMapFunction () {private transient MapState state) @ Override public void open (Configuration parameters) throws Exception {super.open (parameters); state = getRuntimeContext () .getMapState (new MapStateDescriptor ("map", TypeInformation.of (new TypeHint () {}), TypeInformation.of (new TypeHint () {}) } @ Override public void flatMap (Tuple2 value, Collector out) throws Exception {Integer tmp = state.get (value.getField (0)); Integer current = tmp = = null? 0: tmp; state.put (value.getField (0), value.getField (1)) Tuple2 tuple2 = new Tuple2 (value.getField (0), current + (int) value.getField (1)); out.collect (tuple2);}}) .print () .setParallelism (1); env.execute ("MapStateApp");}}

Running result

(1,3) (1,8) (1,12) (1,11) (1,6)

Scala code

Import org.apache.flink.api.common.functions.RichFlatMapFunctionimport org.apache.flink.api.common.state. {MapState, MapStateDescriptor} import org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.util.Collectorimport org.apache.flink.api.scala.createTypeInformationobject MapStateApp {def main (args: Array [String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment env.fromCollection (List ((1m 3), (1m 5), (1m 7), (1m 4) ) .keyby (_. _ 1) .flatMap (new RichFlatMapFunction [(Int,Int), (Int,Int)] {var state: MapState [Int,Int] = _ override def open (parameters: Configuration): Unit = {state = getRuntimeContext.getMapState (new MapStateDescriptor [Int,Int] ("map", createTypeInformation [Int], createTypeInformation [int]))} override def flatMap (value: (Int,Int), out: Collector [(Int) Int)]) = {val tmp: Int = state.get (value._1) val current: Int = if (tmp = = null) {0} else {tmp} state.put (value._1,value._2) val tuple2 = (value._1 Current + value._2) out.collect (tuple2)}) .print () .setParallelism (1) env.execute ("MapStateApp")}}

Running result

(1,3) (1,8) (1,12) (1,11) (1,6)

Aggregating State

/ * find the average of each piece of data and all previous data * / public class AggregatingStateApp {public static void main (String [] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment () Env.fromCollection (Arrays.asList (new Tuple2), new Tuple2), new Tuple2, new Tuple2, new Tuple2) .keyby (ele-> ele.getField (0)) .flatMap (new RichFlatMapFunction () {private transient AggregatingState state) @ Override public void open (Configuration parameters) throws Exception {super.open (parameters); state = getRuntimeContext () .getAggregatingState (new AggregatingStateDescriptor ("agg", new AggregateFunction () {@ Override public Tuple2 createAccumulator () {return new Tuple2 (0)) } @ Override public Tuple2 add (Integer value, Tuple2 accumulator) {return new Tuple2 ((int) accumulator.getField (0) + value, (int) accumulator.getField (1) + 1) } @ Override public Integer getResult (Tuple2 accumulator) {return (int) accumulator.getField (0) / (int) accumulator.getField (1) } @ Override public Tuple2 merge (Tuple2 a, Tuple2 b) {return new Tuple2 ((int) a.getField (0) + (int) b.getField (0), (int) a.getField (1) + (int) b.getField (1)) }}, TypeInformation.of (new TypeHint () {}));} @ Override public void flatMap (Tuple2 value, Collector out) throws Exception {state.add (value.getField (1)) Tuple2 tuple2 = new Tuple2 (value.getField (0), state.get (); out.collect (tuple2);}}) .print () .setParallelism (1); env.execute ("AggregatingStateApp");}}

Running result

(1,3) (1,4) (1,5) (1,4) (1,4)

Scala code

Import org.apache.flink.api.common.functions. {AggregateFunction, RichFlatMapFunction} import org.apache.flink.api.common.state. {AggregatingState, AggregatingStateDescriptor} import org.apache.flink.api.scala.createTypeInformationimport org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.util.Collectorobject AggregatingStateApp {def main (args: Array [String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment env.fromCollection (List ((1m 3), (1m 5), (1m 7), (1m 4) ) .keyby (_. _ 1) .flatMap (new RichFlatMapFunction [(Int,Int), (Int,Int)] {var state: AggregatingState [Int,Int] = _ override def open (parameters: Configuration): Unit = {state = getRuntimeContext.getAggregatingState (new AggregatingStateDescriptor [Int, (Int,Int), Int] ("agg", new AggregateFunction [Int, (Int,Int), Int] {override def add (value: Int) Accumulator: (Int, Int)) = {(accumulator._1 + value,accumulator._2 + 1)} override def createAccumulator () = {(0Power0)} override def getResult (accumulator: (Int) Int)) = {accumulator._1 / accumulator._2} override def merge (a: (Int,Int), b: (Int,Int)) = {(a. Int,Int), b: (Int,Int)) = {(a. Override def flatMap 1 + b. (Int))}, createTypeInformation [(Int,Int)])} override def flatMap (value: (Int)) Int), out: Collector [(Int, Int)]) = {state.add (value._2) val tuple2 = (value._1,state.get ()) out.collect (tuple2)}) .print () .setParallelism (1) env.execute ("AggregatingStateApp")}}

Running result

(1,3) (1,4) (1,5) (1,4) (1,4)

Checkpoint mechanism

Every operator in Flink can be stateful. To make state fault-tolerant and persistent, Flink provides the Checkpoint mechanism. A checkpoint can restore both the state and the consumption position in the stream, giving the application the same semantics as a failure-free execution.

By default, the checkpoint mechanism is disabled and requires us to turn it on manually.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable Checkpointing and perform a Checkpoint every 4 seconds
env.enableCheckpointing(4000);
// set the Checkpoint mode; EXACTLY_ONCE is also the default mode and is suitable for most applications.
// There is also CheckpointingMode.AT_LEAST_ONCE (at least once), which is generally used in ultra-low-latency scenarios
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// set the timeout of a Checkpoint, here 10 seconds
env.getCheckpointConfig().setCheckpointTimeout(10000);
// set the number of concurrent Checkpoints, which can be one or more
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

public class CheckpointApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        DataStreamSource stream = env.socketTextStream("127.0.0.1", 9999);
        stream.map(x -> {
            if (x.contains("Competition")) {
                throw new RuntimeException("out of bug...");
            } else {
                return x;
            }
        }).print().setParallelism(1);
        env.execute("CheckpointApp");
    }
}

Normally, if nc -lk 9999 is not running, the program would exit immediately. But because Checkpointing is enabled here, even though port 9999 is not open the job keeps trying to reconnect to it instead of exiting. The default number of restart attempts with Checkpointing enabled is Integer.MAX_VALUE, so we keep seeing this log.

Java.net.ConnectException: Connection refused (Connection refused) at java.net.PlainSocketImpl.socketConnect (Native Method) at java.net.AbstractPlainSocketImpl.doConnect (AbstractPlainSocketImpl.java:350) at java.net.AbstractPlainSocketImpl.connectToAddress (AbstractPlainSocketImpl.java:206) at java.net.AbstractPlainSocketImpl.connect (AbstractPlainSocketImpl.java:188) at java.net.SocksSocketImpl.connect (SocksSocketImpl.java:392) at java.net.Socket.connect (Socket.java At org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run (SocketTextStreamFunction.java:96) at org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:94) at org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:58) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run (SourceStreamTask.java:99) at org. Apache.flink.streaming.runtime.tasks.StreamTask.invoke (StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run (Task.java:704) at java.lang.Thread.run (Thread.java:748)

Scala code

Import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.api.scala._object CheckpointApp {def main (args: Array [String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing (5000) val stream = env.socketTextStream ("127.0.0.1" 9999) stream.map (x = > {if (x.contains)) {throw new RuntimeException ("bug...")} else {x}) .print () .setParallelism (1) env.execute ("CheckpointApp")}}

Restart strategy

As we just saw, if no restart strategy is set, enabling Checkpointing gives a default restart strategy with Integer.MAX_VALUE attempts and a 1-second delay. If we only want to restart twice, we need to set the restart strategy ourselves, either in Flink's configuration file or in code.

For example, edit flink-conf.yaml under the conf directory of flink

restart-strategy.fixed-delay.attempts: 2
restart-strategy.fixed-delay.delay: 5 s

The number of restarts after failure is 2 and the delay interval is 5 seconds.

Set in the code

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS)));

Note that to use the restart strategy, the Checkpoint mechanism must be enabled; otherwise it does not take effect.

Public class CheckpointApp {public static void main (String [] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment (); env.enableCheckpointing (5000); env.setRestartStrategy (RestartStrategies.fixedDelayRestart (2, Time.of (5, TimeUnit.SECONDS); DataStreamSource stream = env.socketTextStream ("127.0.0.1", 9999); stream.map (x-> {if (x.contains ("Competition")) {throw new RuntimeException ("out of bug...") } else {return x;}}) .print () .setParallelism (1); env.execute ("CheckpointApp");}}

After starting nc -lk 9999 and running the program, when we type the trigger word "Competition" on the console twice, the program throws an exception

Java.lang.RuntimeException: there is a bug. At com.guanjian.flink.java.test.CheckpointApp.lambda$main$95f17bfa$1 (CheckpointApp.java:18) at org.apache.flink.streaming.api.operators.StreamMap.processElement (StreamMap.java:41) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput (StreamInputProcessor.java:202) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run (OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask .invoke (StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run (Task.java:704) at java.lang.Thread.run (Thread.java:748)

But the job does not die yet. When we enter the trigger word a third time, the program fails completely.

Scala code

Import java.util.concurrent.TimeUnitimport org.apache.flink.api.common.restartstrategy.RestartStrategiesimport org.apache.flink.api.common.time.Timeimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.api.scala._object CheckpointApp {def main (args: Array [String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing (5000) env.setRestartStrategy (RestartStrategies.fixedDelayRestart (2, Time.of (5, TimeUnit.SECONDS) val stream = env.socketTextStream ("127.0.0.1" 9999) stream.map (x = > {if (x.contains)) {throw new RuntimeException ("bug...")} else {x}) .print () .setParallelism (1) env.execute ("CheckpointApp")}}

StateBackend

By default, checkpointed State is stored in memory, so once our program crashes and is restarted, the previous state is lost. For example, we type the following in nc.

a,a,a

In the case of the previous CheckpointApp, let's make some changes.

Public class CheckpointApp {public static void main (String [] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment (); env.enableCheckpointing (5000); env.setRestartStrategy (RestartStrategies.fixedDelayRestart (2, Time.of (5, TimeUnit.SECONDS); DataStreamSource stream = env.socketTextStream ("127.0.0.1", 9999); stream.map (x-> {if (x.contains ("Competition")) {throw new RuntimeException ("out of bug...") } else {return x;}) .flatMap (new FlatMapFunction () {@ Override public void flatMap (String value, Collector out) throws Exception {String [] splits = value.split (","); Stream.of (splits) .forEach (token-> out.collect (new Tuple2 (token,1) }) .keyby (0) .sum (1) .print () .setParallelism (1); env.execute ("CheckpointApp");}}

The running result is

(a,1) (a,2) (a,3)

There is no problem with this. But once the program dies and is started again, if we type the same input again, the result is exactly the same as before.

But if we don't want such a result, what we want is

(a,4) (a,5) (a,6)

that is, the results continue to accumulate from the state saved before the crash.

Public class CheckpointApp {public static void main (String [] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment (); env.enableCheckpointing (5000); / / non-memory external extension env.getCheckpointConfig (). EnableExternalizedCheckpoints (CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); / / State stores env.setStateBackend as a file (new FsStateBackend ("hdfs://172.18.114.236:8020/backend")) Env.setRestartStrategy (RestartStrategies.fixedDelayRestart (2, Time.of (5, TimeUnit.SECONDS)); DataStreamSource stream = env.socketTextStream ("host1", 9999); stream.map (x-> {if (x.contains ("Competition")) {throw new RuntimeException ("out of bug...");} else {return x ) .flatMap (new FlatMapFunction () {@ Override public void flatMap (String value, Collector out) throws Exception {String [] splits = value.split (","); Stream.of (splits) .forEach (token-> out.collect (new Tuple2 (token,1) }) .keyby (0) .sum (1) .print () .setParallelism (1); env.execute ("CheckpointApp");}}

Change the main class configured to run in the pom to

com.guanjian.flink.java.test.CheckpointApp
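For reference, in the quickstart pom this is usually the mainClass inside the maven-shade-plugin's ManifestResourceTransformer (a sketch only; the surrounding plugin configuration depends on how your pom was generated):

<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
    <mainClass>com.guanjian.flink.java.test.CheckpointApp</mainClass>
</transformer>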

Package the project and upload the jar to the test directory on the Flink server.

Modify the flink-conf.yaml under the conf directory of flink to add the following

state.backend: filesystem
state.checkpoints.dir: hdfs://172.18.114.236:8020/backend
state.savepoints.dir: hdfs://172.18.114.236:8020/backend

Create a new backend directory in HDFS

hdfs dfs -mkdir /backend

Restart Flink and start nc:

nc -lk 9999

The method of first submission remains the same.

./flink run -m yarn-cluster -yn 1 ../test/flink-train-java-1.0.jar

Continue with previous input

a,a,a

If you now stop the job submitted to Flink, you will find a directory in HDFS under /backend whose name is the job ID (a long hexadecimal string), containing chk-* checkpoint subdirectories.
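To find the exact checkpoint path to restore from, you can list that directory (the job ID and chk number below are just the ones from this run; yours will differ):

hdfs dfs -ls /backend
hdfs dfs -ls /backend/4db93b564e17b3806230f7c2d053121e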

Now let's start the program again, but it's a little different from before.

./flink run -s hdfs://172.18.114.236:8020/backend/4db93b564e17b3806230f7c2d053121e/chk-5 -m yarn-cluster -yn 1 ../test/flink-train-java-1.0.jar

Continue typing in nc at this time

a,a,a

The result of the operation met our expectations.

Scala code

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig

object CheckpointApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000)
    // retain externalized checkpoints when the job is cancelled
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // store state as files on HDFS
    env.setStateBackend(new FsStateBackend("hdfs://172.18.114.236:8020/backend"))
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS)))

    val stream = env.socketTextStream("host1", 9999)
    stream.map(x => {
      if (x.contains("Competition")) {
        throw new RuntimeException("bug...")
      } else {
        x
      }
    }).flatMap(_.split(","))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
      .print()
      .setParallelism(1)

    env.execute("CheckpointApp")
  }
}

RocksDBStateBackend

To use the RocksDBStateBackend, you first need to add the dependency.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

Then change CheckpointApp to use the RocksDBStateBackend:

import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class CheckpointApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        // Externalized checkpoints: retain the checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // State is stored in a local RocksDB database and flushed to HDFS (true enables incremental checkpoints)
        env.setStateBackend(new RocksDBStateBackend("hdfs://172.18.114.236:8020/backend/rocksDB", true));
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS)));
        DataStreamSource<String> stream = env.socketTextStream("host1", 9999);
        stream.map(x -> {
            if (x.contains("Competition")) {
                throw new RuntimeException("bug...");
            } else {
                return x;
            }
        }).flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] splits = value.split(",");
                Stream.of(splits).forEach(token -> out.collect(new Tuple2<>(token, 1)));
            }
        }).keyBy(0).sum(1).print().setParallelism(1);
        env.execute("CheckpointApp");
    }
}

Package the project and upload the jar to the test directory on the Flink server.

Create the directory in HDFS:

hdfs dfs -mkdir /backend/rocksDB

Configure flink-conf.yaml for flink, modify and add the following

state.backend: rocksdb
state.checkpoints.dir: hdfs://172.18.114.236:8020/backend/rocksDB
state.savepoints.dir: hdfs://172.18.114.236:8020/backend/rocksDB
state.backend.incremental: true
state.backend.rocksdb.checkpoint.transfer.thread.num: 1
state.backend.rocksdb.localdir: /raid/db/flink/checkpoints
state.backend.rocksdb.timer-service.factory: HEAP

Restart Flink and execute:

nc -lk 9999

The method of first submission remains the same.

./flink run -m yarn-cluster -yn 1 ../test/flink-train-java-1.0.jar

Continue with previous input

a,a,a

If you now stop the job, you will again find a job-ID directory with chk-* subdirectories under /backend/rocksDB in HDFS.

On one of the cluster's servers (not necessarily the one from which you submitted the job), you can also see RocksDB's local data files.

The RocksDBStateBackend writes state to that local RocksDB directory first and then flushes it to HDFS.
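If you want to verify this on a TaskManager node, you can look under the local directory configured by state.backend.rocksdb.localdir above (the exact subdirectory layout depends on the job):

ls -R /raid/db/flink/checkpoints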

Start the program again

./flink run -s hdfs://172.18.114.236:8020/backend/rocksDB/6277c8adfba91c72baa384a0d23581d9/chk-64 -m yarn-cluster -yn 1 ../test/flink-train-java-1.0.jar

Then type in nc again:

a,a,a

Observing the output, the counts again continue from where they left off, the same as with the filesystem backend.

Scala code

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig

object CheckpointApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // state lives in local RocksDB and is flushed to HDFS (true enables incremental checkpoints)
    env.setStateBackend(new RocksDBStateBackend("hdfs://172.18.114.236:8020/backend/rocksDB", true))
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS)))

    val stream = env.socketTextStream("host1", 9999)
    stream.map(x => {
      if (x.contains("Competition")) {
        throw new RuntimeException("bug...")
      } else {
        x
      }
    }).flatMap(_.split(","))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
      .print()
      .setParallelism(1)

    env.execute("CheckpointApp")
  }
}

Monitoring and tuning

HistoryServer

The HistoryServer is used to view information about jobs that have already finished running.

Edit flink-conf.yaml in the conf directory of flink to add the following content

jobmanager.archive.fs.dir: hdfs://172.18.114.236:8020/completed-jobs/
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://172.18.114.236:8020/completed-jobs/
historyserver.archive.fs.refresh-interval: 10000

Run under the bin directory

./historyserver.sh start

When you visit the server's public IP on port 8082 in a browser, you can see the HistoryServer web interface (it is empty at first; it only shows entries after a job has run).

As usual, we run a job; once it finishes, its information shows up in the HistoryServer UI.

You can also see the archived job information retained in HDFS under completed-jobs/.

The HistoryServer also exposes REST API endpoints whose responses can be retrieved as JSON, for example:
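A quick sketch (substitute your HistoryServer host; the endpoints mirror Flink's monitoring REST API, and the job ID placeholder is one of the archived jobs):

curl http://<historyserver-host>:8082/jobs/overview
curl http://<historyserver-host>:8082/jobs/<jobid>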

This is the end of "What are the ways to use Flink technology?". Thank you for reading. If you want to learn more about the industry, you can follow the website; the editor will keep publishing high-quality practical articles for you!
