Shulou (Shulou.com), SLTechnology News&Howtos. Updated 2025-01-18.
This article shows how to implement the WordCount program in Java and Scala using Spark Streaming. The same example is given in both languages so you can compare them step by step.
The program reads newline-separated lines of text from a socket on port 9999 of the local machine (localhost) and prints the word counts every two seconds.
Commands for sending data to the socket:
Windows: nc -l -p 9999
Linux: nc -lk 9999
Java version:
package com.unicom.ljs.spark220.study.streaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-01-30 22:21
 * @version: v1.0
 * @description: com.unicom.ljs.spark220.study.streaming
 */
public class StreamingWordCount {
    public static void main(String[] args) throws InterruptedException {

        SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingWordCount");

        /* JavaStreamingContext plays the same role as SparkContext in Spark Core.
         * It takes two parameters:
         *   1: the SparkConf configuration
         *   2: the batch interval; the data collected in each interval is processed as one batch
         */
        JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

        /* Receive data from a socket source.
         * Two parameters: 1: hostname  2: port
         * Windows send command: nc -l -p 9999
         * Linux send command:   nc -lk 9999
         */
        JavaReceiverInputDStream<String> sourceDStream = jsc.socketTextStream("localhost", 9999);

        /* Process each batch: every 2 seconds, split the lines of the batch into a stream of words */
        JavaDStream<String> wordDStream = sourceDStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        /* Convert each word into a (word, 1) pair:
         *   hello 1
         *   world 1
         *   a 1
         *   b 1
         */
        JavaPairDStream<String, Integer> wordPairDStream = wordDStream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });

        /* Sum the counts of identical words within the batch */
        JavaPairDStream<String, Integer> wordCountResult = wordPairDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        /* Print the result */
        wordCountResult.print();

        /* start() must be called on jsc before the application begins receiving and processing data */
        jsc.start();
        jsc.awaitTermination();

        /* Stop */
        jsc.stop();
    }
}
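To make the three transformations easier to follow, here is a minimal plain-Java sketch of what happens to a single batch of lines. It does not use Spark at all; the class name BatchWordCountSketch and the method countWords are hypothetical names chosen for this illustration, and the sample lines are made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper class: imitates flatMap -> mapToPair -> reduceByKey on one batch, without Spark.
class BatchWordCountSketch {

    // Counts the words in one batch of lines.
    static Map<String, Integer> countWords(List<String> batch) {
        return batch.stream()
                // Like flatMap: split every line on single spaces into one stream of words.
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // Like mapToPair + reduceByKey: map each word to 1 and sum the values per word.
                // A TreeMap is used only so that the printed result is sorted by word.
                .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        // Made-up sample batch: the lines received during one 2-second interval.
        List<String> batch = Arrays.asList("hello world", "hello a b");
        System.out.println(countWords(batch)); // prints {a=1, b=1, hello=2, world=1}
    }
}
```

In the Spark program the same steps run on every batch, and the data may be spread over several partitions; this sketch only shows the logic on an in-memory list.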
Scala version:
package com.unicom.ljs.study.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-01-31 08:59
 * @version: v1.0
 * @description: com.unicom.ljs.study.streaming
 */
object StreamingWordCount {
  def main(args: Array[String]): Unit = {

    /* Build the SparkConf configuration and a StreamingContext with a 2-second batch interval */
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingWordCountScala")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    /* Specify the socket data source */
    val sourceDStream = ssc.socketTextStream("localhost", 9999)

    /* Split each line into words */
    val wordDStream = sourceDStream.flatMap(x => x.split(" "))

    /* Map each word to a (word, 1) pair, then sum the counts per word */
    val wordPairDStream = wordDStream.map(x => (x, 1))
    val wordCountResult = wordPairDStream.reduceByKey(_ + _)

    /* Print the result */
    wordCountResult.print()

    /* Start */
    ssc.start()
    ssc.awaitTermination()

    /* Stop */
    ssc.stop()
  }
}
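Note that in both versions reduceByKey works inside each 2-second batch, so the printed counts are per batch and are not accumulated over the lifetime of the stream. The plain-Java sketch below illustrates this with two made-up batches; it does not use Spark, and the class name PerBatchCountsSketch and method countBatch are hypothetical names for this illustration only.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class: shows that each batch is counted independently.
class PerBatchCountsSketch {

    // Counts words in a single batch; no state is carried over between calls.
    static Map<String, Integer> countBatch(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                // Add 1 for this word, or start at 1 if it has not been seen in this batch.
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up input: lines that arrived during two consecutive 2-second intervals.
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("hello world", "hello"),
                Arrays.asList("hello spark"));
        for (List<String> batch : batches) {
            // "hello" is counted as 2 in the first batch and as 1 in the second, not 3.
            System.out.println(countBatch(batch));
        }
    }
}
```

Keeping a running total across batches would need a stateful operation, which is outside the scope of the programs shown here.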
That concludes this walkthrough of implementing the WordCount program in Java and Scala. Thank you for reading.