Jim (James) Pascoe
http://www.james-pascoe.com
james@james-pascoe.com
http://jamespascoe.github.io/accu2023
https://github.com/jamespascoe/accu2023-example-code.git
ACCU Bristol and Bath Meetup Coordinator
Concurrency allows us to harness latency
--[[
lua_fiber.lua
This behaviour provides an example of networked fibers.
]]
--- Fiber body that repeatedly sends "PING" and waits for a reply.
-- Intended to run inside a coroutine: it yields while waiting for a
-- message and while its pause timer is running, and it never returns.
-- @param connector   object providing Send, IsMessageAvailable and
--                    GetNextMessage methods
-- @param remote_port port on "localhost" that each "PING" is sent to
function ping_fiber (connector, remote_port)
  Actions.Log.info(
    "ping_fiber: connecting to port: " .. remote_port
  )

  local pause = Actions.Timer()

  while true do
    -- Send first, then wait for whatever message arrives next.
    connector:Send("localhost", remote_port, "PING")

    -- Always give up control at least once before polling.
    coroutine.yield()
    while not connector:IsMessageAvailable() do
      coroutine.yield()
    end

    local reply = connector:GetNextMessage()
    Actions.Log.info("ping_fiber: received: " .. reply)

    -- Start a non-blocking wait (presumably 1 second, timer id 1 --
    -- TODO confirm against the Timer action) and yield until it ends.
    pause(Actions.Timer.WaitType_NOBLOCK, 1, "s", 1)
    while true do
      if not pause:IsWaiting() then break end
      coroutine.yield()
    end
  end
end
--- Fiber body that waits for a message and answers it with "PONG".
-- Intended to run inside a coroutine: it yields while waiting for a
-- message and while its pause timer is running, and it never returns.
-- @param connector   object providing Send, IsMessageAvailable and
--                    GetNextMessage methods
-- @param remote_port port on "localhost" that each "PONG" is sent to
function pong_fiber (connector, remote_port)
  Actions.Log.info(
    "pong_fiber: connecting to port: " .. remote_port
  )

  local pause = Actions.Timer()

  while true do
    -- Always give up control at least once before polling.
    coroutine.yield()
    while not connector:IsMessageAvailable() do
      coroutine.yield()
    end

    local request = connector:GetNextMessage()
    Actions.Log.info("pong_fiber: received: " .. request)

    connector:Send("localhost", remote_port, "PONG")

    -- Start a non-blocking wait (presumably 1 second, timer id 2 --
    -- TODO confirm against the Timer action) and yield until it ends.
    pause(Actions.Timer.WaitType_NOBLOCK, 1, "s", 2)
    while true do
      if not pause:IsWaiting() then break end
      coroutine.yield()
    end
  end
end
-- Coroutine dispatcher (see Section 9.4 of 'Programming in Lua')
--
-- Resumes every coroutine in the table once per tick until the table
-- is empty. A coroutine is removed when it raises an error (logged as
-- critical) or when it has run to completion (logged as a warning).
--
-- @param coroutines table mapping a name to a coroutine; entries are
--                   removed from this table as coroutines finish
function dispatcher (coroutines)
  local timer = Actions.Timer()

  while next(coroutines) ~= nil do
    -- Clearing existing fields while traversing with pairs() is
    -- permitted, so finished coroutines are removed in place.
    for name, co in pairs(coroutines) do
      local ok, err = coroutine.resume(co)

      if not ok then
        -- The first result of resume() is the only reliable failure
        -- indicator; the error value need not be a string.
        Actions.Log.critical(
          "Lua coroutine '" .. tostring(name) ..
          "' has exited with runtime error " .. tostring(err)
        )
        coroutines[name] = nil
      elseif coroutine.status(co) == "dead" then
        -- Normal completion, with or without return values.
        Actions.Log.warn(
          "Lua coroutine '" .. tostring(name) ..
          "' exited"
        )
        coroutines[name] = nil
      end
    end

    -- Run the dispatcher every 1 ms. Note, that a blocking timer
    -- is required to prevent lua_fiber from consuming 100% of
    -- a core.
    timer(Actions.Timer.WaitType_BLOCK, 1, "ms", 3)
  end
