
The Architecture principle and implementation idea of EasyScheduler scheduling system


System architecture design

Before explaining the architecture of the scheduling system, let's take a look at the terms commonly used in the scheduling system.

1. Terminology

DAG: short for Directed Acyclic Graph. The tasks in a workflow are assembled into a directed acyclic graph and traversed in topological order, starting from the nodes with zero in-degree and continuing until there are no successor nodes left, as in the figure below:
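
Since the figure is not reproduced here, the following is a minimal, self-contained sketch of the traversal just described (Kahn's topological ordering); the node names and edges are purely illustrative and not part of EasyScheduler.

import java.util.*;

public class DagTraversalSketch {
    public static void main(String[] args) {
        // illustrative DAG: A -> B, A -> C, B -> D, C -> D
        Map<String, List<String>> successors = new HashMap<>();
        successors.put("A", Arrays.asList("B", "C"));
        successors.put("B", Collections.singletonList("D"));
        successors.put("C", Collections.singletonList("D"));
        successors.put("D", Collections.emptyList());

        // count the in-degree of every node
        Map<String, Integer> inDegree = new HashMap<>();
        for (String node : successors.keySet()) inDegree.put(node, 0);
        for (List<String> next : successors.values())
            for (String node : next) inDegree.merge(node, 1, Integer::sum);

        // start from nodes with zero in-degree and walk until no successors remain
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((node, degree) -> { if (degree == 0) ready.add(node); });
        while (!ready.isEmpty()) {
            String node = ready.poll();
            System.out.println("run task node: " + node);
            for (String next : successors.get(node))
                if (inDegree.merge(next, -1, Integer::sum) == 0) ready.add(next);
        }
    }
}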

Process definition: a visual DAG formed by dragging task nodes onto the canvas and establishing associations between them.

Process instance: the instantiation of a process definition; it can be generated by manual start or by scheduled scheduling.

Task instance: the instantiation of a task node in a process definition; it identifies the execution status of a specific task.

Task type: currently supports SHELL, SQL, SUB_PROCESS, PROCEDURE, MR, SPARK, PYTHON and DEPENDENT, with dynamic plug-in extension planned. Note: a SUB_PROCESS is itself a separate process definition that can be started and executed on its own.

Scheduling mode: the system supports cron-expression-based timed scheduling and manual scheduling. Supported command types: start workflow, start execution from the current node, resume fault-tolerant workflow, resume paused process, start execution from the failed node, complement (backfill), schedule, rerun, pause, stop, resume waiting thread. Of these, resume fault-tolerant workflow and resume waiting thread are used internally by the scheduler and cannot be invoked externally.

Timed scheduling: the system uses the Quartz distributed scheduler and supports visual generation of cron expressions.
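
As an illustration of this entry, here is a minimal sketch of registering a cron-triggered job with Quartz; the job class, names and cron expression are invented for the example and are not EasyScheduler's actual scheduling code.

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;

public class CronScheduleSketch {
    // illustrative job: in a real scheduler this would enqueue a "start workflow" command
    public static class ProcessTriggerJob implements Job {
        @Override
        public void execute(JobExecutionContext context) {
            System.out.println("trigger process definition at " + context.getFireTime());
        }
    }

    public static void main(String[] args) throws SchedulerException {
        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
        JobDetail job = JobBuilder.newJob(ProcessTriggerJob.class)
                .withIdentity("process-1", "easyscheduler").build();
        // cron expression: fire every day at 02:00
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("process-1-trigger", "easyscheduler")
                .withSchedule(CronScheduleBuilder.cronSchedule("0 0 2 * * ?"))
                .build();
        scheduler.scheduleJob(job, trigger);
        scheduler.start();
    }
}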

Dependency: in addition to simple predecessor/successor dependencies within a DAG, the system provides task dependency nodes to support custom dependencies between processes.

Priority: priorities are supported for process instances and task instances. If no priority is set, the default is first-in, first-out (FIFO).

Mail alarm: supports emailing SQL task query results, email alarms for process instance results, and fault-tolerance alarm notifications.

Failure strategy: for tasks running in parallel, two handling strategies are provided when a task fails. "Continue" means the remaining parallel tasks keep running regardless of the failed task, and the process is only marked failed once they have all finished. "End" means that as soon as a failed task is detected, the running parallel tasks are killed and the process ends with failure.

Complement: backfills historical data; both parallel and serial backfill over a date interval are supported.

2. System Architecture

2.1 System Architecture Diagram

2.2 Architecture description

MasterServer

MasterServer adopts a distributed, centerless design. The MasterServer is mainly responsible for DAG task segmentation, task submission and monitoring, and for monitoring the health of the other MasterServers and WorkerServers.

When the MasterServer service starts, it registers a temporary (ephemeral) node with ZooKeeper and performs fault tolerance by listening for changes to ZooKeeper temporary nodes.
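
As an illustration of this register-and-watch pattern, here is a minimal sketch using Apache Curator; the client library, znode paths and payload are assumptions for the example rather than EasyScheduler's actual code.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class MasterRegistrySketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // register this Master as an EPHEMERAL node; it disappears if the session dies
        String path = client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/easyscheduler/masters/master-1", "host:port".getBytes());
        System.out.println("registered at " + path);

        // watch the masters directory; a CHILD_REMOVED event triggers fault tolerance
        PathChildrenCache cache = new PathChildrenCache(client, "/easyscheduler/masters", true);
        cache.getListenable().addListener((c, event) -> {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
                System.out.println("master removed, run fault tolerance for " + event.getData().getPath());
            }
        });
        cache.start();
        Thread.sleep(Long.MAX_VALUE);   // keep the registration alive
    }
}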

The service mainly includes:

The distributed Quartz scheduling component is mainly responsible for starting and stopping scheduled tasks. When Quartz triggers a task, a thread pool inside the Master handles the subsequent operations of the task.

MasterSchedulerThread is a scanning thread that periodically scans the command table in the database and performs different business operations according to the command type (a minimal sketch follows this list).

MasterExecThread is mainly responsible for DAG task segmentation, task submission monitoring, and logical processing of different command types.

MasterTaskExecThread is mainly responsible for task persistence.
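
The sketch referenced above: a hedged illustration of a command-scanning loop in the spirit of MasterSchedulerThread. CommandDao, Command and the command type names are placeholders invented for the example, not EasyScheduler's real classes.

public class CommandScannerSketch implements Runnable {

    public enum CommandType { START_PROCESS, RECOVER_TOLERANCE_FAULT, REPEAT_RUNNING }

    public static class Command {
        public CommandType type;
    }

    public interface CommandDao {
        Command pollOneCommand();   // fetch and remove one pending command, or null if none
    }

    private final CommandDao dao;
    private volatile boolean running = true;

    public CommandScannerSketch(CommandDao dao) { this.dao = dao; }

    @Override
    public void run() {
        while (running) {
            Command command = dao.pollOneCommand();
            if (command == null) {
                try { Thread.sleep(1000); } catch (InterruptedException e) { return; }
                continue;
            }
            // dispatch to different business logic according to the command type
            switch (command.type) {
                case START_PROCESS:           /* build the DAG and submit it */ break;
                case RECOVER_TOLERANCE_FAULT: /* resume a fault-tolerant workflow */ break;
                case REPEAT_RUNNING:          /* rerun from the start node */ break;
            }
        }
    }

    public void stop() { running = false; }
}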

WorkerServer

WorkerServer also adopts a distributed, centerless design. The WorkerServer is mainly responsible for executing tasks and providing log services. When the WorkerServer service starts, it registers a temporary node with ZooKeeper and maintains a heartbeat.

The service includes:

FetchTaskThread is mainly responsible for continuously fetching tasks from the Task Queue and, according to the task type, calling TaskScheduleThread to run the corresponding executor.

LoggerServer is an RPC service that provides log fragment viewing, refreshing and downloading.

ZooKeeper

ZooKeeper service: the MasterServer and WorkerServer nodes in the system both use ZooKeeper for cluster management and fault tolerance. In addition, the system performs event monitoring and distributed locking based on ZooKeeper.

We also implemented queues based on Redis, but we wanted EasyScheduler to rely on as few components as possible, so we finally removed the Redis implementation.

Task Queue

Provides operations on the task queue, which is also implemented on ZooKeeper. Since the amount of information kept in the queue is small, there is no need to worry about the queue holding too much data; in fact, we have tested queues storing millions of entries, with no impact on the stability or performance of the system.
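
Below is a minimal sketch of what a ZooKeeper-backed task queue can look like, assuming the Curator client; the znode path is illustrative, and a real multi-consumer queue would also need a lock or an atomic delete check so that two Workers cannot take the same task.

import java.util.Collections;
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class ZkTaskQueueSketch {
    private static final String QUEUE_PATH = "/easyscheduler/tasks_queue";
    private final CuratorFramework client;

    public ZkTaskQueueSketch(String zkAddress) {
        client = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(1000, 3));
        client.start();
    }

    // enqueue: create a persistent child node whose name carries the task key
    public void add(String taskKey) throws Exception {
        client.create().creatingParentsIfNeeded()
              .withMode(CreateMode.PERSISTENT)
              .forPath(QUEUE_PATH + "/" + taskKey);
    }

    // dequeue: list the children, sort them, take the "smallest" key and delete it;
    // a real implementation must guard this list-then-delete step against concurrent consumers
    public String poll() throws Exception {
        List<String> children = client.getChildren().forPath(QUEUE_PATH);
        if (children.isEmpty()) {
            return null;
        }
        Collections.sort(children);     // string order decides which task goes first
        String head = children.get(0);
        client.delete().forPath(QUEUE_PATH + "/" + head);
        return head;
    }
}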

Alert

Provides alarm-related interfaces, mainly covering the storage, querying and notification of the two kinds of alarm data. There are two notification channels: email notification and SNMP (not yet implemented).

API

The API interface layer is mainly responsible for handling requests from the front-end UI layer. The service provides a unified RESTful API for external callers.

The interface includes workflow creation, definition, query, modification, release, offline, manual start, stop, pause, resume, execution from this node, and so on.

UI

The front-end page of the system provides a variety of visual operation interfaces of the system, as detailed in the user manual section of the system.

2.3 Architectural Design Ideas

I. Decentralization vs. centralization

The centralized design concept is relatively simple: the nodes of a distributed cluster are divided into two roles according to their responsibilities:

The Master role is mainly responsible for distributing tasks and supervising the health of the Slaves, and it can dynamically balance tasks across the Slaves so that no Slave node is overly "busy" or left "idle". The Worker role is mainly responsible for executing tasks and maintaining a heartbeat with the Master so that the Master can assign tasks to it.

Problems with the centralized design:

Once the Master has a problem and there is no leader, the whole cluster collapses. To solve this, most Master/Slave architectures adopt an active/standby Master design, which can be hot or cold standby with automatic or manual switchover, and more and more new systems can automatically elect and switch the Master to improve availability. Another problem is that if the Scheduler runs on the Master, then although different tasks of one DAG can run on different machines, the Master can become overloaded. If the Scheduler runs on the Slave, then all tasks of a DAG can only be submitted on a single machine, and when there are many parallel tasks the pressure on that Slave may be high.

Decentralization

In a decentralized design there is usually no Master/Slave concept: all roles are the same and all nodes are equal. The global Internet is a typical decentralized distributed system; any node going down affects only a very small range of functions. The core of a decentralized design is that there is no "manager" distinct from the other nodes in the whole distributed system, so there is no single point of failure. However, because there is no "manager" node, every node needs to communicate with other nodes to obtain the necessary machine information, and the unreliable communication of a distributed system greatly increases the difficulty of implementing the functions above.

In fact, truly decentralized distributed systems are rare. Instead, dynamically centralized distributed systems keep emerging. In such an architecture, the managers in the cluster are elected dynamically rather than preset, and when the cluster fails, the nodes spontaneously hold a "meeting" to elect a new "manager" to take over. The most typical examples are ZooKeeper and Etcd, which is implemented in the Go language.

The decentralization of EasyScheduler means that Masters and Workers register with ZooKeeper, so that the Master cluster and the Worker cluster have no center, and a ZooKeeper distributed lock is used to elect one Master or Worker as the "manager" to perform the work.

II. Distributed lock practice

EasyScheduler uses ZooKeeper distributed locks so that, at any given moment, only one Master executes the Scheduler and only one Worker performs task submission.

The core flow for acquiring a distributed lock is as follows:

Flowchart of Scheduler thread distributed lock implementation in EasyScheduler:
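
The flowchart image is not reproduced here; as a rough stand-in, the following sketch shows the "only one Master runs the Scheduler at a time" pattern using Curator's InterProcessMutex, with an assumed lock path.

import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class SchedulerLockSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        InterProcessMutex lock = new InterProcessMutex(client, "/easyscheduler/lock/masters");
        // try to become the active scheduler; give up after 5 seconds
        if (lock.acquire(5, TimeUnit.SECONDS)) {
            try {
                System.out.println("lock acquired: scan the command table and submit DAGs");
            } finally {
                lock.release();     // always release so another Master can take over
            }
        } else {
            System.out.println("another Master holds the lock, skip this round");
        }
    }
}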

III. The problem of insufficient threads and circular waiting

If a DAG contains no sub-processes, then when the number of entries in the Command table exceeds the threshold set for the thread pool, the process simply waits or fails. But if a large DAG nests many sub-processes, as in the figure below, a "dead wait" state arises:

In the figure above, MainFlowThread waits for SubFlowThread1 to finish, SubFlowThread1 waits for SubFlowThread2 to finish, and SubFlowThread2 waits for SubFlowThread3 to finish. If SubFlowThread3 is itself waiting for a new thread from the thread pool, the whole DAG can never finish and no thread can ever be released: the child and parent processes wait on each other in a loop. At that point, unless a new Master is started to add threads and break this "deadlock", the scheduling cluster is no longer usable.

Having to launch a new Master just to break the deadlock is less than satisfactory, so we proposed three options to reduce this risk:

1. Calculate the total number of threads across all Masters, then calculate the number of threads each DAG needs, i.e. pre-compute before the DAG process executes. Because the thread pools are spread over multiple Masters, the total is unlikely to be obtainable in real time.

2. Check the single Master's thread pool and let the thread fail directly if the pool is full.

3. Add a Command type for insufficient resources: if the thread pool is insufficient, suspend the main process. When the thread pool later has free threads, the process suspended due to insufficient resources can be woken up again.

Note: the Master Scheduler thread executes as FIFO when it gets the Command.

So we chose the third way to solve the problem of insufficient threads.

IV. Fault-tolerant design

Fault tolerance is divided into service downtime fault tolerance and task retry, and service downtime fault tolerance is divided into Master fault tolerance and Worker fault tolerance.

1. Downtime fault tolerance

The design of service fault tolerance relies on ZooKeeper's Watcher mechanism. The implementation principle is shown in the figure below.

The Master monitors the directories of the other Masters and of the Workers. If a remove event is detected, process instance fault tolerance or task instance fault tolerance is carried out according to the specific business logic.

Master fault tolerance flowchart:

After ZooKeeper Master fault tolerance completes, the process is rescheduled by the Scheduler thread in EasyScheduler: it traverses the DAG to find the "running" and "submitted successfully" tasks. For "running" tasks it monitors the status of their task instances; for "submitted successfully" tasks it checks whether the task already exists in the Task Queue, and if so it likewise monitors the task instance status, while if not it resubmits the task instance. A sketch of this pass follows.
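
Here is a compilable sketch of that failover pass; the state names, the queue stand-in and the helper methods are illustrative placeholders rather than EasyScheduler's actual implementation.

import java.util.*;

public class MasterFailoverSketch {

    enum State { RUNNING, SUBMITTED_SUCCESS, SUCCESS, FAILURE }

    static class TaskInstance {
        final String queueKey;
        final State state;
        TaskInstance(String queueKey, State state) { this.queueKey = queueKey; this.state = state; }
    }

    // stands in for the ZooKeeper task queue
    static final Set<String> taskQueue = new HashSet<>();

    static void failover(List<TaskInstance> tasksOfDag) {
        for (TaskInstance task : tasksOfDag) {
            if (task.state == State.RUNNING) {
                monitor(task);                          // keep watching its state
            } else if (task.state == State.SUBMITTED_SUCCESS) {
                if (taskQueue.contains(task.queueKey)) {
                    monitor(task);                      // already queued, just watch it
                } else {
                    resubmit(task);                     // lost before queueing, submit it again
                }
            }
            // finished tasks need no action during failover
        }
    }

    static void monitor(TaskInstance t)  { System.out.println("monitor "  + t.queueKey); }
    static void resubmit(TaskInstance t) { System.out.println("resubmit " + t.queueKey); }
}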

Worker fault tolerance flowchart:

Once the Master Scheduler thread finds a task instance in the "needs fault tolerance" state, it takes over the task and resubmits it.

Note: because "network jitter" may cause a node to briefly lose its heartbeat with ZooKeeper, a remove event for that node can occur. In this case we take the simplest approach: once a node's connection to ZooKeeper times out, the Master or Worker service on that node is stopped directly.

2. Task failure retry

First of all, we need to distinguish the concepts of task failure retry, process failure recovery, and process failure rerun:

Task failure retry is task-level and is performed automatically by the scheduling system. For example, if a Shell task is configured with 3 retries, then after the Shell task fails it will attempt to run at most 3 more times. Process failure recovery is process-level and manual; recovery can only start from the failed node or from the current node. Process failure rerun is also process-level and manual; the rerun starts from the beginning node.

To get to the point, we divide the task nodes in the workflow into two types.

One is the business node, which corresponds to an actual script or processing statement, such as Shell node, MR node, Spark node, dependent node, and so on.

The other is the logical node, which does not run an actual script or statement but only handles the logic of the overall process flow, such as the sub-process node.

Each business node can be configured with a number of failure retries. When such a task node fails, it is automatically retried until it succeeds or the configured number of retries is exceeded. Logical nodes do not support failure retries, but the tasks inside a logical node do.

If a task in the workflow fails and reaches its maximum number of retries, the workflow fails and stops; the failed workflow can then be manually rerun or recovered.

V. Task priority design

In the early scheduling design, with no priority design and fair scheduling, a task submitted first might finish at the same time as a task submitted later, and the priority of a process or task could not be set. We have therefore redesigned this, and the current design is as follows:

Tasks are processed in order of priority from high to low: first by the priority of the process instance, then by the priority of the task within the same process instance, and finally by the order of task submission within the same process.

Concretely, the priority is parsed from the JSON of the task instance, and then the information process-instance-priority_process-instance-id_task-priority_task-id is saved as the key in the ZooKeeper task queue. When fetching from the task queue, the highest-priority task can be selected simply by string comparison.
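
A small sketch of this key idea follows; the exact encoding (digits for priority levels, "_" separators) is assumed for illustration, and a real implementation would pad the numeric fields to a fixed width so that plain string comparison stays consistent.

import java.util.PriorityQueue;

public class TaskQueueKeySketch {

    // key format: processInstancePriority_processInstanceId_taskPriority_taskId
    // priorities are mapped to digits (HIGHEST=0 ... LOWEST=4) so a smaller string means more urgent
    static String buildKey(int processPriority, long processInstanceId, int taskPriority, long taskId) {
        return String.format("%d_%d_%d_%d", processPriority, processInstanceId, taskPriority, taskId);
    }

    public static void main(String[] args) {
        PriorityQueue<String> queue = new PriorityQueue<>();   // natural string ordering
        queue.add(buildKey(2, 101, 3, 7));   // MEDIUM process, LOW task
        queue.add(buildKey(0, 102, 4, 8));   // HIGHEST process wins regardless of task priority
        queue.add(buildKey(2, 101, 1, 9));   // MEDIUM process, HIGH task
        while (!queue.isEmpty()) {
            System.out.println(queue.poll());
        }
        // prints 0_102_4_8, then 2_101_1_9, then 2_101_3_7
    }
}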

The priority of a process definition exists because some processes need to be handled before others; it can be configured when the process is started or scheduled, and there are five levels in total: HIGHEST, HIGH, MEDIUM, LOW and LOWEST, as shown in the figure below.

The priority of a task is likewise divided into five levels: HIGHEST, HIGH, MEDIUM, LOW and LOWEST, as shown in the figure below.

VI. Implementing log access with Logback and gRPC

Because the Web (UI) and the Workers are not necessarily on the same machine, viewing logs is not like reading a local file. There are two options:

Put the logs into the ES search engine.

Obtain remote log information through gRPC communication.

Considering that EasyScheduler should stay as lightweight as possible, gRPC was chosen for remote access to log information.

We customize Logback's FileAppender and Filter features to generate a separate log file for each task instance.

The main implementation of FileAppender is as follows:

/** task log appender */
public class TaskLogAppender extends FileAppender<ILoggingEvent> { ... }
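
Since only the class declaration appears above, here is a hedged sketch of what a per-task-instance appender of this kind can look like with the standard Logback API; the thread-name convention and the log directory are assumptions, and the Filter part mentioned earlier is omitted.

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.FileAppender;

public class TaskLogAppenderSketch extends FileAppender<ILoggingEvent> {

    @Override
    protected void append(ILoggingEvent event) {
        // assumed thread-name convention: TaskLogThread-processDefineId_processInstanceId_taskInstanceId
        String threadName = event.getThreadName();
        if (threadName != null && threadName.contains("-")) {
            String logId = threadName.substring(threadName.indexOf('-') + 1);
            String logFile = "logs/" + logId.replace('_', '/') + ".log";
            if (!logFile.equals(getFile())) {
                stop();             // close the previous task's file before switching
                setFile(logFile);   // point the appender at this task instance's file
                start();
            }
        }
        super.subAppend(event);     // let FileAppender write the event as usual
    }
}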
