博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
[C++11]半同步半异步线程池实现与分析
阅读量:4183 次
发布时间:2019-05-26

本文共 5884 字,大约阅读时间需要 19 分钟。

线程池介绍

服务器完成一项任务的时间可分为:T1:创建线程或进程时间;T2:执行任务时间;T3:销毁进程或线程时间。通常T1+T3的时间大于T2,线程池正是关注如何缩短T1和T3的时间。

线程池通过在系统中预先创建一定数量的线程,当任务请求到来时从线程池中分一个预先创建的线程去处理任务,线程在处理完任务后还可以重用,不会销毁,继续等待下次任务的到开。这样能避免大量的线程创建和销毁操作,从而节省系统资源;同时有很多任务时,也会减少创建线程的数量。
用C++11的线程相关特性,比如线程、条件变量、互斥量,让我们编写并发程序更简单。

半同步半异步线程池实现的关键技术分析

线程池又三层组成:

1. 同步服务层:不断的将新任务添加到同步队列中,可以用多路复用或者多线程来完成。一开始没看懂任务是什么,其实一个函数就是一个任务,C++11通过std::function将函数封装为类模板对象,可以将这些任务(函数)放到容器中保存起来,以进行添加读取任务操作。
2. 排队层:就是一个同步队列,处于核心地位。所有待处理的任务都存在这里,要保证队列中共享数据线程安全(加锁),还要控制任务的数量,上层服务层往队列添加任务,下层从这里取任务去执行。
3. 异步服务层: 预先创建好线程,来并行处理队列中的任务。

自己画了一张图:

这里写图片描述

代码实现与分析

用到了很多C++11的特性,里面写了很多注释,是根据自己理解分析的。

同步队列:

#include 
#include
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;template
class Syncqueue{public: //初始化,队列的最大元素个数,开始不终止 Syncqueue( int maxsize ) : m_maxsize(maxsize),m_needStop(false){} //往队列中添加任务,重载两个版本,左值和右值引用 void Put( const T& x ) { Add(x); } void Put(T&& x) { Add(forward
(x)); } //Take和Add类似 void Take(list
& list) { unique_lock
locker(m_mtx); m_notEmpty.wait(locker, [this] { return m_needStop|| NotEmpty(); });//停止或者不空就继续执行,不用wait if(m_needStop) return ; //一次加锁,一下取出队列中的所有数据 list = move(m_queue); //通过移动,将 m_queue 转移到 list,而不是拷贝 m_notFull.notify_one(); //唤醒线程去添加任务 } //每次获取一个数据,效率较低 void Take(T& t) { unique_lock
locker(m_mtx); m_notFull.wait(locker, [this] { return m_needStop || NotEmpty(); }); if(m_needStop) return ; t = m_queue.front(); //取出一个 m_queue.pop_front(); m_notFull.notify_one(); } //方便让用户能终止任务 void Stop() { { lock_guard
locker(m_mtx); m_needStop = true; //将需要停止标志 置为 true //执行到这,lock_guard释放锁 } //唤醒所有等待的线程,到if(m_needStop)时为真,然后相继退出 m_notEmpty.notify_all(); //被唤醒的线程直接获取锁 m_notFull.notify_all(); } //判断队列是否为空 bool Empty() { lock_guard
locker(m_mtx); return m_queue.empty(); } //判断队列满了 bool Full() { lock_guard
locker(m_mtx); return m_queue.size() == m_maxsize; } //队列大小 size_t Size() { lock_guard
locker(m_mtx); return m_queue.size(); }private: //队列未满 bool NotFull() const { bool full = m_queue.size() >= m_maxsize; if(full) cout << "缓冲区满了,需要等待…… " << endl; return !full; } //队列不空 bool NotEmpty() const { bool empty = m_queue.empty(); if(empty) cout << "缓冲区空了,需要等待…… 异步层的线程id: " << this_thread::get_id() << endl; return !empty; } //范型事件函数 template
void Add(F&& x) { unique_lock
locker(m_mtx); m_notFull.wait(locker,[this]{ return m_needStop|| NotFull(); }); //需要停止 或者 不满则继续往下执行,否则wait if(m_needStop) return; //如果需要终止就 return m_queue.push_back(forward
(x)); //不终止,把任务添加到同步队列 m_notEmpty.notify_one(); //提醒线程队列不为空,唤醒线程去取数据 }private: list
m_queue; //缓冲区 用链表实现 mutex m_mtx; //互斥量 condition_variable m_notEmpty; //不为空的条件变量 condition_variable m_notFull; //没有满的条件变量 int m_maxsize; //同步队列最大的size bool m_needStop; //停止的标志,开始是false};

线程池:

