2025-04-05 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article introduces how to use Flask for Python web development, specifically how to run long tasks asynchronously. If you have had doubts about this, I hope the simple, practical walkthrough below helps answer them.
Flask is a well-known lightweight synchronous web framework in Python. In some development work you will encounter tasks that take a long time to process. In that case you need an asynchronous approach: let the long-running task execute in the background, immediately return a response status for the request to the front end so the interface does not appear to freeze, and then, once the asynchronous task finishes, report its status if a status needs to be returned.
How can this be implemented?
Using threads
When you want to perform a time-consuming task, the easiest and quickest approach is to start a new thread to execute it.
Implementing this with ThreadPoolExecutor:
from flask import Flask
from time import sleep
from concurrent.futures import ThreadPoolExecutor
# DOCS https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

# create a thread pool executor
executor = ThreadPoolExecutor(2)

app = Flask(__name__)

@app.route('/jobs')
def run_jobs():
    # hand the time-consuming task off to a pool thread
    executor.submit(long_task, 'hello', 123)
    return 'long task running.'

# time-consuming task
def long_task(arg1, arg2):
    print("args: %s %s!" % (arg1, arg2))
    sleep(5)
    print("Task is done!")

if __name__ == '__main__':
    app.run()
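As a side note, executor.submit() returns a concurrent.futures.Future, which the back end itself can use to check on or wait for the submitted work. A minimal sketch (long_task here is a simplified stand-in for illustration, not the view code above):

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

executor = ThreadPoolExecutor(2)

def long_task(x):
    # stand-in for a slow job
    sleep(0.1)
    return x * 2

future = executor.submit(long_task, 21)
# done() reports completion without blocking; result() blocks until ready
print(future.result())  # 42
```

This only helps inside the back-end process, though; it does nothing for a browser that has already received its response.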
You can use this method for relatively simple time-consuming tasks, such as sending an email or an SMS verification code.
But the problem with this approach is that the front end cannot know the status of the task.
If you want the front end to know, you need to design some logic yourself: for example, store the task's execution state in redis, identified by a unique task id; write an interface that returns the task status for a given task id; and have the front end poll that interface periodically for status information.
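That do-it-yourself design can be sketched in a few lines. This is a minimal illustration, not production code: an in-memory dict stands in for redis, and the helper names (submit_task, get_status) are hypothetical:

```python
import time
import uuid
from concurrent.futures import ThreadPoolExecutor

# in a real app the statuses would live in redis; a dict stands in here
task_status = {}
executor = ThreadPoolExecutor(2)

def long_task(task_id):
    task_status[task_id] = 'RUNNING'
    time.sleep(0.1)  # pretend to do slow work
    task_status[task_id] = 'DONE'

def submit_task():
    # identify the task by a unique id and hand it to the pool
    task_id = uuid.uuid4().hex
    task_status[task_id] = 'PENDING'
    executor.submit(long_task, task_id)
    return task_id

def get_status(task_id):
    # the interface the front end would poll with the task id
    return task_status.get(task_id, 'UNKNOWN')

task_id = submit_task()
while get_status(task_id) != 'DONE':
    time.sleep(0.02)
print(get_status(task_id))  # DONE
```

In a real service, submit_task() and get_status() would be Flask routes and the dict would be replaced by redis so state survives across worker processes.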
Implementing all of this yourself is a bit of a hassle, and Celery already implements exactly this logic, so let's use it.
Use Celery
To meet the need for the front end to get the status of the task, you can use Celery.
Celery is a distributed task queue for real-time task processing and scheduling, often used for asynchronous web tasks, scheduled tasks, and so on. The architecture of Celery deserves a separate article, so it will not be discussed in depth here.
Now suppose the front end should use a progress bar to track the execution of a back-end task. This is easy with Celery. First install Celery and redis via pip; redis is needed because we choose it as Celery's "message broker" (message middleware).
pip install celery
pip install redis
Using Celery in Flask is actually very simple. Here, we simply go through the overall process of using Celery in Flask, and then implement a specific project.
1. Initialize Celery in Flask
from flask import Flask
from celery import Celery

app = Flask(__name__)

# configure the message broker URL; if redis runs on a remote server,
# configure that server's redis URL instead
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
# backend for storing the state or results of Celery tasks; it must be
# configured for our use case
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

# initialize Celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
# pass the Flask configuration straight to Celery
celery.conf.update(app.config)
In the code above, the celery object is initialized through the Celery class, which is passed the application name together with the message broker URL.
2. Decorate the time-consuming task function with the celery.task decorator
@celery.task
def long_task(arg1, arg2):
    # logic of the time-consuming task
    return result
3. Define an interface in Flask that executes the time-consuming task asynchronously
@app.route('/', methods=['GET', 'POST'])
def index():
    task = long_task.delay(1, 2)

The delay() method is a shortcut for the apply_async() method. apply_async() takes more parameters and allows finer control over the time-consuming task. For example, if you want long_task() to execute after one minute:

@app.route('/', methods=['GET', 'POST'])
def index():
    task = long_task.apply_async(args=[1, 2], countdown=60)

Both delay() and apply_async() return a task object, from which the task's state and all kinds of related information can be obtained.
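The relationship between delay() and apply_async() can be illustrated with a toy stub. To be clear, ToyTask below is not Celery code, just a hypothetical stand-in showing how delay(*args) forwards to apply_async(args=...):

```python
# toy stand-in illustrating delay() as sugar for apply_async();
# real Celery tasks behave analogously (this is NOT the Celery API itself)
class ToyTask:
    def apply_async(self, args=None, kwargs=None, countdown=0):
        # record what would be sent to the broker
        return {'args': tuple(args or ()), 'kwargs': kwargs or {},
                'countdown': countdown}

    def delay(self, *args, **kwargs):
        # delay(*a, **kw) is equivalent to apply_async(args=a, kwargs=kw)
        return self.apply_async(args=args, kwargs=kwargs)

t = ToyTask()
print(t.delay(1, 2))                              # countdown defaults to 0
print(t.apply_async(args=[1, 2], countdown=60))   # delayed by one minute
```

The point is simply that options such as countdown are only reachable through apply_async().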
With these three steps, you can use Celery.
Next, let's implement the concrete requirement of "letting the front end track the execution of a back-end task through a progress bar".
import random
import time

# with bind=True, Celery passes the task instance itself in as self
@celery.task(bind=True)
def long_task(self):
    verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
    adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
    noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
    message = ''
    total = random.randint(10, 50)
    for i in range(total):
        if not message or random.random() < 0.25:
            # randomly pick some words for the status message
            message = '{0} {1} {2}...'.format(random.choice(verb),
                                              random.choice(adjective),
                                              random.choice(noun))
        # update the state of the Celery task
        self.update_state(state='PROGRESS',
                          meta={'current': i, 'total': total, 'status': message})
        time.sleep(1)
    # return a dict as the result
    return {'current': 100, 'total': 100, 'status': 'Task completed!', 'result': 42}

In the code above, the celery.task() decorator is used with bind=True, which makes Celery pass the task instance itself into the function, where it can be used to record and update the task's state.

Then comes a for loop. The loop's logic is meaningless in itself: it just picks random words from the lists to simulate some work, and time.sleep(1) sleeps for one second to make the loop time-consuming.

On each iteration, self.update_state() updates the state of the Celery task. Celery has some built-in states, such as SUCCESS and STARTED; here a custom state, "PROGRESS", is used. Besides the state, some information about the current iteration is stored as a dict through the meta (metadata) parameter. With this data, the front end can display a progress bar.

With the time-consuming method defined, next define a Flask interface method that invokes it.

from flask import jsonify, url_for

@app.route('/longtask', methods=['POST'])
def longtask():
    # invoke the task asynchronously
    task = long_task.apply_async()
    # return 202 together with a Location header
    return jsonify({}), 202, {'Location': url_for('taskstatus', task_id=task.id)}

Simply put, the front end sends a POST request to /longtask to make the back end start the time-consuming task. The returned status code is 202, which usually indicates that a request is in progress. In addition, a Location header is added to the response; the front end can read it to obtain the full URL corresponding to the task id.

Once the front end has the URL for the task id, it still needs an interface it can call with that id to get the task's state at the current moment.

@app.route('/status/<task_id>')
def taskstatus(task_id):
    task = long_task.AsyncResult(task_id)
    if task.state == 'PENDING':
        # still waiting
        response = {
            'state': task.state,
            'current': 0,
            'total': 1,
            'status': 'Pending...'
        }
    elif task.state != 'FAILURE':
        # did not fail
        response = {
            'state': task.state,  # the task state
            # data from meta, available through task.info.get()
            'current': task.info.get('current', 0),  # current loop progress
            'total': task.info.get('total', 1),      # total loop count
            'status': task.info.get('status', '')
        }
        if 'result' in task.info:
            response['result'] = task.info['result']
    else:
        # something went wrong while the back end executed the task
        response = {
            'state': task.state,
            'current': 1,
            'total': 1,
            'status': str(task.info),  # the specific exception raised
        }
    return jsonify(response)

To obtain the information in the task object, the AsyncResult class is initialized with the task id, which yields the task object; the current task's information can then be read from it.

This method returns JSON containing the task state along with the information specified in meta; the front end can use it to build a progress bar.

If the task is in the PENDING state, it has not started yet. In that state the task carries no information, so some data is returned by hand. If the task failed, the exception contained in task.info is returned. Otherwise the task executed normally, and the concrete task information can be obtained through task.info.

With that, the back-end logic is done; next comes the front-end logic. To draw a graphical progress bar you can use nanobar.js directly; a couple of lines give you a progress bar. The example from its website:

var options = {
    classname: 'my-class',
    id: 'my-id',
    // where the progress bar should appear
    target: document.getElementById('myDivId')
};
// initialize the progress bar object
var nanobar = new Nanobar(options);
nanobar.go(30);  // progress bar at 30%
nanobar.go(76);  // progress bar at 76%
// 100%, the progress bar finishes
nanobar.go(100);

With nanobar.js this becomes very simple. First define a simple HTML page:

<h2>Long running task with progress updates</h2>
<button id="start-bg-job">Start Long Calculation</button><br><br>
<div id="progress"></div>

Then implement the requests to the back end in JavaScript.

// button click event
$(function() {
    $('#start-bg-job').click(start_long_task);
});

// request the longtask interface
function start_long_task() {
    // add elements to the html
    div = $('<div class="progress"><div></div><div>0%</div><div>...</div><div>&nbsp;</div></div>');
    $('#progress').append(div);

    // create the progress bar object
    var nanobar = new Nanobar({
        bg: '#44f',
        target: div[0].childNodes[0]
    });

    // ajax request to longtask
    $.ajax({
        type: 'POST',
        url: '/longtask',
        // on success, read Location from the response headers
        success: function(data, status, request) {
            status_url = request.getResponseHeader('Location');
            // call update_progress() to update the progress bar
            update_progress(status_url, nanobar, div[0]);
        },
        error: function() {
            alert('Unexpected error');
        }
    });
}

// update the progress bar
function update_progress(status_url, nanobar, status_div) {
    // getJSON() is a built-in jQuery method; it requests the URL taken from
    // the Location header, i.e. "/status/<task_id>"
    $.getJSON(status_url, function(data) {
        // compute the progress
        percent = parseInt(data['current'] * 100 / data['total']);
        // update the progress bar
        nanobar.go(percent);
        // update the text
        $(status_div.childNodes[1]).text(percent + '%');
        $(status_div.childNodes[2]).text(data['status']);
        if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') {
            if ('result' in data) {
                // show the result
                $(status_div.childNodes[3]).text('Result: ' + data['result']);
            }
            else {
                // something unexpected happened
                $(status_div.childNodes[3]).text('Result: ' + data['state']);
            }
        }
        else {
            // run again after 2 seconds
            setTimeout(function() {
                update_progress(status_url, nanobar, status_div);
            }, 2000);
        }
    });
}

The overall logic can be followed through the comments.

With that, the requirement is implemented; let's run it. First start Redis:

redis-server

Then start the celery worker:

celery worker -A app.celery --loglevel=info

Finally run the Flask project:

python app.py

At this point, the study of how to use Flask for Python web development is over. I hope it helped resolve your doubts. Pairing theory with practice is the best way to learn, so go and try it!