2025-01-16 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article shows how to build an Apache Flink application and is intended as a practical reference. The sections below walk through each step.
1. Prepare the development environment
Flink can run on Linux, Mac OS X, or Windows. To develop Flink applications, you need Java 8.x and Maven installed on the local machine.
If you have a Java 8 environment, running the following command prints version information similar to this:
$ java -version
java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)
If you have a Maven environment, running mvn -version prints the installed Maven version information.
In addition, we recommend IntelliJ IDEA (the free Community Edition is enough) as the IDE for developing Flink applications. Eclipse also works, but it has known problems with projects that mix Scala and Java, so it is not recommended. In the next section, we will show you how to create a Flink project and import it into IntelliJ IDEA.
2. Write the Flink program
Start IntelliJ IDEA, select "Import Project", and select the pom.xml in the root directory of my-flink-project. Follow the wizard to complete the import.
Create a SocketWindowWordCount.java file under src/main/java/myflink:
package myflink;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {
    }
}
This program is still just a skeleton; we will fill in the code step by step. Note that the import statements are omitted below, because the IDE adds them automatically. The complete code is shown at the end of this section. If you want to skip the following steps, you can paste the final complete code directly into the editor.
The first step in a Flink program is to create a StreamExecutionEnvironment. This is the entry class used to set parameters, create data sources, and submit jobs. Let's add it to the main function:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Next, we create a data source that reads data from a socket on local port 9000:
DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");
This creates a DataStream of strings. DataStream is Flink's core API for stream processing and defines many common operations (such as filtering, transformation, aggregation, windowing, and joins). In this example, we are interested in how many times each word appears within a particular time window, such as a five-second window. To do this, we first parse each string into (word, count) pairs, represented by Tuple2<String, Integer>. The first field is the word and the second field is the count, which is initially set to 1. We implement the parsing as a flatMap, because one line of input may contain multiple words.
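DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // Emit one (word, 1) pair for every whitespace-separated token
            for (String word : value.split("\\s")) {
                out.collect(Tuple2.of(word, 1));
            }
        }
    });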
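To see what the flatMap body produces for a single line, here is a minimal plain-Java sketch that needs no Flink dependency. The class TokenizeSketch and its tokenize helper are hypothetical names introduced only for illustration; a list of Map.Entry values stands in for Flink's Collector and Tuple2. It also shows one consequence of splitting on "\\s": two consecutive whitespace characters yield an empty "word".

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TokenizeSketch {

    // Hypothetical helper: mimics the flatMap body for ONE input line,
    // collecting the emitted (word, 1) pairs into a list instead of a Flink Collector.
    public static List<Map.Entry<String, Integer>> tokenize(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.split("\\s")) {
            out.add(new SimpleEntry<>(word, 1));
        }
        return out;
    }

    public static void main(String[] args) {
        // One line in, several pairs out.
        System.out.println(tokenize("to be or not to be"));
        // prints [to=1, be=1, or=1, not=1, to=1, be=1]

        // Two consecutive spaces produce an empty "word" between them.
        System.out.println(tokenize("hello  world"));
        // prints [hello=1, =1, world=1]
    }
}
```

If empty tokens are unwanted, one option is to split on "\\s+" or to skip empty strings before emitting; the article's code keeps the simpler "\\s".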
Next, we group the stream by the word field (that is, the field at index 0). Here we can simply use the keyBy(int index) method to get a stream of Tuple2<String, Integer> keyed by word. We can then specify the desired window on the keyed stream and compute a result from the data in each window. In our example, we want to sum the word counts every 5 seconds, with each window counting from zero:
DataStream<Tuple2<String, Integer>> windowCounts = wordCounts
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1);
The second call, .timeWindow(), specifies that we want a 5-second tumbling window. The third call specifies a sum aggregation for each key in each window, which in our example adds up the count field (that is, the field at index 1). The resulting data stream outputs the number of occurrences of each word every 5 seconds.
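The keyed tumbling-window sum can be pictured with a rough plain-Java sketch that does not use Flink at all. Everything here is an assumption made for illustration: the class TumblingWindowSketch, its methods, and the sample timestamps (made-up arrival times in milliseconds). It only imitates the bookkeeping, namely assigning each record to a fixed 5-second bucket and keeping separate counters per bucket and per word; it is not how Flink implements windows internally.

```java
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {

    // Start of the tumbling window that contains the given timestamp.
    public static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    // Counts words per (window start, word). Each window has its own counters,
    // so counts restart from zero when a new window begins.
    public static Map<Long, Map<String, Integer>> countPerWindow(
            long[] timestampsMillis, String[] words, long windowSizeMillis) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (int i = 0; i < words.length; i++) {
            long start = windowStart(timestampsMillis[i], windowSizeMillis);
            result.computeIfAbsent(start, k -> new TreeMap<>())
                  .merge(words[i], 1, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample records: arrival time in milliseconds and the word received.
        long[] times = {1000, 2500, 4900, 5100, 9000};
        String[] words = {"flink", "flink", "java", "flink", "java"};
        System.out.println(countPerWindow(times, words, 5000));
        // prints {0={flink=2, java=1}, 5000={flink=1, java=1}}
    }
}
```

The word "flink" arrives three times in total, but because the third arrival falls into the next 5-second bucket, it is reported as 2 in the first window and 1 in the second.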
The last thing to do is to print the data stream to the console and start execution:
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
The final env.execute call is required to start the actual Flink job. All operator calls (such as creating sources, aggregating, and printing) only build a graph of internal operator operations. Only when execute() is called is the job submitted to a cluster or run on the local machine.
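The idea that operator calls only describe work, while execute() triggers it, can be illustrated with a toy plain-Java sketch. The class LazyPipelineSketch and its methods are hypothetical and deliberately simplistic; they are not Flink's API or internals, only an analogy for "record first, run later".

```java
import java.util.ArrayList;
import java.util.List;

public class LazyPipelineSketch {

    private final List<String> plannedSteps = new ArrayList<>();
    private final List<String> executedSteps = new ArrayList<>();

    // Declaring a step only records it in the plan; nothing runs yet.
    public LazyPipelineSketch addStep(String name) {
        plannedSteps.add(name);
        return this;
    }

    // Only this call walks the plan and "runs" each recorded step in order.
    public List<String> execute() {
        executedSteps.clear();
        executedSteps.addAll(plannedSteps);
        return new ArrayList<>(executedSteps);
    }

    // Number of steps that have actually run so far.
    public int executedCount() {
        return executedSteps.size();
    }

    public static void main(String[] args) {
        LazyPipelineSketch job = new LazyPipelineSketch()
            .addStep("socket source")
            .addStep("flatMap")
            .addStep("window sum")
            .addStep("print");

        System.out.println("before execute: " + job.executedCount());
        // prints before execute: 0
        System.out.println("after execute: " + job.execute());
        // prints after execute: [socket source, flatMap, window sum, print]
    }
}
```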
Here is the complete code, some of which has been simplified (the code is also accessible on GitHub):
package myflink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Get input data by connecting to a socket on local port 9000.
        // If port 9000 is already in use, change to another port
        // (and use the same port for nc/ncat below).
        DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");

        // Parse the data, group by word, and aggregate over a 5-second window
        DataStream<Tuple2<String, Integer>> windowCounts = text
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                    for (String word : value.split("\\s")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }
            })
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .sum(1);

        // Print the results to the console. Note that single-threaded printing
        // is used here instead of multithreaded printing.
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }
}

3. Run the program
To run the sample program, first start netcat on the terminal to get the input stream:
nc -lk 9000
If it is a Windows platform, you can install ncat through https://nmap.org/ncat/ and run:
ncat -lk 9000
Then run the main method of SocketWindowWordCount directly.
Now type words into the netcat console, and the word counts appear in the output console of SocketWindowWordCount. To see a count greater than 1, type the same word repeatedly within 5 seconds.
I have written a simple tcp_server.py that simulates the manual nc input above, so you can continuously check the correctness of the Flink window results.
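As a rough stand-in for such a script, here is a minimal Java sketch of a server that plays the role of nc by feeding lines to whichever client connects. It is not the author's tcp_server.py; the class name WordFeedServer, the feed helper, the sample lines, and the one-second pause are all assumptions chosen for illustration. Lines end with "\n" to match the delimiter passed to socketTextStream.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class WordFeedServer {

    // Writes each line terminated by "\n", flushing and pausing after every line.
    public static void feed(PrintWriter out, List<String> lines, long pauseMillis) {
        for (String line : lines) {
            out.print(line);
            out.print('\n');
            out.flush();
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Made-up sample input; replace with any lines you like.
        List<String> lines = Arrays.asList("hello flink", "hello world", "flink flink");

        if (args.length == 0) {
            // No port given: just show what would be sent.
            feed(new PrintWriter(System.out), lines, 0);
            return;
        }

        // Port given (e.g. 9000): wait for one client, then send the lines to it.
        int port = Integer.parseInt(args[0]);
        try (ServerSocket server = new ServerSocket(port);
             Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                 new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            feed(out, lines, 1000);
        }
    }
}
```

To use it in place of nc, start it with the port as an argument (for example, java WordFeedServer 9000) before running SocketWindowWordCount, since the Flink job connects to the socket as a client. With this input and all lines arriving in the same 5-second window, "hello" and "flink" should appear with counts greater than 1.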