Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

The usage of Flink transformation under Apache

2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/02 Report--

This article mainly introduces "the usage of Flink transformation under Apache". In daily operation, I believe many people have doubts about the usage of Flink transformation under Apache. The editor consulted all kinds of materials and sorted out simple and easy-to-use operation methods. I hope it will be helpful for you to answer the doubts about the usage of Flink transformation under Apache! Next, please follow the editor to study!

Map FunctionScala

Create a new Object

Object DataSetTransformationApp {def main (args: Array [String]): Unit = {val environment = ExecutionEnvironment.getExecutionEnvironment} def mapFunction (env: ExecutionEnvironment): Unit = {val data = env.fromCollection (List)

The data source here is a list collection of 1 to 10. The principle of Map is to assume that there are N elements in the data dataset and transform each element:

Data.map {x = > x.toInt}

For example: yallowf (x)

/ / each element in a pair of data does a + 1 operation data.map ((x:Int) = > x + 1) .print ()

Then do a + 1 operation for each element.

Write it simply:

If there is only one element in this, it can be written directly in the following form:

Data.map ((x) = > x + 1) .print ()

A more concise way of writing:

Data.map (x = > x + 1) .print ()

A more concise approach:

Data.map (_ + 1). Print ()

Output result:

234567891011Java public static void main (String [] args) throws Exception {ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment (); mapFunction (executionEnvironment);} public static void mapFunction (ExecutionEnvironment executionEnvironment) throws Exception {List list = new ArrayList (); for (int I = 1; I 5). Print ()}

Filter only returns records that meet the criteria.

Java public static void filterFunction (ExecutionEnvironment env) throws Exception {List list = new ArrayList (); for (int I = 1; I 5;}}). Print ();} MapPartition Function

What's the difference between map function and MapPartition function?

Requirements: there are 100 elements in DataSource to store the results in the database

If you use map function, the implementation is as follows:

