Shulou (Shulou.com) report, updated 2025-02-24
This article explains how to use CoProcessFunction in Flink.
It is the fifth article in the "Flink processing functions" series and focuses on processing data from two data sources at the same time.
When a job has two input streams whose data are related in business terms, how should the processing be implemented? Take the job in the following figure as an example: it listens on ports 9998 and 9999 at the same time, processes the data received on each port separately, and then sends everything to the same sink, which prints it:
Flink's answer is to extend CoProcessFunction. To understand it better, let's look at the class diagrams of KeyedProcessFunction and CoProcessFunction side by side, as shown below:
As the figure shows, CoProcessFunction has the same inheritance relationship as KeyedProcessFunction (both extend AbstractRichFunction), and the class itself is very simple: elements from the two upstream inputs are handled in processElement1 and processElement2 respectively, and timers are supported (on keyed inputs, as in this demo).
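The point of handling both inputs in one function is that the two callbacks can share state, which is what makes related data from two streams usable together. The sketch below is a JDK-only model, not Flink code: `KeyedTwoInputModel` is a hypothetical class, and its `HashMap` merely stands in for the per-key state that a real CoProcessFunction on keyed inputs would keep in Flink's managed state. Input 1 stores a value per key; input 2 emits a combined result when the same key has already arrived on input 1.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, JDK-only model of a two-input function on keyed inputs.
// This is NOT the Flink API: the HashMap stands in for per-key state that Flink would manage.
class KeyedTwoInputModel {
    // Last value seen on input 1, per key (stand-in for keyed state).
    private final Map<String, Integer> lastFromInput1 = new HashMap<>();
    // Single shared output, analogous to the one Collector both callbacks write to.
    private final List<String> out = new ArrayList<>();

    // Counterpart of processElement1: remember the value for this key.
    void processElement1(String key, int value) {
        lastFromInput1.put(key, value);
    }

    // Counterpart of processElement2: correlate with what input 1 stored for the same key.
    void processElement2(String key, int value) {
        Integer other = lastFromInput1.get(key);
        if (other != null) {
            out.add(key + ":" + other + "+" + value + "=" + (other + value));
        }
    }

    List<String> output() {
        return out;
    }
}
```

For example, calling processElement1("aaa", 111) and then processElement2("aaa", 222) leaves aaa:111+222=333 in output(), while processElement2("bbb", 5) emits nothing because input 1 has not seen bbb yet.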
Hands-on coding
Next, let's build an application to try out CoProcessFunction. Its function is very simple:
Set up two data sources, reading from local ports 9998 and 9999 respectively
Each port receives strings such as aaa,123, which are converted to Tuple2 instances with f0 = aaa and f1 = 123
In the CoProcessFunction implementation, every element from either data source is logged and then passed to the downstream operator
The downstream operator prints, so all data received on ports 9998 and 9999 is printed on the console
The figure below shows what the whole demo does:
Next, let's write the code that implements these functions.
Source code download
If you'd rather not type the code yourself, the source code for the whole series can be downloaded from GitHub; the addresses are listed in the following table (https://github.com/zq2599/blog_demos):
Name | Link | Note
Project home page | https://github.com/zq2599/blog_demos | The project's home page on GitHub
Git repository address (https) | https://github.com/zq2599/blog_demos.git | Repository address of the project source code, https protocol
Git repository address (ssh) | git@github.com:zq2599/blog_demos.git | Repository address of the project source code, ssh protocol
This git project contains multiple folders; the application for this article is in the flinkstudy folder, as shown in the red box below:
Map operator
Write a map operator that converts the string aaa,123 into a Tuple2 instance with f0 = aaa and f1 = 123.
The operator class is WordCountMap.java:
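package com.bolingcavalry.coprocessfunction;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.StringUtils;

public class WordCountMap implements MapFunction<String, Tuple2<String, Integer>> {

    @Override
    public Tuple2<String, Integer> map(String s) throws Exception {
        // Caution: Flink does not support null records, so a malformed line that
        // returns null here will make the job fail; filtering such lines out is safer.
        if (StringUtils.isNullOrWhitespaceOnly(s)) {
            System.out.println("invalid line");
            return null;
        }

        String[] array = s.split(",");

        if (null == array || array.length < 2) {
            System.out.println("invalid line");
            return null;
        }

        // "aaa,123" -> f0 = "aaa", f1 = 123
        return new Tuple2<>(array[0], Integer.valueOf(array[1]));
    }
}

Next is the abstract class AbstractCoProcessFunctionExecutor.java, which assembles the whole job and leaves the CoProcessFunction to subclasses:

package com.bolingcavalry.coprocessfunction;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;

public abstract class AbstractCoProcessFunctionExecutor {

    /**
     * Returns the CoProcessFunction instance; implemented by subclasses.
     */
    protected abstract CoProcessFunction<
            Tuple2<String, Integer>,
            Tuple2<String, Integer>,
            Tuple2<String, Integer>> getCoProcessFunctionInstance();

    /**
     * Listens on the given port, converts each line to a Tuple2 instance via map,
     * then partitions by the f0 field and returns the resulting KeyedStream.
     * @param port
     * @return
     */
    protected KeyedStream<Tuple2<String, Integer>, Tuple> buildStreamFromSocket(StreamExecutionEnvironment env, int port) {
        return env
                // listen on the port
                .socketTextStream("localhost", port)
                // convert a string such as "aaa,3" to a Tuple2 instance: f0 = "aaa", f1 = 3
                .map(new WordCountMap())
                // use the word as the partition key
                .keyBy(0);
    }

    /**
     * Override this method if the subclass has side output to handle;
     * it is called after the main flow has been built.
     */
    protected void doSideOutput(SingleOutputStreamOperator<Tuple2<String, Integer>> mainDataStream) {
    }

    /**
     * Builds and runs the job.
     * @throws Exception
     */
    public void execute() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // parallelism 1
        env.setParallelism(1);

        // input from port 9998
        KeyedStream<Tuple2<String, Integer>, Tuple> stream1 = buildStreamFromSocket(env, 9998);

        // input from port 9999
        KeyedStream<Tuple2<String, Integer>, Tuple> stream2 = buildStreamFromSocket(env, 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> mainDataStream = stream1
                // connect the two streams
                .connect(stream2)
                // run the low-level processing function; the logic is implemented in the subclass
                .process(getCoProcessFunctionInstance());

        // print every element emitted by the processing function
        mainDataStream.print();

        // side-output logic; subclasses override doSideOutput when they need it
        doSideOutput(mainDataStream);

        // run the job
        env.execute("ProcessFunction demo: CoProcessFunction");
    }
}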
Key point 1: there are two data sources, and the processing logic for each source is encapsulated in the buildStreamFromSocket method
Key point 2: stream1.connect(stream2) connects the two streams
Key point 3: process receives the CoProcessFunction instance, which contains the processing logic for the connected streams
Key point 4: getCoProcessFunctionInstance is an abstract method; the subclass implements it and returns the CoProcessFunction instance, so what the CoProcessFunction does is entirely up to the subclass
Key point 5: doSideOutput does nothing here, but it is called at the end of the main flow; a subclass that needs side output can override it. Its parameter is the stream emitted by the processing function, from which the side output can be obtained
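The abstract class above follows the template method pattern: execute() fixes the job skeleton, getCoProcessFunctionInstance() is the abstract step, and doSideOutput() is an optional hook with an empty default. The JDK-only sketch below reproduces just that shape so it can be run without a cluster; `JobTemplate`, `createHandler`, `afterMainFlow`, `ForwardAll`, and the `log` list are hypothetical stand-ins, and no Flink calls are involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical JDK-only illustration of the template method structure used by
// AbstractCoProcessFunctionExecutor; it does not call Flink.
abstract class JobTemplate {
    protected final List<String> log = new ArrayList<>();

    // Abstract step, like getCoProcessFunctionInstance(): subclasses decide the processing logic.
    protected abstract UnaryOperator<String> createHandler();

    // Optional hook, like doSideOutput(): empty by default, called after the main flow is built.
    protected void afterMainFlow(List<String> mainOutput) {
    }

    // Fixed skeleton, like execute(). For simplicity input1 is processed before input2,
    // whereas real streams interleave their elements.
    public final List<String> execute(List<String> input1, List<String> input2) {
        UnaryOperator<String> handler = createHandler();
        List<String> mainOutput = new ArrayList<>();
        for (String s : input1) {
            mainOutput.add(handler.apply(s));
        }
        for (String s : input2) {
            mainOutput.add(handler.apply(s));
        }
        // stand-in for mainDataStream.print()
        log.add("main output size " + mainOutput.size());
        afterMainFlow(mainOutput);
        return mainOutput;
    }
}

// Counterpart of CollectEveryOne: forwards every element unchanged and keeps the empty hook.
class ForwardAll extends JobTemplate {
    @Override
    protected UnaryOperator<String> createHandler() {
        return UnaryOperator.identity();
    }
}
```

A subclass that needs side output would override afterMainFlow, just as doSideOutput is meant to be overridden, while ForwardAll keeps the empty default in the same way CollectEveryOne does.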
Subclasses decide what the CoProcessFunction does
The subclass CollectEveryOne.java is shown below. Its logic is simple: every element from either source is passed straight to the downstream operator:
package com.bolingcavalry.coprocessfunction;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CollectEveryOne extends AbstractCoProcessFunctionExecutor {

    private static final Logger logger = LoggerFactory.getLogger(CollectEveryOne.class);

    @Override
    protected CoProcessFunction<
            Tuple2<String, Integer>,
            Tuple2<String, Integer>,
            Tuple2<String, Integer>> getCoProcessFunctionInstance() {

        return new CoProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            // called for each element from the first input (port 9998)
            @Override
            public void processElement1(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) {
                logger.info("handling element of stream 1: {}", value);
                out.collect(value);
            }

            // called for each element from the second input (port 9999)
            @Override
            public void processElement2(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) {
                logger.info("handling element of stream 2: {}", value);
                out.collect(value);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        new CollectEveryOne().execute();
    }
}
In the code above, the generic type arguments after CoProcessFunction look long, but they are just three Tuple2 types: the element type of input 1, the element type of input 2, and the output type, in that order.
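In this demo all three type arguments happen to be the same Tuple2 type, but they are independent: the two inputs need not share a type, and the output can be a third type. The JDK-only sketch below imitates that shape with a hypothetical `TwoInputFunction<IN1, IN2, OUT>` interface (not the Flink class, and without Flink's Context parameter), where input 1 carries `String` values, input 2 carries `Integer` values, and both are turned into `String` output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical JDK-only interface mirroring the shape of CoProcessFunction<IN1, IN2, OUT>:
// one callback per input, both emitting into the same output of type OUT.
interface TwoInputFunction<IN1, IN2, OUT> {
    void processElement1(IN1 value, Consumer<OUT> out);

    void processElement2(IN2 value, Consumer<OUT> out);
}

class MixedTypesExample {
    // IN1 = String, IN2 = Integer, OUT = String.
    static final TwoInputFunction<String, Integer, String> FN = new TwoInputFunction<String, Integer, String>() {
        @Override
        public void processElement1(String value, Consumer<String> out) {
            out.accept("stream 1: " + value);
        }

        @Override
        public void processElement2(Integer value, Consumer<String> out) {
            out.accept("stream 2: " + (value * 2));
        }
    };

    // Feeds one element to each input and collects everything emitted into one list.
    static List<String> run() {
        List<String> sink = new ArrayList<>();
        FN.processElement1("aaa", sink::add);
        FN.processElement2(111, sink::add);
        return sink;
    }
}
```

Here MixedTypesExample.run() returns the list ["stream 1: aaa", "stream 2: 222"]: both callbacks write into the same sink even though their input types differ.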
Verification
Open ports 9998 and 9999 on the local machine. On a MacBook (as here), run nc -l 9998 and nc -l 9999 in two terminals.
Start the Flink application. On a Mac like mine, you can run the CollectEveryOne.main method directly (I haven't tried this on Windows, but deploying the jar to a Flink cluster should also work).
In the terminals listening on ports 9998 and 9999, enter aaa,111 and bbb,222 respectively.
The Flink console output is shown below. The log statements in processElement1 and processElement2 have both run, and the downstream print operator has printed the data from both sources, as expected:
INFO CollectEveryOne - handling element of stream 1: (aaa,111)
(aaa,111)
INFO CollectEveryOne - handling element of stream 2: (bbb,222)
(bbb,222)