/*************************************************************************    > File Name: ThreadPool.h    > Author: Tanswer    > Mail: duxm@xiyoulinux.org    > Created Time: 2017年08月10日 星期四 14时46分51秒 ************************************************************************/#include 
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "Syncqueue.h"using namespace std;const int MaxTaskCount = 100; //最大任务数量class ThreadPool{public: using Task = function
; //任务类型,这里是无参数无返回值,可以修改为任何类型的范型函数模板 //hardware_concurrency CPU核数 当默认线程数 ThreadPool(int numThreads = thread::hardware_concurrency()) : m_queue(MaxTaskCount) { Start(numThreads); //启动 } ~ThreadPool() { Stop(); //如果没有停止时,则主动终止线程池 } //终止线程池,销毁池中所有线程 void Stop() { //保证多线程情况下只调用一次StopThreadGroup call_once(m_flag, [this] { StopThreadGroup(); }); } //同步服务层:往同步队列中添加任务,两个版本 void AddTask(Task&& task) { m_queue.Put(forward
(task)); } void AddTask(const Task& task) { m_queue.Put(task); }private: void Start( int numThreads ) //线程池开始,预先创建包含numThreads 个线程的线程组 { m_running = true; //创建线程组 for(int i=0; i
(&ThreadPool::RunInThread, this) ); } } void RunInThread() { while(m_running) { //一次取出队列中所有任务 list
list; m_queue.Take(list); for(auto& task : list) { if(!m_running) //如果停止 return ; task(); //执行任务 } } } //终止线程池,销毁池中所有线程 void StopThreadGroup() { m_queue.Stop(); //让同步队列中的线程停止 m_running = false; //让内部线程跳出循环并退出 for(auto thread : m_threadgroup) { if(thread) thread -> join(); } m_threadgroup.clear(); }private: list
> m_threadgroup; //处理任务的线程组,用list保存 Syncqueue
m_queue; //同步队列 atomic_bool m_running; //是否停止的标志 once_flag m_flag; //call_once的参数};

测试例子:

/*************************************************************************    > File Name: TestThreadPool.cpp    > Author: Tanswer    > Mail: duxm@xiyoulinux.org    > Created Time: 2017年08月10日 星期四 16时49分35秒 ************************************************************************/#include 
#include
#include
#include
#include
#include
#include
#include "ThreadPool.h"using namespace std;void TestThreadPool(){ ThreadPool pool(2); //线程池创建两个线程,异步层此时无任务需要先等待 //pool.Start(2); //创建两个同步层的线程不断往线程池中添加任务 //在这任务很简单,打印同步层线程ID,用lambda表达式表示,每个线程处理10个任务 thread thd1( [&pool]{ for(int i=0; i<10; i++ ) { auto thdId = this_thread::get_id(); pool.AddTask( [thdId]{ cout << "同步层线程1的线程ID: " << thdId << endl; } ); } } ); thread thd2( [&pool]{ for( int i=0; i<10; i++ ) { auto thdId = this_thread::get_id(); pool.AddTask( [thdId]{ cout << "同步层线程2的线程ID: " << thdId << endl; } ); } } ); this_thread::sleep_for(chrono::seconds(2)); getchar(); //停止线程池 pool.Stop(); //等待同步层的两个线程执行完 thd1.join(); thd2.join();}int main(){ TestThreadPool(); exit(EXIT_SUCCESS);}

测试结果:

这里写图片描述

线程池预先创建了两个线程,线程ID分别为: 140141544822528 和 140141536429824,开始时同步队列是空的,还没有任务,所以两个线程都等待。然后创建了两个同步层线程1和2,线程ID分别为:140141528037120 和 140141519644416,这两个线程开始不断往线程池同步队列中添加任务。有了任务后,线程池异步层中的两个线程开始处理任务,任务很简单,就是打印同步层线程ID,异步层线程交替处理上层的任务。

你可能感兴趣的文章
Leetcode Best Time to Buy and Sell Stock 2
查看>>
Best Time to Buy and Sell Stock III
查看>>
自制编译器:词法单元解析
查看>>
LeetCode 21 Merge Two Sorted Lists
查看>>
LeetCode Palindrom Number
查看>>
LeeCode 88. Merge Sorted Array两种解法
查看>>
《UNIX环境高级编程》---2 UNIX标准及实现
查看>>
LeetCode24 Swap Nodes in Pairs 25. Reverse Nodes in k-Group详解
查看>>
《UNIX环境高级编程》---3.文件I/O
查看>>
LeetCode 234. Palindrome Linked List判断链表是否回文
查看>>
LeetCode Reverse Linked List I, II详解
查看>>
《UNIX环境高级编程》---4文件和目录
查看>>
LeetCode 147. Insertion Sort List插入排序链表的高效简单解法
查看>>
LeetCode Rotate List简单易懂解法
查看>>
hihocoder 最长回文子串简单解法
查看>>
LeetCode 14. Longest Common Prefix最长公共前缀
查看>>
LeetCode 58. Length of Last Word
查看>>
LeetCode Super Pow详解
查看>>
LeetCode 258. Add Digits两种方法
查看>>
LeetCode 258. Add Digits两种方法
查看>>