9#ifndef STLAB_CONCURRENCY_DEFAULT_EXECUTOR_HPP
10#define STLAB_CONCURRENCY_DEFAULT_EXECUTOR_HPP
28#include <stlab/config.hpp>
35#if STLAB_TASK_SYSTEM(LIBDISPATCH)
36#include <dispatch/dispatch.h>
37#elif STLAB_TASK_SYSTEM(WINDOWS)
42#elif STLAB_TASK_SYSTEM(PORTABLE)
49#include <condition_variable>
57STLAB_VERSION_NAMESPACE_BEGIN()
/// Priority levels that can be requested from the default executor.
///
/// The enumerator order matters: the values are translated to platform
/// priorities by `platform_priority()` and are also converted to `std::size_t`
/// when handed to the portable task system, so `high` must stay the smallest
/// value and `low` the largest.
enum class executor_priority : std::uint8_t {
    high,   ///< Scheduled ahead of `medium` and `low` work.
    medium, ///< Default priority.
    low     ///< Scheduled behind `high` and `medium` work.
};
75#if STLAB_TASK_SYSTEM(LIBDISPATCH)
77constexpr auto platform_priority(executor_priority p) {
79 case executor_priority::high:
80 return DISPATCH_QUEUE_PRIORITY_HIGH;
81 case executor_priority::medium:
82 return DISPATCH_QUEUE_PRIORITY_DEFAULT;
83 case executor_priority::low:
84 return DISPATCH_QUEUE_PRIORITY_LOW;
86 assert(
false &&
"Unknown value!");
88 return DISPATCH_QUEUE_PRIORITY_DEFAULT;
92 dispatch_group_t _group = dispatch_group_create();
94 group_t(
const group_t&) =
delete;
95 group_t(group_t&& a) noexcept : _group(std::exchange(a._group,
nullptr)) {}
96 auto operator=(
const group_t&) -> group_t& =
delete;
97 auto operator=(group_t&& a)
noexcept -> group_t& {
98 _group = std::exchange(a._group,
nullptr);
105auto group() ->
const group_t&;
107template <executor_priority P = executor_priority::medium>
108struct executor_type {
109 using result_type = void;
111 template <
typename F>
112 auto operator()(F&& f)
const -> std::enable_if_t<std::is_nothrow_invocable_v<std::decay_t<F>>> {
113 using f_t = std::decay_t<F>;
115 dispatch_group_async_f(detail::group()._group,
116 dispatch_get_global_queue(platform_priority(P), 0),
117 new f_t(std::forward<F>(f)), [](
void* f_) {
118 auto f =
static_cast<f_t*
>(f_);
127#elif STLAB_TASK_SYSTEM(WINDOWS)
129constexpr auto platform_priority(executor_priority p) {
131 case executor_priority::high:
132 return TP_CALLBACK_PRIORITY_HIGH;
133 case executor_priority::medium:
134 return TP_CALLBACK_PRIORITY_NORMAL;
135 case executor_priority::low:
136 return TP_CALLBACK_PRIORITY_LOW;
138 assert(!
"Unknown value!");
140 return TP_CALLBACK_PRIORITY_NORMAL;
143template <executor_priority P = executor_priority::medium>
145 PTP_POOL _pool =
nullptr;
146 TP_CALLBACK_ENVIRON _callBackEnvironment;
147 PTP_CLEANUP_GROUP _cleanupgroup =
nullptr;
151 InitializeThreadpoolEnvironment(&_callBackEnvironment);
152 _pool = CreateThreadpool(
nullptr);
153 if (_pool ==
nullptr)
throw std::bad_alloc();
155 _cleanupgroup = CreateThreadpoolCleanupGroup();
156 if (_cleanupgroup ==
nullptr) {
157 CloseThreadpool(_pool);
158 throw std::bad_alloc();
161 SetThreadpoolCallbackPriority(&_callBackEnvironment, platform_priority(P));
162 SetThreadpoolCallbackPool(&_callBackEnvironment, _pool);
163 SetThreadpoolCallbackCleanupGroup(&_callBackEnvironment, _cleanupgroup,
nullptr);
167 CloseThreadpoolCleanupGroupMembers(_cleanupgroup, FALSE,
nullptr);
168 CloseThreadpoolCleanupGroup(_cleanupgroup);
169 CloseThreadpool(_pool);
174 assert((_pool ==
nullptr) &&
"stlab: Thread pool not joined prior to destruction.");
177 template <
typename F>
178 void operator()(F&& f) {
179 auto p = std::make_unique<F>(std::forward<F>(f));
180 auto work = CreateThreadpoolWork(&callback_impl<F>, p.get(), &_callBackEnvironment);
182 if (work ==
nullptr) {
183 throw std::bad_alloc();
186 SubmitThreadpoolWork(work);
190 template <
typename F>
191 static void CALLBACK callback_impl(PTP_CALLBACK_INSTANCE ,
194 std::unique_ptr<F> f(
static_cast<F*
>(parameter));
196 CloseThreadpoolWork(work);
202#elif STLAB_TASK_SYSTEM(PORTABLE)
206 using lock_t = std::unique_lock<std::mutex>;
207 std::condition_variable _ready;
209 bool _waiting{
false};
224 lock_t lock{_mutex, std::try_to_lock};
225 if (!lock || !_waiting)
return false;
236 while (_waiting && !_done)
243class notification_queue {
245 std::size_t _priority;
246 task<void() noexcept> _task;
249 element_t(F&& f, std::
size_t priority) : _priority{priority}, _task{std::forward<F>(f)} {}
252 bool operator()(
const element_t& a,
const element_t& b)
const {
253 return b._priority < a._priority;
259 using lock_t = std::unique_lock<std::mutex>;
260 std::condition_variable _ready;
261 std::vector<element_t> _q;
262 std::size_t _count{0};
264 bool _waiting{
false};
/// Packs a priority and a submission counter into a single sortable key.
///
/// The two most significant bits of the result hold `priority`; all remaining
/// bits hold `count`.
///
/// @param priority Priority class; must be in the range [0, 4).
/// @param count    Running submission counter. Only its low
///                 `sizeof(std::size_t) * CHAR_BIT - 2` bits are kept, so a
///                 counter that has grown past that width wraps inside its own
///                 field instead of overwriting the priority bits.
/// @return The combined key `(priority << (bits - 2)) | (count mod 2^(bits - 2))`.
static constexpr std::size_t merge_priority_count(std::size_t priority, std::size_t count) {
    assert((priority < 4) && "Priority must be in the range [0, 4).");

    constexpr std::size_t count_bits = sizeof(std::size_t) * CHAR_BIT - 2;
    constexpr std::size_t count_mask = (std::size_t{1} << count_bits) - 1;

    // Mask the counter so an overflowing count can never change the priority.
    return (priority << count_bits) | (count & count_mask);
}
272 auto pop_not_empty() ->
task<void() noexcept> {
273 auto result = std::move(_q.front()._task);
274 std::pop_heap(begin(_q), end(_q), element_t::greater());
280 auto try_pop() ->
task<void() noexcept> {
281 lock_t lock{_mutex, std::try_to_lock};
282 if (!lock || _q.empty())
return nullptr;
283 return pop_not_empty();
289 lock_t lock{_mutex, std::try_to_lock};
290 if (!lock || !_waiting)
return false;
297 auto pop() -> std::pair<bool,
task<void() noexcept>> {
300 while (_q.empty() && !_done && _waiting)
303 if (_q.empty())
return {_done,
nullptr};
304 return {
false, pop_not_empty()};
315 template <
typename F>
316 bool try_push(F&& f, std::size_t priority) {
318 lock_t lock{_mutex, std::try_to_lock};
319 if (!lock)
return false;
320 _q.emplace_back(std::forward<F>(f), merge_priority_count(priority, _count++));
321 std::push_heap(begin(_q), end(_q), element_t::greater());
327 template <
typename F>
328 void push(F&& f, std::size_t priority) {
331 _q.emplace_back(std::forward<F>(f), merge_priority_count(priority, _count++));
332 std::push_heap(begin(_q), end(_q), element_t::greater());
342class priority_task_system {
344 static unsigned hardware_concurrency() {
345#if STLAB_TASK_POOL_MAXIMUM() > 0
346 return std::clamp(STLAB_TASK_POOL_MAXIMUM(), 1u, std::thread::hardware_concurrency());
348 return std::max(1u, std::thread::hardware_concurrency());
353 const unsigned _count{std::max(1u, hardware_concurrency() - 1)};
358 const unsigned _thread_limit{std::max(9U, hardware_concurrency() * 4 + 1)};
360 std::vector<notification_queue> _q{_count};
361 std::atomic<unsigned> _index{0};
364 using lock_t = std::unique_lock<std::mutex>;
365 std::vector<std::thread> _threads;
366 std::vector<waiter> _waiters{_thread_limit - _count};
368 void run(
unsigned i) {
369 stlab::set_current_thread_name(
"cc.stlab.default_executor");
371 task<void() noexcept> f;
373 for (
unsigned n = 0; n != _count && !f; ++n) {
374 f = _q[(i + n) % _count].try_pop();
378 std::tie(done, f) = _q[i].pop();
385 std::size_t waiters_size() {
387 return _threads.size() - _count;
392 priority_task_system() {
393 _threads.reserve(_thread_limit);
394 for (
unsigned n = 0; n != _count; ++n) {
395 _threads.emplace_back([&, n] { run(n); });
402 priority_task_system(std::nullptr_t) : priority_task_system() {}
407 for (
auto& e : _waiters)
409 for (
auto& e : _threads)
415 ~priority_task_system() {
416 assert(_q.empty() &&
"stlab: Thread pool not joined prior to destruction.");
419 template <std::
size_t P,
typename F>
420 void execute(F&& f) {
421 static_assert(P < 3,
"More than 3 priorities are not known!");
424 for (
unsigned n = 0; n != _count; ++n) {
425 if (_q[(i + n) % _count].try_push(std::forward<F>(f), P))
return;
428 _q[i % _count].push(std::forward<F>(f), P);
433 if (_threads.size() == _thread_limit)
return;
434 _threads.emplace_back([&, i = _threads.size()] {
435 stlab::set_current_thread_name(
"cc.stlab.default_executor.expansion");
438 task<void() noexcept> f;
440 for (unsigned n = 0; n != _count && !f; ++n) {
441 f = _q[(i + n) % _count].try_pop();
448 if (_waiters[i - _count].wait()) break;
456 if (e.wake())
return true;
458 for (std::size_t n = 0, l = waiters_size(); n != l; ++n) {
459 if (_waiters[n].wake())
return true;
468priority_task_system& pts();
474#if STLAB_TASK_SYSTEM(WINDOWS)
476template <executor_priority P>
477extern task_system<P>& single_task_system();
479template <executor_priority P = executor_priority::medium>
480struct executor_type {
481 using result_type = void;
484 auto operator()(F&& f)
const -> std::enable_if_t<std::is_nothrow_invocable_v<std::decay_t<F>>> {
485 single_task_system<P>()(std::forward<F>(f));
489#elif STLAB_TASK_SYSTEM(PORTABLE)
491template <executor_priority P = executor_priority::medium>
492struct executor_type {
493 using result_type = void;
496 auto operator()(F&& f)
const -> std::enable_if_t<std::is_nothrow_invocable_v<std::decay_t<F>>> {
497 pts().execute<
static_cast<std::size_t
>(P)>(std::forward<F>(f));
510inline constexpr auto low_executor = detail::executor_type<detail::executor_priority::low>{};
512inline constexpr auto default_executor = detail::executor_type<detail::executor_priority::medium>{};
514inline constexpr auto high_executor = detail::executor_type<detail::executor_priority::high>{};
520STLAB_VERSION_NAMESPACE_END()
auto join(S s, F f, R... upstream_receiver)
Definition channel.hpp:1354
constexpr auto low_executor
Default task pool executor using low thread priority (when using the portable or Windows task system)...
Definition default_executor.hpp:510
constexpr auto default_executor
Default concurrent executor used by stlab::async and related APIs when none is specified.
Definition default_executor.hpp:512
constexpr auto high_executor
Default task pool executor using high thread priority (when using the portable or Windows task system...
Definition default_executor.hpp:514
typename noexcept_deducer< task_, F >::type task
task_ with noexcept deduced from the function type F (e.g. void() vs void() noexcept).
Definition task.hpp:324
Definition reverse.hpp:28
Register and run operations that must execute before program exit.
Set the current thread name (platform-specific).
Move-only callable wrapper for executor scheduling (task<Signature>).