9#ifndef STLAB_CONCURRENCY_CHANNEL_HPP
10#define STLAB_CONCURRENCY_CHANNEL_HPP
35#include <stlab/config.hpp>
63STLAB_VERSION_NAMESPACE_BEGIN()
78template <typename, typename =
void>
89enum class message_t : std::uint8_t { argument, error };
106 std::chrono::nanoseconds::max()};
110 std::chrono::nanoseconds::min()};
117 process_already_running = 2,
128 case channel_error_codes::broken_channel:
129 return "broken channel";
131 case channel_error_codes::process_already_running:
132 return "process already running";
134 case channel_error_codes::no_state:
149class channel_error :
public std::logic_error {
155 [[nodiscard]]
auto what()
const noexcept ->
const char*
override {
156 return detail::channel_error_map(_code);
168auto for_each_n(I p, N n, F f) -> I {
169 for (N i = 0; i != n; ++i, ++p) {
176 template <
typename T>
177 auto operator()(T&& x)
const -> T {
178 return std::forward<T>(x);
187template <
typename R,
typename... Args>
195template <
class T1,
class... T>
200template <
typename... T>
201using first_t =
typename first_<T...>::type;
207template <
typename R,
typename Arg>
221template <
typename T,
typename...>
222auto yield_type_(
decltype(&T::yield)) ->
decltype(std::declval<T>().yield());
224template <
typename T,
typename... Arg>
225auto yield_type_(...) ->
decltype(std::declval<T>()(std::declval<Arg>()...));
227template <
typename T,
typename... Arg>
228using yield_type =
decltype(yield_type_<T, Arg...>(0));
235using avoid = std::conditional_t<std::is_same_v<void, T>, avoid_, T>;
239template <
typename F, std::size_t... I,
typename... T>
241 std::tuple<std::variant<T, std::exception_ptr>...>& t,
242 std::index_sequence<I...>) {
243 return std::forward<F>(f)(std::move(std::get<I>(t))...);
246template <
typename F,
typename... Args>
247auto avoid_invoke(F&& f, std::tuple<std::variant<Args, std::exception_ptr>...>& t)
248 -> std::enable_if_t<!std::is_same_v<void, yield_type<unwrap_reference_t<F>, Args...>>,
249 yield_type<unwrap_reference_t<F>, Args...>> {
250 return invoke_(std::forward<F>(f), t, std::make_index_sequence<
sizeof...(Args)>());
253template <
typename F,
typename... Args>
254auto avoid_invoke(F&& f, std::tuple<std::variant<Args, std::exception_ptr>...>& t)
255 -> std::enable_if_t<std::is_same_v<void, yield_type<unwrap_reference_t<F>, Args...>>, avoid_> {
256 invoke_(std::forward<F>(f), t, std::make_index_sequence<
sizeof...(Args)>());
262template <std::
size_t S>
263struct invoke_variant_dispatcher {
264 template <
typename F,
typename T,
typename... Args, std::size_t... I>
265 static auto invoke_(F&& f, T& t, std::index_sequence<I...>) {
266 return std::forward<F>(f)(std::move(std::get<Args>(std::get<I>(t)))...);
269 template <
typename F,
typename T,
typename... Args>
270 static auto invoke(F&& f, T& t) {
271 return invoke_<F, T, Args...>(std::forward<F>(f), t,
272 std::make_index_sequence<
sizeof...(Args)>());
277struct invoke_variant_dispatcher<1> {
278 template <
typename F,
typename T,
typename... Args>
279 static auto invoke(F&& f, [[maybe_unused]] T& t) {
280 using arg1_t = first_t<Args...>;
281 if constexpr (std::is_same_v<arg1_t, void>) {
283 }
else if constexpr (std::is_same_v<arg1_t, detail::avoid_>) {
284 return std::forward<F>(f)();
286 return std::forward<F>(f)(std::move(std::get<arg1_t>(std::get<0>(t))));
291template <
typename F,
typename T,
typename R, std::size_t S,
typename... Args>
292auto avoid_invoke_variant(F&& f, T& t) -> std::enable_if_t<!std::is_same_v<void, R>, R> {
293 return invoke_variant_dispatcher<S>::template invoke<F, T, Args...>(std::forward<F>(f), t);
296template <
typename F,
typename T,
typename R, std::size_t S,
typename... Args>
297auto avoid_invoke_variant(F&& f, T& t) -> std::enable_if_t<std::is_same_v<void, R>, avoid_> {
298 invoke_variant_dispatcher<S>::template invoke<F, T, Args...>(std::forward<F>(f), t);
305using receiver_t =
typename std::remove_reference_t<T>::result_type;
310struct shared_process_receiver {
311 virtual ~shared_process_receiver() =
default;
313 virtual void map(sender<T>) = 0;
314 virtual void clear_to_send() = 0;
315 virtual void add_receiver() = 0;
316 virtual void remove_receiver() = 0;
317 [[nodiscard]]
virtual auto executor() const ->
executor_t = 0;
318 virtual
void set_buffer_size(
size_t) = 0;
319 [[nodiscard]] virtual auto buffer_size() const ->
size_t = 0;
325struct shared_process_sender {
326 virtual ~shared_process_sender() =
default;
328 virtual void send(avoid<T> x) = 0;
329 virtual void send(std::exception_ptr) = 0;
330 virtual void add_sender() = 0;
331 virtual void remove_sender() = 0;
332 [[nodiscard]]
virtual auto free_buffer() const -> std::
size_t = 0;
338using process_close_t = decltype(std::declval<T&>().close());
341constexpr
bool has_process_close_v = is_detected_v<process_close_t, T>;
344auto process_close(std::optional<T>& x)
345 -> std::enable_if_t<has_process_close_v<unwrap_reference_t<T>>> {
346 if (x)
unwrap(*x).close();
350auto process_close(std::optional<T>&)
351 -> std::enable_if_t<!has_process_close_v<unwrap_reference_t<T>>> {}
356using process_state_t =
decltype(std::declval<const T&>().state());
359constexpr bool has_process_state_v = is_detected_v<process_state_t, T>;
362auto get_process_state(
const std::optional<T>& x)
364 return unwrap(*x).state();
368auto get_process_state(
const std::optional<T>&)
376using process_set_error_t =
377 decltype(std::declval<P&>().set_error(std::declval<std::exception_ptr>()));
380constexpr bool has_set_process_error_v = is_detected_v<process_set_error_t, P>;
383auto set_process_error(P& process, std::exception_ptr&& error)
384 -> std::enable_if_t<has_set_process_error_v<unwrap_reference_t<P>>,
void> {
385 unwrap(process).set_error(std::move(error));
389auto set_process_error(P&, std::exception_ptr&&)
390 -> std::enable_if_t<!has_set_process_error_v<unwrap_reference_t<P>>,
void> {}
395using process_yield_t =
decltype(std::declval<T&>().yield());
398constexpr bool has_process_yield_v = is_detected_v<process_yield_t, T>;
402template <
typename T,
typename... Args>
403using process_await_t =
decltype(std::declval<T&>().await(std::declval<Args>()...));
405template <
typename T,
typename... Args>
406constexpr bool has_process_await_v = is_detected_v<process_await_t, T, Args...>;
410template <
typename P,
typename... T, std::size_t... I>
411void await_variant_args_(P& process,
412 std::tuple<std::variant<detail::avoid<T>, std::exception_ptr>...>& args,
413 std::index_sequence<I...>) {
414 unwrap(process).await(std::move(std::get<T>(std::get<I>(args)))...);
417template <
typename P,
typename... T>
418void await_variant_args(P& process,
419 std::tuple<std::variant<detail::avoid<T>, std::exception_ptr>...>& args) {
420 await_variant_args_<P, T...>(process, args, std::make_index_sequence<
sizeof...(T)>());
426auto find_argument_error(T& argument) -> std::optional<std::exception_ptr> {
427 std::optional<std::exception_ptr> result;
429 auto error_index = tuple_find(argument, [](
const auto& c) {
430 return static_cast<message_t>(c.index()) == message_t::error;
433 if (error_index != std::tuple_size_v<T>) {
435 argument, error_index, [](
auto& elem) {
return std::get<std::exception_ptr>(elem); },
436 std::exception_ptr{});
445struct default_queue_strategy {
446 static const std::size_t arguments_size = 1;
447 using value_type = std::tuple<std::variant<avoid<T>, std::exception_ptr>>;
449 std::deque<std::variant<avoid<T>, std::exception_ptr>> _queue;
451 [[nodiscard]]
auto empty() const ->
bool {
return _queue.empty(); }
453 auto front() {
return std::make_tuple(std::move(_queue.front())); }
455 void pop_front() { _queue.pop_front(); }
457 [[nodiscard]]
auto size()
const {
return std::array<std::size_t, 1>{{_queue.size()}}; }
459 template <std::
size_t>
460 [[nodiscard]]
auto queue_size()
const {
461 return _queue.size();
464 template <std::
size_t,
typename U>
466 _queue.emplace_back(std::forward<U>(u));
472template <
typename... T>
473struct zip_with_queue_strategy {
474 static const std::size_t Size =
sizeof...(T);
475 static const std::size_t arguments_size = Size;
476 using value_type = std::tuple<std::variant<T, std::exception_ptr>...>;
477 using queue_size_t = std::array<std::size_t, Size>;
478 using queue_t = std::tuple<std::deque<std::variant<T, std::exception_ptr>>...>;
482 [[nodiscard]]
auto empty() const ->
bool {
483 return tuple_find(_queue, [](
const auto& c) {
return c.empty(); }) != Size;
486 template <std::size_t... I,
typename U>
487 auto front_impl(U& u, std::index_sequence<I...>) {
488 return std::make_tuple(std::move(std::get<I>(u).front())...);
492 assert(!empty() &&
"front on an empty container is a very bad idea!");
493 return front_impl(_queue, std::make_index_sequence<Size>());
497 tuple_for_each(_queue, [](
auto& c) { c.pop_front(); });
500 [[nodiscard]]
auto size()
const {
503 tuple_for_each(_queue, [&i, &result](
const auto& c) { result[i++] = c.size(); });
507 template <std::
size_t I>
508 [[nodiscard]]
auto queue_size()
const {
509 return std::get<I>(_queue).size();
512 template <std::
size_t I,
typename U>
514 std::get<I>(_queue).emplace_back(std::forward<U>(u));
520template <
typename... T>
521struct round_robin_queue_strategy {
522 static const std::size_t Size =
sizeof...(T);
523 static const std::size_t arguments_size = 1;
524 using item_t = std::variant<first_t<T...>, std::exception_ptr>;
525 using value_type = std::tuple<item_t>;
526 using queue_size_t = std::array<std::size_t, Size>;
527 using queue_t = std::tuple<std::deque<std::variant<T, std::exception_ptr>>...>;
528 std::size_t _index{0};
529 std::size_t _popped_index{0};
532 [[nodiscard]]
auto empty() const ->
bool {
533 return get_i(_queue, _index, [](
const auto& c) {
return c.empty(); },
true);
537 assert(!empty() &&
"front on an empty container is a very bad idea!");
538 return std::make_tuple(get_i(_queue, _index, [](
auto& c) {
return c.front(); }, item_t{}));
542 void_i(_queue, _index, [](
auto& c) { c.pop_front(); });
543 _popped_index = _index;
545 if (_index == Size) _index = 0;
548 [[nodiscard]]
auto size()
const {
551 tuple_for_each(_queue, [&i, &result,
this](
const auto& c) {
552 if (i == _popped_index)
553 result[i] = c.size();
555 result[i] = std::numeric_limits<std::size_t>::max();
561 template <std::
size_t I>
562 [[nodiscard]]
auto queue_size()
const {
563 return std::get<I>(_queue).size();
566 template <std::
size_t I,
typename U>
568 std::get<I>(_queue).emplace_back(std::forward<U>(u));
574template <
typename... T>
575struct unordered_queue_strategy {
576 static const std::size_t Size =
sizeof...(T);
577 static const std::size_t arguments_size = 1;
578 using item_t = std::variant<first_t<T...>, std::exception_ptr>;
579 using value_type = std::tuple<item_t>;
580 using queue_size_t = std::array<std::size_t, Size>;
581 using queue_t = std::tuple<std::deque<std::variant<T, std::exception_ptr>>...>;
582 std::size_t _index{0};
583 std::size_t _popped_index{0};
586 [[nodiscard]]
auto empty() const ->
bool {
587 return tuple_find(_queue, [](
const auto& c) {
return !c.empty(); }) == Size;
591 assert(!empty() &&
"front on an empty container is a very bad idea!");
592 _index = tuple_find(_queue, [](
const auto& c) {
return !c.empty(); });
593 return std::make_tuple(
594 get_i(_queue, _index, [](
auto& c) {
return std::move(c.front()); }, item_t{}));
598 void_i(_queue, _index, [](
auto& c) { c.pop_front(); });
599 _popped_index = _index;
602 [[nodiscard]]
auto size()
const {
605 tuple_for_each(_queue, [&i, &result,
this](
const auto& c) {
606 if (i == _popped_index)
607 result[i] = c.size();
609 result[i] = std::numeric_limits<std::size_t>::max();
616 template <std::
size_t I>
617 [[nodiscard]]
auto queue_size()
const {
618 return std::get<I>(_queue).size();
621 template <std::
size_t I,
typename U>
623 std::get<I>(_queue).emplace_back(std::forward<U>(u));
629template <
typename Q,
typename T,
typename R,
typename... Args>
630struct shared_process;
632template <
typename Q,
typename T,
typename R,
typename Arg, std::size_t I,
typename... Args>
633struct shared_process_sender_indexed :
public shared_process_sender<Arg> {
634 shared_process<Q, T, R, Args...>& _shared_process;
636 explicit shared_process_sender_indexed(shared_process<Q, T, R, Args...>& sp) :
637 _shared_process(sp) {}
639 void add_sender()
override { ++_shared_process._sender_count; }
641 void remove_sender()
override {
642 assert(_shared_process._sender_count > 0);
643 if (--_shared_process._sender_count == 0) {
646 std::unique_lock<std::mutex> lock(_shared_process._process_mutex);
647 _shared_process._process_close_queue =
true;
648 do_run = !_shared_process._receiver_count && !_shared_process._process_running;
649 _shared_process._process_running = _shared_process._process_running || do_run;
651 if (do_run) _shared_process.run();
655 template <
typename U>
656 void enqueue(U&& u) {
659 std::unique_lock<std::mutex> lock(_shared_process._process_mutex);
660 _shared_process._queue.template append<I>(
662 do_run = !_shared_process._receiver_count && (!_shared_process._process_running ||
663 _shared_process._timeout_function_active);
665 _shared_process._process_running = _shared_process._process_running || do_run;
667 if (do_run) _shared_process.run();
670 void send(std::exception_ptr error)
override { enqueue(std::move(error)); }
672 void send(avoid<Arg> arg)
override { enqueue(std::move(arg)); }
674 [[nodiscard]]
auto free_buffer() const -> std::
size_t override {
675 std::unique_lock<std::mutex> lock(_shared_process._process_mutex);
676 return _shared_process._process_buffer_size == 0 ?
677 std::numeric_limits<std::size_t>::max() :
678 (_shared_process._process_buffer_size -
679 _shared_process._queue.template queue_size<I>());
685template <
typename Q,
typename T,
typename R,
typename U,
typename... Args>
686struct shared_process_sender_helper;
688template <
typename Q,
typename T,
typename R, std::size_t... I,
typename... Args>
689struct shared_process_sender_helper<Q, T, R, std::index_sequence<I...>, Args...>
690 : shared_process_sender_indexed<Q, T, R, Args, I, Args...>... {
691 explicit shared_process_sender_helper(shared_process<Q, T, R, Args...>& sp) :
692 shared_process_sender_indexed<Q, T, R, Args, I, Args...>(sp)... {}
697template <
typename R,
typename Enabled =
void>
701struct downstream<R, std::enable_if_t<std::is_copy_constructible_v<R> || std::is_same_v<R, void>>> {
702 std::deque<sender<R>> _data;
704 template <
typename F>
705 void append_receiver(F&& f) {
706 _data.emplace_back(std::forward<F>(f));
709 void clear() { _data.clear(); }
711 [[nodiscard]]
auto size() const -> std::
size_t {
return _data.size(); }
713 template <
typename... Args>
714 void send(std::size_t n, Args... args) {
715 stlab::for_each_n(begin(_data), n, [&](
const auto& e) { e(args...); });
718 [[nodiscard]]
auto minimum_free_buffer() const -> std::
size_t {
719 if (size() == 0)
return 0;
721 return std::accumulate(_data.cbegin(), _data.cend(),
722 std::numeric_limits<std::size_t>::max(),
723 [](
auto val,
const auto& e) {
724 return std::min(val, e.free_buffer() ? *e.free_buffer() : val);
731 std::enable_if_t<!std::is_copy_constructible_v<R> && !std::is_same_v<R, void>>> {
732 std::optional<sender<R>> _data;
734 template <
typename F>
735 void append_receiver(F&& f) {
736 _data = std::forward<F>(f);
739 void clear() { _data = std::nullopt; }
741 [[nodiscard]]
auto size() const -> std::
size_t {
return 1; }
743 template <
typename... Args>
744 void send(std::size_t, Args&&... args) {
745 if (_data) (*_data)(std::forward<Args>(args)...);
748 [[nodiscard]]
auto minimum_free_buffer() const -> std::
size_t {
749 if (_data && (*_data).free_buffer())
return *(*_data).free_buffer();
756template <
typename Q,
typename T,
typename R,
typename... Args>
758 : shared_process_receiver<R>,
759 shared_process_sender_helper<Q, T, R, std::make_index_sequence<sizeof...(Args)>, Args...>,
760 std::enable_shared_from_this<shared_process<Q, T, R, Args...>> {
761 static_assert((has_process_yield_v<unwrap_reference_t<T>> &&
762 has_process_state_v<unwrap_reference_t<T>>) ||
763 (!has_process_yield_v<unwrap_reference_t<T>> &&
764 !has_process_state_v<unwrap_reference_t<T>>),
765 "Processes that use .yield() must have .state() const");
773 using queue_strategy_t = Q;
775 using lock_t = std::unique_lock<std::mutex>;
777 std::mutex _downstream_mutex;
778 downstream<result_t> _downstream;
779 queue_strategy_t _queue;
782 std::optional<process_t> _process;
784 std::mutex _process_mutex;
786 bool _process_running =
false;
787 std::atomic_size_t _process_suspend_count{0};
788 bool _process_close_queue =
false;
790 bool _process_final =
false;
792 std::mutex _timeout_function_control;
793 std::atomic_bool _timeout_function_active{
false};
795 std::atomic_size_t _sender_count{0};
796 std::atomic_size_t _receiver_count;
798 std::atomic_size_t _process_buffer_size{1};
800 const std::tuple<std::shared_ptr<shared_process_receiver<Args>>...> _upstream;
802 template <
typename E,
typename F>
803 shared_process(E&& e, F&& f) :
804 shared_process_sender_helper<Q, T, R, std::make_index_sequence<sizeof...(Args)>, Args...>(
806 _executor(std::forward<E>(e)), _process(std::forward<F>(f)) {
807 _sender_count = std::is_same_v<result_t, void> ? 0 : 1;
808 _receiver_count = !std::is_same_v<result_t, void>;
811 template <
typename E,
typename F,
typename... U>
812 shared_process(E&& e, F&& f, U&&... u) :
813 shared_process_sender_helper<Q, T, R, std::make_index_sequence<sizeof...(Args)>, Args...>(
815 _executor(std::forward<E>(e)), _process(std::forward<F>(f)),
816 _upstream(std::forward<U>(u)...) {
817 _sender_count =
sizeof...(Args);
818 _receiver_count = !std::is_same_v<result_t, void>;
821 void add_receiver()
override {
822 if (std::is_same_v<result_t, void>)
return;
826 void remove_receiver()
override {
827 if (std::is_same_v<result_t, void>)
return;
833 assert(_receiver_count > 0);
834 if (--_receiver_count == 0) {
837 std::unique_lock<std::mutex> lock(_process_mutex);
838 do_run = ((!_queue.empty() || std::is_same_v<first_t<Args...>,
void>) ||
839 _process_close_queue) &&
841 _process_running = _process_running || do_run;
847 auto executor() const -> executor_t
override {
return _executor; }
853 std::unique_lock<std::mutex> lock(_process_mutex);
854 do_run = !_queue.empty() || _process_close_queue;
855 _process_running = do_run;
856 do_final = _process_final;
860 assert(!(do_run && do_final) &&
"ERROR (sparent) : can't run and close at the same time.");
864 std::unique_lock<std::mutex> lock(_downstream_mutex);
866 _process = std::nullopt;
870 void clear_to_send()
override {
872 std::unique_lock<std::mutex> lock(_process_mutex);
873 if (_process_final) {
880 const auto ps = get_process_state(_process);
881 std::unique_lock<std::mutex> lock(_process_mutex);
882 if (_process_suspend_count > 0) --_process_suspend_count;
884 if (!_process_suspend_count) {
885 if (ps.first == process_state::yield || !_queue.empty() || _process_close_queue) {
888 _process_running =
false;
897 auto pop_from_queue() {
898 std::optional<typename Q::value_type> message;
899 std::array<bool,
sizeof...(Args)> do_cts = {{
false}};
900 bool do_close =
false;
902 std::unique_lock<std::mutex> lock(_process_mutex);
903 if (_queue.empty()) {
904 std::swap(do_close, _process_close_queue);
905 _process_final = do_close;
907 message = std::move(_queue.front());
909 auto queue_size = _queue.size();
910 for (
auto index = 0u; index < queue_size.size(); ++index)
911 do_cts[index] = queue_size[index] <= (_process_buffer_size - 1);
913 return std::make_tuple(std::move(message), do_cts, do_close);
916 auto dequeue() ->
bool {
917 using queue_t =
typename Q::value_type;
918 std::optional<queue_t> message;
919 std::array<bool,
sizeof...(Args)> do_cts;
920 bool do_close =
false;
922 std::tie(message, do_cts, do_close) = pop_from_queue();
925 tuple_for_each(_upstream, [do_cts, &i](
auto& u) {
926 if (do_cts[i] && u) u->clear_to_send();
931 auto error = find_argument_error(*message);
933 if (has_set_process_error_v<T>)
934 set_process_error(*_process, std::move(*error));
938 await_variant_args<process_t, Args...>(*_process, *message);
943 if (do_close) process_close(_process);
945 return bool(message);
953 template <
typename U>
954 auto step() -> std::enable_if_t<has_process_yield_v<unwrap_reference_t<U>> &&
955 !has_process_await_v<unwrap_reference_t<T>, Args...>> {
958 lock_t lock(_timeout_function_control, std::try_to_lock);
963 _timeout_function_active =
false;
970 if (get_process_state(_process).first == process_state::await)
return;
973 auto tmp = get_process_state(_process);
974 const auto& state = tmp.first;
975 const auto& duration = tmp.second;
981 if (state == process_state::yield) {
982 if (std::chrono::duration_cast<std::chrono::nanoseconds>(duration) <=
983 std::chrono::nanoseconds::min())
984 broadcast(
unwrap(*_process).yield());
987 [_weak_this =
make_weak_ptr(this->shared_from_this())]()
noexcept {
988 auto _this = _weak_this.lock();
990 _this->try_broadcast();
1001 else if (std::chrono::duration_cast<std::chrono::nanoseconds>(duration) ==
1002 std::chrono::nanoseconds::max()) {
1004 }
else if (std::chrono::duration_cast<std::chrono::nanoseconds>(duration) <=
1005 std::chrono::nanoseconds::min()) {
1006 broadcast(
unwrap(*_process).yield());
1009 _timeout_function_active =
true;
1011 [_weak_this =
make_weak_ptr(this->shared_from_this())]()
noexcept {
1012 auto _this = _weak_this.lock();
1018 lock_t lock(_this->_timeout_function_control, std::try_to_lock);
1019 if (!lock)
continue;
1022 if (get_process_state(_this->_process).first != process_state::yield) {
1023 _this->try_broadcast();
1024 _this->_timeout_function_active =
false;
1031 broadcast(std::move(std::current_exception()));
1035 template <
typename U>
1036 auto step() -> std::enable_if_t<has_process_yield_v<unwrap_reference_t<U>> &&
1037 has_process_await_v<unwrap_reference_t<T>, Args...>> {
1040 lock_t lock(_timeout_function_control, std::try_to_lock);
1045 _timeout_function_active =
false;
1052 while (get_process_state(_process).first == process_state::await) {
1053 if (!dequeue())
break;
1057 auto tmp = get_process_state(_process);
1058 const auto& state = tmp.first;
1059 const auto& duration = tmp.second;
1065 if (state == process_state::yield) {
1066 if (std::chrono::duration_cast<std::chrono::nanoseconds>(duration) <=
1067 std::chrono::nanoseconds::min())
1068 broadcast(
unwrap(*_process).yield());
1071 [_weak_this =
make_weak_ptr(this->shared_from_this())]()
noexcept {
1072 auto _this = _weak_this.lock();
1074 _this->try_broadcast();
1085 else if (std::chrono::duration_cast<std::chrono::nanoseconds>(duration) ==
1086 std::chrono::nanoseconds::max()) {
1088 }
else if (std::chrono::duration_cast<std::chrono::nanoseconds>(duration) <=
1089 std::chrono::nanoseconds::min()) {
1090 broadcast(
unwrap(*_process).yield());
1093 _timeout_function_active =
true;
1095 [_weak_this =
make_weak_ptr(this->shared_from_this())]()
noexcept {
1096 auto _this = _weak_this.lock();
1102 lock_t lock(_this->_timeout_function_control, std::try_to_lock);
1103 if (!lock)
continue;
1106 if (get_process_state(_this->_process).first != process_state::yield) {
1107 _this->try_broadcast();
1108 _this->_timeout_function_active =
false;
1115 broadcast(std::move(std::current_exception()));
1119 void try_broadcast() {
1121 if (_process) broadcast(
unwrap(*_process).yield());
1123 broadcast(std::move(std::current_exception()));
1134 template <
typename U>
1135 auto step() noexcept -> std::enable_if_t<!has_process_yield_v<unwrap_reference_t<U>>> {
1136 using queue_t =
typename Q::value_type;
1137 std::optional<queue_t> message;
1138 std::array<bool,
sizeof...(Args)> do_cts;
1139 bool do_close =
false;
1141 std::tie(message, do_cts, do_close) = pop_from_queue();
1144 tuple_for_each(_upstream, [do_cts, &i](
auto& u) {
1145 if (do_cts[i] && u) u->clear_to_send();
1150 auto error = find_argument_error(*message);
1156 avoid_invoke_variant<process_t, queue_t, R, Q::arguments_size, Args...>(
1157 std::move(*_process), *message));
1159 broadcast(std::move(std::current_exception()));
1167 _executor([_p =
make_weak_ptr(this->shared_from_this())]()
noexcept {
1169 if (p) p->template step<T>();
1173 template <
typename... A>
1174 void broadcast(A&&... args) {
1180 bool suspend_process;
1182 std::unique_lock<std::mutex> lock(_downstream_mutex);
1183 n = _downstream.size();
1184 suspend_process = _downstream.minimum_free_buffer() <= 1;
1188 std::unique_lock<std::mutex> lock(_process_mutex);
1189 if (suspend_process) {
1194 _process_suspend_count = n + 1;
1196 _process_suspend_count = 1;
1203 _downstream.send(n, std::forward<A>(args)...);
1209 void map(sender<result_t> f)
override {
1215 std::unique_lock<std::mutex> lock(_downstream_mutex);
1216 _downstream.append_receiver(std::move(f));
1220 void set_buffer_size(
size_t buffer_size)
override { _process_buffer_size = buffer_size; }
1222 auto buffer_size() const ->
size_t override {
return _process_buffer_size; }
1231 template <
typename... R>
1232 using strategy_type = detail::unordered_queue_strategy<detail::receiver_t<R>...>;
1237 template <
typename... R>
1238 using strategy_type = detail::round_robin_queue_strategy<detail::receiver_t<R>...>;
1244 template <
typename... R>
1245 using strategy_type = detail::zip_with_queue_strategy<detail::receiver_t<R>...>;
1254struct channel_combiner {
1255 template <
typename P,
typename URP,
typename... R, std::size_t... I>
1256 static void map_as_sender_(P& p, URP& upstream_receiver_processes, std::index_sequence<I...>) {
1257 using shared_process_t =
typename P::element_type;
1258 using queue_t =
typename shared_process_t::queue_strategy_t;
1259 using process_t =
typename shared_process_t::process_t;
1260 using result_t =
typename shared_process_t::result_t;
1262 (void)std::initializer_list<int>{
1263 (std::get<I>(upstream_receiver_processes)
1264 ->map(
sender<R>(std::dynamic_pointer_cast<shared_process_sender<R>>(
1265 std::dynamic_pointer_cast<
1266 shared_process_sender_indexed<queue_t, process_t, result_t, R, I, R...>>(
1271 template <
typename P,
typename URP,
typename... R>
1272 static void map_as_sender(P& p, URP& upstream_receiver_processes) {
1273 map_as_sender_<P, URP, R...>(p, upstream_receiver_processes,
1274 std::make_index_sequence<
sizeof...(R)>());
1277 template <
typename M,
typename F,
typename... R>
1278 struct merge_result {
1279 using type = yield_type<unwrap_reference_t<F>, receiver_t<first_t<R...>>>;
1282 template <
typename F,
typename... R>
1283 struct merge_result<zip_with_t, F, R...> {
1284 using type = yield_type<unwrap_reference_t<F>, receiver_t<R>...>;
1287 template <
typename M,
typename S,
typename F,
typename... R>
1288 static auto merge_helper(S&& s, F&& f, R&&... upstream_receiver) {
1289 using result_t =
typename merge_result<M, F, R...>::type;
1291 auto upstream_receiver_processes = std::make_tuple(upstream_receiver._p...);
1292 auto merge_process =
1293 std::make_shared<shared_process<
typename M::template strategy_type<R...>, F, result_t,
1295 std::forward<S>(s), std::forward<F>(f), upstream_receiver._p...);
1297 map_as_sender<
decltype(merge_process),
decltype(upstream_receiver_processes),
1298 receiver_t<R>...>(merge_process, upstream_receiver_processes);
1300 return receiver<result_t>(std::move(merge_process));
1305 template <
typename... T>
1306 auto operator()(T&&... t)
const {
1307 return std::make_tuple(std::forward<T>(t)...);
1311template <
typename E,
typename T>
1313 static auto create(E executor) {
1314 auto p = std::make_shared<
1315 detail::shared_process<detail::default_queue_strategy<T>, identity, T, T>>(
1316 std::move(executor), identity());
1318 return std::make_pair(sender<T>(p), receiver<T>(p));
1322template <
typename E>
1323struct channel_<E, void> {
1324 static auto create(E executor) {
1325 auto p = std::make_shared<
1326 detail::shared_process<detail::default_queue_strategy<void>, identity, void,
void>>(
1327 std::move(executor), identity());
1329 return receiver<void>(p);
1340template <
typename T,
typename E>
1342 return detail::channel_<E, T>::create(std::move(
executor));
1353template <
typename S,
typename F,
typename... R>
1354[[deprecated(
"Use zip_with")]]
auto join(S s, F f, R... upstream_receiver) {
1355 return detail::channel_combiner::merge_helper<
zip_with_t, S, F, R...>(
1356 std::move(s), std::move(f), std::forward<R>(upstream_receiver)...);
1366template <
typename S,
typename F,
typename... R>
1367[[deprecated(
"Use merge_channel<unordered_t>")]]
auto merge(S s, F f, R... upstream_receiver) {
1368 return detail::channel_combiner::merge_helper<
unordered_t, S, F, R...>(
1369 std::move(s), std::move(f), std::move(upstream_receiver)...);
1380template <
typename M,
typename S,
typename F,
typename... R>
1382 return detail::channel_combiner::merge_helper<M>(std::move(s), std::move(f),
1383 std::forward<R>(upstream_receiver)...);
1393template <
typename S,
typename F,
typename... R>
1394auto zip_with(S s, F f,
const R&... upstream_receiver) {
1395 return detail::channel_combiner::merge_helper<zip_with_t>(std::move(s), std::move(f),
1396 upstream_receiver...);
1406template <
typename S,
typename... R>
1408 return zip_with(std::move(s), detail::zip_helper{}, r...);
1435 std::optional<executor_t> _executor;
1436 std::optional<std::size_t> _buffer_size;
1438 annotations(executor_t e, std::size_t bs) : _executor(std::move(e)), _buffer_size(bs) {}
1439 explicit annotations(executor_t e) : _executor(std::
move(e)) {}
1440 explicit annotations(std::size_t bs) : _buffer_size(bs) {}
1443template <
typename F>
1444struct annotated_process {
1445 using process_type = F;
1448 annotations _annotations;
1450 explicit annotated_process(executor_task_pair<F>&& etp) :
1451 _f(std::
move(etp._f)), _annotations(std::
move(etp._executor)) {}
1453 annotated_process(F f,
const executor& e) : _f(std::
move(f)), _annotations(e._executor) {}
1454 annotated_process(F f, buffer_size bs) : _f(std::
move(f)), _annotations(bs._value) {}
1456 annotated_process(F f, executor&& e) : _f(std::
move(f)), _annotations(std::
move(e._executor)) {}
1457 annotated_process(F f, annotations&& a) : _f(std::
move(f)), _annotations(std::
move(a)) {}
1458 annotated_process(executor_task_pair<F>&& etp, buffer_size bs) :
1459 _f(std::
move(etp._f)), _annotations(std::
move(etp._executor), bs) {}
1462template <
typename B,
typename E>
1463auto combine_bs_executor(B&& b, E&& e) -> detail::annotations {
1464 detail::annotations result{b._value};
1465 result._executor = std::forward<E>(e)._executor;
1474inline auto operator&(buffer_size bs,
const executor& e) -> detail::annotations {
1475 return detail::combine_bs_executor(bs, e);
1478inline auto operator&(buffer_size bs, executor&& e) -> detail::annotations {
1479 return detail::combine_bs_executor(bs, std::move(e));
1482inline auto operator&(
const executor& e, buffer_size bs) -> detail::annotations {
1483 return detail::combine_bs_executor(bs, e);
1486inline auto operator&(executor&& e, buffer_size bs) -> detail::annotations {
1487 return detail::combine_bs_executor(bs, std::move(e));
1490template <
typename F, std::enable_if_t<!std::is_enum_v<std::remove_reference_t<F>>,
int> = 0>
1491auto operator&(buffer_size bs, F&& f) -> detail::annotated_process<F> {
1492 return detail::annotated_process<F>(std::forward<F>(f), bs);
1495template <
typename F, std::enable_if_t<!std::is_enum_v<std::remove_reference_t<F>>,
int> = 0>
1496auto operator&(F&& f, buffer_size bs) -> detail::annotated_process<F> {
1497 return detail::annotated_process<F>(std::forward<F>(f), bs);
1500template <
typename F>
1501auto operator&(executor_task_pair<F>&& etp, buffer_size bs) -> detail::annotated_process<F> {
1502 return detail::annotated_process<F>{std::move(etp), bs};
1505template <
typename F>
1506auto operator&(buffer_size bs, executor_task_pair<F>&& etp) -> detail::annotated_process<F> {
1507 return detail::annotated_process<F>{std::move(etp), bs};
1510template <
typename F, std::enable_if_t<!std::is_enum_v<std::remove_reference_t<F>>,
int> = 0>
1511auto operator&(detail::annotations&& a, F&& f) -> detail::annotated_process<F> {
1512 return detail::annotated_process<F>{std::forward<F>(f), std::move(a)};
1515template <
typename F, std::enable_if_t<!std::is_enum_v<std::remove_reference_t<F>>,
int> = 0>
1516auto operator&(F&& f, detail::annotations&& a) -> detail::annotated_process<F> {
1517 return detail::annotated_process<F>{std::forward<F>(f), std::move(a)};
1520template <
typename F>
1521auto operator&(detail::annotated_process<F>&& a, executor&& e) -> detail::annotated_process<F> {
1522 auto result{std::move(a)};
1523 a._annotations._executor = std::move(e._executor);
1527template <
typename F>
1528auto operator&(detail::annotated_process<F>&& a, buffer_size bs) -> detail::annotated_process<F> {
1529 auto result{std::move(a)};
1530 a._annotations._buffer_size = bs._value;
1553template <
typename T>
1554class STLAB_NODISCARD() receiver {
1555 using ptr_t = std::shared_ptr<detail::shared_process_receiver<T>>;
1558 bool _ready =
false;
1560 template <
typename U,
typename>
1561 friend class sender;
1563 template <
typename U>
1564 friend class receiver;
1566 template <
typename U,
typename V>
1567 friend struct detail::channel_;
1569 friend struct detail::channel_combiner;
1571 explicit receiver(ptr_t p) : _p(std::move(p)) {}
1574 using result_type = T;
1576 receiver() =
default;
1579 if (!_ready && _p) _p->remove_receiver();
1582 receiver(
const receiver& x) : _p(x._p), _ready(x._ready) {
1583 if (_p) _p->add_receiver();
1586 receiver(receiver&&)
noexcept =
default;
1588 auto operator=(
const receiver& x) -> receiver& {
1590 assert(
this != &x &&
"self-assignment is not allowed");
1591 return *
this = receiver(x);
1594 auto operator=(receiver&& x)
noexcept -> receiver& =
default;
1598 if (!_ready && _p) _p->remove_receiver();
1602 void swap(
receiver& x)
noexcept { std::swap(*
this, x); }
1604 inline friend void swap(receiver& x, receiver& y)
noexcept { x.swap(y); }
1605 inline friend auto operator==(
const receiver& x,
const receiver& y) ->
bool {
1606 return x._p == y._p;
1608 inline friend auto operator!=(
const receiver& x,
const receiver& y) ->
bool {
1613 [[nodiscard]]
auto ready() const ->
bool {
return _ready; }
1621 template <
typename F>
1623 if (!_p)
throw channel_error(channel_error_codes::broken_channel);
1625 if (_ready)
throw channel_error(channel_error_codes::process_already_running);
1627 auto p = std::make_shared<detail::shared_process<
1628 detail::default_queue_strategy<T>, F, detail::yield_type<unwrap_reference_t<F>, T>, T>>(
1629 _p->executor(), std::forward<F>(f), _p);
1630 _p->map(sender<T>(p));
1631 return receiver<detail::yield_type<unwrap_reference_t<F>, T>>(std::move(p));
1635 template <
typename F>
1637 if (!_p)
throw channel_error(channel_error_codes::broken_channel);
1639 if (_ready)
throw channel_error(channel_error_codes::process_already_running);
1641 auto executor = ap._annotations._executor.value_or(_p->executor());
1642 auto p = std::make_shared<detail::shared_process<
1643 detail::default_queue_strategy<T>, F, detail::yield_type<unwrap_reference_t<F>, T>, T>>(
1646 _p->map(sender<T>(p));
1648 if (ap._annotations._buffer_size) p->set_buffer_size(*ap._annotations._buffer_size);
1650 return receiver<detail::yield_type<unwrap_reference_t<F>, T>>(std::move(p));
1654 template <
typename F>
1656 return operator|(detail::annotated_process<F>(std::move(etp)));
1662 [_send = std::move(send)](
auto&& x) { _send(std::forward<
decltype(x)>(x)); });
1669template <
typename T>
1670class sender<T, enable_if_copyable<T>> {
1671 using ptr_t = std::weak_ptr<detail::shared_process_sender<T>>;
1674 template <
typename U>
1675 friend class receiver;
1677 template <
typename U,
typename V>
1678 friend struct detail::channel_;
1680 friend struct detail::channel_combiner;
1682 sender(ptr_t p) : _p(std::move(p)) {}
1688 if (
auto p = _p.lock()) p->remove_sender();
1691 sender(
const sender& x) : _p(x._p) {
1692 if (
auto p = _p.lock()) p->add_sender();
1695 sender(sender&&)
noexcept =
default;
1698 auto operator=(
const sender& x) -> sender& {
1700 assert(
this != &x &&
"self-assignment is not allowed");
1701 return *
this = sender(x);
1704 auto operator=(sender&&)
noexcept -> sender& =
default;
1706 void swap(sender& x)
noexcept { std::swap(*
this, x); }
1708 inline friend void swap(sender& x, sender& y)
noexcept { x.swap(y); }
1710 inline friend auto operator==(
const sender& x,
const sender& y) ->
bool {
1711 return x._p.lock() == y._p.lock();
1714 inline friend auto operator!=(
const sender& x,
const sender& y) ->
bool {
return !(x == y); };
1719 if (p) p->remove_sender();
1724 template <
typename... A>
1727 if (p) p->send(std::forward<A>(args)...);
1731 [[nodiscard]]
auto free_buffer() const -> std::optional<std::
size_t> {
1732 std::optional<std::size_t> result;
1734 if (p) result = p->free_buffer();
1740template <
typename T>
1741class sender<T, enable_if_not_copyable<T>> {
1742 using ptr_t = std::weak_ptr<detail::shared_process_sender<T>>;
1745 template <
typename U>
1746 friend class receiver;
1748 template <
typename U,
typename V>
1749 friend struct detail::channel_;
1751 friend struct detail::channel_combiner;
1753 sender(ptr_t p) : _p(std::move(p)) {}
1760 if (p) p->remove_sender();
1763 sender(
const sender& x) =
delete;
1764 sender(sender&&)
noexcept =
default;
1765 auto operator=(
const sender& x) -> sender& =
delete;
1766 auto operator=(sender&&)
noexcept -> sender& =
default;
1768 void swap(sender& x)
noexcept { std::swap(*
this, x); }
1770 inline friend void swap(sender& x, sender& y)
noexcept { x.swap(y); }
1772 inline friend auto operator==(
const sender& x,
const sender& y) ->
bool {
1773 return x._p.lock() == y._p.lock();
1776 inline friend auto operator!=(
const sender& x,
const sender& y) ->
bool {
return !(x == y); };
1781 if (p) p->remove_sender();
1786 template <
typename... A>
1789 if (p) p->send(std::forward<A>(args)...);
1793 [[nodiscard]]
auto free_buffer() const -> std::optional<std::
size_t> {
1794 std::optional<std::size_t> result;
1796 if (p) result = p->free_buffer();
1804template <
typename F>
1808template <
typename R,
typename... Args>
1809struct function_process<R(Args...)> {
1810 std::function<R(Args...)> _f;
1811 std::function<R()> _bound;
1814 using signature = R(Args...);
1816 template <
typename F>
1817 function_process(F&& f) : _f(std::forward<F>(f)) {}
1820 template <
typename... A>
1822 _bound = std::bind(_f, std::forward<A>(args)...);
1842STLAB_VERSION_NAMESPACE_END()
Exception type for channel usage errors (broken channel, process already running, etc....
Definition channel.hpp:149
Receiving end of a CSP channel.
Definition channel.hpp:1554
auto operator|(executor_task_pair< F > etp)
Attaches etp (process plus executor from operator&).
Definition channel.hpp:1655
auto ready() const -> bool
Returns true if set_ready() was called on this receiver.
Definition channel.hpp:1613
auto operator|(F &&f) const
Attaches f and returns a new receiver (inherits executor unless overridden).
Definition channel.hpp:1622
auto operator|(detail::annotated_process< F > ap)
Attaches an annotated process (buffer size and/or executor from operator&).
Definition channel.hpp:1636
void set_ready()
Marks this receiver ready so values may flow through the graph.
Definition channel.hpp:1597
auto operator|(sender< T > send)
Forwards values into downstream send.
Definition channel.hpp:1660
void operator()(A &&... args) const
Sends a value (or exception) into the channel.
Definition channel.hpp:1725
void close()
Closes this side of the channel (releases the weak link to the shared process).
Definition channel.hpp:1717
auto free_buffer() const -> std::optional< std::size_t >
Remaining capacity in the downstream process buffer, if known.
Definition channel.hpp:1731
void operator()(A &&... args) const
Sends a value (or exception) into the channel.
Definition channel.hpp:1787
void close()
Closes this side of the channel (releases the weak link to the shared process).
Definition channel.hpp:1779
auto free_buffer() const -> std::optional< std::size_t >
Remaining capacity in the downstream process buffer, if known.
Definition channel.hpp:1793
Definition channel.hpp:79
Executor type aliases and scheduling helpers.
Reference unwrapping and related functional helpers.
auto await(future< T > &&x) -> T
Synchronously wait for the result x. If x resolves as an exception, the exception is rethrown....
Definition await.hpp:72
auto merge(S s, F f, R... upstream_receiver)
Definition channel.hpp:1367
message_t
Discriminator for tuple elements that carry either a value or an exception pointer.
Definition channel.hpp:89
channel_error_codes
Error codes reported by channel_error.
Definition channel.hpp:115
constexpr process_state_scheduled await_forever
Always await the next upstream value (no yield timeout).
Definition channel.hpp:105
std::pair< process_state, std::chrono::nanoseconds > process_state_scheduled
process_state plus a deadline returned from process::state().
Definition channel.hpp:102
auto zip(S s, const R &... r)
Zips upstream receivers in step; yields std::tuple<T...> of their result_types.
Definition channel.hpp:1407
auto merge_channel(S s, F f, R &&... upstream_receiver)
Creates a receiver that merges upstream channels using merge strategy M.
Definition channel.hpp:1381
auto join(S s, F f, R... upstream_receiver)
Definition channel.hpp:1354
process_state
Scheduling hint for a process: wait for input (await) or run to produce output (yield).
Definition channel.hpp:86
auto zip_with(S s, F f, const R &... upstream_receiver)
Creates a receiver that runs f when each upstream has produced one value.
Definition channel.hpp:1394
constexpr process_state_scheduled yield_immediate
Yield as soon as the scheduler permits.
Definition channel.hpp:109
auto channel(E executor)
Creates a sender/receiver pair on executor (receiver<void> only when T is void).
Definition channel.hpp:1341
std::function< void(stlab::task< void() noexcept >)> executor_t
Type-erased executor: accepts a void() noexcept task.
Definition executor_base.hpp:44
auto execute_at(std::chrono::duration< Rep, Per > duration, executor_t executor) -> executor_t
Returns an executor that posts tasks to executor after duration (immediate if zero).
Definition executor_base.hpp:48
auto unwrap(T &val) -> T &
Unwraps val, forwarding through std::reference_wrapper when present.
Definition functional.hpp:63
auto make_weak_ptr(const std::shared_ptr< T > &x)
Returns a std::weak_ptr<T> sharing ownership with x.
Definition memory.hpp:35
constexpr auto move(T &&t) noexcept -> std::remove_reference_t< T > &&
A standard move implementation but with a compile-time check for const types.
Definition utility.hpp:154
Small memory-related utilities (make_weak_ptr).
Definition reverse.hpp:28
Definition channel.hpp:206
buffer_size(std::size_t b)
Constructs a buffer limit of b elements (0 = unbounded).
Definition channel.hpp:1427
Executor plus callable, produced by executor & f (used by futures and channels).
Definition executor_base.hpp:80
Wraps an executor_t for use with operator&.
Definition executor_base.hpp:74
Definition channel.hpp:196
void await(A &&... args)
Stores arguments until yield runs the bound call.
Definition channel.hpp:1821
auto state() const -> process_state_scheduled
yield_immediate while a call is pending, otherwise await_forever.
Definition channel.hpp:1833
auto yield() -> R
Invokes the function bound by the last await and returns its result.
Definition channel.hpp:1827
Adapts a std::function into a channel process with await / yield / state.
Definition channel.hpp:1805
Definition channel.hpp:175
Definition channel.hpp:185
Merge strategy for merge_channel: round-robin among upstream senders.
Definition channel.hpp:1236
Merge strategy for merge_channel: invoke the process in arbitrary order as values arrive.
Definition channel.hpp:1230
Merge strategy for merge_channel / zip_with: wait for one value from each upstream,...
Definition channel.hpp:1243
Type traits and detection helpers used by the concurrency library.
Tuple algorithms and utilities (including for future combiners).