|
马上注册,结交更多好友,享用更多功能^_^
您需要 登录 才可以下载或查看,没有账号?立即注册
x
本帖最后由 歌者文明清理员 于 2023-4-9 19:12 编辑
- # 获取进程的ID
- from os import getpid
- # 进程和队列
- from multiprocessing import Process, Queue
- # 本例子需要使用随机数
- from random import randint
- class Point:
- # 积分类
- def __init__(self, point):
- self.num = point
- def __repr__(self):
- return f'Point({self.num})'
- def producer(queue):
- # 生产者函数
- print(f'生产者pid:{getpid()}')
- for i in range(10):
- point_num = randint(1, 10)
- point_part = Point(point_num)
- queue.put(point_part)
- def consumer(queue):
- # 消费者
- print(f'消费者pid:{getpid()}')
- while True:
- point = queue.get()
- print(point)
- que = Queue()
- cons = Process(target=consumer, args=(que, ))
- prod = Process(target=producer, args=(que, ))
- cons.start()
- prod.start()
- cons.join()
- prod.terminate()
复制代码
报错信息如下:
- Traceback (most recent call last):
- File "<string>", line 1, in <module>
- Traceback (most recent call last):
- File "<string>", line 1, in <module>
- File "c:\users\bruce\appdata\local\programs\python\python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
- File "c:\users\bruce\appdata\local\programs\python\python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
- exitcode = _main(fd)
- exitcode = _main(fd)
- File "c:\users\bruce\appdata\local\programs\python\python37\lib\multiprocessing\spawn.py", line 115, in _main
- File "c:\users\bruce\appdata\local\programs\python\python37\lib\multiprocessing\spawn.py", line 115, in _main
- self = reduction.pickle.load(from_parent)
- self = reduction.pickle.load(from_parent)
- AttributeError: Can't get attribute 'consumer' on <module '__main__' (built-in)>
- AttributeError: Can't get attribute 'producer' on <module '__main__' (built-in)>
复制代码
可能是因为需要if __name__ == '__main__',所以我又更改了代码
- # 获取进程的ID
- from os import getpid
- # 进程和队列
- from multiprocessing import Process, Queue
- # 本例子需要使用随机数
- from random import randint
- class Point:
- # 积分类
- def __init__(self, point):
- self.num = point
- def __repr__(self):
- return f'Point({self.num})'
- def producer(queue):
- # 生产者函数
- print(f'生产者pid:{getpid()}')
- for i in range(10):
- point_num = randint(1, 10)
- point_part = Point(point_num)
- queue.put(point_part)
- def consumer(queue):
- # 消费者
- print(f'消费者pid:{getpid()}')
- while True:
- point = queue.get()
- print(point)
- if __name__ == '__main__':
- que = Queue()
- cons = Process(target=consumer, args=(que, ))
- prod = Process(target=producer, args=(que, ))
- cons.start()
- prod.start()
- cons.join()
- prod.terminate()
复制代码 虽然没有报错,也可以看到正常输出,但是 prod.terminate() 没用,死循环停不下来,我只能加上 timeout 参数并捕获错误:
- def consumer(queue):
- # 消费者
- print(f'消费者pid:{getpid()}')
- while True:
- try:
- point = queue.get(timeout=1)
- except:
- break
复制代码
虽然问题已经解决,但为什么报错
本帖最后由 isdkz 于 2023-4-8 14:46 编辑
根据您提供的信息,我理解了您遇到的两个问题。首先,让我们分别解决它们:
1、AttributeError: Can't get attribute 'consumer' on <module 'main' (built-in)>
这个错误是因为在使用multiprocessing模块时,Process实例需要在if __name__ == '__main__':块中创建。
这是因为multiprocessing模块使用子进程来并行执行任务,而子进程通常需要单独的独立的执行环境。
当您将Process实例移动到if __name__ == '__main__':代码块中时,这个问题就解决了。
2、死循环停不下来的问题
prod.terminate()没有停止死循环,原因是producer进程已经完成了任务并退出,而consumer进程在queue.get()时,由于队列为空,它会一直等待新的元素。
由于producer进程已经退出,队列不会再有新元素加入。因此,consumer进程会陷入死循环。
为了解决这个问题,您可以在producer函数完成任务后,向队列中放入一个特殊的结束标记。然后在consumer函数中检查队列中是否有这个结束标记,如果有,则跳出循环。这是一个示例:
- def producer(queue):
- # 生产者函数
- print(f'生产者pid:{getpid()}')
- for i in range(10):
- point_num = randint(1, 10)
- point_part = Point(point_num)
- queue.put(point_part)
- queue.put(None) # 放入结束标记
- def consumer(queue):
- # 消费者
- print(f'消费者pid:{getpid()}')
- while True:
- point = queue.get()
- if point is None: # 检查结束标记
- break
- print(point)
复制代码
这样,consumer函数就可以在检测到结束标记后正常退出循环了。
这样一来,您不再需要为queue.get()设置timeout参数,因为consumer进程会在处理完所有数据后自动退出。
完整代码:
- # 获取进程的ID
- from os import getpid
- # 进程和队列
- from multiprocessing import Process, Queue
- # 本例子需要使用随机数
- from random import randint
- class Point:
- # 积分类
- def __init__(self, point):
- self.num = point
- def __repr__(self):
- return f'Point({self.num})'
- def producer(queue):
- # 生产者函数
- print(f'生产者pid:{getpid()}')
- for i in range(10):
- point_num = randint(1, 10)
- point_part = Point(point_num)
- queue.put(point_part)
- queue.put(None) # 放入结束标记
- def consumer(queue):
- # 消费者
- print(f'消费者pid:{getpid()}')
- while True:
- point = queue.get()
- if point is None: # 检查结束标记
- break
- print(point)
- if __name__ == '__main__':
- que = Queue()
- cons = Process(target=consumer, args=(que, ))
- prod = Process(target=producer, args=(que, ))
- cons.start()
- prod.start()
- cons.join()
- prod.terminate()
复制代码
|
|