用std::queue+mutex+condition_variable实现线程安全流水线:生产者检查容量并wait空位,消费者检查空队列并wait数据,均用while循环防虚假唤醒,notify_one避免性能浪费;支持模板与函数对象泛化类型和处理逻辑;必要时可用deque替代queue以支持peek等操作。
std::queue + std::mutex + std::condition_variable 实现基础流水线核心就是让生产者往队列塞数据、消费者从队列取数据,靠互斥锁保线程安全,靠条件变量避免忙等。别直接裸用 std::queue——它不是线程安全的,不加锁会崩。
关键点:队列为空时消费者必须等待;队列满时生产者最好也等(否则可能无限 push 导致 OOM)。示例里用固定容量 10 的队列模拟“流水线缓冲区”:
std::queue<int> q;std::mutex mtx;std::condition_variable cv_not_empty, cv_not_full;const size_t CAPACITY = 10;
生产者逻辑里先锁、检查容量、wait 等空位;消费者类似,但检查是否为空。注意 cv_not_full.wait() 必须传入 std::unique_lock,且条件判断要用 lambda 防止虚假唤醒。
notify_one() 还是 notify_all()?选错会卡死或性能差生产者 push 完一个元素后,只唤醒一个等待的消费者就够了——用 notify_one()。同理,消费者 pop 后只唤醒一个生产者。用 notify_all() 会导致所有等待线程争抢锁,尤其在高并发下浪费 CPU。
立即学习“C++免费学习笔记(深入)”;
但有个坑:如果用了 notify_one() 却没有对应等待者(比如消费者已退出),这次通知就丢了。所以实际中更稳妥的做法是——只要状态变了就 notify,靠 while 循环重检条件,而不是依赖“刚好唤醒对的人”。
cv_not_empty.notify_one()
cv_not_full.notify_one()
while (q.empty()) cv_not_empty.wait(lk),不用 if
硬编码 int 没法复用。把队列类型、生产逻辑、消费逻辑都泛化:
template <typename T>class Pipeline { std::queue<T> q; std::mutex mtx; std::condition_variable cv_not_empty, cv_not_full; size_t capacity_;public: Pipeline(size_t cap) : capacity_(cap) {} void produce(T item); T consume();};
再进一步,把“怎么生成”“怎么处理”抽成可调用对象:
std::function<T()>,每次调用生成一个 T
std::function<void(T)>,拿到就处理注意:如果处理函数耗时长,别让它在持有锁期间执行——consume() 应该只负责取数据并释放锁,再在外面处理。
std::deque 替代 std::queue?不是更麻烦吗std::queue 默认底层是 std::deque,但它是适配器,不暴露迭代器和容量接口。想做容量检查(size())、或需要 front/pop 分离(比如 peek 后决定是否真正消费),就得直接用 std::deque。
比如这个常见需求:消费者想看队首元素但不立即移除,判断是否满足某种条件再决定 pop。用 std::queue 做不到,只能换 std::deque 并自己管锁:
deque.front() 查看,deque.pop_front() 移除deque.size() 判断满/空,比每次 empty() + size() 调用更准(避免竞态)实际项目里,除非明确需要 peek 或动态调整容量,否则 sticking with std::queue 更清晰。别为了“看起来灵活”提前过度设计。