How to Use C++20 Coroutines
for Networking

Jim (James) Pascoe
http://www.james-pascoe.com
james@james-pascoe.com

http://jamespascoe.github.io/accu2022
https://github.com/jamespascoe/accu2022-example-code.git

How Familiar are you with Coroutines?

  1. I am a coroutine expert
  2. I have a strong grasp but need to fill in some details
  3. I have some understanding but want to learn more
  4. I am just starting to learn about them

Overview

Demystify Using C++20 Coroutines for Networking
Emphasis on practical examples and code

  • Coroutines: fundamentals, benefits and usage
  • How to Write Networking Code Using Coroutines
  • C++23/26: the future of Coroutines

Example Code: Tools & Build

  • C++ examples all compile with GCC 11.2
  • Lua examples run with Lua 5.4.4:
    • Requires luaposix and luasocket
    • sudo luarocks install luaposix
    • sudo luarocks install luasocket
  • Tested on Linux Mint 19 and macOS (Mojave, Big Sur)

Fundamentals

Coroutines

Coroutines are subroutines with enhanced semantics

  • Invoked by a caller (and return to a caller) ...
  • Can suspend execution
  • Can resume execution (at a later time)

Benefits

Write asynchronous code ...
with the readability of synchronous code

  • Useful for networking
  • Lots of blocking operations (connect, send, receive)
  • Multi-threading (send and receive threads)
  • Asynchronous operations mean callbacks
  • Control flow fragments

Echo Server Build & Run

Blocking Echo Server


//
// echo_server_blocking.cpp
//

#include <array>
#include <boost/asio.hpp>
#include <iostream>

using boost::asio::buffer;
using boost::asio::io_context;
using boost::asio::ip::tcp;
using boost::system::error_code;

static const int buf_len = 1000;

int main() {
  try {
    io_context ctx;

    tcp::acceptor acceptor(ctx, tcp::endpoint(tcp::v4(), 6666));

    for (;;) {
      tcp::socket peer_socket(ctx);
      acceptor.accept(peer_socket);

      std::array<char, buf_len> buf;

      for (;;) {
        error_code error;
        std::size_t len =
            peer_socket.read_some(buffer(buf), error);
        if (error) // EOF or any other error: drop this connection
          break;

        write(peer_socket, buffer(buf, len));
      }
    }
  } catch (std::exception &e) {
    std::cerr << "Exception: " << e.what() << "\n";
  }
}
          

Asynchronous Echo Server


//
// echo_server_async.cpp
//

#include <array>
#include <boost/asio.hpp>
#include <iostream>

using boost::asio::buffer;
using boost::asio::io_context;
using boost::asio::ip::tcp;
using boost::system::error_code;

static const int buf_len = 1000;

void accept_handler(error_code const &error,
                    tcp::socket &peer_socket,
                    std::array<char, buf_len> &buf,
                    tcp::acceptor &acceptor);

void read_handler(error_code const &error,
                  std::size_t bytes_transferred,
                  tcp::socket &peer_socket,
                  std::array<char, buf_len> &buf,
                  tcp::acceptor &acceptor);

void write_handler(error_code const &error, std::size_t length,
                   tcp::socket &peer_socket,
                   std::array<char, buf_len> &buf,
                   tcp::acceptor &acceptor) {
  if (!error) {
    // Read into the whole buffer, not just the bytes last written
    peer_socket.async_read_some(
        buffer(buf), [&](error_code const &error,
                         std::size_t bytes_transferred) {
          read_handler(error, bytes_transferred, peer_socket, buf,
                       acceptor);
        });
  } else
    std::cerr << "Write handler error: " << error.message()
              << std::endl;
}

void read_handler(error_code const &error,
                  std::size_t bytes_transferred,
                  tcp::socket &peer_socket,
                  std::array<char, buf_len> &buf,
                  tcp::acceptor &acceptor) {
  if (!error) {
    async_write(peer_socket, buffer(buf, bytes_transferred),
                [&](error_code const &error, std::size_t length) {
                  write_handler(error, length, peer_socket, buf,
                                acceptor);
                });
  } else {
    if (error == boost::asio::error::eof) {
      peer_socket.close();

      acceptor.async_accept(
          peer_socket, [&](error_code const &error) {
            accept_handler(error, peer_socket, buf, acceptor);
          });
    } else
      std::cerr << "Read handler error: " << error.message()
                << std::endl;
  }
}

void accept_handler(error_code const &error,
                    tcp::socket &peer_socket,
                    std::array<char, buf_len> &buf,
                    tcp::acceptor &acceptor) {

  if (!error) {
    peer_socket.async_read_some(
        buffer(buf, buf_len), [&](error_code const &error,
                                  std::size_t bytes_transferred) {
          read_handler(error, bytes_transferred, peer_socket, buf,
                       acceptor);
        });
  } else
    std::cerr << "Accept handler error: " << error.message()
              << std::endl;
}

int main() {
  io_context ctx;
  tcp::socket peer_socket(ctx);
  tcp::acceptor acceptor(ctx, tcp::endpoint(tcp::v4(), 6666));

  std::array<char, buf_len> buf;

  acceptor.async_accept(peer_socket, [&](error_code const &error) {
    accept_handler(error, peer_socket, buf, acceptor);
  });

  try {
    ctx.run();
  } catch (std::exception &e) {
    std::cerr << "Exception: " << e.what() << "\n";
  }
}
          

Coroutine Echo Server


//
// echo_server_coroutine.cpp
//

#include <boost/asio.hpp>
#include <boost/asio/experimental/as_tuple.hpp>

#include <array>
#include <iostream>

using boost::asio::buffer;
using boost::asio::detached;
using boost::asio::io_context;
using boost::asio::use_awaitable;
using boost::asio::experimental::as_tuple;
using boost::asio::ip::tcp;
using boost::system::error_code;

boost::asio::awaitable<void> echo(tcp::socket peer_socket,
                                  tcp::acceptor acceptor) {
  std::array<char, 1000> buf;

  for (;;) {
    co_await acceptor.async_accept(peer_socket, use_awaitable);

    for (;;) {
      auto [error, len] = co_await peer_socket.async_read_some(
          buffer(buf), as_tuple(use_awaitable));
      if (error == boost::asio::error::eof)
        break;

      co_await async_write(peer_socket, buffer(buf, len),
                           use_awaitable);
    }

    peer_socket.close();
  }
}

int main() {
  try {
    io_context ctx;

    tcp::socket socket(ctx);
    tcp::acceptor acceptor(ctx, tcp::endpoint(tcp::v4(), 6666));

    co_spawn(ctx, echo(std::move(socket), std::move(acceptor)),
             detached);

    ctx.run();
  } catch (std::exception &e) {
    std::cerr << "Exception: " << e.what() << std::endl;
  }
}
          

C++20 Coroutines

Coroutine Support in C++20

Tips for Learning

  • Principally for library development (not user-code)
  • Libraries with coroutine support:
    • Boost Asio
    • Lewis Baker's CppCoro:
      • Contains coroutine types, awaitables etc.
  • References to 'promise' and 'future' are not std::promise and std::future!

Key References

Tutorial Overview

Lua

  • Lightweight embeddable scripting language
  • Good way of building a mental model of coroutines
  • Single threaded so lock-free, no races etc.
  • Implement your own dispatcher (executor) in Lua
  • Lua coroutines are stackful
  • C++20 coroutines are stackless

LuaChat

  • Sender coroutine: sends user input to peer
  • Receiver coroutine: prints received messages
  • Dispatcher: schedules sender and receiver
  • main: processes arguments and creates coroutines

LuaChat Screen Capture

Sender Coroutine


-- Connect to the peer and send messages read from stdin
function sender (host, port)

  while true do

    local remote, err = socket.connect(host, port)
    while not remote do
      coroutine.yield()

      remote, err = socket.connect(host, port)
    end

    print("Connected to " .. host .. ":" .. port)

    while err ~= "closed" do
      -- Read from stdin (non-blocking - 1s timeout)
      local ret = require "posix".rpoll(0, 1000)
      if (ret == 1) then
        local message = io.read()
        if (message ~= "") then
          _, err = remote:send(message .. "\n")
        end
      else -- read timeout: update connection status
        _, err = remote:send("\0")
      end

      coroutine.yield()
    end
  end
end
            

Receiver Coroutine


-- Receive messages from our peer and print them
function receiver (port)

  local server = assert(socket.bind("*", port))
  server:settimeout(0.1) -- set non-blocking (100 ms timeout)

  while true do

    local _, port = server:getsockname()
    print("Waiting for connection on port " .. port);

    local client, err = server:accept()
    if (not client and err == "timeout") then
      coroutine.yield()
    else
      local peer_ip, peer_port = client:getpeername()

      client:send("Connected to LuaChat!\n")
      client:settimeout(0.1)

      while err ~= "closed" do
        local line
        line, err = client:receive("*l")

        if not err then
          print(
            string.format("%s:%d> %s", peer_ip, peer_port, line)
          )
        else
          coroutine.yield()
        end
      end
    end
  end
end
            

Dispatcher


function dispatcher (coroutines)

  while true do
    if next(coroutines) == nil then break end -- no more coroutines

    for name, co in pairs(coroutines) do
      local status, res = coroutine.resume(co)

      if res then -- coroutine has returned a result (i.e. finished)
        if type(res) == "string" then -- runtime error
          print("Lua coroutine '" .. name ..
                "' has exited with error: " .. res)
        else
          print("Lua coroutine '" .. name .. "' exited")
        end

        coroutines[name] = nil
      end
    end
  end
end
            

Main Code


local port, remote_ip, remote_port = tonumber(arg[1]),
                                     arg[2],
                                     tonumber(arg[3])

print(
  string.format("Starting LuaChat:\n" ..
                "  local port: %d\n" ..
                "  remote IP: %s\n" ..
                "  remote port: %d\n\n", port, remote_ip, remote_port)
)

-- Create coroutines
local coroutines = {}
coroutines["receiver"] = coroutine.create(receiver)
coroutine.resume(coroutines["receiver"], port)

coroutines["sender"] = coroutine.create(sender)
coroutine.resume(coroutines["sender"], remote_ip, remote_port)

-- Run the main loop
dispatcher(coroutines)
            

Coroutine Details

Awaitable Type

  • Supports the co_await operator
  • Controls the semantics of an await-expression
  • Informs the compiler how to obtain the awaiter

co_await async_write(..., use_awaitable);
          

Awaiter Type

Coroutine Return Type

  • Declares the promise type to the compiler
    • Using coroutine_traits
  • E.g. 'task<T>' or 'generator<T>'
  • CppCoro defines several return types
  • Referred to as a 'future' in some WG21 papers
  • Not to be confused with std::future

Promise Type

  • Controls the coroutine's behaviour
    • ... example coming up
  • Implements methods that are called at specific points during the execution of the coroutine
  • Conveys coroutine result (or exception)
  • Again - not to be confused with std::promise

Customising co_await

  • The await_transform method:
    • Defined in the promise_type
    • Enables co_await on types that are not awaitable
    • Disables co_await on certain types
    • Modifies the behaviour of awaitable values
  • Also possible to customise co_yield
  • See Lewis Baker's excellent Blog Post for details

Coroutine Handles

  • Handle to a coroutine frame on the heap
  • Means through which coroutines are resumed
  • Also provide access to the promise type
  • Non-owning - have to be destroyed explicitly
    • Often through RAII in the coroutine return type

Generator Example


//
// card_dealer.cpp
// ---------------
//

#include <coroutine>
#include <array>
#include <exception>
#include <random>
#include <string>

#include <iostream>

template <typename T> struct generator {
  struct promise_type;
  using coroutine_handle = std::coroutine_handle<promise_type>;

  struct promise_type {
    T current_value;

    auto get_return_object() {
      return generator{coroutine_handle::from_promise(*this)};
    }

    // Start 'lazily' i.e. suspend ('eagerly' == 'suspend_never').
    auto initial_suspend() { return std::suspend_always{}; }

    // Opportunity to publish results, signal completion etc.
    // Suspend so that destroy() is called via RAII from outside
    // the coroutine.
    auto final_suspend() noexcept { return std::suspend_always{}; }

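    void unhandled_exception() { std::terminate(); }

    // Required: flowing off the end of the coroutine body is
    // undefined behaviour unless the promise has return_void().
    void return_void() {}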

    auto yield_value(T const &value) {
      current_value = value;
      return std::suspend_always{};
    }
  };

  bool next() {
    return coroutine ? (coroutine.resume(), !coroutine.done())
                     : false;
  }

  T value() const { return coroutine.promise().current_value; }

  // Range support, class design etc.

  ~generator() {
    if (coroutine)
      coroutine.destroy();
  }

private:
  generator(coroutine_handle h) : coroutine(h) {}
  coroutine_handle coroutine;
};

generator<std::string> card_dealer(int deck_size) {
  std::default_random_engine rng;
  std::uniform_int_distribution<int> card(0, 12);
  std::uniform_int_distribution<int> suit(0, 3);

  std::array<std::string, 13> cards = {
      "Ace", "2", "3", "4", "5", "6", "7",
      "8", "9", "10", "Jack", "Queen", "King"};

  std::array<std::string, 4> suits = {"Clubs", "Diamonds",
                                      "Spades", "Hearts"};

  for (int i = 0; i < deck_size; i++)
    co_yield(cards[card(rng)] + " of " + suits[suit(rng)]);
}

int main() {
  generator<std::string> dealer = card_dealer(100);

  while (dealer.next())
    std::cout << dealer.value() << std::endl;
}
        

Traffic Generator

  • A networking example would be a traffic generator
  • Coroutine generates packets
    • Sequence number, checksums, encryption
  • One coroutine transmits with co_await
  • A second coroutine builds the next packet
    • ... during the latency window

Chat Example

Coroutine Chat

  • Prototyped in Lua - implement in C++20
  • Retain coroutine design:
    • Sender: reads keyboard and sends
    • Receiver: prints messages
    • Dispatcher: Boost IO context
  • Must be non-blocking
  • Users should be able to reconnect

Build & Run

Chat Class


class chat {

public:
  chat(char *addr, char *port, char *remote_addr,
       char *remote_port) {

    tcp::resolver resolver(ctx);
    basic_resolver_entry<tcp> listen_endpoint;
    basic_resolver_entry<tcp> remote_endpoint;

    listen_endpoint = *resolver.resolve(addr, port, passive);
    remote_endpoint = *resolver.resolve(remote_addr, remote_port);

    co_spawn(ctx, sender(remote_endpoint), detached);
    co_spawn(ctx, receiver(listen_endpoint), detached);

    ctx.run();
  }

private:
  awaitable<void> sender(tcp::endpoint remote) {
    // ...
  }

  awaitable<void> receiver(tcp::endpoint listen) {
    // ...
  }

  // Member variables (shared between coroutines)
  io_context ctx;
  bool connected = false;
};
          

Sender Coroutine


awaitable<void> sender(tcp::endpoint remote) {

  for (;;) {
    tcp::socket remote_sock(ctx);

    auto [error] = co_await remote_sock.async_connect(
        remote, as_tuple(use_awaitable));
    if (!error) {
      std::cout << "Connected to: " << remote << std::endl;
      connected = true;
    } else {
      std::cout << "Could not connect to: " << remote
                << " - retrying in 1s " << std::endl;

      steady_timer timer(
          co_await boost::asio::this_coro::executor);
      timer.expires_after(std::chrono::seconds(1));
      co_await timer.async_wait(use_awaitable);

      continue;
    }

    std::string data;
    while (connected) {
      // Read a string from stdin (non-blocking)
      struct pollfd input[1] = {{.fd = 0, .events = POLLIN}};
      if (poll(input, 1, 100 /* timeout in ms */) > 0) {
        char c;
        while (std::cin.get(c) && c != '\n')
          data += c;

        data += "\r\n";
      }

      co_await async_write(remote_sock, buffer(data),
                           as_tuple(use_awaitable));

      data.clear();
    }
  }
}
          

Receiver Coroutine


awaitable<void> receiver(tcp::endpoint listen) {

  tcp::acceptor acceptor(ctx, listen);

  for (;;) {
    auto [error, client] =
        co_await acceptor.async_accept(as_tuple(use_awaitable));

    if (!error) {
      std::string data;

      for (;;) {
        auto [error, len] = co_await async_read_until(
            client, dynamic_buffer(data), boost::regex("\r\n"),
            as_tuple(use_awaitable));

        if (error == boost::asio::error::eof) {
          // remote has disconnected
          connected = false;
          break;
        }

        std::cout << client.remote_endpoint() << "> " << data;
        data.clear();
      }
    } else {
      std::cerr << "Accept failed: " << error.message() << "\n";
    }
  }
}
          

Complete Listing


#include <boost/asio.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#include <boost/regex.hpp>

#include <poll.h>

#include <chrono>
#include <iostream>
#include <string>

using boost::asio::awaitable;
using boost::asio::buffer;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::dynamic_buffer;
using boost::asio::io_context;
using boost::asio::steady_timer;
using boost::asio::use_awaitable;
using boost::asio::experimental::as_tuple;
using boost::asio::ip::basic_resolver_entry;
using boost::asio::ip::tcp;
using boost::asio::ip::tcp::resolver::passive;

using boost::system::error_code;

class chat {

public:
  chat(char *addr, char *port, char *remote_addr,
       char *remote_port) {

    tcp::resolver resolver(ctx);
    basic_resolver_entry<tcp> listen_endpoint;
    basic_resolver_entry<tcp> remote_endpoint;

    listen_endpoint = *resolver.resolve(addr, port, passive);
    remote_endpoint = *resolver.resolve(remote_addr, remote_port);

    co_spawn(ctx, sender(remote_endpoint), detached);
    co_spawn(ctx, receiver(listen_endpoint), detached);

    ctx.run();
  }

private:
  awaitable<void> sender(tcp::endpoint remote) {

    for (;;) {
      tcp::socket remote_sock(ctx);

      auto [error] = co_await remote_sock.async_connect(
          remote, as_tuple(use_awaitable));
      if (!error) {
        std::cout << "Connected to: " << remote << std::endl;
        connected = true;
      } else {
        std::cout << "Could not connect to: " << remote
                  << " - retrying in 1s " << std::endl;

        steady_timer timer(
            co_await boost::asio::this_coro::executor);
        timer.expires_after(std::chrono::seconds(1));
        co_await timer.async_wait(use_awaitable);
        continue;
      }

      std::string data;
      while (connected) {
        // Read a string from stdin (non-blocking)
        struct pollfd input[1] = {{.fd = 0, .events = POLLIN}};
        if (poll(input, 1, 100 /* timeout in ms */) > 0) {
          char c;
          while (std::cin.get(c) && c != '\n')
            data += c;

          data += "\r\n";
        }

        co_await async_write(remote_sock, buffer(data),
                             as_tuple(use_awaitable));

        data.clear();
      }
    }
  }

  awaitable<void> receiver(tcp::endpoint listen) {

    tcp::acceptor acceptor(ctx, listen);

    for (;;) {
      auto [error, client] =
          co_await acceptor.async_accept(as_tuple(use_awaitable));

      if (!error) {
        std::string data;

        for (;;) {
          auto [error, len] = co_await async_read_until(
              client, dynamic_buffer(data), boost::regex("\r\n"),
              as_tuple(use_awaitable));

          if (error == boost::asio::error::eof) {
            // remote has disconnected
            connected = false;
            break;
          }

          std::cout << client.remote_endpoint() << "> " << data;
          data.clear();
        }
      } else {
        std::cerr << "Accept failed: " << error.message() << "\n";
      }
    }
  }

  // Member variables (shared between coroutines)
  io_context ctx;
  bool connected = false;
};

int main(int argc, char *argv[]) {

  try {
    if (argc != 5) {
      std::cerr << "Usage: " << argv[0];
      std::cerr << " <listen_address> <listen_port>";
      std::cerr << " <remote_address> <remote_port>\n";
      return 1;
    }

    chat(argv[1], argv[2], argv[3], argv[4]);

  } catch (std::exception &e) {
    std::cerr << "Exception: " << e.what() << "\n";
  }
}
          

Conclusion

What's Coming in C++23/26?

  • Library support for coroutines was planned
  • P2502: standardised generator std::generator
    • Models std::ranges::input_range
    • Accepted by LEWG in Jan 2022 - sent to LWG
    • Appeared to have missed C++23, but was subsequently adopted into it
  • P2300: standardised execution std::execution

Conclusion

  • C++20 coroutines provide a language capability
    • Functions can be suspended and resumed
  • Coroutines allow asynchronous code to be written
    • With the readability of synchronous code
  • Using coroutines in user-code:
    • Library support is improving

Questions?

http://www.james-pascoe.com
james@james-pascoe.com

http://jamespascoe.github.io/accu2022
https://github.com/jamespascoe/accu2022-example-code.git