In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
From the Flink client submission source code to see what the third-party jar package dynamic loading solution is, many novices are not very clear about this, in order to help you solve this problem, the following editor will explain in detail for you, people with this need can come to learn, I hope you can get something.
1. Source code analysis of flink run submission process
Check the flink script to find the entry class that executes the run command, as follows:
Exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting [@]}"-classpath "`manglePathList" $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS "`" org.apache.flink.client.cli.CliFrontend "$@
The entry class is: org.apache.flink.client.cli.CliFrontend. Eventually, the parseParameters (String [] args) method is called to perform command parsing, and the run command calls the run (params) method, as follows:
Switch (action) {case ACTION_RUN: run (params); return 0; case ACTION_RUN_APPLICATION: runApplication (params); return 0; case ACTION_LIST: list (params); return 0; case ACTION_INFO: info (params) Return 0; case ACTION_CANCEL: cancel (params); return 0; case ACTION_STOP: stop (params); return 0; case ACTION_SAVEPOINT: savepoint (params); return 0;}
The run method code is as follows
Protected void run (String [] args) throws Exception {LOG.info ("Running 'run' command."); final Options commandOptions = CliFrontendParser.getRunCommandOptions (); final CommandLine commandLine = getCommandLine (commandOptions, args, true); / / evaluate help flag if (commandLine.hasOption (HELP_OPTION.getOpt () {CliFrontendParser.printHelpForRun (customCommandLines) Return;} final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine (checkNotNull (commandLine)); final ProgramOptions programOptions = ProgramOptions.create (commandLine); # create PackagedProgram object final PackagedProgram program = getPackagedProgram (programOptions) # parsing to get dependency jar final List jobJars = program.getJobJarAndDependencies (); # generating final submission configuration final Configuration effectiveConfiguration = getEffectiveConfiguration (activeCommandLine, commandLine, programOptions, jobJars); LOG.debug ("Effective executor configuration: {}", effectiveConfiguration) Try {executeProgram (effectiveConfiguration, program);} finally {program.deleteExtractedLibraries ();}}
The run method creates a PackagedProgram object based on the parameters passed by the user, such as the main function, jar package, and so on, which encapsulates the information submitted by the user. As you can see from the getPackagedProgram () method.
Return PackagedProgram.newBuilder () .setJarFile (jarFile) .setUserClassPaths (classpaths) .setEntryPointClassName (entryPointClass) .setConfiguration (configuration) .setSavepointRestoreSet tings (runOptions.getSavepointRestoreSettings ()) .setArguments (programArgs) .build ()
Take a look at the PackagedProgram constructor, which creates several key member variables:
Classpaths: the information passed in by the user-C parameter
JarFile: the jar of the user's main function
ExtractedTempLibraries: extract all the jar package information under the lib/ folder in the above main jar package for later classloader use
UserCodeClassLoader: the classloader of the user code, and this classloader adds classpaths,jarFile,extractedTempLibraries to the classpath. The userCodeClassLoader adopts the child_first priority policy by default.
MainClass: user main function method. The construction method is as follows:
Private PackagedProgram (@ Nullable File jarFile, List classpaths, @ Nullable String entryPointClassName, Configuration configuration, SavepointRestoreSettings savepointRestoreSettings, String...) Args) throws ProgramInvocationException {this.classpaths = checkNotNull (classpaths); this.savepointSettings = checkNotNull (savepointRestoreSettings); this.args = checkNotNull (args); checkArgument (jarFile! = null | | entryPointClassName! = null, "Either the jarFile or the entryPointClassName needs to be non-null."); / / whether the job is a Python job. This.isPython = isPython (entryPointClassName); / / load the jar file if exists this.jarFile = loadJarFile (jarFile); assert this.jarFile! = null | | entryPointClassName! = null; / / now that we have an entry point, we can extract the nested jar files (if any) this.extractedTempLibraries = this.jarFile = = null? Collections.emptyList (): extractContainedLibraries (this.jarFile); this.userCodeClassLoader = ClientUtils.buildUserCodeClassLoader (getJobJarAndDependencies (), classpaths, getClass (). GetClassLoader (), configuration) / / load the entry point class this.mainClass = loadMainClass (/ / if no entryPointClassName name was given, we try and look one up through the manifest entryPointClassName! = null? EntryPointClassName: getEntryPointClassNameFromJar (this.jarFile), userCodeClassLoader); if (! hasMainMethod (mainClass)) {throw new ProgramInvocationException ("The given program class does not have a main (String []) method.");}}
The getJobJarAndDependencies method in PackagedProgram, which collects all the dependent jar packages of job. These jar packages are later submitted to the cluster and added to the classpath path.
After the construction of the PackagedProgram object is complete, the final Configuration object is created as follows
Final Configuration effectiveConfiguration = getEffectiveConfiguration (activeCommandLine, commandLine, programOptions, jobJars)
This method sets two parameters:
Pipeline.classpaths: the values are getJobJarAndDependencies () and url in classpaths
Pipeline.jars: the dependency under the jar and lib folders returned by getJobJarAndDependencies (). When submitting the cluster later, the jar will be submitted to the cluster together.
Once PackagedProgram and Configuration are ready, the user program will be executed.
ExecuteProgram (effectiveConfiguration, program)
The detailed code is as follows:
Public static void executeProgram (PipelineExecutorServiceLoader executorServiceLoader, Configuration configuration, PackagedProgram program, boolean enforceSingleJobExecution, boolean suppressSysout) throws ProgramInvocationException {checkNotNull (executorServiceLoader); final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader () Final ClassLoader contextClassLoader = Thread.currentThread (). GetContextClassLoader (); try {# sets the user context user class loader Thread.currentThread (). SetContextClassLoader (userCodeClassLoader); LOG.info ("Starting program (detached: {})",! configuration.getBoolean (DeploymentOptions.ATTACHED)) ContextEnvironment.setAsContext (executorServiceLoader, configuration, userCodeClassLoader, enforceSingleJobExecution, suppressSysout) StreamContextEnvironment.setAsContext (executorServiceLoader, configuration, userCodeClassLoader, enforceSingleJobExecution, suppressSysout) Try {# reflection calls the user's main function to execute job submission program.invokeInteractiveModeForExecution ();} finally {ContextEnvironment.unsetAsContext (); StreamContextEnvironment.unsetAsContext () }} finally {Thread.currentThread () .setContextClassLoader (contextClassLoader);}}
Finally, summarize the whole process:
Execute flink run naming and pass in relevant parameters
Create PackagedProgram object, prepare related jar, user class loader, Configuration object
Call the user Main method through reflection
Build Pipeline StreamGraph and submit job to the cluster
two。 When submitting job, dynamically load third-party jar (such as udf, etc.)
By analyzing the process, we can find that there are two ways to add dynamic jar.
Dynamically put the tripartite jar in the lib directory of the main function jar package (which can be done by jar uf naming) because in the PackagedProgram constructor, all the jar in the jar lib directory will be obtained through the extractContainedLibraries () method, and these jar will be uploaded to the cluster together.
In the user task main function, the pipeline.classpaths and pipeline.jars properties of the Configuration object are set dynamically through reflection. You also need to load a third-party jar into Thread.contextClassLoader. (see: https://zhuanlan.zhihu.com/p/278482766)
I directly adopted the first scheme in the project and will not add more code.
Is it helpful for you to read the above content? If you want to know more about the relevant knowledge or read more related articles, please follow the industry information channel, thank you for your support.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.