Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Case Analysis of Executor Fault-tolerant Security

2025-01-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

This article mainly introduces "Executor fault-tolerant security case analysis". In daily operation, I believe many people have doubts about Executor fault-tolerant security case analysis. The editor consulted all kinds of data and sorted out simple and easy-to-use operation methods. I hope it will be helpful to answer the doubts of "Executor fault-tolerant security case analysis". Next, please follow the editor to study!

Sparkstreaming will continue to receive data, generate job, and submit job constantly. So one of the most important issues is data security. Because sparkstreaming is based on sparkcore, if we can ensure that the data is safe and reliable (sparkstreaming is based on RDD when producing job), even if there is an error or failure at runtime, it can be automatically recovered based on the fault tolerance of RDD. So ensure the security of the data.

The security fault tolerance for executor is mainly data security fault tolerance. The security and fault tolerance of Executor computing is based on spark core's RDD, so it is naturally safe.

One way to secure data is to store a copy, and the other is not to make a copy, but the data source supports replay (that is, data from the data source can be read repeatedly), and if there is a problem with the previously read data, you can re-read the data.

The way to make a copy can make a backup with the help of blockmanager. When Blockmanager stores data, there are many storagelevel,Receiver after receiving the data, and when storing it, specify the way that storagelevel is MEMORY_AND_DISK_SER_2. Blockmanager will first consider memory when storing early, and disk will only be considered when there is not enough memory. Generally, memory is sufficient. So there will be data on at least two executor, and if one executor dies, it will immediately switch to another executor.

There are two ways for ReceiverSupervisorImpl to store data, one is the way of WAL, whether it is WAL or not is modified by configuration. The default is false. If you use WAL, you must have a directory of checkpoint, because the data of WAL is placed under the directory of checkpoint.

Def enableReceiverLog (conf: SparkConf): Boolean = {

Conf.getBoolean (RECEIVER_WAL_ENABLE_CONF_KEY, false)

}

Storagelevel is passed in when you build inputDstream, and the default is MEMORY_AND_DISK_SER_2.

* @ param storageLevel Storage level to use for storing the received objects

* (default: StorageLevel.MEMORY_AND_DISK_SER_2)

, /

Def socketTextStream (

Hostname: String

Port: Int

StorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2

): ReceiverInputDStream [String] = withNamedScope ("socket text stream") {

SocketStream [String] (hostname, port, SocketReceiver.bytesToLines, storageLevel)

}

Now let's take a look at the other way ReceiverSupervisorImpl stores data (copy mode). It is clearly stated in the comment that the received blocks is given to the blockmanager according to the specified storagelevel. That is, it is stored through blockmanager.

/ * *

* Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which

* stores the received blocks into a block manager with the specified storage level.

, /

Private [streaming] class BlockManagerBasedBlockHandler (

BlockManager: BlockManager, storageLevel: StorageLevel)

Blockmanager is stored in a variety of different data types, ArrayBufferBlock,IteratorBlock,ByteBufferBlock.

Blockmanager storing data has been talked about earlier. After receiving the data, Receiver not only stores it on its own executor, but also stores it on another executor. If there is a problem with one executor, it will instantly switch to another executor.

The principle of WAL method: a log will be made in a specific directory. If there is a problem in the subsequent processing, it can be recovered based on the log, which is written under checkpoint. In a production environment, checkpoint is on HDFS, so there are three copies of the log.

Here are the classes that use WAL to store data, write a log and then give it to blockmanager for storage.

/ * *

* Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which

* stores the received blocks in both, a write ahead log and a block manager.

, /

