High-performance cluster computing engine

Event-driven Programming

DtCraft applies event-driven programming to enable efficient I/O multiplexing. Event-driven programming is a programming paradigm in which events (input, output, timeout) determine the program flow. Instead of using well-known third-party libraries such as Libevent and Libev, we developed our own reactor class to dispatch events. The reactor is the core of DtCraft: all kernel components (master, agent, and executor) are derived from it. In fact, people also use DtCraft to develop event-driven applications in modern C++.

Example: Timeout Event

The example below shows how to create a timeout event that expires after one second and a periodic event that expires every 100 milliseconds in DtCraft.

// create a reactor with 2 threads
dtc::Reactor reactor(2);

// create a timeout event of 1 s
auto ev1 = reactor.insert<dtc::TimeoutEvent>(
  std::chrono::seconds(1),
  [] (dtc::Event& e) {
    std::cout << "1 s passed" <<'\n';
  }
);

// create a periodic event running every 100 ms
auto ev2 = reactor.insert<dtc::PeriodicEvent>(
  std::chrono::milliseconds(100),
  true,  // from_now: also fire at the moment of this call
  [] (dtc::Event& e) {
    std::cout << "100 ms passed" <<'\n';
  }
);

// dispatch events
reactor.dispatch();

Example: I/O Event

The example below shows how to create a read event that operates on a domain socket pair in DtCraft.

// create a reactor with 4 threads
dtc::Reactor reactor(4);

// create a domain socket device pair (std::shared_ptr)
auto [rdev, wdev] = dtc::make_socket_pair();

// create a read event
auto rev = reactor.insert<dtc::ReadEvent>(
  std::move(rdev),
  [] (dtc::Event& ev) {
    std::cout << "read event on fd=" << ev.device()->fd() << '\n';
  }
);

// write one byte to the socket
auto one {'a'};
wdev->write(&one, sizeof(one));

// dispatch events
reactor.dispatch();

Reactor

The reactor is the main gateway to create events, remove events, and dispatch your program into an event-driven loop. Our reactor is written in modern C++ and has many distinct features compared to existing libraries:

The code below shows a list of commonly used methods of the reactor. All methods are thread-safe.

class Reactor {
  const std::thread::id owner {std::this_thread::get_id()};

  size_t num_events() const;
  size_t num_workers() const;

  template <typename C>
  void break_loop_on(C&&);

  std::future<bool> break_loop();

  void threshold(size_t);

  template <typename T, typename... ArgsT>
  auto insert(ArgsT&&... args);

  template <typename C>
  auto promise(C&&);

  template <typename C>
  auto async(C&&);

  void dispatch();
  bool is_owner() const;
  auto remove(auto&&... events);
  auto freeze(auto&&... events);
  auto thaw(auto&&... events);
};

Here is a brief explanation for each method:

The reactor's header files are in include/dtc/event, and its source files are in src/event.

Create a Reactor

Before performing any event operations, you need to create one or more Reactor objects. Each reactor object holds a set of events and polls them to determine which are active. When an event becomes active, the reactor dispatches it to a thread that executes its callback.

Example: Create a Reactor with Four Threads

The example below creates a reactor with four threads: one master and three workers.

dtc::Reactor reactor(4);

Note: While an event's callback is running, the reactor does not poll that event in subsequent loop iterations. Polling resumes once the executing thread completes the callback.

Request a Promise/Async

The reactor uses a thread pool to schedule event operations on threads for maximum performance. We use a single-producer, multiple-consumer model to deliver task-based parallelism. The producer is the owner of the reactor, typically the master thread that created the reactor in your application, and the consumers are the threads that execute event callbacks. You can request a task to run on the master thread through promise, or on a worker thread through async. Both methods return std::future objects from which you can retrieve the task results. Our reactor imposes strong and clear ownership between the master thread and the worker threads, which greatly simplifies the design of multi-threaded event-driven applications.

Example: Execute a Task on the Master Thread

The example below demonstrates how to execute a task on the master thread (reactor owner).

std::future future = reactor.promise([] () {
  std::cout << "This task only runs on the master thread" << '\n';
  return 1;
});

Example: Execute a Task on a Worker (or Master) Thread

The example below demonstrates how to execute a task on a worker thread. The master thread will execute the task if the reactor has only one thread.

std::future future = reactor.async([] () {
  std::cout << "This task runs on a worker thread" << '\n';
});

Note: Requesting a promise from the reactor can be useful when tasks need to synchronize at a certain point to avoid data races.
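
To make the single-producer, multiple-consumer idea concrete, here is a minimal sketch of a promise queue in standard C++, assuming nothing about DtCraft's internals. Any thread may enqueue a task, but only the owner thread drains the queue, mirroring how step 1 of the event loop treats promises. The class PromiseQueue and its drain method are hypothetical names; each task is wrapped in a std::packaged_task so the caller receives a std::future.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <type_traits>
#include <utility>

// Hypothetical sketch (not DtCraft code): tasks may be enqueued from any
// thread, but only the owner thread runs them.
class PromiseQueue {
 public:
  // Enqueue a task; the returned future carries the task's result.
  template <typename C>
  auto promise(C&& c) {
    using R = std::invoke_result_t<std::decay_t<C>&>;
    auto task = std::make_shared<std::packaged_task<R()>>(std::forward<C>(c));
    std::future<R> fut = task->get_future();
    {
      std::lock_guard<std::mutex> lock(mutex_);
      tasks_.emplace([task] { (*task)(); });
    }
    return fut;
  }

  // Run every pending task; must be called by the owner thread.
  void drain() {
    assert(std::this_thread::get_id() == owner_);
    std::queue<std::function<void()>> pending;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      std::swap(pending, tasks_);
    }
    while (!pending.empty()) {
      pending.front()();
      pending.pop();
    }
  }

 private:
  const std::thread::id owner_{std::this_thread::get_id()};
  std::mutex mutex_;
  std::queue<std::function<void()>> tasks_;
};
```

A future obtained from a worker thread stays pending until the owner calls drain, which is the synchronization point the note above refers to. An async counterpart would hand tasks to worker threads instead.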

Event

The event is the basic unit of operation. Currently, there are four types of events: TIMEOUT, PERIODIC, READ, and WRITE. Each event is a unique entity and belongs to exactly one of these types. You can derive your own event class from one of the four event bases: TimeoutEvent, PeriodicEvent, ReadEvent, and WriteEvent.

// Base class to create a timeout event.
// d: duration in std::chrono
// c: callable object (callback)
class TimeoutEvent : public Event {
  template <typename D, typename C>
  TimeoutEvent(D&& d, C&& c);
};

// Base class to create a periodic event.
// d: duration in std::chrono
// from_now: start from now or next cycle (now + duration)
// c: callable object (callback)
class PeriodicEvent : public Event {
  template <typename D, typename C>
  PeriodicEvent(D&& d, const bool from_now, C&& c);
};

// Base class to create a read event.
// d: device pointer
// c: callable object (callback)
class ReadEvent : public Event {
  template <typename C>
  ReadEvent(std::shared_ptr<Device> d, C&& c);
};

// Base class to create a write event.
// d: device pointer
// c: callable object (callback)
class WriteEvent : public Event {
  template <typename C>
  WriteEvent(std::shared_ptr<Device> d, C&& c);
};

DtCraft represents every event as a std::shared_ptr so that multiple threads can perform event operations correctly. For example, it is completely safe to ask one thread to remove an event from the reactor while another thread is running the event's callback, and vice versa. The event class inherits from std::enable_shared_from_this, which allows an event holder to safely create additional std::shared_ptr instances that share ownership of the event.

At the lowest level, an event callback has the type std::function<dtc::Event::Signal(dtc::Event&)>. The callback takes an event argument through which you can access the underlying handle or create another shared owner of the event. The return value is a handy way to signal an action to the reactor once the callback finishes.
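
The sketch below shows how a loop might act on the returned signal, assuming REMOVE means "drop this event after the callback" and DEFAULT means "keep polling it". Signal, Ev, and dispatch_once are hypothetical names used only for illustration; DtCraft's own enum is dtc::Event::Signal.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <vector>

// Hypothetical stand-ins; DtCraft's real enum is dtc::Event::Signal.
enum class Signal { DEFAULT, REMOVE };

struct Ev {
  std::function<Signal(Ev&)> callback;
  int runs = 0;
};

// Runs each callback once and drops events whose callback returned REMOVE.
void dispatch_once(std::vector<std::shared_ptr<Ev>>& events) {
  std::vector<std::shared_ptr<Ev>> kept;
  for (auto& ev : events) {
    ++ev->runs;
    if (ev->callback(*ev) != Signal::REMOVE) {
      kept.push_back(ev);  // DEFAULT: keep polling this event
    }
  }
  events.swap(kept);
}
```

Letting the callback return the decision keeps removal on the thread that knows the event's state best, without a separate call back into the reactor.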

Example: Define an Event Callback

The example below shows how to define an event callback and use the return value to interact with the reactor.

auto callback = [a=rand()%2] (dtc::Event& event) {
  auto another_owner = event.shared_from_this();
  assert(another_owner.use_count() >= 2);  // at least the reactor and another_owner
  return a == 1 ? dtc::Event::REMOVE : dtc::Event::DEFAULT;
};

Note: The thread running an event callback holds shared ownership of that event.
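
The ownership rules above can be illustrated with plain standard C++. In this sketch, SketchEvent and run_callback are hypothetical stand-ins, not DtCraft types: the callback thread first obtains its own owner through shared_from_this, so the event stays alive for the whole callback even if the reactor and the user release their pointers in the middle of it.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for dtc::Event, used only to show shared ownership.
struct SketchEvent : std::enable_shared_from_this<SketchEvent> {
  int fired = 0;
};

// Simulates a thread entering the callback: it takes its own owning pointer
// before running the body, so the body may drop every other owner safely.
template <typename F>
int run_callback(SketchEvent& ev, F&& body) {
  std::shared_ptr<SketchEvent> self = ev.shared_from_this();
  body(*self);
  return ++self->fired;  // still valid: `self` keeps the event alive
}
```

When run_callback returns, self is the last owner, so the event is destroyed only after the callback has finished.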

Example: Derive a Timeout Event

The example below declares a timeout class that prints a pre-defined message after one second.

struct MyTimeoutEvent : public dtc::TimeoutEvent {
  const std::string message {"hello"};
  MyTimeoutEvent() : TimeoutEvent(std::chrono::seconds(1), [this](dtc::Event& e){
    std::cout << message << '\n';
  }) {}
};

Create an Event

The DtCraft reactor does not allow users to create event objects directly. Instead, we use the factory design pattern to instantiate every event type. To create an event, simply call the method Reactor::insert. It takes an event type T and arguments to forward to T's constructor.

class Reactor {
  template <typename T, typename... ArgsT>
  auto insert(ArgsT&&...);
};

The return value is a std::future object holding a shared pointer to the event (std::future<std::shared_ptr<T>>). Keep in mind that our reactor runs in a multi-threaded environment by default. A successful event creation takes two steps: (1) the event is allocated in memory, and (2) the event is inserted into the reactor backend for polling. Calling get on the future object blocks until both steps complete.

Example: Create a Periodic Event

The example below creates a periodic event that expires every two seconds.

auto event = reactor.insert<dtc::PeriodicEvent>(
  std::chrono::seconds(2),
  false,
  [] (dtc::Event& e) {
    std::cout << "timeout" << '\n';
  }
).get();

assert(event.use_count() == 2);  // two owners: the reactor and the user

Note: The reactor retains shared ownership of every event it created.
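
The factory behavior of Reactor::insert can be sketched as follows, assuming a simplified registry that performs both steps (allocation, then registration) inline on the calling thread. In DtCraft the registration step may be carried out by the reactor owner, which is presumably why the real method returns a future. Registry, BaseEvent, and SketchTimer are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical base and event types; not DtCraft's classes.
struct BaseEvent {
  virtual ~BaseEvent() = default;
};

struct SketchTimer : BaseEvent {
  explicit SketchTimer(int m) : ms(m) {}
  int ms;
};

// Hypothetical factory: users never call `new`; insert<T>(args...) builds
// the event, registers it, and hands back a future of the shared pointer.
class Registry {
 public:
  template <typename T, typename... ArgsT>
  std::future<std::shared_ptr<T>> insert(ArgsT&&... args) {
    std::promise<std::shared_ptr<T>> ready;
    std::future<std::shared_ptr<T>> fut = ready.get_future();
    auto ev = std::make_shared<T>(std::forward<ArgsT>(args)...);  // step 1: allocate
    {
      std::lock_guard<std::mutex> lock(mutex_);
      events_.push_back(ev);  // step 2: register for polling
    }
    ready.set_value(std::move(ev));  // the future becomes ready only now
    return fut;
  }

  std::size_t size() {
    std::lock_guard<std::mutex> lock(mutex_);
    return events_.size();
  }

 private:
  std::mutex mutex_;
  std::vector<std::shared_ptr<BaseEvent>> events_;
};
```

As in the note above, the registry keeps one shared owner and the caller holds the other after calling get.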

Remove an Event

Removing events from a reactor is accomplished through the method Reactor::remove. Similarly, this method returns a std::future object holding a std::tuple of bools. Each bool in the tuple indicates whether the corresponding event was removed from the reactor.

class Reactor {
  auto remove(auto&&... events);
};

Calling get on the returned future object guarantees that the event has been removed from the reactor and will no longer enter subsequent iterations of the event loop.

Example: Remove Events from the Reactor

The example below removes two events from the reactor.

auto [b1, b2] = reactor.remove(event1, event2).get();
if(b1) std::cout << "event1 is removed" << '\n';
if(b2) std::cout << "event2 is removed" << '\n';

Note: The reference count of the event decreases by one when it is removed from the reactor.
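
A variadic remove that reports one bool per argument can be written with a pack expansion, as in this sketch. EventSet, Item, always_bool, and BoolTuple are hypothetical names; the braced initializer guarantees that events are erased left to right, and the result is delivered through a std::future to match the shape of Reactor::remove.

```cpp
#include <algorithm>
#include <cassert>
#include <future>
#include <memory>
#include <tuple>
#include <vector>

struct Item {};  // hypothetical event type

// Maps every argument type to bool, so N arguments yield a tuple of N bools.
template <typename>
struct always_bool { using type = bool; };

template <typename... Ts>
using BoolTuple = std::tuple<typename always_bool<Ts>::type...>;

class EventSet {
 public:
  void add(const std::shared_ptr<Item>& ev) { events_.push_back(ev); }

  // One bool per argument: true if that event was found and erased.
  template <typename... Ptrs>
  std::future<BoolTuple<Ptrs...>> remove(const Ptrs&... evs) {
    std::promise<BoolTuple<Ptrs...>> done;
    auto fut = done.get_future();
    // Braced initialization evaluates erase_one left to right.
    done.set_value(BoolTuple<Ptrs...>{erase_one(evs)...});
    return fut;
  }

 private:
  bool erase_one(const std::shared_ptr<Item>& ev) {
    auto it = std::find(events_.begin(), events_.end(), ev);
    if (it == events_.end()) return false;
    events_.erase(it);  // drops the set's reference to the event
    return true;
  }

  std::vector<std::shared_ptr<Item>> events_;
};
```

Erasing the set's shared pointer is what lowers the event's reference count by one, matching the note above.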

Run an Event Loop

Once you have a reactor with registered events, the next step is to start the event loop and let the reactor dispatch events to their associated handlers.

class Reactor {
  void dispatch();
};

By default, the method Reactor::dispatch runs a loop until there are no more events to poll or a pre-defined stopping condition is reached. While the loop runs, the reactor owner (often the master thread) repeatedly checks whether any events have triggered. When they have, it marks all triggered events as active and dispatches them to worker threads to run their callbacks.

Here is a sketch of how the event loop works. Only the reactor owner can run the event loop.

while (1) {
  1. Execute callable objects in the promise queue.
  2. if (no registered events or stopping criteria reached)
       break;
  3. Poll I/O events (select or epoll).
  4. Synchronize the clock with std::chrono::steady_clock::now().
  5. Poll timeout events.
}

Example: Start an Event Loop

The example below starts the event loop of a reactor.

reactor.dispatch();
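
To make the loop sketch above concrete, here is a minimal single-threaded version in standard C++. It is an illustration under simplifying assumptions, not DtCraft's implementation: MiniLoop is a hypothetical name, step 3 (select or epoll) is omitted so only timeout events are modeled, and a 1 ms sleep stands in for blocking in the poller until the next deadline.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>
#include <utility>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical single-threaded loop (not DtCraft code) following steps
// 1, 2, 4, and 5 of the sketch above; step 3 (select/epoll) is omitted.
struct MiniLoop {
  struct Timer {
    Clock::time_point deadline;
    std::function<void()> callback;
  };

  std::vector<std::function<void()>> promises;
  std::vector<Timer> timers;

  void dispatch() {
    while (true) {
      // 1. Execute callable objects in the promise queue.
      std::vector<std::function<void()>> pending;
      pending.swap(promises);
      for (auto& p : pending) p();

      // 2. Stop when no events are registered.
      if (timers.empty()) break;

      // 4. Synchronize the clock.
      const auto now = Clock::now();

      // 5. Fire expired timeout events and drop them; keep the rest.
      std::vector<Timer> current;
      current.swap(timers);
      for (auto& t : current) {
        if (t.deadline <= now) {
          t.callback();
        } else {
          timers.push_back(std::move(t));
        }
      }

      // Stand-in for blocking in the poller until the next deadline.
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
  }
};
```

Because callbacks may enqueue new promises, the queue is swapped out before it is drained; anything added during an iteration runs at step 1 of the next one.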

Stop an Event Loop

By default, the reactor stops when no events are pollable. However, you can stop an active event loop at any time, from any thread, depending on your program flow. There are three ways to stop an event loop: break_loop, threshold, and break_loop_on. Calling break_loop sets the stopping flag to true, which subsequently stops the reactor. The method threshold tells the reactor to stop when the number of registered events falls to or below a given value. The method break_loop_on stops the event loop when a given condition is met.

class Reactor {
  std::future<bool> break_loop();

  void threshold(size_t);

  template<typename C>
  void break_loop_on(C&&);
};

Example: Stop an Event Loop on a Given Threshold

The example below tells the reactor to stop the event loop when the number of registered events is two or fewer.

reactor.threshold(2);

Example: Customize Stopping Criteria for an Event Loop

The example below stops the event loop after 1000 iterations.

reactor.break_loop_on([cnt=0] (dtc::Reactor& reactor) mutable {
  return ++cnt > 1000;  // true once 1000 iterations have passed
});

Note: Breaking the event loop might leave requested promises unfinished. It is the user's responsibility to ensure critical promises are processed before stopping the reactor.
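
The three stopping rules can be combined into a single check evaluated at step 2 of each loop iteration. The sketch below is hypothetical (StopPolicy and should_stop are not DtCraft names). Only break_loop uses an atomic flag here, and the predicate takes no arguments, whereas DtCraft's break_loop_on callback receives the reactor.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>

// Hypothetical sketch of the three stopping rules (not DtCraft code).
class StopPolicy {
 public:
  // Raise the stop flag; safe to call from any thread in this sketch.
  void break_loop() { flag_.store(true); }

  // Stop when the number of registered events is at or below n.
  void threshold(std::size_t n) { threshold_ = n; }

  // Stop when the predicate returns true (checked once per iteration).
  void break_loop_on(std::function<bool()> pred) { pred_ = std::move(pred); }

  // Evaluated by the loop owner at step 2 of every iteration.
  bool should_stop(std::size_t num_events) {
    if (flag_.load()) return true;
    if (num_events <= threshold_) return true;
    return pred_ && pred_();
  }

 private:
  std::atomic<bool> flag_{false};
  std::size_t threshold_{0};  // default: stop when no events remain
  std::function<bool()> pred_;
};
```

A default threshold of zero reproduces the default behavior of stopping when no events are left to poll.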

I/O Device

Another key difference between our reactor and other libraries is how it handles I/O events. We do not attach raw file descriptors to our events, as they can be dangerous and confusing in multi-threaded I/O. Instead, each I/O resource is wrapped in a Device object managed by std::shared_ptr. By default, a DtCraft device is non-blocking and close-on-exec, and supports a set of basic read and write operations. When a device is destroyed, its destructor closes the associated file descriptor. The source code of the device is in src/device.cpp and include/dtc/device.hpp.

Create a Device

We provide functions to create devices for sockets, pipes, and notifiers. On successful creation, you get a shared pointer to the device (std::shared_ptr<Device>) that manages a file descriptor. Built-in device support can be found in include/dtc/ipc/.

Example: Create a Pipe Device

The example below creates a pipe device pair: one read end and one write end.

auto [rend, wend] = dtc::make_pipe();

Example: Create a Domain Socket Pair

The example below creates a domain socket device pair; both ends can read and write.

auto [rend, wend] = dtc::make_socket_pair();

Example: Create a Socket Server

The example below creates a socket device bound to port "9990".

auto socket = dtc::make_socket_server("9990");

Example: Create a Socket Client

The example below creates a socket device connecting to the remote server at "server-host-name" through port "9990".

auto socket = dtc::make_socket_client("server-host-name", "9990");

Example: Get the File Descriptor Associated with a Device

The example below acquires the raw file descriptor managed by a device.

auto fd = device->fd();
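
The device abstraction described in this section can be sketched with POSIX calls. SketchDevice and make_sketch_pipe are hypothetical and far simpler than dtc::Device: the constructor sets O_NONBLOCK and FD_CLOEXEC with fcntl, the destructor closes the descriptor, and copying is disabled so that each descriptor has exactly one owning object, which is then shared through std::shared_ptr.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <memory>
#include <system_error>
#include <unistd.h>
#include <utility>

// Hypothetical minimal device (not dtc::Device): owns one descriptor.
class SketchDevice {
 public:
  explicit SketchDevice(int fd) : fd_(fd) {
    const int flags = ::fcntl(fd_, F_GETFL);
    if (flags == -1 || ::fcntl(fd_, F_SETFL, flags | O_NONBLOCK) == -1 ||
        ::fcntl(fd_, F_SETFD, FD_CLOEXEC) == -1) {
      const int err = errno;
      ::close(fd_);
      throw std::system_error(err, std::system_category(), "fcntl");
    }
  }
  ~SketchDevice() { ::close(fd_); }  // the descriptor dies with the device
  SketchDevice(const SketchDevice&) = delete;
  SketchDevice& operator=(const SketchDevice&) = delete;
  int fd() const { return fd_; }

 private:
  int fd_;
};

using DevicePtr = std::shared_ptr<SketchDevice>;

// Creates a pipe as a (read end, write end) pair of shared devices.
std::pair<DevicePtr, DevicePtr> make_sketch_pipe() {
  int fds[2];
  if (::pipe(fds) == -1) {
    throw std::system_error(errno, std::system_category(), "pipe");
  }
  DevicePtr read_end;
  try {
    read_end = std::make_shared<SketchDevice>(fds[0]);
  } catch (...) {
    ::close(fds[1]);  // avoid leaking the write end
    throw;
  }
  return {read_end, std::make_shared<SketchDevice>(fds[1])};
}
```

Once the last shared pointer to a device goes away, its descriptor is closed automatically; no event ever needs to call close itself.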

Read/Write through a Device

Each DtCraft device provides read and write methods to transfer data through the device. These two methods are thin, safe wrappers around POSIX read and write; each call performs a one-time synchronization between user-space data and the underlying file descriptor.

class Device {
  std::streamsize read(void*, std::streamsize) const;
  std::streamsize write(const void*, std::streamsize) const;
};

On success, the number of bytes read or written is returned. A return value of -1 indicates a non-blocking condition, that is, the operation would block (EAGAIN or EWOULDBLOCK). On any other error, an exception (std::system_error) is thrown to report the system error.

Example: Write through a Device

The example below performs a write synchronization through a device.

try {
  std::string data {"test"};
  if(device->write(data.c_str(), data.size()) == -1) {
    std::cout << "Device is busy. Try later.\n";
  }
}
catch (const std::system_error& se) {
  std::cout << se.what() << '\n';
}

Example: Read through a Device

The example below performs a read synchronization through a device.

try {
  char data[1024];
  if(device->read(data, 1024) == -1) {
    std::cout << "Device is busy. Try later.\n";
  }
}
catch (const std::system_error& se) {
  std::cout << se.what() << '\n';
}

Note: DtCraft does not treat the non-blocking condition as an error. It is the user's responsibility to check the return value and decide whether the synchronization needs another try.
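
The return-value contract described above can be expressed as a small wrapper around POSIX read. sketch_read is a hypothetical free function, not DtCraft's code, and retrying on EINTR is an assumption added here; the other branches follow the documented behavior: the byte count on success, -1 when the call would block, and std::system_error otherwise.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <fcntl.h>  // fcntl, O_NONBLOCK (to set up a non-blocking fd)
#include <ios>      // std::streamsize
#include <system_error>
#include <unistd.h>

// Hypothetical wrapper modeling the documented Device::read contract.
std::streamsize sketch_read(int fd, void* buf, std::streamsize n) {
  while (true) {
    const ssize_t r = ::read(fd, buf, static_cast<std::size_t>(n));
    if (r >= 0) return static_cast<std::streamsize>(r);
    if (errno == EINTR) continue;  // assumption: retry when interrupted
    if (errno == EAGAIN || errno == EWOULDBLOCK) return -1;  // would block
    throw std::system_error(errno, std::system_category(), "read");
  }
}
```

A write wrapper would be symmetric, calling POSIX write instead of read.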

Create a Read Event from a Device

A read event becomes active when the associated device is ready to read. The reactor dispatches an active read event to a worker that executes its callback. In DtCraft, read events are level-triggered: the reactor keeps dispatching a read event until the device's kernel buffer is drained.

Example: Create a Read Event on a Given Device

The example below creates a read event on a given device and reads a character from the device. On error, the callback returns a removal signal to remove this event from the reactor.

reactor.insert<dtc::ReadEvent>(
  std::move(device),
  [] (dtc::Event& e) {
    std::cout << "Read event on fd=" << e.device()->fd() << '\n';
    char c;
    try {
      e.device()->read(&c, sizeof(c));
    }
    catch(...) {
      std::cout << "Error occurred on read\n";
      return dtc::Event::REMOVE;
    }
    return dtc::Event::DEFAULT;
  }
);

Note: A read device can be managed by only one read event at a time. Creating two or more read events on the same read device results in undefined behavior.
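
Level-triggered readiness can be observed directly with POSIX poll on a UNIX domain socket pair. In this sketch, readable_now is a hypothetical helper. As long as unread bytes remain in the kernel buffer, poll keeps reporting POLLIN, which is why a level-triggered reactor keeps dispatching a read event whose callback consumes only part of the data.

```cpp
#include <cassert>
#include <poll.h>
#include <sys/socket.h>  // socketpair, AF_UNIX (for the usage below)
#include <unistd.h>

// Hypothetical helper: returns true if fd has unread data right now.
// A timeout of 0 makes poll return immediately instead of blocking.
bool readable_now(int fd) {
  pollfd p{};
  p.fd = fd;
  p.events = POLLIN;
  return ::poll(&p, 1, 0) == 1 && (p.revents & POLLIN) != 0;
}
```

If two bytes are written and each "dispatch" reads only one, readiness is reported twice before the buffer is empty.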

Create a Write Event from a Device

A write event becomes active when the associated device is ready to write. However, unlike read events, write events are not triggered autonomously by the reactor, because users decide when to write. By default, write events are frozen, and users must explicitly call Reactor::thaw to unfreeze them. A write event enters I/O polling only when thawed. We delegate this control to users so that write events are triggered only when needed (for example, when a user-space buffer has data to write), rather than invoking the callback every time the device is writable.

Example: Create a Write Event on a Given Device

The example below creates a write event on a given device and thaws it for polling, so that its callback writes one character.

auto event = reactor.insert<dtc::WriteEvent>(
  std::move(device),
  [] (dtc::Event& e) {
    std::cout << "Write event on fd=" << e.device()->fd() << '\n';
    char c {'a'};
    e.device()->write(&c, sizeof(c));
  }
).get();

reactor.thaw(event);

Note 1: A write device can be managed by only one write event at a time. Creating two or more write events on the same write device results in undefined behavior.

Note 2: When a worker thread leaves the callback of a write event, it freezes the write event again. It is the user's responsibility to check whether another thaw is required to flush all remaining data.
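
The freeze/thaw life cycle of a write event can be modeled as a small state machine. SketchWriteEvent is hypothetical and performs no actual I/O: it starts frozen, runs its callback only after thaw, and refreezes itself after each callback, so the user must thaw it again when more data is waiting.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical model of a write event's life cycle (not DtCraft code).
class SketchWriteEvent {
 public:
  explicit SketchWriteEvent(std::function<void()> cb) : cb_(std::move(cb)) {}

  void thaw() { frozen_ = false; }
  bool frozen() const { return frozen_; }

  // Called by the loop when the device is writable; ignored while frozen.
  bool on_writable() {
    if (frozen_) return false;
    cb_();
    frozen_ = true;  // refreeze: the user must thaw again for more data
    return true;
  }

 private:
  std::function<void()> cb_;
  bool frozen_ = true;  // write events start frozen
};
```

Keeping write events frozen by default prevents a busy loop: a device that is almost always writable would otherwise trigger the callback on every iteration.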

Where to Go from Here?

Congratulations on learning event-driven programming with the DtCraft reactor! In the next chapter, we will cover more DtCraft features and functionality for processing data in an event-driven environment. You will get an in-depth overview of: