Input/Output Stream

In the previous chapter, we covered the basics to create a write event and a read event. Most of the time, an application needs to perform buffered data operations in addition to responding to events. Read events are normally passive while write events are proactive. When we want to read data, the control flow looks like:

  1. Register a read callback.
  2. Wait for the device to become readable.
  3. Read as much as data from the kernel buffer to our data buffer.

On the other hand, the control flow to write data has the following pattern:

  1. Write data to our write buffer.
  2. Wait for the device to become writable.
  3. Flush the data from our data buffer to the kernel buffer in the write callback.
Since these two design patterns are very common, DtCraft provides two classes, InputStream and OutputStream for them. DtCraft currently suppors only stream-oriented protocols such as pipe and TCP socket. We may add support for datagram-oriented protocols like UDP in the future. The source code can be found in include/dtc/ipc/ipc.hpp and src/ipc/ipc.cpp.

Input Stream

InputStream is a class derived from ReadEvent and has an input buffer InputStreamBuffer in charge of read-related operations. The structure of InputStream is very similar to ReadEvent. When the associated device becomes readable, the reactor dispatches it to a worker thread to invoke the callback.

There are many ways to read data from InputStream. First, you could get its raw file descriptor and stick with POSIX read. Second, you could use read operations defined in its public member isbuf. Third, you could call its thread-safe operator () to deserialize data in binary format. We will discuss our built-in deserialization library in later section.

Example: Create an Input Stream Event

The example below creates an input stream event and synchronize its input buffer with the underlying device in its callback.

auto is = reactor.insert<InputStream>(
  std::move(device),
  [] (dtc::InputStream& istream) {
    try {
      istream.isbuf.sync();
    }
    catch(const std::exception& e) {
      std::<< e.what() << '\n';
      std::exit(EXIT_FAILURE);
    }
  }
);

Note: The input stream event is level-trigger. The reactor keeps invoking its callback until all data is drained out from the kernel buffer to the user-space buffer.

Output Stream

OutputStream is a class derived from WriteEvent and has a output buffer OutputStreamBuffer in charge of write-related operations. Unlike WriteEvent which requires users to explicitly call thaw to bring a write event back for polling, OutputStreamBuffer automatically notifies the reactor when the associated buffer has data. In other words, when you perform write operations through OutputStream, it will unfreeze itself for polling until the buffer becomes empty.

There are many ways to write data through OutputStream. First, you could get its raw file descriptor and stick with POSIX write. Second, you could use write operations defined in its public member osbuf. Third, you could call its thread-safe operator () to serialize data in binary format. We will discuss our built-in serialization library in later section.

Example: Create a Output Stream Event

The example below creates a output stream event and sends a test message. The callback synchronizes its output buffer with the underlying device.

auto os = reactor.insert<OutputStream>(
  std::move(device),
  [] (dtc::OutputStream& ostream) {
    try {
      ostream.osbuf.sync();
    }
    catch(const std::exception& e) {
      std::<< e.what() << '\n';
      std::exit(EXIT_FAILURE);
    }
  }
);

for(int i=0; i<4; ++i) {
  std::thread t([os](){
    (*os)(std::string{"hello world from thread "} + std::to_string(i));
  });
}

Note: The reactor keeps invoking the callback of a output stream event when both of its output buffer has data pending and the associated device becomes writable, until all data are flushed.

Stream Buffer

Stream buffer implements a queue of bytes, adding data to the end and removing data from the front. It provides a variety of read and write operations, and works seamlessly with our built-in serialization and deserialization library. DtCraft provides two types of stream buffers, OutputStreamBuffer and InputStreamBuffer. Each stream buffer has a mutex to protect the underlying queue from race condition. The source code of stream buffer can be found in include/dtc/ipc/streambuf.hpp and src/ipc/streambuf.cpp.

Input Stream Buffer

An input stream buffer contains a linear character array to buffer input operations. It can synchronize data with a given device and perform read operations on the buffer. By default, it is safe to access an input stream buffer from multiple threads. The code below shows a list of common methods provided by the input stream buffer.

class InputStreamBuffer {
  std::streamsize in_avail() const;
  std::streamsize sync();
  std::streamsize read(void* array, std::streamsize n);
  std::streamsize copy(void* array, std::streamsize n) const;
  std::streamsize drop(std::streamsize n);
  std::string_view string_view() const;
  void device(Device* ptr);
  Device* device() const;
};

These methods are self-explanatory by their names.

The input stream buffer dynamically increases its buffer size during sync to accommondate data from the device.

Example: Create an Input Stream Buffer

The example below creates an input stream buffer and associates it with a device.

InputStreamBuffer isbuf (device.get());
Example: Read all Device Data to an Input Stream Buffer

The example below tries to drain all data from a device to an input stream buffer.

