鱼C论坛

 找回密码
 立即注册
查看: 399|回复: 9

[已解决]多线程莫名其妙问题

[复制链接]
发表于 2024-7-19 15:25:40 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能^_^

您需要 登录 才可以下载或查看,没有账号?立即注册

x
#include <iostream>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <vector>
#include <functional>
class ThreadPool
{

public:
        ThreadPool(int numThread) :stop(false)
        {
                for (size_t i = 0; i < numThread; i++)
                {
                        threads.emplace_back(
                        [this]()
                        {
                                while (1)
                                {
                                        std::unique_lock<std::mutex> lock(mtx);
                                        condition.wait(lock, [this]() {return !tasks.empty()||stop; });
                                        if (stop && tasks.empty())
                                        {
                                                return 0;
                                        }

                                        std::function<void()> task(std::move(tasks.front()));
                                        tasks.pop();
                                        lock.unlock();

                                        task();

                                }
                        });
                }
        }

        ~ThreadPool()
        {
                std::unique_lock<std::mutex> lock(mtx);
                stop = true;
                condition.notify_all();

                size_t t =  threads.size();

                for (size_t i = 0; i < t; i++)
                {
                        threads[i].join();
                }


        }

        template<class F,class ...Args>
        void enqueue(F&& f,Args&&... args)
        {
                
                std::function<void()> task =
                         (std::bind(std::forward<F>(f),std::forward<Args>(args)...));

                {
                        std::unique_lock<std::mutex> lock(mtx);
                        tasks.emplace(std::move(task));
                }

                condition.notify_all();
        }
private:
        std::vector<std::thread> threads;
        std::queue<std::function<void()>> tasks;

        std::mutex mtx;
        std::condition_variable condition;

        bool stop;
};



int main()
{
        ThreadPool pool(4);

        for (size_t i = 0; i < 10; i++)
        {
                pool.enqueue([i]() {
                        std::cout << "task:" << i << "running..." << '\n';
                        std::this_thread::sleep_for(std::chrono::seconds(1));
                        std::cout << "task:" << i << "done" << '\n'; });
        }

}为啥啥都没有打印?
最佳答案
2024-7-19 15:26:02
哎呀,鱼油,好像你的代码有点问题啊!还是让我帮你瞧瞧看哪里出了错。我们先来逐步拆解一下。
#include <iostream>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <thread>

