How Flink executes user programs

Updated 2025-02-24. From: SLTechnology News&Howtos, Shulou (Shulou.com), 06/01

This article explains how Flink executes a user program: how the command-line client builds its Configuration object, how that configuration reaches the StreamExecutionEnvironment, and how the user's main() method is finally invoked.

Execute the user program

CliFrontend generates the Configuration object

Take Flink on YARN as an example:

(1) In the main() method of CliFrontend, three command-line objects (GenericCLI, FlinkYarnSessionCli and DefaultCLI) are created and added, in that order, to the ArrayList customCommandLines.

    public static void main(final String[] args) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
        ...
        // 3. load the custom command lines
        final List<CustomCommandLine> customCommandLines =
                loadCustomCommandLines(configuration, configurationDirectory);
    }

Later, in run() -> validateAndGetActiveCommandLine(), the command-line objects are taken from customCommandLines in order, and isActive() is called on each one. The first object that reports itself active is used.

    public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
        for (CustomCommandLine cli : customCommandLines) {
            if (cli.isActive(commandLine)) {
                return cli;
            }
        }
        throw new IllegalStateException("No valid command-line found.");
    }
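The "first active candidate wins" selection above can be tried outside Flink with a minimal sketch. Everything in it is an assumption made for illustration: SketchCommandLine is a hypothetical stand-in for CustomCommandLine, and the rule "active when a given option is present" (including the choice of -t and -m) is a simplification, not Flink's real activation logic.

```java
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for Flink's CustomCommandLine; only the parts needed here.
interface SketchCommandLine {
    String name();
    boolean isActive(Set<String> options);
}

class ActiveCliSketch {
    // Builds a candidate that is active when requiredOption is present.
    // A null requiredOption means "always active" (a catch-all, like a default CLI).
    static SketchCommandLine cli(String name, String requiredOption) {
        return new SketchCommandLine() {
            public String name() { return name; }
            public boolean isActive(Set<String> options) {
                return requiredOption == null || options.contains(requiredOption);
            }
        };
    }

    // Returns the first candidate, in list order, that reports itself active.
    static SketchCommandLine select(List<SketchCommandLine> candidates, Set<String> options) {
        for (SketchCommandLine candidate : candidates) {
            if (candidate.isActive(options)) {
                return candidate;
            }
        }
        throw new IllegalStateException("No valid command-line found.");
    }

    public static void main(String[] args) {
        // Same ordering idea as customCommandLines: generic, then YARN, then default.
        List<SketchCommandLine> candidates = List.of(
                cli("generic", "-t"), cli("yarn", "-m"), cli("default", null));
        System.out.println(select(candidates, Set.of("-m")).name());
    }
}
```

Run as written, main prints yarn: the generic candidate is skipped because -t is absent, and the YARN candidate is reached before the catch-all. The order of the list therefore matters, which is why the catch-all is placed last.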

The isActive() method of FlinkYarnSessionCli checks, among other conditions, whether the -m parameter was passed to the bin/flink script and whether its value is yarn-cluster:

    @Override
    public boolean isActive(CommandLine commandLine) {
        final String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
        final boolean yarnJobManager = ID.equals(jobManagerOption);
        final boolean hasYarnAppId = commandLine.hasOption(applicationId.getOpt())
                || configuration.getOptional(YarnConfigOptions.APPLICATION_ID).isPresent();
        final boolean hasYarnExecutor =
                YarnSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET))
                || YarnJobClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
        return hasYarnExecutor || yarnJobManager || hasYarnAppId
                || (isYarnPropertiesFileMode(commandLine) && yarnApplicationIdFromYarnProperties != null);
    }

Here addressOption corresponds to the -m option, and ID is the string yarn-cluster.
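To see how the three main conditions combine, here is a simplified, self-contained sketch. It is not Flink's implementation: the hand-written optionValue() helper replaces the command-line parser Flink uses, the -yid flag name and the yarn-session target string are assumptions taken from memory of the Flink CLI, and the YARN properties file branch is left out.

```java
import java.util.List;

class YarnActiveSketch {
    static final String ID = "yarn-cluster";

    // Hypothetical helper: returns the token following flag, or null if the flag is absent.
    static String optionValue(List<String> args, String flag) {
        int i = args.indexOf(flag);
        return (i >= 0 && i + 1 < args.size()) ? args.get(i + 1) : null;
    }

    // Active if -m yarn-cluster was given, an application ID flag is present,
    // or the configured execution target already names a YARN executor.
    static boolean isActive(List<String> args, String configuredTarget) {
        boolean yarnJobManager = ID.equals(optionValue(args, "-m"));
        boolean hasYarnAppId = args.contains("-yid"); // assumed flag name
        boolean hasYarnExecutor = "yarn-session".equalsIgnoreCase(configuredTarget) // assumed value
                || "yarn-per-job".equalsIgnoreCase(configuredTarget);
        return hasYarnExecutor || yarnJobManager || hasYarnAppId;
    }

    public static void main(String[] args) {
        System.out.println(isActive(List.of("run", "-m", "yarn-cluster", "job.jar"), null));
        System.out.println(isActive(List.of("run", "-m", "host:8081", "job.jar"), null));
    }
}
```

The first call prints true and the second prints false: -m only activates the YARN command line when its value is exactly yarn-cluster, so a host:port address falls through to the next candidate.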

(2) In the run() method of CliFrontend, the Configuration object is obtained through getEffectiveConfiguration(). The active command line, activeCommandLine, is the FlinkYarnSessionCli selected in step (1).

getEffectiveConfiguration() calls the applyCommandLineOptionsToConfiguration() method of FlinkYarnSessionCli, which adds the YARN-related settings:

    public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
        // we ignore the addressOption because it can only contain "yarn-cluster"
        final Configuration effectiveConfiguration = new Configuration(configuration);

        applyDescriptorOptionToConfig(commandLine, effectiveConfiguration);

        final ApplicationId applicationId = getApplicationId(commandLine);
        if (applicationId != null) {
            final String zooKeeperNamespace;
            if (commandLine.hasOption(zookeeperNamespace.getOpt())) {
                zooKeeperNamespace = commandLine.getOptionValue(zookeeperNamespace.getOpt());
            } else {
                zooKeeperNamespace = effectiveConfiguration.getString(HA_CLUSTER_ID, applicationId.toString());
            }

            effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace);
            effectiveConfiguration.setString(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(applicationId));
            effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME);
        } else {
            effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME);
        }
        ...
    }
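The branch on the application ID is the part that decides where the job runs. A minimal sketch of that decision, using a plain Map instead of Flink's Configuration, might look like the following. The key strings and the yarn-session value are assumptions about what the Flink constants resolve to; treat them as placeholders rather than as confirmed values.

```java
import java.util.HashMap;
import java.util.Map;

class TargetSketch {
    // Assumed string forms of DeploymentOptions.TARGET, YarnConfigOptions.APPLICATION_ID
    // and HA_CLUSTER_ID; used here only as map keys.
    static final String TARGET = "execution.target";
    static final String APPLICATION_ID = "yarn.application.id";
    static final String HA_CLUSTER_ID = "high-availability.cluster-id";

    // applicationId and zkNamespaceOption are null when not supplied on the command line.
    static Map<String, String> effectiveConfiguration(
            Map<String, String> base, String applicationId, String zkNamespaceOption) {
        Map<String, String> effective = new HashMap<>(base); // copy, never mutate the base
        if (applicationId != null) {
            // Attach to an existing YARN application: session mode.
            String namespace = zkNamespaceOption != null
                    ? zkNamespaceOption
                    : effective.getOrDefault(HA_CLUSTER_ID, applicationId);
            effective.put(HA_CLUSTER_ID, namespace);
            effective.put(APPLICATION_ID, applicationId);
            effective.put(TARGET, "yarn-session"); // assumed value of YarnSessionClusterExecutor.NAME
        } else {
            // No application to attach to: a cluster is started for this job.
            effective.put(TARGET, "yarn-per-job");
        }
        return effective;
    }

    public static void main(String[] args) {
        System.out.println(effectiveConfiguration(Map.of(), null, null).get(TARGET));
        System.out.println(effectiveConfiguration(Map.of(), "application_1_0001", null).get(TARGET));
    }
}
```

With no application ID the sketch prints yarn-per-job, and with one it prints yarn-session. Copying the base map first mirrors the original method, which builds a new Configuration instead of modifying the shared one.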

The key setting is DeploymentOptions.TARGET, the target execution environment of the program. The value of YarnJobClusterExecutor.NAME is "yarn-per-job", as defined in YarnDeploymentTarget:

    public enum YarnDeploymentTarget {
        PER_JOB("yarn-per-job"),
        ...
    }

Set the Configuration object for StreamExecutionEnvironment

The Configuration is set in executeProgram() of ClientUtils by the following code:

    public static void executeProgram(
            PipelineExecutorServiceLoader executorServiceLoader,
            Configuration configuration,
            PackagedProgram program,
            boolean enforceSingleJobExecution,
            boolean suppressSysout) throws ProgramInvocationException {
        ...
        try {
            ...
            StreamContextEnvironment.setAsContext(
                    executorServiceLoader,
                    configuration,
                    userCodeClassLoader,
                    enforceSingleJobExecution,
                    suppressSysout);
        } finally {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
        }
    }

Run the user program's main() method
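Before looking at Flink's own code for this step, the general pattern can be sketched in plain Java: switch the thread's context class loader, look up a public static main(String[]) by reflection, invoke it, and restore the previous class loader in a finally block. MainInvokerSketch and UserProgram are hypothetical names, and the error handling is reduced to unchecked exceptions; it is a sketch of the technique, not of PackagedProgram.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

class MainInvokerSketch {
    // Stand-in for a user program; records its first argument so the call can be observed.
    public static class UserProgram {
        static String lastArg;
        public static void main(String[] args) {
            lastArg = args.length > 0 ? args[0] : null;
        }
    }

    static void invokeMain(Class<?> entryClass, ClassLoader userLoader, String[] args) {
        if (!Modifier.isPublic(entryClass.getModifiers())) {
            throw new IllegalArgumentException(entryClass.getName() + " must be public.");
        }
        ClassLoader previous = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(userLoader);
        try {
            Method mainMethod = entryClass.getMethod("main", String[].class);
            if (!Modifier.isStatic(mainMethod.getModifiers())) {
                throw new IllegalArgumentException("main must be static.");
            }
            // The cast passes the array as ONE argument instead of spreading it as varargs.
            mainMethod.invoke(null, (Object) args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not run main of " + entryClass.getName(), e);
        } finally {
            // Always restore the caller's class loader, even if main() failed.
            Thread.currentThread().setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        invokeMain(UserProgram.class, MainInvokerSketch.class.getClassLoader(), new String[] {"hello"});
        System.out.println(UserProgram.lastArg);
    }
}
```

Run as written, main prints hello. The (Object) cast is the detail most easily missed: without it, Method.invoke would treat each element of the String array as a separate argument and the call would fail for any argument count other than one.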

executeProgram() in ClientUtils calls invokeInteractiveModeForExecution() of PackagedProgram, which runs the user's main() method through callMainMethod():

    private static void callMainMethod(Class<?> entryClass, String[] args) throws ProgramInvocationException {
        Method mainMethod;
        if (!Modifier.isPublic(entryClass.getModifiers())) {
            ...
        }

        try {
            mainMethod = entryClass.getMethod("main", String[].class);
        } catch (NoSuchMethodException e) {
            ...
        } catch (Throwable t) {
            ...
        }

        if (!Modifier.isStatic(mainMethod.getModifiers())) {
            ...
        }
        if (!Modifier.isPublic(mainMethod.getModifiers())) {
            ...
        }

        try {
            mainMethod.invoke(null, (Object) args);
        } catch (IllegalArgumentException e) {
            ...
        } catch (IllegalAccessException e) {
            ...
        } catch (InvocationTargetException e) {
            ...
        } catch (Throwable t) {
            ...
        }
    }

At this point you should have a clearer picture of how Flink executes user programs. Tracing the same path in your own environment is a good way to confirm it.
