Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How spark delta reads data

2025-03-31 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

Editor to share with you how to read spark delta data, I believe that most people do not know much about it, so share this article for your reference, I hope you can learn a lot after reading this article, let's go to understand it!

Analysis.

The construction of delta datasource for spark starts from DataSource.lookupDataSourceV2, and then flows to loadV1Source. Here, dataSource.createRelation is used to build the Relation for building datasource, and then go directly to the createRelation of deltaDataSource:

Override def createRelation (sqlContext: SQLContext, parameters: Map [String, String]): BaseRelation = {val maybePath = parameters.getOrElse ("path", {throw DeltaErrors.pathNotSpecifiedException}) / / Log any invalid options that are being passed in DeltaOptions.verifyOptions (CaseInsensitiveMap (parameters)) val timeTravelByParams = DeltaDataSource.getTimeTravelVersion (parameters) DeltaTableV2 (sqlContext.sparkSession, new Path (maybePath), timeTravelOpt = timeTravelByParams). ToBaseRelation}

DeltaOptions.verifyOptions verifies the parameters. Valid parameters are as follows:

Val validOptionKeys: Set [String] = Set (REPLACE_WHERE_OPTION, MERGE_SCHEMA_OPTION, EXCLUDE_REGEX_OPTION, OVERWRITE_SCHEMA_OPTION, USER_METADATA_OPTION, MAX_FILES_PER_TRIGGER_OPTION, IGNORE_FILE_DELETION_OPTION, IGNORE_CHANGES_OPTION, IGNORE_DELETES_OPTION, OPTIMIZE_WRITE_OPTION, DATA_CHANGE_OPTION, "queryName", "checkpointLocation", "path", "timestampAsOf" "versionAsOf")

DeltaDataSource.getTimeTravelVersion gets the specified version based on the specified timestampAsOf or versionAsOf

Directly call the toBaseRelation method of DeltaTableV2:

