How to manage Airflow

# Process management tool: Supervisord

> 1. Install the process management tool Supervisord to manage the airflow processes

easy_install supervisor   # this method is not suitable for a Python 3 installation (it causes a lot of problems)
echo_supervisord_conf > /etc/supervisord.conf

> 2. Edit supervisord.conf and add the startup commands

vi /etc/supervisord.conf
[program:airflow_web]
command=/usr/bin/airflow webserver -p 8080

[program:airflow_worker]
command=/usr/bin/airflow worker

[program:airflow_scheduler]
command=/usr/bin/airflow scheduler
> 3. Start the supervisord service
/usr/bin/supervisord -c /etc/supervisord.conf
> 4. Now you can use supervisorctl to manage the airflow service
supervisorctl start airflow_web
supervisorctl stop airflow_web
supervisorctl restart airflow_web
supervisorctl stop all
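After editing supervisord.conf later on, new or changed [program:...] sections can be picked up and checked without restarting supervisord. These are standard supervisorctl subcommands, shown with the config path used above:

```bash
supervisorctl -c /etc/supervisord.conf reread   # re-read the config for new/changed program sections
supervisorctl -c /etc/supervisord.conf update   # apply those changes (starts newly added programs)
supervisorctl -c /etc/supervisord.conf status   # list every managed process and its current state
```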
# Process management tool: systemd

> 1. vim /etc/sysconfig/airflow  # systemd reads this file; it defines the airflow environment variables
AIRFLOW_CONFIG=/root/airflow/airflow.cfg
AIRFLOW_HOME=/root/airflow
> 2. vim /usr/lib/systemd/system/airflow-webserver.service  # the service name managed by systemctl; other services can be defined the same way (a scheduler unit is sketched after the webserver example below)
[Unit]
Description=Airflow webserver daemon
After=network.target postgresql.service mysql.service redis.service
Wants=postgresql.service mysql.service redis.service
[Service]
EnvironmentFile=/etc/sysconfig/airflow
User=root
Group=root
Type=simple
ExecStart=/bin/bash -c "export PATH=${PATH}:/usr/local/python3/bin/; /usr/local/python3/bin/airflow webserver -p 8080 --pid /root/airflow/service/webserver.pid -A /root/airflow/service/webserver.out -E /root/airflow/service/webserver.err -l /root/airflow/service/webserver.log"
KillMode=process
Restart=on-failure
RestartSec=5s
PrivateTmp=true
[Install]
WantedBy=multi-user.target
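As noted in step 2, the other Airflow components can be wrapped in units of the same shape. The sketch below writes a hypothetical airflow-scheduler.service that mirrors the webserver unit above (the unit name and paths are assumptions carried over from this article, not an official template):

```bash
# sketch: create a scheduler unit mirroring the webserver unit above (paths are assumptions)
cat > /usr/lib/systemd/system/airflow-scheduler.service <<'EOF'
[Unit]
Description=Airflow scheduler daemon
After=network.target postgresql.service mysql.service redis.service
Wants=postgresql.service mysql.service redis.service

[Service]
EnvironmentFile=/etc/sysconfig/airflow
User=root
Group=root
Type=simple
ExecStart=/bin/bash -c "export PATH=${PATH}:/usr/local/python3/bin/; /usr/local/python3/bin/airflow scheduler"
KillMode=process
Restart=on-failure
RestartSec=5s
PrivateTmp=true

[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl enable --now airflow-scheduler.service
```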
> 3. systemctl daemon-reload  # reload systemd so it sees the new unit
> 4. systemctl status airflow-webserver.service  # view the service status

You can also manage airflow with a script such as the following:

```bash
#!/bin/bash
# === Functions for the airflow webserver service ===
webserver_status() {
    echo -e "\e[36m Checking service status, please wait.\e[0m"
    sleep 3
    Status=`ps -elf | grep "airflow[-]webserver" | wc -l`
    if [ $Status -eq 0 ]; then
        echo -e "\e[31m webserver is stopped!!\e[0m"
    else
        echo -e "\e[32m webserver is running...\e[0m"
    fi
}
webserver_start() {
    echo -e "\e[36m Starting airflow webserver.\e[0m"
    sleep 1
    nohup /usr/local/python3/bin/airflow webserver >> /root/airflow/service/webserver.log 2>&1 &
    webserver_status
}
webserver_stop() {
    echo -e "\e[36m Stopping airflow webserver.\e[0m"
    sleep 1
    /usr/bin/kill -9 `ps -elf | grep "airflow[-]webserver" | grep -v grep | awk -F " " '{print $4}'`
    rm -rf /root/airflow/airflow-webserver.pid
    webserver_status
}

# === Functions for the airflow scheduler service ===
scheduler_status() {
    echo -e "\e[36m Checking service status, please wait.\e[0m"
    sleep 3
    Status=`ps -elf | grep "airflow[-]scheduler" | wc -l`
    if [ $Status -eq 0 ]; then
        echo -e "\e[31m scheduler is stopped!!\e[0m"
    else
        echo -e "\e[32m scheduler is running...\e[0m"
    fi
}
scheduler_start() {
    echo -e "\e[36m Starting airflow scheduler.\e[0m"
    sleep 1
    nohup /usr/local/python3/bin/airflow scheduler >> /root/airflow/service/scheduler.log 2>&1 &
    scheduler_status
}
scheduler_stop() {
    echo -e "\e[36m Stopping airflow scheduler.\e[0m"
    sleep 1
    /usr/bin/kill -9 `ps -elf | grep "airflow[-]scheduler" | grep -v grep | awk -F " " '{print $4}'`
    rm -rf /root/airflow/airflow-scheduler.pid
    scheduler_status
}

# === Functions for the airflow flower service ===
flower_status() {
    echo -e "\e[36m Checking service status, please wait.\e[0m"
    sleep 3
    Status=`netstat -anputl | grep 5555 | grep LISTEN | awk -F " " '{print $7}' | awk -F "/" '{print $1}' | wc -l`
    if [ $Status -eq 0 ]; then
        echo -e "\e[31m flower is stopped!!\e[0m"
    else
        echo -e "\e[32m flower is running...\e[0m"
    fi
}
flower_start() {
    echo -e "\e[36m Starting airflow flower.\e[0m"
    sleep 1
    nohup /usr/local/python3/bin/airflow flower >> /root/airflow/service/flower.log 2>&1 &
    flower_status
}
flower_stop() {
    echo -e "\e[36m Stopping airflow flower.\e[0m"
    sleep 1
    /usr/bin/kill -9 `netstat -anputl | grep 5555 | grep LISTEN | awk -F " " '{print $7}' | awk -F "/" '{print $1}'`
    rm -rf /root/airflow/airflow-flower.pid
    flower_status
}

# === Functions for the airflow worker service ===
worker_status() {
    echo -e "\e[36m Checking service status, please wait.\e[0m"
    sleep 3
    Status=`ps -elf | grep "airflow serve_logs" | grep -v grep | wc -l`
    celeryStatus=`ps -elf | grep celery | grep -v grep | wc -l`
    if [ $Status -eq 0 ]; then
        if [ $celeryStatus -eq 0 ]; then
            echo -e "\e[31m worker is stopped!!\e[0m"
        else
            echo -e "\e[32m worker is running...\e[0m"
        fi
    else
        echo -e "\e[32m worker is running...\e[0m"
    fi
}
worker_start() {
    echo -e "\e[36m Starting airflow worker.\e[0m"
    sleep 1
    nohup /usr/local/python3/bin/airflow worker >> /root/airflow/service/worker.log 2>&1 &
    worker_status
}
worker_stop() {
    echo -e "\e[36m Stopping airflow worker.\e[0m"
    sleep 1
    /usr/bin/kill -9 `ps -elf | grep "airflow serve_logs" | grep -v grep | awk -F " " '{print $4}'`
    /usr/bin/kill -9 `ps -elf | grep celery | grep -v grep | awk -F " " '{print $4}'`
    rm -rf /root/airflow/airflow-worker.pid
    worker_status
}

# === Startup options for the airflow services ===
usage() {
    echo -e "\n A tool used for starting airflow services\n Usage: airflow.sh {webserver|worker|scheduler|flower|all} {start|stop|status}"
}

case "$2" in
    start)
        case "$1" in
            webserver) webserver_start;;
            worker) worker_start;;
            scheduler) scheduler_start;;
            flower) flower_start;;
            all)
                webserver_start
                scheduler_start
                flower_start
                worker_start;;
            *)
                usage
                exit 2
        esac;;
    stop)
        case "$1" in
            webserver) webserver_stop;;
            worker) worker_stop;;
            scheduler) scheduler_stop;;
            flower) flower_stop;;
            all)
                worker_stop
                flower_stop
                scheduler_stop
                webserver_stop;;
            *)
                usage
                exit 3
        esac;;
    status)
        case "$1" in
            webserver) webserver_status;;
            worker) worker_status;;
            scheduler) scheduler_status;;
            flower) flower_status;;
            all)
                webserver_status
                scheduler_status
                flower_status
                worker_status;;
            *)
                usage
                exit 4
        esac;;
    *)
        usage
        exit 1
esac
```

# Obtain task log information

To obtain task log information, go to incubator-airflow/airflow/www/ and modify views.py.
Add the following code to class Airflow(BaseView):

```python
@expose('/logs')
@login_required
@wwwutils.action_logging
def logs(self):
    BASE_LOG_FOLDER = os.path.expanduser(
        conf.get('core', 'BASE_LOG_FOLDER'))
    dag_id = request.args.get('dag_id')
    task_id = request.args.get('task_id')
    execution_date = request.args.get('execution_date')
    dag = dagbag.get_dag(dag_id)
    log_relative = "{dag_id}/{task_id}/{execution_date}".format(**locals())
    loc = os.path.join(BASE_LOG_FOLDER, log_relative)
    loc = loc.format(**locals())
    log = ""
    TI = models.TaskInstance
    session = Session()
    dttm = dateutil.parser.parse(execution_date)
    ti = session.query(TI).filter(
        TI.dag_id == dag_id,
        TI.task_id == task_id,
        TI.execution_date == dttm).first()
    dttm = dateutil.parser.parse(execution_date)
    form = DateTimeForm(data={'execution_date': dttm})
    if ti:
        host = ti.hostname
        log_loaded = False
        if os.path.exists(loc):
            try:
                f = open(loc)
                log += "".join(f.readlines())
                f.close()
                log_loaded = True
            except:
                log = "*** Failed to load local log file: {0}.\n".format(loc)
        else:
            WORKER_LOG_SERVER_PORT = \
                conf.get('celery', 'WORKER_LOG_SERVER_PORT')
            url = os.path.join(
                "http://{host}:{WORKER_LOG_SERVER_PORT}/log", log_relative
            ).format(**locals())
            log += "*** Log file isn't local.\n"
            log += "*** Fetching here: {url}\n".format(**locals())
            try:
                import requests
                timeout = None  # No timeout
                try:
                    timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
                except (AirflowConfigException, ValueError):
                    pass
                response = requests.get(url, timeout=timeout)
                response.raise_for_status()
                log += '\n' + response.text
                log_loaded = True
            except:
                log += "*** Failed to fetch log file from worker.\n".format(**locals())
        if not log_loaded:
            # load remote logs
            remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
            remote_log = os.path.join(remote_log_base, log_relative)
            log += '\n*** Reading remote logs...\n'
            # S3
            if remote_log.startswith('s3:/'):
                log += log_utils.S3Log().read(remote_log, return_error=True)
            # GCS
            elif remote_log.startswith('gs:/'):
                log += log_utils.GCSLog().read(remote_log, return_error=True)
            # unsupported
            elif remote_log:
                log += '*** Unsupported remote log location.'
    session.commit()
    session.close()
    if PY2 and not isinstance(log, unicode):
        log = log.decode('utf-8')
    title = "Log"
    return wwwutils.json_response(log)
```

> 3. Restart the service and access a URL such as:
http://localhost:8085/admin/airflow/logs?task_id=run_after_loop&dag_id=example_bash_operator&execution_date=2018-01-11
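For example, the new endpoint can be queried from the command line; a minimal sketch, assuming the patched webserver is listening on localhost:8085 as in the URL above:

```bash
# fetch the log of one task instance from the added /logs endpoint
curl -s "http://localhost:8085/admin/airflow/logs?task_id=run_after_loop&dag_id=example_bash_operator&execution_date=2018-01-11"
```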
> This returns the log of this task for execution_date=2018-01-11.

# Delete a DAG

> Airflow does not officially expose a direct API for deleting a DAG, and a complete deletion touches several metadata tables, so the SQL for deleting a DAG works out to the following:
set @dag_id = 'BAD_DAG';
delete from airflow.xcom where dag_id = @dag_id;
delete from airflow.task_instance where dag_id = @dag_id;
delete from airflow.sla_miss where dag_id = @dag_id;
delete from airflow.log where dag_id = @dag_id;
delete from airflow.job where dag_id = @dag_id;
delete from airflow.dag_run where dag_id = @dag_id;
delete from airflow.dag where dag_id = @dag_id;
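These statements use MySQL session variables, so they can be saved to a file and run in one pass with the mysql client; a minimal sketch, assuming a MySQL metadata database named airflow, a placeholder login, and a hypothetical delete_dag.sql holding the statements above:

```bash
# run the deletion statements above against the MySQL metadata database
# (the login and the delete_dag.sql filename are placeholders)
mysql -u root -p airflow < delete_dag.sql
```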
# Cluster management scripts

## Cluster service launch script

```bash
#!/usr/bin/env bash

function usage() {
    echo -e "\n A tool used for starting airflow services\n Usage: 200.sh {webserver|worker|scheduler|flower}"
}

PORT=8081
ROLE=webserver
ENV_ARGS=""

check_alive() {
    PID=`netstat -nlpt | grep $PORT | awk '{print $7}' | awk -F "/" '{print $1}'`
    [ -n "$PID" ] && return 0 || return 1
}

check_scheduler_alive() {
    PIDS=`ps -ef | grep "/usr/local/bin/airflow scheduler" | grep "python" | awk '{print $2}'`
    [ -n "$PIDS" ] && return 0 || return 1
}

function get_host_ip() {
    local host=$(ifconfig | grep "inet" | grep "\->" | awk '{print $2}' | tail -1)
    if [[ -z "$host" ]]; then
        host=$(ifconfig | grep "inet" | grep "broadcast" | awk '{print $2}' | tail -1)
    fi
    echo "${host}"
}

start_service() {
    if [ $ROLE = 'scheduler' ]; then
        check_scheduler_alive
    else
        check_alive
    fi
    if [ $? -ne 0 ]; then
        nohup airflow $ROLE $ENV_ARGS > $BASE_LOG_DIR/$ROLE/$ROLE.log 2>&1 &
        sleep 5
        if [ $ROLE = 'scheduler' ]; then
            check_scheduler_alive
        else
            check_alive
        fi
        if [ $? -ne 0 ]; then
            echo "service start error"
            exit 1
        else
            echo "service start success"
            exit 0
        fi
    else
        echo "service already started"
        exit 0
    fi
}

function main() {
    if [ -z "${POOL}" ]; then
        echo "the environment variable POOL cannot be empty"
        exit 1
    fi
    source /data0/hcp/sbin/init-hcp.sh
    case "$1" in
        webserver)
            echo "starting airflow webserver"
            ROLE=webserver
            PORT=8081
            start_service
            ;;
        worker)
            echo "starting airflow worker"
            ROLE=worker
            PORT=8793
            local host_ip=$(get_host_ip)
            ENV_ARGS="-cn ${host_ip}@${host_ip}"
            start_service
            ;;
        flower)
            echo "starting airflow flower"
            ROLE=flower
            PORT=5555
            start_service
            ;;
        scheduler)
            echo "starting airflow scheduler"
            ROLE=scheduler
            start_service
            ;;
        *)
            usage
            exit 1
    esac
}

main "$@"
```

## Cluster service offline script

```bash
#!/usr/bin/env bash

function usage() {
    echo -e "\n A tool used for stopping airflow services\n Usage: 200.sh {webserver|worker|scheduler|flower}"
}

function get_host_ip() {
    local host=$(ifconfig | grep "inet" | grep "\->" | awk '{print $2}' | tail -1)
    if [[ -z "$host" ]]; then
        host=$(ifconfig | grep "inet" | grep "broadcast" | awk '{print $2}' | tail -1)
    fi
    echo "${host}"
}

function main() {
    if [ -z "${POOL}" ]; then
        echo "the environment variable POOL cannot be empty"
        exit 1
    fi
    source /data0/hcp/sbin/init-hcp.sh
    case "$1" in
        webserver)
            echo "stopping airflow webserver"
            cat $AIRFLOW_HOME/airflow-webserver.pid | xargs kill -9
            ;;
        worker)
            echo "stopping airflow worker"
            PORT=8793
            PID=`netstat -nlpt | grep $PORT | awk '{print $7}' | awk -F "/" '{print $1}'`
            kill -9 $PID
            local host_ip=$(get_host_ip)
            ps -ef | grep celeryd | grep ${host_ip}@${host_ip} | awk '{print $2}' | xargs kill -9
            ;;
        flower)
            echo "stopping airflow flower"
            PORT=5555
            PID=`netstat -nlpt | grep $PORT | awk '{print $7}' | awk -F "/" '{print $1}'`
            kill -9 $PID
            ;;
        scheduler)
            echo "stopping airflow scheduler"
            PID=`ps -ef | grep "/usr/local/bin/airflow scheduler" | grep "python" | awk '{print $2}'`
            kill -9 $PID
            ;;
        *)
            usage
            exit 1
    esac
}

main "$@"
```

# Modify the Airflow time zone
Airflow uses UTC by default; adding 8 hours gives local time in the China time zone. This section switches Airflow to the China time zone by modifying the Airflow source code. The changes below target Airflow 1.10.0; other versions are broadly similar and can be adapted from these steps.
> 1. Modify airflow.cfg in the airflow home directory and set:
default_timezone = Asia/Shanghai
> 2. Go to the installation location of the airflow package, that is, its directory under site-packages. The files modified below are all relative to this directory.

This is where the airflow package is installed on my machine (adjust for your own environment; a way to look it up is shown below):

cd /usr/local/python3/lib/python3.6/site-packages/airflow
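If the install location differs, the package directory can be printed by the interpreter that runs Airflow instead of guessing; a small sketch, reusing the Python path from this article:

```bash
# print where the airflow package actually lives for this interpreter
/usr/local/python3/bin/python3 -c "import airflow, os; print(os.path.dirname(airflow.__file__))"
```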
> 3. Modify utils/timezone.py
```python
# Add the following under the line utc = pendulum.timezone('UTC') (line 27)
from airflow import configuration as conf
try:
    tz = conf.get("core", "default_timezone")
    if tz == "system":
        utc = pendulum.local_timezone()
    else:
        utc = pendulum.timezone(tz)
except Exception:
    pass

# Modify the utcnow() function (on line 69):
# the original code d = dt.datetime.utcnow() is changed to
d = dt.datetime.now()
```

> 4. Modify utils/sqlalchemy.py

```python
# Add the following under the line utc = pendulum.timezone('UTC') (line 37)
from airflow import configuration as conf
try:
    tz = conf.get("core", "default_timezone")
    if tz == "system":
        utc = pendulum.local_timezone()
    else:
        utc = pendulum.timezone(tz)
except Exception:
    pass
```

Also in utils/sqlalchemy.py, comment out cursor.execute("SET time_zone = '+00:00'") (line 124).
> 5. Modify www/templates/admin/master.html (line 31): change the code `var UTCseconds = (x.getTime() + x.getTimezoneOffset()*60*1000);` to `var UTCseconds = x.getTime();`, and change the code `"timeFormat":"H:i:s %UTC%",` to `"timeFormat":"H:i:s",`. Finally, restart airflow-webserver.
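As a quick check after the edits, the sketch below confirms the configured zone resolves and restarts the webserver; it reuses the paths and service names from earlier in this article and assumes pendulum (an Airflow 1.10 dependency) is importable:

```bash
# confirm the timezone value Airflow will read (airflow home as earlier in this article)
grep "^default_timezone" /root/airflow/airflow.cfg

# check that pendulum can resolve the zone name
/usr/local/python3/bin/python3 -c "import pendulum; print(pendulum.timezone('Asia/Shanghai'))"

# restart the webserver so the source and template changes take effect
supervisorctl restart airflow_web                 # if managed with supervisord as above
# systemctl restart airflow-webserver.service     # or, if managed with systemd as above
```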