
Example Analysis of ACID Transactions in the spark delta write Operation


In this article I'd like to share an analysis of how the spark delta write operation implements ACID transactions. Since most people are not familiar with the details, this walkthrough is offered for reference; I hope you learn a lot from it. Let's get started!

## Analysis

Let's go directly to `WriteIntoDelta.run`:

```scala
override def run(sparkSession: SparkSession): Seq[Row] = {
  deltaLog.withNewTransaction { txn =>
    val actions = write(txn, sparkSession)
    val operation = DeltaOperations.Write(mode, Option(partitionColumns),
      options.replaceWhere, options.userMetadata)
    txn.commit(actions, operation)
  }
  Seq.empty
}
```
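
For context, `WriteIntoDelta.run` is what an ordinary delta-format DataFrame write ends up executing. Here is a minimal sketch of such a write (the path and data are illustrative, and it assumes the delta-core dependency is on the classpath):

```scala
import org.apache.spark.sql.SparkSession

object DeltaWriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("delta-write-example")
      .master("local[*]") // illustrative; use your own deployment settings
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .getOrCreate()
    import spark.implicits._

    // An append in "delta" format is planned as WriteIntoDelta, whose run
    // method wraps the write and the commit in one optimistic transaction.
    Seq("a", "b", "c").toDF("value")
      .write
      .format("delta")
      .mode("append")
      .save("/tmp/delta/events") // illustrative table path
  }
}
```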

Let's look at the `deltaLog.withNewTransaction` method:

```scala
def withNewTransaction[T](thunk: OptimisticTransaction => T): T = {
  try {
    update()
    val txn = new OptimisticTransaction(this)
    OptimisticTransaction.setActive(txn)
    thunk(txn)
  } finally {
    OptimisticTransaction.clearActive()
  }
}
```
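
Structurally this is a loan pattern around a thread-local active transaction: set it, run the thunk, and always clear it. A generic sketch of the same pattern (illustrative only, not Delta's code):

```scala
object ActiveHandleSketch {
  // Thread-local "currently active" handle, mirroring what
  // OptimisticTransaction.setActive/clearActive do for the duration of the thunk.
  private val active = new ThreadLocal[Option[Long]] {
    override def initialValue(): Option[Long] = None
  }

  def withActive[T](txnId: Long)(thunk: => T): T =
    try {
      active.set(Some(txnId))
      thunk
    } finally {
      active.remove() // always cleared, even if the thunk throws
    }

  def currentTxnId: Option[Long] = active.get()
}
```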

Inside `withNewTransaction`, the `update` method first synchronously calls `updateInternal` to refresh the current deltalog's snapshot. The relevant part of `updateInternal` is as follows:

```scala
val segment = getLogSegmentForVersion(currentSnapshot.logSegment.checkpointVersion)
if (segment.version == currentSnapshot.version) {
  // Exit early if there is no new file
  lastUpdateTimestamp = clock.getTimeMillis()
  return currentSnapshot
}
logInfo(s"Loading version ${segment.version}" +
  segment.checkpointVersion.map(v => s" starting from checkpoint version $v.").getOrElse(""))
val newSnapshot = createSnapshot(segment, minFileRetentionTimestamp, segment.lastCommitTimestamp)
...
currentSnapshot.uncache()
currentSnapshot = newSnapshot
```

It first obtains the latest log segment through `getLogSegmentForVersion`, builds a new snapshot from it, and then swaps it into memory.
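
To make the version bookkeeping concrete, here is an illustrative sketch (not Delta's code) of how the zero-padded versioned JSON files under `_delta_log` map to a latest version; the real `getLogSegmentForVersion` also accounts for checkpoint files:

```scala
import java.io.File
import scala.util.Try

object DeltaLogListingSketch {
  // Delta names each commit file with a zero-padded 20-digit version,
  // e.g. _delta_log/00000000000000000003.json
  private def versionOf(name: String): Option[Long] =
    if (name.endsWith(".json")) Try(name.stripSuffix(".json").toLong).toOption
    else None

  // Derive the latest version by listing the log directory.
  def latestVersion(tableDir: String): Option[Long] = {
    val names = Option(new File(tableDir, "_delta_log").list()).getOrElse(Array.empty[String])
    val versions = names.flatMap(versionOf)
    if (versions.isEmpty) None else Some(versions.max)
  }

  def main(args: Array[String]): Unit =
    println(latestVersion("/tmp/delta/events")) // illustrative path
}
```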

Back in `withNewTransaction`, the new `OptimisticTransaction` is set as active and the current statement executes inside that transaction:

```scala
val actions = write(txn, sparkSession)
val operation = DeltaOperations.Write(mode, Option(partitionColumns),
  options.replaceWhere, options.userMetadata)
txn.commit(actions, operation)
```

We already analyzed `val actions = write(txn, sparkSession)` in the prequel to this series, which covered the underlying file-writing classes `FileFormat`/`FileCommitProtocol`. In short, it returns a `Seq` of `AddFile` actions, and by this point the actual data files have already been written to the table directory.

`val operation = DeltaOperations.Write(mode, Option(partitionColumns), ...)` records that this commit is a delta write operation.

`txn.commit(actions, operation)` is the key step that commits to the delta log:

```scala
def commit(actions: Seq[Action], op: DeltaOperations.Operation): Long = recordDeltaOperation(
    deltaLog,
    "delta.commit") {
  commitStartNano = System.nanoTime()

  val version = try {
    // Try to commit at the next version.
    var finalActions = prepareCommit(actions, op)

    // Find the isolation level to use for this commit
    val noDataChanged = actions.collect { case f: FileAction => f.dataChange }.forall(_ == false)
    val isolationLevelToUse = if (noDataChanged) {
      // If no data has changed (i.e. it is only being rearranged), then SnapshotIsolation
      // provides Serializable guarantee. Hence, allow reduced conflict detection by using
      // SnapshotIsolation regardless of what the table isolation level is.
      SnapshotIsolation
    } else {
      Serializable
    }

    val isBlindAppend = {
      val dependsOnFiles = readPredicates.nonEmpty || readFiles.nonEmpty
      val onlyAddFiles =
        finalActions.collect { case f: FileAction => f }.forall(_.isInstanceOf[AddFile])
      onlyAddFiles && !dependsOnFiles
    }

    if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COMMIT_INFO_ENABLED)) {
      commitInfo = CommitInfo(
        clock.getTimeMillis(),
        op.name,
        op.jsonEncodedValues,
        Map.empty,
        Some(readVersion).filter(_ >= 0),
        None,
        Some(isBlindAppend),
        getOperationMetrics(op),
        getUserMetadata(op))
      finalActions = commitInfo +: finalActions
    }

    // Register post-commit hooks if any
    lazy val hasFileActions = finalActions.collect { case f: FileAction => f }.nonEmpty
    if (DeltaConfigs.SYMLINK_FORMAT_MANIFEST_ENABLED.fromMetaData(metadata) && hasFileActions) {
      registerPostCommitHook(GenerateSymlinkManifest)
    }

    val commitVersion = doCommit(snapshot.version + 1, finalActions, 0, isolationLevelToUse)
    logInfo(s"Committed delta #$commitVersion to ${deltaLog.logPath}")
    postCommit(commitVersion, finalActions)
    commitVersion
  } catch {
    case e: DeltaConcurrentModificationException =>
      recordDeltaEvent(deltaLog, "delta.commit.conflict." + e.conflictType)
      throw e
    case NonFatal(e) =>
      recordDeltaEvent(
        deltaLog, "delta.commit.failure", data = Map("exception" -> Utils.exceptionString(e)))
      throw e
  }

  runPostCommitHooks(version, actions)

  version
}
```

`prepareCommit` performs some pre-commit checks and adds extra actions.

If this is the first commit, a `Protocol` action has to be added, e.g.:

```json
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
```

If the metadata has changed, the new metadata (a `metaData` action) has to be added, e.g.:

```json
{"metaData":{"id":"2b2457e3-ce74-4897-abbd-04a94692304a","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1609398723678}}
```

If `spark.databricks.delta.commitInfo.enabled` is set (the default is `true`), a `commitInfo` action is also added, e.g.:

```json
{"commitInfo":{"timestamp":1609400013646,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"306","numOutputRows":"0"}}}
```

If Presto / Athena compatibility is enabled, the `GenerateSymlinkManifest` post-commit hook is also registered; it is invoked after the commit succeeds.
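
As a side note, the `commitInfo` actions written here are what surface in a table's history. Assuming the delta-core library is available, something like the following reads them back (the table path is illustrative):

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

object ReadCommitHistory {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .getOrCreate()

    // Each history row corresponds to one commitInfo action in _delta_log,
    // including operation, operationParameters, readVersion, and isBlindAppend.
    DeltaTable.forPath(spark, "/tmp/delta/events") // illustrative path
      .history()
      .select("version", "operation", "operationParameters", "isBlindAppend")
      .show(truncate = false)
  }
}
```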

`doCommit(snapshot.version + 1, finalActions, 0, isolationLevelToUse)` synchronously commits the final actions to the delta log:

```scala
protected def doCommit(
    attemptVersion: Long,
    actions: Seq[Action],
    attemptNumber: Int,
    isolationLevel: IsolationLevel): Long = deltaLog.lockInterruptibly {
  try {
    ...
    deltaLog.store.write(
      deltaFile(deltaLog.logPath, attemptVersion),
      actions.map(_.json).toIterator)

    val commitTime = System.nanoTime()
    val postCommitSnapshot = deltaLog.update()
    if (postCommitSnapshot.version < attemptVersion) {
      recordDeltaEvent(deltaLog, "delta.commit.inconsistentList", data = Map(
        "committedVersion" -> attemptVersion,
        "currentVersion" -> postCommitSnapshot.version
      ))
      throw new IllegalStateException(
        s"The committed version is $attemptVersion " +
          s"but the current version is ${postCommitSnapshot.version}.")
    }

    // Post stats
    var numAbsolutePaths = 0
    var pathHolder: Path = null
    val distinctPartitions = new mutable.HashSet[Map[String, String]]
    val adds = actions.collect {
      case a: AddFile =>
        pathHolder = new Path(new URI(a.path))
        if (pathHolder.isAbsolute) numAbsolutePaths += 1
        distinctPartitions += a.partitionValues
        a
    }
    ...
    attemptVersion
  } catch {
    case e: java.nio.file.FileAlreadyExistsException =>
      checkAndRetry(attemptVersion, actions, attemptNumber, isolationLevel)
  }
}
```

The `deltaLog.store.write(deltaFile(deltaLog.logPath, attemptVersion), actions.map(_.json).toIterator)` call goes directly to the `write` method of `HDFSLogStore`, which ultimately calls `writeInternal`; here `attemptVersion` is the current version + 1. Let's take a look at the `writeInternal` method:

```scala
private def writeInternal(path: Path, actions: Iterator[String], overwrite: Boolean): Unit = {
  val fc: FileContext = try {
    getFileContext(path)
  } catch {
    case e: IOException if e.getMessage.contains(noAbstractFileSystemExceptionMessage) =>
      val newException = DeltaErrors.incorrectLogStoreImplementationException(sparkConf, e)
      logError(newException.getMessage, newException.getCause)
      throw newException
  }
  if (!overwrite && fc.util.exists(path)) {
    // This is needed for the tests to throw error with local file system
    throw new FileAlreadyExistsException(path.toString)
  }

  val tempPath = createTempPath(path)
  var streamClosed = false // This flag is to avoid double close
  var renameDone = false // This flag is to save the delete operation in most of cases.
  val stream = fc.create(
    tempPath, EnumSet.of(CREATE), CreateOpts.checksumParam(ChecksumOpt.createDisabled()))
  try {
    actions.map(_ + "\n").map(_.getBytes(UTF_8)).foreach(stream.write)
    stream.close()
    streamClosed = true
    try {
      val renameOpt = if (overwrite) Options.Rename.OVERWRITE else Options.Rename.NONE
      fc.rename(tempPath, path, renameOpt)
      renameDone = true
      // TODO: this is a workaround of HADOOP-16255 - remove this when HADOOP-16255 is resolved
      tryRemoveCrcFile(fc, tempPath)
    } catch {
      case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
        throw new FileAlreadyExistsException(path.toString)
    }
  } finally {
    if (!streamClosed) {
      stream.close()
    }
    if (!renameDone) {
      fc.delete(tempPath, false)
    }
  }
}
```

If the target file already exists, an exception is thrown; otherwise the log file is written. Note that for a local file system the rename operation has to be synchronized, because a local rename does not raise an exception even when the target file already exists; other file systems need no such synchronization. If `deltaLog.store.write` completes without exception, `doCommit` fetches and records the latest snapshot and returns the `attemptVersion` that was passed in. If an exception is raised, `checkAndRetry(attemptVersion, actions, attemptNumber, isolationLevel)` retries: it looks up the actions committed by other writers between the start of the transaction and the failed commit, and if they do not conflict with the current actions the commit proceeds; otherwise an exception is thrown.
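
The core of this optimistic protocol is easy to model: try to create version file N atomically, and on a `FileAlreadyExistsException` move on to N + 1 after checking for conflicts. A simplified sketch under those assumptions (illustrative, not Delta's code):

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{FileAlreadyExistsException, Files, Paths, StandardOpenOption}

object OptimisticCommitSketch {
  // Try to create the next versioned log file atomically: CREATE_NEW fails if
  // the file already exists, which is how concurrent writers get serialized.
  def commitWithRetry(logDir: String, version: Long, jsonActions: Seq[String]): Long = {
    val file = Paths.get(logDir, f"$version%020d.json")
    val won =
      try {
        Files.write(
          file,
          jsonActions.mkString("", "\n", "\n").getBytes(UTF_8),
          StandardOpenOption.CREATE_NEW)
        true
      } catch {
        case _: FileAlreadyExistsException => false
      }
    if (won) version
    else {
      // The real checkAndRetry first inspects the winning commit for logical
      // conflicts with our actions; only a non-conflicting loss retries.
      commitWithRetry(logDir, version + 1, jsonActions)
    }
  }
}
```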

`postCommit` performs the checkpoint operation. Every 10 commits, Delta Lake automatically writes a Parquet-format checkpoint file under the `_delta_log` subdirectory, which enables fast replays, and cleans up expired delta logs (retained for 30 days by default). `AddFile` and `RemoveFile` actions are merged here: for example, an add of file A followed by a remove of file A cancels out, leaving no record after the merge:

```scala
def checkpoint(): Unit = recordDeltaOperation(this, "delta.checkpoint") {
  val checkpointMetaData = checkpoint(snapshot)
  val json = JsonUtils.toJson(checkpointMetaData)
  store.write(LAST_CHECKPOINT, Iterator(json), overwrite = true)
  doLogCleanup()
}
```
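
The `AddFile`/`RemoveFile` merge described above can be modeled as a simple replay over the action stream; a simplified sketch (real checkpointing also carries metadata, protocol, and tombstone retention):

```scala
object LogReplaySketch {
  sealed trait FileAction { def path: String }
  final case class AddFile(path: String) extends FileAction
  final case class RemoveFile(path: String) extends FileAction

  // Replay the log in order: an AddFile registers a file as live, and a later
  // RemoveFile for the same path cancels it, leaving no record after the merge.
  def liveFiles(actions: Seq[FileAction]): Set[String] =
    actions.foldLeft(Set.empty[String]) {
      case (live, AddFile(p))    => live + p
      case (live, RemoveFile(p)) => live - p
    }

  def main(args: Array[String]): Unit = {
    val log = Seq(AddFile("part-0001.parquet"), RemoveFile("part-0001.parquet"))
    println(liveFiles(log)) // Set() -- add A then remove A leaves nothing
  }
}
```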

Finally, `runPostCommitHooks` executes the post-commit hooks, if any were registered.
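
A minimal sketch of the hook mechanism (simplified; Delta's real `PostCommitHook` receives more context, such as the session and the committed actions):

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

object PostCommitHooksSketch {
  trait PostCommitHook {
    def name: String
    def run(committedVersion: Long): Unit
  }

  private val hooks = mutable.ArrayBuffer.empty[PostCommitHook]

  // Deduplicate by name so a hook such as GenerateSymlinkManifest runs once.
  def registerPostCommitHook(hook: PostCommitHook): Unit =
    if (!hooks.exists(_.name == hook.name)) hooks += hook

  // Hook failures are logged rather than failing the commit, because the
  // commit itself has already succeeded by the time hooks run.
  def runPostCommitHooks(version: Long): Unit =
    hooks.foreach { h =>
      try h.run(version)
      catch { case NonFatal(e) => println(s"hook ${h.name} failed: $e") }
    }
}
```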

At this point, the whole process of delta writing its delta log is complete. The overall flow is as follows:

```
update               fetch the latest snapshot
  |
  v
write()              write the delta data files
  |
  v
commit               transaction commit -> prepareCommit performs pre-commit
                     checks and adds extra actions
  |
  v
doCommit             actually writes the delta log, retrying until it succeeds
                     or a conflict is detected
  |
  v
postCommit           performs the checkpoint operation, merging AddFile and
                     RemoveFile to enable fast replays, and cleans up expired
                     delta logs
  |
  v
runPostCommitHooks   executes the corresponding hooks, if any are registered
```

The above is the whole content of "Example Analysis of ACID Transactions in the spark delta write Operation". Thank you for reading! I hope this walkthrough has given you a solid understanding of the process.
