鱼C论坛

 找回密码
 立即注册
查看: 1349|回复: 1

[已解决]multiprocessing.Process的提问

[复制链接]
发表于 2023-4-8 14:40:43 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能^_^

您需要 登录 才可以下载或查看,没有账号?立即注册

x
本帖最后由 歌者文明清理员 于 2023-4-9 19:12 编辑
# 获取进程的ID
from os import getpid
# 进程和队列
from multiprocessing import Process, Queue
# 本例子需要使用随机数
from random import randint


class Point:
    # 积分类
    def __init__(self, point):
        self.num = point

    def __repr__(self):
        return f'Point({self.num})'


def producer(queue):
    # 生产者函数
    print(f'生产者pid:{getpid()}')
    for i in range(10):
        point_num = randint(1, 10)
        point_part = Point(point_num)
        queue.put(point_part)


def consumer(queue):
    # 消费者
    print(f'消费者pid:{getpid()}')
    while True:
        point = queue.get()
        print(point)


que = Queue()
cons = Process(target=consumer, args=(que, ))
prod = Process(target=producer, args=(que, ))
cons.start()
prod.start()
cons.join()
prod.terminate()
报错信息如下:
Traceback (most recent call last):
  File "<string>", line 1, in <module>
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "c:\users\bruce\appdata\local\programs\python\python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
  File "c:\users\bruce\appdata\local\programs\python\python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
    exitcode = _main(fd)
  File "c:\users\bruce\appdata\local\programs\python\python37\lib\multiprocessing\spawn.py", line 115, in _main
  File "c:\users\bruce\appdata\local\programs\python\python37\lib\multiprocessing\spawn.py", line 115, in _main
    self = reduction.pickle.load(from_parent)
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'consumer' on <module '__main__' (built-in)>
AttributeError: Can't get attribute 'producer' on <module '__main__' (built-in)>
可能是因为需要if __name__ == '__main__',所以我又更改了代码
# 获取进程的ID
from os import getpid
# 进程和队列
from multiprocessing import Process, Queue
# 本例子需要使用随机数
from random import randint


class Point:
    # 积分类
    def __init__(self, point):
        self.num = point

    def __repr__(self):
        return f'Point({self.num})'


def producer(queue):
    # 生产者函数
    print(f'生产者pid:{getpid()}')
    for i in range(10):
        point_num = randint(1, 10)
        point_part = Point(point_num)
        queue.put(point_part)


def consumer(queue):
    # 消费者
    print(f'消费者pid:{getpid()}')
    while True:
        point = queue.get()
        print(point)


if __name__ == '__main__':
    que = Queue()
    cons = Process(target=consumer, args=(que, ))
    prod = Process(target=producer, args=(que, ))
    cons.start()
    prod.start()
    cons.join()
    prod.terminate()
虽然没有报错,也可以看到正常输出,但是 prod.terminate() 没用,死循环停不下来,我只能加上 timeout 参数并捕获错误:
def consumer(queue):
    # 消费者
    print(f'消费者pid:{getpid()}')
    while True:
        try:
            point = queue.get(timeout=1)
        except:
            break
虽然问题已经解决,但为什么报错
最佳答案
2023-4-8 14:44:41
本帖最后由 isdkz 于 2023-4-8 14:46 编辑

根据您提供的信息,我理解了您遇到的两个问题。首先,让我们分别解决它们:

1、AttributeError: Can't get attribute 'consumer' on <module 'main' (built-in)>
这个错误是因为在使用multiprocessing模块时,Process实例需要在if __name__ == '__main__':块中创建。

这是因为multiprocessing模块使用子进程来并行执行任务,而子进程通常需要单独的独立的执行环境。

当您将Process实例移动到if __name__ == '__main__':代码块中时,这个问题就解决了。

2、死循环停不下来的问题
prod.terminate()没有停止死循环,原因是producer进程已经完成了任务并退出,而consumer进程在queue.get()时,由于队列为空,它会一直等待新的元素。

由于producer进程已经退出,队列不会再有新元素加入。因此,consumer进程会陷入死循环。

为了解决这个问题,您可以在producer函数完成任务后,向队列中放入一个特殊的结束标记。然后在consumer函数中检查队列中是否有这个结束标记,如果有,则跳出循环。这是一个示例:
def producer(queue):
    # 生产者函数
    print(f'生产者pid:{getpid()}')
    for i in range(10):
        point_num = randint(1, 10)
        point_part = Point(point_num)
        queue.put(point_part)
    queue.put(None)  # 放入结束标记

