2025-04-02 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article shows how to run the streaming SocketWordCount example in Flink 1.8. The editor found it practical and is sharing it here; hopefully you will take something away from it.
Overview:
This walks through the streaming, windowed version of the "WordCount" program from the examples in the Flink source code.
The program connects to a socket server and reads strings from the socket. The easiest way to try it is to start a text server (on port 9999 here) with the netcat tool.
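To make "reads strings from the socket" concrete, here is a minimal plain-Java sketch of a client that collects newline-delimited lines from a text server. It is not how Flink implements its socket source; the class name `SocketLinesSketch` and the helpers `readLines`, `serveOnce` and `demo` are hypothetical, and `serveOnce` only stands in for netcat so the sketch can run on its own.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these names come from Flink.
class SocketLinesSketch {

    /** Connects to host:port and returns every line received until the server closes. */
    static List<String> readLines(String host, int port) throws IOException {
        List<String> lines = new ArrayList<>();
        try (Socket socket = new Socket(host, port);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    /** Stand-in for netcat: accepts one client, sends the given lines, then closes. */
    static Thread serveOnce(ServerSocket server, List<String> lines) {
        Thread sender = new Thread(() -> {
            try (Socket client = server.accept();
                 PrintWriter out = new PrintWriter(
                         new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8), true)) {
                for (String line : lines) {
                    out.println(line);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        sender.start();
        return sender;
    }

    /** Runs the stand-in server and the reader together on a free local port. */
    static List<String> demo() {
        try (ServerSocket server = new ServerSocket(0)) { // port 0 picks any free port
            Thread sender = serveOnce(server, Arrays.asList("hello flink", "hello world"));
            List<String> received = readLines("127.0.0.1", server.getLocalPort());
            sender.join();
            return received;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With a real netcat server, each line typed into the terminal would play the role of one element returned by `readLines`.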
The source is reproduced here as well:
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.hadoop.ljs.flink.streaming;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
 * Implements a streaming windowed version of the "WordCount" program.
 *
 * This program connects to a server socket and reads strings from the socket.
 * The easiest way to try this out is to open a text server (at port 12345)
 * using the netcat tool via
 *
 *     nc -l 12345 on Linux or nc -l -p 12345 on Windows
 *
 * and run this example with the hostname and the port as arguments.
 */
@SuppressWarnings("serial")
public class SocketWordCount {
    public static void main(String[] args) throws Exception {

        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
            /* hostname = "10.124.165.98"; port = 9999; */
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWordCount " +
                "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                "and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
                "type the input text into the command line");
            return;
        }
        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");
        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text

            // split each line on whitespace and emit (word, 1) for every token
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
                @Override
                public void flatMap(String value, Collector<WordWithCount> out) {
                    for (String word : value.split("\\s")) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            })

            // group by the "word" field and collect 5-second windows
            .keyBy("word")
            .timeWindow(Time.seconds(5))

            // sum the counts of equal words within each window
            .reduce(new ReduceFunction<WordWithCount>() {
                @Override
                public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                    return new WordWithCount(a.word, a.count + b.count);
                }
            });
        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }
    // ------------------------------------------------------------------------

    /**
     * Data type for words with count.
     */
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + ":" + count;
        }
    }
}
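As a rough illustration of what the job computes for a single 5-second window, the sketch below applies the same split-and-sum logic to an in-memory list of lines using only the JDK. It leaves out Flink, the socket and the windowing itself; the class name `WindowCountSketch` and the method `countWords` are hypothetical and not part of the example program.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; the real job does this per key inside Flink.
class WindowCountSketch {

    /** Counts words across the lines that arrived within one window. */
    static Map<String, Long> countWords(List<String> linesInWindow) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : linesInWindow) {
            // Same split as the job's flatMap: break on single whitespace characters.
            for (String word : line.split("\\s")) {
                // Same combine step as the job's reduce: add up counts of equal words.
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList("hello flink", "hello world");
        System.out.println(countWords(window)); // prints {hello=2, flink=1, world=1}
    }
}
```

In the Flink job the same result is printed as one `WordWithCount` per word each time a window closes, and lines arriving in a later window start from zero again.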
Build the jar with mvn package; the resulting artifact is flink191-1.0-SNAPSHOT-jar-with-dependencies.
Submit it directly to a Flink session that is already running on YARN by uploading the jar through the Flink web interface:
After uploading, select the check box next to the jar and enter the program arguments directly:
Parameter format: --name1 value1 --name2 value2 (for this program, for example, --hostname localhost --port 9999)
The arguments are read with the ParameterTool utility class in the main method above (the call has a fixed form):
ParameterTool params = ParameterTool.fromArgs(args);
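The argument format can be illustrated with a small plain-Java parser. This is only a sketch of the `--name value` convention and is not Flink's `ParameterTool`, which accepts more forms than this; the class name `ArgsFormatSketch` and the method `parse` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; use ParameterTool.fromArgs in the actual job.
class ArgsFormatSketch {

    /** Parses "--key value" pairs into a map, rejecting anything else. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected --key but got: " + args[i]);
            }
            String key = args[i].substring(2);
            if (i + 1 >= args.length) {
                throw new IllegalArgumentException("Missing value for --" + key);
            }
            params.put(key, args[++i]); // consume the value that follows the key
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params =
                parse(new String[] {"--hostname", "localhost", "--port", "9999"});
        // Mirror the job: hostname falls back to localhost, port is required.
        String hostname = params.getOrDefault("hostname", "localhost");
        int port = Integer.parseInt(params.get("port"));
        System.out.println(hostname + ":" + port); // prints localhost:9999
    }
}
```

The point of the sketch is only that names and values alternate on the command line, which is the form to type into the arguments field of the web interface.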
Finally, click the "Submit" button to run the job.
The web interface also lets you view the job's logs and output:
That is how to run the streaming SocketWordCount example in Flink 1.8. Some of these points are likely to come up in day-to-day work, and I hope the article helps.