Input/Output Stream
In the previous chapter, we covered the basics of creating a write event and a read event. Most of the time, an application needs to perform buffered data operations in addition to responding to events. Read events are normally passive while write events are proactive. When we want to read data, the control flow looks like:
- Register a read callback.
- Wait for the device to become readable.
- Read as much data as possible from the kernel buffer into our data buffer.
On the other hand, the control flow to write data has the following pattern:
- Write data to our write buffer.
- Wait for the device to become writable.
- Flush the data from our write buffer to the kernel buffer in the write callback.
DtCraft provides InputStream and OutputStream for them.
DtCraft currently supports only stream-oriented protocols such as pipe and TCP socket.
We may add support for datagram-oriented protocols like UDP in the future.
The source code can be found in include/dtc/ipc/ipc.hpp and src/ipc/ipc.cpp.
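The two control flows above can be sketched with plain POSIX calls on a pipe. This is only an illustration, not DtCraft code: the helper names make_nonblocking, wait_for, and roundtrip_through_pipe are hypothetical, and the sketch assumes the message is small enough to fit into the kernel pipe buffer because the writer and the reader run one after the other in a single thread.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

// Hypothetical helper: switch a file descriptor to non-blocking mode.
inline void make_nonblocking(int fd) {
  const int flags = ::fcntl(fd, F_GETFL, 0);
  if (flags == -1 || ::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
    throw std::runtime_error("fcntl() failed");
  }
}

// Hypothetical helper: block until `fd` reports one of `events`.
inline void wait_for(int fd, short events) {
  pollfd pfd{fd, events, 0};
  if (::poll(&pfd, 1, -1) == -1) throw std::runtime_error("poll() failed");
}

// Hypothetical helper: pushes `message` through a pipe using the write flow,
// then collects it again using the read flow.
inline std::string roundtrip_through_pipe(const std::string& message) {
  assert(message.size() <= 4096);  // assumption: fits into the pipe buffer
  int fds[2];
  if (::pipe(fds) == -1) throw std::runtime_error("pipe() failed");
  make_nonblocking(fds[0]);
  make_nonblocking(fds[1]);

  // Write flow: the data sits in our write buffer, we wait until the device
  // is writable, then flush to the kernel buffer.
  const std::string write_buffer = message;
  std::size_t flushed = 0;
  while (flushed < write_buffer.size()) {
    wait_for(fds[1], POLLOUT);
    const ssize_t n = ::write(fds[1], write_buffer.data() + flushed,
                              write_buffer.size() - flushed);
    if (n >= 0) flushed += static_cast<std::size_t>(n);
    else if (errno != EAGAIN && errno != EWOULDBLOCK) {
      throw std::runtime_error("write() failed");
    }
  }
  ::close(fds[1]);  // lets the reader observe end-of-file

  // Read flow: wait until the device is readable, then move as much data as
  // possible from the kernel buffer into our data buffer.
  std::string read_buffer;
  char chunk[256];
  for (;;) {
    wait_for(fds[0], POLLIN);
    const ssize_t n = ::read(fds[0], chunk, sizeof(chunk));
    if (n > 0) read_buffer.append(chunk, static_cast<std::size_t>(n));
    else if (n == 0) break;  // end-of-file: the write end was closed
    else if (errno != EAGAIN && errno != EWOULDBLOCK) {
      throw std::runtime_error("read() failed");
    }
  }
  ::close(fds[0]);
  return read_buffer;
}
```

Calling roundtrip_through_pipe("hello") should hand back the same string. In DtCraft the waiting step is performed by the reactor rather than by a direct poll call.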
Input Stream
InputStream is a class derived from ReadEvent and has an input buffer, InputStreamBuffer, in charge of read-related operations.
The structure of InputStream is very similar to that of ReadEvent.
When the associated device becomes readable, the reactor dispatches it to a worker thread to invoke the callback.
There are several ways to read data from InputStream.
First, you can get its raw file descriptor and stick with POSIX read.
Second, you can use the read operations defined in its public member isbuf.
Third, you can call its thread-safe operator () to deserialize data in binary format.
We will discuss our built-in deserialization library in a later section.
Example: Create an Input Stream Event
The example below creates an input stream event and synchronizes its input buffer with the underlying device in its callback.
auto is = reactor.insert<InputStream>(
  std::move(device),
  [] (dtc::InputStream& istream) {
    try {
      istream.isbuf.sync();
    }
    catch(const std::exception& e) {
      std::cerr << e.what() << '\n';
      std::exit(EXIT_FAILURE);
    }
  }
);
Note: The input stream event is level-triggered. The reactor keeps invoking its callback until all data has been drained from the kernel buffer into the user-space buffer.
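Level-triggered readiness can be observed directly with poll on a pipe. The sketch below is an illustration under stated assumptions, not DtCraft code: count_readable_notifications is a hypothetical helper that reads only a small chunk per notification and counts how often the read end is reported readable before the kernel buffer is empty.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <poll.h>
#include <unistd.h>

// Hypothetical helper: writes `total` bytes into a pipe, then reads at most
// `chunk` bytes per readiness notification. Returns how many times poll()
// reported the read end as readable before the kernel buffer was drained.
inline int count_readable_notifications(std::size_t total, std::size_t chunk) {
  assert(chunk > 0 && chunk <= 64 && total <= 4096);
  int fds[2];
  if (::pipe(fds) == -1) throw std::runtime_error("pipe() failed");

  const std::string payload(total, 'x');
  if (::write(fds[1], payload.data(), payload.size()) !=
      static_cast<ssize_t>(total)) {
    throw std::runtime_error("write() failed");
  }

  int notifications = 0;
  char scratch[64];
  for (;;) {
    pollfd pfd{fds[0], POLLIN, 0};
    const int ready = ::poll(&pfd, 1, 0);  // zero timeout: query readiness only
    if (ready == -1) throw std::runtime_error("poll() failed");
    if (ready == 0) break;                 // kernel buffer is empty
    ++notifications;                       // still readable: level-triggered
    if (::read(fds[0], scratch, chunk) <= 0) break;
  }
  ::close(fds[0]);
  ::close(fds[1]);
  return notifications;
}
```

With 10 bytes pending and 4 bytes consumed per notification, the read end is reported readable three times. An edge-triggered interface would instead report readiness once per arrival of new data.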
Output Stream
OutputStream is a class derived from WriteEvent and has an output buffer, OutputStreamBuffer, in charge of write-related operations.
Unlike WriteEvent, which requires users to explicitly call thaw to bring a write event back for polling, OutputStreamBuffer automatically notifies the reactor when the associated buffer has data.
In other words, when you perform write operations through OutputStream, it will unfreeze itself for polling until the buffer becomes empty.
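The self-unfreezing behavior described above can be modeled with a small toy class. This is a sketch under stated assumptions and not how DtCraft implements it: ToyOutputStream, its armed flag (standing in for "registered with the reactor for write polling"), and on_writable are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <string>

// Toy model, not DtCraft code. `armed_` stands in for "the stream is being
// polled for writability by the reactor".
class ToyOutputStream {
 public:
  // Appends data to the buffer and arms the stream once data is pending.
  void write(const std::string& data) {
    std::lock_guard<std::mutex> lock(mutex_);
    buffer_ += data;
    if (!buffer_.empty()) armed_ = true;
  }

  // Simulates one write callback in which the device accepts up to
  // `capacity` bytes. Returns the number of bytes flushed and disarms the
  // stream once the buffer is empty.
  std::size_t on_writable(std::size_t capacity) {
    std::lock_guard<std::mutex> lock(mutex_);
    assert(armed_ == !buffer_.empty());  // invariant: armed iff data pending
    const std::size_t n = std::min(capacity, buffer_.size());
    buffer_.erase(0, n);
    if (buffer_.empty()) armed_ = false;
    return n;
  }

  bool armed() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return armed_;
  }

  std::size_t pending() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return buffer_.size();
  }

 private:
  mutable std::mutex mutex_;
  std::string buffer_;
  bool armed_ = false;
};
```

After write("hello world") the toy stream is armed with 11 bytes pending; it stays armed across a partial flush and disarms only when the last byte has been flushed.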
There are several ways to write data through OutputStream.
First, you can get its raw file descriptor and stick with POSIX write.
Second, you can use the write operations defined in its public member osbuf.
Third, you can call its thread-safe operator () to serialize data in binary format.
We will discuss our built-in serialization library in a later section.
Example: Create an Output Stream Event
The example below creates an output stream event and sends test messages from several threads. The callback synchronizes its output buffer with the underlying device.
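auto os = reactor.insert<OutputStream>(
  std::move(device),
  [] (dtc::OutputStream& ostream) {
    try {
      ostream.osbuf.sync();
    }
    catch(const std::exception& e) {
      std::cerr << e.what() << '\n';
      std::exit(EXIT_FAILURE);
    }
  }
);
// Keep the threads so they can be joined; destroying a joinable
// std::thread terminates the program.
std::vector<std::thread> threads;
for(int i=0; i<4; ++i) {
  threads.emplace_back([os, i](){ // capture i by value
    (*os)(std::string{"hello world from thread "} + std::to_string(i));
  });
}
for(auto& t : threads) t.join();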
Note: The reactor keeps invoking the callback of an output stream event while its output buffer has pending data and the associated device is writable, until all data is flushed.
Stream Buffer
A stream buffer implements a queue of bytes, adding data to the end and removing data from the front.
It provides a variety of read and write operations, and works seamlessly with our built-in serialization and deserialization library.
DtCraft provides two types of stream buffers, OutputStreamBuffer and InputStreamBuffer.
Each stream buffer has a mutex to protect the underlying queue from race conditions.
The source code of the stream buffer can be found in include/dtc/ipc/streambuf.hpp and src/ipc/streambuf.cpp.
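The idea of a mutex-protected byte queue can be sketched in a few lines of standard C++. This is a toy model, not the DtCraft implementation: ToyStreamBuffer and its storage choice (std::deque) are assumptions made for illustration, and only the method names mirror the ones used in this chapter.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Toy byte queue, not DtCraft code. Every public method takes the mutex, so
// the queue can be shared between threads.
class ToyStreamBuffer {
 public:
  // Appends n bytes to the end of the queue.
  std::size_t write(const void* array, std::size_t n) {
    assert(array != nullptr || n == 0);
    std::lock_guard<std::mutex> lock(mutex_);
    const char* first = static_cast<const char*>(array);
    bytes_.insert(bytes_.end(), first, first + n);
    return n;
  }

  // Copies up to n bytes from the front without removing them.
  std::size_t copy(void* array, std::size_t n) const {
    std::lock_guard<std::mutex> lock(mutex_);
    return copy_unlocked(array, n);
  }

  // Removes up to n bytes from the front.
  std::size_t drop(std::size_t n) {
    std::lock_guard<std::mutex> lock(mutex_);
    return drop_unlocked(n);
  }

  // Extracts up to n bytes: a copy followed by a drop under a single lock.
  std::size_t read(void* array, std::size_t n) {
    std::lock_guard<std::mutex> lock(mutex_);
    return drop_unlocked(copy_unlocked(array, n));
  }

  // Number of bytes currently queued.
  std::size_t in_avail() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return bytes_.size();
  }

 private:
  std::size_t copy_unlocked(void* array, std::size_t n) const {
    n = std::min(n, bytes_.size());
    std::copy_n(bytes_.begin(), n, static_cast<char*>(array));
    return n;
  }

  std::size_t drop_unlocked(std::size_t n) {
    n = std::min(n, bytes_.size());
    bytes_.erase(bytes_.begin(),
                 bytes_.begin() + static_cast<std::ptrdiff_t>(n));
    return n;
  }

  mutable std::mutex mutex_;
  std::deque<char> bytes_;
};
```

Performing the copy and the drop of read under one lock keeps the extraction atomic with respect to concurrent writers, which is the main reason a single mutex per buffer is convenient here.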
Input Stream Buffer
An input stream buffer contains a linear character array to buffer input operations. It can synchronize data with a given device and perform read operations on the buffer. By default, it is safe to access an input stream buffer from multiple threads. The code below shows a list of common methods provided by the input stream buffer.
class InputStreamBuffer {
  public:
    std::streamsize in_avail() const;
    std::streamsize sync();
    std::streamsize read(void* array, std::streamsize n);
    std::streamsize copy(void* array, std::streamsize n) const;
    std::streamsize drop(std::streamsize n);
    std::string_view string_view() const;
    void device(Device* ptr);
    Device* device() const;
};
These methods are largely self-explanatory from their names:
- in_avail returns the number of available characters in the buffer.
- sync performs a one-time synchronization with the associated device, returning the number of bytes synchronized or propagating any exception thrown by Device::read. If no device is associated with the input stream buffer, sync returns -1.
- read extracts up to n bytes from the input buffer to array, returning the actual number of bytes extracted.
- copy copies up to n bytes from the input buffer to array, returning the actual number of bytes copied.
- drop removes up to n bytes from the input buffer by simply advancing the buffer pointer.
- string_view returns a read-only character view over the input buffer.
- device lets you associate a new device with, or get the present device of, an input stream buffer.
The input stream buffer dynamically increases its buffer size during sync to accommodate data from the device.
Example: Create an Input Stream Buffer
The example below creates an input stream buffer and associates it with a device.
InputStreamBuffer isbuf (device.get());
Example: Read all Device Data to an Input Stream Buffer
The example below tries to drain all data from a device to an input stream buffer.
while(true) {
  try {
    if(auto ret = isbuf.sync(); ret == -1) { // EAGAIN or EWOULDBLOCK
      std::cout << "Device has no more data to read.\n";
      break;
    }
  }
  catch (const std::exception& e) { // Error occurs.
    std::cout << e.what() << '\n';
    break; // Stop draining; retrying after an error could loop forever.
  }
}
Example: Inspect an Input Stream Buffer
The example below demonstrates how to copy and inspect data from an input stream buffer.
auto view = isbuf.string_view();
std::vector<char> vec(isbuf.in_avail());
auto n = isbuf.copy(vec.data(), vec.size());
assert(memcmp(view.data(), vec.data(), sizeof(char)*n) == 0);
Note 1: The input stream buffer shares NO ownership of the device. It is the user's responsibility to ensure that the device outlives the buffer to prevent synchronization errors.
Note 2: The string view of an input stream buffer is a snapshot. Subsequent read operations can invalidate its pointer value.
Output Stream Buffer
An output stream buffer contains a linear character array to buffer output operations. It defines methods to write data to the buffer and synchronize the buffer with the associated device. By default, it is safe to access an output stream buffer from multiple threads. The code below shows a list of common methods provided by an output stream buffer.
class OutputStreamBuffer {
  public:
    std::streamsize out_avail() const;
    std::streamsize flush();
    std::streamsize sync();
    std::streamsize write(const void* array, std::streamsize n);
    std::streamsize copy(void* array, std::streamsize n) const;
    std::string_view string_view() const;
    void device(Device* ptr);
    Device* device() const;
};
These methods are largely self-explanatory from their names:
- out_avail returns the number of available characters in the buffer.
- flush writes all buffered data to the associated device, returning the number of bytes flushed or propagating any exception thrown by Device::write. If no device is associated with the output stream buffer, flush returns -1.
- sync performs a one-time synchronization with the associated device, returning the number of bytes synchronized or propagating any exception thrown by Device::write. If no device is associated with the output stream buffer, sync returns -1.
- write puts up to n bytes from array into the output buffer, returning the actual number of bytes inserted.
- copy copies up to n bytes from the output buffer to array, returning the actual number of bytes copied.
- string_view returns a read-only character view over the output buffer.
- device lets you associate a new device with, or get the present device of, an output stream buffer.
The output stream buffer dynamically doubles its size during write to ensure enough space to accommodate the written data.
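The doubling strategy mentioned above can be sketched as follows. This is a minimal illustration that assumes a single contiguous array and no locking; ToyGrowingBuffer is a hypothetical class and does not reflect the actual layout of OutputStreamBuffer.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>

// Toy buffer, not DtCraft code: capacity doubles until the write fits.
class ToyGrowingBuffer {
 public:
  explicit ToyGrowingBuffer(std::size_t initial_capacity)
      : capacity_(initial_capacity), data_(new char[initial_capacity]) {
    assert(initial_capacity > 0);  // doubling zero would never grow
  }

  // Appends n bytes, doubling the capacity as often as needed first.
  std::size_t write(const void* array, std::size_t n) {
    const std::size_t needed = size_ + n;
    if (needed > capacity_) {
      std::size_t new_capacity = capacity_;
      while (new_capacity < needed) new_capacity *= 2;
      std::unique_ptr<char[]> grown(new char[new_capacity]);
      std::copy_n(data_.get(), size_, grown.get());  // keep existing bytes
      data_ = std::move(grown);
      capacity_ = new_capacity;
    }
    std::copy_n(static_cast<const char*>(array), n, data_.get() + size_);
    size_ += n;
    return n;
  }

  std::size_t size() const { return size_; }
  std::size_t capacity() const { return capacity_; }
  const char* data() const { return data_.get(); }

 private:
  std::size_t capacity_;
  std::size_t size_ = 0;
  std::unique_ptr<char[]> data_;
};
```

Starting from a capacity of 4, writing 3 bytes leaves the capacity unchanged, writing 2 more grows it to 8, and a further 20 bytes grow it to 32. Doubling keeps the amortized cost of each appended byte constant, which is the usual motivation for this growth policy.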
Example: Create an Output Stream Buffer
The example below creates an output stream buffer and associates it with a device.
OutputStreamBuffer osbuf (device.get());
Example: Flush an Output Stream Buffer
The example below writes all data in an output stream buffer via its underlying write function.
try {
  if(auto ret = osbuf.flush(); ret == -1) { // EAGAIN or EWOULDBLOCK
    std::cout << "Device is busy. Try later.\n";
  }
  else assert(osbuf.out_avail() == 0);
}
catch (const std::exception& e) { // Error occurs.
  std::cout << e.what() << '\n';
}
Example: Write Data to an Output Stream Buffer
The example below writes a character sequence to an output stream buffer.
std::string data {"a character sequence"};
auto n = osbuf.write(data.c_str(), data.size());
assert(osbuf.out_avail() == n && n == data.size() && osbuf.string_view() == data);
Note 1: The output stream buffer shares NO ownership of the device. It is the user's responsibility to ensure that the device outlives the buffer to prevent synchronization errors.
Note 2: The string view of an output stream buffer is a snapshot. Subsequent write operations can invalidate its pointer value.
Archiver
DtCraft has a built-in library, archiver, to serialize data to an output stream buffer and deserialize data from an input stream buffer.
Our archiver works directly on top of our stream buffer objects, requiring no extra copy in user space.
It supports most C++ standard containers and classes, and can be extended to handle your own types.
The source code can be found in include/dtc/archive/.
POD and STL Support
By default, our archiver supports the following types:
- Arithmetic POD types (bool, char, int, float, double, etc.).
- Enumeration types.
- C++ standard sequence containers: array, vector, list, deque, and forward_list.
- C++ standard associative containers: set, multiset, map, and multimap.
- C++ standard unordered associative containers: unordered_set and unordered_map.
- C++ standard chrono classes: duration and time_point.
- C++ standard optional class: optional.
- C++ standard variant class: variant.
- C++ standard tuple class: tuple.
- Eigen matrix types.
Binary Input/Output Archiver
We use BinaryOutputArchiver to serialize data and BinaryInputArchiver to deserialize data in binary format.
The binary archiver is designed to produce a compact data representation at the bit level and is not human-readable. It operates on either OutputStreamBuffer or InputStreamBuffer. We use operator () to perform serialization and deserialization on a parameter pack.
The return value has type std::streamsize and tells how many bytes were transferred.
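To illustrate how an operator () can process a parameter pack and report the number of bytes handled, here is a small sketch in standard C++17. It is not DtCraft's archiver: ToyOutputArchiver and ToyInputArchiver are hypothetical classes that work on a std::string instead of a stream buffer, support only arithmetic types and std::string, and use the host byte order and a size_t length prefix as assumptions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <ios>
#include <string>
#include <type_traits>

// Toy archivers, not DtCraft code.
class ToyOutputArchiver {
 public:
  explicit ToyOutputArchiver(std::string& sink) : sink_(sink) {}

  // Serializes every argument from left to right (a comma fold guarantees
  // the order) and returns the total number of bytes written.
  template <typename... Ts>
  std::streamsize operator()(const Ts&... items) {
    std::streamsize total = 0;
    ((total += save(items)), ...);
    return total;
  }

 private:
  template <typename T>
  std::streamsize save(const T& value) {
    static_assert(std::is_arithmetic_v<T>, "toy archiver: arithmetic only");
    sink_.append(reinterpret_cast<const char*>(&value), sizeof(T));
    return static_cast<std::streamsize>(sizeof(T));
  }

  // Strings are stored as a length prefix followed by the characters.
  std::streamsize save(const std::string& value) {
    const std::streamsize n = save(value.size());
    sink_.append(value);
    return n + static_cast<std::streamsize>(value.size());
  }

  std::string& sink_;
};

class ToyInputArchiver {
 public:
  explicit ToyInputArchiver(const std::string& source) : source_(source) {}

  // Deserializes into every argument in order; returns total bytes consumed.
  template <typename... Ts>
  std::streamsize operator()(Ts&... items) {
    std::streamsize total = 0;
    ((total += load(items)), ...);
    return total;
  }

 private:
  template <typename T>
  std::streamsize load(T& value) {
    static_assert(std::is_arithmetic_v<T>, "toy archiver: arithmetic only");
    assert(cursor_ + sizeof(T) <= source_.size());
    std::memcpy(&value, source_.data() + cursor_, sizeof(T));
    cursor_ += sizeof(T);
    return static_cast<std::streamsize>(sizeof(T));
  }

  std::streamsize load(std::string& value) {
    std::size_t length = 0;
    const std::streamsize n = load(length);
    assert(cursor_ + length <= source_.size());
    value.assign(source_, cursor_, length);
    cursor_ += length;
    return n + static_cast<std::streamsize>(length);
  }

  const std::string& source_;
  std::size_t cursor_ = 0;
};
```

Both toy archivers refer to the same std::string here, so values written by the output archiver can be read back by the input archiver, and the two byte counts should match.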
Example: Archive Data in Binary Format
The example below shows how to serialize and deserialize data in binary format.
dtc::OutputStreamBuffer osbuf;
dtc::InputStreamBuffer isbuf;
dtc::BinaryOutputArchiver oar(osbuf);
dtc::BinaryInputArchiver iar(isbuf);
int o1 {1}, i1;
std::string o2 {"test"}, i2;
std::variant<int, double, std::vector<char>> o3 {1.2}, i3;
std::streamsize num_obytes = oar(o1, o2, o3);
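// Assumes the serialized bytes have already been moved from osbuf to isbuf
// (e.g., through a connected device); that step is omitted here.
std::streamsize num_ibytes = iar(i1, i2, i3);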
assert(num_obytes == num_ibytes && o1 == i1 && o2 == i2 && o3 == i3);
Archive a Custom Type
To make your own type work with our archiver, add a template method archive in the public scope of your class. This lets our archiver know which data members to serialize and deserialize.
Example: Add Archive Method to Your Classes
The example below shows how to adapt your own type to our archiver interface.
struct MyType {
  int a;
  double b;
  std::tuple<std::string, char> c;
  template <typename ArchiverT>
  auto archive(ArchiverT& ar) {
    return ar(a, b, c);
  }
};
Note: If you apply the archiver operator to your data members in several separate calls rather than all at once, you must return the sum of the individual return values.
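The note above can be illustrated with a self-contained sketch. ToyCountingArchiver is a hypothetical stand-in for a real archiver that only counts the bytes of arithmetic members; AllAtOnce, OneByOne, and archived_size are likewise assumptions made for this example.

```cpp
#include <cassert>
#include <cstddef>
#include <ios>
#include <type_traits>

// Toy archiver, not DtCraft code: it only counts the bytes that the given
// arithmetic values would occupy.
class ToyCountingArchiver {
 public:
  template <typename... Ts>
  std::streamsize operator()(const Ts&... items) {
    std::streamsize total = 0;
    ((total += count(items)), ...);
    return total;
  }

 private:
  template <typename T>
  std::streamsize count(const T&) {
    static_assert(std::is_arithmetic_v<T>, "toy archiver: arithmetic only");
    return static_cast<std::streamsize>(sizeof(T));
  }
};

// Archives all members in a single call and forwards the result.
struct AllAtOnce {
  int a = 0;
  double b = 0.0;
  char c = 0;
  template <typename ArchiverT>
  auto archive(ArchiverT& ar) {
    return ar(a, b, c);
  }
};

// Archives members one call at a time, so the individual results are summed.
struct OneByOne {
  int a = 0;
  double b = 0.0;
  char c = 0;
  template <typename ArchiverT>
  auto archive(ArchiverT& ar) {
    auto n = ar(a);
    n += ar(b);
    n += ar(c);
    return n;
  }
};

// Hypothetical helper: runs archive() against a fresh counting archiver.
template <typename T>
std::streamsize archived_size(T& object) {
  ToyCountingArchiver ar;
  const std::streamsize n = object.archive(ar);
  assert(n >= 0);
  return n;
}
```

Both structs report the same total, which is what the caller of archive relies on when it adds up how many bytes were transferred.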
Where to Go from Here?
Congratulations! You have studied one core component in DtCraft. In the next chapter, we will go over the stream graph and key graph components to get your distributed applications up and running.