网站权重多少比较好,孟津网站开发,横店网站开发,北京互联网建站网站目录
9.1 线程池
9.1.1 最简易可行的线程池
9.1.2 等待提交给线程池的任务完成运行
9.1.3等待其他任务完成的任务
9.1.4 避免任务队列上的争夺
9.1.5 任务窃取
9.2 中断线程
9.2.1 发起一个线程#xff0c;以及把他中断
9.2.2 检测线程是否被中断
9.2.3 中断条件变…目录
9.1 线程池
9.1.1 最简易可行的线程池
9.1.2 等待提交给线程池的任务完成运行
9.1.3等待其他任务完成的任务
9.1.4 避免任务队列上的争夺
9.1.5 任务窃取
9.2 中断线程
9.2.1 发起一个线程以及把他中断
9.2.2 检测线程是否被中断
9.2.3 中断条件变量上的等待
9.2.4 中断条件变量std::condition_variable_any上的等待
9.2.5 中断其他阻塞型等待
9.2.6 处理中断
9.2.7 在应用程序推出时中断后台任务
9.3 小结 参考https://github.com/xiaoweiChen/CPP-Concurrency-In-Action-2ed-2019/blob/master/content/chapter9/9.1-chinese.md
9.1 线程池
大多数系统中将每个任务指定给某个线程是不切实际的不过可以利用并发性进行并发执行。线程池提供了这样的功能将提交到线程池中的任务并发执行提交的任务将会挂在任务队列上。工作线程会从队列中的获取任务当任务执行完成后再从任务队列中获取下一个任务。
创建一个线程池时会遇到几个关键性的设计问题比如可使用的线程数量高效的任务分配方式以及是否需要等待一个任务完成。
9.1.1 最简易可行的线程池
代码9.1 简单的线程池
class thread_pool
{std::atomic_bool done;thread_safe_queuestd::functionvoid() work_queue; // 1std::vectorstd::thread threads; // 2join_threads joiner; // 3void worker_thread(){while(!done) // 4{std::functionvoid() task;if(work_queue.try_pop(task)) // 5{task(); // 6}else{std::this_thread::yield(); // 7}}}public:thread_pool():done(false),joiner(threads){unsigned const thread_countstd::thread::hardware_concurrency(); // 8try{for(unsigned i0;ithread_count;i){threads.push_back( std::thread(thread_pool::worker_thread,this)); // 9}}catch(...){donetrue; // 10throw;}}~thread_pool(){donetrue; // 11}templatetypename FunctionTypevoid submit(FunctionType f){work_queue.push(std::functionvoid()(f)); // 12}
};
这样简单的线程池就完成了特别是任务没有返回值或需要执行阻塞操作的任务。很多情况下这样的线程池是不够用的其他情况使用这样简单的线程池可能会出现问题比如死锁。同样在简单例子中使用std::async能提供更好的功能。
9.1.2 等待提交给线程池的任务完成运行
第8章中的例子中线程间的任务划分完成后代码会显式生成新线程主线程通常是等待新线程在返回调用之后结束确保所有任务都完成。使用线程池就需要等待任务提交到线程池中而非直接提交给单个线程。与基于std::async的方法类似使用代码9.1中的简单线程池使用第4章中提到的工具条件变量和future。虽然会增加代码的复杂度不过要比直接对任务进行等待好很多。
通过增加线程池的复杂度可以直接等待任务完成。使用submit()函数返回对任务描述的句柄可用来等待任务的完成。任务句柄会用条件变量或future进行包装从而简化线程池的实现。
一种特殊的情况是执行任务的线程需要返回结果到主线程上进行处理。本这种情况下需要用future对最终的结果进行转移。代码9.2展示了对简单线程池的修改通过修改就能等待任务完成以及在工作线程完成后返回一个结果到等待线程中去不过std::packaged_task实例是不可拷贝的仅可移动所以不能再使用std::function来实现任务队列因为std::function需要存储可复制构造的函数对象。包装一个自定义函数用来处理可移动的类型就是一个带有函数操作符的类型擦除类。只需要处理没有入参的函数和无返回的函数即可所以这只是一个简单的虚函数调用。
代码9.2 可等待任务的线程池
class function_wrapper
{struct impl_base {virtual void call()0;virtual ~impl_base() {}};std::unique_ptrimpl_base impl;templatetypename Fstruct impl_type: impl_base{F f;impl_type(F f_): f(std::move(f_)) {}void call() { f(); }};
public:templatetypename Ffunction_wrapper(F f):impl(new impl_typeF(std::move(f))){}void operator()() { impl-call(); }function_wrapper() default;function_wrapper(function_wrapper other):impl(std::move(other.impl)){}function_wrapper operator(function_wrapper other){implstd::move(other.impl);return *this;}function_wrapper(const function_wrapper)delete;function_wrapper(function_wrapper)delete;function_wrapper operator(const function_wrapper)delete;
};class thread_pool
{thread_safe_queuefunction_wrapper work_queue; // 使用function_wrapper而非使用std::functionvoid worker_thread(){while(!done){function_wrapper task;if(work_queue.try_pop(task)){task();}else{std::this_thread::yield();}}}
public:templatetypename FunctionTypestd::futuretypename std::result_ofFunctionType()::type // 1submit(FunctionType f){typedef typename std::result_ofFunctionType()::typeresult_type; // 2std::packaged_taskresult_type() task(std::move(f)); // 3std::futureresult_type res(task.get_future()); // 4work_queue.push(std::move(task)); // 5return res; // 6}// 和之前一样
};
9.1.3等待其他任务完成的任务
最简单的方法就是在thread_pool中添加一个新函数来执行任务队列上的任务并对线程池进行管理。高级线程池的实现可能会在等待函数中添加逻辑或等待其他函数来处理这个任务优先的任务会让其他的任务进行等待。下面代码中的实现就展示了一个新run_pending_task()函数对于快速排序的修改将会在代码9.5中展示。
代码9.4 run_pending_task()函数实现
void thread_pool::run_pending_task()
{function_wrapper task;if(work_queue.try_pop(task)){task();}else{std::this_thread::yield();}
}
下面快速排序算法的实现要比代码8.1中版本简单许多因为所有线程管理逻辑都移到线程池中了。
代码9.5 基于线程池的快速排序实现
templatetypename T
struct sorter // 1
{thread_pool pool; // 2std::listT do_sort(std::listT chunk_data){if(chunk_data.empty()){return chunk_data;}std::listT result;result.splice(result.begin(),chunk_data,chunk_data.begin());T const partition_val*result.begin();typename std::listT::iterator divide_pointstd::partition(chunk_data.begin(),chunk_data.end(),[](T const val){return valpartition_val;});std::listT new_lower_chunk;new_lower_chunk.splice(new_lower_chunk.end(),chunk_data,chunk_data.begin(),divide_point);std::futurestd::listT new_lower // 3pool.submit(std::bind(sorter::do_sort,this,std::move(new_lower_chunk)));std::listT new_higher(do_sort(chunk_data));result.splice(result.end(),new_higher);while(!new_lower.wait_for(std::chrono::seconds(0)) std::future_status::timeout){pool.run_pending_task(); // 4}result.splice(result.begin(),new_lower.get());return result;}
};templatetypename T
std::listT parallel_quick_sort(std::listT input)
{if(input.empty()){return input;}sorterT s;return s.do_sort(input);
}
9.1.4 避免任务队列上的争夺
为了避免乒乓缓存每个线程建立独立的任务队列。这样每个线程就会将新任务放在自己的任务队列上并且当线程上的任务队列没有任务时去全局的任务列表中取任务。下面列表中的实现使用了一个thread_local变量来保证每个线程都拥有自己的任务列表(如全局列表那样)。
代码9.6 线程池——线程具有本地任务队列
class thread_pool
{thread_safe_queuefunction_wrapper pool_work_queue;typedef std::queuefunction_wrapper local_queue_type; // 1static thread_local std::unique_ptrlocal_queue_typelocal_work_queue; // 2void worker_thread(){local_work_queue.reset(new local_queue_type); // 3while(!done){run_pending_task();}}public:templatetypename FunctionTypestd::futuretypename std::result_ofFunctionType()::typesubmit(FunctionType f){typedef typename std::result_ofFunctionType()::type result_type;std::packaged_taskresult_type() task(f);std::futureresult_type res(task.get_future());if(local_work_queue) // 4{local_work_queue-push(std::move(task));}else{pool_work_queue.push(std::move(task)); // 5}return res;}void run_pending_task(){function_wrapper task;if(local_work_queue !local_work_queue-empty()) // 6{taskstd::move(local_work_queue-front());local_work_queue-pop();task();}else if(pool_work_queue.try_pop(task)) // 7{task();}else{std::this_thread::yield();}}
// rest as before
};
9.1.5 任务窃取
任务分配不均时造成的结果就是某个线程本地队列中有很多任务的同时其他线程无所事事。例如举一个快速排序的例子一开始的数据块能在线程池上被处理因为剩余部分会放在工作线程的本地队列上进行处理这样的使用方式也违背使用线程池的初衷。
幸好这个问题有解本地工作队列和全局工作队列上没有任务时可从别的线程队列中窃取任务。
代码9.7 基于锁的任务窃取队列
class work_stealing_queue
{
private:typedef function_wrapper data_type;std::dequedata_type the_queue; // 1mutable std::mutex the_mutex;public:work_stealing_queue(){}work_stealing_queue(const work_stealing_queue other)delete;work_stealing_queue operator(const work_stealing_queue other)delete;void push(data_type data) // 2{std::lock_guardstd::mutex lock(the_mutex);the_queue.push_front(std::move(data));}bool empty() const{std::lock_guardstd::mutex lock(the_mutex);return the_queue.empty();}bool try_pop(data_type res) // 3{std::lock_guardstd::mutex lock(the_mutex);if(the_queue.empty()){return false;}resstd::move(the_queue.front());the_queue.pop_front();return true;}bool try_steal(data_type res) // 4{std::lock_guardstd::mutex lock(the_mutex);if(the_queue.empty()){return false;}resstd::move(the_queue.back());the_queue.pop_back();return true;}
};
这就说明每个线程中的“队列”是一个后进先出的栈最新推入的任务将会第一个执行。从缓存角度来看这将对性能有所提升因为任务相关的数据一直存于缓存中要比提前将任务相关数据推送到栈上好。同样这种方式很好的映射到某个算法上例如快速排序。之前的实现中每次调用do_sort()都会推送一个任务到栈上并且等待这个任务执行完毕。通过对最新推入任务的处理就可以保证在将当前所需数据块处理完成前其他任务是否需要这些数据块从而可以减少活动任务的数量和栈的使用次数。try_steal()从队列末尾获取任务为了减少与try_pop()之间的竞争。使用在第6、7章中的所讨论的技术来让try_pop()和try_steal()并发执行。
现在拥有了一个很不错的任务队列并且支持窃取。那如何在线程池中使用这个队列呢这里简单的展示一下。
代码9.8 使用任务窃取的线程池
class thread_pool
{typedef function_wrapper task_type;std::atomic_bool done;thread_safe_queuetask_type pool_work_queue;std::vectorstd::unique_ptrwork_stealing_queue queues; // 1std::vectorstd::thread threads;join_threads joiner;static thread_local work_stealing_queue* local_work_queue; // 2static thread_local unsigned my_index;void worker_thread(unsigned my_index_){my_indexmy_index_;local_work_queuequeues[my_index].get(); // 3while(!done){run_pending_task();}}bool pop_task_from_local_queue(task_type task){return local_work_queue local_work_queue-try_pop(task);}bool pop_task_from_pool_queue(task_type task){return pool_work_queue.try_pop(task);}bool pop_task_from_other_thread_queue(task_type task) // 4{for(unsigned i0;iqueues.size();i){unsigned const index(my_indexi1)%queues.size(); // 5if(queues[index]-try_steal(task)){return true;}}return false;}public:thread_pool():done(false),joiner(threads){unsigned const thread_countstd::thread::hardware_concurrency();try{for(unsigned i0;ithread_count;i){queues.push_back(std::unique_ptrwork_stealing_queue( // 6new work_stealing_queue));threads.push_back(std::thread(thread_pool::worker_thread,this,i));}}catch(...){donetrue;throw;}}~thread_pool(){donetrue;}templatetypename FunctionTypestd::futuretypename std::result_ofFunctionType()::type submit(FunctionType f){ typedef typename std::result_ofFunctionType()::type result_type;std::packaged_taskresult_type() task(f);std::futureresult_type res(task.get_future());if(local_work_queue){local_work_queue-push(std::move(task));}else{pool_work_queue.push(std::move(task));}return res;}void run_pending_task(){task_type task;if(pop_task_from_local_queue(task) || // 7pop_task_from_pool_queue(task) || // 8pop_task_from_other_thread_queue(task)) // 9{task();}else{std::this_thread::yield();}}
};
9.2 中断线程
9.2.1 发起一个线程以及把他中断
代码9.9 interruptible_thread的基本实现
class interrupt_flag
{
public:void set();bool is_set() const;
};
thread_local interrupt_flag this_thread_interrupt_flag; // 1class interruptible_thread
{std::thread internal_thread;interrupt_flag* flag;
public:templatetypename FunctionTypeinterruptible_thread(FunctionType f){std::promiseinterrupt_flag* p; // 2internal_threadstd::thread([f,p]{ // 3p.set_value(this_thread_interrupt_flag);f(); // 4});flagp.get_future().get(); // 5}void interrupt(){if(flag){flag-set(); // 6}}
};
9.2.2 检测线程是否被中断
9.2.3 中断条件变量上的等待
代码9.11 为std::condition_variable在interruptible_wait中使用超时
class interrupt_flag
{std::atomicbool flag;std::condition_variable* thread_cond;std::mutex set_clear_mutex;public:interrupt_flag():thread_cond(0){}void set(){flag.store(true,std::memory_order_relaxed);std::lock_guardstd::mutex lk(set_clear_mutex);if(thread_cond){thread_cond-notify_all();}}bool is_set() const{return flag.load(std::memory_order_relaxed);}void set_condition_variable(std::condition_variable cv){std::lock_guardstd::mutex lk(set_clear_mutex);thread_condcv;}void clear_condition_variable(){std::lock_guardstd::mutex lk(set_clear_mutex);thread_cond0;}struct clear_cv_on_destruct{~clear_cv_on_destruct(){this_thread_interrupt_flag.clear_condition_variable();}};
};void interruptible_wait(std::condition_variable cv,std::unique_lockstd::mutex lk)
{interruption_point();this_thread_interrupt_flag.set_condition_variable(cv);interrupt_flag::clear_cv_on_destruct guard;interruption_point();cv.wait_for(lk,std::chrono::milliseconds(1));interruption_point();
}
9.2.4 中断条件变量std::condition_variable_any上的等待
代码9.12 为std::condition_variable_any设计的interruptible_wait
class interrupt_flag
{std::atomicbool flag;std::condition_variable* thread_cond;std::condition_variable_any* thread_cond_any;std::mutex set_clear_mutex;public:interrupt_flag(): thread_cond(0),thread_cond_any(0){}void set(){flag.store(true,std::memory_order_relaxed);std::lock_guardstd::mutex lk(set_clear_mutex);if(thread_cond){thread_cond-notify_all();}else if(thread_cond_any){thread_cond_any-notify_all();}}templatetypename Lockablevoid wait(std::condition_variable_any cv,Lockable lk){struct custom_lock{interrupt_flag* self;Lockable lk;custom_lock(interrupt_flag* self_,std::condition_variable_any cond,Lockable lk_):self(self_),lk(lk_){self-set_clear_mutex.lock(); // 1self-thread_cond_anycond; // 2}void unlock() // 3{lk.unlock();self-set_clear_mutex.unlock();}void lock(){std::lock(self-set_clear_mutex,lk); // 4}~custom_lock(){self-thread_cond_any0; // 5self-set_clear_mutex.unlock();}};custom_lock cl(this,cv,lk);interruption_point();cv.wait(cl);interruption_point();}// rest as before
};templatetypename Lockable
void interruptible_wait(std::condition_variable_any cv,Lockable lk)
{this_thread_interrupt_flag.wait(cv,lk);
}
9.2.5 中断其他阻塞型等待
9.2.6 处理中断
9.2.7 在应用程序推出时中断后台任务
试想在桌面上查找一个应用。这就需要与用户互动应用的状态需要能在显示器上显示就能看出应用有什么改变。为了避免影响GUI的响应时间通常会将处理线程放在后台运行。后台进程需要一直执行直到应用退出。后台线程会作为应用启动的一部分被启动并且在应用终止的时候停止运行。通常这样的应用只有在机器关闭时才会退出因为应用需要更新应用最新的状态就需要全时间运行。在某些情况下当应用关闭需要使用有序的方式将后台线程关闭其中一种方式就是中断。
下面代码中为一个系统实现了简单的线程管理部分。
代码9.13 后台监视文件系统
std::mutex config_mutex;
std::vectorinterruptible_thread background_threads;void background_thread(int disk_id)
{while(true){interruption_point(); // 1fs_change fscget_fs_changes(disk_id); // 2if(fsc.has_changes()){update_index(fsc); // 3}}
}void start_background_processing()
{background_threads.push_back(interruptible_thread(background_thread,disk_1));background_threads.push_back(interruptible_thread(background_thread,disk_2));
}int main()
{start_background_processing(); // 4process_gui_until_exit(); // 5std::unique_lockstd::mutex lk(config_mutex);for(unsigned i0;ibackground_threads.size();i){background_threads[i].interrupt(); // 6}for(unsigned i0;ibackground_threads.size();i){background_threads[i].join(); // 7}
}
9.3 小结
本章中了解各种线程管理的高级技术线程池和中断线程。也了解了如何使用本地任务队列使用任务窃取的方式减小同步开销提高线程池的吞吐量等待子任务完成的同时执行队列中其他任务从而来避免死锁。
还有使用线程去中断另一个处理线程的各种方式比如使用特定的断点和函数执行中断要不就是使用某种方法对阻塞等待进行中断。