
How does Flink distinguish the running environment?

2025-01-18 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/01 Report--

This article explains how Flink distinguishes its running environment, that is, how it decides whether a job runs locally or on a cluster. The walkthrough below follows the relevant source code step by step.

The logic Flink uses to determine the running environment (local or cluster) is as follows:

(1) In the main method of the job, obtain the running environment through StreamExecutionEnvironment:

    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

(2) The factory that creates the running environment is stored in a ThreadLocal; threadLocalContextEnvironmentFactory is a static field of the StreamExecutionEnvironment class.

    /** The ThreadLocal used to store {@link StreamExecutionEnvironmentFactory}. */
    private static final ThreadLocal<StreamExecutionEnvironmentFactory> threadLocalContextEnvironmentFactory = new ThreadLocal<>();

① When the job's main method is run directly from a local IDE, no StreamExecutionEnvironmentFactory is found in the ThreadLocal, so the local runtime environment, LocalStreamEnvironment, is created:

    public static StreamExecutionEnvironment getExecutionEnvironment() {
        return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
            .map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
            .orElseGet(StreamExecutionEnvironment::createLocalEnvironment);
    }

When a StreamExecutionEnvironmentFactory is present in the ThreadLocal, its createExecutionEnvironment() method is used to create the runtime environment instead.
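The lookup-then-fallback rule described above can be sketched without any Flink dependency. This is only an illustration: the class name EnvironmentResolutionSketch, the use of Supplier<String> in place of StreamExecutionEnvironmentFactory, and the string return values are all assumptions made for the example, and resolveFactory here is a guess at the shape of Utils.resolveFactory rather than a copy of it.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical, Flink-free model of the factory lookup; not Flink's actual code.
public class EnvironmentResolutionSketch {

    // Stand-in for threadLocalContextEnvironmentFactory (factory modeled as Supplier<String>).
    static final ThreadLocal<Supplier<String>> threadLocalFactory = new ThreadLocal<>();

    // Stand-in for the static contextEnvironmentFactory field.
    static Supplier<String> contextFactory = null;

    // Assumed behavior: prefer the thread-local factory, then the static one, else empty.
    static <T> Optional<T> resolveFactory(ThreadLocal<T> threadLocal, T staticFactory) {
        T local = threadLocal.get();
        return Optional.ofNullable(local != null ? local : staticFactory);
    }

    // Same shape as getExecutionEnvironment(): use the factory if present, else go local.
    static String getExecutionEnvironment() {
        return resolveFactory(threadLocalFactory, contextFactory)
                .map(Supplier::get)
                .orElseGet(() -> "LocalStreamEnvironment");
    }

    public static void main(String[] args) {
        // No factory registered, as when main() is started from an IDE.
        System.out.println(getExecutionEnvironment()); // prints LocalStreamEnvironment

        // A factory registered on this thread, as a client would do before calling main().
        threadLocalFactory.set(() -> "StreamContextEnvironment");
        System.out.println(getExecutionEnvironment()); // prints StreamContextEnvironment

        threadLocalFactory.remove();
    }
}
```

The point of the sketch is that user code never chooses an environment itself; the result depends only on whether some caller registered a factory beforehand.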

② How is the StreamExecutionEnvironmentFactory put into the ThreadLocal in a cluster environment?

When a jar is submitted to the cluster with the bin/flink run ... command, the script calls org.apache.flink.client.cli.CliFrontend to run the user program, as follows:

    ...
    # Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
    exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

CliFrontend then goes through the following call chain: main() -> parseParameters() -> run() -> executeProgram()

    protected void executeProgram(final Configuration configuration, final PackagedProgram program) throws ProgramInvocationException {
        ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program, false, false);
    }

executeProgram() in org.apache.flink.client.ClientUtils calls StreamContextEnvironment.setAsContext(...). StreamContextEnvironment extends StreamExecutionEnvironment. The setAsContext() code is as follows:

    public static void setAsContext(
            final PipelineExecutorServiceLoader executorServiceLoader,
            final Configuration configuration,
            final ClassLoader userCodeClassLoader,
            final boolean enforceSingleJobExecution,
            final boolean suppressSysout) {
        StreamExecutionEnvironmentFactory factory = () -> new StreamContextEnvironment(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);
        initializeContextEnvironment(factory);
    }

setAsContext() creates an instance of the factory that produces the runtime environment, and initializeContextEnvironment() stores that instance in the static field threadLocalContextEnvironmentFactory of the StreamExecutionEnvironment class, as follows:

    protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
        contextEnvironmentFactory = ctx;
        threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
    }

In this way, when the user program calls StreamExecutionEnvironment.getExecutionEnvironment(), the runtime environment it obtains is produced by the factory that was created in the setAsContext() method of the StreamContextEnvironment class:

    public static void setAsContext(
            final PipelineExecutorServiceLoader executorServiceLoader,
            final Configuration configuration,
            final ClassLoader userCodeClassLoader,
            final boolean enforceSingleJobExecution,
            final boolean suppressSysout) {
        StreamExecutionEnvironmentFactory factory = () -> new StreamContextEnvironment(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);
        ...
    }
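To make the ordering concrete, here is a minimal, Flink-free sketch of the cluster path: a client registers a factory, invokes the user's main logic, and clears the factory afterwards. The class ContextInstallSketch, the Supplier<String> stand-ins and the executeProgram helper are hypothetical names chosen for this example; the cleanup in the finally block is an assumption about what a well-behaved client would do, not something shown in the code above.

```java
import java.util.function.Supplier;

// Hypothetical model of "client installs factory, then runs user main"; not Flink's code.
public class ContextInstallSketch {

    // Stand-ins for the two static fields of StreamExecutionEnvironment.
    private static Supplier<String> contextEnvironmentFactory = null;
    private static final ThreadLocal<Supplier<String>> threadLocalContextEnvironmentFactory =
            new ThreadLocal<>();

    // Same two assignments as initializeContextEnvironment() in the article.
    static void initializeContextEnvironment(Supplier<String> ctx) {
        contextEnvironmentFactory = ctx;
        threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
    }

    // Assumed cleanup step so later callers on this thread fall back to local again.
    static void resetContextEnvironment() {
        contextEnvironmentFactory = null;
        threadLocalContextEnvironmentFactory.remove();
    }

    // What user code calls; it never decides the environment type itself.
    static String getExecutionEnvironment() {
        Supplier<String> local = threadLocalContextEnvironmentFactory.get();
        Supplier<String> factory = local != null ? local : contextEnvironmentFactory;
        return factory != null ? factory.get() : "LocalStreamEnvironment";
    }

    // Plays the role of the client: install the factory, run the user program, clean up.
    static String executeProgram(Supplier<String> userMain) {
        initializeContextEnvironment(() -> "StreamContextEnvironment");
        try {
            return userMain.get();
        } finally {
            resetContextEnvironment();
        }
    }

    public static void main(String[] args) {
        // The "user program" simply asks for an environment.
        Supplier<String> userMain = ContextInstallSketch::getExecutionEnvironment;

        System.out.println("via client: " + executeProgram(userMain)); // via client: StreamContextEnvironment
        System.out.println("direct run: " + userMain.get());           // direct run: LocalStreamEnvironment
    }
}
```

The same user code yields a different environment depending only on who called it, which is the behavior the article attributes to the CliFrontend path.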

The main difference between the local runtime environment, LocalStreamEnvironment, and StreamContextEnvironment, which is used for standalone clusters, Flink on YARN and other cluster deployments, lies in their configuration member. In LocalStreamEnvironment it is a newly created, empty set of key-value pairs (new Configuration()), whereas in StreamContextEnvironment it is the Configuration object generated by CliFrontend.
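The configuration difference can be illustrated with a small stand-alone model. Everything here is an assumption made for the example: the Env class, the use of a Map<String, String> in place of Flink's Configuration, and the sample entry execution.target=yarn-per-job, which merely represents the kind of setting a client might pass along.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the two environments and their configuration; not Flink's code.
public class ConfigurationDifferenceSketch {

    // Stand-in for an execution environment that holds a configuration.
    static class Env {
        final String kind;
        final Map<String, String> configuration;

        Env(String kind, Map<String, String> configuration) {
            this.kind = kind;
            // Defensive copy so later changes by the caller do not leak in.
            this.configuration = Collections.unmodifiableMap(new HashMap<>(configuration));
        }
    }

    // Local case: starts from an empty configuration, like new Configuration().
    static Env createLocalEnvironment() {
        return new Env("LocalStreamEnvironment", new HashMap<>());
    }

    // Cluster case: receives whatever configuration the client assembled.
    static Env createContextEnvironment(Map<String, String> clientConfiguration) {
        return new Env("StreamContextEnvironment", clientConfiguration);
    }

    public static void main(String[] args) {
        Map<String, String> fromClient = new HashMap<>();
        fromClient.put("execution.target", "yarn-per-job"); // illustrative entry only

        Env local = createLocalEnvironment();
        Env context = createContextEnvironment(fromClient);

        System.out.println(local.kind + " " + local.configuration);     // LocalStreamEnvironment {}
        System.out.println(context.kind + " " + context.configuration); // StreamContextEnvironment {execution.target=yarn-per-job}
    }
}
```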

This concludes the walkthrough of how Flink distinguishes its running environment. Theory is best paired with practice, so try it out yourself.