while(1) {
  try {
    if(auto ret = isbuf.sync(); ret == -1) {  // EAGAIN or EWOULDBLOCK
      std::cout << "Device has no more data to read.\n";
      break;
    }
  }
  catch (const std::exception& e) {  // Error occurs.
    std::cout << e.what() << '\n';
  }
}
Example: Inspect an Input Stream Buffer

The example below demonstrates how to copy and inspect data from an input stream buffer.

auto view = isbuf.string_view();
std::vector<char> vec(isbuf.in_avail());
auto n = isbuf.copy(vec.data(), vec.size());
assert(memcmp(view.data(), vec.data(), sizeof(char)*n) == 0);

Note 1: The input stream buffer shares NO ownership of the device. It is user's responsibility to verify the lifetime of the device to prevent synchronization error.

Note 2: The string view of an input stream buffer is a snapshot. Subsequent read operations can invalidate its pointer value.

Output Stream Buffer

A output stream buffer contains a linear character array to buffer output operations. It defines methods to write data to the buffer and synchronize the buffer with the associated device. By default, it is safe to access a output stream buffer from multiple threads. The code below shows a list of common methods provided by a output stream buffer.

class OutputStreamBuffer {
  std::streamsize out_avail() const;
  std::streamsize flush();
  std::streamsize sync();
  std::streamsize write(const void* array, std::streamsize n);
  std::streamsize copy(void* array, std::streamsize n) const;
  std::string_view string_view() const;
  void device(Device* ptr);
  Device* device() const;
};

These methods are self-explanatory by their names.

The output stream buffer dynamically doubles its size during write to ensure enough space accommodate written data.

Example: Create a Output Stream Buffer

The example below creates a output stream buffer and associates it with a device.

OutputStreamBuffer osbuf (device.get());
Example: Flush a Output Stream Buffer

The example below writes all data in a output stream buffer via its underlying write function.

try {
  if(auto ret = osbuf.flush(); ret == -1) {  // EAGAIN or EWOULDBLOCK
    std::cout << "Device is busy. Try later.\n";
  }
  else assert(osbuf.out_avail() == 0);
}
catch (const std::exception& e) {  // Error occurs.
  std::cout << e.what() << '\n';
}
Example: Write Data to a Output Stream Buffer

The example below writes a character sequence to a output stream buffer.

std::string data {"a character sequence"};
auto n = osbuf.write(data.c_str(), data.size());
assert(osbuf.out_avail() == n && n == data.size() && osbuf.string_view() == data);

Note 1: The output stream buffer shares NO ownership of the device. It is user's responsibility to verify the lifetime of the device to prevent synchronization error.

Note 2: The string view of a output stream buffer is a snapshot. Subsequent write operations can invalidate its pointer value.

Archiver

DtCraft has a built-in library, archiver, to serialize data and deserialize data from an input or a output stream buffer. Our archiver directly works on top of our stream buffer objects, requiring no extra copy in user space. It supports most of C++ standard containers and classes, and can be extended to handle your own types. The source code can be found in include/dtc/archive/.

POD and STL Supports

By default, our archiver supports the following types:

Binary Input/Output Archiver

We use BinaryOutputArchiver to serialize data and BinaryInputArchiver to deserialize data in binary format. Binary archiver is designed to produce compact data representation at bit level and is not human readable. It operates on either OutputStreamBuffer or InputStreamBuffer. We use operator () to perform serialization and deserialization on a parameter pack. The return is a value of type std::streamsize telling how many bytes are transmitted.

Example: Archive Data in Binary Format

The example below shows how to serialize and deserialize data in binary format.

dtc::OutputStreamBuffer osbuf;
dtc::InputStreamBuffer isbuf;
dtc::BinaryOutputArchiver oar(osbuf);
dtc::BinaryInputArchiver iar(isbuf);

int o1 {1}, i1;
std::string o2 {"test"}, i2;
std::variant<int, double, std::vector<char>> o3 {1.2}, i3;

std::streamsize num_obytes = oar(o1, o2, o3);
std::streamsize num_ibytes = iar(i1, i2, i3);

assert(num_obytes == num_ibytes && o1 == i1 && o2 == i2 && o3 == i3);

Archive a Custom Type

To make your own type work with our archiver, add a template method archive in the public scope to your class. This lets our archiver know which data members to serialize and deserialize.

Example: Add Archive Method to Your Classes

The example below shows how to adapt your own type to our archiver interface.

struct MyType {
  int a;
  double b;
  std::tuple<std::string, char> c;

  template <typename ArchiverT> 
  auto archive(ArchiverT& ar) {
    return ar(a, b, c);
  }
};

Note: If you do not apply the archiver operator to multiple data at one time, you must return the sum of individual returns.

Where to Go from Here?

Congratulations! You have studied one core component in DtCraft. In the next chapter, we will go over the stream graph and key graph components to get your distributed applications up and running.