Describes a process that is executed whenever a value is passed into the channel.

The process is a policy that must follow certain rules; it is not a type that the library provides.

Each channel receiver is associated with a process. The process can either be:

  • an n-ary function object, referred to here as a function-process.
  • a type that provides the members described below, referred to here as an await-process.

Since a function-process needs no further explanation, the following concentrates on await-processes. The process accepts values via await() as long as state() returns await. If the state changes to yield, the associated receiver calls yield(). For further details see the state() method.
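The await/yield protocol described above can be sketched without the library. The following is a simplified, single-threaded illustration, not the library's implementation: sketch_state, sketch_adder and drive are hypothetical names, and the real state type (process_state_scheduled) also carries scheduling information that is omitted here.

```cpp
#include <cassert>
#include <vector>

// Stand-in for the library's process state (assumption: only the two
// states named in the text are modelled, without any timing information).
enum class sketch_state { await, yield };

// An await-process: sums its inputs until a zero arrives, then yields the sum.
struct sketch_adder {
    int _sum = 0;
    sketch_state _state = sketch_state::await;

    void await(int x) {
        _sum += x;
        if (x == 0) _state = sketch_state::yield;
    }

    int yield() {
        int result = _sum;
        _sum = 0;
        _state = sketch_state::await; // start over after yielding
        return result;
    }

    sketch_state state() const { return _state; }
};

// Hypothetical driver imitating what a receiver does: pass values to
// await() while the process awaits, and call yield() once it asks to yield.
template <class Process>
std::vector<int> drive(Process p, const std::vector<int>& input) {
    std::vector<int> output;
    for (int x : input) {
        assert(p.state() == sketch_state::await); // values are only passed while awaiting
        p.await(x);
        if (p.state() == sketch_state::yield) output.push_back(p.yield());
    }
    return output;
}
```

Driving sketch_adder with the values 1, 2, 3, 0, 4, 5, 0 collects the two sums 6 and 9; values that are never followed by a zero produce no result.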

A static assert ensures that the process implements a state() const method.
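One way such a compile-time check can be written is the C++17 detection idiom. The trait below is a sketch under that assumption; has_const_state and the three test types are hypothetical names, and the library's actual check may be implemented differently.

```cpp
#include <type_traits>
#include <utility>

// Hypothetical trait: true when state() can be called on a const T,
// false otherwise (primary template).
template <class T, class = void>
struct has_const_state : std::false_type {};

// Chosen only when the expression `const T&`.state() is well formed.
template <class T>
struct has_const_state<T, std::void_t<decltype(std::declval<const T&>().state())>>
    : std::true_type {};

struct good_process { int state() const { return 0; } };
struct non_const_process { int state() { return 0; } }; // state() is not const
struct no_state_process {};                              // state() is missing

static_assert(has_const_state<good_process>::value, "state() const is required");
static_assert(!has_const_state<non_const_process>::value, "a non-const state() is rejected");
static_assert(!has_const_state<no_state_process>::value, "a missing state() is rejected");
```

With a check of this kind, forgetting the const qualifier on state() fails at compile time with a readable message instead of deep inside template code.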

Member Functions

await: Called on an await-process whenever a new value was received from upstream.

close: Called on an await-process whenever the process state is await_forever and the incoming queue went dry.

set_error: Called if an exception was thrown while calling await or yield.

state: This method must return the current state of the await-process.

yield: Called on an await-process when it should yield.

Signature of an await-process

Here it is assumed that the process takes T... as arguments and that its result type is U.

struct process 
{
    void await(T... val);

    U yield();

    void close();                             // optional

    void set_error(std::exception_ptr error); // optional

    process_state_scheduled state() const;
};
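The optional members close() and set_error() can be illustrated with a process that implements both. This is a library-free sketch: sketch_state, checked_adder and drive_checked are hypothetical names, and stopping the driver after the first error is an assumption made for brevity, not a statement about how the library continues after set_error().

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <string>
#include <vector>

enum class sketch_state { await, yield }; // stand-in for the library's state type

struct checked_adder {
    int _sum = 0;
    bool _closed = false;
    std::string _last_error;
    sketch_state _state = sketch_state::await;

    void await(int x) {
        if (x < 0) throw std::invalid_argument("negative input");
        _sum += x;
        if (x == 0) _state = sketch_state::yield;
    }

    int yield() {
        int result = _sum;
        _sum = 0;
        _state = sketch_state::await;
        return result;
    }

    // Optional: invoked when no more values will arrive while awaiting.
    void close() { _closed = true; }

    // Optional: invoked with the exception thrown by await() or yield().
    void set_error(std::exception_ptr error) {
        try {
            std::rethrow_exception(error);
        } catch (const std::exception& e) {
            _last_error = e.what();
        }
    }

    sketch_state state() const { return _state; }
};

// Hypothetical driver: routes exceptions to set_error() and calls close()
// once the input is exhausted while the process is still awaiting.
inline checked_adder drive_checked(checked_adder p, const std::vector<int>& input,
                                   std::vector<int>& output) {
    for (int x : input) {
        try {
            p.await(x);
            if (p.state() == sketch_state::yield) output.push_back(p.yield());
        } catch (...) {
            p.set_error(std::current_exception());
            return p; // assumption: this sketch stops after the first error
        }
    }
    if (p.state() == sketch_state::await) p.close();
    return p;
}
```

In this sketch a process that sees its input run dry gets close() called exactly once, while a process whose await() throws receives the exception through set_error() and never sees close().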

Example 1

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
#include <tuple>

#include <stlab/concurrency/channel.hpp>
#include <stlab/concurrency/default_executor.hpp>

using namespace stlab;

/*
  This process adds all values until a zero is passed as value.
  Then it will yield the result and start over again.
*/

struct adder
{
  int _sum = 0;
  process_state_scheduled _state = await_forever;

  void await(int x) {
    _sum += x;
    if (x == 0) {
      _state = yield_immediate;
    }
  }

  int yield() {
    int result = _sum;
    _sum = 0;
    _state = await_forever;
    return result;
  }

  auto state() const { return _state; }
};


int main() {
  sender<int> send;
  receiver<int> receive;
  std::tie(send, receive) = channel<int>(default_executor);

  std::atomic_bool done{false};

  // Attach the adder process, then print each yielded sum downstream.
  auto calculator = receive |
    adder{} |
    [&_done = done](int x) {
      std::cout << x << '\n';
      _done = true;
    };

  // Mark the receiver as ready so that sent values are processed.
  receive.set_ready();

  send(1);
  send(2);
  send(3);
  send(0); // the zero makes the adder yield the sum

  // Waiting just for illustrative purposes
  while (!done) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
}