/ / there are 100 elements in DataSource. Store the results in the database def mapPartitionFunction (env: ExecutionEnvironment): Unit = {val students = new ListBuffer [String] for (I {/ / every element is to be stored in the database, you must first get connection val connection = DBUtils.getConnection () println (connection + "...) / / TODO.... Save data to DB DBUtils.returnConnection (connection)}) .print ()}

Print the result, and 100 requests to get DBUtils.getConnection () will be printed. If the amount of data increases, it is obviously unrealistic to keep getting connections.

So MapPartition arises at the historic moment, transforming the data in a partition, that is, the data in a partition is called once.

So first set up the partition:

Val data = env.fromCollection (students) .setParallelism (4)

Set up 4 partitions, that is, parallelism, and then use mapPartition to process:

Data.mapPartition (x = > {val connection = DBUtils.getConnection () println (connection + "...") / / TODO.... Save data to DB DBUtils.returnConnection (connection) x}. Print ()

Then there will be 4 connection requests, with each partition getting a connection.

Javapublic static void mapPartitionFunction (ExecutionEnvironment env) throws Exception {List list = new ArrayList (); for (int I = 1; I {(first._1, first._2, second._2)}) .print ()}

The output is as follows:

(3jingflink, Shenzhen) (1recoveryHadoop, Beijing) Java public static void joinFunction (ExecutionEnvironment env) throws Exception {List info1 = new ArrayList (); / / numbered name info1.add (new Tuple2 (1, "hadoop"); info1.add (new Tuple2 (2, "spark")); info1.add (new Tuple2 (3, "flink")) Info1.add (new Tuple2 (4, "java")); List info2 = new ArrayList (); / / numbered city info2.add (new Tuple2 (1, "Beijing")); info2.add (new Tuple2 (2, "Shanghai")); info2.add (new Tuple2 (3, "Shenzhen")); info2.add (new Tuple2 (5, "Guangzhou")) DataSource data1 = env.fromCollection (info1); DataSource data2 = env.fromCollection (info2); data1.join (data2) .where (0) .equalTo (0) .with (new JoinFunction () {@ Override public Tuple3 join (Tuple2 first, Tuple2 second) throws Exception {return new Tuple3 (first.f0, first.f1,second.f1);}}) .print ();}

Tuple2, Tuple2 represents the set of two inputs, Tuple3 > represents the Tuple3 of the output

OuterJoin

The join mentioned above is an internal connection, and this OuterJoin is an external connection, including a left outer connection and a right outer connection, all connected to two data sets.

Def outJoinFunction (env: ExecutionEnvironment): Unit = {val info1 = ListBuffer [(Int, String)] () / / numbered name info1.append ((1, "hadoop")) info1.append ((2, "spark")) info1.append ((3, "flink")) info1.append ((4, "java")) val info2 = ListBuffer [(Int, String)] () / / numbered city info2.append ((1) "Beijing")) info2.append ((2, "Shanghai") info2.append ((3, "Shenzhen")) info2.append ((5, "Guangzhou")) val data1 = env.fromCollection (info1) val data2 = env.fromCollection (info2) data1.leftOuterJoin (data2) .where (0) .equalTo (0). Apply ((first, second) = > {if (second = null) {(first._1, first._2) "-")} else {(first._1, first._2, second._2)}) .print () / / left outer link shows all the data on the left}

Left outer connection, when the data on the left has no corresponding data on the right, it needs to be processed, otherwise a null pointer exception will occur. The output is as follows:

(3) (3) flink, Shenzhen) (1) (1) Hadoop, Beijing) (2) (2) spark, Shanghai) (4)

Right outer connection:

Data1.rightOuterJoin (data2) .where (0) .equalTo (0). Apply ((first, second) = > {if (first = = null) {(second._1, "-", second._2)} else {(first._1, first._2, second._2)}) .print ()

Right outer connection, output:

(3recoveryflink, Shenzhen) (1recoveryhadoop, Beijing) (5recoverylichy, Guangzhou) (2recoverspark, Shanghai)

Fully connected:

Data1.fullOuterJoin (data2) .where (0) .equalTo (0). Apply ((first, second) = > {if (first = = null) {(second._1, "-", second._2)} else if (second = = null) {(second._1, "-", second._2)} else {(first._1, first._2, second._2)}). Print () (3Flink Shenzhen) (1) Hadoop, Beijing) (5) Java, Guangzhou) (2) (2) Spark, Shanghai) (4) java -)

Left outer connection:

Public static void outjoinFunction (ExecutionEnvironment env) throws Exception {List info1 = new ArrayList (); / / numbered name info1.add (new Tuple2 (1, "hadoop"); info1.add (new Tuple2 (2, "spark")); info1.add (new Tuple2 (3, "flink")); info1.add (new Tuple2 (4, "java")); List info2 = new ArrayList () / / numbered city info2.add (new Tuple2 (1, "Beijing"); info2.add (new Tuple2 (2, "Shanghai")); info2.add (new Tuple2 (3, "Shenzhen")); info2.add (new Tuple2 (5, "Guangzhou")); DataSource data1 = env.fromCollection (info1); DataSource data2 = env.fromCollection (info2) Data1.leftOuterJoin (data2) .where (0) .equalTo (0) .with (new JoinFunction () {@ Override public Tuple3 join (Tuple2 first, Tuple2 second) throws Exception {if (second = = null) {return new Tuple3 (first.f0, first.f1, "-") } return new Tuple3 (first.f0, first.f1,second.f1);}}) .print ();}

Right outer connection:

Data1.rightOuterJoin (data2) .where (0) .equalTo (0) .with (new JoinFunction () {@ Override public Tuple3 join (Tuple2 first, Tuple2 second) throws Exception {if (first = = null) {return new Tuple3 (second.f0, "-", second.f1);} return new Tuple3 (first.f0, first.f1, second.f1) }}) .print ()

Fully connected:

Data1.fullOuterJoin (data2) .where (0) .equalTo (0) .with (new JoinFunction () {@ Override public Tuple3 join (Tuple2 first, Tuple2 second) throws Exception {if (first = = null) {return new Tuple3 (second.f0, "-", second.f1) } else if (second = = null) {return new Tuple3 (first.f0, first.f1, "-");} return new Tuple3 (first.f0, first.f1, second.f1);}}) .print (); cross functionScala

Cartesian product, left and right cross processing

Def crossFunction (env: ExecutionEnvironment): Unit = {val info1 = List ("Qiao Feng", "Murong Fu") val info2 = List val data1 = env.fromCollection (info1) val data2 = env.fromCollection (info2) data1.cross (data2). Print ()}

Output:

(Qiao Feng, 3) (Qiao Feng, 1) (Qiao Feng, 0) (Murong Fu, 3) (Murong Fu, 1) (Murong Fu, 0) Javapublic static void crossFunction (ExecutionEnvironment env) throws Exception {List info1 = new ArrayList (); info1.add ("Qiao Feng"); info1.add ("Murong Fu"); List info2 = new ArrayList (); info2.add ("3"); info2.add ("1") Info2.add ("0"); DataSource data1 = env.fromCollection (info1); DataSource data2 = env.fromCollection (info2); data1.cross (data2). Print ();} at this point, the study of "the usage of Flink transformation under Apache" is over, hoping to solve everyone's doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report