How Spark achieves compatibility with the Hive metastore through a classloader

2025-01-17 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/01 Report--

This article explains how Spark achieves compatibility with different versions of the Hive metastore through a custom classloader. Since the mechanism is not widely known, the editor has summarized it below.

Background

We briefly mentioned before that, following the configuration described on the official website, Spark can be made compatible with different versions of Hive metadata. This time, let's analyze at the code level how Spark accesses different versions of the metastore. Note: as the official website states, this configuration only affects access to Hive metadata. The Hive version compiled into Spark SQL is still used for everything else, such as serialization and deserialization, UDFs, and UDAFs.

This is mentioned to clear up a possible doubt: the source code references some classes that do not exist in lower Hive versions. That is because Spark SQL uses its built-in Hive version for all interactions other than metadata access. For example, SerializationUtilities, used in hive/HiveShim.scala, does not exist in Hive 1.2.1 but does exist in the higher version 2.3.7.

The analysis below uses Spark 3.1.1.

Analysis

Spark's interaction with external metadata goes through the ExternalCatalog class, and for Hive metadata the implementation is HiveExternalCatalog. Start from its client:

    /**
     * A Hive client used to interact with the metastore.
     */
    lazy val client: HiveClient = {
      HiveUtils.newClientForMetadata(conf, hadoopConf)
    }

The client is what ultimately carries out every metadata interaction. It is created by calling the newClientForMetadata method of HiveUtils, so we skip straight to the overload that is finally called:

    protected[hive] def newClientForMetadata(
        conf: SparkConf,
        hadoopConf: Configuration,
        configurations: Map[String, String]): HiveClient = {
      val sqlConf = new SQLConf
      sqlConf.setConf(SQLContext.getSQLProperties(conf))
      val hiveMetastoreVersion = HiveUtils.hiveMetastoreVersion(sqlConf)
      val hiveMetastoreJars = HiveUtils.hiveMetastoreJars(sqlConf)
      val hiveMetastoreSharedPrefixes = HiveUtils.hiveMetastoreSharedPrefixes(sqlConf)
      val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf)
      val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)
      ...
      } else if (hiveMetastoreJars == "path") {
        // Convert to files and expand any directories.
        val jars =
          HiveUtils.hiveMetastoreJarsPath(sqlConf)
            .flatMap {
              case path if path.contains("\\") && Utils.isWindows =>
                addLocalHiveJars(new File(path))
              case path =>
                DataSource.checkAndGlobPathIfNecessary(
                  pathStrings = Seq(path),
                  hadoopConf = hadoopConf,
                  checkEmptyGlobPath = true,
                  checkFilesExist = false,
                  enableGlobbing = true
                ).map(_.toUri.toURL)
            }

        logInfo(
          s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " +
            s"using path: ${jars.mkString(";")}")
        new IsolatedClientLoader(
          version = metaVersion,
          sparkConf = conf,
          hadoopConf = hadoopConf,
          execJars = jars.toSeq,
          config = configurations,
          isolationOn = true,
          barrierPrefixes = hiveMetastoreBarrierPrefixes,
          sharedPrefixes = hiveMetastoreSharedPrefixes)
      ...

val hiveMetastoreVersion = HiveUtils.hiveMetastoreVersion(sqlConf) reads the configured metastore version directly, that is, the spark.sql.hive.metastore.version configuration item.

val hiveMetastoreJars = HiveUtils.hiveMetastoreJars(sqlConf) reads how the Hive metastore jars are obtained. The default is builtin. The path method is recommended, because production environments usually have no internet access.

val hiveMetastoreSharedPrefixes = HiveUtils.hiveMetastoreSharedPrefixes(sqlConf) and val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf) are both related to the classloader: they decide which classes are loaded by which classloader, which is how class isolation is achieved.

val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) maps the version string to Spark's internal representation of the Hive version, which is later used to select the version-specific metadata classes.
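As a rough illustration of the configuration items discussed above, the sketch below collects them in a plain Scala Map and renders them as spark-submit style arguments. The version value and the jar directory are assumed example values, and MetastoreConfSketch and toSubmitArgs are hypothetical names used only here.

```scala
object MetastoreConfSketch {
  // Assumed example values; the jar location below is hypothetical.
  val metastoreConf: Map[String, String] = Map(
    "spark.sql.hive.metastore.version"   -> "1.2.1",
    "spark.sql.hive.metastore.jars"      -> "path",
    "spark.sql.hive.metastore.jars.path" -> "file:///opt/hive-1.2.1/lib/*.jar"
  )

  // Render the settings as "--conf key=value" pairs, sorted by key.
  def toSubmitArgs(conf: Map[String, String]): Seq[String] =
    conf.toSeq.sortBy(_._1).flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
}
```

With the path method, the jars are taken from the locations listed in spark.sql.hive.metastore.jars.path, so nothing has to be downloaded at run time.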

Depending on how the metastore jars are obtained, IsolatedClientLoader is initialized in a different way. In every case, the createClient method of isolatedLoader is eventually called:

    /** The isolated client interface to Hive. */
    private[hive] def createClient(): HiveClient = synchronized {
      val warehouseDir = Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname))
      if (!isolationOn) {
        return new HiveClientImpl(version, warehouseDir, sparkConf, hadoopConf, config,
          baseClassLoader, this)
      }
      // Pre-reflective instantiation setup.
      logDebug("Initializing the logger to avoid disaster...")
      val origLoader = Thread.currentThread().getContextClassLoader
      Thread.currentThread.setContextClassLoader(classLoader)

      try {
        classLoader
          .loadClass(classOf[HiveClientImpl].getName)
          .getConstructors.head
          .newInstance(version, warehouseDir, sparkConf, hadoopConf, config, classLoader, this)
          .asInstanceOf[HiveClient]
      } catch {
        case e: InvocationTargetException =>
          if (e.getCause().isInstanceOf[NoClassDefFoundError]) {
            val cnf = e.getCause().asInstanceOf[NoClassDefFoundError]
            throw new ClassNotFoundException(
              s"$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n" +
                "Please make sure that jars for your version of hive and hadoop are included in the " +
                s"paths passed to ${HiveUtils.HIVE_METASTORE_JARS.key}.", e)
          } else {
            throw e
          }
      } finally {
        Thread.currentThread.setContextClassLoader(origLoader)
      }
    }
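The save, swap, and restore handling of the thread's context classloader in createClient is a general pattern. A minimal sketch of it as a reusable helper might look like the following. ContextLoaderSketch and withContextClassLoader are hypothetical names, not part of Spark.

```scala
object ContextLoaderSketch {
  // Run `body` with `loader` installed as the current thread's context
  // classloader, and restore the previous loader afterwards, even if
  // `body` throws.
  def withContextClassLoader[T](loader: ClassLoader)(body: => T): T = {
    val thread = Thread.currentThread()
    val original = thread.getContextClassLoader
    thread.setContextClassLoader(loader)
    try body
    finally thread.setContextClassLoader(original)
  }
}
```

Restoring in a finally block matters because the thread keeps running other code afterwards, and that code should not accidentally resolve classes through the isolated loader.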

If isolation is not enabled, a HiveClientImpl is returned directly, and it is shared by all end users. If isolation is enabled (the default), the contextClassLoader of the current thread is set to classLoader, which is a custom classloader:

    new URLClassLoader(allJars, rootClassLoader) {
      override def loadClass(name: String, resolve: Boolean): Class[_] = {
        val loaded = findLoadedClass(name)
        if (loaded == null) doLoadClass(name, resolve) else loaded
      }
      def doLoadClass(name: String, resolve: Boolean): Class[_] = {
        val classFileName = name.replaceAll("\\.", "/") + ".class"
        if (isBarrierClass(name)) {
          // For barrier classes, we construct a new copy of the class.
          val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
          logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
          defineClass(name, bytes, 0, bytes.length)
        } else if (!isSharedClass(name)) {
          logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
          super.loadClass(name, resolve)
        } else {
          // For shared classes, we delegate to baseClassLoader, but fall back in case the
          // class is not found.
          logDebug(s"shared class: $name")
          try {
            baseClassLoader.loadClass(name)
          } catch {
            case _: ClassNotFoundException =>
              super.loadClass(name, resolve)
          }
        }
      }
    }
    ...

Let's go straight to the key point: with isolation enabled (the default), this custom classLoader is the one that is returned. For general background on classloaders, consult other references; if something is still unclear, read the source code of the ClassLoader class.

Here we focus on the loadClass method of the custom classloader, which is the key to class isolation:

If the class is a barrier class, such as HiveClientImpl, Shim, or ShimLoader, or its name starts with a custom barrier prefix, the class bytes are read through baseClassLoader (by default the context classloader of the thread that created the loader) and a new copy of the class is defined inside the custom classloader.

If the class is neither a barrier class nor a shared class, it is loaded with the loadClass method of URLClassLoader, which searches the jars in allJars.

Otherwise, if it is a shared class and not a barrier class, it is loaded by baseClassLoader, falling back to the loadClass method of URLClassLoader if the class is not found.
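The three-way decision described above can be sketched as a small classifier. The prefix lists here are simplified, assumed examples and do not reproduce Spark's actual isBarrierClass and isSharedClass checks. LoadRuleSketch and classify are hypothetical names.

```scala
object LoadRuleSketch {
  sealed trait Rule
  case object Barrier  extends Rule // re-define a fresh copy inside the isolated loader
  case object Isolated extends Rule // load from the metastore jars
  case object Shared   extends Rule // delegate to the base classloader

  // Simplified, assumed prefix lists for illustration only.
  val barrierPrefixes: Seq[String] = Seq(
    "org.apache.spark.sql.hive.client.HiveClientImpl",
    "org.apache.spark.sql.hive.client.Shim"
  )
  val sharedPrefixes: Seq[String] =
    Seq("java.", "scala.", "org.apache.spark.", "org.slf4j")

  // Same order of checks as doLoadClass: barrier first, then "not shared",
  // and shared as the remaining case.
  def classify(name: String): Rule =
    if (barrierPrefixes.exists(p => name.startsWith(p))) Barrier
    else if (!sharedPrefixes.exists(p => name.startsWith(p))) Isolated
    else Shared
}
```

The order of the checks matters: a barrier class such as HiveClientImpl also matches a shared prefix (org.apache.spark.), so the barrier test has to come first.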

Because of the way this classLoader loads classes, the classes related to Hive metadata are loaded through the custom classLoader (note that a child classloader can see the classes loaded by its parent loader).

After that, the HiveClientImpl class is loaded by this classloader and a HiveClientImpl object is instantiated by reflection, so the classes are loaded dynamically at run time from the metastore jars that were passed in.

Finally, the contextClassLoader of the current thread is reset to the original one.

Important: the dynamic loading of the Hive metadata jars is implemented through the custom classloader.
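To make the barrier-class idea concrete, here is a minimal sketch of a loader that re-defines selected classes from bytes obtained through a base loader and delegates everything else. It is a toy and not Spark's implementation: Payload, BarrierLoaderSketch, and barrierLoader are hypothetical names, and the sketch assumes the class files are reachable as resources through the base loader (true for normally compiled classes).

```scala
import java.io.{ByteArrayOutputStream, InputStream}

// A trivial class that will end up being defined by two different loaders.
class Payload

object BarrierLoaderSketch {
  // Read a stream fully into a byte array, then close it.
  private def readAll(in: InputStream): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val buf = new Array[Byte](4096)
    var n = in.read(buf)
    while (n != -1) {
      out.write(buf, 0, n)
      n = in.read(buf)
    }
    in.close()
    out.toByteArray
  }

  // A loader that defines its own copy of every class named in `barrier`,
  // using the bytes found through `base`, and delegates all other classes
  // to `base` as usual.
  def barrierLoader(base: ClassLoader, barrier: Set[String]): ClassLoader =
    new ClassLoader(base) {
      override def loadClass(name: String, resolve: Boolean): Class[_] = {
        val loaded = findLoadedClass(name)
        if (loaded != null) loaded
        else if (barrier.contains(name)) {
          val in = base.getResourceAsStream(name.replace('.', '/') + ".class")
          if (in == null) throw new ClassNotFoundException(name)
          val bytes = readAll(in)
          defineClass(name, bytes, 0, bytes.length)
        } else super.loadClass(name, resolve)
      }
    }
}
```

A class loaded through such a loader has the same name as the original but is a distinct Class object, so anything it references is resolved through the new loader first. That is what lets a barrier class like HiveClientImpl bind to the Hive classes of the configured metastore version.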

The class that actually interacts with the Hive metastore is HiveClientImpl. This class introduces a shim mechanism: differences between Hive metastore versions are handled by the shim. For example, when a newer version adds a method, a corresponding method is added to the shim, which keeps access to older Hive metadata versions working. The English word itself hints at this: a shim (gasket) is a thin piece inserted to make parts fit, here to absorb version upgrades.
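A toy sketch of the shim idea is shown below. All names (ShimSketch, ShimV1, ShimV2, shimFor) and the version cut-off are hypothetical, and the returned strings merely stand in for real calls into a metastore client.

```scala
object ShimSketch {
  // Operations the client needs, independent of the metastore version.
  trait Shim {
    def dropTable(db: String, table: String, purge: Boolean): String
  }

  // Assumed older API: no purge flag, so the argument is ignored.
  class ShimV1 extends Shim {
    def dropTable(db: String, table: String, purge: Boolean): String =
      s"dropTable($db, $table)"
  }

  // Assumed newer API: supports purge. Only the methods that changed
  // are overridden; everything else is inherited from the older shim.
  class ShimV2 extends ShimV1 {
    override def dropTable(db: String, table: String, purge: Boolean): String =
      s"dropTable($db, $table, purge=$purge)"
  }

  // Pick a shim from the major version number (hypothetical cut-off at 2).
  def shimFor(version: String): Shim = {
    val major = version.split('.').headOption
      .flatMap(s => scala.util.Try(s.toInt).toOption)
    major match {
      case Some(m) if m >= 2 => new ShimV2
      case Some(_)           => new ShimV1
      case None =>
        throw new IllegalArgumentException(s"Unsupported version: $version")
    }
  }
}
```

The caller always talks to the Shim interface, so supporting a new metastore version only means adding another shim subclass and extending the selection logic.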