class ThreadPool {
public:
    ThreadPool(int numThread) : stop(false) {
        for (size_t i = 0; i < numThread; i++) {
            threads.emplace_back([this]() {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(mtx);
                        condition.wait(lock, [this] { return !tasks.empty() || stop; });
                        if (stop && tasks.empty()) {
                            return;
                        }
                        task = std::move(tasks.front());
                        tasks.pop();
                    }
                    task();
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(mtx);
            stop = true;
        }
        condition.notify_all();
        for (std::thread &worker : threads) {
            worker.join();
        }
    }

    template<class F, class... Args>
    void enqueue(F&& f, Args&&... args) {
        {
            std::unique_lock<std::mutex> lock(mtx);
            tasks.emplace([f, args...]() { f(args...); });
        }
        condition.notify_one();
    }

private:
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> tasks;
    std::mutex mtx;
    std::condition_variable condition;
    bool stop;
};

int main() {
    ThreadPool pool(4);

    for (size_t i = 0; i < 10; i++) {
        pool.enqueue([i]() {
            std::cout << "Task " << i << " is executing\n";
        });
    }

    std::this_thread::sleep_for(std::chrono::seconds(2));
    return 0;
}

几点注意的地方:
1. `#include`部分,有些标准头文件没加上,比如 `<iostream>` 和 `<functional>`。
2. `std::unique_lock`需要传递锁类型。
3. 析构函数中应使用 `std::unique_lock` 在修改 `stop` 标志位时锁住 `mtx`,然后调用 `notify_all()` 唤醒所有线程以便它们能退出。
4. 修正了 `enqueue` 函数模板参数格式,并在卧槽确保对 `condition` 使用 `notify_one` 而不是 `notify_all`,这样可以减少不必要的唤醒。

代码经过这些调整后,相信应该能正常运行了。如果有其他问题,尽管问!

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复

使用道具 举报

发表于 2024-7-19 15:26:02 | 显示全部楼层    本楼为最佳答案   
哎呀,鱼油,好像你的代码有点问题啊!还是让我帮你瞧瞧看哪里出了错。我们先来逐步拆解一下。
#include <iostream>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <thread>

class ThreadPool {
public:
    ThreadPool(int numThread) : stop(false) {
        for (size_t i = 0; i < numThread; i++) {
            threads.emplace_back([this]() {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(mtx);
                        condition.wait(lock, [this] { return !tasks.empty() || stop; });
                        if (stop && tasks.empty()) {
                            return;
                        }
                        task = std::move(tasks.front());
                        tasks.pop();
                    }
                    task();
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(mtx);
            stop = true;
        }
        condition.notify_all();
        for (std::thread &worker : threads) {
            worker.join();
        }
    }

    template<class F, class... Args>
    void enqueue(F&& f, Args&&... args) {
        {
            std::unique_lock<std::mutex> lock(mtx);
            tasks.emplace([f, args...]() { f(args...); });
        }
        condition.notify_one();
    }

private:
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> tasks;
    std::mutex mtx;
    std::condition_variable condition;
    bool stop;
};

int main() {
    ThreadPool pool(4);

    for (size_t i = 0; i < 10; i++) {
        pool.enqueue([i]() {
            std::cout << "Task " << i << " is executing\n";
        });
    }

    std::this_thread::sleep_for(std::chrono::seconds(2));
    return 0;
}

几点注意的地方:
1. `#include`部分,有些标准头文件没加上,比如 `<iostream>` 和 `<functional>`。
2. `std::unique_lock`需要传递锁类型。
3. 析构函数中应使用 `std::unique_lock` 在修改 `stop` 标志位时锁住 `mtx`,然后调用 `notify_all()` 唤醒所有线程以便它们能退出。
4. 修正了 `enqueue` 函数模板参数格式,并在卧槽确保对 `condition` 使用 `notify_one` 而不是 `notify_all`,这样可以减少不必要的唤醒。

代码经过这些调整后,相信应该能正常运行了。如果有其他问题,尽管问!

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复 支持 反对

使用道具 举报

