
How to analyze queues in the Python full stack


Many people new to the Python full stack are unsure how to work with queues. This article walks through locks, semaphores, events, process queues, the producer-consumer model and JoinableQueue, with a runnable example for each, so that by the end you can answer the question yourself.

1. Lock (mutex)

Knowledge points:

lock.acquire()   # lock
lock.release()   # unlock

# Only one acquisition of a Lock is allowed at a time: when several processes modify the same piece of data, only one task can modify it at a time, i.e. the modifications become serial. It is slower, but speed is sacrificed for data safety.
# Allowing several processes to acquire at the same time is the job of the semaphore (Semaphore). A semaphore is a variant of a lock: in practice it is a counter plus a lock, and it lets several processes acquire it at once.
# Lock is a mutex between processes: whichever process grabs the resource first locks it and modifies its content, which keeps the data consistent.
# Note: acquiring repeatedly without releasing causes a deadlock; acquire and release come in pairs.
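One small addition that is not in the original notes: multiprocessing.Lock also works as a context manager, so the acquire/release pair can be written as a with block that releases the lock even if the locked code raises. A minimal sketch (the worker function and its prints are only illustrative):

from multiprocessing import Process, Lock
import time

def worker(lock, n):
    # "with lock:" is equivalent to lock.acquire() ... lock.release(),
    # but the lock is released even if an exception is raised inside the block
    with lock:
        print("process", n, "holds the lock")
        time.sleep(0.5)
        print("process", n, "releases the lock")

if __name__ == "__main__":
    lock = Lock()
    for n in range(3):
        Process(target=worker, args=(lock, n)).start()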

Program implementation:

# Lock: the mutex lock
from multiprocessing import Process, Lock
import json, time, random

"""
acquire and release come in pairs; acquiring twice in a row is a deadlock.
Only while the lock is released do other processes get a chance to acquire it.

# create a lock
lock = Lock()
# acquire
lock.acquire()
# lock.acquire()   # acquiring again right away would deadlock
print("I'm in here having a smoke while you wait anxiously; the toilet is in use...")
# release
lock.release()
"""

# 12306 ticket-grabbing demo

# 1. read / write the "database" (the ticket file)
def wr_info(sign, dic=None):
    if sign == "r":
        with open("ticket", mode="r", encoding="utf-8") as fp:
            dic = json.load(fp)
        return dic
    elif sign == "w":
        with open("ticket", mode="w", encoding="utf-8") as fp:
            json.dump(dic, fp)

# dic = wr_info("w", dic={"count": 0})
# print(dic, type(dic))

# 2. grab a ticket
def get_ticket(person):
    # first read the actual ticket count from the database
    dic = wr_info("r")
    # simulate network latency
    time.sleep(random.uniform(0.1, 0.7))
    # check the remaining count
    if dic["count"] > 0:
        print("{} got a ticket".format(person))
        # after grabbing a ticket, decrease the count by 1
        dic["count"] -= 1
        # write the new count back to the database
        wr_info("w", dic)
    else:
        print("{} did not get a ticket".format(person))

# 3. the single entry point every process calls
def main(person, lock):
    # check the remaining tickets
    dic = wr_info("r")
    print("{} sees remaining tickets: {}".format(person, dic["count"]))
    # acquire the lock
    lock.acquire()
    # start grabbing
    get_ticket(person)
    # release the lock
    lock.release()

if __name__ == "__main__":
    lock = Lock()
    lst = ["Liang Xinyu", "Kang Yukang", "Zhang Baozhang", "Yu Chaozhi", "Xue Yujian",
           "Han Ruirui", "diving first", "Liu Zitao", "Li Minghui", "Zhao Fenghong"]
    for i in lst:
        p = Process(target=main, args=(i, lock))
        p.start()

"""
Creating and starting the processes is asynchronous and concurrent; once a process
starts grabbing a ticket it becomes synchronous: grab the lock first, then run the
locked section, one process after another. The benefit is that the next person only
reads the data after the previous modification has finished, so the data never gets
corrupted. Without the lock, when only one ticket is left everyone could "get" it,
because the processes run so fast that they all read the same count, which produces
wrong data.
"""

Ticket file

{"count": 0}

2. Event: the traffic-light effect

2.1 Semaphore

# A semaphore (Semaphore) is essentially a lock, but it allows several processes to
# acquire it at the same time and controls how many acquisitions are allowed.
# Semaphore = lock + counter

from multiprocessing import Semaphore, Process
import time, random

"""
# allow up to 5 acquisitions at the same time
sem = Semaphore(5)
# acquire
sem.acquire()
print("performing the operation...")
# release
sem.release()
"""

def singsong_ktv(person, sem):
    # acquire
    sem.acquire()
    print("{} enters the KTV and starts singing ~".format(person))
    # sing for a while
    time.sleep(random.randrange(4, 8))   # 4, 5, 6 or 7 seconds
    print("{} has finished singing and leaves the KTV...".format(person))
    # release
    sem.release()

if __name__ == "__main__":
    sem = Semaphore(5)
    lst = ["Zhao Fengyong", "Shen Siyu", "Zhao Wanli", "Zhang Yu", "false lead",
           "Sun Jielong", "Chen Lu", "Wang Yuhan", "Yang Yuantao", "Liu Yifeng"]
    for i in lst:
        p = Process(target=singsong_ktv, args=(i, sem))
        p.start()

"""
Summary: Semaphore lets you set how many acquisitions are allowed at once. Creating
the processes is asynchronous and concurrent; while a process holds one of the slots
and executes its task, the program is synchronous.
Sample output:
Zhao Wanli enters the KTV and starts singing ~
Zhao Fengyong enters the KTV and starts singing ~
Zhang Yu enters the KTV and starts singing ~
Shen Siyu enters the KTV and starts singing ~
Sun Jielong enters the KTV and starts singing ~
"""

2.2 Event: the traffic-light effect

# Event (blocking event)
# e = Event()   creates an event object e
# e.wait()      blocks the program dynamically; whether it blocks depends entirely on
#               the object's is_set() value (False by default):
#               if it is True, no blocking; if it is False, it blocks
# Controlling the flag:
#   set()     changes the flag to True
#   clear()   changes the flag to False
#   is_set()  returns the current flag (False by default)

from multiprocessing import Process, Event
import time, random

# 1. the default flag
'''
e = Event()          # the flag is False by default
print(e.is_set())    # check the internal flag
e.wait()             # the flag is False, so the program blocks here
print("code is executing...")
'''

# 2. set / clear
'''
e = Event()
e.set()              # change the flag to True
e.wait()             # the flag is True, so no blocking
print("code is executing...")
e.clear()            # change the flag back to False
e.wait()
print("code is executing... 2")
'''

# 3. wait with a timeout
'''
e = Event()
e.wait(3)            # wait at most 3 seconds
print("code is executing... 3")
'''
# (the return value of wait(timeout) is shown in the sketch after this section)

# the classic traffic-light effect

# the traffic light toggles forever
def traffic_light(e):
    print("red light")
    while True:
        if e.is_set():
            # green light -> red light
            time.sleep(1)
            print("red light")
            e.clear()        # True => False
        else:
            # red light -> green light
            time.sleep(1)
            print("green light")
            e.set()          # False => True

# e = Event()
# traffic_light(e)

# a car
def car(e, i):
    # if the light is red, wait (block) until it turns green
    if not e.is_set():
        print("car {} is waiting...".format(i))
        e.wait()
    # otherwise the light is green and the car may pass
    print("car {} passes...".format(i))

# 1. the national traffic light (it never switches off)
'''
if __name__ == "__main__":
    e = Event()
    # create the traffic light
    p1 = Process(target=traffic_light, args=(e,))
    p1.start()
    # create the car processes
    for i in range(1, 21):
        time.sleep(random.randrange(2))
        p2 = Process(target=car, args=(e, i))
        p2.start()
'''

# 2. the Baotou traffic light: when there are no cars, switch the light off to save electricity
if __name__ == "__main__":
    lst = []
    e = Event()
    # create the traffic light
    p1 = Process(target=traffic_light, args=(e,))
    # make the traffic light a daemon process
    p1.daemon = True
    p1.start()
    # create the car processes
    for i in range(1, 21):
        time.sleep(random.randrange(2))
        p2 = Process(target=car, args=(e, i))
        lst.append(p2)
        p2.start()
    # wait for every car to pass; the daemon traffic light then dies with the main process
    print(lst)
    for i in lst:
        i.join()
    print("switched off successfully...")
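One detail the traffic-light example does not use: e.wait(timeout) returns a boolean that tells you whether the event was actually set before the timeout expired, which lets a waiting process tell "green light" apart from "gave up waiting". A minimal sketch (the setter function and the 1-second delay are only illustrative):

from multiprocessing import Process, Event
import time

def setter(e):
    time.sleep(1)
    e.set()    # "turn the light green" after one second

if __name__ == "__main__":
    e = Event()
    Process(target=setter, args=(e,)).start()
    # wait at most 3 seconds; the return value says whether set() happened in time
    if e.wait(3):
        print("the event was set within 3 seconds")
    else:
        print("timed out; the event is still clear")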

Event knowledge points:

# Blocking event:
#   e = Event()   creates an event object e
#   e.wait()      blocks the program dynamically; whether it blocks depends entirely
#                 on the object's is_set() value (False by default):
#                 if it is True, no blocking; if it is False, it blocks
# Controlling the flag:
#   set()     changes the flag to True
#   clear()   changes the flag to False
#   is_set()  returns the current flag (False by default)

3. Queue (the process queue)

# process queue (a process and its child process are isolated from each other;
# if the two want to communicate, a queue can be used)
from multiprocessing import Process, Queue
# the thread-level queue module is imported only to catch the queue.Empty / queue.Full exceptions
import queue

# 1. basic syntax
"""order: first in, first out; last in, last out"""
# create a process queue
q = Queue()
# put() stores data
q.put(1)
q.put(2)
q.put(3)
# get() retrieves data; when no data is available it blocks
# print(q.get())
# get_nowait(): if nothing can be retrieved, raise an exception
"""works as expected on Windows; not reliable on Linux"""
try:
    print(q.get_nowait())
    print(q.get_nowait())
    print(q.get_nowait())
    print(q.get_nowait())
except queue.Empty:
    pass

# put_nowait(): the non-blocking version of put
# set the maximum queue length to 3 (at most 3 elements)
"""when the queue length is fixed, stuffing in too much data blocks the producer"""
# q2 = Queue(3)
# q2.put(111)
# q2.put(222)
# q2.put(333)
# q2.put(444)        # this call would block forever

"""with put_nowait, pushing data into a full queue raises an exception instead"""
q2 = Queue(3)
try:
    q2.put_nowait(111)
    q2.put_nowait(222)
    q2.put_nowait(333)
    q2.put_nowait(444)
except queue.Full:
    pass

# 2. inter-process communication (IPC)
def func(q):
    # 2) the child process gets the data stored by the main process
    res = q.get()
    print(res)
    # 3) the child process stores its own data
    q.put("Liu Yigou")

if __name__ == "__main__":
    q3 = Queue()
    p = Process(target=func, args=(q3,))
    p.start()
    # 1) the main process stores data
    q3.put("Zhao Fengyong")
    # wait until the child process has stored its data before the main process gets it
    p.join()
    # 4) the main process gets the data stored by the child process
    print(q3.get())

Tip: in general the main process runs ahead of the child process, which is why the example calls p.join() before the final q3.get().
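Because get() blocks forever on an empty queue while get_nowait() raises immediately, a middle ground that the examples above do not show is get() with a timeout: block for a bounded time, then raise queue.Empty. A minimal sketch (the 2-second value is arbitrary):

from multiprocessing import Queue
import queue   # only needed for the queue.Empty exception

q = Queue()
try:
    # block for at most 2 seconds, then give up with queue.Empty
    item = q.get(timeout=2)
    print("got:", item)
except queue.Empty:
    print("no data arrived within 2 seconds")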

Queue knowledge points:

# Inter-process communication: IPC (Inter-Process Communication)
# Two mechanisms implement communication between processes:
#   pipe   Pipe
#   queue  Queue
# (a minimal Pipe sketch appears at the end of this section)
# put()          stores data
# get()          retrieves data
# get_nowait()   raises an exception if nothing can be retrieved
# put_nowait()   the non-blocking version of put
# q.empty()      checks whether the queue is empty (just be aware of it)
# q.full()       checks whether the queue is full (just be aware of it)

4. The producer-consumer model

"""
Crawler example: process 1 crawls keyword-related content from other sites and pushes
the regex-matched results into a queue (or MySQL); process 2 takes the content out of
the queue, reworks the layout and publishes it on its own site. Process 1 can be seen
as the producer and process 2 as the consumer.
In code terms, the producer stores data (put) and the consumer retrieves data (get).
The ideal producer-consumer model: consume as much as is produced, so the production
rate and the consumption rate stay roughly in step.
"""

# 1. the basic version
"""problem: with this model the program cannot terminate on its own"""
from multiprocessing import Process, Queue
import time, random

# consumer
def consumer(q, name):
    while True:
        # get data from the queue
        food = q.get()
        time.sleep(random.uniform(0.1, 1))
        print("{} ate {}".format(name, food))

# producer
def producer(q, name, food):
    for i in range(5):
        time.sleep(random.uniform(0.1, 1))
        # show the produced data
        print("{} produced {}".format(name, food + str(i)))
        # store the produced data in the queue
        q.put(food + str(i))

if __name__ == "__main__":
    q = Queue()
    p1 = Process(target=consumer, args=(q, "Zhao Wanli"))
    p2 = Process(target=producer, args=(q, "Zhao Shenyang", "banana"))
    p1.start()
    p2.start()
    p2.join()

# 2. the optimized version
"""idea: manually append the sentinel None at the end of the queue to terminate the consumer"""
from multiprocessing import Process, Queue
import time, random

# consumer
def consumer(q, name):
    while True:
        # get data from the queue
        food = q.get()
        # if the retrieved data is None, there is nothing left in the queue: stop the loop
        if food is None:
            break
        time.sleep(random.uniform(0.1, 1))
        print("{} ate {}".format(name, food))

# producer
def producer(q, name, food):
    for i in range(5):
        time.sleep(random.uniform(0.1, 1))
        # show the produced data
        print("{} produced {}".format(name, food + str(i)))
        # store the produced data in the queue
        q.put(food + str(i))

if __name__ == "__main__":
    q = Queue()
    p1 = Process(target=consumer, args=(q, "Zhao Wanli"))
    p2 = Process(target=producer, args=(q, "Zhao Shenyang", "banana"))
    p1.start()
    p2.start()
    p2.join()
    q.put(None)
# banana0  banana1  banana2  banana3  banana4  None

# 3. several producers and consumers
"""problem: this works, but one None per consumer has to be appended, which makes the code redundant"""
from multiprocessing import Process, Queue
import time, random

# consumer
def consumer(q, name):
    while True:
        # get data from the queue
        food = q.get()
        # None is the sentinel: nothing left to get, stop the loop
        if food is None:
            break
        time.sleep(random.uniform(0.1, 1))
        print("{} ate {}".format(name, food))

# producer
def producer(q, name, food):
    for i in range(5):
        time.sleep(random.uniform(0.1, 1))
        # show the produced data
        print("{} produced {}".format(name, food + str(i)))
        # store the produced data in the queue
        q.put(food + str(i))

if __name__ == "__main__":
    q = Queue()
    p1 = Process(target=consumer, args=(q, "Zhao Wanli"))
    p1_1 = Process(target=consumer, args=(q, "Zhao Shichao"))
    p2 = Process(target=producer, args=(q, "Zhao Shenyang", "banana"))
    p2_2 = Process(target=producer, args=(q, "Zhao Fengyong", "garlic"))
    p1.start()
    p1_1.start()
    p2.start()
    p2_2.start()
    # wait until all the data has been produced
    p2.join()
    p2_2.join()
    # append one None per consumer as the sentinel that breaks its loop
    q.put(None)    # terminates the first consumer
    q.put(None)    # terminates the second consumer
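The knowledge points above list the pipe (Pipe) next to Queue as the other IPC mechanism, but the article never demonstrates it. A minimal sketch, not part of the original examples (the messages are placeholders):

from multiprocessing import Process, Pipe

def child(conn):
    # receive from the parent, then answer back over the same connection
    print("child received:", conn.recv())
    conn.send("hello from the child")
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()   # two connected endpoints
    p = Process(target=child, args=(child_conn,))
    p.start()
    parent_conn.send("hello from the parent")
    print("parent received:", parent_conn.recv())
    p.join()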

5. JoinableQueue

# JoinableQueue
"""
put          stores data
get          retrieves data
task_done    decreases the built-in counter by 1
join         used together with task_done to block

Every put() increases the queue's built-in counter by 1.
After get(), calling task_done() decreases the counter by 1.
join() blocks or releases depending on the counter value:
  counter == 0  ->  the code does not block
  counter != 0  ->  the code blocks
"""
from multiprocessing import Process, JoinableQueue
import time, random

# 1. basic usage
jq = JoinableQueue()
jq.put("Wang Tongpei")    # counter + 1
jq.put("Wang Wei")        # counter + 1
print(jq.get())
print(jq.get())
# print(jq.get())         # would block: the queue is empty
jq.task_done()            # counter - 1
jq.task_done()            # counter - 1
jq.join()                 # the counter is 0, so no blocking
print("end of code execution...")

# 2. reworking the producer-consumer model with JoinableQueue
# consumer
def consumer(q, name):
    while True:
        # get data from the queue
        food = q.get()
        time.sleep(random.uniform(0.1, 1))
        print("{} ate {}".format(name, food))
        # decrease the queue's built-in counter by 1
        q.task_done()

# producer
def producer(q, name, food):
    for i in range(5):
        time.sleep(random.uniform(0.1, 1))
        # show the produced data
        print("{} produced {}".format(name, food + str(i)))
        # store the produced data in the queue
        q.put(food + str(i))

if __name__ == "__main__":
    q = JoinableQueue()
    p1 = Process(target=consumer, args=(q, "Zhao Wanli"))
    p2 = Process(target=producer, args=(q, "Zhao Shenyang", "banana"))
    p1.daemon = True
    p1.start()
    p2.start()
    p2.join()
    # block until every item in the queue has been consumed (the counter is back to 0)
    q.join()
    print("program ends.")

6. Summary

IPC lets processes communicate with each other. A lock is in fact also a form of inter-process communication: several processes compete for one lock, and once a process has grabbed it the others cannot; under the hood the processes exchange messages through sockets, telling the others that the lock is currently held and cannot be taken. Processes are isolated and cannot communicate by default; if they need to communicate, they must go through IPC (Lock, JoinableQueue, Manager).

After reading the above, do you know how to analyze queues in the Python full stack? If you want to learn more skills or find out more, welcome to follow the industry information channel. Thank you for reading!