end
--- Entry point of the behaviour.
-- Validates the listen port, creates the connector, starts the ping
-- and pong fibers and then hands control to the dispatcher.
-- @param args table of command-line arguments; args["port"] is the
--             port to listen on (string or number)
local function main(args)
  print("Welcome to Lua Fiber !")

  -- Arguments may arrive as strings, so convert once and validate.
  local port = args and tonumber(args["port"])
  if not port or port % 1 ~= 0 then
    print("Usage: lua_fiber lua_fiber.lua -a port=")
    os.exit(1)
  end

  print(
    string.format("Starting Lua Fiber:\n" ..
                  " listen port: %d\n", port)
  )

  local connector = Actions.Connector(args["port"])

  -- Compare numerically: a string "7777" never equals the number 7777.
  local remote_port = port == 7777 and "8888" or "7777"

  -- Create co-routines
  local coroutines = {}

  -- Runs a fiber up to its first yield and registers it only if that
  -- succeeded, so a start-up error is reported where it happened.
  local function start(name, fiber)
    local co = coroutine.create(fiber)
    local ok, err = coroutine.resume(co, connector, remote_port)
    if ok then
      coroutines[name] = co
    else
      Actions.Log.critical(
        "Lua coroutine '" .. name ..
        "' failed to start: " .. tostring(err)
      )
    end
  end

  start("ping", ping_fiber)
  start("pong", pong_fiber)

  -- Run the main loop
  dispatcher(coroutines)
end
-- Descriptor returned to whoever loads this file: a name, a short
-- description and the function to call to start the behaviour.
local behaviour = {}
behaviour.name = "lua_fiber"
behaviour.description = "A Lua behaviour to demonstrate fibers"
behaviour.entry_point = main

