Lesson 85: Spark Streaming case practice based on HDFS and source code decryption


One: preparation of Spark cluster development environment

1. Start HDFS, as shown in the following figure:

Check through the web UI that the nodes have started normally, as shown in the following figure:

2. Start the Spark cluster, as shown in the following figure:

Check through the web UI that the cluster has started normally, as shown in the following figure:

3. Start start-history-server.sh, as shown in the following figure:

Two: the Spark Streaming on HDFS case (code)

package com.dt.spark.SparkApps.sparkstreaming;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

import java.util.Arrays;

/**
 * Created by Jonson on 2016-4-17.
 */
public class SparkStreamingOnHDFS {

    public static void main(String[] args) {
        /**
         * Step 1: configure SparkConf.
         * 1. At least two threads are required: while a Spark Streaming application runs, at least one thread
         *    continuously receives data in a loop, and at least one thread processes the received data
         *    (otherwise no thread processes the data, and memory and disk are overwhelmed over time).
         * 2. For a cluster, every Executor must have more than one core. How many cores should each Executor of a
         *    Spark Streaming application get? Past experience suggests about five cores works best
         *    (an odd number of cores tends to work best).
         */
        final SparkConf conf = new SparkConf().setMaster("spark://Master:7077").setAppName("SparkOnStreamingOnHDFS");

        /**
         * Step 2: create the JavaStreamingContext, the starting point of all Spark Streaming functionality and
         * the core of program scheduling.
         * 1. A JavaStreamingContext can be constructed from a SparkConf, or recovered from a persisted checkpoint
         *    (the typical scenario is a Driver crash and restart: because Spark Streaming runs continuously 24/7,
         *    the restarted Driver must continue from the previous state, and that recovery is based on the
         *    previous checkpoint).
         * 2. Several JavaStreamingContext objects can be created in one Spark Streaming application, but before
         *    using the next one, the previously running one must be closed. The big insight: a JavaStreamingContext
         *    is just an application on Spark Core; the Spark Streaming framework runs it, and Spark engineers
         *    only need to write the business logic.
         */
        // JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5)); // Durations.seconds(5): one batch every 5 seconds
        final String checkpointDirectory = "hdfs://Master:9000/library/SparkStreaming/Checkpoint_Data";
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            @Override
            public JavaStreamingContext create() {
                return createContext(checkpointDirectory, conf);
            }
        };

        /**
         * The Driver can be recovered from a failure, but the Driver process must run on the cluster, and the
         * application has to be submitted with --supervise.
         */
        JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);

        /**
         * What is monitored here is a directory of a file system.
         * There is no Receiver: the Spark Streaming application simply checks the directory for each batch's new
         * content at the given interval (newly added files) and generates the original RDD from it as the data source.
         */
        // Specify the HDFS directory to be monitored
        JavaDStream<String> lines = jsc.textFileStream("hdfs://Master:9000/library/SparkStreaming/Data");

        /**
         * Step 4: from here on, programming against the DStream is just like programming against an RDD!
         * The reason: a DStream is a template (class) from which RDDs are generated; before Spark Streaming does the
         * concrete computation, it essentially translates the DStream operations of each batch into RDD operations!
         * Apply Transformation-level processing to the initial DStream, such as higher-order functions like map and
         * filter, to perform the concrete data computation.
         * Step 4.1: split each line into individual words.
         */
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        /**
         * Step 4.2: apply Transformation-level processing to the initial JavaDStream, such as higher-order functions
         * like map and filter, to perform the concrete data computation.
         * On the basis of 4.1, count each word instance as 1, i.e. word => (word, 1).
         */
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        /**
         * Step 4.3: on the basis of the per-instance counts, compute the total number of times each word appears in the file.
         */
        JavaPairDStream<String, Integer> wordscount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        /**
         * The print here does not directly trigger Job execution, because everything is under the control of the
         * Spark Streaming framework; whether a real Job is triggered is based on the configured Duration interval.
         * Note: for a Spark Streaming application to run a concrete Job, there must be an output stream operation on
         * the DStream. Many output operations can trigger execution, such as print, saveAsTextFile, saveAsHadoopFiles,
         * etc. One of the most important is foreachRDD, because the results of Spark Streaming processing are usually
         * written to Redis, a DB, a dashboard, etc.; foreachRDD is mainly used for that, and you can freely decide
         * where the concrete data is stored!
         */
        wordscount.print();

        /**
         * Start the Spark Streaming execution engine, i.e. the Driver; the Driver starts in a new thread.
         * Of course, it has an internal message loop to receive messages from the application itself or from Executors.
         */
        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }

    /**
     * Build the JavaStreamingContext in factory style.
     */
    private static JavaStreamingContext createContext(String checkpointDirectory, SparkConf conf) {
        System.out.println("Creating new context");
        SparkConf sparkConf = conf;
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
        ssc.checkpoint(checkpointDirectory);
        return ssc;
    }
}

The code is packaged and run on the cluster:

1. Create the directory.

2. Run the script. The script reads as follows:

At this point, Spark Streaming executes a batch every 5 seconds, continuously scanning the directory for new files.

3. Upload files to the Data directory in HDFS (see the sketch after this list).

4. Check the output.
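For step 3, here is a minimal upload sketch using the HDFS Java API. It assumes the same NameNode address (hdfs://Master:9000) as the case above; the local path and generated file name are made up for illustration, and the hdfs dfs -put command achieves the same thing from the shell:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadToDataDir {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(URI.create("hdfs://Master:9000"), new Configuration());
        // textFileStream only picks up files that appear in the directory after the job has started,
        // so copy the file in under a fresh name instead of appending to an existing one.
        fs.copyFromLocalFile(
                new Path("/tmp/words.txt"),
                new Path("/library/SparkStreaming/Data/words-" + System.currentTimeMillis() + ".txt"));
        fs.close();
    }
}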

Three: Spark Streaming on HDFS source code decryption

The create method of JavaStreamingContextFactory creates a JavaStreamingContext.

We override this method in a concrete implementation, and internally it calls the createContext method. In the practical case above, we implemented the createContext method ourselves.

/**
 * Factory interface for creating a new JavaStreamingContext
 */
trait JavaStreamingContextFactory {
  def create(): JavaStreamingContext
}

3. checkpoint:

On the one hand, it maintains fault tolerance;

on the other hand, it maintains state.

A checkpoint is taken at the beginning and at the end of each batch.

/**
 * Sets the context to periodically checkpoint the DStream operations for master
 * fault-tolerance. The graph will be checkpointed every batch interval.
 * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
 */
def checkpoint(directory: String) {
  ssc.checkpoint(directory)
}
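As a hedged illustration of how checkpointing surfaces in the Java API used in this case (the local[2] master and the 25-second interval are arbitrary example values, not from the original):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class CheckpointSketch {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CheckpointSketch");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Context-level checkpointing: the DStream graph and metadata are written to this
        // HDFS-compatible directory every batch interval.
        jsc.checkpoint("hdfs://Master:9000/library/SparkStreaming/Checkpoint_Data");

        JavaDStream<String> lines = jsc.textFileStream("hdfs://Master:9000/library/SparkStreaming/Data");

        // Data checkpointing of an individual DStream can be tuned separately; 25 seconds here is
        // just an example multiple of the 5-second batch interval.
        lines.checkpoint(Durations.seconds(25));

        lines.print();
        jsc.start();
        jsc.awaitTermination();
    }
}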

4. remember: in streaming, data is cleaned up after a period of time, but remember can extend the lifetime of the data inside the program, keeping the RDDs around for longer (a usage sketch follows the application scenario below).

Application scenario:

Suppose data keeps streaming in and running ML or GraphX over it sometimes takes a long time, while each batch cleans up its RDDs on schedule; with remember the data can be kept around for longer.
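A minimal sketch of calling remember on a streaming context like the one in this case (local[2] and the five-minute value are arbitrary example values, not from the original):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class RememberSketch {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("RememberSketch");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Keep the RDDs generated during the last 5 minutes around instead of letting each
        // batch's RDDs be cleaned up as soon as the batch finishes (value is an example only).
        jsc.remember(Durations.minutes(5));

        jsc.textFileStream("hdfs://Master:9000/library/SparkStreaming/Data").print();
        jsc.start();
        jsc.awaitTermination();
    }
}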

5. getOrCreate:

If a checkpoint is set, getOrCreate() re-initializes the StreamingContext from the checkpoint directory when the program is restarted.

/**
 * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
 * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
 * recreated from the checkpoint data. If the data does not exist, then the provided factory
 * will be used to create a JavaStreamingContext.
 *
 * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
 * @param factory        JavaStreamingContextFactory object to create a new JavaStreamingContext
 * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactor.
 */
@deprecated("use getOrCreate without JavaStreamingContextFactor", "1.4.0")
def getOrCreate(
    checkpointPath: String,
    factory: JavaStreamingContextFactory
  ): JavaStreamingContext = {
  val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
    factory.create.ssc
  })
  new JavaStreamingContext(ssc)
}
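Since the factory-based overload above is deprecated as of 1.4.0, a hedged sketch of the replacement getOrCreate, which takes a Function0 instead of a JavaStreamingContextFactory (paths and settings reused from the case above), might look like this:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class GetOrCreateSketch {
    private static final String CHECKPOINT_DIR = "hdfs://Master:9000/library/SparkStreaming/Checkpoint_Data";

    public static void main(String[] args) throws Exception {
        // Function0 plays the role of JavaStreamingContextFactory in the non-deprecated overload.
        Function0<JavaStreamingContext> creatingFunc = new Function0<JavaStreamingContext>() {
            @Override
            public JavaStreamingContext call() throws Exception {
                SparkConf conf = new SparkConf().setMaster("spark://Master:7077").setAppName("SparkOnStreamingOnHDFS");
                JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
                ssc.checkpoint(CHECKPOINT_DIR);
                // Define the DStream graph inside the creating function so it is also restored from the checkpoint.
                ssc.textFileStream("hdfs://Master:9000/library/SparkStreaming/Data").print();
                return ssc;
            }
        };

        // Recreate the context from the checkpoint directory if it exists; otherwise build a fresh one.
        JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, creatingFunc);
        jsc.start();
        jsc.awaitTermination();
    }
}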

Thinking about an exception:

Why does it report an error?

Streaming checkpoints on a regular basis.

When the program is restarted, it recovers from the directory it previously checkpointed to. If no additional configuration is made, all the information (including the previous application's information) sits in the checkpoint directory, so the next start reports an error: the ShuffledDStream cannot be initialized.
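When the application code has changed and the old checkpoint is no longer compatible, one common workaround is simply to clear the checkpoint directory before resubmitting. This is a hedged sketch using the HDFS Java API, not something prescribed by the original; the NameNode address and path are reused from the case above:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ClearCheckpointDir {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(URI.create("hdfs://Master:9000"), new Configuration());
        // Recursively delete the old checkpoint data so the restarted application builds a fresh
        // context instead of trying to restore state from an incompatible checkpoint.
        fs.delete(new Path("/library/SparkStreaming/Checkpoint_Data"), true);
        fs.close();
    }
}

Doing this of course discards the previous state, so it only makes sense when the old state is no longer wanted.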

Summary:

Spark Streaming can handle all kinds of data sources, such as databases, HDFS, server logs, and network streams. It is more powerful than you might imagine, yet often it goes unused; the real reason is a lack of understanding of Spark and Spark Streaming themselves.

Note:

Source: DT Big Data DreamWorks (IMF legendary action top-secret course)

For more exclusive content, please follow the WeChat official account: DT_Spark

If you are interested in big data and Spark, you can listen to the free Spark open course offered by teacher Wang Jialin at 20:00 every evening, YY room number: 68917580

Life is short, you need Spark!
