UtilsLite
Utilities for C++ programming
Loading...
Searching...
No Matches
ThreadUtils.hxx
Go to the documentation of this file.
1/*--------------------------------------------------------------------------*\
2 | |
3 | Copyright (C) 2020 |
4 | |
5 | , __ , __ |
6 | /|/ \ /|/ \ |
7 | | __/ _ ,_ | __/ _ ,_ |
8 | | \|/ / | | | | \|/ / | | | |
9 | |(__/|__/ |_/ \_/|/|(__/|__/ |_/ \_/|/ |
10 | /| /| |
11 | \| \| |
12 | |
13 | Enrico Bertolazzi |
14 | Dipartimento di Ingegneria Industriale |
15 | Università degli Studi di Trento |
16 | email: enrico.bertolazzi@unitn.it |
17 | |
18\*--------------------------------------------------------------------------*/
19
20//
21// file: ThreadUtils.hxx
22//
23
24#include <functional>
25#include <iostream>
26#include <type_traits>
27#include <utility>
28
29namespace Utils {
30
35
36 #ifdef UTILS_OS_WINDOWS
37 #define UTILS_SEMAPHORE Utils::WinSemaphore
38 #define UTILS_MUTEX Utils::WinCriticalSection
39 #define UTILS_SPINLOCK Utils::WinCriticalSection
40 #define UTILS_BARRIER Utils::WinBarrier
41 #else
42 #define UTILS_SEMAPHORE Utils::SimpleSemaphore
43 #define UTILS_MUTEX std::mutex
44 #define UTILS_SPINLOCK Utils::SpinLock
45 #define UTILS_BARRIER Utils::Barrier
46 #endif
47
48 #ifdef UTILS_OS_WINDOWS
49
50 class WinMutex {
51 HANDLE m_mutex;
52
53 public:
54
55 WinMutex();
56 ~WinMutex() { CloseHandle(m_mutex); }
57
58 void lock();
59 void unlock();
60
61 };
62
63 class WinCriticalSection {
64 CRITICAL_SECTION m_critical;
65 public:
66 WinCriticalSection()
67 { InitializeCriticalSection(&m_critical); }
68 ~WinCriticalSection() { DeleteCriticalSection(&m_critical); }
69 void lock() { EnterCriticalSection(&m_critical); }
70 void unlock() { LeaveCriticalSection(&m_critical); }
71 bool try_lock() { return TryEnterCriticalSection(&m_critical) ? true : false; }
72 void wait() { lock(); unlock(); }
73 CRITICAL_SECTION const & data() const { return m_critical; }
74 CRITICAL_SECTION & data() { return m_critical; }
75 };
76
77 class WinSemaphore {
78 bool m_is_red;
79 WinCriticalSection m_critical;
80 CONDITION_VARIABLE m_condition;
81 unsigned m_waiting_green;
82 unsigned m_waiting_red;
83
84 //void notify_one() noexcept { ReleaseSemaphore( m_semaphore, 1, NULL ); }
85 void notify_one() noexcept { WakeConditionVariable( &m_condition ); }
86 void notify_all() noexcept { WakeAllConditionVariable( &m_condition ); }
87 void wait_cond() noexcept { SleepConditionVariableCS( &m_condition, &m_critical.data(), INFINITE ); }
88
89 public:
90
91 WinSemaphore()
92 : m_is_red(false)
93 , m_waiting_green(0)
94 , m_waiting_red(0)
95 { InitializeConditionVariable( &m_condition ); }
96
97 ~WinSemaphore() {}
98
102 void
103 green() noexcept {
104 m_critical.lock();
105 m_is_red = false;
106 m_critical.unlock();
107 if ( m_waiting_green > 1 ) notify_all();
108 else if ( m_waiting_green > 0 ) notify_one();
109 }
110
114 void
115 red() noexcept {
116 m_critical.lock();
117 m_is_red = true;
118 m_critical.unlock();
119 if ( m_waiting_red > 1 ) notify_all();
120 else if ( m_waiting_red > 0 ) notify_one();
121 }
122
126 void
127 wait() {
128 m_critical.lock();
129 ++m_waiting_green;
130 while ( m_is_red ) wait_cond();
131 --m_waiting_green;
132 m_critical.unlock();
133 }
134
138 void
139 wait_red() {
140 m_critical.lock();
141 ++m_waiting_red;
142 while ( !m_is_red ) wait_cond();
143 --m_waiting_red;
144 m_critical.unlock();
145 }
146
147 };
148
149 class WinBarrier {
150 int m_to_be_done;
151 WinCriticalSection m_critical;
152 CONDITION_VARIABLE m_condition;
153
154 //void notify_one() noexcept { WakeConditionVariable( &m_condition ); }
155 void notify_all() noexcept { WakeAllConditionVariable( &m_condition ); }
156 void wait_cond() noexcept { SleepConditionVariableCS( &m_condition, &m_critical.data(), INFINITE ); }
157
158 public:
159 WinBarrier() : m_to_be_done(0) {}
160
161 void
162 setup( int nthreads )
163 { m_to_be_done = nthreads ; }
164
165 void
166 count_down() {
167 m_critical.lock();
168 if ( --m_to_be_done <= 0 ) notify_all() ; // wake up all tread
169 m_critical.unlock();
170 }
171
172 void
173 wait() {
174 m_critical.lock();
175 while( m_to_be_done > 0 ) wait_cond();
176 m_critical.unlock();
177 }
178
179 void
180 count_down_and_wait() {
181 m_critical.lock();
182 if ( --m_to_be_done <= 0 ) {
183 notify_all() ; // wake up all tread
184 } else {
185 while( m_to_be_done > 0 ) wait_cond();
186 }
187 m_critical.unlock();
188 }
189 };
190
191 #endif
192
193 /*\
194 | ___ _ _ _
195 | / __|_ __(_)_ _ | | ___ __| |__
196 | \__ \ '_ \ | ' \| |__/ _ \/ _| / /
197 | |___/ .__/_|_||_|____\___/\__|_\_\
198 | |_|
199 \*/
200
201 class SpinLock {
202 private:
203 std::atomic<bool> m_lock;
204 public:
205 SpinLock() : m_lock(false) { }
206 SpinLock( SpinLock const & ) = delete;
207 ~SpinLock() = default;
208
209 void
211 while( m_lock.load(std::memory_order_acquire) )
212 std::this_thread::yield();
213 }
214
215 void
217 while( m_lock.exchange(true, std::memory_order_acquire) )
218 std::this_thread::yield();
219 }
220
221 bool
223 return !m_lock.exchange(true, std::memory_order_acquire);
224 }
225
226 void
228 m_lock.store(false, std::memory_order_release);
229 }
230 };
231
232 /*\
233 | ___ _ _ _ _ _
234 | / __|_ __(_)_ _ | | ___ __| |__ | |__ __ _ _ _ _ _(_)___ _ _
235 | \__ \ '_ \ | ' \| |__/ _ \/ _| / / | '_ \/ _` | '_| '_| / -_) '_|
236 | |___/ .__/_|_||_|____\___/\__|_\_\_|_.__/\__,_|_| |_| |_\___|_|
237 | |_| |___|
238 \*/
239
241 private:
242 std::atomic<unsigned> m_count;
243 std::atomic<unsigned> m_generation;
244 unsigned m_count_reset_value;
245 public:
248
249 explicit
251 : m_count(0)
252 , m_generation(0)
253 , m_count_reset_value(0)
254 {}
255
256 void
257 setup( unsigned count ) {
258 m_count_reset_value = m_count = count ;
259 }
260
261 void
263 unsigned gen = m_generation.load();
264 if ( --m_count == 0 ) {
265 if ( m_generation.compare_exchange_weak(gen, gen + 1) )
266 m_count = m_count_reset_value;
267 return;
268 }
269 }
270
271 void
273 unsigned gen = m_generation.load();
274 while ((gen == m_generation) && (m_count != 0))
275 std::this_thread::yield();
276 }
277
278 void
280 unsigned gen = m_generation.load();
281 if ( --m_count == 0 ) {
282 if ( m_generation.compare_exchange_weak(gen, gen + 1) )
283 m_count = m_count_reset_value;
284 return;
285 }
286 while ((gen == m_generation) && (m_count != 0))
287 std::this_thread::yield();
288 }
289 };
290
291 /*\
292 | ___ _
293 | | _ ) __ _ _ _ _ _(_)___ _ _
294 | | _ \/ _` | '_| '_| / -_) '_|
295 | |___/\__,_|_| |_| |_\___|_|
296 \*/
297
298 class Barrier {
299 int m_to_be_done;
300 std::mutex m_mtx;
301 std::condition_variable m_cond;
302 public:
303 Barrier() : m_to_be_done(0) {}
304
305 void
306 setup( int nthreads )
307 { m_to_be_done = nthreads ; }
308
309 void
311 std::unique_lock<std::mutex> lck(m_mtx);
312 if ( --m_to_be_done <= 0 ) m_cond.notify_all() ; // wake up all tread
313 }
314
315 void
317 std::unique_lock<std::mutex> lck(m_mtx);
318 m_cond.wait( lck, [&]()->bool { return m_to_be_done <= 0; } );
319 }
320
321 void
323 std::unique_lock<std::mutex> lck(m_mtx);
324 if ( --m_to_be_done <= 0 ) {
325 m_cond.notify_all() ; // wake up all tread
326 } else {
327 m_cond.wait( lck, [&]()->bool { return m_to_be_done <= 0; } );
328 }
329 }
330 };
331
332 /*\
333 | ___ _ _ ___ _
334 | / __(_)_ __ _ __| |___/ __| ___ _ __ __ _ _ __| |_ ___ _ _ ___
335 | \__ \ | ' \| '_ \ / -_)__ \/ -_) ' \/ _` | '_ \ ' \/ _ \ '_/ -_)
336 | |___/_|_|_|_| .__/_\___|___/\___|_|_|_\__,_| .__/_||_\___/_| \___|
337 | |_| |_|
338 \*/
339
341 private:
342 bool m_is_red;
343 unsigned m_waiting_green;
344 unsigned m_waiting_red;
345 std::mutex m_mutex;
346 std::condition_variable_any m_cv_red;
347 std::condition_variable_any m_cv_green;
348 public:
350 : m_is_red(false)
351 , m_waiting_green(0)
352 , m_waiting_red(0)
353 { }
354
358 void
359 green() noexcept {
360 std::unique_lock<std::mutex> lock( m_mutex );
361 m_is_red = false;
362 if ( m_waiting_green > 1 ) m_cv_green.notify_all();
363 else if ( m_waiting_green > 0 ) m_cv_green.notify_one();
364 }
365
369 void
370 red() noexcept {
371 std::unique_lock<std::mutex> lock( m_mutex );
372 m_is_red = true;
373 if ( m_waiting_red > 1 ) m_cv_red.notify_all();
374 else if ( m_waiting_red > 0 ) m_cv_red.notify_one();
375 }
376
380 void
381 wait() noexcept {
382 std::unique_lock<std::mutex> lock( m_mutex );
383 ++m_waiting_green;
384 while ( m_is_red ) m_cv_green.wait( m_mutex );
385 --m_waiting_green;
386 }
387
391 void
392 wait_red() noexcept {
393 std::unique_lock<std::mutex> lock( m_mutex );
394 ++m_waiting_red;
395 while ( !m_is_red ) m_cv_red.wait( m_mutex );
396 --m_waiting_red;
397 }
398
399 };
400
401 /*\
402 | __ __ _ _
403 | \ \ / /__ _ __| | _____ _ __| | ___ ___ _ __
404 | \ \ /\ / / _ \| '__| |/ / _ \ '__| | / _ \ / _ \| '_ \
405 | \ V V / (_) | | | < __/ | | |__| (_) | (_) | |_) |
406 | \_/\_/ \___/|_| |_|\_\___|_| |_____\___/ \___/| .__/
407 | |_|
408 \*/
410
411 bool m_active;
412 bool m_running;
413 bool m_do_job;
414 std::thread m_running_thread;
415 std::function<void()> m_job;
416
417 std::mutex m_mutex;
418 std::condition_variable m_cv;
419
420 void worker_loop();
421
422 public:
423
424 WorkerLoop( WorkerLoop && ) = delete;
425 WorkerLoop( WorkerLoop const & ) = delete;
426 WorkerLoop& operator = ( WorkerLoop const & ) = delete;
428
431
432 void exec( std::function<void()> & fun );
433 void exec();
434 void wait();
435
436 template <typename Func, typename... Args>
437 void
438 run( Func && func, Args && ... args ) {
439 std::function<void()> f = std::bind( func, std::forward<Args>(args)... );
440 this->exec( f );
441 }
442
443 std::thread::id get_id() const { return m_running_thread.get_id(); }
444 std::thread const & get_thread() const { return m_running_thread; }
445 std::thread & get_thread() { return m_running_thread; }
446 };
447
448
449 /*\
450 | __ __ _ _ __ __ _
451 | \ \ / /_ _(_) |\ \ / /__ _ _| |_____ _ _
452 | \ \/\/ / _` | | _\ \/\/ / _ \ '_| / / -_) '_|
453 | \_/\_/\__,_|_|\__|\_/\_/\___/_| |_\_\___|_|
454 \*/
455
457 private:
458 #ifdef UTILS_OS_WINDOWS
459 std::atomic<int> n_worker;
460 #else
461 std::atomic<int> n_worker{0};
462 #endif
463 public:
464 #ifdef UTILS_OS_WINDOWS
465 WaitWorker() { n_worker = 0; }
466 #else
467 WaitWorker() = default;
468 #endif
469
470 void
472 { while (n_worker.load(std::memory_order_relaxed) != 0 ){} }
473
474 void enter() { ++n_worker; }
475 void leave() { --n_worker; }
476 };
477
478 /*\
479 | ___ _ ___ _
480 | | _ |_)_ _ __ _ _ _ _ _/ __| ___ __ _ _ _ __| |_
481 | | _ \ | ' \/ _` | '_| || \__ \/ -_) _` | '_/ _| ' \
482 | |___/_|_||_\__,_|_| \_, |___/\___\__,_|_| \__|_||_|
483 | |__/
484 \*/
485
486 template <typename DATA>
488 private:
489 using DATA_TYPE = std::pair<std::thread::id,DATA*>;
490 mutable std::vector<DATA_TYPE> m_data;
491 mutable UTILS_SPINLOCK m_spin_write;
492 mutable WaitWorker m_worker_read;
493
494 public:
495
497 m_data.clear();
498 m_data.reserve(64);
499 }
500
502 m_spin_write.wait();
503 for ( auto const & a : m_data ) delete a.second;
504 m_data.clear();
505 m_spin_write.wait();
506 }
507
508 void
510 m_spin_write.wait();
511 for ( auto const & a : m_data ) delete a.second;
512 m_data.clear(); m_data.reserve(64);
513 m_spin_write.wait();
514 }
515
516 DATA *
517 search( std::thread::id const & id, bool & ok ) const {
518 m_spin_write.wait(); // wait writing finished
519 m_worker_read.enter();
520 ok = true;
521
522 size_t U = m_data.size();
523
524 if ( U == 0 ) {
525 m_worker_read.leave();
526 m_spin_write.lock();
527 m_worker_read.wait(); // wait all read finished
528 ok = false;
529 U = m_data.size(); // MAI USATO
530 m_data.resize(1);
531 DATA_TYPE & dL = m_data[0];
532 dL.first = id;
533 DATA * res = dL.second = new DATA();
534 m_spin_write.unlock();
535 return res;
536 }
537
538 size_t L = 0;
539 while ( U-L > 1 ) {
540 size_t pos = (L+U)>>1;
541 std::thread::id const & id_pos = m_data[pos].first;
542 if ( id_pos < id ) L = pos; else U = pos;
543 }
544 DATA_TYPE & dL = m_data[L];
545 if ( dL.first == id ) { m_worker_read.leave(); return dL.second; }
546 DATA_TYPE & dU = m_data[U];
547 if ( dU.first == id ) { m_worker_read.leave(); return dU.second; }
548 m_worker_read.leave();
549
550 // not found must insert
551 m_spin_write.lock();
552 m_worker_read.wait(); // wait all read finished
553 ok = false;
554 if ( dL.first < id ) ++L;
555 U = m_data.size();
556 m_data.resize(U+1);
557 while ( U > L ) {
558 --U;
559 m_data[U+1].first = m_data[U].first;
560 m_data[U+1].second = m_data[U].second;
561 }
562 DATA_TYPE & dL1 = m_data[L];
563 dL1.first = id;
564 DATA * res = dL1.second = new DATA();
565 m_spin_write.unlock();
566 return res;
567 }
568
569 };
570
571 /*\
572 | _ _ _
573 | __ _| |_ ___ ___ ___ _ __ ___ _____ _(_) |_
574 | / _` | __| / __|/ __/ _ \| '_ \ / _ \ / _ \ \/ / | __|
575 | | (_| | |_ \__ \ (_| (_) | |_) | __/ | __/> <| | |_
576 | \__,_|\__|___|___/\___\___/| .__/ \___|___\___/_/\_\_|\__|
577 | |_____| |_| |_____|
578 \*/
579
583 template <class Destructor>
585 Destructor m_destructor;
586 bool m_active;
587
588 public:
589
592
593 at_scope_exit_impl() : m_active(false) { }
594
595 explicit
596 at_scope_exit_impl( Destructor&& destructor )
597 : m_destructor(std::forward<Destructor>(destructor))
598 , m_active(true)
599 { }
600
601 explicit
602 at_scope_exit_impl( Destructor const & destructor )
603 : m_destructor(destructor)
604 , m_active(true)
605 { }
606
608 : m_destructor(std::move(x.m_destructor))
609 , m_active(x.m_active)
610 { x.m_active = false; }
611
614 m_destructor = std::move(x.m_destructor);
615 m_active = x.m_active;
616 x.m_active = false;
617 return *this;
618 }
619
620 ~at_scope_exit_impl() { if (m_active) m_destructor(); }
621 };
622
640 template<class Function>
642 { return at_scope_exit_impl<Function>(std::forward<Function>(fun)); }
643
644 template<class Function>
647
648 #if 0
649
654
655 // from https://coliru.stacked-crooked.com/a/933248d6a9f07094
656 template<typename T>
657 class unique_function : public std::function<T> {
658 template<typename Fn, typename En = void> struct wrapper;
659
660 // specialization for CopyConstructible Fn
661 template<typename Fn>
662 struct wrapper<Fn, std::enable_if_t< std::is_copy_constructible<Fn>::value >>
663 {
664 Fn fn;
665 template<typename... Args>
666 auto operator()(Args&&... args) { return fn(std::forward<Args>(args)...); }
667 };
668
669 // specialization for MoveConstructible-only Fn
670 template<typename Fn>
671 struct wrapper<Fn, std::enable_if_t< !std::is_copy_constructible<Fn>::value && std::is_move_constructible<Fn>::value >>
672 {
673 Fn fn;
674
675 wrapper(Fn&& fn) : fn(std::forward<Fn>(fn)) { }
676
677 wrapper(wrapper&&) = default;
678 wrapper& operator=(wrapper&&) = default;
679
680 // these two functions are instantiated by std::function
681 // and are never called
682 wrapper(const wrapper& rhs) : fn(const_cast<Fn&&>(rhs.fn)) { throw 0; } // hack to initialize fn for non-DefaultContructible types
683
684 wrapper& operator=(wrapper&) { throw 0; return *this; }
685
686 template<typename... Args> auto operator()(Args&&... args) { return fn(std::forward<Args>(args)...); }
687 };
688
689 using base = std::function<T>;
690
691 public:
692 unique_function() noexcept = default;
693 unique_function(std::nullptr_t) noexcept : base(nullptr) { }
694
695 template<typename Fn>
696 unique_function(Fn&& f) : base(wrapper<Fn>{ std::forward<Fn>(f) }) { }
697
698 unique_function(unique_function&&) = default;
699 unique_function& operator=(unique_function&&) = default;
700
701 unique_function& operator=(std::nullptr_t) { base::operator=(nullptr); return *this; }
702
703 template<typename Fn>
704 unique_function& operator=(Fn&& f)
705 { base::operator=(wrapper<Fn>{ std::forward<Fn>(f) }); return *this; }
706
707 using base::operator();
708 };
709
710 #endif
711
712 /*
713 using std::cout; using std::endl;
714
715 struct move_only {
716 move_only(std::size_t) { }
717
718 move_only(move_only&&) = default;
719 move_only& operator=(move_only&&) = default;
720
721 move_only(move_only const&) = delete;
722 move_only& operator=(move_only const&) = delete;
723
724 void operator()() { cout << "move_only" << endl; }
725 };
726
727 int main()
728 {
729 using fn = unique_function<void()>;
730
731 fn f0;
732 fn f1 { nullptr };
733 fn f2 { [](){ cout << "f2" << endl; } }; f2();
734 fn f3 { move_only(42) }; f3();
735 fn f4 { std::move(f2) }; f4();
736
737 f0 = std::move(f3); f0();
738 f0 = nullptr;
739 f2 = [](){ cout << "new f2" << endl; }; f2();
740 f3 = move_only(69); f3();
741
742 return 0;
743 }
744 */
745
747
748}
749
750//
751// eof: ThreadUtils.hxx
752//
void setup(int nthreads)
Definition ThreadUtils.hxx:306
Barrier()
Definition ThreadUtils.hxx:303
void count_down_and_wait()
Definition ThreadUtils.hxx:322
void wait()
Definition ThreadUtils.hxx:316
void count_down()
Definition ThreadUtils.hxx:310
void clear()
Definition ThreadUtils.hxx:509
DATA * search(std::thread::id const &id, bool &ok) const
Definition ThreadUtils.hxx:517
~BinarySearch()
Definition ThreadUtils.hxx:501
BinarySearch()
Definition ThreadUtils.hxx:496
void wait_red() noexcept
Definition ThreadUtils.hxx:392
void red() noexcept
Definition ThreadUtils.hxx:370
void green() noexcept
Definition ThreadUtils.hxx:359
SimpleSemaphore() noexcept
Definition ThreadUtils.hxx:349
void wait() noexcept
Definition ThreadUtils.hxx:381
SpinLock_barrier()
Definition ThreadUtils.hxx:250
void count_down_and_wait()
Definition ThreadUtils.hxx:279
void wait()
Definition ThreadUtils.hxx:272
SpinLock_barrier & operator=(SpinLock_barrier const &)=delete
SpinLock_barrier(SpinLock_barrier const &)=delete
void setup(unsigned count)
Definition ThreadUtils.hxx:257
void count_down()
Definition ThreadUtils.hxx:262
~SpinLock()=default
void lock()
Definition ThreadUtils.hxx:216
SpinLock(SpinLock const &)=delete
void unlock()
Definition ThreadUtils.hxx:227
bool try_lock()
Definition ThreadUtils.hxx:222
void wait()
Definition ThreadUtils.hxx:210
SpinLock()
Definition ThreadUtils.hxx:205
Definition ThreadUtils.hxx:456
void enter()
Definition ThreadUtils.hxx:474
WaitWorker()=default
void leave()
Definition ThreadUtils.hxx:475
void wait()
Definition ThreadUtils.hxx:471
WorkerLoop(WorkerLoop &&)=delete
void exec(std::function< void()> &fun)
std::thread & get_thread()
Definition ThreadUtils.hxx:445
std::thread::id get_id() const
Definition ThreadUtils.hxx:443
void run(Func &&func, Args &&... args)
Definition ThreadUtils.hxx:438
WorkerLoop(WorkerLoop const &)=delete
WorkerLoop & operator=(WorkerLoop const &)=delete
std::thread const & get_thread() const
Definition ThreadUtils.hxx:444
Definition ThreadUtils.hxx:584
at_scope_exit_impl(at_scope_exit_impl &&x) noexcept
Definition ThreadUtils.hxx:607
at_scope_exit_impl(Destructor &&destructor)
Definition ThreadUtils.hxx:596
at_scope_exit_impl(at_scope_exit_impl const &)=delete
at_scope_exit_impl(Destructor const &destructor)
Definition ThreadUtils.hxx:602
at_scope_exit_impl & operator=(at_scope_exit_impl &&x)
Definition ThreadUtils.hxx:613
~at_scope_exit_impl()
Definition ThreadUtils.hxx:620
at_scope_exit_impl()
Definition ThreadUtils.hxx:593
at_scope_exit_impl & operator=(at_scope_exit_impl const &)=delete
auto at_scope_exit(Function &&fun) -> at_scope_exit_impl< Function >
Definition ThreadUtils.hxx:641
#define UTILS_SPINLOCK
Definition ThreadUtils.hxx:44
Definition SystemUtils.cc:39
@ green
Definition rang.hxx:41
@ red
Definition rang.hxx:40