return behaviour
//
// lua_fiber_connector_action.hpp
//
#include "asio/asio.hpp"
class Connector {
public:
enum class ErrorType { SUCCESS, RESOLVE_FAILED, CONNECT_FAILED };
inline static int const default_port = 7777;
Connector(unsigned short port = default_port);
~Connector();
// Do not allow instances to be copied or moved
Connector(Connector const& rhs) = delete;
Connector(Connector&& rhs) = delete;
Connector& operator=(Connector const& rhs) = delete;
Connector& operator=(Connector&& rhs) = delete;
// Send a message to a remote behaviour
ErrorType Send(std::string const& hostname_or_ip,
std::string const& port,
std::string const& message);
// Returns whether a message is available to be read
bool IsMessageAvailable(void) { return !m_messages.empty(); }
// Returns the most recent message (or an empty string if none
// are available)
std::string GetNextMessage(void);
private:
using tcp = asio::ip::tcp;
// Max number of messages to retain
inline static const int max_messages = 32;
// Encapsulate a TCP connection and the data sent over it. Note
// that this is based on the ASIO Asychronous TCP server tutorial.
class tcp_connection {
public:
using pointer = std::shared_ptr<tcp_connection>
static pointer create(asio::io_context& io_context) {
return pointer(new tcp_connection(io_context));
}
tcp::socket& socket() { return m_socket; }
std::string& data() { return m_data; }
private:
tcp_connection(asio::io_context& io_context) :
m_socket(io_context) {}
tcp::socket m_socket;
std::string m_data;
};
// Asynchronous handlers for reading and accepting connections
void handle_read(asio::error_code const& error,
std::size_t bytes_transferred,
tcp_connection::pointer connection);
void handle_write(asio::error_code const& error,
std::size_t bytes_transferred,
tcp_connection::pointer connection);
void handle_accept(tcp_connection::pointer new_connection,
asio::error_code const& error);
void start_accept();
// Member variables
asio::io_context m_io_context;
asio::ip::tcp::acceptor m_acceptor;
std::thread m_thread;
// FIFO vector to store received messages
std::vector<std::string> m_messages;
};
//
// lua_fiber_connector_action.cpp
//
#include "lua_fiber_action_connector.hpp"
#include "lua_fiber_log_manager.hpp"
// Binds the acceptor to an IPv4 endpoint on 'port', queues the first
// asynchronous accept and then runs the io_context on a worker thread.
Connector::Connector(unsigned short port)
  : m_acceptor(m_io_context, tcp::endpoint(tcp::v4(), port)) {
  // Queue the accept before the worker thread starts running.
  start_accept();

  auto run_io = [this]() { m_io_context.run(); };
  m_thread = std::thread(run_io);

  log_trace("Connector action starting");
}
// Stops the io_context and waits for the worker thread to finish.
Connector::~Connector() {
  log_trace("Cleaning up in Connector action");

  // Stop first so that the run() call on the worker thread can
  // return; only then is it safe to wait for the thread.
  m_io_context.stop();
  m_thread.join();

  log_trace("Connector action exiting");
}
// Send a message to a remote behaviour.
//
// Resolves host:port and connects synchronously, then queues an
// asynchronous write of 'message' on a fresh connection.
//
// Returns RESOLVE_FAILED or CONNECT_FAILED if the corresponding step
// throws, otherwise SUCCESS (the write itself completes later and is
// reported by handle_write).
Connector::ErrorType Connector::Send(std::string const& host,
                                     std::string const& port,
                                     std::string const& message) {
  // Resolve the destination endpoint
  tcp::resolver resolver(m_io_context);
  tcp::resolver::results_type endpoints;
  try {
    endpoints = resolver.resolve(host, port);
  } catch (asio::system_error const&) {
    log_error(
      "Connector send failed: unable to resolve {}:{}",
      host, port);
    return ErrorType::RESOLVE_FAILED;
  }

  // Open a connection
  tcp_connection::pointer connection =
    tcp_connection::create(m_io_context);
  try {
    asio::connect(connection->socket(), endpoints);
  } catch (asio::system_error const&) {
    log_error("Connector send failed: could not connect to {}:{}",
              host, port);
    return ErrorType::CONNECT_FAILED;
  }

  // asio::buffer() does not own its memory and the write completes
  // after this function has returned, so the payload is copied into
  // the connection, which the completion handler keeps alive.
  connection->data() = message;

  asio::async_write(
    connection->socket(),
    asio::buffer(connection->data()),
    [this, connection](const asio::error_code& error,
                       std::size_t bytes_transferred) {
      handle_write(error, bytes_transferred, connection);
    }
  );

  return ErrorType::SUCCESS;
}
// Removes and returns the message at the front of the queue, or
// returns an empty string when no message is queued.
std::string Connector::GetNextMessage(void) {
  std::string next;

  if (IsMessageAvailable()) {
    auto const oldest = m_messages.begin();
    next = *oldest;
    m_messages.erase(oldest);
  }

  return next;
}
// Asynchronous handlers for reading and accepting connections
void Connector::handle_read(asio::error_code const> error,
std::size_t bytes_transferred,
tcp_connection::pointer connection) {
if (!error || error == asio::error::eof) {
// Prevent the message array from growing uncontrollably
if (m_messages.size() > max_messages)
m_messages.erase(m_messages.begin());
m_messages.emplace_back(connection-> data());
log_info("Received message ({} bytes): {}",
bytes_transferred,
connection->data());
} else
log_error("Connector read error: {}", error.message());
}
// Completion handler for the write started in Send; it only logs the
// outcome. The connection parameter is deliberately unused (hence
// 'maybe_unused'): holding the shared pointer keeps the connection
// object, and with it the socket, alive until this handler returns.
void Connector::handle_write(
  asio::error_code const& error,
  std::size_t bytes_transferred,
  [[maybe_unused]] tcp_connection::pointer connection) {
  if (error) {
    log_error("Connector send error: {}", error.message());
    return;
  }

  log_info("Sent message ({} bytes)", bytes_transferred);
}
// Completion handler for async_accept. On success it starts an
// asynchronous read into the connection's data string; on failure it
// logs the error. In both cases the next accept is queued.
void Connector::handle_accept(tcp_connection::pointer connection,
                              asio::error_code const& error) {
  if (error) {
    log_error("Connector accept error: {}", error.message());
  } else {
    log_debug("Accepted message connection");

    // The lambda holds a copy of the shared pointer, so the
    // connection outlives this function.
    auto on_read = [this, connection](const asio::error_code& read_error,
                                      std::size_t bytes_transferred) {
      handle_read(read_error, bytes_transferred, connection);
    };

    asio::async_read(
      connection->socket(),
      asio::dynamic_buffer(connection->data()),
      on_read
    );
  }

  start_accept();
}
// Creates a connection object for the next client and queues an
// asynchronous accept on it; handle_accept runs on completion.
void Connector::start_accept() {
  // Use the io_context member directly, as Send does. The acceptor
  // was constructed from this same io_context, and its type-erased
  // executor only exposes a generic execution_context.
  tcp_connection::pointer connection =
    tcp_connection::create(m_io_context);

  m_acceptor.async_accept(
    connection->socket(),
    [this, connection](const asio::error_code& error) {
      handle_accept(connection, error);
    }
  );
}
co_await
operator
co_await async_write(..., use_awaitable);
await_ready: is suspend required?
await_suspend: schedule resume
await_resume: co_await return result
std::promise
//
// card_dealer.cpp
// ---------------
//
#include <coroutine>
#include <array>
#include <random>
#include <string>
#include <iostream>
template <typename T> struct generator {
struct promise_type;
using coroutine_handle = std::coroutine_handle<promise_type>
struct promise_type {
T current_value;
auto get_return_object() {
return generator{coroutine_handle::from_promise(*this)};
}
// Start 'lazily' i.e. suspend ('eagerly' == 'suspend_never').
auto initial_suspend() { return std::suspend_always{}; }
// Opportunity to publish results, signal completion etc.
// Suspend so that destroy() is called via RAII from outside
// the coroutine.
auto final_suspend() noexcept { return std::suspend_always{}; }
void unhandled_exception() { std::terminate(); }
auto yield_value(T const &value) {
current_value = value;
return std::suspend_always{};
}
};
bool next() {
return coroutine ? (coroutine.resume(), !coroutine.done())
: false;
}
T value() const { return coroutine.promise().current_value; }
// Range support, class design etc.
~generator() {
if (coroutine)
coroutine.destroy();
}
private:
generator(coroutine_handle h) : coroutine(h) {}
coroutine_handle coroutine;
};
// Coroutine that yields 'deck_size' card names of the form
// "<rank> of <suit>". Rank and suit are drawn independently for each
// card from a default-constructed random engine.
generator<std::string> card_dealer(int deck_size) {
  std::default_random_engine engine;
  std::uniform_int_distribution<int> pick_rank(0, 12);
  std::uniform_int_distribution<int> pick_suit(0, 3);

  std::array<std::string, 13> ranks = {
    "Ace", "2", "3", "4", "5", "6", "7",
    "8", "9", "10", "Jack", "Queen", "King"};

  std::array<std::string, 4> suits = {"Clubs", "Diamonds",
                                      "Spades", "Hearts"};

  // Count down the cards still to deal; nothing is yielded when
  // deck_size is zero or negative.
  for (int remaining = deck_size; remaining > 0; --remaining) {
    co_yield(ranks[pick_rank(engine)] + " of " + suits[pick_suit(engine)]);
  }
}
// Deals 100 cards and prints each one on its own line.
int main() {
  auto dealer = card_dealer(100);

  for (bool dealt = dealer.next(); dealt; dealt = dealer.next()) {
    std::cout << dealer.value() << std::endl;
  }
}
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/asio/strand.hpp>
#include <iostream>
#include <thread>
#include <format>
namespace beast = boost::beast;
namespace http = beast::http;
namespace asio = boost::asio;
using tcp = boost::asio::ip::tcp;
// Writes "Error: <what> : <message>" for the given error code to
// standard error.
void error(beast::error_code ec, char const* what)
{
    auto const text = std::format("Error: {} : {}\n", what, ec.message());
    std::cerr << text;
}
// Handles an HTTP server connection.
//
// Each session reads a request, writes a fixed response and then
// either reads the next request (keep-alive) or shuts down the send
// side of the socket. The session keeps itself alive by binding
// shared_from_this() into every completion handler.
class session : public std::enable_shared_from_this<session>
{
    beast::tcp_stream stream_;
    beast::flat_buffer buffer_;
    http::request<http::string_body> req_;

public:
    // Take ownership of the stream
    session(
        tcp::socket&& socket)
        : stream_(std::move(socket))
    { }

