How to Implement Coroutines in Python

Shulou (shulou.com), SLTechnology News & Howtos, Development, 2025-01-30 update
This article introduces how to implement coroutines in Python. The content is detailed, easy to follow, and practical, and I believe you will gain something from reading it. Let's take a look.
Definition of a coroutine
A coroutine, also called a microthread or fiber, is a lightweight user-mode thread.

What it does: while function A is executing, it can be interrupted at any point to execute function B; B can then be interrupted to resume A (the switching can happen automatically). This is not a function call, as there is no call statement. The process looks much like multithreading, but only one thread actually executes the coroutines.

Put simply: a function in a thread can, at any point, save its temporary variables and other state, switch to another function, and later resume where it left off. Note that this is not done by calling the other function; how many times to switch, and when to switch back, is decided by the developer.
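As a rough sketch of this idea (illustrative code, not from the original article), plain generators can emulate the save-state-and-switch behavior: each yield pauses the current function, and the developer's own loop decides which function runs next.

```python
order = []

def task_a():
    order.append("A1")
    yield  # pause task_a; control returns to the "scheduler" below
    order.append("A2")
    yield

def task_b():
    order.append("B1")
    yield
    order.append("B2")
    yield

# Manual round-robin "scheduler": the developer decides when to switch,
# and each switch resumes a paused function rather than calling it.
a, b = task_a(), task_b()
for gen in (a, b, a, b):
    next(gen)

print(order)
```

Running it shows the two functions interleaving inside one thread: ['A1', 'B1', 'A2', 'B2'].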
Differences between coroutines and threads
When multitasking, a thread switch involves far more than saving and restoring the CPU context at the system level. For efficiency, the operating system keeps per-thread data such as caches, and it must save and restore that data on every switch, so thread switching is expensive. Switching a coroutine, by contrast, only saves and restores a small amount of CPU context, so a program can perform millions of coroutine switches per second.
Criteria for a coroutine
Concurrency is achieved within a single thread.
Shared data can be modified without locks.
The user program keeps the context stacks of multiple control flows.
When one coroutine hits an IO operation, control automatically switches to another coroutine.
Advantages of coroutines
Because the program keeps its own contexts and stacks, there is no thread context-switch overhead; switching happens at the program level and is completely invisible to the operating system, so coroutines are more lightweight.
No overhead for atomic operations, locking, or synchronization.
Control flow is easy to switch, which simplifies the programming model.
Concurrency within a single thread maximizes CPU utilization, scales well, and is cheap.
Disadvantages of coroutines
Coroutines cannot use multi-core resources: a coroutine is essentially a single thread, so it cannot use multiple cores of a CPU at the same time; to run on multiple CPUs, coroutines must be combined with processes.
A blocking operation, such as blocking IO, blocks the entire program.
For CPU-bound work, switching between coroutines is pointless; the switching and state saving only degrade performance.
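A small sketch (illustrative, not from the original article) of the blocking pitfall just mentioned: a blocking time.sleep inside a coroutine freezes the whole event loop, while awaiting asyncio.sleep lets two tasks overlap.

```python
import asyncio
import time

async def blocking_io():
    time.sleep(0.2)            # blocking call: the whole event loop freezes here

async def async_io():
    await asyncio.sleep(0.2)   # non-blocking: other tasks run during the wait

async def timed(coro_func):
    # run two copies of the coroutine "concurrently" and measure the elapsed time
    start = time.perf_counter()
    await asyncio.gather(coro_func(), coro_func())
    return time.perf_counter() - start

blocking_time = asyncio.run(timed(blocking_io))   # ~0.4s: the sleeps run back to back
concurrent_time = asyncio.run(timed(async_io))    # ~0.2s: the sleeps overlap
print(f"blocking: {blocking_time:.2f}s, concurrent: {concurrent_time:.2f}s")
```

The blocking version takes roughly twice as long, because the loop cannot switch away from time.sleep.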
Ways to implement coroutines in Python
greenlet: a third-party module for writing coroutine code (gevent's coroutines are built on greenlet).
yield: generators' ability to suspend and resume can also be used to write coroutine code.
asyncio: a module introduced in Python 3.4 for writing coroutine code.
async & await: two keywords introduced in Python 3.5 that, combined with the asyncio module, make coroutine code easier to write (recommended).
The async & await keywords
There are many ways to implement coroutines; async & await is currently the most popular, and it is the one this article covers. You can look into the other approaches on your own.

To use coroutines you need to understand two things: the event loop, and how to define a coroutine function.
Event loop
An event loop is an effective way to handle concurrency. You can think of it as an infinite loop that keeps checking a task list and running the code of whichever tasks are ready. Consider the following pseudocode:
task_list = [task1, task2, task3, ...]
while True:
    runnable_tasks, finished_tasks = check all tasks in task_list and
        return the 'runnable' and 'finished' ones
    for ready_task in runnable_tasks:
        run ready_task
    for finished_task in finished_tasks:
        remove finished_task from task_list
    if all tasks in task_list are finished:
        break
The pseudocode means: get the event loop, keep watching the task list, run the runnable tasks, and remove finished tasks from the list; when every task is finished, end the loop.
The benefit of an event loop: the programmer does not have to manage adding and removing tasks or dispatching events by hand.
The code is written as follows:
import asyncio

# get an event loop
loop = asyncio.get_event_loop()
# put the task into the task list and run the event loop until the task completes
loop.run_until_complete(task)
# close the event loop
loop.close()

Coroutine functions and coroutine objects
A coroutine function is defined with async def function_name. A coroutine object is what you get by calling a coroutine function.
# define a coroutine function
async def func():
    pass

# create a coroutine object
result = func()
Note: calling a coroutine function creates a coroutine object, but the function body does not run. To run the body, you must hand the coroutine object to the event loop, as the following code shows.
import asyncio

async def func():
    print("Hello")

result = func()

# method 1
loop = asyncio.get_event_loop()
loop.run_until_complete(result)

# method 2 (Python 3.7+)
asyncio.run(result)

await
await is a keyword that can only be used inside a coroutine function. When an IO operation is encountered, it suspends the current coroutine (task); while the coroutine is suspended, the event loop can run other coroutines (tasks). When the current coroutine's IO finishes, execution switches back to the code after the await.
For example: suppose we create two tasks, one downloading pictures and one downloading videos. We start the picture download first; when it hits an IO operation we would normally just wait for the download to finish, but await lets the picture task be suspended, and the loop automatically switches to the video task.
Usage: await + an awaitable object (a coroutine object, Future object, or Task object).
Case 1
import asyncio

async def func():
    print("executing the coroutine function's internal code")
    # On an IO operation, the current coroutine (task) is suspended until the IO completes.
    # While it is suspended, the event loop can run other coroutines (tasks).
    response = await asyncio.sleep(2)
    print("IO request finished, result:", response)

result = func()
asyncio.run(result)
Case 2
import asyncio

async def others():
    print("start")            # ④ print start
    await asyncio.sleep(2)    # ⑤ wait 2 seconds; other tasks could run during this time
    print("end")              # ⑥ print end
    return 'return value'

async def func():
    print("executing the coroutine function's internal code")  # ② run the body, print
    response = await others()                                  # ③ wait for others() to finish
    print(f"IO request finished, result: {response}")          # ⑦ runs after others() returns

if __name__ == '__main__':
    asyncio.run(func())  # ① run func() in the event loop
All the examples above create only one task, that is, the event loop's task list holds a single task, so they cannot show switching to another task during an IO wait. To create multiple tasks in a program, you need the Task object.
Task object
Tasks are used to schedule coroutines concurrently. A Task object is created with asyncio.create_task(coroutine object), which adds the coroutine to the event loop to wait to be scheduled. Besides asyncio.create_task(), you can use the lower-level loop.create_task() or ensure_future() functions; instantiating Task objects manually is not recommended.

In essence, this wraps the coroutine object in a Task object, immediately adds the coroutine to the event loop, and tracks the coroutine's state.

Note: asyncio.create_task() was added in Python 3.7. Before Python 3.7, the lower-level asyncio.ensure_future() function can be used instead.
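A minimal sketch (illustrative names) of the pre-3.7 alternative mentioned above: asyncio.ensure_future also wraps a coroutine object in a Task.

```python
import asyncio

async def func():
    await asyncio.sleep(0.1)
    return "ok"

async def main():
    # pre-3.7 style: ensure_future wraps the coroutine object in a Task
    task = asyncio.ensure_future(func())
    return await task

result = asyncio.run(main())
print(result)
```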
Case 1
import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "return value"

async def main():
    print("main start")
    # Create a coroutine, wrap it in a Task object, and immediately add it to the
    # event loop's task list, where it waits to be scheduled (default state: ready).
    task1 = asyncio.create_task(func())
    task2 = asyncio.create_task(func())
    print("main end")
    # When a running coroutine hits an IO operation, the loop automatically switches
    # to other tasks. Each await here waits for its task to finish and gets the result.
    ret1 = await task1
    ret2 = await task2
    print(ret1, ret2)

asyncio.run(main())
Case 2: await + a task list (the most common form)
import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "return value"

async def main():
    print("main start")
    # Create coroutines, wrap them in Task objects, and add them to the event loop's
    # task list, where they wait to be scheduled (default state: ready).
    task_list = [
        asyncio.create_task(func()),
        asyncio.create_task(func()),
    ]
    print("main end")
    # When a running coroutine hits an IO operation, the loop automatically switches
    # to other tasks. This await waits for all the coroutines to finish and collects
    # the finished tasks in `done`. If a timeout is given, it is the maximum number of
    # seconds to wait; tasks still unfinished when it expires are returned in `pending`.
    done, pending = await asyncio.wait(task_list)
    print(done)

asyncio.run(main())
Note: internally, asyncio.wait calls ensure_future on each coroutine in the list to wrap it in a Task object, so when using wait you could also write task_list = [func(), func()].
The asyncio.Future object
The Future object in asyncio is relatively low level. We usually do not use it directly; instead we use the Task object to track the task's execution and state. (Task is a subclass of Future.)

Future provides the handling of final results in asynchronous programming (the Task class also handles state).
Case 1
import asyncio

async def main():
    # get the current event loop
    loop = asyncio.get_running_loop()
    # create a Future object (a task that does nothing)
    fut = loop.create_future()
    # wait for the Future's final result; since nothing ever sets one, it waits forever
    await fut

asyncio.run(main())
Result: the program waits forever and never finishes.
Case 2
import asyncio

async def set_after(fut):
    await asyncio.sleep(2)
    fut.set_result("666")

async def main():
    # get the current event loop
    loop = asyncio.get_running_loop()
    # create a Future with no behavior bound to it; by itself it can never know when to finish
    fut = loop.create_future()
    # create a Task bound to set_after, which sets the Future's result after 2 seconds;
    # manually setting the final result is what lets fut complete
    await loop.create_task(set_after(fut))
    # wait for the Future's final result; otherwise this would wait forever
    data = await fut
    print(data)

asyncio.run(main())
A Future object has no behavior bound to it, so if you want the event loop to obtain a result from a Future, you must set the result manually. The Task object inherits from Future and extends it: it automatically calls set_result when its bound coroutine finishes, so the Future completes automatically.
Although we usually work with Task objects, result handling is implemented on top of Future objects.
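This relationship can be checked directly (a small illustrative sketch): a Task is an asyncio.Future, and the coroutine's return value becomes the Future's result automatically.

```python
import asyncio

async def func():
    return 123

async def main():
    task = asyncio.create_task(func())
    # Task is a subclass of Future, so it exposes the Future result API
    is_future = isinstance(task, asyncio.Future)
    await task
    # set_result was called automatically when the coroutine returned
    return is_future, task.done(), task.result()

result = asyncio.run(main())
print(result)
```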
The concurrent.futures.Future object
Python's concurrent.futures module also has a Future object, which is used for asynchronous operations based on thread pools and process pools.
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor

def func(value):
    time.sleep(1)
    print(value)

pool = ThreadPoolExecutor(max_workers=5)
# or: pool = ProcessPoolExecutor(max_workers=5)

for i in range(10):
    fut = pool.submit(func, i)
    print(fut)
The two Future objects are different; they are designed for different scenarios. For example, concurrent.futures.Future does not support the await syntax.
Python provides asyncio.wrap_future, a function that wraps a concurrent.futures.Future object into an asyncio.Future object.
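A short sketch of wrap_future in use (the job function here is illustrative): the Future returned by a thread pool's submit cannot be awaited directly, but its wrapped version can.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_job():
    time.sleep(0.2)
    return "OK"

async def main():
    with ThreadPoolExecutor() as pool:
        # pool.submit returns a concurrent.futures.Future, which does not
        # support await; wrap it into an awaitable asyncio.Future
        cf_future = pool.submit(blocking_job)
        aio_future = asyncio.wrap_future(cf_future)
        return await aio_future

result = asyncio.run(main())
print(result)
```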
You may ask: why does Python provide this feature?

Normally a project uses either asyncio coroutines for all of its asynchronous work, or thread pools and process pools for all of it. This feature is needed when coroutine-based async code must be mixed with thread-pool or process-pool code.
import time
import asyncio
import concurrent.futures

def func1():
    # a time-consuming blocking operation
    time.sleep(2)
    return "OK"

async def main():
    loop = asyncio.get_running_loop()

    # Method 1: run in the default loop's executor (a ThreadPoolExecutor by default).
    # Step 1: internally, ThreadPoolExecutor's submit method is called to run func1
    #         in a pool thread, returning a concurrent.futures.Future object.
    # Step 2: asyncio.wrap_future is called to wrap that object as an asyncio.Future,
    #         because concurrent.futures.Future does not support the await syntax.
    fut = loop.run_in_executor(None, func1)
    result = await fut
    print('default thread pool', result)

    # Method 2: run in a custom thread pool:
    # with concurrent.futures.ThreadPoolExecutor() as pool:
    #     result = await loop.run_in_executor(pool, func1)
    #     print('custom thread pool', result)

    # Method 3: run in a custom process pool:
    # with concurrent.futures.ProcessPoolExecutor() as pool:
    #     result = await loop.run_in_executor(pool, func1)
    #     print('custom process pool', result)

asyncio.run(main())
Application scenario: when a project written with coroutine-based asynchronous programming needs a third-party module that does not support coroutines, such as the requests module:
import asyncio
import requests

async def download_image(url):
    # send a network request to download the image
    # (on network IO, the loop automatically switches to other tasks)
    print("start downloading:", url)
    loop = asyncio.get_event_loop()
    # the requests module does not support async by default,
    # so a thread pool is used to cooperate with it
    future = loop.run_in_executor(None, requests.get, url)
    response = await future
    print('download complete')
    # save the image to a local file
    file_name = url.rsplit('_')[-1]
    with open(file_name, mode='wb') as file_object:
        file_object.write(response.content)

if __name__ == '__main__':
    url_list = [
        'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
        'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
        'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg',
    ]
    tasks = [download_image(url) for url in url_list]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

Asynchronous iterators

What is an asynchronous iterator?
An asynchronous iterator is an object that implements the __aiter__() and __anext__() methods, where __anext__ must return an awaitable object. async for keeps awaiting the awaitables returned by the iterator's __anext__() method until __anext__ raises StopAsyncIteration.
What is an asynchronously iterable object?

An object that can be used in an async for statement. It must return an asynchronous iterator from its __aiter__() method.
import asyncio

class Reader(object):
    """Custom asynchronous iterator (which is also an asynchronous iterable)"""

    def __init__(self):
        self.count = 0

    async def readline(self):
        # await asyncio.sleep(1)
        self.count += 1
        if self.count == 100:
            return None
        return self.count

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()
        if val is None:
            raise StopAsyncIteration
        return val

async def func():
    # create an asynchronous iterable object
    async_iter = Reader()
    # async for must appear inside an async def function,
    # otherwise it is a syntax error
    async for item in async_iter:
        print(item)

asyncio.run(func())
Asynchronous iterators do not do much by themselves; they simply make the async for syntax work.
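Since Python 3.6, the same async for support can be had with much less code via an asynchronous generator (async def plus yield). A sketch in the same spirit as the Reader class above:

```python
import asyncio

async def reader(n):
    """Async generator: implicitly provides __aiter__/__anext__."""
    for i in range(1, n + 1):
        await asyncio.sleep(0)   # a point where the loop could run other tasks
        yield i

async def func():
    items = []
    async for item in reader(3):
        items.append(item)
    return items

result = asyncio.run(func())
print(result)
```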
Asynchronous context manager

An asynchronous context manager controls the environment of an async with statement by defining __aenter__() and __aexit__() methods.
import asyncio

class AsyncContextManager:
    def __init__(self):
        self.conn = None

    async def do_something(self):
        # asynchronously operate on the database
        return 666

    async def __aenter__(self):
        # asynchronously connect to the database
        self.conn = await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # asynchronously close the database connection
        await asyncio.sleep(1)

async def func():
    async with AsyncContextManager() as f:
        result = await f.do_something()
        print(result)

asyncio.run(func())
Asynchronous context managers are quite useful: open/process/close patterns encountered during development can be handled this way.
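For the open/process/close pattern, contextlib.asynccontextmanager (Python 3.7+) is a shorter way to build the same kind of object. The "connection" below is simulated with asyncio.sleep and is purely illustrative.

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def database_connection():
    await asyncio.sleep(0.1)       # pretend to open a connection asynchronously
    try:
        yield "conn"               # the value that `async with ... as x` receives
    finally:
        await asyncio.sleep(0.1)   # pretend to close the connection

async def func():
    async with database_connection() as conn:
        return f"using {conn}"

result = asyncio.run(func())
print(result)
```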
uvloop

uvloop is a replacement for the event loop in asyncio that improves asyncio's performance. Its authors report that uvloop makes asyncio at least 2x faster than nodejs, gevent, and other Python async frameworks, with performance close to that of Go programs.
Install uvloop:

pip3 install uvloop
Replacing asyncio's event loop with uvloop in a project is very simple; just do the following in code:
import asyncio
import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# write asyncio code exactly as before;
# internally, the event loop automatically becomes uvloop
asyncio.run(...)
Note: internally, the well-known ASGI server uvicorn uses the uvloop event loop.
Asynchronous Redis

When operating Redis from Python, connecting, setting values, and getting values all involve network IO. With asyncio's asynchronous mode, other tasks can run while IO is waiting, improving performance.
Install the Python module for asynchronous Redis operations:

pip3 install aioredis
Case: connect to multiple Redis servers and run operations (on IO, the loop switches to other tasks, improving performance).
import asyncio
import aioredis

async def execute(address, password):
    print("start", address)
    # network IO: connect to Redis; on IO, the loop automatically switches tasks
    redis = await aioredis.create_redis_pool(address, password=password)
    # network IO: set a hash; switches tasks on IO
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    # network IO: read the hash back; switches tasks on IO
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)
    redis.close()
    # network IO: wait for the connection to close; switches tasks on IO
    await redis.wait_closed()
    print("end", address)

task_list = [
    execute('redis://77.95.4.197:6379', "123456"),
    execute('redis://77.95.4.198:6379', "123456"),
]
asyncio.run(asyncio.wait(task_list))

Asynchronous MySQL
When operating MySQL from Python, connecting, executing SQL, and closing the connection all involve network IO. With asyncio's asynchronous mode, other tasks can run while IO is waiting, improving performance.
Install the Python module for asynchronous MySQL operations:

pip3 install aiomysql
Case
import asyncio
import aiomysql

async def execute(host, password):
    print("start", host)
    # network IO: connect to MySQL; on IO, the loop automatically switches tasks
    conn = await aiomysql.connect(host=host, port=3306, user='root',
                                  password=password, db='mysql')
    # network IO: create a cursor; switches tasks on IO
    cur = await conn.cursor()
    # network IO: execute SQL; switches tasks on IO
    await cur.execute("SELECT Host, User FROM user")
    # network IO: fetch the results; switches tasks on IO
    result = await cur.fetchall()
    print(result)
    # network IO: close the cursor; switches tasks on IO
    await cur.close()
    conn.close()
    print("end", host)

task_list = [
    execute('77.95.40.197', "123456"),
    execute('77.95.40.198', "123456"),
]
asyncio.run(asyncio.wait(task_list))

Crawler
When writing crawler applications, the target data is requested over network IO, which makes asynchronous programming a good fit for improving performance. Below we use the aiohttp module, which supports asynchronous programming.
Install the aiohttp module:

pip3 install aiohttp
Case
import aiohttp
import asyncio

async def fetch(session, url):
    print(f"send request: {url}")
    async with session.get(url, verify_ssl=False) as response:
        text = await response.text()
        print("got result:", url, len(text))

async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
            "http://www.baidu.com",
            "http://www.taobao.com",
            "http://www.jd.com",
        ]
        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
        await asyncio.wait(tasks)

if __name__ == '__main__':
    asyncio.run(main())

This concludes the article on how to implement coroutines in Python. Thank you for reading! I believe you now have some understanding of the topic; if you want to learn more, further reading on asyncio is worthwhile.