multiprocessing.Process的提问
本帖最后由 歌者文明清理员 于 2023-4-9 19:12 编辑# 获取进程的ID
from os import getpid
# 进程和队列
from multiprocessing import Process, Queue
# 本例子需要使用随机数
from random import randint
class Point:
# 积分类
def __init__(self, point):
self.num = point
def __repr__(self):
return f'Point({self.num})'
def producer(queue):
# 生产者函数
print(f'生产者pid:{getpid()}')
for i in range(10):
point_num = randint(1, 10)
point_part = Point(point_num)
queue.put(point_part)
def consumer(queue):
# 消费者
print(f'消费者pid:{getpid()}')
while True:
point = queue.get()
print(point)
que = Queue()
cons = Process(target=consumer, args=(que, ))
prod = Process(target=producer, args=(que, ))
cons.start()
prod.start()
cons.join()
prod.terminate()
报错信息如下:
Traceback (most recent call last):
File "<string>", line 1, in <module>
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "c:\users\bruce\appdata\local\programs\python\python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
File "c:\users\bruce\appdata\local\programs\python\python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
exitcode = _main(fd)
exitcode = _main(fd)
File "c:\users\bruce\appdata\local\programs\python\python37\lib\multiprocessing\spawn.py", line 115, in _main
File "c:\users\bruce\appdata\local\programs\python\python37\lib\multiprocessing\spawn.py", line 115, in _main
self = reduction.pickle.load(from_parent)
self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'consumer' on <module '__main__' (built-in)>
AttributeError: Can't get attribute 'producer' on <module '__main__' (built-in)>
可能是因为需要if __name__ == '__main__',所以我又更改了代码
# 获取进程的ID
from os import getpid
# 进程和队列
from multiprocessing import Process, Queue
# 本例子需要使用随机数
from random import randint
class Point:
# 积分类
def __init__(self, point):
self.num = point
def __repr__(self):
return f'Point({self.num})'
def producer(queue):
# 生产者函数
print(f'生产者pid:{getpid()}')
for i in range(10):
point_num = randint(1, 10)
point_part = Point(point_num)
queue.put(point_part)
def consumer(queue):
# 消费者
print(f'消费者pid:{getpid()}')
while True:
point = queue.get()
print(point)
if __name__ == '__main__':
que = Queue()
cons = Process(target=consumer, args=(que, ))
prod = Process(target=producer, args=(que, ))
cons.start()
prod.start()
cons.join()
prod.terminate()
虽然没有报错,也可以看到正常输出,但是 prod.terminate() 没用,死循环停不下来,我只能加上 timeout 参数并捕获错误:
def consumer(queue):
# 消费者
print(f'消费者pid:{getpid()}')
while True:
try:
point = queue.get(timeout=1)
except:
break
虽然问题已经解决,但为什么报错 本帖最后由 isdkz 于 2023-4-8 14:46 编辑
根据您提供的信息,我理解了您遇到的两个问题。首先,让我们分别解决它们:
1、AttributeError: Can't get attribute 'consumer' on <module 'main' (built-in)>
这个错误是因为在使用multiprocessing模块时,Process实例需要在if __name__ == '__main__':块中创建。
这是因为multiprocessing模块使用子进程来并行执行任务,而子进程通常需要单独的独立的执行环境。
当您将Process实例移动到if __name__ == '__main__':代码块中时,这个问题就解决了。
2、死循环停不下来的问题
prod.terminate()没有停止死循环,原因是producer进程已经完成了任务并退出,而consumer进程在queue.get()时,由于队列为空,它会一直等待新的元素。
由于producer进程已经退出,队列不会再有新元素加入。因此,consumer进程会陷入死循环。
为了解决这个问题,您可以在producer函数完成任务后,向队列中放入一个特殊的结束标记。然后在consumer函数中检查队列中是否有这个结束标记,如果有,则跳出循环。这是一个示例:
def producer(queue):
# 生产者函数
print(f'生产者pid:{getpid()}')
for i in range(10):
point_num = randint(1, 10)
point_part = Point(point_num)
queue.put(point_part)
queue.put(None)# 放入结束标记
def consumer(queue):
# 消费者
print(f'消费者pid:{getpid()}')
while True:
point = queue.get()
if point is None:# 检查结束标记
break
print(point)
这样,consumer函数就可以在检测到结束标记后正常退出循环了。
这样一来,您不再需要为queue.get()设置timeout参数,因为consumer进程会在处理完所有数据后自动退出。
完整代码:
# 获取进程的ID
from os import getpid
# 进程和队列
from multiprocessing import Process, Queue
# 本例子需要使用随机数
from random import randint
class Point:
# 积分类
def __init__(self, point):
self.num = point
def __repr__(self):
return f'Point({self.num})'
def producer(queue):
# 生产者函数
print(f'生产者pid:{getpid()}')
for i in range(10):
point_num = randint(1, 10)
point_part = Point(point_num)
queue.put(point_part)
queue.put(None)# 放入结束标记
def consumer(queue):
# 消费者
print(f'消费者pid:{getpid()}')
while True:
point = queue.get()
if point is None:# 检查结束标记
break
print(point)
if __name__ == '__main__':
que = Queue()
cons = Process(target=consumer, args=(que, ))
prod = Process(target=producer, args=(que, ))
cons.start()
prod.start()
cons.join()
prod.terminate()
页:
[1]