In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly explains "what is the method of Flink submitting tasks". Interested friends may wish to have a look. The method introduced in this paper is simple, fast and practical. Let's let Xiaobian take you to learn "what is the method of Flink submitting tasks"!
I. Key components
There are three important components in the task submission process: Dispatcher, JobMaster, JobManagerRunnerImpl. Find MiniDispatcher first by calling the path below:
YarnJobClusterEntrypoint's main() -> ClusterEntrypoint's runCluster() -> DefaultDispatcherResourceManagerComponentFactory's create() -> DefaultDispatcherRunnerFactory's createDispatcherRunner() -> DefaultDispatcherRunner's grantLeadership() -> JobDispatcherLeaderProcess's onStart() -> DefaultDispatcherGatewayServiceFactory's create() -> JobDispatcherFactory's createDispatcher() -> MiniDispatcher's start()
(1)Dispatcher
Responsible for receiving task submission requests and distributing them to JobManager for execution;
When Dispatcher starts, it runs startRecoveredJobs() to start the task that needs to be recovered. When in Flink on Yarn mode, MiniDispatcher transfers the current task to the task that needs to be restored, thus achieving the task commit launch.
(2)JobManagerRunner
Responsible for running JobMaster
(3)JobMaster
Responsible for running tasks, corresponding to the old version of JobManager;
Each task corresponds to a JobMaster;
II. JobMaster performs tasks
Execute a task in JobMaster through Scheduler and Execution components. Assign each node operator in the Task DAG to a TaskExecutor run in TaskManager.
The start() method of Execution remotely calls the submitTask() method of TaskExecutor via rpc:
public void deploy() throws JobException { ...... try { ...... final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); final ComponentMainThreadExecutor jobMasterMainThreadExecutor = vertex.getExecutionGraph().getJobMasterMainThreadExecutor(); CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor) .thenCompose(Function.identity()) .whenCompleteAsync( ....., jobMasterMainThreadExecutor); } catch (Throwable t) { ...... } } III. TaskExecutor runs operator node tasks
TaskExecutor's submitTask() method runs the operator task by creating org.apache.flink.runtime.taskmanager.Task. In the doRun() method of Task, the operator processing logic is run through the execution class AbstractInvokable corresponding to the operator node. The execution class AbstractInvokable corresponding to each operator is determined when the client submits the task. AddOperator() of StreamExecutionEnvironment:
public void addOperator( Integer vertexID, @Nullable String slotSharingGroup, @Nullable String coLocationGroup, StreamOperatorFactory operatorFactory, TypeInformation inTypeInfo, TypeInformation outTypeInfo, String operatorName) { Class sourceOperator = new StreamSource(function); return new DataStreamSource(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName); }
headOperator is StreamSource, userFunction in StreamSource is FlinkKafkaConsumer
At this point, I believe that everyone has a deeper understanding of "what is the method of Flink submitting tasks". Let's actually operate it! Here is the website, more related content can enter the relevant channels for inquiry, pay attention to us, continue to learn!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.