歌者文明清理员 发表于 2023-4-8 14:40:43

multiprocessing.Process的提问

本帖最后由 歌者文明清理员 于 2023-4-9 19:12 编辑

# 获取进程的ID
from os import getpid
# 进程和队列
from multiprocessing import Process, Queue
# 本例子需要使用随机数
from random import randint


class Point:
    # 积分类
    def __init__(self, point):
      self.num = point

    def __repr__(self):
      return f'Point({self.num})'


def producer(queue):
    # 生产者函数
    print(f'生产者pid:{getpid()}')
    for i in range(10):
      point_num = randint(1, 10)
      point_part = Point(point_num)
      queue.put(point_part)


def consumer(queue):
    # 消费者
    print(f'消费者pid:{getpid()}')
    while True:
      point = queue.get()
      print(point)


que = Queue()
cons = Process(target=consumer, args=(que, ))
prod = Process(target=producer, args=(que, ))
cons.start()
prod.start()
cons.join()
prod.terminate()

报错信息如下:
Traceback (most recent call last):
File "<string>", line 1, in <module>
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "c:\users\bruce\appdata\local\programs\python\python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
File "c:\users\bruce\appdata\local\programs\python\python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
    exitcode = _main(fd)
File "c:\users\bruce\appdata\local\programs\python\python37\lib\multiprocessing\spawn.py", line 115, in _main
File "c:\users\bruce\appdata\local\programs\python\python37\lib\multiprocessing\spawn.py", line 115, in _main
    self = reduction.pickle.load(from_parent)
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'consumer' on <module '__main__' (built-in)>
AttributeError: Can't get attribute 'producer' on <module '__main__' (built-in)>
可能是因为需要if __name__ == '__main__',所以我又更改了代码
# 获取进程的ID
from os import getpid
# 进程和队列
from multiprocessing import Process, Queue
# 本例子需要使用随机数
from random import randint


class Point:
    # 积分类
    def __init__(self, point):
      self.num = point

    def __repr__(self):
      return f'Point({self.num})'


def producer(queue):
    # 生产者函数
    print(f'生产者pid:{getpid()}')
    for i in range(10):
      point_num = randint(1, 10)
      point_part = Point(point_num)
      queue.put(point_part)


def consumer(queue):
    # 消费者
    print(f'消费者pid:{getpid()}')
    while True:
      point = queue.get()
      print(point)


if __name__ == '__main__':
    que = Queue()
    cons = Process(target=consumer, args=(que, ))
    prod = Process(target=producer, args=(que, ))
    cons.start()
    prod.start()
    cons.join()
    prod.terminate()
虽然没有报错,也可以看到正常输出,但是 prod.terminate() 没用,死循环停不下来,我只能加上 timeout 参数并捕获错误:
def consumer(queue):
    # 消费者
    print(f'消费者pid:{getpid()}')
    while True:
      try:
            point = queue.get(timeout=1)
      except:
            break

虽然问题已经解决,但为什么报错

isdkz 发表于 2023-4-8 14:44:41

本帖最后由 isdkz 于 2023-4-8 14:46 编辑

根据您提供的信息,我理解了您遇到的两个问题。首先,让我们分别解决它们:

1、AttributeError: Can't get attribute 'consumer' on <module 'main' (built-in)>
这个错误是因为在使用multiprocessing模块时,Process实例需要在if __name__ == '__main__':块中创建。

这是因为multiprocessing模块使用子进程来并行执行任务,而子进程通常需要单独的独立的执行环境。

当您将Process实例移动到if __name__ == '__main__':代码块中时,这个问题就解决了。

2、死循环停不下来的问题
prod.terminate()没有停止死循环,原因是producer进程已经完成了任务并退出,而consumer进程在queue.get()时,由于队列为空,它会一直等待新的元素。

由于producer进程已经退出,队列不会再有新元素加入。因此,consumer进程会陷入死循环。

为了解决这个问题,您可以在producer函数完成任务后,向队列中放入一个特殊的结束标记。然后在consumer函数中检查队列中是否有这个结束标记,如果有,则跳出循环。这是一个示例:

def producer(queue):
    # 生产者函数
    print(f'生产者pid:{getpid()}')
    for i in range(10):
      point_num = randint(1, 10)
      point_part = Point(point_num)
      queue.put(point_part)
    queue.put(None)# 放入结束标记

def consumer(queue):
    # 消费者
    print(f'消费者pid:{getpid()}')
    while True:
      point = queue.get()
      if point is None:# 检查结束标记
            break
      print(point)

这样,consumer函数就可以在检测到结束标记后正常退出循环了。

这样一来,您不再需要为queue.get()设置timeout参数,因为consumer进程会在处理完所有数据后自动退出。


完整代码:
# 获取进程的ID
from os import getpid
# 进程和队列
from multiprocessing import Process, Queue
# 本例子需要使用随机数
from random import randint


class Point:
    # 积分类
    def __init__(self, point):
      self.num = point

    def __repr__(self):
      return f'Point({self.num})'


def producer(queue):
    # 生产者函数
    print(f'生产者pid:{getpid()}')
    for i in range(10):
      point_num = randint(1, 10)
      point_part = Point(point_num)
      queue.put(point_part)
    queue.put(None)# 放入结束标记

def consumer(queue):
    # 消费者
    print(f'消费者pid:{getpid()}')
    while True:
      point = queue.get()
      if point is None:# 检查结束标记
            break
      print(point)

if __name__ == '__main__':
    que = Queue()
    cons = Process(target=consumer, args=(que, ))
    prod = Process(target=producer, args=(que, ))
    cons.start()
    prod.start()
    cons.join()
    prod.terminate()
页: [1]
查看完整版本: multiprocessing.Process的提问