2025-03-26 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article explains how Delta handles DDL when Delta 0.7.0 is integrated with Spark 3.0.1. The topic is not widely understood, so the analysis is shared here for reference.
Analysis
Before 0.7.0, Delta could only save tables as files. Its metadata was kept separate from Spark's other metadata, so Delta tables stood alone and could not be used together with other tables in catalog operations. Only with Delta 0.7.0 did it become truly integrated with Spark, thanks to the Catalog plugin API introduced in Spark 3.x.
Let's start with how Delta configures the SparkSession:
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .appName("...")
      .master("...")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()
The second setting, config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"), is described in the Spark configuration documentation, which explains spark.sql.catalog.spark_catalog as follows:
A catalog implementation that will be used as the v2 interface to Spark's built-in v1 catalog: spark_catalog. This catalog shares its identifier namespace with the spark_catalog and must be consistent with it; for example, if a table can be loaded by the spark_catalog, this catalog must also return the table metadata. To delegate operations to the spark_catalog, implementations can extend 'CatalogExtension'.
In other words, this setting lets Delta share a single set of metadata with Spark, which is a result of cooperation between the Spark and Delta communities.
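To make the effect of these two settings concrete, here is a minimal sketch of table DDL by name. It assumes Spark 3.0.1 and delta-core 0.7.0 are on the classpath and that the warehouse directory is clean; the table name events, its columns, and the application name are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

object DeltaDdlSketch {
  // Builds a local session with the two Delta settings discussed above.
  def session(): SparkSession =
    SparkSession.builder()
      .appName("delta-ddl-sketch") // hypothetical application name
      .master("local[*]")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

  // Creates a Delta table by name, writes two rows and returns the row count.
  def createAndCount(spark: SparkSession): Long = {
    spark.sql("DROP TABLE IF EXISTS events") // `events` is a hypothetical table name
    spark.sql("CREATE TABLE events (id BIGINT, data STRING) USING delta")
    spark.sql("INSERT INTO events VALUES (1, 'a'), (2, 'b')")
    spark.table("events").count()
  }
}
```

Because the table is registered in the session catalog, it can then be referenced by name like any other Spark table, instead of only by path.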
Catalog plugin API of spark 3.x
To understand why Delta can perform DDL and DML, you need to know the Catalog plugin mechanism of Spark 3.x (SPARK-31121).
The first piece is the interface CatalogPlugin, the top-level interface for catalog plugins. As its Javadoc says:
    /**
     * A marker interface to provide a catalog implementation for Spark.
     *
     * Implementations can provide catalog functions by implementing additional interfaces for tables,
     * views, and functions.
     *
     * Catalog implementations must implement this marker interface to be loaded by
     * {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the
     * required public no-arg constructor. After creating an instance, it will be configured by calling
     * {@link #initialize(String, CaseInsensitiveStringMap)}.
     *
     * Catalog implementations are registered to a name by adding a configuration option to Spark:
     * {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All configuration properties
     * in the Spark configuration that share the catalog name prefix,
     * {@code spark.sql.catalog.catalog-name.(key)=(value)} will be passed in the case insensitive
     * string map of options in initialization with the prefix removed.
     * {@code name}, is also passed and is the catalog's name; in this case, "catalog-name".
     */
A catalog can therefore be plugged into Spark by setting spark.sql.catalog.catalog-name=com.example.YourCatalogClass.
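The prefix handling described in the Javadoc can be sketched in a few lines. This is a simplified illustration over a plain Map, not Spark's own loader code (which, per the Javadoc, passes the options as a case-insensitive map); the catalog name my_catalog and the url option are hypothetical.

```scala
object CatalogOptionsSketch {
  /** Collects the options of one catalog from a flat config map and strips the shared prefix. */
  def catalogOptions(conf: Map[String, String], catalogName: String): Map[String, String] = {
    val prefix = s"spark.sql.catalog.$catalogName."
    conf.collect {
      case (key, value) if key.startsWith(prefix) => key.stripPrefix(prefix) -> value
    }
  }

  // Hypothetical configuration: one registration key, one option, one unrelated catalog.
  val conf: Map[String, String] = Map(
    "spark.sql.catalog.my_catalog" -> "com.example.YourCatalogClass",
    "spark.sql.catalog.my_catalog.url" -> "jdbc:hypothetical",
    "spark.sql.catalog.other.url" -> "ignored"
  )
}
```

Calling CatalogOptionsSketch.catalogOptions(CatalogOptionsSketch.conf, "my_catalog") keeps only the url entry, with the prefix removed; the registration key itself is not an option because it lacks the trailing dot.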
An implementation of this interface can also implement the additional interfaces for tables, views, and functions. The one worth mentioning here is TableCatalog, which provides the table-related methods:
    /**
     * List the tables in a namespace from the catalog.
     *
     * If the catalog supports views, this must return identifiers for only tables and not views.
     *
     * @param namespace a multi-part namespace
     * @return an array of Identifiers for tables
     * @throws NoSuchNamespaceException If the namespace does not exist (optional).
     */
    Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException;

    /**
     * Load table metadata by {@link Identifier identifier} from the catalog.
     *
     * If the catalog supports views and contains a view for the identifier and not a table, this
     * must throw {@link NoSuchTableException}.
     *
     * @param ident a table identifier
     * @return the table's metadata
     * @throws NoSuchTableException If the table doesn't exist or is a view
     */
    Table loadTable(Identifier ident) throws NoSuchTableException;
In this way you can develop your own catalog on top of TableCatalog, which is what makes multi-catalog support possible.
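As a rough illustration of what such a catalog has to provide, the sketch below implements the two quoted methods over an in-memory map. The types Ident, TableMeta, and SimpleTableCatalog are simplified stand-ins invented for this example; they are not Spark's Identifier, Table, or TableCatalog.

```scala
object InMemoryCatalogSketch {
  // Simplified stand-ins for Spark's Identifier and Table (hypothetical types).
  final case class Ident(namespace: Seq[String], name: String)
  final case class TableMeta(ident: Ident, columns: Seq[String])

  class NoSuchTable(ident: Ident) extends Exception(s"Table not found: $ident")

  // Mirrors the two TableCatalog methods quoted above, in simplified form.
  trait SimpleTableCatalog {
    def listTables(namespace: Seq[String]): Seq[Ident]
    def loadTable(ident: Ident): TableMeta
  }

  class InMemoryCatalog(tables: Seq[TableMeta]) extends SimpleTableCatalog {
    private val byIdent: Map[Ident, TableMeta] = tables.map(t => t.ident -> t).toMap

    // Only tables whose namespace matches exactly are listed.
    override def listTables(namespace: Seq[String]): Seq[Ident] =
      byIdent.keys.filter(_.namespace == namespace).toSeq.sortBy(_.name)

    // Unknown identifiers raise an exception, as the contract above requires.
    override def loadTable(ident: Ident): TableMeta =
      byIdent.getOrElse(ident, throw new NoSuchTable(ident))
  }
}
```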
Another type to mention is DelegatingCatalogExtension, an abstract class that implements the CatalogExtension interface; CatalogExtension in turn extends TableCatalog and SupportsNamespaces. DeltaCatalog extends DelegatingCatalogExtension, which is analyzed later.
Finally, there is the class CatalogManager, which manages CatalogPlugins and is thread-safe:
    /**
     * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allow
     * the caller to look up a catalog by name.
     *
     * There are still many commands (e.g. ANALYZE TABLE) that do not support v2 catalog API. They
     * ignore the current catalog and blindly go to the v1 `SessionCatalog`. To avoid tracking current
     * namespace in both `SessionCatalog` and `CatalogManger`, we let `CatalogManager` to set/get
     * current database of `SessionCatalog` when the current catalog is the session catalog.
     */
    // TODO: all commands should look up table from the current catalog. The `SessionCatalog` doesn't
    //       need to track current database at all.
    private[sql]
    class CatalogManager(
        conf: SQLConf,
        defaultSessionCatalog: CatalogPlugin,
        val v1SessionCatalog: SessionCatalog) extends Logging {
As we can see, CatalogManager manages both the v2 CatalogPlugin and the v1 SessionCatalog. The latter is kept for historical reasons, because compatibility with v1 is required.
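The "look up a catalog by name" part of this class can be pictured as a small synchronized registry. The sketch below is a simplified model under that assumption; Plugin, NamedPlugin, and the load function are hypothetical stand-ins, not Spark classes.

```scala
import scala.collection.mutable

object CatalogRegistrySketch {
  trait Plugin { def name: String }
  final case class NamedPlugin(name: String) extends Plugin

  /** Looks catalogs up by name and loads each one at most once.
    * `load` is a hypothetical factory standing in for the real plugin loader. */
  class Registry(load: String => Plugin) {
    private val catalogs = mutable.HashMap.empty[String, Plugin]

    // `synchronized` makes concurrent lookups safe and keeps one instance per name.
    def catalog(name: String): Plugin = synchronized {
      catalogs.getOrElseUpdate(name, load(name))
    }
  }
}
```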
So where is CatalogManager created? Looking at BaseSessionStateBuilder, we can see that this class is where CatalogManager is instantiated:
    /**
     * Catalog for managing table and database states. If there is a pre-existing catalog, the state
     * of that catalog (temp tables & current database) will be copied into the new catalog.
     *
     * Note: this depends on the `conf`, `functionRegistry` and `sqlParser` fields.
     */
    protected lazy val catalog: SessionCatalog = {
      val catalog = new SessionCatalog(
        () => session.sharedState.externalCatalog,
        () => session.sharedState.globalTempViewManager,
        functionRegistry,
        conf,
        SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
        sqlParser,
        resourceLoader)
      parentState.foreach(_.catalog.copyStateTo(catalog))
      catalog
    }

    protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog, conf)

    protected lazy val catalogManager = new CatalogManager(conf, v2SessionCatalog, catalog)
SessionCatalog is the v1 catalog. It mainly talks to the underlying metadata store and manages temporary views and UDFs. It is not analyzed here; the focus is on the v2 session catalog. Let's take a look at V2SessionCatalog:
    /**
     * A [[TableCatalog]] that translates calls to the v1 SessionCatalog.
     */
    class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
      extends TableCatalog with SupportsNamespaces {
      import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
      import V2SessionCatalog._

      override val defaultNamespace: Array[String] = Array("default")

      override def name: String = CatalogManager.SESSION_CATALOG_NAME

      // This class is instantiated by Spark, so `initialize` method will not be called.
      override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {}

      override def listTables(namespace: Array[String]): Array[Identifier] = {
        namespace match {
          case Array(db) =>
            catalog
              .listTables(db)
              .map(ident => Identifier.of(Array(ident.database.getOrElse("")), ident.table))
              .toArray
          case _ =>
            throw new NoSuchNamespaceException(namespace)
        }
      }
      ...
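The shape of listTables, where a single-part namespace maps onto a v1 database and anything else is rejected, can be reproduced without Spark. The sketch below is a simplified model; V1Catalog, V2Adapter, and NoSuchNamespace are hypothetical stand-ins for SessionCatalog, V2SessionCatalog, and NoSuchNamespaceException.

```scala
object SessionCatalogDelegationSketch {
  // Stand-in for the v1 SessionCatalog: database name -> table names.
  class V1Catalog(tablesByDb: Map[String, Seq[String]]) {
    def listTables(db: String): Seq[String] = tablesByDb.getOrElse(db, Seq.empty)
  }

  class NoSuchNamespace(namespace: Seq[String])
    extends Exception(s"Namespace not found: ${namespace.mkString(".")}")

  // Stand-in for V2SessionCatalog: it holds no table state of its own and forwards to v1.
  class V2Adapter(v1: V1Catalog) {
    def listTables(namespace: Seq[String]): Seq[(Seq[String], String)] = namespace match {
      // Exactly one namespace part is treated as a v1 database name.
      case Seq(db) => v1.listTables(db).map(table => (Seq(db), table))
      // Multi-part (or empty) namespaces have no v1 equivalent.
      case _ => throw new NoSuchNamespace(namespace)
    }
  }
}
```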
Analyzing listTables shows that the v2 session catalog delegates its operations to the v1 SessionCatalog, and the other methods work the same way. Note that name defaults to CatalogManager.SESSION_CATALOG_NAME, that is, spark_catalog, which comes up again later. CatalogManager is also used by the analyzer and the optimizer of logical plans, because both need the metadata:
    protected def analyzer: Analyzer = new Analyzer(catalogManager, conf) {
      ...

    protected def optimizer: Optimizer = {
      new SparkOptimizer(catalogManager, catalog, experimentalMethods) {
        override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] =
          super.earlyScanPushDownRules ++ customEarlyScanPushDownRules

        override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
          super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
      }
    }
The analyzer and optimizer are at the core of Spark SQL query processing, together with physical plan generation. So where are they reached from?
For example, the filter method in Dataset goes through sessionState, the same object that holds the analyzer and the optimizer:
    def filter(conditionExpr: String): Dataset[T] = {
      filter(Column(sparkSession.sessionState.sqlParser.parseExpression(conditionExpr)))
    }
sessionState.sqlParser is the sqlParser seen earlier, the one passed to SessionCatalog in BaseSessionStateBuilder:
    protected lazy val sqlParser: ParserInterface = {
      extensions.buildParser(session, new SparkSqlParser(conf))
    }
With the whole path in view, from SQL parsing to the components that use the metadata, we have a rough idea of what is going on.
DeltaCatalog of delta
Let's go back and see how Delta's DeltaCatalog hooks into Spark 3.x. From the DeltaCatalog source:
    class DeltaCatalog(val spark: SparkSession) extends DelegatingCatalogExtension
      with StagingTableCatalog
      with SupportsPathIdentifier {

      def this() = {
        this(SparkSession.active)
      }
      ...
As mentioned earlier, DeltaCatalog extends DelegatingCatalogExtension. The name suggests a delegating class, so how does it delegate, and to whom?
    public abstract class DelegatingCatalogExtension implements CatalogExtension {

      private CatalogPlugin delegate;

      public final void setDelegateCatalog(CatalogPlugin delegate) {
        this.delegate = delegate;
      }
      ...
This class has a setDelegateCatalog method, which is called from the loadV2SessionCatalog method in CatalogManager:
    private def loadV2SessionCatalog(): CatalogPlugin = {
      Catalogs.load(SESSION_CATALOG_NAME, conf) match {
        case extension: CatalogExtension =>
          extension.setDelegateCatalog(defaultSessionCatalog)
          extension
        case other => other
      }
    }
That method is in turn called by v2SessionCatalog:
    private[sql] def v2SessionCatalog: CatalogPlugin = {
      conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).map { customV2SessionCatalog =>
        try {
          catalogs.getOrElseUpdate(SESSION_CATALOG_NAME, loadV2SessionCatalog())
        } catch {
          case NonFatal(_) =>
            logError(
              "Fail to instantiate the custom v2 session catalog: " + customV2SessionCatalog)
            defaultSessionCatalog
        }
      }.getOrElse(defaultSessionCatalog)
    }
This method returns the v2 session catalog instance. Analyzing it:
It first reads the configuration item SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION, that is, spark.sql.catalog.spark_catalog. If that is set, loadV2SessionCatalog is called to load the configured class; otherwise the default session catalog is returned, that is, the V2SessionCatalog instance.
Here we find:
Delta sets spark.sql.catalog.spark_catalog to "org.apache.spark.sql.delta.catalog.DeltaCatalog", so the v2 session catalog in Spark is an instance of DeltaCatalog, and DeltaCatalog delegates to the V2SessionCatalog instance created in BaseSessionStateBuilder.
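The resolution logic just described (use the configured class if there is one, hand it the default catalog as its delegate, fall back to the default on failure) can be modeled in isolation. This is a simplified sketch; Plugin, Extension, CustomCatalog, and the load function are hypothetical stand-ins for CatalogPlugin, CatalogExtension, DeltaCatalog, and the plugin loader.

```scala
import scala.util.control.NonFatal

object SessionCatalogResolutionSketch {
  trait Plugin { def name: String }
  trait Extension extends Plugin { def setDelegate(delegate: Plugin): Unit }

  // Plays the role of the built-in V2SessionCatalog.
  final class DefaultSessionCatalog extends Plugin { val name = "spark_catalog" }

  // Plays the role of DeltaCatalog: an extension that remembers whom it delegates to.
  final class CustomCatalog extends Extension {
    val name = "spark_catalog"
    var delegate: Option[Plugin] = None
    def setDelegate(d: Plugin): Unit = delegate = Some(d)
  }

  /** `configured` stands for the value of spark.sql.catalog.spark_catalog, if set;
    * `load` is a hypothetical loader that instantiates the configured class. */
  def resolve(configured: Option[String], default: Plugin)(load: String => Plugin): Plugin =
    configured.map { className =>
      try {
        load(className) match {
          case ext: Extension => ext.setDelegate(default); ext
          case other => other
        }
      } catch {
        case NonFatal(_) => default // fall back if the custom catalog cannot be created
      }
    }.getOrElse(default)
}
```

The point of the design is that the custom catalog sits in front of the built-in one under the same name, so it only needs to intercept the operations it cares about.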
Take a look at DeltaCatalog's createTable method; the other methods are similar:
    override def createTable(
        ident: Identifier,
        schema: StructType,
        partitions: Array[Transform],
        properties: util.Map[String, String]): Table = {
      if (DeltaSourceUtils.isDeltaDataSourceName(getProvider(properties))) {
        createDeltaTable(
          ident, schema, partitions, properties, sourceQuery = None, TableCreationModes.Create)
      } else {
        super.createTable(ident, schema, partitions, properties)
      }
    }
    ...
    private def createDeltaTable(
        ident: Identifier,
        schema: StructType,
        partitions: Array[Transform],
        properties: util.Map[String, String],
        sourceQuery: Option[LogicalPlan],
        operation: TableCreationModes.CreationMode): Table = {
      ...
      val tableDesc = new CatalogTable(
        identifier = TableIdentifier(ident.name(), ident.namespace().lastOption),
        tableType = tableType,
        storage = storage,
        schema = schema,
        provider = Some("delta"),
        partitionColumnNames = partitionColumns,
        bucketSpec = maybeBucketSpec,
        properties = tableProperties.toMap,
        comment = Option(properties.get("comment")))
      // END: copy-paste from the super method finished.

      val withDb = verifyTableAndSolidify(tableDesc, None)
      ParquetSchemaConverter.checkFieldNames(tableDesc.schema.fieldNames)
      CreateDeltaTableCommand(
        withDb,
        getExistingTableIfExists(tableDesc),
        operation.mode,
        sourceQuery,
        operation,
        tableByPath = isByPath).run(spark)

      loadTable(ident)
    }

    override def loadTable(ident: Identifier): Table = {
      try {
        super.loadTable(ident) match {
          case v1: V1Table if DeltaTableUtils.isDeltaTable(v1.catalogTable) =>
            DeltaTableV2(
              spark,
              new Path(v1.catalogTable.location),
              catalogTable = Some(v1.catalogTable),
              tableIdentifier = Some(ident.toString))
          case o => o
        }
      }
      ...
createTable first checks whether the provider is a Delta data source. If so, it goes to createDeltaTable; otherwise it calls super.createTable directly.
createDeltaTable first writes the Delta data with the Delta-specific CreateDeltaTableCommand.run() and then calls loadTable.
loadTable calls super.loadTable, which calls V2SessionCatalog's loadTable. V2SessionCatalog in turn calls getTableMetadata on the v1 SessionCatalog and returns the result as V1Table(catalogTable). This is how Delta's metadata ends up in, and is read back from, the metadata store managed by the v1 SessionCatalog.
If it is not a Delta data source, super.createTable is called, which calls V2SessionCatalog's createTable and ultimately the createTable method of the v1 SessionCatalog.
The focus here is on how Delta data sources are stored in the metadata. The code for non-Delta data sources is not reproduced; if you are interested, you can build the source and trace through it.
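The branching in createTable can be summarized with a small model. This is only a sketch of the control flow: TableDef, SessionCatalogStub, and DeltaLikeCatalog are hypothetical stand-ins, and it assumes that running the Delta command also registers the table's metadata, which is shown here as a call on the delegate.

```scala
object DelegatingCreateTableSketch {
  final case class TableDef(name: String, provider: String)

  // Stand-in for the delegate (the role V2SessionCatalog plays); records what it stored.
  class SessionCatalogStub {
    var created: Vector[TableDef] = Vector.empty
    def createTable(t: TableDef): TableDef = { created :+= t; t }
    def loadTable(name: String): Option[TableDef] = created.find(_.name == name)
  }

  // Stand-in for DeltaCatalog: Delta tables take a Delta-specific path, the rest go to the delegate.
  class DeltaLikeCatalog(delegate: SessionCatalogStub) {
    var deltaLogWrites: Vector[String] = Vector.empty

    def createTable(t: TableDef): TableDef =
      if (t.provider.equalsIgnoreCase("delta")) {
        deltaLogWrites :+= t.name // placeholder for CreateDeltaTableCommand.run
        delegate.createTable(t)   // assumption of this sketch: metadata is registered here
        loadTable(t.name).get     // read back through the delegate, as loadTable(ident) does
      } else {
        delegate.createTable(t)   // non-Delta tables are passed straight through
      }

    def loadTable(name: String): Option[TableDef] = delegate.loadTable(name)
  }
}
```

Either way the table ends up in the same underlying catalog, which is why Delta and non-Delta tables can be used side by side.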
It is also worth noting that spark.sql.defaultCatalog defaults to spark_catalog. In other words, the default catalog for SQL is spark_catalog, which with Delta configured is DeltaCatalog.
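What a default catalog means for table names can be illustrated with a simplified resolver: a name whose first part is a registered catalog goes to that catalog, and everything else goes to the default. This is a sketch of the idea only, not Spark's actual lookup code; the catalog name my_catalog is hypothetical.

```scala
object DefaultCatalogSketch {
  /** Splits a multi-part name into (catalog, remaining parts). If the first part is not a
    * registered catalog name, the default catalog is used and all parts are kept. */
  def resolve(
      parts: Seq[String],
      registered: Set[String],
      defaultCatalog: String = "spark_catalog"): (String, Seq[String]) =
    parts match {
      case head +: tail if tail.nonEmpty && registered.contains(head) => (head, tail)
      case _ => (defaultCatalog, parts)
    }
}
```

Under this model, a plain name such as db.events resolves to spark_catalog, so with the Delta configuration above it is handled by DeltaCatalog without any catalog prefix.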
That concludes this look at how Delta handles DDL when Delta 0.7.0 is integrated with Spark 3.0.1. Thank you for reading; I hope it helps.