鱼C论坛

 找回密码
 立即注册
查看: 2507|回复: 3

[已解决]多线程无锁设计的实现:生产者-消费者问题

[复制链接]
发表于 2023-10-19 10:14:44 | 显示全部楼层 |阅读模式
5鱼币
1、问题描述:由UNIX网络编程中生产者-消费者问题延申,生产者线程和消费者线程不利用锁机制(互斥锁、读写锁)能否实现呢?

个人思路:通过数据结构来避免锁的使用,但是有一个前提条件:消费者线程读取(和处理)数据的速度要比生产者线程写入数据要快,否则将造成数据丢失。

2、实现单生产者-单消费者情况(c\c++)



3、实现单生产者-多消费者情况(c\c++)


4、有大佬在项目中遇到过上述问题并付诸实践了吗?期待大家的帮助,十分感谢。
最佳答案
2023-10-19 10:14:45
生产者-消费者问题的多线程无锁设计实现:

1. 思路:
   通过使用数据结构来避免锁的使用,可以使用队列作为数据结构,并设置一个缓冲区来存放生产者生成的数据。在生产者向缓冲区中写入数据时,需要判断缓冲区是否已满,如果已满则等待一段时间再尝试写入;在消费者从缓冲区中读取数据时,需要判断缓冲区是否为空,如果为空则等待一段时间再尝试读取。

2. 单生产者-单消费者情况(C/C++):
  1. ++
  2. #include <iostream>
  3. #include <vector>
  4. #include <thread>
  5. #include <chrono>
  6. #include <mutex>
  7. #include <condition_variable>
  8. using namespace std;
  9. class CircularQueue {
  10. public:
  11.     CircularQueue(int capacity) : _capacity(capacity), _head(0), _tail(0), _size(0), _queue(capacity) {}
  12.     void push(int data) {
  13.         unique_lock<mutex> lock(_mutex);
  14.         // 缓冲区已满,等待
  15.         while (_size == _capacity) {
  16.             _cond_full.wait(lock);
  17.         }
  18.         _queue[_tail] = data;
  19.         _tail = (_tail + 1) % _capacity;
  20.         ++_size;
  21.         _cond_empty.notify_one();  // 唤醒等待读取数据的消费者
  22.     }
  23.     int pop() {
  24.         unique_lock<mutex> lock(_mutex);
  25.         // 缓冲区为空,等待
  26.         while (_size == 0) {
  27.             _cond_empty.wait(lock);
  28.         }
  29.         int data = _queue[_head];
  30.         _head = (_head + 1) % _capacity;
  31.         --_size;
  32.         _cond_full.notify_one(); // 唤醒等待写入数据的生产者
  33.         return data;
  34.     }
  35. private:
  36.     const int _capacity;
  37.     int _head;
  38.     int _tail;
  39.     int _size;
  40.     vector<int> _queue;
  41.     mutex _mutex;
  42.     condition_variable _cond_empty;
  43.     condition_variable _cond_full;
  44. };
  45. void producer(CircularQueue& q) {
  46.     for (int i = 0; i < 10; ++i) {
  47.         this_thread::sleep_for(chrono::seconds(1));  // 模拟生产者生成数据的时间
  48.         q.push(i);
  49.         cout << "producer produce data: " << i << endl;
  50.     }
  51. }
  52. void consumer(CircularQueue& q) {
  53.     for (int i = 0; i < 10; ++i) {
  54.         int data = q.pop();
  55.         cout << "consumer consume data: " << data << endl;
  56.     }
  57. }
  58. int main() {
  59.     CircularQueue q(5);  // 缓冲区容量为5
  60.     thread t1(producer, ref(q));
  61.     thread t2(consumer, ref(q));
  62.     t1.join();
  63.     t2.join();
  64.     return 0;
  65. }
复制代码

3. 单生产者-多消费者情况(C/C++):
  1. ++
  2. #include <iostream>
  3. #include <vector>
  4. #include <thread>
  5. #include <chrono>
  6. #include <mutex>
  7. #include <condition_variable>
  8. using namespace std;
  9. class CircularQueue {
  10. public:
  11.     CircularQueue(int capacity) : _capacity(capacity), _head(0), _tail(0), _size(0), _queue(capacity) {}
  12.     void push(int data) {
  13.         unique_lock<mutex> lock(_mutex);
  14.         // 缓冲区已满,等待
  15.         while (_size == _capacity) {
  16.             _cond_full.wait(lock);
  17.         }
  18.         _queue[_tail] = data;
  19.         _tail = (_tail + 1) % _capacity;
  20.         ++_size;
  21.         _cond_empty.notify_one(); // 唤醒等待读取数据的消费者
  22.     }
  23.     int pop() {
  24.         unique_lock<mutex> lock(_mutex);
  25.         // 缓冲区为空,等待
  26.         while (_size == 0) {
  27.             _cond_empty.wait(lock);
  28.         }
  29.         int data = _queue[_head];
  30.         _head = (_head + 1) % _capacity;
  31.         --_size;
  32.         _cond_full.notify_one(); // 唤醒等待写入数据的生产者
  33.         return data;
  34.     }
  35. private:
  36.     const int _capacity;
  37.     int _head;
  38.     int _tail;
  39.     int _size;
  40.     vector<int> _queue;
  41.     mutex _mutex;
  42.     condition_variable _cond_empty;
  43.     condition_variable _cond_full;
  44. };
  45. void producer(CircularQueue& q) {
  46.     for (int i = 0; i < 10; ++i) {
  47.         this_thread::sleep_for(chrono::seconds(1));  // 模拟生产者生成数据的时间
  48.         q.push(i);
  49.         cout << "producer produce data: " << i << endl;
  50.     }
  51. }
  52. void consumer(CircularQueue& q, int id) {
  53.     for (int i = 0; i < 5; ++i) {
  54.         int data = q.pop();
  55.         cout << "consumer " << id << " consume data: " << data << endl;
  56.     }
  57. }
  58. int main() {
  59.     CircularQueue q(3);  // 缓冲区容量为3
  60.     thread t1(producer, ref(q));
  61.     thread t2(consumer, ref(q), 1);
  62.     thread t3(consumer, ref(q), 2);
  63.     t1.join();
  64.     t2.join();
  65.     t3.join();
  66.     return 0;
  67. }
复制代码

4. 实践经验:
   实际项目中,使用无锁的生产者-消费者模型可以提高多线程程序的效率和性能。但是需要注意的是,如果消费者线程读取(和处理)数据的速度比生产者线程写入数据的速度还要慢,那么将会出现数据丢失的问题。此时需要考虑其他的解决方案,例如:增加缓冲区的容量、调整生产者与消费者的速度等。

球一个最佳答案谢谢啦!这对我非常重要!
小甲鱼最新课程 -> https://ilovefishc.com
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Archiver|鱼C工作室 ( 粤ICP备18085999号-1 | 粤公网安备 44051102000585号)

GMT+8, 2025-9-23 12:13

Powered by Discuz! X3.4

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表