SLTechnology News & Howtos — shulou.com — Development — updated 2025-01-16
In this article, the editor shows how to implement a Flink WordCount using Java lambda expressions. Many readers may not be familiar with this topic, so the article is shared for your reference; I hope you learn something from it. Let's dive in!
Environmental preparation
Add the Flink 1.9 Maven dependencies (pom.xml)
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.7</version>
</dependency>
Build a Flink stream processing environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Custom source
Generate a line of text per second
DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
    private boolean isCanal = false;
    private String[] words = {
        "important oracle jdk license update",
        "the oracle jdk license has changed for releases starting april 16 2019",
        "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as",
        "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before",
        "downloading and using this product an faq is available here",
        "commercial license and support is available with a low cost java se subscription",
        "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
    };

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Emit one random line of text per second
        while (!isCanal) {
            int randomIndex = RandomUtils.nextInt(0, words.length);
            ctx.collect(words[randomIndex]);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isCanal = true;
    }
});
Word count
// 3. Word count
// 3.1 Split each line of text into words
SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
    // Split the line on spaces and emit each word
    Arrays.stream(line.split(" ")).forEach(word -> ctx.collect(word));
}).returns(Types.STRING);

// 3.2 Convert each word into a (word, 1) tuple
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
    .map(word -> Tuple2.of(word, 1))
    .returns(Types.TUPLE(Types.STRING, Types.INT));

// 3.3 Group the tuples by word
KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

// 3.4 Sum the counts within a 3-second window
SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
    .timeWindow(Time.seconds(3))
    .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

resultDS.print();
Reference code
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1. Initialize the Flink streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Custom source: emits one line of text per second
        DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
            private boolean isCanal = false;
            private String[] words = {
                "important oracle jdk license update",
                "the oracle jdk license has changed for releases starting april 16 2019",
                "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as",
                "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before",
                "downloading and using this product an faq is available here",
                "commercial license and support is available with a low cost java se subscription",
                "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
            };

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // Emit one random line of text per second
                while (!isCanal) {
                    int randomIndex = RandomUtils.nextInt(0, words.length);
                    ctx.collect(words[randomIndex]);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanal = true;
            }
        });

        // 3. Word count
        // 3.1 Split each line of text into words
        SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
            // Split the line on spaces and emit each word
            Arrays.stream(line.split(" ")).forEach(word -> ctx.collect(word));
        }).returns(Types.STRING);

        // 3.2 Convert each word into a (word, 1) tuple
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
            .map(word -> Tuple2.of(word, 1))
            .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 3.3 Group the tuples by word
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

        // 3.4 Sum the counts within a 3-second window
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
            .timeWindow(Time.seconds(3))
            .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

        resultDS.print();

        env.execute("app");
    }
}
Flink support for Java Lambda expressions
Flink supports Lambda expressions for all Java API operators. However, when Lambda expressions use Java generics, you need to declare type information.
Let's take a look at the above code:
SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
    // Split the line on spaces and emit each word
    Arrays.stream(line.split(" ")).forEach(word -> ctx.collect(word));
}).returns(Types.STRING);
The reason we have to supply all of the type information here is that Flink cannot automatically infer the generic type of the Collector. Let's look at the source code of FlatMapFunction:
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {

    /**
     * The core method of the FlatMapFunction. Takes an element from the input data set and transforms
     * it into zero, one, or more elements.
     *
     * @param value The input value.
     * @param out The collector for returning result values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    void flatMap(T value, Collector<O> out) throws Exception;
}
Notice that the second parameter of flatMap is Collector<O>, a parameterized generic type. When the Java compiler compiles this code, the type parameter is erased, so the signature effectively becomes:

void flatMap(T value, Collector out)
In this case, Flink cannot automatically infer the type information, and if we do not provide it explicitly, the following error occurs:
Org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
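The anonymous-class workaround that the error message suggests works because an anonymous class records its generic interface arguments in the class file, where they can be read reflectively, while a lambda compiles to a synthetic class implementing only the raw interface. The following self-contained sketch demonstrates the difference; MyFlatMap is a hypothetical stand-in for Flink's FlatMapFunction, and java.util.function.Consumer stands in for Collector:

```java
import java.lang.reflect.ParameterizedType;
import java.util.function.Consumer;

public class ErasureDemo {

    // Hypothetical stand-in for Flink's FlatMapFunction<T, O>;
    // Consumer<O> plays the role of Collector<O>.
    interface MyFlatMap<T, O> {
        void flatMap(T value, Consumer<O> out);
    }

    // True if the implemented interface kept its type arguments in class metadata.
    static boolean keepsGenerics(MyFlatMap<?, ?> f) {
        return f.getClass().getGenericInterfaces()[0] instanceof ParameterizedType;
    }

    public static void main(String[] args) {
        // Anonymous class: the <String, String> arguments survive in the
        // class file's Signature attribute and are visible to reflection.
        MyFlatMap<String, String> anon = new MyFlatMap<String, String>() {
            @Override
            public void flatMap(String value, Consumer<String> out) {
                for (String w : value.split(" ")) out.accept(w);
            }
        };

        // Lambda: compiled to a synthetic class whose supertype is the raw
        // interface; the type arguments are gone, which is exactly why Flink
        // reports that the generic type parameters of 'Collector' are missing.
        MyFlatMap<String, String> lambda =
                (value, out) -> { for (String w : value.split(" ")) out.accept(w); };

        System.out.println("anonymous class keeps generics: " + keepsGenerics(anon));   // true
        System.out.println("lambda keeps generics:          " + keepsGenerics(lambda)); // false
    }
}
```

This is why rewriting the lambda as an anonymous class makes the error go away without any returns call: Flink can recover the Collector's element type from the class metadata.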
In this case, the type information must be specified explicitly; otherwise the output is treated as type Object, which prevents Flink from serializing it correctly.
Therefore, we need to declare the parameter types of the lambda expression explicitly, and declare the output type through the returns method.
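Besides the Types factory methods used above, Flink's returns can also accept a TypeHint, which relies on the "super type token" trick: an anonymous subclass of a generic class does record its type argument. A minimal self-contained sketch of that trick follows; SimpleTypeHint is a hypothetical simplified stand-in for Flink's TypeHint, not the real class:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class TypeTokenDemo {

    // Hypothetical stand-in for Flink's TypeHint: the anonymous subclass
    // records T in its generic superclass, where reflection can read it back.
    static abstract class SimpleTypeHint<T> {
        Type captured() {
            ParameterizedType sup = (ParameterizedType) getClass().getGenericSuperclass();
            return sup.getActualTypeArguments()[0];
        }
    }

    public static void main(String[] args) {
        // The trailing {} makes this an anonymous subclass, so List<String>
        // survives erasure in the subclass's metadata.
        Type t = new SimpleTypeHint<List<String>>() {}.captured();
        System.out.println(t); // java.util.List<java.lang.String>
    }
}
```

The same mechanism is why `new TypeHint<Tuple2<String, Integer>>() {}` in Flink carries full generic information even though a plain lambda does not.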
Let's look at another piece of code:
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
    .map(word -> Tuple2.of(word, 1))
    .returns(Types.TUPLE(Types.STRING, Types.INT));
Why do we need to specify a type after map?
Because map returns a Tuple2 here, and Tuple2 carries generic parameters, that generic parameter information is erased at compile time, so Flink cannot infer the result type on its own.
That covers how to implement a Flink WordCount with Java lambda expressions. Thank you for reading! I hope this article has been helpful for your reference; if you want to learn more, feel free to follow along for further articles.