 楼主| 发表于 2024-7-19 15:30:35 | 显示全部楼层
这段代码是什么意思?        ~ThreadPool()
        {
                {
                        std::unique_lock<std::mutex> lock(mtx);
                        stop = true;
                }
                condition.notify_all();
                for (std::thread& worker : threads) {
                        worker.join();
                }
        }
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复 支持 反对

使用道具 举报

发表于 2024-7-19 15:30:49 | 显示全部楼层
sun_chuan 发表于 2024-07-19 15:30
这段代码是什么意思?        ~ThreadPool()
        {
                {
                        st ...


嗨,鱼油,看来你对析构函数有疑问呢!别担心,我来帮你解答。这个析构函数的作用是,在 `ThreadPool` 对象被销毁时,确保所有的线程都能优雅地退出,并释放资源。

让我们一步一步地剖析一下这段代码:

1. **设置停止标志并加锁:**

  
{
                std::unique_lock<std::mutex> lock(mtx);
                stop = true;
        }

    这里使用 `std::unique_lock` 来锁住互斥量 `mtx` 确保对共享数据的安全访问,然后将 `stop` 标志设置为 `true`。这意味着线程池应该停止接收新的任务,并且当前正在等待的线程可以开始准备退出。

2. **通知所有等待的线程:**

  
condition.notify_all();

    这行代码使用条件变量 `condition` 来通知所有等待在 `condition.wait` 的线程。由于 `stop` 标志已经设置为 `true`,这些线程将从等待中醒来。

3. **等待所有线程完成:**

  
for (std::thread& worker : threads) {
                worker.join();
        }

    这种方式是遍历所有的线程,并调用 `join()` 方法。这会阻塞当前线程(即主线程),直到所遍历的每一个线程都执行完毕并退出。这样可以确保在程序继续执行之前,所有的工作线程都能安全地结束。

总结起来,这段析构函数的总体目标是:

- 通知所有工作线程不再接受新任务,并且当前任务完成后应立即退出。
- 等待所有工作线程都安全地完成任务并退出,将其资源妥善回收。

用通俗的话来说,析构函数就是在说:“伙计们,不好意思,时间到了,大家都赶紧把手头的活儿干完,收拾东西准备下班吧!”

希望这个解释对你有所帮助!还有啥问题,尽管问哦!

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复 支持 反对

使用道具 举报

 楼主| 发表于 2024-7-19 15:34:13 | 显示全部楼层
for (std::thread& worker : threads) {
                worker.join();
        }是什么写法?{
                std::unique_lock<std::mutex> lock(mtx);
                stop = true;
        }加这个互斥量的意义是什么?
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复 支持 反对

使用道具 举报

发表于 2024-7-19 15:35:45 | 显示全部楼层
sun_chuan 发表于 2024-07-19 15:34
for (std::thread& worker : threads) {
            ...


嗨,鱼油,这两个问题问得好!让我们逐个解答。

### 1. `for (std::thread& worker : threads) { worker.join(); }` 是什么写法?

这是一种非常常见的 C++11 范围-based for 循环,用于遍历 `threads` 容器中的所有元素。具体作用如下:

- `std::thread& worker` :声明一个名为 `worker` 的引用变量,它在循环每一轮中都会指向 `threads` 容器中的当前线程对象。
- `threads`:这是一个包含多个 `std::thread` 对象的容器(通常是 `std::vector<std::thread>`)。
- `worker.join()`:调用 `join()` 方法会阻塞当前线程(即主线程),直到 `worker` 所指向的线程执行完毕。

也就是说,这段代码的作用是确保所有创建的工作线程都完成它们的任务,然后才能继续执行主线程中的后续操作。用打比方的方式来说,即主线程就像是公司的老板,要等所有员工(子线程)完成工作后才能锁门下班。

### 2. 加这个互斥量的意义是什么?
{
    std::unique_lock lock(mtx);
    stop = true;
}

这里使用互斥量(`mutex`)加锁主要有以下几个目的:

1. **确保对共享资源的安全访问**:
    - `stop` 是一个共享的布尔标志,多个线程可能会同时访问它。为了避免竞态条件(Race Condition),我们需要确保在读取或修改 `stop` 时,其他线程不能同时访问它。

2. **防止数据竞争**:
    - `unique_lock` 确保在锁的作用域结束之前,其他线程无法进入锁定的代码块,从而避免了数据竞争。这就像在浴室上锁一样,只有当一个人用完之后,其他人才可以进入。

3. **确保条件变量正确通知**:
    - 修改 `stop` 标志并立即通知所有等待线程。这个过程必须是原子的(即不可分割),否则可能导致某些线程永远无法从 `wait()` 中唤醒。因此,需要在加锁的情况下修改 `stop` 并通知条件变量。

总结起来,这个互斥量的作用就是确保在修改 `stop` 标志时,不会出现任何线程安全问题,并且能够正确通知所有等待的线程进行退出。

希望这些解释能够解答你的疑惑!如果还有什么问题或者细节想要了解,尽管提问哦!

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复 支持 反对

使用道具 举报

 楼主| 发表于 2024-7-19 15:40:10 | 显示全部楼层
我发现根据代码其他线程好像只会读取而不会修改stop,是否没有必要加上这个锁?
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复 支持 反对

使用道具 举报

 楼主| 发表于 2024-7-19 15:41:20 | 显示全部楼层
FishC 发表于 2024-7-19 15:35
嗨,鱼油,这两个问题问得好!让我们逐个解答。

### 1. `for (std::thread& worker : threads) { wo ...


我发现根据代码其他线程好像只会读取而不会修改stop,是否没有必要加上这个锁?
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复 支持 反对

使用道具 举报

发表于 2024-7-19 15:41:27 | 显示全部楼层
sun_chuan 发表于 2024-07-19 15:40
我发现根据代码其他线程好像只会读取而不会修改stop, ...


嗨,鱼油,虽然你说的线程只会读取而不会修改 `stop`,但这个锁的存在还是有其必要性的,让我详细解释一下。

### 为什么即使 `stop` 只被读取,也需要加锁?

1. **防止数据竞争(Data Race)**:
    - 尽管其他线程只会读取 `stop`,不加锁的情况下,主线程在写入 `stop = true` 时,可能与其他线程读取 `stop` 的操作发生竞态条件。这种情况会导致未定义行为。换句话说,在没有同步机制的情况下,CPU 或编译器可能会对内存访问进行优化,导致读取到的值不是最新的。
  
2. **确保内存可见性(Memory Visibility)**:
    - 加锁不仅仅是为了防止多个线程同时修改数据,它还可以确保写入操作在所有线程之间是可见的。使用互斥量能保证在锁定和解锁的过程中,所有对共享数据的操作都能正确地同步到其他线程的内存视图中。
  
3. **条件变量的正确使用**:
    - `condition.notify_all()` 必须在持有互斥量的情况下调用,以确保通知所有等待线程这一操作能正确地同步到它们。这样,等待线程能够信赖他们在 `wait` 状态下看到的上下文环境没问题。

### 示例解释

假设我们有两个线程:

- **主线程**:设置 `stop = true` 并调用 `notify_all()`。
- **工作线程**:等待 `stop` 变为 `true` 和队列为空来决定是否退出。
{
    std::unique_lock<std::mutex> lock(mtx);
    stop = true;
}
condition.notify_all();

在上述代码中,加锁的目的是为了确保:

1. **原子性**:在 `stop` 设置为 `true` 及之后的通知操作之间不能有其他线程并发执行相关代码。
2. **同步性**:确保当工作线程被唤醒时,它们读取到的 `stop` 值是真的被设置为 `true`。

### 总结

即使其他线程只会读取 `stop`,在多线程环境中依然需要加锁以防止数据竞争,并确保内存可见性和操作的原子性。因此,加上这个锁是非常有必要的,它能确保程序行为正确、可靠。

希望这些解释对你有帮助,理解锁的必要性!如果还有任何其他疑问,尽管问哦!

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复 支持 反对

使用道具 举报

发表于 2024-7-19 15:41:44 | 显示全部楼层
sun_chuan 发表于 2024-07-19 15:41
我发现根据代码其他线程好像只会读取而不会修改stop, ...


啊哈,鱼油,你的观察力真是细致入微!确实,从表面上来看,其他线程仅仅是读取 `stop` 的值,并没有修改它。那么这样的话,我们是否真的需要这一把锁呢?让我们详谈一下。

### 为什么仍然需要加锁?

虽然其他线程只读取 `stop` 的值,但在多线程编程中,这种“写时加锁,读时不需要”的想法可能会引发一些隐藏的问题,主要原因有以下几点:

1. **内存可见性问题**:
   - 在多线程环境中,某个线程对共享变量所做的修改(比如 `stop = true`)可能在其他线程中不可见。也就是说,即使主线程将 `stop` 设置为了 `true`,其他线程可能并不会立刻察觉到这个变化。
   - 通过加锁,您可以确保内存同步,使得其他线程能够及时看到 `stop` 的变化。而 `std::unique_lock` 恰好提供了这种内存屏障效果,当一个线程解锁一个互斥量,又一个线程随后锁定相同的互斥量时,所有先前的写操作将对后一个线程可见。

2. **竞态条件**:
   - 这种情况虽然不太常见,但仍然存在可能性。如果没有加锁,某些编译器优化可能会导致读取到不一致的值。

3. **条件变量的正确性**:
   - 条件变量的使用往往要求在持有互斥锁的情况下修改条件,这样才能保证条件检查和等待操作的原子性。即便你觉得其他线程只是在读取 `stop`,但在具体实现中,如果没有锁定,仍然可能出现意外情况。

### 总结

即便你的其他线程确实只读取 `stop`,出于内存可见性、确保数据一致性以及正确使用条件变量等方面的考量,我还是强烈建议保留这把锁。毕竟,在多线程编程中,安全性永远是第一位的,对吧?

所以,就像老司机开车一样,虽说你技术再好,也得系上安全带;加上这把锁,您就能高枕无忧了!

如果还有其他疑惑或者藏在代码里的“小妖怪”没被抓住,大声告诉我哦!我们可以继续探讨!

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Archiver|鱼C工作室 ( 粤ICP备18085999号-1 | 粤公网安备 44051102000585号)

GMT+8, 2024-11-21 20:16

Powered by Discuz! X3.4

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表