This lesson is divided into two parts:
I. Spark Streaming pulling data from Flume: hands-on practice
II. Spark Streaming pulling data from Flume: source code analysis
Let's briefly introduce the two ways of integrating Flume with Spark Streaming: push mode (Flume pushes data to Spark Streaming) and pull mode (Spark Streaming pulls data from Flume).
Push mode: Flume buffers the data and pushes it out; Spark Streaming listens on the corresponding port and Flume pushes data whenever it can connect to that service. This is simple and loosely coupled, but if the Spark Streaming program has not been started, the Flume side reports errors and the data may not be consumed in time.
Pull mode: a special sink is configured in Flume, the data sits in the channel, and Spark Streaming pulls it from that sink at its own pace and according to its own conditions, which gives better stability.
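In API terms the two modes map to two different FlumeUtils factory methods. A minimal Scala sketch of both calls (assuming an existing StreamingContext named ssc; the host and port are the ones used later in this article):
import org.apache.spark.streaming.flume.FlumeUtils
// Push mode: Spark Streaming starts a receiver on SparkMaster:9999 and Flume
// (with an avro sink pointed at that address) pushes events to it.
val pushStream = FlumeUtils.createStream(ssc, "SparkMaster", 9999)
// Pull mode: Flume runs org.apache.spark.streaming.flume.sink.SparkSink on
// SparkMaster:9999 and Spark Streaming polls it for batches of events.
val pullStream = FlumeUtils.createPollingStream(ssc, "SparkMaster", 9999)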
Part one: Flume pull mode in practice
Step 1: install Flume. Installation is not covered in this lesson (see lesson 87: pushing data from Flume to Spark Streaming, with source code analysis).
Step 2: configure Flume. First refer to the official integration guide (http://spark.apache.org/docs/latest/streaming-flume-integration.html), then either add the dependencies to your build or download the following 3 jar packages directly and put them in the lib directory under the Flume installation directory:
spark-streaming-flume-sink_2.10-1.6.0.jar, scala-library-2.10.5.jar, commons-lang3-3.3.2.jar
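These three jars are what the Flume agent needs; the Spark application itself additionally has to link against the spark-streaming-flume module. A build.sbt sketch, assuming Scala 2.10 and Spark 1.6.0 to match the jar versions above (adjust to your own build tool and versions):
// build.sbt (sketch): Spark Streaming plus the Flume integration module
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.6.0" % "provided",
  "org.apache.spark" % "spark-streaming-flume_2.10" % "1.6.0"
)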
Step 3: configure the Flume parameters: make a copy of flume-conf.properties.template as flume-conf.properties and modify it as follows.
# Flume pull mode
agent0.sources = source1
agent0.channels = memoryChannel
agent0.sinks = sink1

# configure source1
agent0.sources.source1.type = spooldir
agent0.sources.source1.spoolDir = /home/hadoop/flume/tmp/TestDir
agent0.sources.source1.channels = memoryChannel
agent0.sources.source1.fileHeader = false
agent0.sources.source1.interceptors = il
agent0.sources.source1.interceptors.il.type = timestamp

# configure sink1
agent0.sinks.sink1.type = org.apache.spark.streaming.flume.sink.SparkSink
agent0.sinks.sink1.hostname = SparkMaster
agent0.sinks.sink1.port = 9999
agent0.sinks.sink1.channel = memoryChannel

# configure the channel
agent0.channels.memoryChannel.type = file
agent0.channels.memoryChannel.checkpointDir = /home/hadoop/flume/tmp/checkpoint
agent0.channels.memoryChannel.dataDirs = /home/hadoop/flume/tmp/dataDir
Start Flume with one of the following commands:
root@SparkMaster:~/flume/flume-1.6.0/bin# ./flume-ng agent --conf ../conf/ --conf-file ../conf/flume-conf.properties --name agent0 -Dflume.root.logger=INFO,console
Or: root@SparkMaster:~/flume/flume-1.6.0# bin/flume-ng agent --conf ./conf/ --conf-file ./conf/flume-conf.properties --name agent0 -Dflume.root.logger=INFO,console
Step 4: write simple business code (Java version)
package com.dt.spark.SparkApps.sparkstreaming;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

import scala.Tuple2;

public class SparkStreamingPullDataFromFlume {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("spark://SparkMaster:7077");
        conf.setAppName("SparkStreamingPullDataFromFlume");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(30));
        // Obtain data: poll the SparkSink running inside the Flume agent
        JavaReceiverInputDStream<SparkFlumeEvent> lines =
                FlumeUtils.createPollingStream(jsc, "SparkMaster", 9999);
        // Split each event body into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() {
            public Iterable<String> call(SparkFlumeEvent event) throws Exception {
                String line = new String(event.event().getBody().array());
                return Arrays.asList(line.split(" "));
            }
        });
        // Map each word to a (word, 1) pair
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        // reduceByKey: merge the counts of the same word
        JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        wordsCount.print();
        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }
}
Package the program into a jar file and upload it to the Spark cluster
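For reference, a minimal Scala equivalent of the same word-count job could look like the sketch below (same master, host, and port as the Java version; this is not part of the original walkthrough):
import java.net.InetSocketAddress
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object SparkStreamingPullDataFromFlumeScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("spark://SparkMaster:7077")
      .setAppName("SparkStreamingPullDataFromFlumeScala")
    val ssc = new StreamingContext(conf, Seconds(30))
    // Poll the SparkSink and count the words in each event body
    val stream = FlumeUtils.createPollingStream(
      ssc, Seq(new InetSocketAddress("SparkMaster", 9999)), StorageLevel.MEMORY_AND_DISK_SER_2)
    stream.map(e => new String(e.event.getBody.array()))
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .print()
    ssc.start()
    ssc.awaitTermination()
  }
}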
Step 5: start HDFS, Spark cluster and Flume
Start Flume: root@SparkMaster:~/flume/flume-1.6.0/bin# ./flume-ng agent --conf ../conf/ --conf-file ../conf/flume-conf.properties --name agent0 -Dflume.root.logger=INFO,console
Step 6: upload a test file to the /home/hadoop/flume/tmp/TestDir directory and watch the Flume log for changes.
Step 7: run the program through the spark-submit command:
./spark-submit --class com.dt.spark.SparkApps.sparkstreaming.SparkStreamingPullDataFromFlume --name SparkStreamingPullDataFromFlume /home/hadoop/spark/SparkStreamingPullDataFromFlume.jar
The word counts are printed every 30 seconds, matching the batch interval set in the code.
Part two: source code analysis
1. createPollingStream (FlumeUtils.scala)
Note: the default storage level is MEMORY_AND_DISK_SER_2.
/**
 * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
 * This stream will poll the sink for data and will pull events as they are available.
 * This stream will use a batch size of 1000 events and run 5 threads to pull data.
 * @param hostname Address of the host on which the Spark Sink is running
 * @param port Port of the host at which the Spark Sink is listening
 * @param storageLevel Storage level to use for storing the received objects
 */
def createPollingStream(
    ssc: StreamingContext,
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[SparkFlumeEvent] = {
  createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), storageLevel)
}
2. Parameter configuration: the defaults are defined as private vals in FlumeUtils, so they cannot be modified from outside:
private val DEFAULT_POLLING_PARALLELISM = 5
private val DEFAULT_POLLING_BATCH_SIZE = 1000
/**
 * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
 * This stream will poll the sink for data and will pull events as they are available.
 * This stream will use a batch size of 1000 events and run 5 threads to pull data.
 * @param addresses List of InetSocketAddresses representing the hosts to connect to.
 * @param storageLevel Storage level to use for storing the received objects
 */
def createPollingStream(
    ssc: StreamingContext,
    addresses: Seq[InetSocketAddress],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[SparkFlumeEvent] = {
  createPollingStream(ssc, addresses, storageLevel,
    DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
}
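As a usage sketch (not from the original article), this overload is the one to call when pulling from several Flume agents at once; the host names below are placeholders, and ssc plus the InetSocketAddress and StorageLevel imports are assumed to be in scope:
val addresses = Seq(
  new InetSocketAddress("flume-agent-1", 9999),
  new InetSocketAddress("flume-agent-2", 9999))
val multiAgentStream = FlumeUtils.createPollingStream(
  ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2)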
3. Create a FlumePollingInputDStream object
/**
 * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
 * This stream will poll the sink for data and will pull events as they are available.
 * @param addresses List of InetSocketAddresses representing the hosts to connect to.
 * @param maxBatchSize Maximum number of events to be pulled from the Spark sink in a
 *                     single RPC call
 * @param parallelism Number of concurrent requests this stream should send to the sink. Note
 *                    that having a higher number of requests concurrently being pulled will
 *                    result in this stream using more threads
 * @param storageLevel Storage level to use for storing the received objects
 */
def createPollingStream(
    ssc: StreamingContext,
    addresses: Seq[InetSocketAddress],
    storageLevel: StorageLevel,
    maxBatchSize: Int,
    parallelism: Int
  ): ReceiverInputDStream[SparkFlumeEvent] = {
  new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize,
    parallelism, storageLevel)
}
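A quick usage sketch (again not from the original article): calling this full overload directly is how an application overrides the default batch size of 1000 events and the default 5 polling threads, reusing the ssc and addresses from the previous sketch:
// 500 events per RPC call and 2 polling threads instead of the defaults (1000 and 5)
val tunedStream = FlumeUtils.createPollingStream(
  ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2, 500, 2)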
4. FlumePollingInputDStream extends ReceiverInputDStream and overrides the getReceiver method, which returns a FlumePollingReceiver.
private[streaming] class FlumePollingInputDStream[T: ClassTag](
    _ssc: StreamingContext,
    val addresses: Seq[InetSocketAddress],
    val maxBatchSize: Int,
    val parallelism: Int,
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {

  override def getReceiver(): Receiver[SparkFlumeEvent] = {
    new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
  }
}
5. FlumePollingReceiver lazily builds a thread pool of daemon (background) threads and uses it to create a NioClientSocketChannelFactory for its connections (Netty is used under the hood).
lazy val channelFactoryExecutor =
  Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
    setNameFormat("Flume Receiver Channel Thread - %d").build())

lazy val channelFactory =
  new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
6. receiverExecutor is also a thread pool (of size parallelism); connections is a queue of FlumeConnection handles, one per Flume agent the receiver connects to, which the worker threads take in order to pull data.
lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
  new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build())

private lazy val connections = new LinkedBlockingQueue[FlumeConnection]()
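For context, FlumeConnection is just a small wrapper that pairs the Netty transceiver with the Avro client created from it, roughly like the following paraphrase (not verbatim source):
private[streaming] class FlumeConnection(
  val transceiver: NettyTransceiver,
  val client: SparkFlumeProtocol.Callback)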
7. At startup, onStart creates a NettyTransceiver and an Avro client for each Flume address, then submits FlumeBatchFetcher tasks in a loop according to the degree of parallelism (default 5).
override def onStart(): Unit = {
  // Create the connections to each Flume agent.
  addresses.foreach(host => {
    val transceiver = new NettyTransceiver(host, channelFactory)
    val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
    connections.add(new FlumeConnection(transceiver, client))
  })
  for (i <- 0 until parallelism) {
    logInfo("Starting Flume Polling Receiver worker threads..")
    // Threads that pull data from Flume.
    receiverExecutor.submit(new FlumeBatchFetcher(this))
  }
}
8. Each FlumeBatchFetcher worker pulls a batch from the sink, stores it into Spark, and then acks the batch's sequence number (or nacks it if storing fails). A fragment of its run method:
    try {
      getBatch(client) match {
        case Some(eventBatch) =>
          batchReceived = true
          seq = eventBatch.getSequenceNumber
          val events = toSparkFlumeEvents(eventBatch.getEvents)
          if (store(events)) {
            sendAck(client, seq)
          } else {
            sendNack(batchReceived, client, seq)
          }
        case None =>
      }
    } catch {
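The ack or nack goes back to the SparkSink over the same Avro client: an ack lets the sink commit its Flume transaction and remove the batch from the channel, while a nack makes it roll the transaction back so the events are redelivered. A paraphrased sketch of the two helpers (not the verbatim source):
private def sendAck(client: SparkFlumeProtocol.Callback, seq: CharSequence): Unit = {
  // The batch was stored successfully; the sink can commit the transaction.
  client.ack(seq)
}

private def sendNack(batchReceived: Boolean, client: SparkFlumeProtocol.Callback,
    seq: CharSequence): Unit = {
  // A batch was received but could not be stored; ask the sink to roll it back.
  if (batchReceived) {
    client.nack(seq)
  }
}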
9. The method of obtaining a batch of data
/**
 * Gets a batch of events from the specified client. This method does not handle any exceptions
 * which will be propagated to the caller.
 * @param client Client to get events from
 * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]]
 */
private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = {
  val eventBatch = client.getEventBatch(receiver.getMaxBatchSize)
  if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    // No error, proceed with processing data
    logDebug(s"Received batch of ${eventBatch.getEvents.size} events with sequence " +
      s"number: ${eventBatch.getSequenceNumber}")
    Some(eventBatch)
  } else {
    logWarning("Did not receive events from Flume agent due to error on the Flume agent: " +
      eventBatch.getErrorMsg)
    None
  }
}
Note:
Source: DT Big Data DreamWorks
For more exclusive content, follow the WeChat official account: DT_Spark
If you are interested in Spark and big data, you can listen to the free open Spark course given by teacher Wang Jialin at 20:00 every evening, YY room number: 68917580.