stlab 2.3.0
Modern, modular C++ algorithms, data structures, and concurrency primitives
Loading...
Searching...
No Matches
serial_queue.hpp
Go to the documentation of this file.
1/*
2 Copyright 2015 Adobe
3 Distributed under the Boost Software License, Version 1.0.
4 (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5*/
6
7/**************************************************************************************************/
8
9#ifndef STLAB_CONCURRENCY_SERIAL_QUEUE_HPP
10#define STLAB_CONCURRENCY_SERIAL_QUEUE_HPP
11
20
21/**************************************************************************************************/
22
23#include <stlab/config.hpp>
24
25#include <cstdint>
26#include <deque>
27#include <mutex>
28#include <type_traits>
29#include <utility>
30
31#include <stlab/scope.hpp>
32
33#ifndef STLAB_DISABLE_FUTURE_COROUTINES
34#define STLAB_DISABLE_FUTURE_COROUTINES()
35#endif
36
39
40/**************************************************************************************************/
41
42namespace stlab {
43STLAB_VERSION_NAMESPACE_BEGIN()
44
45
53
54/**************************************************************************************************/
55
56
/// How the serial queue drains its task deque when kicked.
enum class schedule_mode : std::uint8_t {
    /// Run one task, then reschedule; fair interleaving with newly enqueued work.
    single,
    /// Swap out the entire queue and run it to completion before accepting the next batch.
    all
};
63
64/**************************************************************************************************/
65
66namespace detail {
67
68/**************************************************************************************************/
69
70class serial_instance_t : public std::enable_shared_from_this<serial_instance_t> {
71 using executor_t = std::function<void(task<void() noexcept>&&)>;
72 using queue_t = std::deque<task<void() noexcept>>;
73 using lock_t = std::scoped_lock<std::mutex>;
74
75 std::mutex _m;
76 bool _running{false}; // mutex protects this
77 queue_t _queue; // mutex protects this
78 executor_t _executor;
79 void (serial_instance_t::*_kickstart)();
80
81 static auto pop_front_unsafe(queue_t& q) {
82 auto f = std::move(q.front());
83 q.pop_front();
84 return f;
85 }
86
87 auto empty() -> bool {
88 bool empty;
89
90 scope<lock_t>(_m, [&]() {
91 empty = _queue.empty();
92
93 if (empty) {
94 _running = false;
95 }
96 });
97
98 return empty;
99 }
100
101 void all() {
102 queue_t local_queue;
103
104 scope<lock_t>(_m, [&]() { std::swap(local_queue, _queue); });
105
106 while (!local_queue.empty()) {
107 pop_front_unsafe(local_queue)();
108 }
109
110 if (!empty()) _executor([_this(shared_from_this())]() noexcept { _this->all(); });
111 }
112
113 void single() {
114 task<void()> f;
115
116 scope<lock_t>(_m, [&]() { f = pop_front_unsafe(_queue); });
117
118 f();
119
120 if (!empty()) _executor([_this(shared_from_this())]() noexcept { _this->single(); });
121 }
122
123 // The kickstart allows us to grab a pointer to either the single or all
124 // routine at construction time. When it comes time to process the queue, we
125 // call either via the abstracted _kickstart.
126 void kickstart() { (this->*_kickstart)(); }
127
128 static auto kickstarter(schedule_mode mode) {
129 switch (mode) {
131 return &serial_instance_t::single;
133 return &serial_instance_t::all;
134 }
135
136 // silence some compilers...
137 return &serial_instance_t::single;
138 }
139
140public:
141 template <typename F>
142 void enqueue(F&& f) {
143 bool running(true);
144
145 scope<lock_t>(_m, [&]() {
146 _queue.emplace_back(std::forward<F>(f));
147
148 // A trick to get the value of _running within the lock scope, but then
149 // use it outside the scope, after the lock has been released. It also
150 // sets running to true if it is not yet; two birds, one stone.
151 std::swap(running, _running);
152 });
153
154 if (!running) {
155 _executor([_this(shared_from_this())]() noexcept { _this->kickstart(); });
156 }
157 }
158
159 serial_instance_t(executor_t executor, schedule_mode mode) :
160 _executor(std::move(executor)), _kickstart(kickstarter(mode)) {}
161};
162
163/**************************************************************************************************/
164
165} // namespace detail
166
167/**************************************************************************************************/
168
171 std::shared_ptr<detail::serial_instance_t> _impl;
172
173public:
175 template <typename Executor>
177 _impl(std::make_shared<detail::serial_instance_t>(
178 [_e = std::move(e)](auto&& f) { _e(std::forward<decltype(f)>(f)); }, mode)) {}
179
185 [[nodiscard]] auto executor() const {
186 return [_impl =
187 _impl](auto&& f) -> std::enable_if_t<std::is_nothrow_invocable_v<decltype(f)>> {
188 _impl->enqueue(std::forward<decltype(f)>(f));
189 };
190 }
191
197 template <typename F, typename... Args>
198 auto operator()(F&& f, Args&&... args) const {
199 return async(executor(), std::forward<F>(f), std::forward<Args>(args)...);
200 }
201};
202
203/**************************************************************************************************/
204
206
207STLAB_VERSION_NAMESPACE_END()
208} // namespace stlab
209
210/**************************************************************************************************/
211
212#endif
213
214/**************************************************************************************************/
serial_queue_t(Executor e, schedule_mode mode=schedule_mode::single)
Constructs a serial queue using underlying executor e and drain mode mode.
Definition serial_queue.hpp:176
auto executor() const
Returns an executor that enqueues void() noexcept tasks on this queue.
Definition serial_queue.hpp:185
auto operator()(F &&f, Args &&... args) const
Schedules f(args...) on this queue via async and returns the resulting future.
Definition serial_queue.hpp:198
Futures, packaged tasks, channels, and coroutine integration.
std::function< void(stlab::task< void() noexcept >)> executor_t
Type-erased executor: accepts a void() noexcept task.
Definition executor_base.hpp:44
auto async(const E &executor, F &&f, Args &&... args) -> detail::reduced_t< detail::result_t< std::decay_t< F >, std::decay_t< Args >... > >
Runs f with args on executor and returns a future for the result.
Definition future.hpp:1937
schedule_mode
How the serial queue drains its task deque when kicked.
Definition serial_queue.hpp:57
@ all
Swap out the entire queue and run it to completion before accepting the next batch.
Definition serial_queue.hpp:61
@ single
Run one task, then reschedule; fair interleaving with newly enqueued work.
Definition serial_queue.hpp:59
typename noexcept_deducer< task_, F >::type task
task_ with noexcept deduced from the function type F (e.g. void() vs void() noexcept).
Definition task.hpp:324
auto scope(Args &&... args)
Scopes the lifetime of an instance of T. All but the last arguments construct T; the last argument is a callable that is invoked while that instance is alive.
Definition scope.hpp:71
constexpr auto move(T &&t) noexcept -> std::remove_reference_t< T > &&
A standard move implementation but with a compile-time check for const types.
Definition utility.hpp:154
Definition reverse.hpp:28
Bind an object’s lifetime to a callable’s execution (scope).
Move-only callable wrapper for executor scheduling (task<Signature>).