生产者-消费者问题的多线程无锁设计实现:
1. 思路:
通过使用数据结构来避免锁的使用,可以使用队列作为数据结构,并设置一个缓冲区来存放生产者生成的数据。在生产者向缓冲区中写入数据时,需要判断缓冲区是否已满,如果已满则等待一段时间再尝试写入;在消费者从缓冲区中读取数据时,需要判断缓冲区是否为空,如果为空则等待一段时间再尝试读取。
2. 单生产者-单消费者情况(C/C++):++#include <iostream>#include <vector>#include <thread>#include <chrono>#include <mutex>#include <condition_variable>using namespace std;class CircularQueue {public: CircularQueue(int capacity) : _capacity(capacity), _head(0), _tail(0), _size(0), _queue(capacity) {} void push(int data) { unique_lock<mutex> lock(_mutex); // 缓冲区已满,等待 while (_size == _capacity) { _cond_full.wait(lock); } _queue[_tail] = data; _tail = (_tail + 1) % _capacity; ++_size; _cond_empty.notify_one(); // 唤醒等待读取数据的消费者 } int pop() { unique_lock<mutex> lock(_mutex); // 缓冲区为空,等待 while (_size == 0) { _cond_empty.wait(lock); } int data = _queue[_head]; _head = (_head + 1) % _capacity; --_size; _cond_full.notify_one(); // 唤醒等待写入数据的生产者 return data; }private: const int _capacity; int _head; int _tail; int _size; vector<int> _queue; mutex _mutex; condition_variable _cond_empty; condition_variable _cond_full;};void producer(CircularQueue& q) { for (int i = 0; i < 10; ++i) { this_thread::sleep_for(chrono::seconds(1)); // 模拟生产者生成数据的时间 q.push(i); cout << "producer produce data: " << i << endl; }}void consumer(CircularQueue& q) { for (int i = 0; i < 10; ++i) { int data = q.pop(); cout << "consumer consume data: " << data << endl; }}int main() { CircularQueue q(5); // 缓冲区容量为5 thread t1(producer, ref(q)); thread t2(consumer, ref(q)); t1.join(); t2.join(); return 0;}
3. 单生产者-多消费者情况(C/C++):++#include <iostream>#include <vector>#include <thread>#include <chrono>#include <mutex>#include <condition_variable>using namespace std;class CircularQueue {public: CircularQueue(int capacity) : _capacity(capacity), _head(0), _tail(0), _size(0), _queue(capacity) {} void push(int data) { unique_lock<mutex> lock(_mutex); // 缓冲区已满,等待 while (_size == _capacity) { _cond_full.wait(lock); } _queue[_tail] = data; _tail = (_tail + 1) % _capacity; ++_size; _cond_empty.notify_one(); // 唤醒等待读取数据的消费者 } int pop() { unique_lock<mutex> lock(_mutex); // 缓冲区为空,等待 while (_size == 0) { _cond_empty.wait(lock); } int data = _queue[_head]; _head = (_head + 1) % _capacity; --_size; _cond_full.notify_one(); // 唤醒等待写入数据的生产者 return data; }private: const int _capacity; int _head; int _tail; int _size; vector<int> _queue; mutex _mutex; condition_variable _cond_empty; condition_variable _cond_full;};void producer(CircularQueue& q) { for (int i = 0; i < 10; ++i) { this_thread::sleep_for(chrono::seconds(1)); // 模拟生产者生成数据的时间 q.push(i); cout << "producer produce data: " << i << endl; }}void consumer(CircularQueue& q, int id) { for (int i = 0; i < 5; ++i) { int data = q.pop(); cout << "consumer " << id << " consume data: " << data << endl; }}int main() { CircularQueue q(3); // 缓冲区容量为3 thread t1(producer, ref(q)); thread t2(consumer, ref(q), 1); thread t3(consumer, ref(q), 2); t1.join(); t2.join(); t3.join(); return 0;}
4. 实践经验:
实际项目中,使用无锁的生产者-消费者模型可以提高多线程程序的效率和性能。但是需要注意的是,如果消费者线程读取(和处理)数据的速度比生产者线程写入数据的速度还要慢,那么将会出现数据丢失的问题。此时需要考虑其他的解决方案,例如:增加缓冲区的容量、调整生产者与消费者的速度等。
球一个最佳答案谢谢啦!这对我非常重要! |