stlab 2.3.0
Modern, modular C++ algorithms, data structures, and concurrency primitives
Loading...
Searching...
No Matches
channel.hpp
Go to the documentation of this file.
1/*
2 Copyright 2015 Adobe
3 Distributed under the Boost Software License, Version 1.0.
4 (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5*/
6
7/**************************************************************************************************/
8
9#ifndef STLAB_CONCURRENCY_CHANNEL_HPP
10#define STLAB_CONCURRENCY_CHANNEL_HPP
11
34
35#include <stlab/config.hpp>
36
37#include <algorithm>
38#include <array>
39#include <atomic>
40#include <cassert>
41#include <chrono>
42#include <cstdint>
43#include <deque>
44#include <exception>
45#include <memory>
46#include <mutex>
47#include <numeric>
48#include <optional>
49#include <tuple>
50#include <type_traits>
51#include <utility>
52#include <variant>
53
57#include <stlab/functional.hpp>
58#include <stlab/memory.hpp>
59
60/**************************************************************************************************/
61
62namespace stlab {
63STLAB_VERSION_NAMESPACE_BEGIN()
64
65
75
76/**************************************************************************************************/
77
78template <typename, typename = void>
79class sender;
80template <typename>
81class receiver;
82
83/**************************************************************************************************/
84
86enum class process_state : std::uint8_t { await, yield };
87
89enum class message_t : std::uint8_t { argument, error };
90
91/**************************************************************************************************/
92
102using process_state_scheduled = std::pair<process_state, std::chrono::nanoseconds>;
103
105constexpr process_state_scheduled await_forever{process_state::await,
106 std::chrono::nanoseconds::max()};
107
109constexpr process_state_scheduled yield_immediate{process_state::yield,
110 std::chrono::nanoseconds::min()};
111
112/**************************************************************************************************/
113
115enum class channel_error_codes : std::uint8_t {
116 broken_channel = 1,
117 process_already_running = 2,
118 no_state = 3
119};
120
121/**************************************************************************************************/
122
123namespace detail {
124
125inline auto channel_error_map(channel_error_codes code) noexcept
126 -> char const* { // convert to name of channel error
127 switch (code) { // switch on error code value
128 case channel_error_codes::broken_channel:
129 return "broken channel";
130
131 case channel_error_codes::process_already_running:
132 return "process already running";
133
134 case channel_error_codes::no_state:
135 return "no state";
136
137 default:
138 return nullptr;
139 }
140}
141
142/**************************************************************************************************/
143
144} // namespace detail
145
146/**************************************************************************************************/
147
149class channel_error : public std::logic_error {
150public:
151 explicit channel_error(channel_error_codes code) : logic_error(""), _code(code) {}
152
153 [[nodiscard]] auto code() const noexcept -> const channel_error_codes& { return _code; }
154
155 [[nodiscard]] auto what() const noexcept -> const char* override {
156 return detail::channel_error_map(_code);
157 }
158
159private:
160 const channel_error_codes _code;
161};
162
163/**************************************************************************************************/
164
165template <typename I, // I models ForwardIterator
166 typename N, // N models PositiveInteger
167 typename F> // F models UnaryFunction
168auto for_each_n(I p, N n, F f) -> I {
169 for (N i = 0; i != n; ++i, ++p) {
170 f(*p);
171 }
172 return p;
173}
174
175struct identity {
176 template <typename T>
177 auto operator()(T&& x) const -> T {
178 return std::forward<T>(x);
179 }
180};
181
182/**************************************************************************************************/
183
184template <typename>
186
187template <typename R, typename... Args>
188struct result_of_<R(Args...)> {
189 using type = R;
190};
191
192template <typename F>
193using result_of_t_ = typename result_of_<F>::type;
194
195template <class T1, class... T>
196struct first_ {
197 using type = T1;
198};
199
200template <typename... T>
201using first_t = typename first_<T...>::type;
202
203/**************************************************************************************************/
204
205template <typename>
207template <typename R, typename Arg>
208struct argument_of<R(Arg)> {
209 using type = Arg;
210};
211
212template <typename T>
213using argument_of_t = typename argument_of<T>::type;
214
215/**************************************************************************************************/
216
217namespace detail {
218
219/**************************************************************************************************/
220
221template <typename T, typename...>
222auto yield_type_(decltype(&T::yield)) -> decltype(std::declval<T>().yield());
223
224template <typename T, typename... Arg>
225auto yield_type_(...) -> decltype(std::declval<T>()(std::declval<Arg>()...));
226
227template <typename T, typename... Arg>
228using yield_type = decltype(yield_type_<T, Arg...>(0));
229
230/**************************************************************************************************/
231
232class avoid_ {};
233
234template <typename T>
235using avoid = std::conditional_t<std::is_same_v<void, T>, avoid_, T>;
236
237/**************************************************************************************************/
238
239template <typename F, std::size_t... I, typename... T>
240auto invoke_(F&& f,
241 std::tuple<std::variant<T, std::exception_ptr>...>& t,
242 std::index_sequence<I...>) {
243 return std::forward<F>(f)(std::move(std::get<I>(t))...);
244}
245
246template <typename F, typename... Args>
247auto avoid_invoke(F&& f, std::tuple<std::variant<Args, std::exception_ptr>...>& t)
248 -> std::enable_if_t<!std::is_same_v<void, yield_type<unwrap_reference_t<F>, Args...>>,
249 yield_type<unwrap_reference_t<F>, Args...>> {
250 return invoke_(std::forward<F>(f), t, std::make_index_sequence<sizeof...(Args)>());
251}
252
253template <typename F, typename... Args>
254auto avoid_invoke(F&& f, std::tuple<std::variant<Args, std::exception_ptr>...>& t)
255 -> std::enable_if_t<std::is_same_v<void, yield_type<unwrap_reference_t<F>, Args...>>, avoid_> {
256 invoke_(std::forward<F>(f), t, std::make_index_sequence<sizeof...(Args)>());
257 return avoid_();
258}
259
260/**************************************************************************************************/
261
262template <std::size_t S>
263struct invoke_variant_dispatcher {
264 template <typename F, typename T, typename... Args, std::size_t... I>
265 static auto invoke_(F&& f, T& t, std::index_sequence<I...>) {
266 return std::forward<F>(f)(std::move(std::get<Args>(std::get<I>(t)))...);
267 }
268
269 template <typename F, typename T, typename... Args>
270 static auto invoke(F&& f, T& t) {
271 return invoke_<F, T, Args...>(std::forward<F>(f), t,
272 std::make_index_sequence<sizeof...(Args)>());
273 }
274};
275
276template <>
277struct invoke_variant_dispatcher<1> {
278 template <typename F, typename T, typename... Args>
279 static auto invoke(F&& f, [[maybe_unused]] T& t) {
280 using arg1_t = first_t<Args...>;
281 if constexpr (std::is_same_v<arg1_t, void>) {
282 return;
283 } else if constexpr (std::is_same_v<arg1_t, detail::avoid_>) {
284 return std::forward<F>(f)();
285 } else {
286 return std::forward<F>(f)(std::move(std::get<arg1_t>(std::get<0>(t))));
287 }
288 }
289};
290
291template <typename F, typename T, typename R, std::size_t S, typename... Args>
292auto avoid_invoke_variant(F&& f, T& t) -> std::enable_if_t<!std::is_same_v<void, R>, R> {
293 return invoke_variant_dispatcher<S>::template invoke<F, T, Args...>(std::forward<F>(f), t);
294}
295
296template <typename F, typename T, typename R, std::size_t S, typename... Args>
297auto avoid_invoke_variant(F&& f, T& t) -> std::enable_if_t<std::is_same_v<void, R>, avoid_> {
298 invoke_variant_dispatcher<S>::template invoke<F, T, Args...>(std::forward<F>(f), t);
299 return avoid_();
300}
301
302/**************************************************************************************************/
303
304template <typename T>
305using receiver_t = typename std::remove_reference_t<T>::result_type;
306
307/**************************************************************************************************/
308
309template <typename T>
310struct shared_process_receiver {
311 virtual ~shared_process_receiver() = default;
312
313 virtual void map(sender<T>) = 0;
314 virtual void clear_to_send() = 0;
315 virtual void add_receiver() = 0;
316 virtual void remove_receiver() = 0;
317 [[nodiscard]] virtual auto executor() const -> executor_t = 0;
318 virtual void set_buffer_size(size_t) = 0;
319 [[nodiscard]] virtual auto buffer_size() const -> size_t = 0;
320};
321
322/**************************************************************************************************/
323
324template <typename T>
325struct shared_process_sender {
326 virtual ~shared_process_sender() = default;
327
328 virtual void send(avoid<T> x) = 0;
329 virtual void send(std::exception_ptr) = 0;
330 virtual void add_sender() = 0;
331 virtual void remove_sender() = 0;
332 [[nodiscard]] virtual auto free_buffer() const -> std::size_t = 0;
333};
334
335/**************************************************************************************************/
336
337template <typename T>
338using process_close_t = decltype(std::declval<T&>().close());
339
340template <typename T>
341constexpr bool has_process_close_v = is_detected_v<process_close_t, T>;
342
343template <typename T>
344auto process_close(std::optional<T>& x)
345 -> std::enable_if_t<has_process_close_v<unwrap_reference_t<T>>> {
346 if (x) unwrap(*x).close();
347}
348
349template <typename T>
350auto process_close(std::optional<T>&)
351 -> std::enable_if_t<!has_process_close_v<unwrap_reference_t<T>>> {}
352
353/**************************************************************************************************/
354
355template <typename T>
356using process_state_t = decltype(std::declval<const T&>().state());
357
358template <typename T>
359constexpr bool has_process_state_v = is_detected_v<process_state_t, T>;
360
361template <typename T>
362auto get_process_state(const std::optional<T>& x)
363 -> std::enable_if_t<has_process_state_v<unwrap_reference_t<T>>, process_state_scheduled> {
364 return unwrap(*x).state();
365}
366
367template <typename T>
368auto get_process_state(const std::optional<T>&)
369 -> std::enable_if_t<!has_process_state_v<unwrap_reference_t<T>>, process_state_scheduled> {
370 return await_forever;
371}
372
373/**************************************************************************************************/
374
375template <typename P>
376using process_set_error_t =
377 decltype(std::declval<P&>().set_error(std::declval<std::exception_ptr>()));
378
379template <typename P>
380constexpr bool has_set_process_error_v = is_detected_v<process_set_error_t, P>;
381
382template <typename P>
383auto set_process_error(P& process, std::exception_ptr&& error)
384 -> std::enable_if_t<has_set_process_error_v<unwrap_reference_t<P>>, void> {
385 unwrap(process).set_error(std::move(error));
386}
387
388template <typename P>
389auto set_process_error(P&, std::exception_ptr&&)
390 -> std::enable_if_t<!has_set_process_error_v<unwrap_reference_t<P>>, void> {}
391
392/**************************************************************************************************/
393
394template <typename T>
395using process_yield_t = decltype(std::declval<T&>().yield());
396
397template <typename T>
398constexpr bool has_process_yield_v = is_detected_v<process_yield_t, T>;
399
400/**************************************************************************************************/
401
402template <typename T, typename... Args>
403using process_await_t = decltype(std::declval<T&>().await(std::declval<Args>()...));
404
405template <typename T, typename... Args>
406constexpr bool has_process_await_v = is_detected_v<process_await_t, T, Args...>;
407
408/**************************************************************************************************/
409
410template <typename P, typename... T, std::size_t... I>
411void await_variant_args_(P& process,
412 std::tuple<std::variant<detail::avoid<T>, std::exception_ptr>...>& args,
413 std::index_sequence<I...>) {
414 unwrap(process).await(std::move(std::get<T>(std::get<I>(args)))...);
415}
416
417template <typename P, typename... T>
418void await_variant_args(P& process,
419 std::tuple<std::variant<detail::avoid<T>, std::exception_ptr>...>& args) {
420 await_variant_args_<P, T...>(process, args, std::make_index_sequence<sizeof...(T)>());
421}
422
423/**************************************************************************************************/
424
425template <typename T>
426auto find_argument_error(T& argument) -> std::optional<std::exception_ptr> {
427 std::optional<std::exception_ptr> result;
428
429 auto error_index = tuple_find(argument, [](const auto& c) {
430 return static_cast<message_t>(c.index()) == message_t::error;
431 });
432
433 if (error_index != std::tuple_size_v<T>) {
434 result = get_i(
435 argument, error_index, [](auto& elem) { return std::get<std::exception_ptr>(elem); },
436 std::exception_ptr{});
437 }
438
439 return result;
440}
441
442/**************************************************************************************************/
443
444template <typename T>
445struct default_queue_strategy {
446 static const std::size_t arguments_size = 1;
447 using value_type = std::tuple<std::variant<avoid<T>, std::exception_ptr>>;
448
449 std::deque<std::variant<avoid<T>, std::exception_ptr>> _queue;
450
451 [[nodiscard]] auto empty() const -> bool { return _queue.empty(); }
452
453 auto front() { return std::make_tuple(std::move(_queue.front())); }
454
455 void pop_front() { _queue.pop_front(); }
456
457 [[nodiscard]] auto size() const { return std::array<std::size_t, 1>{{_queue.size()}}; }
458
459 template <std::size_t>
460 [[nodiscard]] auto queue_size() const {
461 return _queue.size();
462 }
463
464 template <std::size_t, typename U>
465 void append(U&& u) {
466 _queue.emplace_back(std::forward<U>(u));
467 }
468};
469
470/**************************************************************************************************/
471
472template <typename... T>
473struct zip_with_queue_strategy {
474 static const std::size_t Size = sizeof...(T);
475 static const std::size_t arguments_size = Size;
476 using value_type = std::tuple<std::variant<T, std::exception_ptr>...>;
477 using queue_size_t = std::array<std::size_t, Size>;
478 using queue_t = std::tuple<std::deque<std::variant<T, std::exception_ptr>>...>;
479
480 queue_t _queue;
481
482 [[nodiscard]] auto empty() const -> bool {
483 return tuple_find(_queue, [](const auto& c) { return c.empty(); }) != Size;
484 }
485
486 template <std::size_t... I, typename U>
487 auto front_impl(U& u, std::index_sequence<I...>) {
488 return std::make_tuple(std::move(std::get<I>(u).front())...);
489 }
490
491 auto front() {
492 assert(!empty() && "front on an empty container is a very bad idea!");
493 return front_impl(_queue, std::make_index_sequence<Size>());
494 }
495
496 void pop_front() {
497 tuple_for_each(_queue, [](auto& c) { c.pop_front(); });
498 }
499
500 [[nodiscard]] auto size() const {
501 queue_size_t result;
502 std::size_t i = 0;
503 tuple_for_each(_queue, [&i, &result](const auto& c) { result[i++] = c.size(); });
504 return result;
505 }
506
507 template <std::size_t I>
508 [[nodiscard]] auto queue_size() const {
509 return std::get<I>(_queue).size();
510 }
511
512 template <std::size_t I, typename U>
513 void append(U&& u) {
514 std::get<I>(_queue).emplace_back(std::forward<U>(u));
515 }
516};
517
518/**************************************************************************************************/
519
520template <typename... T>
521struct round_robin_queue_strategy {
522 static const std::size_t Size = sizeof...(T);
523 static const std::size_t arguments_size = 1;
524 using item_t = std::variant<first_t<T...>, std::exception_ptr>;
525 using value_type = std::tuple<item_t>;
526 using queue_size_t = std::array<std::size_t, Size>;
527 using queue_t = std::tuple<std::deque<std::variant<T, std::exception_ptr>>...>;
528 std::size_t _index{0};
529 std::size_t _popped_index{0};
530 queue_t _queue;
531
532 [[nodiscard]] auto empty() const -> bool {
533 return get_i(_queue, _index, [](const auto& c) { return c.empty(); }, true);
534 }
535
536 auto front() {
537 assert(!empty() && "front on an empty container is a very bad idea!");
538 return std::make_tuple(get_i(_queue, _index, [](auto& c) { return c.front(); }, item_t{}));
539 }
540
541 void pop_front() {
542 void_i(_queue, _index, [](auto& c) { c.pop_front(); });
543 _popped_index = _index;
544 ++_index;
545 if (_index == Size) _index = 0; // restart from the first sender
546 }
547
548 [[nodiscard]] auto size() const {
549 queue_size_t result;
550 std::size_t i = 0;
551 tuple_for_each(_queue, [&i, &result, this](const auto& c) {
552 if (i == _popped_index)
553 result[i] = c.size();
554 else
555 result[i] = std::numeric_limits<std::size_t>::max();
556 ++i;
557 });
558 return result;
559 }
560
561 template <std::size_t I>
562 [[nodiscard]] auto queue_size() const {
563 return std::get<I>(_queue).size();
564 }
565
566 template <std::size_t I, typename U>
567 void append(U&& u) {
568 std::get<I>(_queue).emplace_back(std::forward<U>(u));
569 }
570};
571
572/**************************************************************************************************/
573
574template <typename... T>
575struct unordered_queue_strategy {
576 static const std::size_t Size = sizeof...(T);
577 static const std::size_t arguments_size = 1;
578 using item_t = std::variant<first_t<T...>, std::exception_ptr>;
579 using value_type = std::tuple<item_t>;
580 using queue_size_t = std::array<std::size_t, Size>;
581 using queue_t = std::tuple<std::deque<std::variant<T, std::exception_ptr>>...>;
582 std::size_t _index{0};
583 std::size_t _popped_index{0};
584 queue_t _queue;
585
586 [[nodiscard]] auto empty() const -> bool {
587 return tuple_find(_queue, [](const auto& c) { return !c.empty(); }) == Size;
588 }
589
590 auto front() {
591 assert(!empty() && "front on an empty container is a very bad idea!");
592 _index = tuple_find(_queue, [](const auto& c) { return !c.empty(); });
593 return std::make_tuple(
594 get_i(_queue, _index, [](auto& c) { return std::move(c.front()); }, item_t{}));
595 }
596
597 void pop_front() {
598 void_i(_queue, _index, [](auto& c) { c.pop_front(); });
599 _popped_index = _index;
600 }
601
602 [[nodiscard]] auto size() const {
603 queue_size_t result;
604 std::size_t i = 0;
605 tuple_for_each(_queue, [&i, &result, this](const auto& c) {
606 if (i == _popped_index)
607 result[i] = c.size();
608 else
609 result[i] = std::numeric_limits<std::size_t>::max();
610 ++i;
611 });
612
613 return result;
614 }
615
616 template <std::size_t I>
617 [[nodiscard]] auto queue_size() const {
618 return std::get<I>(_queue).size();
619 }
620
621 template <std::size_t I, typename U>
622 void append(U&& u) {
623 std::get<I>(_queue).emplace_back(std::forward<U>(u));
624 }
625};
626
627/**************************************************************************************************/
628
629template <typename Q, typename T, typename R, typename... Args>
630struct shared_process;
631
632template <typename Q, typename T, typename R, typename Arg, std::size_t I, typename... Args>
633struct shared_process_sender_indexed : public shared_process_sender<Arg> {
634 shared_process<Q, T, R, Args...>& _shared_process;
635
636 explicit shared_process_sender_indexed(shared_process<Q, T, R, Args...>& sp) :
637 _shared_process(sp) {}
638
639 void add_sender() override { ++_shared_process._sender_count; }
640
641 void remove_sender() override {
642 assert(_shared_process._sender_count > 0);
643 if (--_shared_process._sender_count == 0) {
644 bool do_run;
645 {
646 std::unique_lock<std::mutex> lock(_shared_process._process_mutex);
647 _shared_process._process_close_queue = true;
648 do_run = !_shared_process._receiver_count && !_shared_process._process_running;
649 _shared_process._process_running = _shared_process._process_running || do_run;
650 }
651 if (do_run) _shared_process.run();
652 }
653 }
654
655 template <typename U>
656 void enqueue(U&& u) {
657 bool do_run;
658 {
659 std::unique_lock<std::mutex> lock(_shared_process._process_mutex);
660 _shared_process._queue.template append<I>(
661 std::forward<U>(u)); // TODO (sparent) : overwrite here.
662 do_run = !_shared_process._receiver_count && (!_shared_process._process_running ||
663 _shared_process._timeout_function_active);
664
665 _shared_process._process_running = _shared_process._process_running || do_run;
666 }
667 if (do_run) _shared_process.run();
668 }
669
670 void send(std::exception_ptr error) override { enqueue(std::move(error)); }
671
672 void send(avoid<Arg> arg) override { enqueue(std::move(arg)); }
673
674 [[nodiscard]] auto free_buffer() const -> std::size_t override {
675 std::unique_lock<std::mutex> lock(_shared_process._process_mutex);
676 return _shared_process._process_buffer_size == 0 ?
677 std::numeric_limits<std::size_t>::max() :
678 (_shared_process._process_buffer_size -
679 _shared_process._queue.template queue_size<I>());
680 }
681};
682
683/**************************************************************************************************/
684
685template <typename Q, typename T, typename R, typename U, typename... Args>
686struct shared_process_sender_helper;
687
688template <typename Q, typename T, typename R, std::size_t... I, typename... Args>
689struct shared_process_sender_helper<Q, T, R, std::index_sequence<I...>, Args...>
690 : shared_process_sender_indexed<Q, T, R, Args, I, Args...>... {
691 explicit shared_process_sender_helper(shared_process<Q, T, R, Args...>& sp) :
692 shared_process_sender_indexed<Q, T, R, Args, I, Args...>(sp)... {}
693};
694
695/**************************************************************************************************/
696
697template <typename R, typename Enabled = void>
698struct downstream;
699
700template <typename R>
701struct downstream<R, std::enable_if_t<std::is_copy_constructible_v<R> || std::is_same_v<R, void>>> {
702 std::deque<sender<R>> _data;
703
704 template <typename F>
705 void append_receiver(F&& f) {
706 _data.emplace_back(std::forward<F>(f));
707 }
708
709 void clear() { _data.clear(); }
710
711 [[nodiscard]] auto size() const -> std::size_t { return _data.size(); }
712
713 template <typename... Args>
714 void send(std::size_t n, Args... args) {
715 stlab::for_each_n(begin(_data), n, [&](const auto& e) { e(args...); });
716 }
717
718 [[nodiscard]] auto minimum_free_buffer() const -> std::size_t {
719 if (size() == 0) return 0;
720 // std::reduce with C++17
721 return std::accumulate(_data.cbegin(), _data.cend(),
722 std::numeric_limits<std::size_t>::max(),
723 [](auto val, const auto& e) {
724 return std::min(val, e.free_buffer() ? *e.free_buffer() : val);
725 });
726 }
727};
728
729template <typename R>
730struct downstream<R,
731 std::enable_if_t<!std::is_copy_constructible_v<R> && !std::is_same_v<R, void>>> {
732 std::optional<sender<R>> _data;
733
734 template <typename F>
735 void append_receiver(F&& f) {
736 _data = std::forward<F>(f);
737 }
738
739 void clear() { _data = std::nullopt; }
740
741 [[nodiscard]] auto size() const -> std::size_t { return 1; }
742
743 template <typename... Args>
744 void send(std::size_t, Args&&... args) {
745 if (_data) (*_data)(std::forward<Args>(args)...);
746 }
747
748 [[nodiscard]] auto minimum_free_buffer() const -> std::size_t {
749 if (_data && (*_data).free_buffer()) return *(*_data).free_buffer();
750 return 0;
751 }
752};
753
754/**************************************************************************************************/
755
756template <typename Q, typename T, typename R, typename... Args>
757struct shared_process
758 : shared_process_receiver<R>,
759 shared_process_sender_helper<Q, T, R, std::make_index_sequence<sizeof...(Args)>, Args...>,
760 std::enable_shared_from_this<shared_process<Q, T, R, Args...>> {
761 static_assert((has_process_yield_v<unwrap_reference_t<T>> &&
762 has_process_state_v<unwrap_reference_t<T>>) ||
763 (!has_process_yield_v<unwrap_reference_t<T>> &&
764 !has_process_state_v<unwrap_reference_t<T>>),
765 "Processes that use .yield() must have .state() const");
766
767 /*
768 the downstream continuations are stored in a deque so we don't get reallocations
769 on push back - this allows us to make calls while additional inserts happen.
770 */
771
772 using result_t = R;
773 using queue_strategy_t = Q;
774 using process_t = T;
775 using lock_t = std::unique_lock<std::mutex>;
776
777 std::mutex _downstream_mutex;
778 downstream<result_t> _downstream;
779 queue_strategy_t _queue;
780
781 executor_t _executor;
782 std::optional<process_t> _process;
783
784 std::mutex _process_mutex;
785
786 bool _process_running = false;
787 std::atomic_size_t _process_suspend_count{0};
788 bool _process_close_queue = false;
789 // REVISIT (sparent) : I'm not certain final needs to be under the mutex
790 bool _process_final = false;
791
792 std::mutex _timeout_function_control;
793 std::atomic_bool _timeout_function_active{false};
794
795 std::atomic_size_t _sender_count{0};
796 std::atomic_size_t _receiver_count;
797
798 std::atomic_size_t _process_buffer_size{1};
799
800 const std::tuple<std::shared_ptr<shared_process_receiver<Args>>...> _upstream;
801
802 template <typename E, typename F>
803 shared_process(E&& e, F&& f) :
804 shared_process_sender_helper<Q, T, R, std::make_index_sequence<sizeof...(Args)>, Args...>(
805 *this),
806 _executor(std::forward<E>(e)), _process(std::forward<F>(f)) {
807 _sender_count = std::is_same_v<result_t, void> ? 0 : 1;
808 _receiver_count = !std::is_same_v<result_t, void>;
809 }
810
811 template <typename E, typename F, typename... U>
812 shared_process(E&& e, F&& f, U&&... u) :
813 shared_process_sender_helper<Q, T, R, std::make_index_sequence<sizeof...(Args)>, Args...>(
814 *this),
815 _executor(std::forward<E>(e)), _process(std::forward<F>(f)),
816 _upstream(std::forward<U>(u)...) {
817 _sender_count = sizeof...(Args);
818 _receiver_count = !std::is_same_v<result_t, void>;
819 }
820
821 void add_receiver() override {
822 if (std::is_same_v<result_t, void>) return;
823 ++_receiver_count;
824 }
825
826 void remove_receiver() override {
827 if (std::is_same_v<result_t, void>) return;
828 /*
829 NOTE (sparent) : Decrementing the receiver count can allow this to start running on a
830 send before we can get to the check - so we need to see if we are already running
831 before starting again.
832 */
833 assert(_receiver_count > 0);
834 if (--_receiver_count == 0) {
835 bool do_run;
836 {
837 std::unique_lock<std::mutex> lock(_process_mutex);
838 do_run = ((!_queue.empty() || std::is_same_v<first_t<Args...>, void>) ||
839 _process_close_queue) &&
840 !_process_running;
841 _process_running = _process_running || do_run;
842 }
843 if (do_run) run();
844 }
845 }
846
847 auto executor() const -> executor_t override { return _executor; }
848
849 void task_done() {
850 bool do_run;
851 bool do_final;
852 {
853 std::unique_lock<std::mutex> lock(_process_mutex);
854 do_run = !_queue.empty() || _process_close_queue;
855 _process_running = do_run;
856 do_final = _process_final;
857 }
858 // The mutual exclusiveness of this assert implies too many variables. Should have a single
859 // "get state" call.
860 assert(!(do_run && do_final) && "ERROR (sparent) : can't run and close at the same time.");
861 // I met him on a Monday and my heart stood still
862 if (do_run) run();
863 if (do_final) {
864 std::unique_lock<std::mutex> lock(_downstream_mutex);
865 _downstream.clear(); // This will propagate the close to anything downstream
866 _process = std::nullopt;
867 }
868 }
869
870 void clear_to_send() override {
871 {
872 std::unique_lock<std::mutex> lock(_process_mutex);
873 if (_process_final) {
874 return;
875 }
876 }
877
878 bool do_run = false;
879 {
880 const auto ps = get_process_state(_process);
881 std::unique_lock<std::mutex> lock(_process_mutex);
882 if (_process_suspend_count > 0) --_process_suspend_count; // could be atomic?
883
884 if (!_process_suspend_count) {
885 if (ps.first == process_state::yield || !_queue.empty() || _process_close_queue) {
886 do_run = true;
887 } else {
888 _process_running = false;
889 do_run = false;
890 }
891 }
892 }
893 // Somebody told me that his name was Bill
894 if (do_run) run();
895 }
896
897 auto pop_from_queue() {
898 std::optional<typename Q::value_type> message;
899 std::array<bool, sizeof...(Args)> do_cts = {{false}};
900 bool do_close = false;
901
902 std::unique_lock<std::mutex> lock(_process_mutex);
903 if (_queue.empty()) {
904 std::swap(do_close, _process_close_queue);
905 _process_final = do_close; // unravel after any yield
906 } else {
907 message = std::move(_queue.front());
908 _queue.pop_front();
909 auto queue_size = _queue.size();
910 for (auto index = 0u; index < queue_size.size(); ++index)
911 do_cts[index] = queue_size[index] <= (_process_buffer_size - 1);
912 }
913 return std::make_tuple(std::move(message), do_cts, do_close);
914 }
915
916 auto dequeue() -> bool {
917 using queue_t = typename Q::value_type;
918 std::optional<queue_t> message;
919 std::array<bool, sizeof...(Args)> do_cts;
920 bool do_close = false;
921
922 std::tie(message, do_cts, do_close) = pop_from_queue();
923
924 std::size_t i = 0;
925 tuple_for_each(_upstream, [do_cts, &i](auto& u) {
926 if (do_cts[i] && u) u->clear_to_send();
927 ++i;
928 });
929
930 if (message) {
931 auto error = find_argument_error(*message);
932 if (error) {
933 if (has_set_process_error_v<T>)
934 set_process_error(*_process, std::move(*error));
935 else
936 do_close = true;
937 } else
938 await_variant_args<process_t, Args...>(*_process, *message);
939 } else {
940 do_close = true;
941 }
942
943 if (do_close) process_close(_process);
944
945 return bool(message);
946 }
947
948 /*
949 REVISIT (sparent) : Next two cases are nearly identical, complicated by the need to
950 remove constexpr if to support C++14.
951 */
952
953 template <typename U>
954 auto step() -> std::enable_if_t<has_process_yield_v<unwrap_reference_t<U>> &&
955 !has_process_await_v<unwrap_reference_t<T>, Args...>> {
956 // in case that the timeout function is just been executed then we have to re-schedule
957 // the current run
958 lock_t lock(_timeout_function_control, std::try_to_lock);
959 if (!lock) {
960 run();
961 return;
962 }
963 _timeout_function_active = false;
964
965 /*
966 While we are waiting we will flush the queue. The assumption here is that work
967 is done on yield()
968 */
969 try {
970 if (get_process_state(_process).first == process_state::await) return;
971
972 // Workaround until we can use structured bindings
973 auto tmp = get_process_state(_process);
974 const auto& state = tmp.first;
975 const auto& duration = tmp.second;
976
977 /*
978 Once we hit yield, go ahead and call it. If the yield is delayed then schedule it.
979 This process will be considered running until it executes.
980 */
981 if (state == process_state::yield) {
982 if (std::chrono::duration_cast<std::chrono::nanoseconds>(duration) <=
983 std::chrono::nanoseconds::min())
984 broadcast(unwrap(*_process).yield());
985 else
986 execute_at(duration, _executor)(
987 [_weak_this = make_weak_ptr(this->shared_from_this())]() noexcept {
988 auto _this = _weak_this.lock();
989 if (!_this) return;
990 _this->try_broadcast();
991 });
992 }
993
994 /*
995 We are in an await state and the queue is empty.
996
997 If we await forever then task_done() leaving us in an await state.
998 else if we await with an expired timeout then go ahead and yield now.
999 else schedule a timeout when we will yield if not canceled by intervening await.
1000 */
1001 else if (std::chrono::duration_cast<std::chrono::nanoseconds>(duration) ==
1002 std::chrono::nanoseconds::max()) {
1003 task_done();
1004 } else if (std::chrono::duration_cast<std::chrono::nanoseconds>(duration) <=
1005 std::chrono::nanoseconds::min()) {
1006 broadcast(unwrap(*_process).yield());
1007 } else {
1008 /* Schedule a timeout. */
1009 _timeout_function_active = true;
1010 execute_at(duration, _executor)(
1011 [_weak_this = make_weak_ptr(this->shared_from_this())]() noexcept {
1012 auto _this = _weak_this.lock();
1013 // It may be that the complete channel is gone in the meanwhile
1014 if (!_this) return;
1015
1016 // try_lock can fail spuriously
1017 while (true) {
1018 lock_t lock(_this->_timeout_function_control, std::try_to_lock);
1019 if (!lock) continue;
1020
1021 // we were cancelled
1022 if (get_process_state(_this->_process).first != process_state::yield) {
1023 _this->try_broadcast();
1024 _this->_timeout_function_active = false;
1025 }
1026 return;
1027 }
1028 });
1029 }
1030 } catch (...) { // this catches exceptions during _process.await() and _process.yield()
1031 broadcast(std::move(std::current_exception()));
1032 }
1033 }
1034
1035 template <typename U>
1036 auto step() -> std::enable_if_t<has_process_yield_v<unwrap_reference_t<U>> &&
1037 has_process_await_v<unwrap_reference_t<T>, Args...>> {
1038 // in case that the timeout function is just been executed then we have to re-schedule
1039 // the current run
1040 lock_t lock(_timeout_function_control, std::try_to_lock);
1041 if (!lock) {
1042 run();
1043 return;
1044 }
1045 _timeout_function_active = false;
1046
1047 /*
1048 While we are waiting we will flush the queue. The assumption here is that work
1049 is done on yield()
1050 */
1051 try {
1052 while (get_process_state(_process).first == process_state::await) {
1053 if (!dequeue()) break;
1054 }
1055
1056 // Workaround until we can use structured bindings
1057 auto tmp = get_process_state(_process);
1058 const auto& state = tmp.first;
1059 const auto& duration = tmp.second;
1060
1061 /*
1062 Once we hit yield, go ahead and call it. If the yield is delayed then schedule it.
1063 This process will be considered running until it executes.
1064 */
1065 if (state == process_state::yield) {
1066 if (std::chrono::duration_cast<std::chrono::nanoseconds>(duration) <=
1067 std::chrono::nanoseconds::min())
1068 broadcast(unwrap(*_process).yield());
1069 else
1070 execute_at(duration, _executor)(
1071 [_weak_this = make_weak_ptr(this->shared_from_this())]() noexcept {
1072 auto _this = _weak_this.lock();
1073 if (!_this) return;
1074 _this->try_broadcast();
1075 });
1076 }
1077
1078 /*
1079 We are in an await state and the queue is empty.
1080
1081 If we await forever then task_done() leaving us in an await state.
1082 else if we await with an expired timeout then go ahead and yield now.
1083 else schedule a timeout when we will yield if not canceled by intervening await.
1084 */
1085 else if (std::chrono::duration_cast<std::chrono::nanoseconds>(duration) ==
1086 std::chrono::nanoseconds::max()) {
1087 task_done();
1088 } else if (std::chrono::duration_cast<std::chrono::nanoseconds>(duration) <=
1089 std::chrono::nanoseconds::min()) {
1090 broadcast(unwrap(*_process).yield());
1091 } else {
1092 /* Schedule a timeout. */
1093 _timeout_function_active = true;
1094 execute_at(duration, _executor)(
1095 [_weak_this = make_weak_ptr(this->shared_from_this())]() noexcept {
1096 auto _this = _weak_this.lock();
1097 // It may be that the complete channel is gone in the meanwhile
1098 if (!_this) return;
1099
1100 // try_lock can fail spuriously
1101 while (true) {
1102 lock_t lock(_this->_timeout_function_control, std::try_to_lock);
1103 if (!lock) continue;
1104
1105 // we were cancelled
1106 if (get_process_state(_this->_process).first != process_state::yield) {
1107 _this->try_broadcast();
1108 _this->_timeout_function_active = false;
1109 }
1110 return;
1111 }
1112 });
1113 }
1114 } catch (...) { // this catches exceptions during _process.await() and _process.yield()
1115 broadcast(std::move(std::current_exception()));
1116 }
1117 }
1118
1119 void try_broadcast() {
1120 try {
1121 if (_process) broadcast(unwrap(*_process).yield());
1122 } catch (...) {
1123 broadcast(std::move(std::current_exception()));
1124 }
1125 }
1126 /*
1127 REVISIT (sparent) : See above comments on step() and ensure consistency.
1128
1129 What is this code doing, if we don't have a yield then it also assumes no await?
1130
1131 This seems to be doing a lot for a (required) noexcept operation - are we sure?
1132 */
1133
1134 template <typename U>
1135 auto step() noexcept -> std::enable_if_t<!has_process_yield_v<unwrap_reference_t<U>>> {
1136 using queue_t = typename Q::value_type;
1137 std::optional<queue_t> message;
1138 std::array<bool, sizeof...(Args)> do_cts;
1139 bool do_close = false;
1140
1141 std::tie(message, do_cts, do_close) = pop_from_queue();
1142
1143 std::size_t i = 0;
1144 tuple_for_each(_upstream, [do_cts, &i](auto& u) {
1145 if (do_cts[i] && u) u->clear_to_send();
1146 ++i;
1147 });
1148
1149 if (message) {
1150 auto error = find_argument_error(*message);
1151 if (error) {
1152 do_close = true;
1153 } else {
1154 try {
1155 broadcast(
1156 avoid_invoke_variant<process_t, queue_t, R, Q::arguments_size, Args...>(
1157 std::move(*_process), *message));
1158 } catch (...) {
1159 broadcast(std::move(std::current_exception()));
1160 }
1161 }
1162 } else
1163 task_done();
1164 }
1165
1166 void run() {
1167 _executor([_p = make_weak_ptr(this->shared_from_this())]() noexcept {
1168 auto p = _p.lock();
1169 if (p) p->template step<T>();
1170 });
1171 }
1172
1173 template <typename... A>
1174 void broadcast(A&&... args) {
1175 /*
1176 We broadcast the result to any processes currently attached to receiver
1177 */
1178
1179 std::size_t n;
1180 bool suspend_process;
1181 {
1182 std::unique_lock<std::mutex> lock(_downstream_mutex);
1183 n = _downstream.size();
1184 suspend_process = _downstream.minimum_free_buffer() <= 1;
1185 }
1186
1187 {
1188 std::unique_lock<std::mutex> lock(_process_mutex);
1189 if (suspend_process) {
1190 /*
1191 Suspend for however many downstream processes we have + 1 for this process -
1192 that ensures that we are not kicked to run while still broadcasting.
1193 */
1194 _process_suspend_count = n + 1;
1195 } else
1196 _process_suspend_count = 1;
1197 }
1198 /*
1199 There is no lock on _downstream here. We only ever append to this deque so the first
1200 n elements are good.
1201 */
1202
1203 _downstream.send(n, std::forward<A>(args)...);
1204
1205 clear_to_send(); // unsuspend this process and decrement the _process_suspend_count
1206 // immediately by 1
1207 }
1208
1209 void map(sender<result_t> f) override {
1210 /*
1211 REVISIT (sparent) : If we are in a final state then we should destruct the sender
1212 and not add it here.
1213 */
1214 {
1215 std::unique_lock<std::mutex> lock(_downstream_mutex);
1216 _downstream.append_receiver(std::move(f));
1217 }
1218 }
1219
1220 void set_buffer_size(size_t buffer_size) override { _process_buffer_size = buffer_size; }
1221
1222 auto buffer_size() const -> size_t override { return _process_buffer_size; }
1223};
1224
1225/**************************************************************************************************/
1226
1227} // namespace detail
1228
1231 template <typename... R>
1232 using strategy_type = detail::unordered_queue_strategy<detail::receiver_t<R>...>;
1233};
1234
1237 template <typename... R>
1238 using strategy_type = detail::round_robin_queue_strategy<detail::receiver_t<R>...>;
1239};
1240
1244 template <typename... R>
1245 using strategy_type = detail::zip_with_queue_strategy<detail::receiver_t<R>...>;
1246};
1247
1248/**************************************************************************************************/
1249
1250namespace detail {
1251
1252// This helper class is necessary to encapsulate the following functions, because Clang
1253// currently has a bug in accepting friend functions with auto return type
1254struct channel_combiner {
1255 template <typename P, typename URP, typename... R, std::size_t... I>
1256 static void map_as_sender_(P& p, URP& upstream_receiver_processes, std::index_sequence<I...>) {
1257 using shared_process_t = typename P::element_type;
1258 using queue_t = typename shared_process_t::queue_strategy_t;
1259 using process_t = typename shared_process_t::process_t;
1260 using result_t = typename shared_process_t::result_t;
1261
1262 (void)std::initializer_list<int>{
1263 (std::get<I>(upstream_receiver_processes)
1264 ->map(sender<R>(std::dynamic_pointer_cast<shared_process_sender<R>>(
1265 std::dynamic_pointer_cast<
1266 shared_process_sender_indexed<queue_t, process_t, result_t, R, I, R...>>(
1267 p)))),
1268 0)...};
1269 }
1270
1271 template <typename P, typename URP, typename... R>
1272 static void map_as_sender(P& p, URP& upstream_receiver_processes) {
1273 map_as_sender_<P, URP, R...>(p, upstream_receiver_processes,
1274 std::make_index_sequence<sizeof...(R)>());
1275 }
1276
1277 template <typename M, typename F, typename... R>
1278 struct merge_result {
1279 using type = yield_type<unwrap_reference_t<F>, receiver_t<first_t<R...>>>;
1280 };
1281
1282 template <typename F, typename... R>
1283 struct merge_result<zip_with_t, F, R...> {
1284 using type = yield_type<unwrap_reference_t<F>, receiver_t<R>...>;
1285 };
1286
1287 template <typename M, typename S, typename F, typename... R>
1288 static auto merge_helper(S&& s, F&& f, R&&... upstream_receiver) {
1289 using result_t = typename merge_result<M, F, R...>::type;
1290
1291 auto upstream_receiver_processes = std::make_tuple(upstream_receiver._p...);
1292 auto merge_process =
1293 std::make_shared<shared_process<typename M::template strategy_type<R...>, F, result_t,
1294 receiver_t<R>...>>(
1295 std::forward<S>(s), std::forward<F>(f), upstream_receiver._p...);
1296
1297 map_as_sender<decltype(merge_process), decltype(upstream_receiver_processes),
1298 receiver_t<R>...>(merge_process, upstream_receiver_processes);
1299
1300 return receiver<result_t>(std::move(merge_process));
1301 }
1302};
1303
1304struct zip_helper {
1305 template <typename... T>
1306 auto operator()(T&&... t) const {
1307 return std::make_tuple(std::forward<T>(t)...);
1308 }
1309};
1310
1311template <typename E, typename T>
1312struct channel_ {
1313 static auto create(E executor) {
1314 auto p = std::make_shared<
1315 detail::shared_process<detail::default_queue_strategy<T>, identity, T, T>>(
1316 std::move(executor), identity());
1317
1318 return std::make_pair(sender<T>(p), receiver<T>(p));
1319 }
1320};
1321
1322template <typename E>
1323struct channel_<E, void> {
1324 static auto create(E executor) {
1325 auto p = std::make_shared<
1326 detail::shared_process<detail::default_queue_strategy<void>, identity, void, void>>(
1327 std::move(executor), identity());
1328
1329 return receiver<void>(p);
1330 }
1331};
1332
1333/**************************************************************************************************/
1334
1335} // namespace detail
1336
1337/**************************************************************************************************/
1338
1340template <typename T, typename E>
1342 return detail::channel_<E, T>::create(std::move(executor));
1343}
1344
1345/**************************************************************************************************/
1346
1353template <typename S, typename F, typename... R>
1354[[deprecated("Use zip_with")]] auto join(S s, F f, R... upstream_receiver) {
1355 return detail::channel_combiner::merge_helper<zip_with_t, S, F, R...>(
1356 std::move(s), std::move(f), std::forward<R>(upstream_receiver)...);
1357}
1358
1359/**************************************************************************************************/
1360
1366template <typename S, typename F, typename... R>
1367[[deprecated("Use merge_channel<unordered_t>")]] auto merge(S s, F f, R... upstream_receiver) {
1368 return detail::channel_combiner::merge_helper<unordered_t, S, F, R...>(
1369 std::move(s), std::move(f), std::move(upstream_receiver)...);
1370}
1371
1372/**************************************************************************************************/
1373
1380template <typename M, typename S, typename F, typename... R>
1381auto merge_channel(S s, F f, R&&... upstream_receiver) {
1382 return detail::channel_combiner::merge_helper<M>(std::move(s), std::move(f),
1383 std::forward<R>(upstream_receiver)...);
1384}
1385
1386/**************************************************************************************************/
1387
1393template <typename S, typename F, typename... R>
1394auto zip_with(S s, F f, const R&... upstream_receiver) {
1395 return detail::channel_combiner::merge_helper<zip_with_t>(std::move(s), std::move(f),
1396 upstream_receiver...);
1397}
1398
1399/**************************************************************************************************/
1400
1406template <typename S, typename... R>
1407auto zip(S s, const R&... r) {
1408 return zip_with(std::move(s), detail::zip_helper{}, r...);
1409}
1410
1411// template <typename S, typename F, typename... R>
1412// [[deprecated("Use merge_channel<round_robin_t>")]] auto zip(S s, F f, R... upstream_receiver);
1413
1414/**************************************************************************************************/
1415
1425 std::size_t _value;
1427 buffer_size(std::size_t b) : _value(b) {}
1428};
1429
1430/**************************************************************************************************/
1431
1432namespace detail {
1433
1434struct annotations {
1435 std::optional<executor_t> _executor;
1436 std::optional<std::size_t> _buffer_size;
1437
1438 annotations(executor_t e, std::size_t bs) : _executor(std::move(e)), _buffer_size(bs) {}
1439 explicit annotations(executor_t e) : _executor(std::move(e)) {}
1440 explicit annotations(std::size_t bs) : _buffer_size(bs) {}
1441};
1442
1443template <typename F>
1444struct annotated_process {
1445 using process_type = F;
1446
1447 F _f;
1448 annotations _annotations;
1449
1450 explicit annotated_process(executor_task_pair<F>&& etp) :
1451 _f(std::move(etp._f)), _annotations(std::move(etp._executor)) {}
1452
1453 annotated_process(F f, const executor& e) : _f(std::move(f)), _annotations(e._executor) {}
1454 annotated_process(F f, buffer_size bs) : _f(std::move(f)), _annotations(bs._value) {}
1455
1456 annotated_process(F f, executor&& e) : _f(std::move(f)), _annotations(std::move(e._executor)) {}
1457 annotated_process(F f, annotations&& a) : _f(std::move(f)), _annotations(std::move(a)) {}
1458 annotated_process(executor_task_pair<F>&& etp, buffer_size bs) :
1459 _f(std::move(etp._f)), _annotations(std::move(etp._executor), bs) {}
1460};
1461
1462template <typename B, typename E>
1463auto combine_bs_executor(B&& b, E&& e) -> detail::annotations {
1464 detail::annotations result{b._value};
1465 result._executor = std::forward<E>(e)._executor;
1466 return result;
1467}
1468
1469} // namespace detail
1470
1471// !std::is_enum_v<std::remove_reference_t<F>>, int> = 0 is used to exclude enum types from
1472// operator& so built-in & is used (e.g. doctest assertType).
1473
1474inline auto operator&(buffer_size bs, const executor& e) -> detail::annotations {
1475 return detail::combine_bs_executor(bs, e);
1476}
1477
1478inline auto operator&(buffer_size bs, executor&& e) -> detail::annotations {
1479 return detail::combine_bs_executor(bs, std::move(e));
1480}
1481
1482inline auto operator&(const executor& e, buffer_size bs) -> detail::annotations {
1483 return detail::combine_bs_executor(bs, e);
1484}
1485
1486inline auto operator&(executor&& e, buffer_size bs) -> detail::annotations {
1487 return detail::combine_bs_executor(bs, std::move(e));
1488}
1489
1490template <typename F, std::enable_if_t<!std::is_enum_v<std::remove_reference_t<F>>, int> = 0>
1491auto operator&(buffer_size bs, F&& f) -> detail::annotated_process<F> {
1492 return detail::annotated_process<F>(std::forward<F>(f), bs);
1493}
1494
1495template <typename F, std::enable_if_t<!std::is_enum_v<std::remove_reference_t<F>>, int> = 0>
1496auto operator&(F&& f, buffer_size bs) -> detail::annotated_process<F> {
1497 return detail::annotated_process<F>(std::forward<F>(f), bs);
1498}
1499
1500template <typename F>
1501auto operator&(executor_task_pair<F>&& etp, buffer_size bs) -> detail::annotated_process<F> {
1502 return detail::annotated_process<F>{std::move(etp), bs};
1503}
1504
1505template <typename F>
1506auto operator&(buffer_size bs, executor_task_pair<F>&& etp) -> detail::annotated_process<F> {
1507 return detail::annotated_process<F>{std::move(etp), bs};
1508}
1509
1510template <typename F, std::enable_if_t<!std::is_enum_v<std::remove_reference_t<F>>, int> = 0>
1511auto operator&(detail::annotations&& a, F&& f) -> detail::annotated_process<F> {
1512 return detail::annotated_process<F>{std::forward<F>(f), std::move(a)};
1513}
1514
1515template <typename F, std::enable_if_t<!std::is_enum_v<std::remove_reference_t<F>>, int> = 0>
1516auto operator&(F&& f, detail::annotations&& a) -> detail::annotated_process<F> {
1517 return detail::annotated_process<F>{std::forward<F>(f), std::move(a)};
1518}
1519
1520template <typename F>
1521auto operator&(detail::annotated_process<F>&& a, executor&& e) -> detail::annotated_process<F> {
1522 auto result{std::move(a)};
1523 a._annotations._executor = std::move(e._executor);
1524 return result;
1525}
1526
1527template <typename F>
1528auto operator&(detail::annotated_process<F>&& a, buffer_size bs) -> detail::annotated_process<F> {
1529 auto result{std::move(a)};
1530 a._annotations._buffer_size = bs._value;
1531 return result;
1532}
1533
1534/**************************************************************************************************/
1535
1553template <typename T>
1554class STLAB_NODISCARD() receiver {
1555 using ptr_t = std::shared_ptr<detail::shared_process_receiver<T>>;
1556
1557 ptr_t _p;
1558 bool _ready = false;
1559
1560 template <typename U, typename>
1561 friend class sender;
1562
1563 template <typename U>
1564 friend class receiver;
1565
1566 template <typename U, typename V>
1567 friend struct detail::channel_;
1568
1569 friend struct detail::channel_combiner;
1570
1571 explicit receiver(ptr_t p) : _p(std::move(p)) {}
1572
1573public:
1574 using result_type = T;
1575
1576 receiver() = default;
1577
1578 ~receiver() {
1579 if (!_ready && _p) _p->remove_receiver();
1580 }
1581
1582 receiver(const receiver& x) : _p(x._p), _ready(x._ready) {
1583 if (_p) _p->add_receiver();
1584 }
1585
1586 receiver(receiver&&) noexcept = default;
1587
1588 auto operator=(const receiver& x) -> receiver& {
1589 // self-assignment is not allowed to disable cert-oop54-cpp warning (and is likely a bug)
1590 assert(this != &x && "self-assignment is not allowed");
1591 return *this = receiver(x);
1592 }
1593
1594 auto operator=(receiver&& x) noexcept -> receiver& = default;
1595
1597 void set_ready() {
1598 if (!_ready && _p) _p->remove_receiver();
1599 _ready = true;
1600 }
1601
1602 void swap(receiver& x) noexcept { std::swap(*this, x); }
1603
1604 inline friend void swap(receiver& x, receiver& y) noexcept { x.swap(y); }
1605 inline friend auto operator==(const receiver& x, const receiver& y) -> bool {
1606 return x._p == y._p;
1607 };
1608 inline friend auto operator!=(const receiver& x, const receiver& y) -> bool {
1609 return !(x == y);
1610 };
1611
1613 [[nodiscard]] auto ready() const -> bool { return _ready; }
1614
1619
1621 template <typename F>
1622 auto operator|(F&& f) const {
1623 if (!_p) throw channel_error(channel_error_codes::broken_channel);
1624
1625 if (_ready) throw channel_error(channel_error_codes::process_already_running);
1626
1627 auto p = std::make_shared<detail::shared_process<
1628 detail::default_queue_strategy<T>, F, detail::yield_type<unwrap_reference_t<F>, T>, T>>(
1629 _p->executor(), std::forward<F>(f), _p);
1630 _p->map(sender<T>(p));
1631 return receiver<detail::yield_type<unwrap_reference_t<F>, T>>(std::move(p));
1632 }
1633
1635 template <typename F>
1636 auto operator|(detail::annotated_process<F> ap) {
1637 if (!_p) throw channel_error(channel_error_codes::broken_channel);
1638
1639 if (_ready) throw channel_error(channel_error_codes::process_already_running);
1640
1641 auto executor = ap._annotations._executor.value_or(_p->executor());
1642 auto p = std::make_shared<detail::shared_process<
1643 detail::default_queue_strategy<T>, F, detail::yield_type<unwrap_reference_t<F>, T>, T>>(
1644 executor, std::move(ap._f), _p);
1645
1646 _p->map(sender<T>(p));
1647
1648 if (ap._annotations._buffer_size) p->set_buffer_size(*ap._annotations._buffer_size);
1649
1650 return receiver<detail::yield_type<unwrap_reference_t<F>, T>>(std::move(p));
1651 }
1652
1654 template <typename F>
1656 return operator|(detail::annotated_process<F>(std::move(etp)));
1657 }
1658
1660 auto operator|(sender<T> send) {
1661 return operator|(
1662 [_send = std::move(send)](auto&& x) { _send(std::forward<decltype(x)>(x)); });
1663 }
1664};
1665
1666/**************************************************************************************************/
1667
1669template <typename T>
1670class sender<T, enable_if_copyable<T>> {
1671 using ptr_t = std::weak_ptr<detail::shared_process_sender<T>>;
1672 ptr_t _p;
1673
1674 template <typename U>
1675 friend class receiver;
1676
1677 template <typename U, typename V>
1678 friend struct detail::channel_;
1679
1680 friend struct detail::channel_combiner;
1681
1682 sender(ptr_t p) : _p(std::move(p)) {}
1683
1684public:
1685 sender() = default;
1686
1687 ~sender() {
1688 if (auto p = _p.lock()) p->remove_sender();
1689 }
1690
1691 sender(const sender& x) : _p(x._p) {
1692 if (auto p = _p.lock()) p->add_sender();
1693 }
1694
1695 sender(sender&&) noexcept = default;
1696
1697 // copy-assign uses copy-ctor to keep sender count correct.
1698 auto operator=(const sender& x) -> sender& {
1699 // self-assignment is not allowed to disable cert-oop54-cpp warning (and is likely a bug)
1700 assert(this != &x && "self-assignment is not allowed");
1701 return *this = sender(x);
1702 }
1703
1704 auto operator=(sender&&) noexcept -> sender& = default;
1705
1706 void swap(sender& x) noexcept { std::swap(*this, x); }
1707
1708 inline friend void swap(sender& x, sender& y) noexcept { x.swap(y); }
1709
1710 inline friend auto operator==(const sender& x, const sender& y) -> bool {
1711 return x._p.lock() == y._p.lock();
1712 };
1713
1714 inline friend auto operator!=(const sender& x, const sender& y) -> bool { return !(x == y); };
1715
1717 void close() {
1718 auto p = _p.lock();
1719 if (p) p->remove_sender();
1720 _p.reset();
1721 }
1722
1724 template <typename... A>
1725 void operator()(A&&... args) const {
1726 auto p = _p.lock();
1727 if (p) p->send(std::forward<A>(args)...);
1728 }
1729
1731 [[nodiscard]] auto free_buffer() const -> std::optional<std::size_t> {
1732 std::optional<std::size_t> result;
1733 auto p = _p.lock();
1734 if (p) result = p->free_buffer();
1735 return result;
1736 }
1737};
1738
1740template <typename T>
1741class sender<T, enable_if_not_copyable<T>> {
1742 using ptr_t = std::weak_ptr<detail::shared_process_sender<T>>;
1743 ptr_t _p;
1744
1745 template <typename U>
1746 friend class receiver;
1747
1748 template <typename U, typename V>
1749 friend struct detail::channel_;
1750
1751 friend struct detail::channel_combiner;
1752
1753 sender(ptr_t p) : _p(std::move(p)) {}
1754
1755public:
1756 sender() = default;
1757
1758 ~sender() {
1759 auto p = _p.lock();
1760 if (p) p->remove_sender();
1761 }
1762
1763 sender(const sender& x) = delete;
1764 sender(sender&&) noexcept = default;
1765 auto operator=(const sender& x) -> sender& = delete;
1766 auto operator=(sender&&) noexcept -> sender& = default;
1767
1768 void swap(sender& x) noexcept { std::swap(*this, x); }
1769
1770 inline friend void swap(sender& x, sender& y) noexcept { x.swap(y); }
1771
1772 inline friend auto operator==(const sender& x, const sender& y) -> bool {
1773 return x._p.lock() == y._p.lock();
1774 };
1775
1776 inline friend auto operator!=(const sender& x, const sender& y) -> bool { return !(x == y); };
1777
1779 void close() {
1780 auto p = _p.lock();
1781 if (p) p->remove_sender();
1782 _p.reset();
1783 }
1784
1786 template <typename... A>
1787 void operator()(A&&... args) const {
1788 auto p = _p.lock();
1789 if (p) p->send(std::forward<A>(args)...);
1790 }
1791
1793 [[nodiscard]] auto free_buffer() const -> std::optional<std::size_t> {
1794 std::optional<std::size_t> result;
1795 auto p = _p.lock();
1796 if (p) result = p->free_buffer();
1797 return result;
1798 }
1799};
1800
1801/**************************************************************************************************/
1802
1804template <typename F>
1806
1808template <typename R, typename... Args>
1809struct function_process<R(Args...)> {
1810 std::function<R(Args...)> _f;
1811 std::function<R()> _bound;
1812 bool _done = true;
1813
1814 using signature = R(Args...);
1815
1816 template <typename F>
1817 function_process(F&& f) : _f(std::forward<F>(f)) {}
1818
1820 template <typename... A>
1821 void await(A&&... args) {
1822 _bound = std::bind(_f, std::forward<A>(args)...);
1823 _done = false;
1824 }
1825
1827 auto yield() -> R {
1828 _done = true;
1829 return _bound();
1830 }
1831
1833 [[nodiscard]] auto state() const -> process_state_scheduled {
1834 return _done ? await_forever : yield_immediate;
1835 }
1836};
1837
1838/**************************************************************************************************/
1839
1841
1842STLAB_VERSION_NAMESPACE_END()
1843} // namespace stlab
1844
1845/**************************************************************************************************/
1846
1847#endif
Exception type for channel usage errors (broken channel, process already running, etc....
Definition channel.hpp:149
Receiving end of a CSP channel.
Definition channel.hpp:1554
auto operator|(executor_task_pair< F > etp)
Attaches etp (process plus executor from operator&).
Definition channel.hpp:1655
auto ready() const -> bool
Returns true if set_ready() was called on this receiver.
Definition channel.hpp:1613
auto operator|(F &&f) const
Attaches f and returns a new receiver (inherits executor unless overridden).
Definition channel.hpp:1622
auto operator|(detail::annotated_process< F > ap)
Attaches an annotated process (buffer size and/or executor from operator&).
Definition channel.hpp:1636
void set_ready()
Marks this receiver ready so values may flow through the graph.
Definition channel.hpp:1597
auto operator|(sender< T > send)
Forwards values into downstream send.
Definition channel.hpp:1660
void operator()(A &&... args) const
Sends a value (or exception) into the channel.
Definition channel.hpp:1725
void close()
Closes this side of the channel (releases the weak link to the shared process).
Definition channel.hpp:1717
auto free_buffer() const -> std::optional< std::size_t >
Remaining capacity in the downstream process buffer, if known.
Definition channel.hpp:1731
void operator()(A &&... args) const
Sends a value (or exception) into the channel.
Definition channel.hpp:1787
void close()
Closes this side of the channel (releases the weak link to the shared process).
Definition channel.hpp:1779
auto free_buffer() const -> std::optional< std::size_t >
Remaining capacity in the downstream process buffer, if known.
Definition channel.hpp:1793
Definition channel.hpp:79
Executor type aliases and scheduling helpers.
Reference unwrapping and related functional helpers.
auto await(future< T > &&x) -> T
Synchronously wait for the result x. If x resolves as an exception, the exception is rethrown....
Definition await.hpp:72
auto merge(S s, F f, R... upstream_receiver)
Definition channel.hpp:1367
message_t
Discriminator for tuple elements that carry either a value or an exception pointer.
Definition channel.hpp:89
channel_error_codes
Error codes reported by channel_error.
Definition channel.hpp:115
constexpr process_state_scheduled await_forever
Always await the next upstream value (no yield timeout).
Definition channel.hpp:105
std::pair< process_state, std::chrono::nanoseconds > process_state_scheduled
process_state plus a deadline returned from process::state().
Definition channel.hpp:102
auto zip(S s, const R &... r)
Zips upstream receivers in step; yields std::tuple<T...> of their result_types.
Definition channel.hpp:1407
auto merge_channel(S s, F f, R &&... upstream_receiver)
Creates a receiver that merges upstream channels using merge strategy M.
Definition channel.hpp:1381
auto join(S s, F f, R... upstream_receiver)
Definition channel.hpp:1354
process_state
Scheduling hint for a process: wait for input (await) or run to produce output (yield).
Definition channel.hpp:86
auto zip_with(S s, F f, const R &... upstream_receiver)
Creates a receiver that runs f when each upstream has produced one value.
Definition channel.hpp:1394
constexpr process_state_scheduled yield_immediate
Yield as soon as the scheduler permits.
Definition channel.hpp:109
auto channel(E executor)
Creates a sender/receiver pair on executor (receiver<void> only when T is void).
Definition channel.hpp:1341
std::function< void(stlab::task< void() noexcept >)> executor_t
Type-erased executor: accepts a void() noexcept task.
Definition executor_base.hpp:44
auto execute_at(std::chrono::duration< Rep, Per > duration, executor_t executor) -> executor_t
Returns an executor that posts tasks to executor after duration (immediate if zero).
Definition executor_base.hpp:48
auto unwrap(T &val) -> T &
Unwraps val, forwarding through std::reference_wrapper when present.
Definition functional.hpp:63
auto make_weak_ptr(const std::shared_ptr< T > &x)
Returns a std::weak_ptr<T> sharing ownership with x.
Definition memory.hpp:35
constexpr auto move(T &&t) noexcept -> std::remove_reference_t< T > &&
A standard move implementation but with a compile-time check for const types.
Definition utility.hpp:154
Small memory-related utilities (make_weak_ptr).
Definition reverse.hpp:28
Definition channel.hpp:206
buffer_size(std::size_t b)
Constructs a buffer limit of b elements (0 = unbounded).
Definition channel.hpp:1427
Executor plus callable, produced by executor & f (used by futures and channels).
Definition executor_base.hpp:80
Wraps an executor_t for use with operator&.
Definition executor_base.hpp:74
Definition channel.hpp:196
void await(A &&... args)
Stores arguments until yield runs the bound call.
Definition channel.hpp:1821
auto state() const -> process_state_scheduled
yield_immediate while a call is pending, otherwise await_forever.
Definition channel.hpp:1833
auto yield() -> R
Invokes the function bound by the last await and returns its result.
Definition channel.hpp:1827
Adapts a std::function into a channel process with await / yield / state.
Definition channel.hpp:1805
Definition channel.hpp:175
Definition channel.hpp:185
Merge strategy for merge_channel: round-robin among upstream senders.
Definition channel.hpp:1236
Merge strategy for merge_channel: invoke the process in arbitrary order as values arrive.
Definition channel.hpp:1230
Merge strategy for merge_channel / zip_with: wait for one value from each upstream,...
Definition channel.hpp:1243
Type traits and detection helpers used by the concurrency library.
Tuple algorithms and utilities (including for future combiners).