Describes a process that gets executed whenever a value is passed into the channel.

The process is a policy that must follow certain rules; it is not a type that the library provides.

Each channel receiver is associated with a process. The process can be either:

  • an n-ary function object, referred to here as a function-process.
  • a type with the properties described below, referred to here as an await-process.

Since a function-process needs no further explanation, the following concentrates on await-processes. The process accepts values via await() as long as state() returns await. When the state changes to yield, the associated receiver calls yield(). For further details, see the state() method.

A static assert ensures that the process implements a state() const method.

Member Functions

await: Called on an await-process whenever a new value has been received from upstream.

close: Called on an await-process whenever the process state is await_forever and the incoming queue has run dry.

set_error: Called if an exception was thrown while calling await or yield on the upstream process.

state: Must return the current state of the await-process.

yield: Called on an await-process when it should yield.

Signature of an await-process

Here it is assumed that the process takes T... as arguments and that its result type is U.

struct process 
{
    void await(T... val);

    U yield();

    void close();                             // optional

    void set_error(std::exception_ptr error); // optional

    process_state_scheduled state() const;
};

Example 1

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
#include <tuple>

#include <stlab/concurrency/channel.hpp>
#include <stlab/concurrency/default_executor.hpp>

using namespace stlab;

/*
  This process adds all values until a zero is passed as value.
  Then it will yield the result and start over again.
*/

struct adder
{
  int _sum = 0;
  process_state_scheduled _state = await_forever;

  void await(int x) {
    _sum += x;
    if (x == 0) {
      _state = yield_immediate;
    }
  }

  int yield() {
    int result = _sum;
    _sum = 0;
    _state = await_forever;
    return result;
  }

  auto state() const { return _state; }
};


int main() {
  sender<int> send;
  receiver<int> receive;
  std::tie(send, receive) = channel<int>(default_executor);

  std::atomic_bool done{false};

  // Sum the incoming values with adder, then print each yielded result.
  auto calculator = receive |
    adder{} |
    [&_done = done](int x) {
      std::cout << x << '\n';
      _done = true;
    };

  // Mark the receiver as ready so that sent values start flowing.
  receive.set_ready();

  send(1);
  send(2);
  send(3);
  send(0);

  // Waiting just for illustration purposes
  while (!done) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
}

/*
    Result:
      6
*/