What pitfalls are encountered in Python distributed processes?


What pitfalls will you run into with Python distributed processes? Many inexperienced people are at a loss about this, so this article summarizes the causes of the problem and its solutions. I hope it helps you solve the problem.

First, a little surprise.

Are you using Python 3, or programming on a Windows system? And, most importantly, are you not entirely clear about processes and threads? Then congratulations: in Python distributed processes there are pits waiting for you to fall into. (Yes, I am allowed to scare you here.) Joking aside, if you already know that serialization does not support anonymous functions, you can say goodbye to this pit. Okay, enough words, let's get to the point.
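If that sounds abstract, here is a minimal standalone demonstration of the limitation (not part of the example program, just a quick check you can run anywhere):

import pickle

# Functions are pickled by qualified name. A lambda is named '<lambda>',
# which cannot be looked up for re-import, so serialization fails.
try:
    pickle.dumps(lambda: 42)
except Exception as e:
    print(type(e).__name__, e)   # e.g. PicklingError: Can't pickle <function <lambda> ...>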

Distributed process

As we all know, Process is more stable than Thread, and a Process can be distributed across multiple machines, whereas a Thread can at most be spread across the CPUs of a single machine. Python's multiprocessing module not only supports multiple processes; its managers submodule also supports distributing multiple processes across multiple machines. A service process can act as the scheduler, distributing tasks to the other processes and relying on network communication. Because the managers module is well encapsulated, distributed multi-process programs are easy to write without knowing the details of the network communication.

Code record

For instance

Suppose we already have a multi-process program communicating through a Queue that runs on a single machine. Now, because the process handling the tasks is heavily loaded, we want to split the process that sends tasks and the process that handles them onto two machines. How can this be achieved with distributed processes? You already know that the original Queue can continue to be used; by exposing the Queue over the network through the managers module, processes on other machines can access it. All right, let's do it!

Write a task_master.py

Let's look at the service process first. It is responsible for starting the Queue, registering it on the network, and then writing tasks into it.

#!/usr/bin/python
# -*- coding: utf-8 -*-
# @Time:   2018-3-3 16:46
# @Author: lichexo
# @File:   task_master.py

import random, time, queue
from multiprocessing.managers import BaseManager

# queue for sending tasks:
task_queue = queue.Queue()
# queue for receiving results:
result_queue = queue.Queue()

# QueueManager, inheriting from BaseManager:
class QueueManager(BaseManager):
    pass

# register both Queues with the network; the callable parameter associates them with the Queue objects:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# bind port 5000 and set the authkey 'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# start the Queue:
manager.start()
# obtain the Queue objects accessed through the network:
task = manager.get_task_queue()
result = manager.get_result_queue()
# put several tasks in:
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task %d...' % n)
    task.put(n)
# read results from the result queue:
print('Try get results...')
for i in range(10):
    r = result.get(timeout=10)
    print('Result: %s' % r)
# shut down:
manager.shutdown()
print('master exit.')

Please note that when we write a multi-process program on a single machine, the created Queue can be used directly. In a distributed multi-process environment, however, tasks cannot be added by operating on the original task_queue directly; that would bypass the QueueManager's encapsulation. Tasks must be added through the Queue proxy obtained from manager.get_task_queue() (see the sketch below). Then start the task process on another machine (or on the same machine).
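For clarity, a minimal sketch of the distinction, reusing the names from task_master.py above:

n = 42

# Wrong in a distributed setup: this touches only the local queue object,
# bypassing the QueueManager that exposes it on the network.
task_queue.put(n)

# Right: go through the proxy obtained from the manager.
task = manager.get_task_queue()
task.put(n)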

Write a task_worker.py

#!/usr/bin/python
# -*- coding: utf-8 -*-
# @Time:   2018-3-3 16:46
# @Author: lichexo
# @File:   task_worker.py

import time, sys, queue
from multiprocessing.managers import BaseManager

# create a similar QueueManager:
class QueueManager(BaseManager):
    pass

# since this QueueManager only gets Queues from the network, only the names are provided when registering:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# connect to the server, i.e. the machine running task_master.py:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# the port and authkey must match task_master.py exactly:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# connect over the network:
m.connect()
# get the Queue objects:
task = m.get_task_queue()
result = m.get_result_queue()
# fetch tasks from the task queue and write results to the result queue:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n * n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty.')
# end of processing:
print('worker exit.')

The task process needs to connect to the service process over the network, so specify the IP of the service process.

Running result

Now it's time to try out how distributed processes work. Start the task_master.py service process first:

Traceback (most recent call last):
  File "F:/Python/untitled/xianchengjincheng/master.py", line 25, in <module>
    manager.start()
  File "F:\Python\pystall\lib\multiprocessing\managers.py", line 513, in start
    self._process.start()
  File "F:\Python\pystall\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "F:\Python\pystall\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "F:\Python\pystall\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "F:\Python\pystall\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x...>: attribute lookup <lambda> on __main__ failed

Once task_master.py finishes sending tasks, it begins waiting for results in the result queue. Now start the task_worker.py process:

Connect to server 127.0.0.1...
Traceback (most recent call last):
  File "F:/Python/untitled/xianchengjincheng/work.py", line 24, in <module>
    m.connect()
  File "F:\Python\pystall\lib\multiprocessing\managers.py", line 489, in connect
    conn = Client(self._address, authkey=self._authkey)
  File "F:\Python\pystall\lib\multiprocessing\connection.py", line 487, in Client
    c = SocketClient(address)
  File "F:\Python\pystall\lib\multiprocessing\connection.py", line 614, in SocketClient
    s.connect(address)
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it

See, both runs went wrong. Let's analyze carefully what happened.

Error analysis

The error message from task_master.py points at the lambda: pickle serialization does not support anonymous functions. So we have to modify the code and re-wrap the queues that QueueManager exposes to the network.

# register both Queues with the network; the callable parameter is associated with the Queue object
QueueManager.register('get_task_queue', callable=return_task_queue)
QueueManager.register('get_result_queue', callable=return_result_queue)

Here task_queue and result_queue are the two queues for tasks and results, respectively; they are used for communication and object exchange between processes.

Because this is a distributed environment, data put into the queue has to be fetched and processed by the worker machines, so the queues must be wrapped by QueueManager and exposed on the network, which is what the two lines above achieve. We name the network interface of return_task_queue get_task_queue, and that of return_result_queue get_result_queue, which makes it easy to tell which queue an operation targets. task.put(n) writes data into task_queue, which amounts to assigning a task; result.get() waits for the result sent back by the worker machine.
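Concretely, return_task_queue and return_result_queue are ordinary module-level functions (the full modified code follows below), which pickle can serialize by name:

# defined at module level in task_master.py, so they can be pickled by name:
def return_task_queue():
    global task_queue
    return task_queue

def return_result_queue():
    global result_queue
    return result_queue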

It is worth noting that on Windows you must write the IP address explicitly, while other operating systems such as Linux do not require it.

# Windows needs the ip address written out
manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')

Modified code

Modify it in task_master.py as follows:

#!/usr/bin/python
# -*- coding: utf-8 -*-
# @Time:   2018-3-3 16:46
# @Author: lichexo
# @File:   task_master.py

import random, time, queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support

# queue for sending tasks:
task_queue = queue.Queue()
# queue for receiving results:
result_queue = queue.Queue()

# inherit from BaseManager:
class QueueManager(BaseManager):
    pass

# on Windows, the callables must be module-level functions, not lambdas:
def return_task_queue():
    global task_queue
    return task_queue  # return the queue that sends tasks

def return_result_queue():
    global result_queue
    return result_queue  # return the queue that receives results

def test():
    # register both Queues with the network; the callable parameter is associated
    # with the Queue objects, which are used for inter-process communication and object exchange
    # QueueManager.register('get_task_queue', callable=lambda: task_queue)
    # QueueManager.register('get_result_queue', callable=lambda: result_queue)
    QueueManager.register('get_task_queue', callable=return_task_queue)
    QueueManager.register('get_result_queue', callable=return_result_queue)
    # bind port 5000 and set the authkey 'abc':
    # manager = QueueManager(address=('', 5000), authkey=b'abc')
    # Windows needs the ip address written out:
    manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
    # start the Queue:
    manager.start()
    # obtain the Queue objects accessed through the network:
    task = manager.get_task_queue()
    result = manager.get_result_queue()
    # put several tasks in:
    for i in range(10):
        n = random.randint(0, 10000)
        print('Put task %d...' % n)
        task.put(n)
    # read results from the result queue:
    print('Try get results...')
    for i in range(10):
        try:
            r = result.get(timeout=5)
            print('Result: %s' % r)
        except queue.Empty:
            print('result queue is empty.')
    # close:
    manager.shutdown()
    print('master exit.')

if __name__ == '__main__':
    freeze_support()
    print('Start!')
    test()

Modify task_worker.py as follows. The only change is in the except clause: it must catch queue.Empty (lowercase), matching the name of the imported queue module:

#!/usr/bin/python
# -*- coding: utf-8 -*-
# @Time:   2018-3-3 16:46
# @Author: lichexo
# @File:   task_worker.py

import time, sys, queue
from multiprocessing.managers import BaseManager

# create a similar QueueManager:
class QueueManager(BaseManager):
    pass

# since this QueueManager only gets Queues from the network, only the names are provided when registering:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# connect to the server, i.e. the machine running task_master.py:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# the port and authkey must match task_master.py exactly:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# connect over the network:
m.connect()
# get the Queue objects:
task = m.get_task_queue()
result = m.get_result_queue()
# fetch tasks from the task queue and write results to the result queue:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n * n)
        time.sleep(1)
        result.put(r)
    except queue.Empty:
        print('task queue is empty.')
# end of processing:
print('worker exit.')

Run task_master.py before running task_worker.py: start task_master.py in one terminal first, then start task_worker.py in another terminal (or on another machine).

(1) The running result of task_master.py is as follows:

Start!
Put task 7872...
Put task 6931...
Put task 1395...
Put task 8477...
Put task 8300...
Put task 1597...
Put task 8738...
Put task 8627...
Put task 1884...
Put task 2561...
Try get results...
Result: 7872 * 7872 = 61968384
Result: 6931 * 6931 = 48038761
Result: 1395 * 1395 = 1946025
Result: 8477 * 8477 = 71859529
Result: 8300 * 8300 = 68890000
Result: 1597 * 1597 = 2550409
Result: 8738 * 8738 = 76352644
Result: 8627 * 8627 = 74425129
Result: 1884 * 1884 = 3549456
Result: 2561 * 2561 = 6558721
master exit.

(2) The running result of task_worker.py is as follows:

Connect to server 127.0.0.1...
run task 8640 * 8640...
run task 7418 * 7418...
run task 9303 * 9303...
run task 568 * 568...
run task 1633 * 1633...
run task 3583 * 3583...
run task 3293 * 3293...
run task 8975 * 8975...
run task 8189 * 8189...
run task 731 * 731...
worker exit.

Knowledge supplement

What is the use of this simple Master/Worker model? In fact, it is a simple but genuine form of distributed computing. By slightly modifying the code and starting several workers, you can distribute tasks to a handful or even dozens of machines. For example, replace the code that computes n * n with code that sends e-mail, and you have an asynchronous mail queue.
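As a sketch only: the inner loop of task_worker.py could then look like this, where send_mail is a hypothetical helper (not part of the original code) standing in for smtplib or any real mail API:

# Hypothetical variant of the worker loop: the task queue now carries
# e-mail addresses, and send_mail() is an assumed helper function.
while True:
    try:
        address = task.get(timeout=1)
    except queue.Empty:
        break                              # no more tasks: stop this worker
    send_mail(address)                     # the real work, instead of computing n * n
    result.put('sent to %s' % address)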

Where are the Queue objects stored? Notice that there is no code creating a Queue in task_worker.py, so the Queue objects are stored in the task_master.py process.

The reason the Queue can be accessed over the network is the QueueManager. Since a QueueManager manages more than one Queue, each Queue's network interface is given a name, such as get_task_queue, and the names registered in task_worker.py must be exactly the same as those in task_master.py. Compared with the single-machine example, you can see that the Queue objects are effectively passed over the network from another process; the QueueManager takes care of the marshalling and the network communication.
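One way to convince yourself of this (a small check, assuming the task_worker.py code above has already called m.connect()):

# task is not a real queue.Queue but a proxy that forwards every call
# over the network to the Queue living in the task_master.py process:
task = m.get_task_queue()
print(type(task))                      # a manager proxy class, not queue.Queue
print(isinstance(task, queue.Queue))   # False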

What is authkey for? It ensures that the two machines communicate legitimately and are not maliciously interfered with by other machines. If the authkey of task_worker.py does not match the authkey of task_master.py, the connection is guaranteed to fail.
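A quick way to see this for yourself (a sketch, assuming task_master.py above is running with authkey=b'abc'):

from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

QueueManager.register('get_task_queue')

# deliberately use the wrong key; the handshake with the master fails:
m = QueueManager(address=('127.0.0.1', 5000), authkey=b'wrong')
m.connect()   # raises multiprocessing.AuthenticationError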

After reading the above, have you got a grip on the pitfalls encountered in Python distributed processes? If you want to learn more skills or want to know more about the topic, you are welcome to follow the industry information channel. Thank you for reading!
