How to implement Delta custom SQL by integrating Spark 3.0.1 with Delta 0.7.0

2025-03-26 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/01 Report--

This article explains how Delta 0.7.0 plugs into Spark 3.0.1: first how the delta data source is registered, then how Delta adds its own custom SQL syntax.

Custom DeltaDataSource

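When we use Delta, we have to specify the delta format explicitly, as follows:

    val data = spark.range(5, 10)
    // Write the rows as a Delta table
    data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
    // Read the table back before showing it
    val df = spark.read.format("delta").load("/tmp/delta-table")
    df.show()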

So how is this delta data source integrated into Spark? Let's analyze it.

Start with DataStreamWriter, which resolves the source name as follows (the batch DataFrameWriter resolves it through the same DataSource.lookupDataSource method):

    val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
    val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
    val useV1Source = disabledSources.contains(cls.getCanonicalName) ||
      // file source v2 does not support streaming yet.
      classOf[FileDataSourceV2].isAssignableFrom(cls)

The DataSource.lookupDataSource method is the key point:

    def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
      val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
        case name if name.equalsIgnoreCase("orc") &&
            conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
          classOf[OrcDataSourceV2].getCanonicalName
        case name if name.equalsIgnoreCase("orc") &&
            conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
          "org.apache.spark.sql.hive.orc.OrcFileFormat"
        case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
          "org.apache.spark.sql.avro.AvroFileFormat"
        case name => name
      }
      val provider2 = s"$provider1.DefaultSource"
      val loader = Utils.getContextOrSparkClassLoader
      val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
      // ...

ServiceLoader.load is Java's SPI (Service Provider Interface) mechanism; its details are well documented elsewhere. The part that matters here is ServiceLoader.LazyIterator:

    private class LazyIterator implements Iterator<S> {

        Class<S> service;
        ClassLoader loader;
        Enumeration<URL> configs = null;
        Iterator<String> pending = null;
        String nextName = null;

        private LazyIterator(Class<S> service, ClassLoader loader) {
            this.service = service;
            this.loader = loader;
        }

        private boolean hasNextService() {
            if (nextName != null) {
                return true;
            }
            if (configs == null) {
                try {
                    String fullName = PREFIX + service.getName();
                    if (loader == null)
                        configs = ClassLoader.getSystemResources(fullName);
                    else
                        configs = loader.getResources(fullName);
                } catch (IOException x) {
                    fail(service, "Error locating configuration files", x);
                }
            }
            // ...

loader.getResources looks up files with the given name on the classpath; if several JARs contain such a file, all of them are returned. For Spark the service class is DataSourceRegister, so the files searched for are META-INF/services/org.apache.spark.sql.sources.DataSourceRegister. Spark's own built-in data sources are loaded in the same way.
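The lookup described above can be sketched in plain Scala without Spark on the classpath. This is a minimal illustration, not the JDK's implementation: the object and method names (ServiceFileSketch, resourceName, providerNames, locate) are made up for this example, and the file parsing is simplified to "strip # comments, skip blank lines".

```scala
object ServiceFileSketch {
  // Same prefix the JDK's ServiceLoader uses for provider-configuration files.
  val Prefix = "META-INF/services/"

  // Resource name searched for a given service class (PREFIX + service.getName()).
  def resourceName(serviceClassName: String): String = Prefix + serviceClassName

  // Simplified parsing: drop '#' comments and blank lines; every remaining
  // line is treated as one provider class name.
  def providerNames(fileContent: String): List[String] =
    fileContent
      .split("\\r?\\n")
      .toList
      .map(line => line.takeWhile(_ != '#').trim)
      .filter(_.nonEmpty)

  // Collect every classpath resource with this name; one URL per JAR that ships the file.
  def locate(name: String): List[String] = {
    val loader = Option(Thread.currentThread().getContextClassLoader())
      .getOrElse(ClassLoader.getSystemClassLoader())
    val urls = loader.getResources(name)
    val found = scala.collection.mutable.ListBuffer.empty[String]
    while (urls.hasMoreElements()) found += urls.nextElement().toString
    found.toList
  }

  def main(args: Array[String]): Unit = {
    val name = resourceName("org.apache.spark.sql.sources.DataSourceRegister")
    println(name)
    // Prints nothing unless Spark or Delta JARs are on the classpath.
    locate(name).foreach(println)
  }
}
```

Run with the Spark and Delta JARs on the classpath, locate should list one URL per JAR that ships such a file.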

Delta's META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file contains org.apache.spark.sql.delta.sources.DeltaDataSource. Note that DeltaDataSource is built on the DataSource V1 API. This registration is the prerequisite for using the delta data source from Spark.
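Once the registered classes are loaded, Spark matches the name passed to format(...) against each one's short name, ignoring case. The sketch below models only that matching step; the Register trait and the two source classes are hypothetical stand-ins for DataSourceRegister and its implementations, and the real method additionally tries to load the name as a class when no short name matches.

```scala
object ShortNameLookupSketch {
  // Hypothetical stand-in for org.apache.spark.sql.sources.DataSourceRegister.
  trait Register { def shortName: String }

  final class DeltaLikeSource extends Register { val shortName = "delta" }
  final class ParquetLikeSource extends Register { val shortName = "parquet" }

  // Case-insensitive match of the requested format against registered short names.
  def lookup(provider: String, registered: Seq[Register]): Option[Register] =
    registered.find(_.shortName.equalsIgnoreCase(provider))

  def main(args: Array[String]): Unit = {
    val registered = Seq(new ParquetLikeSource, new DeltaLikeSource)
    println(lookup("delta", registered).map(_.shortName))
  }
}
```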

Analysis

Let's start with how Delta configures the SparkSession:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .appName("...")
      .master("...")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

Note the setting config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"). The Spark configuration documentation describes spark.sql.extensions as follows:

A comma-separated list of classes that implement Function1[SparkSessionExtensions, Unit] used to configure SparkSession extensions. The classes must have a no-args constructor. If multiple extensions are specified, they are applied in the specified order. For the case of rules and planner strategies, they are applied in the specified order. For the case of parsers, the last parser is used and each parser can delegate to its predecessor. For the case of function name conflicts, the last registered function name is used.

In short, this setting extends the SparkSession, including how Spark SQL parses statements and processes logical plans. The capability has been available since Spark 2.2.0.
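The ordering rules in that description can be illustrated with a small model. Everything here is hypothetical: Registry stands in for SparkSessionExtensions, and injectRule and injectFunction are simplified placeholders rather than Spark's real signatures. The one thing carried over from Delta is the shape of the extension class, which is itself a function from the registry to Unit.

```scala
import scala.collection.mutable

object ExtensionOrderSketch {
  // Hypothetical stand-in for SparkSessionExtensions: it only records what was injected.
  final class Registry {
    val rules: mutable.Buffer[String] = mutable.Buffer.empty[String]
    val functions: mutable.Map[String, String] = mutable.Map.empty[String, String]
    def injectRule(name: String): Unit = rules += name
    // A later registration under the same name replaces the earlier one.
    def injectFunction(name: String, impl: String): Unit = functions(name) = impl
  }

  // Same shape as DeltaSparkSessionExtension: a class that is a Function1.
  class FirstExtension extends (Registry => Unit) {
    override def apply(registry: Registry): Unit = {
      registry.injectRule("first-rule")
      registry.injectFunction("greet", "first")
    }
  }

  class SecondExtension extends (Registry => Unit) {
    override def apply(registry: Registry): Unit = {
      registry.injectRule("second-rule")
      registry.injectFunction("greet", "second")
    }
  }

  // Apply the extensions in the order they were listed.
  def configure(extensions: Seq[Registry => Unit]): Registry = {
    val registry = new Registry
    extensions.foreach(extension => extension(registry))
    registry
  }

  def main(args: Array[String]): Unit = {
    val registry = configure(Seq(new FirstExtension, new SecondExtension))
    println(registry.rules.mkString(", "))
    println(registry.functions("greet"))
  }
}
```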

Now look at the io.delta.sql.DeltaSparkSessionExtension class:

    class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
      override def apply(extensions: SparkSessionExtensions): Unit = {
        extensions.injectParser { (session, parser) =>
          new DeltaSqlParser(parser)
        }
        extensions.injectResolutionRule { session =>
          new DeltaAnalysis(session, session.sessionState.conf)
        }
        extensions.injectCheckRule { session =>
          new DeltaUnsupportedOperationsCheck(session)
        }
        extensions.injectPostHocResolutionRule { session =>
          new PreprocessTableUpdate(session.sessionState.conf)
        }
        extensions.injectPostHocResolutionRule { session =>
          new PreprocessTableMerge(session.sessionState.conf)
        }
        extensions.injectPostHocResolutionRule { session =>
          new PreprocessTableDelete(session.sessionState.conf)
        }
      }
    }

DeltaSqlParser is what adds Delta's own SQL syntax. How does it do so, and what does it support? Start with the code behind extensions.injectParser:

    private[this] val parserBuilders = mutable.Buffer.empty[ParserBuilder]

    private[sql] def buildParser(
        session: SparkSession,
        initial: ParserInterface): ParserInterface = {
      parserBuilders.foldLeft(initial) { (parser, builder) =>
        builder(session, parser)
      }
    }

    /**
     * Inject a custom parser into the [[SparkSession]]. Note that the builder is passed a session
     * and an initial parser. The latter allows for a user to create a partial parser and to delegate
     * to the underlying parser for completeness. If a user injects more parsers, then the parsers
     * are stacked on top of each other.
     */
    def injectParser(builder: ParserBuilder): Unit = {
      parserBuilders += builder
    }

buildParser applies each injected builder in turn. For the builder we passed in, this constructs a DeltaSqlParser whose delegate field is set to initial. buildParser itself is called by BaseSessionStateBuilder:

    /**
     * Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
     *
     * Note: this depends on the `conf` field.
     */
    protected lazy val sqlParser: ParserInterface = {
      extensions.buildParser(session, new SparkSqlParser(conf))
    }
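The foldLeft in buildParser is what stacks parsers on top of each other. The following sketch reproduces just that fold with hypothetical types (Parser, Wrapping, and a one-argument ParserBuilder that omits the session), to show that the builder injected last ends up outermost.

```scala
object ParserStackSketch {
  // Hypothetical, much reduced stand-in for ParserInterface.
  trait Parser { def describe: String }

  object BaseParser extends Parser { val describe: String = "spark" }

  // A parser that wraps another one, like DeltaSqlParser wraps its delegate.
  final class Wrapping(name: String, delegate: Parser) extends Parser {
    def describe: String = s"$name -> ${delegate.describe}"
  }

  // Simplified builder type: the real one also receives the SparkSession.
  type ParserBuilder = Parser => Parser

  // Same fold as buildParser: each builder receives the parser built so far.
  def buildParser(builders: Seq[ParserBuilder], initial: Parser): Parser =
    builders.foldLeft(initial)((parser, builder) => builder(parser))

  def main(args: Array[String]): Unit = {
    val builders: Seq[ParserBuilder] = Seq(
      (p: Parser) => new Wrapping("delta", p),
      (p: Parser) => new Wrapping("custom", p))
    println(buildParser(builders, BaseParser).describe)
  }
}
```

With two builders, the chain reads custom, then delta, then spark, which matches the "last parser is used and each parser can delegate to its predecessor" wording in the configuration description.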

So initial is a SparkSqlParser; that is, SparkSqlParser becomes the delegate of DeltaSqlParser. Now look at DeltaSqlParser's parsePlan method:

    override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
      builder.visit(parser.singleStatement()) match {
        case plan: LogicalPlan => plan
        case _ => delegate.parsePlan(sqlText)
      }
    }

This relies on ANTLR4. When parsing a statement into a logical plan, DeltaSqlParser handles the statement itself if it can; otherwise it delegates to SparkSqlParser. The actual translation is done by the DeltaSqlAstBuilder class:

    class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {

      /**
       * Create a [[VacuumTableCommand]] logical plan. Example SQL:
       * {{{
       *   VACUUM ('/path/to/dir' | delta.`/path/to/dir`) [RETAIN number HOURS] [DRY RUN]
       * }}}
       */
      override def visitVacuumTable(ctx: VacuumTableContext): AnyRef = withOrigin(ctx) {
        VacuumTableCommand(
          Option(ctx.path).map(string),
          Option(ctx.table).map(visitTableIdentifier),
          Option(ctx.number).map(_.getText.toDouble),
          ctx.RUN != null)
      }

      override def visitDescribeDeltaDetail(
          ctx: DescribeDeltaDetailContext): LogicalPlan = withOrigin(ctx) {
        DescribeDeltaDetailCommand(
          Option(ctx.path).map(string),
          Option(ctx.table).map(visitTableIdentifier))
      }

      override def visitDescribeDeltaHistory(
          ctx: DescribeDeltaHistoryContext): LogicalPlan = withOrigin(ctx) {
        DescribeDeltaHistoryCommand(
          Option(ctx.path).map(string),
          Option(ctx.table).map(visitTableIdentifier),
          Option(ctx.limit).map(_.getText.toInt))
      }

      override def visitGenerate(ctx: GenerateContext): LogicalPlan = withOrigin(ctx) {
        DeltaGenerateCommand(
          modeName = ctx.modeName.getText,
          tableId = visitTableIdentifier(ctx.table))
      }

      override def visitConvert(ctx: ConvertContext): LogicalPlan = withOrigin(ctx) {
        ConvertToDeltaCommand(
          visitTableIdentifier(ctx.table),
          Option(ctx.colTypeList).map(colTypeList => StructType(visitColTypeList(colTypeList))),
          None)
      }

      override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = withOrigin(ctx) {
        visit(ctx.statement).asInstanceOf[LogicalPlan]
      }

      protected def visitTableIdentifier(ctx: QualifiedNameContext): TableIdentifier = withOrigin(ctx) {
        ctx.identifier.asScala match {
          case Seq(tbl) => TableIdentifier(tbl.getText)
          case Seq(db, tbl) => TableIdentifier(tbl.getText, Some(db.getText))
          case _ => throw new ParseException(s"Illegal table name ${ctx.getText}", ctx)
        }
      }

      override def visitPassThrough(ctx: PassThroughContext): LogicalPlan = null
    }
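The fallback hinges on visitPassThrough returning null: a type pattern such as case plan: LogicalPlan never matches null, so unrecognized statements fall through to the delegate. Here is a minimal model of that mechanism, with plans represented as plain strings and with hypothetical names (Parser, FallbackParser, CustomParser); it is a sketch of the idea, not Delta's code.

```scala
import java.util.Locale

object DelegatingParserSketch {
  // Hypothetical stand-in for ParserInterface; a "plan" is just a String here.
  trait Parser { def parsePlan(sqlText: String): String }

  // Plays the role of SparkSqlParser.
  object FallbackParser extends Parser {
    def parsePlan(sqlText: String): String = s"spark-plan($sqlText)"
  }

  // Plays the role of DeltaSqlParser.
  final class CustomParser(delegate: Parser) extends Parser {
    // Plays the role of the AST builder: a plan for statements we own, null otherwise.
    private def visit(sqlText: String): String = {
      val trimmed = sqlText.trim
      if (trimmed.toUpperCase(Locale.ROOT).startsWith("VACUUM ")) s"custom-plan($trimmed)"
      else null
    }

    def parsePlan(sqlText: String): String = visit(sqlText) match {
      case plan: String => plan                // a type pattern never matches null
      case _ => delegate.parsePlan(sqlText)    // so null ends up here
    }
  }

  def main(args: Array[String]): Unit = {
    val parser = new CustomParser(FallbackParser)
    println(parser.parsePlan("VACUUM events"))
    println(parser.parsePlan("SELECT 1"))
  }
}
```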

So where do methods such as visitVacuumTable and visitDescribeDeltaDetail come from? Look at DeltaSqlBase.g4:

    singleStatement
        : statement EOF
        ;

    // If you add keywords here that should not be reserved, add them to 'nonReserved' list.
    statement
        : VACUUM (path=STRING | table=qualifiedName)
            (RETAIN number HOURS)? (DRY RUN)?                               #vacuumTable
        | (DESC | DESCRIBE) DETAIL (path=STRING | table=qualifiedName)      #describeDeltaDetail
        | GENERATE modeName=identifier FOR TABLE table=qualifiedName        #generate
        | (DESC | DESCRIBE) HISTORY (path=STRING | table=qualifiedName)
            (LIMIT limit=INTEGER_VALUE)?                                    #describeDeltaHistory
        | CONVERT TO DELTA table=qualifiedName
            (PARTITIONED BY '(' colTypeList ')')?                           #convert
        | .*?                                                               #passThrough
        ;
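To make the vacuumTable rule concrete without generating an ANTLR parser, the sketch below approximates it with a regular expression and builds a value with the same four fields that visitVacuumTable passes to VacuumTableCommand. This is a deliberately simplified substitute for the grammar: VacuumStatement and parse are names invented for this example, and forms such as delta.`/path/to/dir` are not handled.

```scala
object VacuumRuleSketch {
  // Mirrors the four arguments built in visitVacuumTable (hypothetical case class).
  final case class VacuumStatement(
      path: Option[String],
      table: Option[String],
      retainHours: Option[Double],
      dryRun: Boolean)

  // Rough regex equivalent of:
  //   VACUUM (path=STRING | table=qualifiedName) (RETAIN number HOURS)? (DRY RUN)?
  private val Vacuum =
    """(?i)\s*VACUUM\s+(?:'([^']*)'|([\w.]+))(?:\s+RETAIN\s+(\d+(?:\.\d+)?)\s+HOURS)?(\s+DRY\s+RUN)?\s*""".r

  // Groups that did not participate in the match are null, hence Option(...).
  def parse(sqlText: String): Option[VacuumStatement] = sqlText match {
    case Vacuum(path, table, hours, dryRun) =>
      Some(VacuumStatement(Option(path), Option(table), Option(hours).map(_.toDouble), dryRun != null))
    case _ => None // anything else would be "passThrough" in the grammar
  }

  def main(args: Array[String]): Unit = {
    println(parse("VACUUM '/tmp/delta-table' RETAIN 168 HOURS DRY RUN"))
    println(parse("SELECT 1"))
  }
}
```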

ANTLR4 grammar syntax is not covered here; it is well documented online. Note that both Spark and Delta use the visitor pattern.

Now compare with the operations listed on the official Delta website:

Vacuum
Describe History
Describe Detail
Generate
Convert to Delta
Convert Delta table to a Parquet table

These line up with the grammar: for example, the Vacuum operation corresponds to vacuumTable, and Convert to Delta corresponds to convert.
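The link between grammar labels and the builder's method names follows ANTLR4's convention: a labeled alternative such as #vacuumTable yields a visitVacuumTable method and a VacuumTableContext class. The helper names below (visitMethod, contextClass) are made up for this illustration and only reproduce the naming rule.

```scala
object VisitorNamingSketch {
  private def capitalizeFirst(s: String): String =
    if (s.isEmpty) s else s"${s.head.toUpper}${s.tail}"

  // Label "#vacuumTable" -> visitor method "visitVacuumTable".
  def visitMethod(label: String): String = "visit" + capitalizeFirst(label)

  // Label "#vacuumTable" -> context class "VacuumTableContext".
  def contextClass(label: String): String = capitalizeFirst(label) + "Context"

  def main(args: Array[String]): Unit = {
    val labels = Seq(
      "vacuumTable", "describeDeltaDetail", "generate",
      "describeDeltaHistory", "convert", "passThrough")
    labels.foreach(label => println(s"${visitMethod(label)}(ctx: ${contextClass(label)})"))
  }
}
```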

In short, Delta is built on Spark's extension points, and we can extend Spark in the same way to implement our own SQL syntax.
