stlab 2.3.0
Modern, modular C++ algorithms, data structures, and concurrency primitives
Loading...
Searching...
No Matches
default_executor.hpp
Go to the documentation of this file.
1/*
2 Copyright 2015 Adobe
3 Distributed under the Boost Software License, Version 1.0.
4 (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5*/
6
7/**************************************************************************************************/
8
9#ifndef STLAB_CONCURRENCY_DEFAULT_EXECUTOR_HPP
10#define STLAB_CONCURRENCY_DEFAULT_EXECUTOR_HPP
11
27
28#include <stlab/config.hpp>
29
30#include <cassert>
31#include <cstdint>
32#include <type_traits>
33#include <utility>
34
35#if STLAB_TASK_SYSTEM(LIBDISPATCH)
36#include <dispatch/dispatch.h>
37#elif STLAB_TASK_SYSTEM(WINDOWS)
38#include <stlab/pre_exit.hpp>
39
40#include <Windows.h>
41#include <memory>
42#elif STLAB_TASK_SYSTEM(PORTABLE)
45
46#include <algorithm>
47#include <atomic>
48#include <climits>
49#include <condition_variable>
50#include <thread>
51#include <vector>
52#endif
53
54/**************************************************************************************************/
55
56namespace stlab {
57STLAB_VERSION_NAMESPACE_BEGIN()
58
59
64
65/**************************************************************************************************/
66
67namespace detail {
68
69/**************************************************************************************************/
70
// Scheduling priority requested from an executor. The numeric values are significant: the
// portable task system converts the enumerator with static_cast<std::size_t> and uses it as the
// most-significant part of a min-heap key, so a smaller value (high == 0) is dequeued first.
enum class executor_priority : std::uint8_t {
    high,   // 0
    medium, // 1
    low     // 2
};
72
73/**************************************************************************************************/
74
75#if STLAB_TASK_SYSTEM(LIBDISPATCH)
76
77constexpr auto platform_priority(executor_priority p) {
78 switch (p) {
79 case executor_priority::high:
80 return DISPATCH_QUEUE_PRIORITY_HIGH;
81 case executor_priority::medium:
82 return DISPATCH_QUEUE_PRIORITY_DEFAULT;
83 case executor_priority::low:
84 return DISPATCH_QUEUE_PRIORITY_LOW;
85 default:
86 assert(false && "Unknown value!");
87 }
88 return DISPATCH_QUEUE_PRIORITY_DEFAULT;
89}
90
91struct group_t {
92 dispatch_group_t _group = dispatch_group_create();
93 group_t() = default;
94 group_t(const group_t&) = delete;
95 group_t(group_t&& a) noexcept : _group(std::exchange(a._group, nullptr)) {}
96 auto operator=(const group_t&) -> group_t& = delete;
97 auto operator=(group_t&& a) noexcept -> group_t& {
98 _group = std::exchange(a._group, nullptr);
99 return *this;
100 }
101
102 ~group_t();
103};
104
105auto group() -> const group_t&;
106
107template <executor_priority P = executor_priority::medium>
108struct executor_type {
109 using result_type = void;
110
111 template <typename F>
112 auto operator()(F&& f) const -> std::enable_if_t<std::is_nothrow_invocable_v<std::decay_t<F>>> {
113 using f_t = std::decay_t<F>;
114
115 dispatch_group_async_f(detail::group()._group,
116 dispatch_get_global_queue(platform_priority(P), 0),
117 new f_t(std::forward<F>(f)), [](void* f_) {
118 auto f = static_cast<f_t*>(f_);
119 (*f)();
120 delete f;
121 });
122 }
123};
124
125/**************************************************************************************************/
126
127#elif STLAB_TASK_SYSTEM(WINDOWS)
128
129constexpr auto platform_priority(executor_priority p) {
130 switch (p) {
131 case executor_priority::high:
132 return TP_CALLBACK_PRIORITY_HIGH;
133 case executor_priority::medium:
134 return TP_CALLBACK_PRIORITY_NORMAL;
135 case executor_priority::low:
136 return TP_CALLBACK_PRIORITY_LOW;
137 default:
138 assert(!"Unknown value!");
139 }
140 return TP_CALLBACK_PRIORITY_NORMAL;
141}
142
143template <executor_priority P = executor_priority::medium>
144class task_system {
145 PTP_POOL _pool = nullptr;
146 TP_CALLBACK_ENVIRON _callBackEnvironment;
147 PTP_CLEANUP_GROUP _cleanupgroup = nullptr;
148
149public:
150 task_system() {
151 InitializeThreadpoolEnvironment(&_callBackEnvironment);
152 _pool = CreateThreadpool(nullptr);
153 if (_pool == nullptr) throw std::bad_alloc();
154
155 _cleanupgroup = CreateThreadpoolCleanupGroup();
156 if (_cleanupgroup == nullptr) {
157 CloseThreadpool(_pool);
158 throw std::bad_alloc();
159 }
160
161 SetThreadpoolCallbackPriority(&_callBackEnvironment, platform_priority(P));
162 SetThreadpoolCallbackPool(&_callBackEnvironment, _pool);
163 SetThreadpoolCallbackCleanupGroup(&_callBackEnvironment, _cleanupgroup, nullptr);
164 }
165
166 void join() {
167 CloseThreadpoolCleanupGroupMembers(_cleanupgroup, FALSE, nullptr);
168 CloseThreadpoolCleanupGroup(_cleanupgroup);
169 CloseThreadpool(_pool);
170 _pool = nullptr;
171 }
172
173 ~task_system() {
174 assert((_pool == nullptr) && "stlab: Thread pool not joined prior to destruction.");
175 }
176
177 template <typename F>
178 void operator()(F&& f) {
179 auto p = std::make_unique<F>(std::forward<F>(f));
180 auto work = CreateThreadpoolWork(&callback_impl<F>, p.get(), &_callBackEnvironment);
181
182 if (work == nullptr) {
183 throw std::bad_alloc();
184 }
185 p.release(); // ownership was passed to thread
186 SubmitThreadpoolWork(work);
187 }
188
189private:
190 template <typename F>
191 static void CALLBACK callback_impl(PTP_CALLBACK_INSTANCE /*instance*/,
192 PVOID parameter,
193 PTP_WORK work) {
194 std::unique_ptr<F> f(static_cast<F*>(parameter));
195 (*f)();
196 CloseThreadpoolWork(work);
197 }
198};
199
200/**************************************************************************************************/
201
202#elif STLAB_TASK_SYSTEM(PORTABLE)
203
// Parking spot for one expansion thread. wait() blocks until another thread calls wake() or done().
class waiter {
    using lock_t = std::unique_lock<std::mutex>;

    std::mutex _mutex;
    std::condition_variable _ready;
    bool _waiting{false}; // true while a thread is blocked in wait()
    bool _done{false};    // latched by done(); never reset

public:
    // Marks the waiter as finished and releases a blocked wait(). Later waits return at once.
    void done() {
        {
            lock_t guard{_mutex};
            _done = true;
        }
        _ready.notify_one();
    }

    // Best-effort wake: returns true only if the mutex was acquired without blocking and a thread
    // was parked in wait(). Otherwise returns false and changes nothing.
    bool wake() {
        lock_t guard{_mutex, std::try_to_lock};
        if (guard && _waiting) {
            _waiting = false;
            guard.unlock();
            _ready.notify_one();
            return true;
        }
        return false;
    }

    // Blocks until wake() or done() is called. Returns true if woken because of done().
    bool wait() {
        lock_t guard{_mutex};
        _waiting = true;
        _ready.wait(guard, [this] { return !_waiting || _done; });
        _waiting = false;
        return _done;
    }
};
242
243class notification_queue {
244 struct element_t {
245 std::size_t _priority;
246 task<void() noexcept> _task;
247
248 template <class F>
249 element_t(F&& f, std::size_t priority) : _priority{priority}, _task{std::forward<F>(f)} {}
250
251 struct greater {
252 bool operator()(const element_t& a, const element_t& b) const {
253 return b._priority < a._priority;
254 }
255 };
256 };
257
258 std::mutex _mutex;
259 using lock_t = std::unique_lock<std::mutex>;
260 std::condition_variable _ready;
261 std::vector<element_t> _q; // can't use priority queue because top() is const
262 std::size_t _count{0};
263 bool _done{false};
264 bool _waiting{false};
265
266 static constexpr std::size_t merge_priority_count(std::size_t priority, std::size_t count) {
267 assert((priority < 4) && "Priority must be in the range [0, 4).");
268 return (priority << (sizeof(std::size_t) * CHAR_BIT - 2)) | count;
269 }
270
271 // Must be called under a lock with a non-empty _q, always returns a valid task
272 auto pop_not_empty() -> task<void() noexcept> {
273 auto result = std::move(_q.front()._task);
274 std::pop_heap(begin(_q), end(_q), element_t::greater());
275 _q.pop_back();
276 return result;
277 }
278
279public:
280 auto try_pop() -> task<void() noexcept> {
281 lock_t lock{_mutex, std::try_to_lock};
282 if (!lock || _q.empty()) return nullptr;
283 return pop_not_empty();
284 }
285
286 // If waiting in `pop()`, wakes and returns true. Otherwise returns false.
287 bool wake() {
288 {
289 lock_t lock{_mutex, std::try_to_lock};
290 if (!lock || !_waiting) return false;
291 _waiting = false; // triggers wake
292 }
293 _ready.notify_one();
294 return true;
295 }
296
297 auto pop() -> std::pair<bool, task<void() noexcept>> {
298 lock_t lock{_mutex};
299 _waiting = true;
300 while (_q.empty() && !_done && _waiting)
301 _ready.wait(lock);
302 _waiting = false;
303 if (_q.empty()) return {_done, nullptr};
304 return {false, pop_not_empty()};
305 }
306
307 void done() {
308 {
309 lock_t lock{_mutex};
310 _done = true;
311 }
312 _ready.notify_one();
313 }
314
315 template <typename F>
316 bool try_push(F&& f, std::size_t priority) {
317 {
318 lock_t lock{_mutex, std::try_to_lock};
319 if (!lock) return false;
320 _q.emplace_back(std::forward<F>(f), merge_priority_count(priority, _count++));
321 std::push_heap(begin(_q), end(_q), element_t::greater());
322 }
323 _ready.notify_one();
324 return true;
325 }
326
327 template <typename F>
328 void push(F&& f, std::size_t priority) {
329 {
330 lock_t lock{_mutex};
331 _q.emplace_back(std::forward<F>(f), merge_priority_count(priority, _count++));
332 std::push_heap(begin(_q), end(_q), element_t::greater());
333 }
334 _ready.notify_one();
335 }
336};
337
338/**************************************************************************************************/
339
341
342class priority_task_system {
343 // Returns the number of hardware threads, or 1 if not available.
344 static unsigned hardware_concurrency() {
345#if STLAB_TASK_POOL_MAXIMUM() > 0
346 return std::clamp(STLAB_TASK_POOL_MAXIMUM(), 1u, std::thread::hardware_concurrency());
347#else
348 return std::max(1u, std::thread::hardware_concurrency());
349#endif
350 }
351 // _count is the number of threads in the thread pool
352 // it is at least 1 but usually number of cores - 1 reserved for the main thread
353 const unsigned _count{std::max(1u, hardware_concurrency() - 1)};
354 // thread limit is the total number of threads, including expansion threads for waiting calls
355 // It is odd number because a usual pattern is to fan out based on the number of cores, we want
356 // one additional thread so if we fan out the limit number of times we have one additional
357 // thread
358 const unsigned _thread_limit{std::max(9U, hardware_concurrency() * 4 + 1)};
359
360 std::vector<notification_queue> _q{_count};
361 std::atomic<unsigned> _index{0};
362
363 std::mutex _mutex;
364 using lock_t = std::unique_lock<std::mutex>;
365 std::vector<std::thread> _threads;
366 std::vector<waiter> _waiters{_thread_limit - _count};
367
368 void run(unsigned i) {
369 stlab::set_current_thread_name("cc.stlab.default_executor");
370 while (true) {
371 task<void() noexcept> f;
372
373 for (unsigned n = 0; n != _count && !f; ++n) {
374 f = _q[(i + n) % _count].try_pop();
375 }
376 if (!f) {
377 bool done;
378 std::tie(done, f) = _q[i].pop();
379 if (done) break;
380 }
381 if (f) f(); // we can wake with no task.
382 }
383 }
384
385 std::size_t waiters_size() {
386 lock_t lock{_mutex};
387 return _threads.size() - _count;
388 }
389
390public:
392 priority_task_system() {
393 _threads.reserve(_thread_limit);
394 for (unsigned n = 0; n != _count; ++n) {
395 _threads.emplace_back([&, n] { run(n); });
396 }
397 }
398
402 priority_task_system(std::nullptr_t) : priority_task_system() {}
403
404 void join() {
405 for (auto& e : _q)
406 e.done();
407 for (auto& e : _waiters)
408 e.done();
409 for (auto& e : _threads)
410 e.join();
411
412 _q.clear();
413 }
414
415 ~priority_task_system() {
416 assert(_q.empty() && "stlab: Thread pool not joined prior to destruction.");
417 }
418
419 template <std::size_t P, typename F>
420 void execute(F&& f) {
421 static_assert(P < 3, "More than 3 priorities are not known!");
422 auto i = _index++;
423
424 for (unsigned n = 0; n != _count; ++n) {
425 if (_q[(i + n) % _count].try_push(std::forward<F>(f), P)) return;
426 }
427
428 _q[i % _count].push(std::forward<F>(f), P);
429 }
430
431 void add_thread() {
432 lock_t lock{_mutex};
433 if (_threads.size() == _thread_limit) return; // log with cerr
434 _threads.emplace_back([&, i = _threads.size()] {
435 stlab::set_current_thread_name("cc.stlab.default_executor.expansion");
436
437 while (true) {
438 task<void() noexcept> f;
439
440 for (unsigned n = 0; n != _count && !f; ++n) {
441 f = _q[(i + n) % _count].try_pop();
442 }
443
444 if (f) {
445 f(); // we can wake with no task.
446 continue;
447 }
448 if (_waiters[i - _count].wait()) break;
449 };
450 });
451 }
452
453 // returns true if a thread was woken
454 bool wake() {
455 for (auto& e : _q) {
456 if (e.wake()) return true;
457 }
458 for (std::size_t n = 0, l = waiters_size(); n != l; ++n) {
459 if (_waiters[n].wake()) return true;
460 }
461 return false;
462 }
463};
464
467
468priority_task_system& pts();
469
470#endif
471
472/**************************************************************************************************/
473
474#if STLAB_TASK_SYSTEM(WINDOWS)
475
476template <executor_priority P>
477extern task_system<P>& single_task_system();
478
479template <executor_priority P = executor_priority::medium>
480struct executor_type {
481 using result_type = void;
482
483 template <class F>
484 auto operator()(F&& f) const -> std::enable_if_t<std::is_nothrow_invocable_v<std::decay_t<F>>> {
485 single_task_system<P>()(std::forward<F>(f));
486 }
487};
488
489#elif STLAB_TASK_SYSTEM(PORTABLE)
490
491template <executor_priority P = executor_priority::medium>
492struct executor_type {
493 using result_type = void;
494
495 template <class F>
496 auto operator()(F&& f) const -> std::enable_if_t<std::is_nothrow_invocable_v<std::decay_t<F>>> {
497 pts().execute<static_cast<std::size_t>(P)>(std::forward<F>(f));
498 }
499};
500
501#endif
502
503/**************************************************************************************************/
504
505} // namespace detail
506
507/**************************************************************************************************/
508
510inline constexpr auto low_executor = detail::executor_type<detail::executor_priority::low>{};
512inline constexpr auto default_executor = detail::executor_type<detail::executor_priority::medium>{};
514inline constexpr auto high_executor = detail::executor_type<detail::executor_priority::high>{};
515
516/**************************************************************************************************/
517
519
520STLAB_VERSION_NAMESPACE_END()
521} // namespace stlab
522
523/**************************************************************************************************/
524
525#endif // STLAB_CONCURRENCY_DEFAULT_EXECUTOR_HPP
526
527/**************************************************************************************************/
auto join(S s, F f, R... upstream_receiver)
Definition channel.hpp:1354
constexpr auto low_executor
Default task pool executor using low task priority (the scheduling priority of the portable and Windows task systems; a low-priority global queue with libdispatch).
Definition default_executor.hpp:510
constexpr auto default_executor
Default concurrent executor used by stlab::async and related APIs when none is specified.
Definition default_executor.hpp:512
constexpr auto high_executor
Default task pool executor using high task priority (the scheduling priority of the portable and Windows task systems; a high-priority global queue with libdispatch).
Definition default_executor.hpp:514
typename noexcept_deducer< task_, F >::type task
task_ with noexcept deduced from the function type F (e.g. void() vs void() noexcept).
Definition task.hpp:324
Definition reverse.hpp:28
Register and run operations that must execute before program exit.
Set the current thread name (platform-specific).
Move-only callable wrapper for executor scheduling (task<Signature>).