    // Start the asynchronous operation
    void run()
    {
        // Dispatch onto the stream's executor so that do_read runs
        // there rather than on the caller's thread.
        asio::dispatch(stream_.get_executor(),
            beast::bind_front_handler(
                &session::do_read,
                shared_from_this()));
    }

    void do_read()
    {
        // Make the request empty before reading,
        // otherwise the operation behavior is undefined.
        req_ = {};

        // Set the timeout.
        stream_.expires_after(std::chrono::seconds(30));

        // Read a request
        http::async_read(stream_, buffer_, req_,
            beast::bind_front_handler(
                &session::on_read,
                shared_from_this()));
    }

    void on_read(
        beast::error_code ec,
        std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);

        // This means they closed the connection
        if(ec == http::error::end_of_stream) {
            stream_.socket().shutdown(tcp::socket::shutdown_send, ec);
            return;
        }

        if(ec) return error(ec, "read");

        // Send the response
        auto handle_request = [this]() -> http::message_generator {
            http::response<http::string_body> res{
                http::status::ok, req_.version()};
            res.set(http::field::server, "Boost.Beast");
            res.body() = "Hello ACCU 2023 from the Asynchronous Server!";
            res.prepare_payload();
            res.keep_alive(req_.keep_alive());
            return res;
        };