Def toBaseRelation: BaseRelation = {if (deltaLog.snapshot.version = =-1) {val id = catalogTable.map (ct = > DeltaTableIdentifier (table = Some (ct.identifier) .getOrElse (DeltaTableIdentifier (path = Some (path.toString)) throw DeltaErrors.notADeltaTableException (id)} val partitionPredicates = DeltaDataSource.verifyAndCreatePartitionFilters (path.toString, deltaLog.snapshot, partitionFilters) / / TODO (burak): We should pass in the snapshot here deltaLog.createRelation (partitionPredicates, timeTravelSpec)}

If there is a partition, DeltaDataSource.verifyAndCreatePartitionFilter creates a partitionPredicates

TimeTravelSpec, where the timeTravelByParams specified by the user is preferred, otherwise the version specified by path is selected through DeltaDataSource.parsePathIdentifier, such as / some/path/partition=1@v1234 or / some/path/partition=1@yyyyMMddHHmmssSSS

Call deltaLog.createRelation directly:

Def createRelation (partitionFilters: Seq [Expression] = Nil, timeTravel: Option [DeltaTimeTravelSpec] = None): BaseRelation = {val versionToUse = timeTravel.map {tt = > val (version, accessType) = DeltaTableUtils.resolveTimeTravelVersion (spark.sessionState.conf, this, tt) val source = tt.creationSource.getOrElse ("unknown") recordDeltaEvent (this, s "delta.timeTravel.$source", data = Map ("tableVersion"-> snapshot.version, "queriedVersion"-> version) "accessType"-> accessType)) version} / * * Used to link the files present in the table into the query planner. * / val snapshotToUse = versionToUse.map (getSnapshotAt (_)) .getOrElse (snapshot) val fileIndex = TahoeLogFileIndex (spark, this, dataPath, snapshotToUse.metadata.schema, partitionFilters, versionToUse) new HadoopFsRelation (fileIndex, partitionSchema = snapshotToUse.metadata.partitionSchema, dataSchema = snapshotToUse.metadata.schema, bucketSpec = None, snapshotToUse.fileFormat, snapshotToUse.metadata.format.options) (spark) with InsertableRelation {def insert (data: DataFrame Overwrite: Boolean): Unit = {val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append WriteIntoDelta (deltaLog = DeltaLog.this, mode = mode, new DeltaOptions (Map.empty [String, String], spark.sessionState.conf), partitionColumns = Seq.empty, configuration = Map.empty, data = data) .run (spark)} override def inputFiles: Array [String] = {getSnapshot (stalenessAcceptable = false) .filesForScan (projection = Nil) PartitionFilters) .files.map (f = > absolutePath (f.path) .toString) .toArray}

This method calls the filesForScan method of snapshot:

Def filesForScan (projection: Seq [Attribute], filters: Seq [Expression]): DeltaScan = {implicit val enc = SingleAction.addFileEncoderval partitionFilters = filters.flatMap {filter = > DeltaTableUtils.splitMetadataAndDataPredicates (filter, metadata.partitionColumns, spark). _ 1} val files = DeltaLog.filterFileList (metadata.partitionSchema, allFiles.toDF (), partitionFilters). As [AddFile]. DeltaScan (version = version, files, null, null, null) (null, null)}

. Obtain the corresponding snapshot through the specified version

. Build TahoeLogFileIndex, because what you build here is HadoopFsRelation, so we focus on the inputfiles method of TahoeLogFileIndex:

Through the analysis of the previous article, we until deltalog recorded the AddFile and Remove records, so how to read the data now? All in the allFiles method.

Let's focus on this: the allFiles method:

Def allFiles: Dataset [AddFile] = {val implicits = spark.implicits import implicits._ state.where ("add IS NOT NULL") .select ($"add" .as [AddFile])}

Here the state method is called, which in turn calls the stateReconstruction method

Private lazy val cachedState = cacheDS (stateReconstruction, s "Delta Table State # $version-$redactedPath") / * * The current set of actions in this [[Snapshot]]. * / def state: Dataset [SingleAction] = cachedState.getDS

The stateReconstruction method is used in checkpoint, as well as here, mainly to reconstruct the file state and merge AddFile and RemoveFile:

Private def stateReconstruction: Dataset [SingleAction] = {... LoadActions.mapPartitions {actions = > val hdpConf = hadoopConf.value.value actions.flatMap {_ .unwrap match {case add: AddFile = > Some (add.copy (path = canonicalizePath (add.path, hdpConf)) .wrap) case rm: RemoveFile = > Some (rm.copy (path = canonicalizePath (rm.path) HdpConf) .wrap) case other if other = = null = > None case other = > Some (other.wrap)}}. .mapPartitions {iter = > val state = new InMemoryLogReplay (time) state.append (0, iter.map (_ .unwrap) state.checkpoint.map (_ .wrap)}}

The key lies in the append method and checkpoint method of InMemoryLogReplay, where the file states are merged:

Assert (currentVersion = =-1 | | version = = currentVersion + 1, s "Attempted to replay version $version, but state is at $currentVersion") currentVersion = version actions.foreach {case a: SetTransaction = > transactions (a.appId) = a case a: Metadata = > currentMetaData = a case a: Protocol = > currentProtocolVersion = a case add: AddFile = > activeFiles (add.pathAsUri) = add.copy (dataChange = false) / / Remove the tombstone to make sure we only output one `FileAction. Tombstones.remove (add.pathAsUri) case remove: RemoveFile = > activeFiles.remove (remove.pathAsUri) tombstones (remove.pathAsUri) = remove.copy (dataChange = false) case ci: CommitInfo = > / / do nothing case null = > / / Some crazy future feature. Ignore}}

The focus is on the case add: AddFile and case remove: RemoveFile processing and the checkpoint method, which can merge file states very well.

Then call the collect method, return DeltaScan, and then get the file path as the file path to be processed.

Input TahoeLogFileIndex into HadoopFsRelation to get the final BaseRelation return

Note: the whole process of spark reading delta format is the same as that of spark reading other data formats. The main difference is that before reading the data, the file state will be merged in memory once, so you only need to read the file status of Addfile.

These are all the contents of the article "how to read data in spark delta". Thank you for reading! I believe we all have a certain understanding, hope to share the content to help you, if you want to learn more knowledge, welcome to follow the industry information channel!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report