Applied C++20 Coroutines

Jim (James) Pascoe
http://www.james-pascoe.com
james@james-pascoe.com

http://jamespascoe.github.io/accu2023
https://github.com/jamespascoe/accu2023-example-code.git

ACCU Bristol and Bath Meetup Coordinator

Coroutines ... What Next?

  1. Fit within the wider concurrency framework
  2. More examples (real-world and learning)
  3. Empirical measurements
  4. Library support and the future

Outline

Example Code: Tools & Build

Concurrency
Back-to-Basics

Concurrency vs. Parallelism

  • Concurrency exists when:
    • multiple items of work are 'in progress'
    • e.g. processes, threads or coroutines
    • harnessing windows of latency

  • Parallelism exists when:
    • multiple items of work execute simultaneously
    • e.g. threads running on separate CPU cores
    • execution occurs at the same instant in time

Concurrency Granularity

  1. Multiple processes run on a single computer
  2. Multiple threads run within a single process
  3. Multiple coroutines run within a single thread

Concurrency allows us to harness latency

Processes

  • OS 'multitasks' by forking processes
  • Context switch occurs when:
    • a process is blocked (e.g. semaphore)
    • or a pre-emptive time slice expires
  • Overhead is high:
    • VM tables, program code, heap, stack, fds, signals
    • Sharing data, synchronisation and scaling are hard

Threads

  • Light-weight threads in a heavy-weight process
  • Lower overhead (faster context switch):
    • ... stack, program counter, signal table
  • C++03: OS APIs (e.g. POSIX threads), C++11: std::thread, C++20: std::jthread
  • Reentrancy: a function can safely be invoked again before an earlier invocation has completed
  • Thread safety: the avoidance of race conditions
  • Green Threads: scheduled by a runtime library / VM

Coroutines (as Fibers)

  • Multiple coroutines in a single thread
  • Scheduled by a 'dispatcher' (same thread)
  • No data races or locking: only one coroutine runs at a time
  • Other coroutines can run while one is waiting (e.g. on I/O)
  • See Boost.Fiber for details

Coroutines in the Field

Blu Wireless: Mobile Mesh

  • IP networking over 5G mmWave (60 GHz) modems
    • 802.11ad MAC + PHY (Hydra) + software
  • High-bandwidth, low latency mobile Internet
    • Up to 3 Gbps wireless links (up to 4 km)
  • Embedded quad-core ARMv8 NPUs

Mobile Connection Management

  • L1 management implemented using coroutines
    • Combination of Modern C++ (17/20) and Lua
  • Lots of asynchronous operations
    • Scan, Connect, Disconnect
    • Around 40 primitives (called 'Actions')
  • Groups of coroutines operate in threads
    • No race conditions or data sharing limitations
    • Concurrency combined with Parallelism

Example: Lua Networking Fibers

  • Lua behaviour: two nodes sending messages
  • Includes three actions: 'Connector', 'Timer' and 'Log'
  • Also provides: SWIG, CMake, Lua 'main' code
  • Other bindings exist: e.g. ThePhD's Sol3

Lua Behavior


--[[

 lua_fiber.lua

 This behaviour provides an example of networked fibers.

]]

function ping_fiber (connector, remote_port)

  Actions.Log.info(
    "ping_fiber: connecting to port: " .. remote_port
  )

  local timer = Actions.Timer()

  -- Connect to a node and send a 'ping' message
  while true do

    connector:Send("localhost", remote_port, "PING")

    repeat
      coroutine.yield()
    until (connector:IsMessageAvailable())

    Actions.Log.info(
      "ping_fiber: received: " .. connector:GetNextMessage()
    )

    timer(Actions.Timer.WaitType_NOBLOCK, 1, "s", 1)
    while timer:IsWaiting() do
      coroutine.yield()
    end

  end

end

function pong_fiber (connector, remote_port)

  Actions.Log.info(
    "pong_fiber: connecting to port: " .. remote_port
  )

  local timer = Actions.Timer()

  -- Connect to a node and send a 'pong' message
  while true do

    repeat
      coroutine.yield()
    until (connector:IsMessageAvailable())

    Actions.Log.info(
      "pong_fiber: received: " .. connector:GetNextMessage()
    )

    connector:Send("localhost", remote_port, "PONG")

    timer(Actions.Timer.WaitType_NOBLOCK, 1, "s", 2)
    while timer:IsWaiting() do
      coroutine.yield()
    end

  end

end

-- Coroutine dispatcher (see Section 9.4 of 'Programming in Lua')
function dispatcher (coroutines)

  local timer = Actions.Timer()

  while true do
    if next(coroutines) == nil then break end

    for name, co in pairs(coroutines) do
      local ok, res = coroutine.resume(co)

      if coroutine.status(co) == "dead" then -- coroutine has finished

        if not ok then  -- runtime error (res holds the error message)
          Actions.Log.critical(
            "Lua coroutine '" .. tostring(name) ..
            "' has exited with runtime error " .. tostring(res)
          )
        else
          Actions.Log.warn(
            "Lua coroutine '" .. tostring(name) ..
            "' exited"
          )
        end

        coroutines[name] = nil

        break
      end
    end

    -- Run the dispatcher every 1 ms. Note that a blocking timer
    -- is required to prevent lua_mesh from consuming 100% of
    -- a core.
    timer(Actions.Timer.WaitType_BLOCK, 1, "ms", 3)

  end
end

local function main(args)

  print("Welcome to Lua Fiber !")

  if (not args or not args["port"]) then
    print("Usage: lua_fiber lua_fiber.lua -a port=<port>")
    os.exit(1)
  end

  print(
    string.format("Starting Lua Fiber:\n" ..
                  "  listen port: %d\n", tonumber(args["port"]))
  )

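  -- Command-line arguments are strings: convert the port first,
  -- otherwise the comparison with 7777 below is always false
  local port = tonumber(args["port"])
  local connector = Actions.Connector(port)
  local remote_port = port == 7777 and "8888" or "7777"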

  -- Create coroutines and start them: the first resume passes the
  -- arguments and runs each fiber until its first yield
  local coroutines = {}
  coroutines["ping"] = coroutine.create(ping_fiber)
  coroutine.resume(coroutines["ping"], connector, remote_port)

  coroutines["pong"] = coroutine.create(pong_fiber)
  coroutine.resume(coroutines["pong"], connector, remote_port)

  -- Run the main loop
  dispatcher(coroutines)

end

local behaviour = {
  name = "lua_fiber",
  description = "A Lua behaviour to demonstrate fibers",
  entry_point = main
}

return behaviour
          

Connector Action


//
// lua_fiber_connector_action.hpp
//

#pragma once

#include <memory>
#include <string>
#include <thread>
#include <vector>

#include "asio/asio.hpp"

class Connector {
public:
  enum class ErrorType { SUCCESS, RESOLVE_FAILED, CONNECT_FAILED };

  inline static int const default_port = 7777;

  Connector(unsigned short port = default_port);

  ~Connector();

  // Do not allow instances to be copied or moved
  Connector(Connector const& rhs) = delete;
  Connector(Connector&& rhs) = delete;
  Connector& operator=(Connector const& rhs) = delete;
  Connector& operator=(Connector&& rhs) = delete;

  // Send a message to a remote behaviour
  ErrorType Send(std::string const& hostname_or_ip,
                 std::string const& port,
                 std::string const& message);

  // Returns whether a message is available to be read
  bool IsMessageAvailable(void) { return !m_messages.empty(); }

  // Returns the oldest unread message (or an empty string if none
  // are available)
  std::string GetNextMessage(void);

private:
  using tcp = asio::ip::tcp;

  // Max number of messages to retain
  inline static const int max_messages = 32;

  // Encapsulate a TCP connection and the data sent over it. Note
  // that this is based on the ASIO Asynchronous TCP server tutorial.
  class tcp_connection {
  public:
    using pointer = std::shared_ptr<tcp_connection>;

    static pointer create(asio::io_context& io_context) {
      return pointer(new tcp_connection(io_context));
    }

    tcp::socket& socket() { return m_socket; }

    std::string& data() { return m_data; }

  private:
    tcp_connection(asio::io_context& io_context) :
      m_socket(io_context) {}

    tcp::socket m_socket;
    std::string m_data;
  };

  // Asynchronous handlers for reading and accepting connections
  void handle_read(asio::error_code const& error,
                   std::size_t bytes_transferred,
                   tcp_connection::pointer connection);

  void handle_write(asio::error_code const& error,
                    std::size_t bytes_transferred,
                    tcp_connection::pointer connection);

  void handle_accept(tcp_connection::pointer new_connection,
                     asio::error_code const& error);

  void start_accept();

  // Member variables
  asio::io_context m_io_context;
  asio::ip::tcp::acceptor m_acceptor;

  std::thread m_thread;

  // FIFO vector to store received messages
  std::vector<std::string> m_messages;
};
          

Connector Action


//
// lua_fiber_connector_action.cpp
//

#include "lua_fiber_connector_action.hpp"

#include "lua_fiber_log_manager.hpp"

Connector::Connector(unsigned short port)
    : m_acceptor(m_io_context, tcp::endpoint(tcp::v4(), port)) {
  start_accept();

  m_thread = std::thread([this]() { m_io_context.run(); });

  log_trace("Connector action starting");
}

Connector::~Connector() {
  log_trace("Cleaning up in Connector action");

  m_io_context.stop();
  m_thread.join();

  log_trace("Connector action exiting");
}

// Send a message to a remote behaviour
Connector::ErrorType Connector::Send(std::string const& host,
                                     std::string const& port,
                                     std::string const& message) {
  // Resolve the destination endpoint
  tcp::resolver resolver(m_io_context);
  tcp::resolver::results_type endpoints;

  try {
    endpoints = resolver.resolve(host, port);
  } catch (asio::system_error& e) {
    log_error(
        "Connector send failed: unable to resolve {}:{}",
        host, port);

    return ErrorType::RESOLVE_FAILED;
  }

  // Open a connection
  tcp_connection::pointer connection =
    tcp_connection::create(m_io_context);

  try {
    asio::connect(connection->socket(), endpoints);
  } catch (asio::system_error& e) {
    log_error("Connector send failed: could not connect to {}:{}",
              host, port);

    return ErrorType::CONNECT_FAILED;
  }

  asio::async_write(
    connection->socket(),
    asio::buffer(message),
    [this, connection](const asio::error_code& error,
                       std::size_t bytes_transferred) {
      handle_write(error, bytes_transferred, connection);
    }
  );

  return ErrorType::SUCCESS;
}

// Returns the oldest unread message (or an empty string
// if none are available)
std::string Connector::GetNextMessage(void) {
  if (!IsMessageAvailable())
    return "";

  std::string ret = m_messages.front();

  m_messages.erase(m_messages.begin());

  return ret;
}

// Asynchronous handlers for reading and accepting connections
void Connector::handle_read(asio::error_code const& error,
                            std::size_t bytes_transferred,
                            tcp_connection::pointer connection) {
  if (!error || error == asio::error::eof) {
    // Prevent the message array from growing uncontrollably:
    // drop the oldest message once max_messages are stored
    if (m_messages.size() >= static_cast<std::size_t>(max_messages))
      m_messages.erase(m_messages.begin());

    m_messages.emplace_back(connection->data());

    log_info("Received message ({} bytes): {}",
             bytes_transferred,
             connection->data());
  } else
    log_error("Connector read error: {}", error.message());
}

// The TCP connection is passed by value: the shared_ptr keeps the
// underlying TCP socket open until the write handler has exited.
// The 'maybe_unused' attribute only silences the unused-parameter
// warning.
void Connector::handle_write(
    asio::error_code const& error,
    std::size_t bytes_transferred,
    [[maybe_unused]] tcp_connection::pointer connection) {
  if (!error)
    log_info("Sent message ({} bytes)", bytes_transferred);
  else
    log_error("Connector send error: {}", error.message());
}

void Connector::handle_accept(tcp_connection::pointer connection,
                              asio::error_code const& error) {
  if (!error) {
    log_debug("Accepted message connection");

    asio::async_read(
      connection->socket(),
      asio::dynamic_buffer(connection->data()),
      [this, connection](const asio::error_code& error,
                         std::size_t bytes_transferred) {
        handle_read(error, bytes_transferred, connection);
      }
    );
  } else
    log_error("Connector accept error: {}", error.message());

  start_accept();
}

void Connector::start_accept() {
  // m_acceptor was constructed on m_io_context, so use it directly
  tcp_connection::pointer connection =
      tcp_connection::create(m_io_context);

  m_acceptor.async_accept(
    connection->socket(),
    [this, connection](const asio::error_code& error) {
      handle_accept(connection, error);
    }
  );
}
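In the Connector above, handle_read() runs on the io_context thread, while IsMessageAvailable() and GetNextMessage() are called from the Lua thread. As a result, m_messages is shared between two threads without synchronisation. One way to guard it is a small mutex-protected FIFO. The sketch below assumes a hypothetical MessageQueue class, which is not part of the example repository. It uses std::deque rather than erasing from the front of a std::vector.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

// Hypothetical bounded FIFO shared between the io_context thread
// (producer, e.g. handle_read) and the Lua thread (consumer).
class MessageQueue {
public:
  explicit MessageQueue(std::size_t max_messages)
      : m_max_messages(max_messages) {
    assert(max_messages > 0);
  }

  // Producer side: drop the oldest message once the queue is full
  void push(std::string message) {
    std::lock_guard lock(m_mutex);
    if (m_messages.size() >= m_max_messages)
      m_messages.pop_front();
    m_messages.push_back(std::move(message));
  }

  // Consumer side: check and remove under one lock, so there is no
  // window between "is a message available?" and "take it"
  std::optional<std::string> pop() {
    std::lock_guard lock(m_mutex);
    if (m_messages.empty())
      return std::nullopt;
    std::string message = std::move(m_messages.front());
    m_messages.pop_front();
    return message;
  }

  bool empty() const {
    std::lock_guard lock(m_mutex);
    return m_messages.empty();
  }

private:
  std::size_t m_max_messages;
  mutable std::mutex m_mutex;
  std::deque<std::string> m_messages;
};
```

Returning std::optional from a single pop() replaces the separate IsMessageAvailable()/GetNextMessage() pair. With two calls, another thread could change the queue in between them.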
          

C++20 Coroutines

Coroutines

Coroutines are subroutines with enhanced semantics

  • Invoked by a caller (and return to a caller) ...
  • Can suspend execution
  • Can resume execution (at a later time)

Benefits

Write asynchronous code ...
with the readability of synchronous code

  • Useful for networking
  • Lots of blocking operations (connect, send, receive)
  • Multi-threading (send and receive threads)
  • Asynchronous operations mean callbacks ...
  • ... and callbacks fragment the control flow

Coroutine Support in C++20

Key Talks and References

Awaitable Type

  • Supports the co_await operator
  • Controls the semantics of an await-expression
  • Informs the compiler how to obtain the awaiter

co_await async_write(..., use_awaitable);
          

Awaiter Type

Coroutine Return Type

  • Declares the promise type to the compiler
    • Using coroutine_traits
  • E.g. 'task<T>' or 'generator<T>'
  • CppCoro defines several return types
  • Referred to as a 'future' in some WG21 papers
  • Not to be confused with std::future

Promise Type

  • Controls the coroutine's behaviour
    • ... example coming up
  • Implements methods that are called at specific points during the execution of the coroutine
  • Conveys coroutine result (or exception)
  • Again - not to be confused with std::promise

Coroutine Handles

  • Handle to a coroutine frame (usually allocated on the heap)
  • Means through which coroutines are resumed
  • Also provide access to the promise type
  • Non-owning: the frame must be destroyed explicitly (destroy())
    • Often through RAII in the coroutine return type

Generator Example


//
// card_dealer.cpp
// ---------------
//

#include <coroutine>
#include <array>
#include <exception>
#include <random>
#include <string>
#include <utility>

#include <iostream>

template <typename T> struct generator {
  struct promise_type;
  using coroutine_handle = std::coroutine_handle<promise_type>;

  struct promise_type {
    T current_value;

    auto get_return_object() {
      return generator{coroutine_handle::from_promise(*this)};
    }

    // Start 'lazily' i.e. suspend ('eagerly' == 'suspend_never').
    auto initial_suspend() { return std::suspend_always{}; }

    // Opportunity to publish results, signal completion etc.
    // Suspend so that destroy() is called via RAII from outside
    // the coroutine.
    auto final_suspend() noexcept { return std::suspend_always{}; }

    // Required: card_dealer's body flows off its end (no co_return)
    void return_void() {}

    void unhandled_exception() { std::terminate(); }

    auto yield_value(T const &value) {
      current_value = value;
      return std::suspend_always{};
    }
  };

  bool next() {
    return coroutine ? (coroutine.resume(), !coroutine.done())
                     : false;
  }

  T value() const { return coroutine.promise().current_value; }

  // The generator owns the coroutine frame, so it is move-only:
  // a copy would call destroy() twice on the same frame
  generator(generator const&) = delete;
  generator& operator=(generator const&) = delete;
  generator(generator&& other) noexcept
      : coroutine(std::exchange(other.coroutine, {})) {}

  // Range support etc.

  ~generator() {
    if (coroutine)
      coroutine.destroy();
  }

private:
  generator(coroutine_handle h) : coroutine(h) {}
  coroutine_handle coroutine;
};

generator<std::string> card_dealer(int deck_size) {
  std::default_random_engine rng;
  std::uniform_int_distribution<int> card(0, 12);
  std::uniform_int_distribution<int> suit(0, 3);

  std::array<std::string, 13> cards = {
      "Ace", "2", "3", "4", "5", "6", "7",
      "8", "9", "10", "Jack", "Queen", "King"};

  std::array<std::string, 4> suits = {"Clubs", "Diamonds",
                                      "Spades", "Hearts"};

  for (int i = 0; i < deck_size; i++)
    co_yield(cards[card(rng)] + " of " + suits[suit(rng)]);
}

int main() {
  generator<std::string> dealer = card_dealer(100);

  while (dealer.next())
    std::cout << dealer.value() << std::endl;
}
        

Coroutines Applied

Observations

  • C++20 coroutines are powerful ... but complex
  • At the application level, how do we:
    • Compare different forms of asynchrony
    • Evaluate/benchmark performance
    • Understand what's going on at the hardware level
  • What is a practical methodology for doing this?

Boost.Beast

Apache Bench

  • 'ab' is a tool for benchmarking HTTP servers
  • Mature implementation with extensive set of options
  • Number of concurrent requests is configurable
  • Measures 'requests per second' that can be serviced

HTTP Server: Asynchronous


#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/asio/strand.hpp>

#include <algorithm>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>
#include <format>

namespace beast = boost::beast;
namespace http = beast::http;
namespace asio = boost::asio;
using tcp = boost::asio::ip::tcp;

void error(beast::error_code ec, char const* what)
{
  std::cerr << std::format("Error: {} : {}\n", what, ec.message());
}

// Handles an HTTP server connection
class session : public std::enable_shared_from_this<session>
{
  beast::tcp_stream stream_;
  beast::flat_buffer buffer_;
  http::request<http::string_body> req_;

public:
  // Take ownership of the stream
  session(
      tcp::socket&& socket)
    : stream_(std::move(socket))
  { }

  // Start the asynchronous operation
  void run()
  {
    asio::dispatch(stream_.get_executor(),
        beast::bind_front_handler(
          &session::do_read,
          shared_from_this()));
  }

  void do_read()
  {
    // Make the request empty before reading,
    // otherwise the operation behavior is undefined.
    req_ = {};

    // Set the timeout.
    stream_.expires_after(std::chrono::seconds(30));

    // Read a request
    http::async_read(stream_, buffer_, req_,
        beast::bind_front_handler(
          &session::on_read,
          shared_from_this()));
  }

  void on_read(
    beast::error_code ec,
    std::size_t bytes_transferred)
  {
    boost::ignore_unused(bytes_transferred);

    // This means they closed the connection
    if(ec == http::error::end_of_stream) {
      stream_.socket().shutdown(tcp::socket::shutdown_send, ec);
      return;
    }

    if(ec) return error(ec, "read");

    // Send the response
    auto handle_request = [this]() -> http::message_generator {
      http::response<http::string_body> res{
        http::status::ok, req_.version()};
      res.set(http::field::server, "Boost.Beast");
      res.body() = "Hello ACCU 2023 from the Asynchronous Server!";
      res.prepare_payload();
      res.keep_alive(req_.keep_alive());
      return res;
    };

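    // Bind keep_alive now: req_ is reset by the next do_read()
    beast::async_write(
        stream_,
        handle_request(),
        beast::bind_front_handler(
          &session::on_write, shared_from_this(),
          req_.keep_alive()));
  }

  void on_write(
    bool keep_alive,
    beast::error_code ec,
    std::size_t bytes_transferred)
  {
    boost::ignore_unused(bytes_transferred);

    if(ec) return error(ec, "write");

    // Close the connection unless the client asked to keep it open
    if(!keep_alive) {
      stream_.socket().shutdown(tcp::socket::shutdown_send, ec);
      return;
    }

    // Read another request
    do_read();
  }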
};

// Accepts incoming connections and launches the sessions
class listener : public std::enable_shared_from_this<listener>
{
  asio::io_context& ioc_;
  tcp::acceptor acceptor_;

public:
  listener(
      asio::io_context& ioc,
      tcp::endpoint endpoint)
    : ioc_(ioc)
      , acceptor_(asio::make_strand(ioc))
  {
    beast::error_code ec;

    // Open the acceptor
    acceptor_.open(endpoint.protocol(), ec);
    if(ec) error(ec, "open");

    // Allow address reuse
    acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
    if(ec) error(ec, "set_option");

    // Bind to the server address
    acceptor_.bind(endpoint, ec);
    if(ec) error(ec, "bind");

    // Start listening for connections
    acceptor_.listen(asio::socket_base::max_listen_connections, ec);
    if(ec) error(ec, "listen");
  }

  // Start accepting incoming connections
  void run()
  {
    do_accept();
  }

private:
  void do_accept()
  {
    // The new connection gets its own strand
    acceptor_.async_accept(
        asio::make_strand(ioc_),
        beast::bind_front_handler(
          &listener::on_accept,
          shared_from_this()));
  }

  void on_accept(beast::error_code ec, tcp::socket socket)
  {
    if(ec) {
      error(ec, "accept");
    } else {
      // Create the session and run it
      std::make_shared<session>(
          std::move(socket))->run();
    }

    // Accept another connection
    do_accept();
  }
};

int main(int argc, char *argv[])
{
  if (argc != 4) {
    std::cerr << std::format(
        "Usage: {} <ip-address> <port> <threads>\n"
        "E.g.: {} 0.0.0.0 8080 2\n",
        argv[0], argv[0]
        );
    return EXIT_FAILURE;
  }

  auto const address = asio::ip::make_address(argv[1]);
  auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
  auto const num_threads = std::max<int>(1, std::atoi(argv[3]));

  asio::io_context ioc{num_threads};

  // Create and launch a listening port
  std::make_shared<listener>(
    ioc, tcp::endpoint{address, port}
  )->run();

  // Run the IO service with the requested number of threads
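  std::vector<std::thread> v;
  v.reserve(num_threads - 1);
  for (auto i = num_threads - 1; i; --i)
    v.emplace_back([&ioc]{ ioc.run(); });

  // Use the main thread as well
  ioc.run();

  // Join the worker threads before the vector destroys them
  for (auto& t : v)
    t.join();

  return EXIT_SUCCESS;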
}
            

HTTP Server: Stackful Coroutines


#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/asio/spawn.hpp>

#include <algorithm>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <thread>
#include <vector>
#include <format>

namespace beast = boost::beast;
namespace http = beast::http;
namespace asio = boost::asio;
using tcp = boost::asio::ip::tcp;

// Report an error
void error(beast::error_code ec, char const* msg)
{
  std::cerr << std::format("Error: {} - {}\n", msg, ec.message());
}

void do_session(
  beast::tcp_stream& stream,
  asio::yield_context yield)
{
  beast::flat_buffer buffer;
  beast::error_code ec;

  for(;;)
  {
    // Set a timeout (in case the client stops responding)
    stream.expires_after(std::chrono::seconds(30));

    // Read a request
    http::request<http::string_body> req;
    http::async_read(stream, buffer, req, yield[ec]);

    if(ec == http::error::end_of_stream) break;

    if(ec) return error(ec, "read request");

    // Handle the request
    auto handle_request = [&req]() -> http::message_generator {
      http::response<http::string_body> res{
        http::status::ok, req.version()
      };
      res.set(http::field::server, "Beast");
      res.body() = "Hello ACCU 2023 from the Stackful Coro Server!";
      res.prepare_payload();
      res.keep_alive(req.keep_alive());
      return res;
    };

    // Send the response
    beast::async_write(stream, handle_request(), yield[ec]);

    if(ec) return error(ec, "write response");

    // Determine if we should close the connection
    if(!req.keep_alive()) break;
  }

  // Close the connection
  stream.socket().shutdown(tcp::socket::shutdown_send, ec);
}

// Accepts incoming connections and launches the sessions
void do_listen(
  asio::io_context& ioc,
  tcp::endpoint endpoint,
  asio::yield_context yield)
{
  beast::error_code ec;

  // Open the acceptor
  tcp::acceptor acceptor(ioc);
  acceptor.open(endpoint.protocol(), ec);
  if(ec) return error(ec, "open");

  // Allow address reuse
  acceptor.set_option(asio::socket_base::reuse_address(true), ec);
  if(ec) return error(ec, "set_option");

  // Bind to the server address
  acceptor.bind(endpoint, ec);
  if(ec) return error(ec, "bind");

  // Start listening for connections
  acceptor.listen(asio::socket_base::max_listen_connections, ec);
  if(ec) return error(ec, "listen");

  for(;;)
  {
    tcp::socket socket(ioc);
    acceptor.async_accept(socket, yield[ec]);
    if(ec)
      error(ec, "accept");
    else
      boost::asio::spawn(
          acceptor.get_executor(),
          std::bind(
            do_session,
            beast::tcp_stream(std::move(socket)),
            std::placeholders::_1));
  }
}

int main(int argc, char *argv[])
{
  if (argc != 4) {
    std::cerr << std::format(
        "Usage: {} <ip-address> <port> <num_threads>\n"
        "E.g.: {} 0.0.0.0 8080 2\n",
        argv[0], argv[0]
        );
    return EXIT_FAILURE;
  }

  auto const address = asio::ip::make_address(argv[1]);
  auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
  auto const num_threads = std::max<int>(1, std::atoi(argv[3]));

  asio::io_context ioc{num_threads};

  // Spawn a stackful coroutine
  boost::asio::spawn(ioc,
      std::bind(
        &do_listen,
        std::ref(ioc),
        tcp::endpoint{address, port},
        std::placeholders::_1));

  // Run the IO service with the requested number of threads
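  std::vector<std::thread> v;
  v.reserve(num_threads - 1);
  for (auto i = num_threads - 1; i; --i)
    v.emplace_back([&ioc]{ ioc.run(); });

  // Use the main thread as well
  ioc.run();

  // Join the worker threads before the vector destroys them
  for (auto& t : v)
    t.join();

  return EXIT_SUCCESS;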
}
            

HTTP Server: C++20 Coroutines


#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/use_awaitable.hpp>

#include <algorithm>
#include <cstdlib>
#include <exception>
#include <iostream>
#include <thread>
#include <vector>
#include <format>

namespace beast = boost::beast;
namespace http = beast::http;
namespace asio = boost::asio;
using tcp = boost::asio::ip::tcp;

using tcp_stream = typename beast::tcp_stream::rebind_executor<
  asio::use_awaitable_t<>::
    executor_with_default<asio::any_io_executor>>::other;

// Handles an HTTP server connection
asio::awaitable<void> do_session(tcp_stream stream)
{
  // This buffer is required to persist across reads
  beast::flat_buffer buffer;

  // The try block encloses the whole loop: if it were inside the
  // loop, end_of_stream would be caught and the read retried forever
  try
  {
    for(;;)
    {
      // Set the timeout.
      stream.expires_after(std::chrono::seconds(30));

      // Read a request
      http::request<http::string_body> req;
      co_await http::async_read(stream, buffer, req);

      // Handle the request
      auto handle_request = [&req]() -> http::message_generator {
        http::response<http::string_body> res{
          http::status::ok,
            req.version()
        };
        res.set(http::field::server, "Beast");
        res.body() = "Hello ACCU 2023 from the Awaitable Server!";
        res.prepare_payload();
        res.keep_alive(req.keep_alive());
        return res;
      };

      // Send response
      co_await beast::async_write(
        stream, handle_request(), asio::use_awaitable
      );

      // Determine if we should close the connection
      if(!req.keep_alive()) break;
    }
  }
  catch (boost::system::system_error & se)
  {
    // end_of_stream: the client closed the connection
    if (se.code() != http::error::end_of_stream)
      throw;
  }

  // Send a TCP shutdown (the client may already have gone, so the
  // error code is ignored)
  beast::error_code ec;
  stream.socket().shutdown(tcp::socket::shutdown_send, ec);
}

// Accepts incoming connections and launches the sessions
asio::awaitable<void> do_listen(tcp::endpoint endpoint)
{
  // Open the acceptor
  auto acceptor = asio::use_awaitable.as_default_on(
    tcp::acceptor(co_await asio::this_coro::executor)
  );
  acceptor.open(endpoint.protocol());

  // Allow address reuse
  acceptor.set_option(asio::socket_base::reuse_address(true));

  // Bind to the server address
  acceptor.bind(endpoint);

  // Start listening for connections
  acceptor.listen(asio::socket_base::max_listen_connections);

  for(;;)
    boost::asio::co_spawn(
      acceptor.get_executor(),
      do_session(tcp_stream(co_await acceptor.async_accept())),
      [](std::exception_ptr e)
      {
        try
        {
          if (e) std::rethrow_exception(e);
        }
        catch (std::exception &e) {
          std::cerr << "Error in session: " << e.what() << "\n";
        }
      }
    );
}

int main(int argc, char* argv[])
{
  // Check command line arguments.
  if (argc != 4) {
    std::cerr << std::format(
      "Usage: {} <ip-address> <port> <threads>\n"
      "E.g.: {} 0.0.0.0 8080 1\n",
      argv[0], argv[0]
    );
    return EXIT_FAILURE;
  }

  auto const address = asio::ip::make_address(argv[1]);
  auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
  auto const num_threads = std::max<int>(1, std::atoi(argv[3]));

  // The io_context is required for all I/O
  asio::io_context ioc{num_threads};

  // Spawn a listening port
  boost::asio::co_spawn(ioc,
    do_listen(tcp::endpoint{address, port}),
    [](std::exception_ptr e)
    {
      try
      {
        if (e) std::rethrow_exception(e);
      }
      catch(std::exception & e)
      {
        std::cerr << "Error in acceptor: " << e.what() << "\n";
      }
    }
  );

  // Run the I/O service on the requested number of threads
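  std::vector<std::thread> v;
  v.reserve(num_threads - 1);
  for (auto i = num_threads - 1; i; --i)
    v.emplace_back([&ioc]{ ioc.run(); });

  // Use the main thread as well
  ioc.run();

  // Join the worker threads before the vector destroys them
  for (auto& t : v)
    t.join();

  return EXIT_SUCCESS;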
}
            

Web Server Performance Comparison: x86-64

Time Per Request (ms): x86-64

Conclusions

Debugging Tips

  • Design concurrency before implementing
    • Eliminate bugs by design e.g. race conditions
  • Be careful with object lifetimes
    • Common idiom: RAII class that inherits from std::enable_shared_from_this
    • Check for resource exhaustion (e.g. leaked sockets) with lsof -p <pid>
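The enable_shared_from_this idiom keeps an object alive for as long as any pending asynchronous operation holds a shared_ptr to it. This is what session does above when its handlers are bound with shared_from_this(). The sketch below needs no Boost: it replaces a real I/O context with a hypothetical global queue of pending handlers (std::function). The counters exist only to make lifetimes observable.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for an I/O context: queued completion handlers
std::vector<std::function<void()>> pending;

// Counters used to observe object lifetimes
int live_connections = 0;
int completed_operations = 0;

class connection : public std::enable_shared_from_this<connection> {
public:
  connection() { ++live_connections; }
  ~connection() { --live_connections; }

  // Must only be called on an object owned by a std::shared_ptr
  void start() {
    // Capture shared_from_this(), not 'this': the handler now shares
    // ownership, so the object outlives the caller's shared_ptr
    pending.push_back([self = shared_from_this()] { self->on_complete(); });
  }

private:
  void on_complete() { ++completed_operations; }
};

// Run the queued handlers once, then drop them (and any ownership
// they hold), loosely like a single pass of io_context::run()
void run_pending() {
  auto handlers = std::move(pending);
  pending.clear();
  for (auto& handler : handlers)
    handler();
}
```

After std::make_shared<connection>()->start(), the caller's temporary shared_ptr is gone, but the connection still exists. It is destroyed only when the pending handler that owns it has run and been discarded. If the handler had captured a raw this instead, it would dangle.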

C++23 Stacktrace

  • C++23 stacktrace can be very helpful:
    • Good support in GCC 12.2
    • Configure with: --enable-libstdcxx-backtrace=yes
    • Compile with: -std=c++23 -lstdc++_libbacktrace

C++23/26 Coroutine Update

  • P2502: standardised generator std::generator
    • Models std::ranges::input_range
    • Approved for C++23 (June 2022)
    • Not yet implemented in standard libraries
  • P2300: std::execution
    • Standardised asynchronous execution
    • ... on generic execution contexts

Conclusion

  • Coroutines allow asynchronous code to be written
    • With the readability of synchronous code
    • Fibers: a light-weight alternative to threading
    • Empirical insights are compelling
  • Using coroutines in user-code:

Questions?

http://www.james-pascoe.com
james@james-pascoe.com

http://jamespascoe.github.io/accu2023
https://github.com/jamespascoe/accu2023-example-code.git

ACCU Bristol and Bath Meetup Coordinator