def consumer(queue):
    # 消费者
    print(f'消费者pid:{getpid()}')
    while True:
        point = queue.get()
        if point is None:  # 检查结束标记
            break
        print(point)

这样,consumer函数就可以在检测到结束标记后正常退出循环了。

这样一来,您不再需要为queue.get()设置timeout参数,因为consumer进程会在处理完所有数据后自动退出。


完整代码:
# 获取进程的ID
from os import getpid
# 进程和队列
from multiprocessing import Process, Queue
# 本例子需要使用随机数
from random import randint


class Point:
    # 积分类
    def __init__(self, point):
        self.num = point

    def __repr__(self):
        return f'Point({self.num})'


def producer(queue):
    # 生产者函数
    print(f'生产者pid:{getpid()}')
    for i in range(10):
        point_num = randint(1, 10)
        point_part = Point(point_num)
        queue.put(point_part)
    queue.put(None)  # 放入结束标记

def consumer(queue):
    # 消费者
    print(f'消费者pid:{getpid()}')
    while True:
        point = queue.get()
        if point is None:  # 检查结束标记
            break
        print(point)

if __name__ == '__main__':
    que = Queue()
    cons = Process(target=consumer, args=(que, ))
    prod = Process(target=producer, args=(que, ))
    cons.start()
    prod.start()
    cons.join()
    prod.terminate()
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复

使用道具 举报

发表于 2023-4-8 14:44:41 | 显示全部楼层    本楼为最佳答案   
本帖最后由 isdkz 于 2023-4-8 14:46 编辑

根据您提供的信息,我理解了您遇到的两个问题。首先,让我们分别解决它们:

1、AttributeError: Can't get attribute 'consumer' on <module 'main' (built-in)>
这个错误是因为在使用multiprocessing模块时,Process实例需要在if __name__ == '__main__':块中创建。

这是因为multiprocessing模块使用子进程来并行执行任务,而子进程通常需要单独的独立的执行环境。

当您将Process实例移动到if __name__ == '__main__':代码块中时,这个问题就解决了。

2、死循环停不下来的问题
prod.terminate()没有停止死循环,原因是producer进程已经完成了任务并退出,而consumer进程在queue.get()时,由于队列为空,它会一直等待新的元素。

由于producer进程已经退出,队列不会再有新元素加入。因此,consumer进程会陷入死循环。

为了解决这个问题,您可以在producer函数完成任务后,向队列中放入一个特殊的结束标记。然后在consumer函数中检查队列中是否有这个结束标记,如果有,则跳出循环。这是一个示例:
def producer(queue):
    # 生产者函数
    print(f'生产者pid:{getpid()}')
    for i in range(10):
        point_num = randint(1, 10)
        point_part = Point(point_num)
        queue.put(point_part)
    queue.put(None)  # 放入结束标记

def consumer(queue):
    # 消费者
    print(f'消费者pid:{getpid()}')
    while True:
        point = queue.get()
        if point is None:  # 检查结束标记
            break
        print(point)

这样,consumer函数就可以在检测到结束标记后正常退出循环了。

这样一来,您不再需要为queue.get()设置timeout参数,因为consumer进程会在处理完所有数据后自动退出。


完整代码:
# 获取进程的ID
from os import getpid
# 进程和队列
from multiprocessing import Process, Queue
# 本例子需要使用随机数
from random import randint


class Point:
    # 积分类
    def __init__(self, point):
        self.num = point

    def __repr__(self):
        return f'Point({self.num})'


def producer(queue):
    # 生产者函数
    print(f'生产者pid:{getpid()}')
    for i in range(10):
        point_num = randint(1, 10)
        point_part = Point(point_num)
        queue.put(point_part)
    queue.put(None)  # 放入结束标记

def consumer(queue):
    # 消费者
    print(f'消费者pid:{getpid()}')
    while True:
        point = queue.get()
        if point is None:  # 检查结束标记
            break
        print(point)

if __name__ == '__main__':
    que = Queue()
    cons = Process(target=consumer, args=(que, ))
    prod = Process(target=producer, args=(que, ))
    cons.start()
    prod.start()
    cons.join()
    prod.terminate()
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Archiver|鱼C工作室 ( 粤ICP备18085999号-1 | 粤公网安备 44051102000585号)

GMT+8, 2024-12-26 21:21

Powered by Discuz! X3.4

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表