UtilsLite
Utilities for C++ programming
Loading...
Searching...
No Matches
ThreadPool2.hxx
Go to the documentation of this file.
1/*--------------------------------------------------------------------------*\
2 | |
3 | Copyright (C) 2020 |
4 | |
5 | , __ , __ |
6 | /|/ \ /|/ \ |
7 | | __/ _ ,_ | __/ _ ,_ |
8 | | \|/ / | | | | \|/ / | | | |
9 | |(__/|__/ |_/ \_/|/|(__/|__/ |_/ \_/|/ |
10 | /| /| |
11 | \| \| |
12 | |
13 | Enrico Bertolazzi |
14 | Dipartimento di Ingegneria Industriale |
15 | Università degli Studi di Trento |
16 | email: enrico.bertolazzi@unitn.it |
17 | |
18\*--------------------------------------------------------------------------*/
19
20//
21// file: ThreadPool2.hxx
22//
23
24namespace Utils {
25
30
31 /*\
32 | _____ _ _ ____ _
33 | |_ _| |__ _ __ ___ __ _ __| | _ \ ___ ___ | |
34 | | | | '_ \| '__/ _ \/ _` |/ _` | |_) / _ \ / _ \| |
35 | | | | | | | | | __/ (_| | (_| | __/ (_) | (_) | |
36 | |_| |_| |_|_| \___|\__,_|\__,_|_| \___/ \___/|_|
37 \*/
38
46 class ThreadPool2 : public ThreadPoolBase {
47
48 using TYPE = std::function<void(void)>;
49
50 std::atomic_int m_jobs_left{0};
51 std::atomic_bool m_running{true};
52
53 std::condition_variable m_job_available_var;
54 std::condition_variable m_wait_var;
55 std::mutex m_wait_mutex;
56 std::mutex m_queue_mutex;
57
58 std::list<TYPE> m_queue;
59 std::vector<std::thread> m_threads;
60
65 void
66 Task() {
67 TYPE job;
68 while( m_running ) {
69 {
70 std::unique_lock<std::mutex> lock( m_queue_mutex );
71 while ( m_queue.empty() ) m_job_available_var.wait( lock );
72 job = std::move(m_queue.front());
73 m_queue.pop_front();
74 }
75 job();
76 {
77 std::unique_lock<std::mutex> lock( m_wait_mutex );
78 if ( --m_jobs_left == 0 ) m_wait_var.notify_one();
79 }
80 }
81 }
82
83 void
84 close_all() {
85 wait();
86 // note that we're done, and wake up any thread that's
87 // waiting for a new job
88 m_running = false;
89 for( unsigned i{unsigned(m_threads.size())}; i > 0; --i ) exec( [](){} );
90 for( auto & t : m_threads ) if( t.joinable() ) t.join();
91 }
92
93 public:
94
100 explicit
101 ThreadPool2( unsigned nthread = std::max( unsigned(1), unsigned(std::thread::hardware_concurrency()-1) )) {
102 m_threads.clear();
103 m_threads.reserve(nthread);
104 for( unsigned i{0}; i < nthread; ++i )
105 m_threads.emplace_back( [this]{ this->Task(); } );
106 }
107
111 virtual ~ThreadPool2() { close_all(); }
112
118 void
119 exec( FUN && fun ) override {
120 std::unique_lock<std::mutex> lock( m_queue_mutex );
121 ++m_jobs_left;
122 m_queue.emplace_back( std::move(fun) );
123 m_job_available_var.notify_one();
124 }
125
126 void
127 wait() override {
128 std::unique_lock<std::mutex> lock( m_wait_mutex );
129 while ( m_jobs_left > 0 ) m_wait_var.wait( lock );
130 }
131
137 unsigned thread_count() const override { return unsigned(m_threads.size()); }
138
139 static char const * Name() { return "ThreadPool2"; }
140
141 char const * name() const override { return Name(); }
142
143 };
144
146
147}
148
149//
150// eof: ThreadPool2.hxx
151//
virtual ~ThreadPool2()
Destroys the ThreadPool1 and stops all worker threads.
Definition ThreadPool2.hxx:111
void wait() override
Definition ThreadPool2.hxx:127
static char const * Name()
Returns the name of the thread pool.
Definition ThreadPool2.hxx:139
void exec(FUN &&fun) override
Executes a task in the thread pool.
Definition ThreadPool2.hxx:119
ThreadPool2(unsigned nthread=std::max(unsigned(1), unsigned(std::thread::hardware_concurrency() -1)))
Constructs a new ThreadPool1 with a specified number of threads.
Definition ThreadPool2.hxx:101
unsigned thread_count() const override
Returns the number of threads in the pool.
Definition ThreadPool2.hxx:137
char const * name() const override
Definition ThreadPool2.hxx:141
ThreadPoolBase(ThreadPoolBase const &)=delete
std::function< void(void)> FUN
Definition ThreadPoolBase.hxx:55
Definition SystemUtils.cc:39