Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Example Analysis of basic Class FileFormat/FileCommitProtocol in spark delta write Operation ACID transaction

2025-01-17 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

Editor to share with you the spark delta write operation ACID transactions in the basic class FileFormat/FileCommitProtocol example analysis, I believe that most people do not know much about it, so share this article for your reference, I hope you will learn a lot after reading this article, let's go to understand it!

Analysis.

Go directly to the topic FileFormatWriter.write, which is the entry where spark writes to the file:

Def write (sparkSession: SparkSession, plan: SparkPlan, fileFormat: FileFormat, committer: FileCommitProtocol, outputSpec: OutputSpec, hadoopConf: Configuration, partitionColumns: Seq [Attribute], bucketSpec: Option [BucketSpec], statsTrackers: Seq [WriteJobStatsTracker], options: Map [String, String]): Set [String] = {

Because delta is implemented based on parquet, fileformat chooses to analyze ParquetFileFormat, while for FileCommitProtocol, we analyze SQLHadoopMapReduceCommitProtocol

The implementation of this write method is relatively long, so let's focus on:

Committer.setupJob (job)

This does some preparatory work before job submission, such as setting jobId,taskId, which is used to set OutputCommitter,OutputCommitter.

Override def setupJob (jobContext: JobContext): Unit = {/ / Setup IDs val jobId = SparkHadoopWriterUtils.createJobID (new Date, 0) val taskId = new TaskID (jobId, TaskType.MAP, 0) val taskAttemptId = new TaskAttemptID (taskId, 0) / / Setup the configuration object jobContext.getConfiguration.set ("mapreduce.job.id", jobId.toString) jobContext.getConfiguration.set ("mapreduce.task.id", taskAttemptId.getTaskID.toString) jobContext.getConfiguration.set ("mapreduce.task.attempt.id" TaskAttemptId.toString) jobContext.getConfiguration.setBoolean ("mapreduce.task.ismap", true) jobContext.getConfiguration.setInt ("mapreduce.task.partition", 0) val taskAttemptContext = new TaskAttemptContextImpl (jobContext.getConfiguration, taskAttemptId) committer = setupCommitter (taskAttemptContext) committer.setupJob (jobContext)}

The OutputCommitter for ParquetFileFormat is ParquetOutputCommitter. Let's take a look at the method: format.getOutputCommitter (context), and ParquetOutputCommitter is:

@ Override public OutputCommitter getOutputCommitter (TaskAttemptContext context) throws IOException {if (committer = = null) {Path output = getOutputPath (context); committer = new ParquetOutputCommitter (output, context);} return committer;}

Finally, the constructor of the parent class is called:

Public FileOutputCommitter (Path outputPath, TaskAttemptContext context) throws IOException {this (outputPath, (JobContext) context); if (outputPath! = null) {workPath = getTaskAttemptPath (context, outputPath);}}

Note that the workPath (global variable) assignment here is $outputPath/_temporary, which will be used in the following newTaskTempFile method

The setupJob operation is then performed:

Public void setupJob (JobContext context) throws IOException {if (hasOutputPath ()) {Path jobAttemptPath = getJobAttemptPath (context); FileSystem fs = jobAttemptPath.getFileSystem (context.getConfiguration ()); if (! fs.mkdirs (jobAttemptPath)) {LOG.error ("Mkdirs failed to create" + jobAttemptPath);} else {LOG.warn ("OutputPath is null in setupJob ()")

The getJobAttemptPath refers to the $path/_temporary directory (where path is the file output directory), and establishes that directory

The next step is to submit the task:

SparkSession.sparkContext.runJob (rddWithNonEmptyPartitions, (taskContext: TaskContext, iter: Iterator [InternalRow]) = > {executeTask (description = description, jobIdInstant = jobIdInstant, sparkStageId = taskContext.stageId (), sparkPartitionId = taskContext.partitionId (), sparkAttemptNumber = taskContext.taskAttemptId (). ToInt & Integer.MAX_VALUE, committer, iterator = iter)} RddWithNonEmptyPartitions.partitions.indices, (index, res: WriteTaskResult) = > {committer.onTaskCommit (res.commitMsg) ret (index) = res})

Let's focus on the executeTask method:

Committer.setupTask (taskAttemptContext) val dataWriter = if (sparkPartitionId! = 0 & &! iterator.hasNext) {/ / In case of empty job, leave first partition to save meta for file format like parquet. New EmptyDirectoryDataWriter (description, taskAttemptContext, committer)} else if (description.partitionColumns.isEmpty & & description.bucketIdExpression.isEmpty) {new SingleDirectoryDataWriter (description, taskAttemptContext, committer)} else {new DynamicPartitionDataWriter (description, taskAttemptContext, committer)}

For SQLHadoopMapReduceCommitProtocol:setupTask, the implementation is as follows:

Committer = setupCommitter (taskContext) committer.setupTask (taskContext) addedAbsPathFiles = mutable.Map [String, String] () partitionPaths = mutable.Set [String] ()

While committer.setupTask (taskContext) corresponds to an empty implementation of ParquetOutputCommitter.

After that, take a look at the final executor dataWriter of the data write. If there is no partition, it is SingleDirectoryDataWriter:

Class SingleDirectoryDataWriter (description: WriteJobDescription, taskAttemptContext: TaskAttemptContext, committer: FileCommitProtocol) extends FileFormatDataWriter (description, taskAttemptContext, committer) {private var fileCounter: Int = _ private var recordsInFile: Long = _ / / Initialize currentWriter and statsTrackers newOutputWriter () private def newOutputWriter (): Unit = {recordsInFile = 0 releaseResources () val ext = description.outputWriterFactory.getFileExtension (taskAttemptContext) val currentPath = committer.newTaskTempFile (taskAttemptContext, None) F "- c$fileCounterd" + ext) currentWriter = description.outputWriterFactory.newInstance (path = currentPath, dataSchema = description.dataColumns.toStructType, context = taskAttemptContext) statsTrackers.foreach (_ .newFile (currentPath))} override def write (record: InternalRow): Unit = {if (description.maxRecordsPerFile > 0 & & recordsInFile > = description.maxRecordsPerFile) {fileCounter + = 1 assert (fileCounter)

< MAX_FILE_COUNTER, s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER") newOutputWriter() } currentWriter.write(record) statsTrackers.foreach(_.newRow(record)) recordsInFile += 1 }} 这里写文件是哪里呢? val currentPath = committer.newTaskTempFile( taskAttemptContext, None, f"-c$fileCounterd" + ext) 对应到HadoopMapReduceCommitProtocol到newTaskTempFile方法为: override def newTaskTempFile( taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { val filename = getFilename(taskContext, ext) val stagingDir: Path = committer match { case _ if dynamicPartitionOverwrite =>

Assert (dir.isDefined, "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.") PartitionPaths + = dir.get this.stagingDir / / For FileOutputCommitter it has its own staging path called "work path". Case f: FileOutputCommitter = > new Path (Option (f.getWorkPath). Map (_ .toString) .getOrElse (path) case _ = > new Path (path)} dir.map {d = > new Path (new Path (stagingDir, d), filename). ToString}. GetOrElse {new Path (stagingDir, filename). ToString}}

If partitionOverwriteMode is enabled, it is set to new Path (path, ".spark-staging-" + jobId) if partitionOverwriteMode is not enabled, and the subclass of FileOutputCommitter, if workpath exists, is set to workPath, otherwise it is path. Note that workPath has been set in our previous FileOutputCommitter constructor, so the final output directory is $path/_temporary.

So job writes data to the directory. For the analysis of DynamicPartitionDataWriter, readers can do similar analysis, but the directory adds partition information and is only written into their own partition directory.

If the write is successful, execute as follows:

Try {Utils.tryWithSafeFinallyAndFailureCallbacks (block = {/ / Execute the task to write rows out and commit the task. While (iterator.hasNext) {dataWriter.write (iterator.next ())} dataWriter.commit ()}) (catchBlock = {/ / If there is an error, abort the task dataWriter.abort () logError (s "Job $jobId aborted.")}, finallyBlock = {dataWriter.close ()})

DataWriter.commit () is as follows:

Override def commit (): WriteTaskResult = {releaseResources () val summary = ExecutedWriteSummary (updatedPartitions = updatedPartitions.toSet, stats = statsTrackers.map (_. GetFinalStats ()) WriteTaskResult (committer.commitTask (taskAttemptContext), summary)}

Resources are first released, that is, FileCommitProtocol.commitTask () is called after closing writer.

Override def commitTask (taskContext: TaskAttemptContext): TaskCommitMessage = {val attemptId = taskContext.getTaskAttemptID logTrace (s "Commit task ${attemptId}") SparkHadoopMapRedUtil.commitTask (committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId) new TaskCommitMessage (addedAbsPathFiles.toMap-> partitionPaths.toSet)}

And SparkHadoopMapRedUtil.commitTask finally calls the commitTask method of FileOutputCommitter to mv the file under $PATH/_temporary to $PATH.

Then the statistical value is returned. The data format is as follows:

Case class BasicWriteTaskStats (numPartitions: Int, numFiles: Int, numBytes: Long, numRows: Long) extends WriteTaskStats

Then the committer.onTaskCommit (res.commitMsg) operation will be performed.

The implementation of SQLHadoopMapReduceCommitProtocol is: logDebug (s "onTaskCommit ($taskCommit)")

Next step committer.commitJob (job, commitMsgs):

... Committer.commitJob (jobContext)... For ((src, dst) Seq.empty)} statsTrackers.zip (statsPerTracker). Foreach {case (statsTracker, stats) = > statsTracker.processStats (stats)}}

The main purpose is to pass the metrics of job to driver through statsTrackers, while the current statsTracker implementation class is BasicWriteJobStatsTracker, which means that it will eventually be transmitted in the form of events through listenerbus, as shown in the following code:

Class BasicWriteJobStatsTracker (serializableHadoopConf: SerializableConfiguration, @ transient val metrics: Map [String, SQLMetric]) extends WriteJobStatsTracker {... Override def processStats (stats: Seq [WriteTaskStats]): Unit = {... Metrics (BasicWriteJobStatsTracker.NUM_FILES_KEY) .add (numFiles) metrics (BasicWriteJobStatsTracker.NUM_OUTPUT_BYTES_KEY) .add (totalNumBytes) metrics (BasicWriteJobStatsTracker.NUM_OUTPUT_ROWS_KEY). Add (totalNumOutput) metrics (BasicWriteJobStatsTracker.NUM_PARTS_KEY) .add (numPartitions) val executionId = sparkContext.getLocalProperty (SQLExecution.EXECUTION_ID_KEY) SQLMetrics.postDriverMetricUpdates (sparkContext, executionId, metrics.values.toList)}}

So far, we have gone through all the data flow of the whole spark parquet write file, and some of the details have not been shown. The final data flow is as follows:

Instantiate Job object | vFileCommitProtocol.setupJob-> OutputCommitter.setupJob to prepare before the job runs For example, set up a temporary directory _ temporary, etc. | vexecuteTask ()-> FileCommitProtocol.setupTask-> OutputCommitter.setupTask is currently empty | v FileCommitProtocol.newTaskTempFile/newTaskTempFileAbsPath sets up a temporary directory for writing tasks | v dataWriter.write () | v dataWriter. Commit () releases resources and returns metric information written to the file-> HadoopMapReduceCommitProtocol.commitTask | v SparkHadoopMapRedUtil.commitTask completes the mv $PATH/_temporary file to the $PATH directory And do outputCommitCoordination | v return information that requires additional temporary directories | vFileCommitProtocol.onTaskCommit | vFileCommitProtocol.commitJob-> OutputCommitter.commitJob cleans up the $PATH/_temporary directory and writes the files in the extra temporary directory mv to the final path directory | vprocessStats, processing the file metrics written

So how does spark write delta data corresponding to delta? In fact, the process is exactly the same as the above process, except that the implementation of the FileCommitProtocol class goes directly to TransactionalWrite.writeFiles:

Def writeFiles (data: Dataset [_], writeOptions: Option [DeltaOptions], isOptimize: Boolean): Seq [AddFile] = {hasWritten = true... Val committer = getCommitter (outputPath)... FileFormatWriter.write (sparkSession = spark, plan = physicalPlan, fileFormat = snapshot.fileFormat, / / TODO doesn't support changing formats. Committer = committer, outputSpec = outputSpec, hadoopConf = spark.sessionState.newHadoopConfWithOptions (metadata.configuration), partitionColumns = partitioningColumns, bucketSpec = None, statsTrackers = statsTrackers, options = Map.empty)} committer.addedStatuses}

The commiter here is DelayedCommitProtocol, as follows:

New DelayedCommitProtocol ("delta", outputPath.toString, None)

Let's look at the DelayedCommitProtocol method:

Override def newTaskTempFile (taskContext: TaskAttemptContext, dir: Option [String], ext: String): String = {val filename = getFileName (taskContext, ext) val partitionValues = dir.map (parsePartitions) .getOrElse (Map.empty [String, String]) val relativePath = randomPrefixLength.map {prefixLength = > getRandomPrefix (prefixLength) / / Generate a random prefix as a first choice}. OrElse {dir / or else write into the partition directory if it is partitioned}. Map {subDir = > new Path (subDir) Filename)} .getOrElse (new Path (filename)) / / or directly write out to the output path addedFiles.append ((partitionValues, relativePath.toUri.toString)) new Path (path, relativePath). ToString} override def commitTask (taskContext: TaskAttemptContext): TaskCommitMessage = {if (addedFiles.nonEmpty) {val fs = new Path (path, addedFiles.head._2) .getFileSystem (taskContext.getConfiguration) val statuses: Seq [AddFile] = addedFiles.map {f = > val filePath = new Path (path) New Path (new URI (f.room2)) val stat = fs.getFileStatus (filePath) AddFile (f.room2, f.room1, stat.getLen, stat.getModificationTime, true)} new TaskCommitMessage (statuses)} else {new TaskCommitMessage (Nil)}} override def commitJob (jobContext: JobContext, taskCommits: Seq [TaskCommitMessage]): Unit = {val fileStatuses = taskCommits.flatMap (_ .obj.asInstanceOf [SeqAddFile]). ToArray addedStatuses + + = fileStatuses}

There is an extra UUID.randomUUID.toString in the file generated by newTaskTempFile, which can reduce the file conflict.

NewTaskTempFile currently returns the output directory directly instead of the _ temporary directory

CommitTask is just recording additional files.

CommitJob didn't actually commit the job, just saved the AddFile in memory.

Later, we will analyze how delta handles AddFile to make it transactional.

Note that the task output file directory is: ${output.dir.root} / _ temporary/$ {appAttempt} / _ temporary/$ {taskAttempt} / ${fileName} such as: / data/_temporary/0/_temporary/attempt_20190219101232_0003_m_000005_0/part-00000-936a4f3b-8f71-48ce-960a-e60d3cf4488f.c000.snappy.parquet is all the contents of the article "sample Analysis of the basic class FileFormat/FileCommitProtocol in spark delta write operations ACID transactions". Thank you for reading! I believe we all have a certain understanding, hope to share the content to help you, if you want to learn more knowledge, welcome to follow the industry information channel!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report