stlab 2.3.0
Modern, modular C++ algorithms, data structures, and concurrency primitives
Loading...
Searching...
No Matches
channel.hpp File Reference

CSP-style channels (sender/receiver) for reusable processing graphs. More...

#include <stlab/config.hpp>
#include <algorithm>
#include <array>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <deque>
#include <exception>
#include <memory>
#include <mutex>
#include <numeric>
#include <optional>
#include <tuple>
#include <type_traits>
#include <utility>
#include <variant>
#include <stlab/concurrency/executor_base.hpp>
#include <stlab/concurrency/traits.hpp>
#include <stlab/concurrency/tuple_algorithm.hpp>
#include <stlab/functional.hpp>
#include <stlab/memory.hpp>

Go to the source code of this file.

Classes

class  stlab::channel_error
 Exception type for channel usage errors (broken channel, process already running, etc.). More...
struct  stlab::identity
struct  stlab::result_of_< R(Args...)>
struct  stlab::first_< T1, T >
struct  stlab::argument_of< R(Arg)>
struct  stlab::unordered_t
 Merge strategy for merge_channel: invoke the process in arbitrary order as values arrive. More...
struct  stlab::round_robin_t
 Merge strategy for merge_channel: round-robin among upstream senders. More...
struct  stlab::zip_with_t
 Merge strategy for merge_channel / zip_with: wait for one value from each upstream, then invoke with the full argument set. More...
struct  stlab::buffer_size
 Per-process input queue capacity for flow control (combine with a process via operator&). More...
class  stlab::receiver< T >
 Receiving end of a CSP channel. More...
class  stlab::sender< T, enable_if_copyable< T > >
 Sending end of a CSP channel (copyable T). More...
class  stlab::sender< T, enable_if_not_copyable< T > >
 Sending end of a CSP channel (move-only T; not copyable). More...
struct  stlab::function_process< R(Args...)>
 Function-object process: binds arguments in await, runs the call in yield. More...

Typedefs

using stlab::process_state_scheduled
 process_state plus a deadline returned from process::state().
template<typename F>
using stlab::result_of_t_
template<typename... T>
using stlab::first_t
template<typename T>
using stlab::argument_of_t

Enumerations

enum class  stlab::process_state : std::uint8_t { await , yield }
 Scheduling hint for a process: wait for input (await) or run to produce output (yield).
enum class  stlab::message_t : std::uint8_t { argument , error }
 Discriminator for tuple elements that carry either a value or an exception pointer.
enum class  stlab::channel_error_codes : std::uint8_t { broken_channel , process_already_running , no_state }
 Error codes reported by channel_error.

Functions

template<typename I, typename N, typename F>
auto stlab::for_each_n (I p, N n, F f) -> I
template<typename T, typename E>
auto stlab::channel (E executor)
 Creates a sender/receiver pair on executor (receiver<void> only when T is void).
template<typename S, typename F, typename... R>
auto stlab::join (S s, F f, R... upstream_receiver)
template<typename S, typename F, typename... R>
auto stlab::merge (S s, F f, R... upstream_receiver)
template<typename M, typename S, typename F, typename... R>
auto stlab::merge_channel (S s, F f, R &&... upstream_receiver)
 Creates a receiver that merges upstream channels using merge strategy M.
template<typename S, typename F, typename... R>
auto stlab::zip_with (S s, F f, const R &... upstream_receiver)
 Creates a receiver that runs f when each upstream has produced one value.
template<typename S, typename... R>
auto stlab::zip (S s, const R &... r)
 Zips upstream receivers in step; yields std::tuple<T...> of their result_types.
auto stlab::operator& (buffer_size bs, const executor &e) -> detail::annotations
auto stlab::operator& (buffer_size bs, executor &&e) -> detail::annotations
auto stlab::operator& (const executor &e, buffer_size bs) -> detail::annotations
auto stlab::operator& (executor &&e, buffer_size bs) -> detail::annotations
template<typename F, std::enable_if_t<!std::is_enum_v< std::remove_reference_t< F > >, int > = 0>
auto stlab::operator& (buffer_size bs, F &&f) -> detail::annotated_process< F >
template<typename F, std::enable_if_t<!std::is_enum_v< std::remove_reference_t< F > >, int > = 0>
auto stlab::operator& (F &&f, buffer_size bs) -> detail::annotated_process< F >
template<typename F>
auto stlab::operator& (executor_task_pair< F > &&etp, buffer_size bs) -> detail::annotated_process< F >
template<typename F>
auto stlab::operator& (buffer_size bs, executor_task_pair< F > &&etp) -> detail::annotated_process< F >
template<typename F, std::enable_if_t<!std::is_enum_v< std::remove_reference_t< F > >, int > = 0>
auto stlab::operator& (detail::annotations &&a, F &&f) -> detail::annotated_process< F >
template<typename F, std::enable_if_t<!std::is_enum_v< std::remove_reference_t< F > >, int > = 0>
auto stlab::operator& (F &&f, detail::annotations &&a) -> detail::annotated_process< F >
template<typename F>
auto stlab::operator& (detail::annotated_process< F > &&a, executor &&e) -> detail::annotated_process< F >
template<typename F>
auto stlab::operator& (detail::annotated_process< F > &&a, buffer_size bs) -> detail::annotated_process< F >

Variables

constexpr process_state_scheduled stlab::await_forever
 Always await the next upstream value (no yield timeout).
constexpr process_state_scheduled stlab::yield_immediate
 Yield as soon as the scheduler permits.

Detailed Description

CSP-style channels (sender/receiver) for reusable processing graphs.

Channels follow the tradition of communicating sequential processes (CSP). They let you build processing graphs that can be run many times, unlike wiring that uses only one-shot futures.

Each channel has sending and receiving ends. A receiver can attach a process that runs when values arrive. Channels support split, zip, zip-with, and merge style composition.

Processes may be:

  • a function object (unary for a single upstream, or n-ary matching upstream arity), or
  • an await-process type with await(...), yield(), and state() const returning process_state_scheduled (process_state plus a chrono::nanoseconds deadline).

Optional await-process members: close() when upstream closes while awaiting; set_error(exception_ptr) when upstream throws. While state() is process_state::await, values arrive via await; when it is process_state::yield, the runtime calls yield() (subject to buffer limits and timers encoded in the scheduled time point).