Private [streaming] class WriteAheadLogBasedBlockHandler (

If you use the WAL method, you don't need to have two copies when storing the data, which is a waste of memory. If the storagelevel.replication is greater than 1, the warning log will be printed.

Privateval effectiveStorageLevel = {

If (storageLevel.deserialized) {

LogWarning (s "Storage level serialization ${storageLevel.deserialized} is not supported when" +

S "write ahead log is enabled, change to serialization false")

}

If (storageLevel.replication > 1) {

LogWarning (s "Storage level replication ${storageLevel.replication} is unnecessary when" +

S "write ahead log is enabled, change to replication 1")

}

StorageLevel (storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)

}

Here, a thread pool of two threads is used to enable blockmanager to store data and write ahead log to execute concurrently.

/ / For processing futures used in parallel block storing into block manager and write ahead log

/ / # threads = 2, so that both writing to BM and WAL can proceed in parallel

Implicit privateval executionContext = ExecutionContext.fromExecutorService (

ThreadUtils.newDaemonFixedThreadPool (2, this.getClass.getSimpleName))

This is to write the log to WAL.

/ / Store the block in write ahead log

Val storeInWriteAheadLogFuture = Future {

WriteAheadLog.write (serializedBlock, clock.getTimeMillis ())

}

The person responsible for reading and writing WAL is WriteAheadLog, which is an abstract class that is responsible for writing, reading, and clearing data. After the data is written, a handle is returned for reading the data.

Take a look at the implementation of the specific written data. If it fails and the number of failures is less than the maximum number of failures, it will retry. A handle is indeed returned.

/ * *

* Write a byte buffer to the log file. This method synchronously writes the data in the

* ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed

* to HDFS, and will be available for readers to read.

, /

Def write (byteBuffer: ByteBuffer, time: Long): FileBasedWriteAheadLogSegment = synchronized {

Var fileSegment: FileBasedWriteAheadLogSegment = null

Var failures = 0

Var lastException: Exception = null

Var succeeded = false

While (! succeeded & & failures)

< maxFailures) { try { fileSegment = getLogWriter(time).write(byteBuffer) if (closeFileAfterWrite) { resetWriter() } succeeded = true } catch { case ex: Exception =>

LastException = ex

LogWarning ("Failed to write to write ahead log")

ResetWriter ()

Failures + = 1

}

}

If (fileSegment = = null) {

LogError (s "Failed to write to write ahead log after $failures failures")

Throw lastException

}

FileSegment

}

Here is the code to write the data to HDFS

/ * * Write the bytebuffer to the log file * /

Def write (data: ByteBuffer): FileBasedWriteAheadLogSegment = synchronized {

AssertOpen ()

Data.rewind () / / Rewind to ensure all data in the buffer is retrieved

Val lengthToWrite = data.remaining ()

Val segment = new FileBasedWriteAheadLogSegment (path, nextOffset, lengthToWrite)

Stream.writeInt (lengthToWrite)

If (data.hasArray) {

Stream.write (data.array ())

} else {

/ / If the buffer is not backed by an array, we transfer using temp array

/ / Note that despite the extra array copy, this should be faster than byte-by-byte copy

While (data.hasRemaining) {

Val array = new Array [Byte] (data.remaining)

Data.get (array)

Stream.write (array)

}

}

Flush ()

NextOffset = stream.getPos ()

Segment

}

Either WAL or directly to blockmanager is in the form of a copy. There is also a data source that supports data storage, typically kafka. Kafka has become a data storage system, it is naturally fault-tolerant and data copy.

Kafka has ways of receiver and direct. Receiver is actually assigned to zookeper to manage matadata (offset offset). If data processing fails, kafka will re-read the data based on offset. Why can I reread? If the program crashes or the data is not processed, it will not send ack to zookeper. Zookeper believes that this data is not being consumed. In the actual production environment, more and more people use directAPI to operate kafka directly and manage offset by themselves. This ensures that there is one and only one fault-tolerant processing. DirectKafkaInputDstream, which will go to see the latest offset and put this content in the batch.

Get the latest offset and subtract the previous offset from the latest offset to determine which data to read, that is, the data in a batch.

@ tailrec

Protected final def latestLeaderOffsets (retries: Int): Map [TopicAndPartition, LeaderOffset] = {

Val o = kc.getLatestLeaderOffsets (currentOffsets.keySet)

/ / Either.fold would confuse @ tailrec, do it manually

If (o.isLeft) {

Val err = o.left.get.toString

If (retries

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report