        beast::async_write(
            stream_,
            handle_request(),
            beast::bind_front_handler(
                &session::on_write, shared_from_this()));
    }

    void on_write(
        beast::error_code ec,
        std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);

        if(ec) return error(ec, "write");

        // req_ is still the request that was just answered (it is
        // only reset in do_read). Shut the send side down only when
        // the client did not ask for keep-alive; otherwise the next
        // response on this connection could not be written.
        if(!req_.keep_alive()) {
            stream_.socket().shutdown(tcp::socket::shutdown_send, ec);
            return;
        }

        // Read another request
        do_read();
    }
};
// Accepts incoming connections and launches the sessions
class listener : public std::enable_shared_from_this<listener>
{
asio::io_context& ioc_;
tcp::acceptor acceptor_;
public:
listener(
asio::io_context& ioc,
tcp::endpoint endpoint)
: ioc_(ioc)
, acceptor_(asio::make_strand(ioc))
{
beast::error_code ec;
// Open the acceptor
acceptor_.open(endpoint.protocol(), ec);
if(ec) error(ec, "open");
// Allow address reuse
acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
if(ec) error(ec, "set_option");
// Bind to the server address
acceptor_.bind(endpoint, ec);
if(ec) error(ec, "bind");
// Start listening for connections
acceptor_.listen(asio::socket_base::max_listen_connections, ec);
if(ec) error(ec, "listen");
}
// Start accepting incoming connections
void run()
{
do_accept();
}
private:
void do_accept()
{
// The new connection gets its own strand
acceptor_.async_accept(
asio::make_strand(ioc_),
beast::bind_front_handler(
&listener::on_accept,
shared_from_this()));
}
void on_accept(beast::error_code ec, tcp::socket socket)
{
if(ec) error(ec, "accept");
// Create the session and run it
std::make_shared<session>(
std::move(socket))->run();
// Accept another connection
do_accept();
};
};
// Entry point of the callback-based server.
// Usage: <program> <ip-address> <port> <threads>
int main(int argc, char *argv[])
{
    if (argc != 4) {
        std::cerr << std::format(
            "Usage: {} <ip-address> <port> <threads>\n"
            "E.g.: {} 0.0.0.0 8080 2\n",
            argv[0], argv[0]
        );
        return EXIT_FAILURE;
    }

    auto const address = asio::ip::make_address(argv[1]);
    auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
    auto const num_threads = std::max<int>(1, std::atoi(argv[3]));

    asio::io_context ioc{num_threads};

    // Create and launch a listening port
    std::make_shared<listener>(
        ioc, tcp::endpoint{address, port}
    )->run();

    // Run the IO service with the requested number of threads.
    // reserve() only allocates; constructing the vector with a size
    // would add that many empty thread objects in front of the real
    // workers.
    std::vector<std::thread> v;
    v.reserve(num_threads - 1);
    for (auto i = num_threads - 1; i > 0; --i)
        v.emplace_back([&ioc]{ ioc.run(); });

    // Use the main thread as well
    ioc.run();

    // Destroying a joinable std::thread calls std::terminate, so wait
    // for the workers before leaving main.
    for (auto& worker : v)
        worker.join();

    return EXIT_SUCCESS;
}
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/asio/spawn.hpp>
#include <iostream>
#include <thread>
#include <vector>
#include <format>
namespace beast = boost::beast;
namespace http = beast::http;
namespace asio = boost::asio;
using tcp = boost::asio::ip::tcp;
// Writes "Error: <msg> - <message>" for the given error code to
// standard error.
void error(beast::error_code ec, char const* msg)
{
    auto const text = std::format("Error: {} - {}\n", msg, ec.message());
    std::cerr << text;
}
// Serves HTTP requests on one connection from inside a stackful
// coroutine. Every request gets the same fixed response. The loop ends
// when the peer closes the stream or does not ask for keep-alive, and
// the send side of the socket is then shut down. A read or write error
// is reported and ends the session without the shutdown.
void do_session(
    beast::tcp_stream& stream,
    asio::yield_context yield)
{
    beast::error_code ec;

    // Must outlive a single read: it may hold bytes beyond the
    // request that was just parsed.
    beast::flat_buffer read_buffer;

    bool keep_serving = true;
    while (keep_serving)
    {
        // Set a timeout (in case the client stops responding)
        stream.expires_after(std::chrono::seconds(30));

        // Read a request
        http::request<http::string_body> req;
        http::async_read(stream, read_buffer, req, yield[ec]);

        if (ec == http::error::end_of_stream)
            break;

        if (ec) {
            error(ec, "read request");
            return;
        }

        // Build the response
        http::response<http::string_body> reply{
            http::status::ok, req.version()
        };
        reply.set(http::field::server, "Beast");
        reply.body() = "Hello ACCU 2023 from the Stackful Coro Server!";
        reply.prepare_payload();
        reply.keep_alive(req.keep_alive());

        // Send the response
        beast::async_write(
            stream, http::message_generator(std::move(reply)), yield[ec]);

        if (ec) {
            error(ec, "write response");
            return;
        }

        // Determine if we should close the connection
        keep_serving = req.keep_alive();
    }

    // Close the connection
    stream.socket().shutdown(tcp::socket::shutdown_send, ec);
}
// Accepts incoming connections and launches the sessions.
//
// Runs inside a stackful coroutine: sets up the acceptor (returning
// after reporting the first failure) and then accepts connections
// forever, spawning one do_session coroutine per accepted socket.
void do_listen(
    asio::io_context& ioc,
    tcp::endpoint endpoint,
    asio::yield_context yield)
{
    beast::error_code ec;
    tcp::acceptor acceptor(ioc);

    // Open the acceptor
    acceptor.open(endpoint.protocol(), ec);
    if (ec) {
        error(ec, "open");
        return;
    }

    // Allow address reuse
    acceptor.set_option(asio::socket_base::reuse_address(true), ec);
    if (ec) {
        error(ec, "set_option");
        return;
    }

    // Bind to the server address
    acceptor.bind(endpoint, ec);
    if (ec) {
        error(ec, "bind");
        return;
    }

    // Start listening for connections
    acceptor.listen(asio::socket_base::max_listen_connections, ec);
    if (ec) {
        error(ec, "listen");
        return;
    }

    while (true)
    {
        tcp::socket socket(ioc);
        acceptor.async_accept(socket, yield[ec]);

        if (!ec) {
            // The bound tcp_stream is stored in the bind object, so
            // the session's stream lives as long as its coroutine
            // function object does.
            asio::spawn(
                acceptor.get_executor(),
                std::bind(
                    do_session,
                    beast::tcp_stream(std::move(socket)),
                    std::placeholders::_1));
        } else {
            error(ec, "accept");
        }
    }
}
// Entry point of the stackful-coroutine server.
// Usage: <program> <ip-address> <port> <num_threads>
int main(int argc, char *argv[])
{
    if (argc != 4) {
        std::cerr << std::format(
            "Usage: {} <ip-address> <port> <num_threads>\n"
            "E.g.: {} 0.0.0.0 8080 2\n",
            argv[0], argv[0]
        );
        return EXIT_FAILURE;
    }

    auto const address = asio::ip::make_address(argv[1]);
    auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
    auto const num_threads = std::max<int>(1, std::atoi(argv[3]));

    asio::io_context ioc{num_threads};

    // Spawn a stackful coroutine
    boost::asio::spawn(ioc,
        std::bind(
            &do_listen,
            std::ref(ioc),
            tcp::endpoint{address, port},
            std::placeholders::_1));

    // Run the IO service with the requested number of threads.
    // reserve() only allocates; constructing the vector with a size
    // would add that many empty thread objects in front of the real
    // workers.
    std::vector<std::thread> v;
    v.reserve(num_threads - 1);
    for (auto i = num_threads - 1; i > 0; --i)
        v.emplace_back([&ioc]{ ioc.run(); });

    // Use the main thread as well
    ioc.run();

    // Destroying a joinable std::thread calls std::terminate, so wait
    // for the workers before leaving main.
    for (auto& worker : v)
        worker.join();

    return EXIT_SUCCESS;
}
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <iostream>
#include <thread>
#include <vector>
#include <format>
namespace beast = boost::beast;
namespace http = beast::http;
namespace asio = boost::asio;
using tcp = boost::asio::ip::tcp;
using tcp_stream = typename beast::tcp_stream::rebind_executor<
asio::use_awaitable_t<>::
executor_with_default<asio::any_io_executor>>::other;
// Handles an HTTP server connection.
//
// Reads requests and answers each with a fixed response until the
// client closes the stream or does not ask for keep-alive, then shuts
// down the send side of the socket. Any error other than
// end_of_stream propagates to the caller as an exception.
asio::awaitable<void> do_session(tcp_stream stream)
{
    beast::error_code ec;

    // This buffer is required to persist across reads
    beast::flat_buffer buffer;

    // The try block encloses the whole loop: end_of_stream has to end
    // the session. Catching it inside the loop would retry the read on
    // a closed stream indefinitely.
    try
    {
        for(;;)
        {
            // Set the timeout.
            stream.expires_after(std::chrono::seconds(30));

            // Read a request
            http::request<http::string_body> req;
            co_await http::async_read(stream, buffer, req);

            // Handle the request
            auto handle_request = [&req]() -> http::message_generator {
                http::response<http::string_body> res{
                    http::status::ok,
                    req.version()
                };
                res.set(http::field::server, "Beast");
                res.body() = "Hello ACCU 2023 from the Awaitable Server!";
                res.prepare_payload();
                res.keep_alive(req.keep_alive());
                return res;
            };

            // Send response
            co_await beast::async_write(
                stream, handle_request(), asio::use_awaitable
            );

            // Determine if we should close the connection
            if(!req.keep_alive()) break;
        }
    }
    catch (boost::system::system_error const& se)
    {
        // The peer closing the connection is the normal way for a
        // session to end; everything else is a real error.
        if (se.code() != http::error::end_of_stream)
            throw;
    }

    // Send a TCP shutdown
    stream.socket().shutdown(tcp::socket::shutdown_send, ec);
}
// Accepts incoming connections and launches the sessions.
//
// Sets up an acceptor on the coroutine's own executor (set-up failures
// are thrown) and then accepts forever, spawning a do_session
// coroutine for every accepted socket.
asio::awaitable<void> do_listen(tcp::endpoint endpoint)
{
    // Open the acceptor; use_awaitable becomes its default completion
    // token, so async_accept() can be awaited without arguments.
    auto executor = co_await asio::this_coro::executor;
    auto acceptor = asio::use_awaitable.as_default_on(
        tcp::acceptor(executor)
    );
    acceptor.open(endpoint.protocol());

    // Allow address reuse
    acceptor.set_option(asio::socket_base::reuse_address(true));

    // Bind to the server address
    acceptor.bind(endpoint);

    // Start listening for connections
    acceptor.listen(asio::socket_base::max_listen_connections);

    // Completion handler for each session: reports an exception that
    // escaped from do_session.
    auto report_failure = [](std::exception_ptr failure)
    {
        try
        {
            if (failure) std::rethrow_exception(failure);
        }
        catch (std::exception &ex) {
            std::cerr << "Error in session: " << ex.what() << "\n";
        }
    };

    for(;;)
    {
        auto socket = co_await acceptor.async_accept();

        boost::asio::co_spawn(
            acceptor.get_executor(),
            do_session(tcp_stream(std::move(socket))),
            report_failure
        );
    }
}
// Entry point of the awaitable-based server.
// Usage: <program> <ip-address> <port> <threads>
int main(int argc, char* argv[])
{
    // Check command line arguments.
    if (argc != 4) {
        std::cerr << std::format(
            "Usage: {} <ip-address> <port> <threads>\n"
            "E.g.: {} 0.0.0.0 8080 1\n",
            argv[0], argv[0]
        );
        return EXIT_FAILURE;
    }

    auto const address = asio::ip::make_address(argv[1]);
    auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
    auto const num_threads = std::max<int>(1, std::atoi(argv[3]));

    // The io_context is required for all I/O
    asio::io_context ioc{num_threads};

    // Spawn a listening port
    boost::asio::co_spawn(ioc,
        do_listen(tcp::endpoint{address, port}),
        [](std::exception_ptr e)
        {
            try
            {
                if (e) std::rethrow_exception(e);
            }
            catch(std::exception & e)
            {
                std::cerr << "Error in acceptor: " << e.what() << "\n";
            }
        }
    );

    // Run the I/O service on the requested number of threads.
    // reserve() only allocates; constructing the vector with a size
    // would add that many empty thread objects in front of the real
    // workers.
    std::vector<std::thread> v;
    v.reserve(num_threads - 1);
    for (auto i = num_threads - 1; i > 0; --i)
        v.emplace_back([&ioc]{ ioc.run(); });

    // Use the main thread as well
    ioc.run();

    // Destroying a joinable std::thread calls std::terminate, so wait
    // for the workers before leaving main.
    for (auto& worker : v)
        worker.join();

    return EXIT_SUCCESS;
}
std::generator
std::ranges::input_range
std::execution
http://www.james-pascoe.com
james@james-pascoe.com
http://jamespascoe.github.io/accu2023
https://github.com/jamespascoe/accu2023-example-code.git
ACCU Bristol and Bath Meetup Coordinator