
How to master the startup process of Flink on YARN application

2025-01-31 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)05/31

This article walks through the startup process of a Flink on YARN application, from the moment the client submits it to the point where Tasks start running.

Flink on YARN flow chart

The Flink on YARN deployment model involves two open source frameworks, YARN and Flink, and many parts of the application startup process are intertwined. To make this easier to follow, the Flink on YARN architecture and the whole startup process are drawn on one diagram, and the key roles and steps are introduced below. The startup process is divided into two stages: client submission (marked purple in the diagram), and Flink cluster startup plus Job submission and execution (marked orange). Because there are many branches and details, this article skips some of them and covers only the key steps (based on the Flink 1.9 open source code).

Client submission process

1. Execute the command bin/flink run -d -m yarn-cluster ... or bin/yarn-session.sh ... to submit an application that runs in per-job mode or session mode.

2. Parse and initialize the command line parameters, then start the specified running mode. In per-job mode, the job graph is created from the Job main class given on the command line.

If an application ID can be obtained from the command line argument (-yid) or from the YARN properties temporary file (${java.io.tmpdir}/.yarn-properties-${user.name}), the Job is submitted to that application.

Otherwise, per-job mode is started when the command line contains -d (detached mode) and -m yarn-cluster (YARN cluster mode).

Otherwise, session mode is started when the command line does not contain -yq (which queries the resources available in the YARN cluster).
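The three branches of step 2 can be read as a small decision function. The following is a minimal sketch in Python, not Flink's actual CLI code: the function name, the returned labels and the handling of the remaining -yq case are assumptions made for illustration.

```python
def choose_mode(args, yarn_properties_app_id=None):
    """Pick the submission path from CLI arguments (illustrative model only)."""
    # Branch 1: an application ID is known, either from -yid on the command
    # line or from the YARN properties temporary file.
    if "-yid" in args or yarn_properties_app_id is not None:
        return "submit-to-existing-application"

    # Branch 2: -d together with "-m yarn-cluster" selects per-job mode.
    has_yarn_cluster = any(
        flag == "-m" and value == "yarn-cluster"
        for flag, value in zip(args, args[1:])
    )
    if "-d" in args and has_yarn_cluster:
        return "per-job"

    # Branch 3: without -yq, session mode is started.
    if "-yq" not in args:
        return "session"

    # Remaining case (label is an assumption): only query cluster resources.
    return "query-resources"
```

For example, choose_mode(["run", "-d", "-m", "yarn-cluster"]) takes the per-job branch, while an empty argument list falls through to session mode.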

3. Get the YARN cluster information, obtain a new application ID, and run the pre-run checks.

Through YarnClient, request a new application from the YARN ResourceManager (hereinafter YARN RM, the YARN master node, responsible for managing and scheduling the resources of the whole cluster). YARN RM receives the request and returns the new application ID together with the upper limit of resources a container may request. The client also obtains the YARN slave node reports (YARN RM returns the ID, state, rack, HTTP address, total resources, used resources and other information of all slave nodes).

Pre-run checks: (1) verify that the YARN cluster can be accessed; (2) check whether the largest node's resources can satisfy the vcores requested by the Flink JobManager/TaskManager; (3) check whether the specified queue exists (if it does not, only a WARN message is printed here; the exception is thrown and the client exits later, when the application is submitted to YARN); (4) throw an exception and exit if the Container resources requested by the application exceed the YARN resource limits; (5) print hints when the application's requests are not expected to be satisfiable (for example, the total requested resources exceed the total available resources of the YARN cluster, or a Container request exceeds the maximum available resources of an NM).
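Checks (2) to (5) can be modelled as a function that either raises or collects warnings. This is a simplified sketch under stated assumptions, not the Flink implementation: ClusterInfo, Request and all field names are hypothetical, memory stands in for all resource dimensions, and treating check (2) as a hard failure is an assumption.

```python
from dataclasses import dataclass


@dataclass
class ClusterInfo:            # hypothetical summary of the YARN node reports
    queues: set
    max_container_mb: int     # YARN upper limit for a single container
    max_node_vcores: int      # vcores of the largest node
    free_mb_total: int        # free memory summed over all nodes


@dataclass
class Request:                # hypothetical summary of the Flink request
    queue: str
    jm_mb: int
    tm_mb: int
    tm_count: int
    vcores: int


def pre_run_check(req, cluster):
    """Return a list of warnings, or raise ValueError on a hard failure."""
    warnings = []
    # (2) the largest node must be able to host the requested vcores
    if req.vcores > cluster.max_node_vcores:
        raise ValueError("requested vcores exceed the largest node")
    # (3) a missing queue only produces a warning at this stage
    if req.queue not in cluster.queues:
        warnings.append(f"queue {req.queue!r} does not exist")
    # (4) a single container must not exceed the YARN limit
    if max(req.jm_mb, req.tm_mb) > cluster.max_container_mb:
        raise ValueError("container request exceeds the YARN limit")
    # (5) hint when the total request is unlikely to be satisfied right now
    total_mb = req.jm_mb + req.tm_mb * req.tm_count
    if total_mb > cluster.free_mb_total:
        warnings.append("total request exceeds free cluster memory")
    return warnings
```

The split between raising and warning mirrors the article: only an oversized container request stops the submission immediately, while a missing queue is deferred to YARN.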

4. Upload the application configuration (flink-conf.yaml, logback.xml, log4j.properties) and related files (Flink jars, ship files, user jars, job graph, etc.) to the application staging directory on distributed storage such as HDFS (/user/${user.name}/.flink/).

5. Prepare the application submission context (ApplicationSubmissionContext, which contains the application's name, type, queue, label and other information, plus the environment variables, classpath, resource size, etc. of the ApplicationMaster container), register a shutdown hook for failed deployments (it cleans up the application's HDFS directory), and then submit the application to YARN RM through YarnClient.

6. Wait in a loop until the application state is RUNNING. This consists of two phases:

Wait for the application to be submitted (SUBMITTED): by default, the application report is fetched through YarnClient every 200 ms. Once the application state is neither NEW nor NEW_SAVING, the submission is considered successful and the loop exits. Every 10 iterations the current application state is written to the log: "Application submission is not finished, submitted application is still in". After a successful submission the log shows: "Submitted application".

Wait for the application to run (RUNNING): the application report is fetched through YarnClient every 250 ms, and each iteration writes the current application state to the log: "Deploying cluster, current state". When the state changes to RUNNING, the log "YARN application has been deployed successfully." is written and the loop exits. If an unexpected state such as FAILED/FINISHED/KILLED is seen instead, the diagnostic information returned by YARN is written out ("The YARN application unexpectedly switched to state during deployment. Diagnostics from YARN:...") and an exception is thrown.
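The two waiting phases of step 6 can be sketched as one polling function. This is a simplified model rather than the YarnClient or Flink code: get_state is a hypothetical callable that stands in for fetching the application report, the log texts are paraphrased, and the sleep function is injectable so the sketch can be exercised without waiting.

```python
import time


def wait_until_running(get_state, log=print, sleep=time.sleep):
    """Poll the application state until it is RUNNING (illustrative model)."""
    # Phase 1: wait until the state has left NEW / NEW_SAVING (200 ms polls).
    polls = 0
    state = get_state()
    while state in ("NEW", "NEW_SAVING"):
        polls += 1
        if polls % 10 == 0:  # log only every 10th iteration
            log(f"submission not finished, application is still in {state}")
        sleep(0.2)
        state = get_state()
    log("submitted application")

    # Phase 2: wait for RUNNING (250 ms polls), failing on terminal states.
    while state != "RUNNING":
        if state in ("FAILED", "FINISHED", "KILLED"):
            raise RuntimeError(f"application unexpectedly switched to {state}")
        log(f"deploying cluster, current state {state}")
        sleep(0.25)
        state = get_state()
    log("application has been deployed successfully")
    return state
```

Passing a fake get_state that replays a fixed sequence of states, together with sleep=lambda s: None, is enough to step through both phases in a test.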

Flink Cluster startup process

1. ClientRMService in YARN RM (an RPC service for ordinary users that handles client RPC requests such as querying YARN cluster information and submitting or terminating applications) receives the application submission request and, after a simple validation, forwards it to RMAppManager (the component inside YARN RM that manages the application life cycle).

2. RMAppManager creates an application with initial state NEW from the application submission context and persists the application state to the RM state store (for example a ZooKeeper cluster; the state store ensures that applications can recover after an RM restart, HA switch or failure, and the state-store steps in the rest of the process are not repeated here). The application state changes to NEW_SAVING.

3. Once the application state has been stored, the application state changes to SUBMITTED. RMAppManager then submits the application to ResourceScheduler (the pluggable resource scheduler of YARN RM. YARN ships with three schedulers: FifoScheduler, FairScheduler and CapacityScheduler. CapacityScheduler is the most feature-rich and most widely used; FifoScheduler is the simplest and of little practical use. At the time of writing, the community has made it clear that FairScheduler will no longer be supported and recommends that its users move to CapacityScheduler). If the application cannot be submitted (for example, the queue does not exist, is not a leaf queue, has been stopped, or has reached its maximum number of applications), it is rejected: the application state changes to FINAL_SAVING, the state-store step is triggered, and the state becomes FAILED once that completes. If the submission succeeds, the application state changes to ACCEPTED.
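The application states named in steps 2 and 3 form a small state machine. The sketch below encodes only the transitions described above; the event names are hypothetical labels chosen for illustration and are not YARN event types.

```python
# (current state, event) -> next state, limited to the transitions in the text
APP_TRANSITIONS = {
    ("NEW", "start"): "NEW_SAVING",
    ("NEW_SAVING", "state_stored"): "SUBMITTED",
    ("SUBMITTED", "scheduler_accepted"): "ACCEPTED",
    ("SUBMITTED", "scheduler_rejected"): "FINAL_SAVING",
    ("FINAL_SAVING", "state_stored"): "FAILED",
}


def replay(events, state="NEW"):
    """Apply events in order and return every state that was visited."""
    history = [state]
    for event in events:
        state = APP_TRANSITIONS[(state, event)]  # KeyError = illegal transition
        history.append(state)
    return history
```

Replaying ["start", "state_stored", "scheduler_rejected", "state_stored"] walks the rejection path NEW, NEW_SAVING, SUBMITTED, FINAL_SAVING, FAILED.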

4. An application running instance (ApplicationAttempt) is created with initial state NEW. Because the most important component of a running instance is the ApplicationMaster (hereinafter AM), whose state represents the current state of the ApplicationAttempt, the ApplicationAttempt effectively stands for the AM.

5. The application instance is initialized and registered with ApplicationMasterService (the service implementing the AM-RM protocol, which handles requests from the AM, mainly registration and heartbeats). The application instance state changes to SUBMITTED.

6. The application instance maintained by RMAppManager initializes the AM resource request, validates the queue again, and then requests the AM Container from ResourceScheduler (a Container is YARN's abstraction of resources, covering memory, CPU and other dimensions). The application instance state changes to ACCEPTED.

7. ResourceScheduler works down from the root queue level by level according to priority (queues, applications and requests each have a priority setting), selecting the highest-priority sub-queue, then application, then a specific request, and makes the allocation decision based on how resources are distributed in the cluster. After the AM Container has been allocated, the application instance state changes to ALLOCATED_SAVING and the application instance state-store step is triggered. Once stored, the application instance state changes to ALLOCATED.
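The level-by-level selection in step 7 can be sketched as a descent through a queue tree. This is a toy model and not the algorithm of any real YARN scheduler: the dictionary layout is hypothetical, a larger number is assumed to mean higher priority, capacity and cluster resource distribution are ignored, and every leaf queue is assumed to hold at least one application with a pending request.

```python
def pick_request(queue):
    """Descend from the root queue to the highest-priority pending request."""
    # Follow the highest-priority child until a leaf queue is reached.
    while queue.get("children"):
        queue = max(queue["children"], key=lambda q: q["priority"])
    # Inside the leaf queue, choose the application, then one of its requests.
    app = max(queue["apps"], key=lambda a: a["priority"])
    request = max(app["requests"], key=lambda r: r["priority"])
    return queue["name"], app["id"], request["name"]


root = {
    "name": "root",
    "children": [
        {"name": "batch", "priority": 1, "apps": [
            {"id": "app_1", "priority": 5,
             "requests": [{"name": "am", "priority": 1}]},
        ]},
        {"name": "stream", "priority": 2, "apps": [
            {"id": "app_2", "priority": 1,
             "requests": [{"name": "tm", "priority": 1},
                          {"name": "am", "priority": 9}]},
            {"id": "app_3", "priority": 0,
             "requests": [{"name": "am", "priority": 9}]},
        ]},
    ],
}
```

With this tree, pick_request(root) selects the stream queue, then app_2, then its am request, because each level is decided before the next one is considered.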

8. The application instance maintained by RMAppManager notifies ApplicationMasterLauncher (the AM life cycle management service, responsible for starting and cleaning up the AM container) to start the AM container. ApplicationMasterLauncher establishes communication with the YARN NodeManager (hereinafter YARN NM, which keeps in contact with YARN RM, manages all resources, Container life cycles and auxiliary services on a single node, and monitors node health and Container resource usage) and requests that the AM container be started.

9. ContainerManager (the core component of YARN NM, which manages the life cycle of all Containers) receives the AM container start request. YARN NM verifies the Container token and resource files, creates the application instance and Container instance, and stores them locally. When the result is returned, the application instance state changes to LAUNCHED.

10. ResourceLocalizationService (responsible for localizing the resources a Container needs: it downloads the required files from HDFS as described and spreads them across the disks as evenly as possible to avoid access hotspots) initializes its service components, creates the working directory, and downloads the resources needed at run time from HDFS to the Container working directory (path: ${yarn.nodemanager.local-dirs}/usercache/${user}/appcache//).

11. ContainersLauncher (responsible for the concrete Container operations, including start, restart, recovery and cleanup) writes the environment variables and commands needed to run the Container into the launch_container.sh script in the Container working directory, and then runs the script to start the Container.

12. The Container process loads and runs ClusterEntrypoint (the Flink JobManager entry class; each combination of cluster deployment mode and application running mode has its own implementation. For example, under the YARN deployment mode the per-job implementation class is YarnJobClusterEntrypoint and the session implementation class is YarnSessionClusterEntrypoint). It first initializes the runtime environment:

Output the software version, runtime environment information, command line parameters, classpath and other information

Register handlers for the various signals: received signals are written to the log

Register the JVM shutdown safeguard hook: prevents the JVM exit from being blocked by other shutdown hooks

Print YARN runtime environment information: user name

Load the Flink configuration from the working directory

Initialize the file system

Create and start the internal services (including RpcService, HAService, BlobServer, HeartbeatServices, MetricRegistry, ExecutionGraphStore, etc.)

Update the RPC address and port in the Flink configuration

13. Start ResourceManager (the core component of Flink resource management, made up of two sub-components, YarnResourceManager and SlotManager. YarnResourceManager handles external resource management: it establishes communication with YARN RM, maintains the heartbeat, requests or releases TaskManager resources, and unregisters the application. SlotManager handles internal resource management: it maintains the information and state of all Slots) and related services. An asynchronous AMRMClient is created, the AM is registered, and a heartbeat is sent to YARN RM at a fixed interval (configuration item: ${yarn.heartbeat.interval}, default 5 s) to send resource update requests and receive resource changes. Inside YARN RM, the states of the application and of the application instance change to RUNNING, and the AMLivelinessMonitor service is notified to monitor the liveness of the AM; if no heartbeat arrives within a certain time (10 minutes by default), the AM failover process is triggered.
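The heartbeat and liveness check in step 13 boil down to comparing the time of the last heartbeat with an expiry window. The class below is a minimal sketch, not YARN's AMLivelinessMonitor: the class and method names are hypothetical, times are plain seconds passed in by the caller, and only the 5 s interval and the 10 minute default come from the text.

```python
class LivenessMonitor:
    """Track the last heartbeat per attempt and report expired attempts."""

    def __init__(self, expiry_s=600.0):   # 10 minutes, the default named above
        self.expiry_s = expiry_s
        self.last_seen = {}

    def heartbeat(self, attempt_id, now):
        # Called whenever an AM heartbeat arrives (every 5 s by default).
        self.last_seen[attempt_id] = now

    def expired(self, now):
        # Attempts whose last heartbeat is older than the expiry window.
        return sorted(
            attempt for attempt, seen in self.last_seen.items()
            if now - seen > self.expiry_s
        )
```

An AM that keeps sending heartbeats is never reported, while one that stays silent for more than expiry_s seconds shows up in expired(), which is the point at which the failover process would be triggered.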

14. Start Dispatcher (responsible for receiving jobs submitted by users and launching a new JobManager for each newly submitted job) and related services (including the REST endpoint). In per-job mode, Dispatcher loads the JobGraph file directly from the Container working directory. In session mode, Dispatcher continues with the following steps only after it receives a Job submitted by the client (the JobGraph file is received through BlobServer).

15. Start JobManager (responsible for job scheduling and for managing the life cycle of Jobs and Tasks) from the JobGraph and build the ExecutionGraph (the parallelized version of JobGraph and the core data structure of the scheduling layer).

16. JobManager starts executing the ExecutionGraph and requests resources from ResourceManager.

17. ResourceManager adds the resource request to the pending request queue and, through the heartbeat, requests a new Container from YARN RM in which to start a TaskManager process. If a free Slot becomes available later in the process, SlotManager assigns it to a matching request in the pending request queue instead of requesting a new TaskManager through YarnResourceManager.

18. After receiving the resource request, YARN ApplicationMasterService parses the new resource request and updates the application's request information.

19. After YARN ResourceScheduler has allocated resources to the application, it updates the application information. When ApplicationMasterService receives the next heartbeat from the Flink JobManager, it returns the newly allocated resources.

20. After receiving the newly allocated Container, Flink ResourceManager prepares the TaskManager launch context (ContainerLaunchContext: generating the TaskManager configuration and uploading it to distributed storage, configuring other dependencies, environment variables, etc.) and then asks YARN NM to start the TaskManager process. YARN NM starts this Container in much the same way as the AM Container, except that the application instance initialization is skipped when the application instance already exists on the NM and is not in the RUNNING state.

21. The TaskManager process loads and runs YarnTaskExecutorRunner (the Flink TaskManager entry class) and, after initialization completes, starts TaskExecutor (responsible for executing Task-related operations).

22. After startup, TaskExecutor first registers with ResourceManager and, once registered, reports its Slot resources and state to SlotManager. When SlotManager receives the free Slots, it triggers Slot allocation: it picks a suitable resource request from the pending request queue and requests the Slot from the TaskManager.

23. On receiving the request, TaskManager checks whether the Slot can be allocated (if the Slot does not exist, an exception is returned) and whether the Job has been registered (if not, the Job is registered first), and then offers the Slot to the JobManager.

24. JobManager checks whether the Slot allocation is a duplicate, then notifies the Execution to run the task deployment process and submits the Task to TaskExecutor, which starts a new thread to run the Task.
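The interplay between the pending request queue, new Container requests and Slot reports (steps 17 and 22) can be sketched with two queues. This is a deliberately simplified model, not Flink's SlotManager: the class, method and field names are hypothetical, every free Slot is assumed to match every request, and the Slot offer in step 23 is folded into a direct assignment.

```python
from collections import deque


class SlotManagerSketch:
    """Toy model of pending slot requests and free slots."""

    def __init__(self):
        self.free_slots = deque()
        self.pending = deque()
        self.containers_requested = 0
        self.assignments = {}              # request id -> slot id

    def request_slot(self, request_id):
        if self.free_slots:
            # A free Slot exists: assign it instead of asking YARN for more.
            self.assignments[request_id] = self.free_slots.popleft()
        else:
            # No free Slot: queue the request and ask for a new Container.
            self.pending.append(request_id)
            self.containers_requested += 1

    def register_task_manager(self, slot_ids):
        # A new TaskManager reports its Slots; serve pending requests first.
        self.free_slots.extend(slot_ids)
        while self.pending and self.free_slots:
            request_id = self.pending.popleft()
            self.assignments[request_id] = self.free_slots.popleft()
```

In this model the first request triggers a Container request, the TaskManager that starts in it reports two Slots, and a second request is then served from the leftover Slot without asking YARN for another Container.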

This concludes the walkthrough of the Flink on YARN application startup process.
