Real-time WordCount case
The goal is to monitor data arriving on a network port and compute a word count (WC) in real time.
Java version
The test code is as follows:
package cn.xpleaf.bigdata.spark.java.streaming.p1;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;

/**
 * The first Spark Streaming application developed in Java.
 * It listens on a network socket port, obtains the text content in real time,
 * and counts the number of occurrences of each word.
 */
public class _01SparkStreamingNetWorkWCOps {
    public static void main(String[] args) {
        if (args == null || args.length < 2) {
            System.err.println("Parameter Errors! Usage: <hostname> <port>");
            System.exit(-1);
        }
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF);
        SparkConf conf = new SparkConf()
                .setAppName(_01SparkStreamingNetWorkWCOps.class.getSimpleName())
                /*
                 * With master set to local, data can be received but not processed:
                 * there is only one thread, and it is fully occupied by the receiver,
                 * so no computation happens and nothing is output.
                 * With local[2] the application can both receive and process data.
                 * Therefore, when running in local mode while listening on a network
                 * socket, the number of threads must be at least 2.
                 */
                .setMaster("local[2]");
        /*
         * The second parameter, Duration, is the batch interval: Spark Streaming
         * collects the data of each interval and splits it into batches.
         * Here data is collected every 2 seconds and turned into batches
         * (each batch is essentially an RDD in Spark Core).
         */
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(2));

        String hostname = args[0].trim();
        int port = Integer.valueOf(args[1].trim());

        // the default persistence level is StorageLevel.MEMORY_AND_DISK_SER_2
        JavaReceiverInputDStream<String> lineDStream = jsc.socketTextStream(hostname, port);
        JavaDStream<String> wordsDStream = lineDStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });
        JavaPairDStream<String, Integer> pairsDStream = wordsDStream.mapToPair(word -> new Tuple2<>(word, 1));
        JavaPairDStream<String, Integer> retDStream = pairsDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        retDStream.print();
        // start streaming
        jsc.start();
        // wait for the execution to finish
        jsc.awaitTermination();
        System.out.println("is it over~");
        jsc.close();
    }
}
Start the program while using the nc command on the host:
[uplooking@uplooking01 ~]$ nc -lk 4893
hello youe hello he hello me
The output is as follows:
-------------------------------------------
Time: 1525929096000 ms
-------------------------------------------
(youe,1)
(hello,3)
(me,1)
(he,1)
You can also view the corresponding job execution on Spark UI:
As you can see, a computation is performed every 2 seconds: data is collected every 2 seconds and split into batches (each batch is essentially an RDD in Spark Core).
Scala version
The test code is as follows:
package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object _01SparkStreamingNetWorkOps {
    def main(args: Array[String]): Unit = {
        if (args == null || args.length < 2) {
            System.err.println(
                """Parameter Errors! Usage: <hostname> <port>
                  |hostname: hostname or ip address of the network socket to listen on
                  |port:     port of the network socket to listen on
                """.stripMargin)
            System.exit(-1)
        }
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_01SparkStreamingNetWorkOps.getClass.getSimpleName)
            .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(2))
        val hostname = args(0).trim
        val port = args(1).trim.toInt

        val linesDStream: ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)
        val wordsDStream: DStream[String] = linesDStream.flatMap({ case line => line.split(" ") })
        val pairsDStream: DStream[(String, Int)] = wordsDStream.map({ case word => (word, 1) })
        val retDStream: DStream[(String, Int)] = pairsDStream.reduceByKey { case (v1, v2) => v1 + v2 }
        retDStream.print()

        ssc.start()
        ssc.awaitTermination()
        // stop(true) also closes the SparkContext behind the ssc; stop(false) only stops the StreamingContext itself
        ssc.stop()
    }
}
Start the program while using the nc command on the host:
[uplooking@uplooking01 ~]$ nc -lk 4893
hello youe hello he hello me
The output is as follows:
-------------------------------------------
Time: 1525929574000 ms
-------------------------------------------
(youe,1)
(hello,3)
(me,1)
(he,1)

StreamingContext and DStream
How StreamingContext is created
1. There are two ways to create a StreamingContext in Spark
1) Create it from a SparkConf:
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(10))
2) It can also be created from an existing SparkContext:
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
appName is the name of the application shown on the Spark UI. master is the URL of a Spark, Mesos, or YARN cluster, or local[*].
2. The batch interval, Seconds(10) here, can be set differently according to the needs of the application.
Creation, startup and destruction of StreamingContext
1. After a StreamingContext is defined, the following steps must be performed for real-time computation (a minimal skeleton follows this list)
1. Create an input DStream bound to the data source.
2. Define the transformation and output operations on the DStream that express the required real-time computing logic.
3. Call the start() method of the StreamingContext to start real-time data processing.
4. Call the awaitTermination() method of the StreamingContext to wait for the application to terminate. It can be stopped manually with CTRL+C, or simply left running.
5. The application can also be stopped by calling the stop() method of the StreamingContext.
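As a compact illustration of the five steps above, here is a minimal Scala sketch; the hostname, port, and word-count logic are illustrative placeholders borrowed from the earlier example, not a prescribed implementation:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LifecycleSkeleton {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("LifecycleSkeleton").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(2))

        // 1. create an input DStream bound to the data source (host/port are placeholders)
        val lines = ssc.socketTextStream("localhost", 9999)
        // 2. define transformations and output operations
        lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
        // 3. start real-time processing
        ssc.start()
        // 4. wait for termination (CTRL+C stops it manually)
        ssc.awaitTermination()
        // 5. stop the application explicitly if needed
        ssc.stop()
    }
}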
2. Remarks (very important)
1. Once a StreamingContext has been started, no new computing logic can be added to the application. For example, applying another operator to a DStream after start() has been called is not allowed.
2. Once a StreamingContext has been stopped, it cannot be restarted: after calling stop(), you can no longer call start().
3. Only one StreamingContext can be running inside a JVM at a time; you cannot start two StreamingContexts in the same application.
4. Calling stop() also stops the internal SparkContext. If you do not want that, because you want to keep using the SparkContext to create other kinds of contexts such as SQLContext, call stop(false).
5. A SparkContext can be used to create multiple StreamingContexts, as long as the previous one is stopped with stop(false) before the next one is created. (Note the difference from point 2: here a new StreamingContext is created; see the sketch below.)
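A minimal sketch of point 5, reusing one SparkContext for two successive StreamingContexts; the app name, host, ports, and timeout are placeholders for illustration only:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MultiStreamingContexts {
    def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("multi-ssc").setMaster("local[2]"))

        // first StreamingContext built on the existing SparkContext
        val ssc1 = new StreamingContext(sc, Seconds(2))
        ssc1.socketTextStream("localhost", 9999).print()
        ssc1.start()
        ssc1.awaitTerminationOrTimeout(10000)
        ssc1.stop(stopSparkContext = false) // keep the underlying SparkContext alive

        // a second StreamingContext can now be created from the same SparkContext
        val ssc2 = new StreamingContext(sc, Seconds(5))
        ssc2.socketTextStream("localhost", 9999).print()
        ssc2.start()
        ssc2.awaitTermination()
    }
}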
Input DStream and Receiver
An input DStream represents the stream of input data coming from a data source. We have already seen several examples, such as reading from files, from TCP sockets, and from HDFS. Each input DStream is bound to a Receiver object, a key component that receives the data from the data source and stores it in Spark's memory. The StorageLevel used for this storage can be specified explicitly, as illustrated below and in later examples.
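For example, a sketch of passing the receiver storage level explicitly instead of relying on the default; the host and port are the ones used in this article's environment and are otherwise arbitrary:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReceiverStorageLevelDemo {
    def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
            new SparkConf().setAppName("ReceiverStorageLevelDemo").setMaster("local[2]"), Seconds(2))

        // socketTextStream defaults to StorageLevel.MEMORY_AND_DISK_SER_2;
        // here a single-replica serialized level is requested explicitly
        val lines = ssc.socketTextStream("uplooking01", 4893, StorageLevel.MEMORY_AND_DISK_SER)

        lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
        ssc.start()
        ssc.awaitTermination()
    }
}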
Spark Streaming provides two built-in data source support:
1. Basic data sources: sources supported directly in the StreamingContext API, such as files, TCP sockets, Akka actors, etc.
2. Advanced data sources: sources such as Kafka, Flume, Kinesis, and Twitter, for which third-party JARs must be added to the project.
3. Custom data sources: sources of any format, such as ZMQ, RabbitMQ, ActiveMQ, etc. A custom data source of this kind is used in the final project, where data is read from ZMQ: the official Spark-ZMQ support is based on ZMQ 2.0, while the current production environment uses ZMQ 4.x, so a custom receiver has to be defined and implemented.
Analysis of the operating mechanism of Spark Streaming: local[*]
1. If we want to read from many sources in parallel in a Spark Streaming application, we can create multiple input DStreams. This creates multiple Receivers; a common large-scale case is to launch, say, 128 receivers, each handling more than 10 million records per second (see the sketch after this list).
2. Note that a Receiver runs as a long-running task inside an Executor of the Spark Streaming application, so it permanently occupies a CPU core and cannot be used for any other work.
3. When running a Spark Streaming program in local mode, never use local or local[1]: Spark Streaming needs at least 2 threads, and if only one is available the application simply hangs. One of the two threads is assigned to the Receiver to receive data, and the other processes the received data. So for local testing, local[N] with N >= 2 is required.
4. When running on a cluster, first, each node of the cluster must have more than one CPU core; second, the number of cores assigned to each executor of the Spark Streaming application must be greater than 1. This guarantees that the input DStream running on that executor has at least two threads, one running the Receiver to receive data and one processing the data; otherwise data would only be received, never processed.
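A sketch that ties points 1 to 4 together: two socket receivers are created and unioned, so in local mode at least local[3] is needed (one core per receiver plus at least one for processing). The host and ports are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MultiReceiverDemo {
    def main(args: Array[String]): Unit = {
        // 2 receivers + at least 1 processing thread => at least local[3] in local mode
        val conf = new SparkConf().setAppName("MultiReceiverDemo").setMaster("local[3]")
        val ssc = new StreamingContext(conf, Seconds(2))

        // each input DStream starts its own Receiver and permanently occupies one core
        val ports = Seq(4893, 4894)
        val streams = ports.map(port => ssc.socketTextStream("uplooking01", port))

        // union the parallel streams into a single DStream before processing
        val lines = ssc.union(streams)
        lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

        ssc.start()
        ssc.awaitTermination()
    }
}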
DStream and HDFS integration

Input DStream basic data sources
Real-time computation based on HDFS files means monitoring an HDFS directory and processing new files as they appear in it. It is equivalent to processing a real-time file stream.
Spark Streaming monitors the specified HDFS directory and processes files that appear in the directory.
1) All files in the monitored directory must have the same format; otherwise they are hard to process. Files must appear in the directory atomically, by moving or renaming them into it (see the sketch below). Once a file has been processed, later changes to its content are not picked up; it will not be processed again.
2) HDFS-based (file) data sources have no receiver, so they do not occupy a CPU core.
3) In practice, the following test case did not work in this environment: neither files in HDFS nor local files were picked up.
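One way to satisfy the move/rename requirement in point 1 is to write a new file completely into a staging directory and then rename it into the monitored directory; here is a sketch using the Hadoop FileSystem API, where the nameservice and all paths are placeholders:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object MoveIntoMonitoredDir {
    def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        conf.set("fs.defaultFS", "hdfs://ns1") // placeholder nameservice
        val fs = FileSystem.get(conf)

        val staging = new Path("/input/spark/_staging/words.txt")  // written completely first
        val target  = new Path("/input/spark/streaming/words.txt") // directory watched by textFileStream
        fs.mkdirs(staging.getParent)
        fs.mkdirs(target.getParent)

        // copy the local file into the staging area, then rename atomically into the monitored directory
        fs.copyFromLocalFile(new Path("file:///tmp/words.txt"), staging)
        fs.rename(staging, target)
        fs.close()
    }
}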
Real-time WordCount case based on HDFS
The test code is as follows:
package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * SparkStreaming listens for changes (new files) in an hdfs directory
 */
object _02SparkStreamingHDFSOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_02SparkStreamingHDFSOps.getClass.getSimpleName)
            .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))

        val linesDStream: DStream[String] = ssc.textFileStream("hdfs://ns1/input/spark/streaming/")
        // val linesDStream: DStream[String] = ssc.textFileStream("D:/data/spark/streaming")

        linesDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
    }
}

DStream and Kafka integration (based on Receiver)

Spark and Kafka integration
1. Integration using Kafka's Receiver approach
2. Integration using Kafka's Direct approach
Spark Streaming can obtain Kafka data in two ways, Receiver and Direct. Put simply, the Receiver approach connects to the Kafka queue through ZooKeeper, while the Direct approach connects directly to the Kafka brokers to fetch the data.
Kafka integration based on Receiver
This approach uses a Receiver to get the data. The Receiver is implemented with Kafka's high-level consumer API. The data the receiver fetches from Kafka is stored in the memory of the Spark executors, and the jobs started by Spark Streaming then process that data. However, with the default configuration this approach can lose data when an underlying node fails. To enable a highly reliable mechanism with zero data loss, Spark Streaming's write ahead log (WAL) must be enabled. This mechanism synchronously writes the received Kafka data to a write ahead log on a distributed file system such as HDFS, so that even if a node fails, the data in the log can be used for recovery.
Supplementary notes:
(1) The partitions of a Kafka topic are not related to the partitions of RDDs in Spark. So in KafkaUtils.createStream(), increasing the number of topic partitions only increases the number of threads reading partitions inside one Receiver; it does not increase the parallelism with which Spark processes the data.
(2) Multiple Kafka input DStreams can be created, using different consumer groups and topics, to receive data in parallel through multiple receivers.
(3) If the write ahead log is enabled on a fault-tolerant file system such as HDFS, a copy of the received data is already written to the log. Therefore the persistence level in KafkaUtils.createStream() can be set to StorageLevel.MEMORY_AND_DISK_SER (see the sketch below).
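A sketch of a Receiver-based stream with the WAL enabled and the storage level lowered as described in note (3). The cluster addresses, checkpoint path, group id, and topic are taken from this article's environment; the configuration key is Spark's spark.streaming.receiver.writeAheadLog.enable:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaReceiverWithWAL {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName("KafkaReceiverWithWAL")
            .setMaster("local[2]")
            // write received blocks to the write ahead log as well
            .set("spark.streaming.receiver.writeAheadLog.enable", "true")
        val ssc = new StreamingContext(conf, Seconds(5))
        // the WAL lives under the checkpoint directory, so a fault-tolerant path is needed
        ssc.checkpoint("hdfs://ns1/checkpoint/streaming/kafka")

        val zkQuorum = "uplooking01:2181,uplooking02:2181,uplooking03:2181"
        val topics = Map("spark-kafka" -> 3)
        // since the WAL already keeps a copy of the data, a single-replica storage level is enough
        val lines = KafkaUtils.createStream(ssc, zkQuorum, "kafka-receiver-group-id", topics,
            StorageLevel.MEMORY_AND_DISK_SER)

        lines.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
        ssc.start()
        ssc.awaitTermination()
    }
}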
Integration with Kafka: Maven dependency

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.2</version>
</dependency>

Kafka startup, verification and testing
Start the kafka service

kafka-server-start.sh -daemon config/server.properties

Create a topic

kafka-topics.sh --create --topic spark-kafka --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 3 --replication-factor 3

List the topics already created in kafka

kafka-topics.sh --list --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181

Describe the topic and the partitions maintained by each node

kafka-topics.sh --describe --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --topic spark-kafka

leader: the node responsible for reads and writes of a given partition; leaders are chosen randomly among the nodes.
replicas: all replica nodes, whether or not they are currently in service.
isr: the replica nodes currently in sync and in service.

Produce data

kafka-console-producer.sh --topic spark-kafka --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092

Consume data

kafka-console-consumer.sh --topic spark-kafka --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181

Case
The test code is as follows:
package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Kafka and SparkStreaming integration based on the Receiver approach
 */
object _03SparkStreamingKafkaReceiverOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_03SparkStreamingKafkaReceiverOps.getClass.getSimpleName)
            .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))
        // ssc.checkpoint("hdfs://ns1/checkpoint/streaming/kafka") // save checkpoint files to hdfs
        ssc.checkpoint("file:///D:/data/spark/streaming/checkpoint/streaming/kafka") // save checkpoint files to the local file system

        /*
         * To create the input DStream with a Kafka Receiver, use the Kafka integration API
         * provided by SparkStreaming: KafkaUtils
         */
        val zkQuorum = "uplooking01:2181,uplooking02:2181,uplooking03:2181"
        val groupId = "kafka-receiver-group-id"
        val topics: Map[String, Int] = Map("spark-kafka" -> 3)
        // in each record, the key is the kafka key and the value is the corresponding kafka value
        val linesDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
        val retDStream = linesDStream.map(t => t._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        retDStream.print()

        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
    }
}
Produce data in kafka:
[uplooking@uplooking02 kafka]$ kafka-console-producer.sh --topic spark-kafka --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092
hello you hello he hello me
The output is as follows:
-------------------------------------------
Time: 1525965130000 ms
-------------------------------------------
(hello,3)
(me,1)
(you,1)
(he,1)
In the code above, Spark Streaming's write ahead log (WAL) mechanism is also relied on, which is why a checkpoint directory is set.
If the data is saved on the local file system, it is as follows:
If the data is saved in HDFS, it is as follows:
DStream and Kafka integration (based on Direct)

Characteristics of the Direct approach to Spark and Kafka integration
(1) The Direct approach operates directly on Kafka's underlying metadata, so if a computation fails, the data can be re-read and re-processed; the data is guaranteed to be processed. The RDD pulls the data directly from Kafka when it is executed.
(2) Because Direct operates on Kafka directly, Kafka acts like the underlying file system, and strict exactly-once semantics can be guaranteed: every record is processed, and only processed once. The Receiver approach cannot guarantee this, because the data held by the Receiver and the offsets in ZooKeeper may get out of sync, so Spark Streaming may consume data more than once; this can be mitigated by tuning, but it is clearly less convenient than Direct. With the Direct API, Spark Streaming itself tracks the offsets of the consumed data and saves them to the checkpoint, so its view of the data stays consistent and nothing is processed twice, not even after a restart, thanks to the checkpoint. One problem remains: after a program upgrade, the old checkpoint can no longer be read. How is this invalid-checkpoint problem solved? By specifying the checkpoint manually: on upgrade, read the backup that was explicitly specified, which again preserves the exactly-once transactional guarantee. And how is the checkpoint specified manually? Build the StreamingContext with the getOrCreate API, which loads the contents of the given checkpoint directory (see the getOrCreate sketch after this list).
(3) Because the data is read directly, there is no Receiver at all: Kafka is simply queried periodically (once per batch interval), and when data is processed, Kafka's consumer API is used to fetch the data in a specific offset range. One obvious performance benefit of accessing Kafka through the Direct API is that when reading multiple partitions, Spark creates one RDD partition per Kafka partition, so the RDD partitioning matches the Kafka partitioning (in the Receiver approach the two have nothing to do with each other). In essence, when reading Kafka at the bottom layer, a Kafka partition plays the role of an HDFS block, which is consistent with data locality: the place where the data is read, the place where it is processed, and the program driving the processing can all be on the same machine, which greatly improves performance. The drawback is that because RDD partitions and Kafka partitions are one-to-one, raising the parallelism is troublesome: it requires repartition, i.e. reshuffling the data, which is time-consuming because it generates a shuffle. Future versions may allow configuring the ratio freely instead of being strictly one-to-one; raising the parallelism would make better use of the cluster's computing resources, which is very meaningful.
(4) There is no need to turn on the WAL mechanism. From the point of view of zero data loss, this greatly improves efficiency and saves at least one extra copy of the data on disk. And getting data from Kafka is definitely faster than getting it from HDFS, because Kafka uses zero-copy transfers.
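A sketch of the StreamingContext.getOrCreate pattern mentioned in point (2): the context is rebuilt from the checkpoint if one exists, otherwise created fresh. The checkpoint path is a placeholder; the broker list and topic are the ones used in this article's environment:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectWithCheckpoint {
    val checkpointDir = "hdfs://ns1/checkpoint/streaming/kafka-direct" // placeholder path

    def createContext(): StreamingContext = {
        val conf = new SparkConf().setAppName("DirectWithCheckpoint").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))
        ssc.checkpoint(checkpointDir)

        val kafkaParams = Map("metadata.broker.list" -> "uplooking01:9092,uplooking02:9092,uplooking03:9092")
        val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc, kafkaParams, Set("spark-kafka"))
        lines.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
        ssc
    }

    def main(args: Array[String]): Unit = {
        // reuse the checkpointed context if it exists, otherwise build a fresh one
        val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
        ssc.start()
        ssc.awaitTermination()
    }
}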
Kafka Direct VS Receiver
From a high-level point of view, the previous Kafka integration approach (the Receiver method) together with the WAL works as follows:
1) Kafka Receivers running on Spark workers/executors continuously read data from Kafka, using Kafka's high-level consumer API.
2) The received data is stored in memory on the Spark workers/executors and is also written to the WAL. The Kafka Receivers only update the Kafka offsets in ZooKeeper after the received data has been persisted to the log.
3) The received data and the WAL storage location information are stored reliably; if a failure occurs, this information is used to recover from the error and continue processing the data.
This method ensures that data received from Kafka is not lost. But in case of failure, some data may be processed more than once. This happens when some received data has been reliably stored in the WAL, but the system fails before the Kafka offsets in ZooKeeper have been updated. The result is an inconsistency: Spark Streaming knows the data has been received, but Kafka believes it has not, so when the system recovers, Kafka sends that data again. The root cause is that the two systems cannot atomically record which data has been received. To solve this, only one system should maintain the consistent view of what has been delivered or received, and that system needs full control over recovery from failures. Based on these considerations, the community decided to store all consumed offset information only in Spark Streaming and to use Kafka's low-level consumer API to recover data from an arbitrary position.
To build this, the newly introduced Direct API takes a completely different approach from Receivers and the WAL. Instead of starting Receivers that continuously receive data from Kafka and write it to the WAL, it simply determines the offset range to read for each batch interval. When the job for each batch later runs, the data corresponding to that offset range is read from Kafka. The offset information is also reliably stored (checkpointed) and can be recovered after a failure (a sketch of reading these offset ranges from the stream follows).
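A sketch of how an application can inspect the offset range assigned to each batch, using the HasOffsetRanges interface from the spark-streaming-kafka module used above; the broker list and topic are the ones from this article's environment:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectOffsetRanges {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("DirectOffsetRanges").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))

        val kafkaParams = Map("metadata.broker.list" -> "uplooking01:9092,uplooking02:9092,uplooking03:9092")
        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc, kafkaParams, Set("spark-kafka"))

        stream.foreachRDD { rdd =>
            // each RDD produced by the direct stream carries the Kafka offset ranges it read
            val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            offsetRanges.foreach { o =>
                println(s"topic=${o.topic} partition=${o.partition} from=${o.fromOffset} until=${o.untilOffset}")
            }
        }

        ssc.start()
        ssc.awaitTermination()
    }
}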
It is important to note that after a failure, Spark Streaming can re-read and re-process those segments from Kafka. However, because the semantics are exactly-once, the result of the reprocessing is identical to the result without any failure. The Direct API therefore eliminates the need for the WAL and Receivers and ensures that each Kafka record is effectively received exactly once and efficiently. This lets Spark Streaming and Kafka integrate very well. Overall, these features make the stream processing pipeline highly fault-tolerant, efficient, and easy to use.

Case
The test code is as follows:
package cn.xpleaf.bigdata.spark.scala.streaming.p1

import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Kafka and SparkStreaming integration based on the Direct approach
 *
 * The Kafka-Direct approach is the one used in production
 */
object _04SparkStreamingKafkaDirectOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_04SparkStreamingKafkaDirectOps.getClass.getSimpleName)
            .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))
        // ssc.checkpoint("hdfs://ns1/checkpoint/streaming/kafka") // the checkpoint can also be saved to hdfs; for direct it is optional

        val kafkaParams: Map[String, String] = Map("metadata.broker.list" -> "uplooking01:9092,uplooking02:9092,uplooking03:9092")
        val topics: Set[String] = Set("spark-kafka")
        // type parameters: key type, value type, key decoder, value decoder
        val linesDStream: InputDStream[(String, String)] = KafkaUtils.
            createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
        val retDStream = linesDStream.map(t => t._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        retDStream.print()

        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
    }
}
Produce data:
[uplooking@uplooking02 kafka]$ kafka-console-producer.sh --topic spark-kafka --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092
hello you hello he hello me
The output is as follows:
-------------------------------------------
Time: 1525966750000 ms
-------------------------------------------
(hello,3)
(me,1)
(you,1)
(he,1)

Custom Receiver
The test code is as follows:
package cn.xpleaf.bigdata.spark.scala.streaming.p1

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * SparkStreaming custom Receiver
 * Learning how to write a custom Receiver by re-implementing the network receiver
 *
 * Customization steps:
 * 1. Create a class that extends the Receiver class
 * 2. Override its lifecycle methods (onStart, onStop)
 * 3. Register it via receiverStream
 */
object _05SparkStreamingCustomReceiverOps {
    def main(args: Array[String]): Unit = {
        if (args == null || args.length < 2) {
            System.err.println(
                """Parameter Errors! Usage: <hostname> <port>
                  |hostname: hostname or ip address of the network socket to listen on
                  |port:     port of the network socket to listen on
                """.stripMargin)
            System.exit(-1)
        }
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_05SparkStreamingCustomReceiverOps.getClass.getSimpleName)
            .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))
        val hostname = args(0).trim
        val port = args(1).trim.toInt

        val linesDStream: ReceiverInputDStream[String] = ssc.receiverStream[String](new MyNetWorkReceiver(hostname, port))
        val retDStream: DStream[(String, Int)] = linesDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        retDStream.print()

        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
    }
}

/**
 * Custom receiver
 */
class MyNetWorkReceiver(storageLevel: StorageLevel) extends Receiver[String](storageLevel) {

    private var hostname: String = _
    private var port: Int = _

    def this(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2) {
        this(storageLevel)
        this.hostname = hostname
        this.port = port
    }

    /**
     * Start and initialize the receiver resources
     */
    override def onStart(): Unit = {
        val thread = new Thread() {
            override def run(): Unit = {
                receive()
            }
        }
        thread.setDaemon(true) // run the receiving loop as a daemon (background) thread
        thread.start()
    }

    /**
     * Read the data from the network socket
     */
    def receive(): Unit = {
        val socket = new Socket(hostname, port)
        val ins = socket.getInputStream()
        val br = new BufferedReader(new InputStreamReader(ins))
        var line = br.readLine()
        while (line != null) {
            store(line)
            line = br.readLine()
        }
        ins.close()
        socket.close()
    }

    override def onStop(): Unit = {}
}
Start nc and enter data:
[uplooking@uplooking01 ~]$ nc -lk 4893
hello you hello he hello me
The output is as follows:
(hello,3)
(me,1)
(you,1)
(he,1)