UtilsLite
Utilities for C++ programming
Loading...
Searching...
No Matches
ThreadPool5.hxx
Go to the documentation of this file.
1/*--------------------------------------------------------------------------*\
2 | |
3 | Copyright (C) 2020 |
4 | |
5 | , __ , __ |
6 | /|/ \ /|/ \ |
7 | | __/ _ ,_ | __/ _ ,_ |
8 | | \|/ / | | | | \|/ / | | | |
9 | |(__/|__/ |_/ \_/|/|(__/|__/ |_/ \_/|/ |
10 | /| /| |
11 | \| \| |
12 | |
13 | Enrico Bertolazzi |
14 | Dipartimento di Ingegneria Industriale |
15 | Università degli Studi di Trento |
16 | email: enrico.bertolazzi@unitn.it |
17 | |
18\*--------------------------------------------------------------------------*/
19
20//
21// file: ThreadPool5.hxx
22//
23
24namespace Utils {
25
30
31 /*\
32 | _____ _ _ ____ _
33 | |_ _| |__ _ __ ___ __ _ __| | _ \ ___ ___ | |
34 | | | | '_ \| '__/ _ \/ _` |/ _` | |_) / _ \ / _ \| |
35 | | | | | | | | | __/ (_| | (_| | __/ (_) | (_) | |
36 | |_| |_| |_|_| \___|\__,_|\__,_|_| \___/ \___/|_|
37 \*/
49 class ThreadPool5 : public ThreadPoolBase {
50
51 using FUN = ThreadPoolBase::FUN;
52 //using PFUN = ThreadPoolBase::PFUN;
53
54 /*\
55 | __ __ _
56 | \ \ / /__ _ __| | _____ _ __
57 | \ \ /\ / / _ \| '__| |/ / _ \ '__|
58 | \ V V / (_) | | | < __/ |
59 | \_/\_/ \___/|_| |_|\_\___|_|
60 \*/
68 class Worker {
69
70
71 FUN m_push_worker;
72 bool m_active{true};
73 UTILS_SEMAPHORE m_is_running;
74 std::thread m_running_thread;
75 FUN m_job;
76
80 void
81 worker_loop() {
82 m_is_running.red(); // block computation
83 while ( m_active ) {
84 m_is_running.wait(); // wait signal to start computation
85 // ----------------------------------------
86 m_job();
87 // ----------------------------------------
88 m_is_running.red(); // block computation
89 m_push_worker(); // worker ready for a new computation
90 }
91 }
92
93 public:
94
95 Worker() { m_is_running.red(); }
96
103 explicit
104 Worker( ThreadPool5 * tp, unsigned id ) {
105 m_is_running.red();
106 m_push_worker = [tp,id]()->void { tp->push_worker( id ); };
107 m_running_thread = std::thread( &Worker::worker_loop, this );
108 }
109
115 ~Worker() {
116 wait(); // if running task wait it terminate
117 m_active = false; // deactivate computation
118 m_job = [](){}; // dummy task
119 m_is_running.green(); // start computation (exiting loop)
120 if ( m_running_thread.joinable() ) m_running_thread.join(); // wait thread for exiting
121 }
122
126 void wait() { m_is_running.wait_red(); }
127
133 void
134 exec( FUN && fun ) {
135 m_is_running.wait_red();
136 m_job = std::move(fun); // cambia funzione da eseguire
137 m_is_running.green(); // activate computation
138 }
139
140 };
141
142 // =========================================================================
143 // =========================================================================
144 // =========================================================================
145
146 // Vector of workers managed by the thread pool.
147 std::vector<Worker> m_workers;
// Ids of workers currently available; filled by push_worker(), drained by pop_worker().
148 std::list<unsigned> m_queue;
// Locked by push_worker() and pop_worker() around every access to m_queue.
149 std::mutex m_queue_mutex;
// Notified by push_worker() when an id is queued; pop_worker() waits on it while m_queue is empty.
150 std::condition_variable m_queue_cond;
151
157 void
158 push_worker( unsigned id ) {
159 std::unique_lock<std::mutex> lock(m_queue_mutex);
160 m_queue.emplace_back(id);
161 m_queue_cond.notify_one();
162 }
163
169 unsigned
170 pop_worker() {
171 std::unique_lock<std::mutex> lock(m_queue_mutex);
172 while ( m_queue.empty() ) m_queue_cond.wait( lock );
173 unsigned id{ m_queue.back() }; m_queue.pop_back();
174 return id;
175 }
176
177 public:
178
185 unsigned nthread = std::max(
186 unsigned(1),
187 unsigned(std::thread::hardware_concurrency()-1)
188 )
189 )
191 , m_workers( size_t(nthread) )
192 {
193 m_queue.clear();
194 unsigned id{0};
195 for ( Worker & w : m_workers ) { new (&w) Worker( this, id ); ++id; }
196 while ( id-- > 0 ) push_worker( id );
197 }
198
204 virtual
206 wait();
207 m_workers.clear();
208 }
209
215 void
216 exec( FUN && fun ) override {
217 // cerca prima thread libera
218 m_workers[pop_worker()].exec( std::move(fun) );
219 }
220
224 void
225 wait() override
226 { for ( auto & w : m_workers ) w.wait(); }
227
233 unsigned
234 thread_count() const override
235 { return unsigned(m_workers.size()); }
236
//!
//! \brief Name of this thread pool implementation.
//!
//! \return the string literal "ThreadPool5"
//!
static
char const *
Name() {
  return "ThreadPool5";
}
243
244 char const * name() const override { return Name(); }
245
246 };
247
249
250}
251
252//
253// eof: ThreadPool5.hxx
254//
void exec(FUN &&fun) override
Executes a task and assigns it to an available worker.
Definition ThreadPool5.hxx:216
void wait() override
Waits for all tasks to be completed.
Definition ThreadPool5.hxx:225
virtual ~ThreadPool5()
Destructor for the ThreadPool5 class.
Definition ThreadPool5.hxx:205
unsigned thread_count() const override
Gets the current number of threads in the pool.
Definition ThreadPool5.hxx:234
char const * name() const override
Definition ThreadPool5.hxx:244
static char const * Name()
Gets the name of the thread pool implementation.
Definition ThreadPool5.hxx:242
ThreadPool5(unsigned nthread=std::max(unsigned(1), unsigned(std::thread::hardware_concurrency() -1)))
Constructs a new ThreadPool5 instance with a specified number of threads.
Definition ThreadPool5.hxx:184
ThreadPoolBase(ThreadPoolBase const &)=delete
std::function< void(void)> FUN
Definition ThreadPoolBase.hxx:55
#define UTILS_SEMAPHORE
Definition ThreadUtils.hxx:42
Definition ThreadPoolBase.hxx:87
Definition SystemUtils.cc:39