In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
The concept of transaction processing in Spark Streaming is different from that of relational database transactions, which focus on statement-level consistency, such as bank transfers. Spark Streaming transactions, on the other hand, focus on the consistency of a particular job execution. That is, how to ensure that Job does the following two things in the process of processing data:
No data loss
Do not reprocess data
The SparkStreaming program execution architecture is roughly as follows:
First, let's talk about the loss of data:
After Receiver receives the data, it first saves the data at the Executor level (according to the settings of StorageLevel), such as socketTextStream's Receiver. Keep 2 copies of data on memory and disk
Def socketTextStream (hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream [String] = withNamedScope ("socket text stream") {socketStream [String] (hostname, port, SocketReceiver.bytesToLines, storageLevel)}
If StorageLevel is set for memory-level storage only, then when the program crashes, even if you Checkpoint the Driver, then restart the program. This part of the data will also be lost. Because Driver's Checkpoint does not save the calculated data.
We assume that StorageLevel sets disk-level storage, and there is no complete guarantee that data will not be lost, because instead of receiving a piece of data to write to disk once, Receiver writes data in blocks. The metadata information of the data block is then sent to the metadata information of the number of Block recorded by the Checkpoint of the Driver,Driver. When the data block is halfway written, or before the metadata is sent to Driver, the Executor crashes and the data is lost.
Solution: to reduce the transmission of this situation, the WAL write mechanism can be introduced on the receiver side, because the frequency of WAL writes is much higher than that of data blocks. In this way, when the Executor is restored, the WAL log recovery data block can be read.
However, the performance of Receivers receiving data in Spark Streaming will be greatly damaged by WAL.
WAL can not completely solve the problem of data loss, just like Oracle, the log file is written to memory first, and then the data is written to disk according to certain trigger conditions. If the WAL log has not been written yet, the data will also be inconsistent at this time (the data has been received, but this part of the data has not been written to WAL can not be recovered. ).
Spark Streaming 1.3 provides Kafka Direct API to avoid the performance loss of WAL and implement Exactly Once, using Kafka as a file storage system! At this time, it has both the advantages of streaming and the advantages of file system, so Spark Streaming+Kafka has built a perfect streaming world! All Executors consumes data directly through Kafka API and manages Offset directly, so it does not repeat consumption data; transactions are implemented!
2. When Driver crashes, all the data being processed by Job, including the data that Receiver has received and not yet processed, will be lost.
Solution: Checkpoint Driver, where Checkpoint is not the same as RDD's Checkpoint.
Let's take a look at what attributes the Checkpoint contains:
Private [private] class Checkpoint (ssc: StreamingContext, val checkpointTime: Time) extends Logging with Serializable {val master = ssc.sc.master val framework = ssc.sc.appNameval jars = ssc.sc.jars val graph = ssc.graph val checkpointDir = ssc.checkpointDir val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes (). ToArray val delaySeconds = MetadataCleaner.getDelaySeconds (ssc.conf) val sparkConfPairs = ssc.conf.getAll
Where graph is the instantiation of DStreamGraph, which contains InputDStream
Privateval inputStreams = new ArrayBuffer [InputDStream [_]] ()
Let's take DirectKafkaInputDStream as an example, which includes checkpointData
Protected [streaming] overrideval checkpointData = new DirectKafkaInputDStreamCheckpointData
It just contains:
Class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData (this) {def batchForTime: mutable.HashMap [Time, Array [(String, Int, Long, Long)]] = {data.asInstanceOf [mutable.HashMap [Time, Array [OffsetRange.OffsetRangeTuple]}
It is the time object that uniquely identifies each batch, and the Kafka offset information corresponding to each KafkaRDD.
So:
Checkpoint is very efficient. The storage of actual data is not involved. The general size is only a few tens of K, because only information such as the offset of Kafka is stored.
Checkpoint uses a serialization mechanism, especially the introduction of DStreamGraph, which contains functions such as ForeachRDD, etc., and functions in ForeachRDD should also be serialized. If you use the CheckPoint mechanism and your package has been changed, there may be some problems after recovery.
Second, there are two aspects involved in data duplication processing:
Data is read repeatedly: in the case of using Kafka, Receiver receives the data and saves it to the persistence engine such as HDFS, but there is no time for updateOffsets. After the Receiver crashes, restart will read the data again through the metadata in the ZooKeeper that manages the Kafka. But at this time, SparkStreaming thinks it is successful, but Kafka thinks it failed (because the offset has not been updated to ZooKeeper), which will lead to data consumption again.
Data output is rewritten multiple times
Why is this a problem, because Spark Streaming naturally does the following things based on Spark Core,Spark Core when calculating (for example, after data output, an error occurs in the subsequent program of the Task and an error occurs in the task, and Spark Core enters the following program):
Task retry; slow task guess (two identical tasks may be executed at the same time), Stage repeat; Job retry
Specific solutions:
Set the number of spark.task.maxFailures to 1
Set spark.speculation to off (because slow tasks are supposed to consume a lot of performance, turning off can significantly improve Spark Streaming processing performance)
If Spark Streaming on Kafka, you can set auto.offset.reset to "largest" if Job fails.
Exactly Once transactions must meet the following requirements:
Zero loss of Receiver data: there must be reliable data sources and reliable Receiver, and data security must be ensured through WAL.
The metadata of the entire application must be checkpoint
Finally, it is emphasized again that transform and foreachRDD can be used for logic control based on business logic code to achieve non-repetitive data consumption and output. These two ways are similar to the back door of Spark Streaming, and you can do any imaginary control operation!
Note:
1. DT big data DreamWorks Wechat official account DT_Spark
2. IMF 8: 00 p.m. Big data actual combat YY live broadcast channel number: 68917580
3. Sina Weibo: http://www.weibo.com/ilovepains
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.