Jim (James) Pascoe
http://www.james-pascoe.com
james@james-pascoe.com
http://jamespascoe.github.io/accu2022
https://github.com/jamespascoe/accu2022-example-code.git
luaposix
and luasocket
sudo luarocks install luaposix
sudo luarocks install luasocket
//
// echo_server_blocking.cpp
//
#include <boost/asio.hpp>
#include <iostream>
using boost::asio::buffer;
using boost::asio::io_context;
using boost::asio::ip::tcp;
using boost::system::error_code;
static const int buf_len = 1000;
int main() {
try {
io_context ctx;
tcp::acceptor acceptor(ctx, tcp::endpoint(tcp::v4(), 6666));
for (;;) {
tcp::socket peer_socket(ctx);
acceptor.accept(peer_socket);
std::array<char, buf_len> buf;
for (;;) {
error_code error;
std::size_t len =
peer_socket.read_some(buffer(buf), error);
if (error == boost::asio::error::eof)
break;
write(peer_socket, buffer(buf, len));
}
}
} catch (std::exception &e) {
std::cerr << "Exception: " << e.what() << "\n";
}
}
//
// echo_server_async.cpp
//
#include <boost/asio.hpp>
#include <iostream>
using boost::asio::buffer;
using boost::asio::io_context;
using boost::asio::ip::tcp;
using boost::system::error_code;
static const int buf_len = 1000;
void accept_handler(error_code const &error,
tcp::socket &peer_socket,
std::array<char, buf_len> &buf,
tcp::acceptor &acceptor);
void read_handler(error_code const &error,
std::size_t bytes_transferred,
tcp::socket &peer_socket,
std::array<char, buf_len> &buf,
tcp::acceptor &acceptor);
void write_handler(error_code const &error, std::size_t length,
tcp::socket &peer_socket,
std::array<char, buf_len> &buf,
tcp::acceptor &acceptor) {
if (!error) {
peer_socket.async_read_some(
buffer(buf, length), [&](error_code const &error,
std::size_t bytes_transferred) {
read_handler(error, bytes_transferred, peer_socket, buf,
acceptor);
});
} else
std::cerr << "Write handler error: " << error.message()
<< std::endl;
}
void read_handler(error_code const &error,
std::size_t bytes_transferred,
tcp::socket &peer_socket,
std::array<char, buf_len> &buf,
tcp::acceptor &acceptor) {
if (!error) {
async_write(peer_socket, buffer(buf, bytes_transferred),
[&](error_code const &error, std::size_t length) {
write_handler(error, length, peer_socket, buf,
acceptor);
});
} else {
if (error == boost::asio::error::eof) {
peer_socket.close();
acceptor.async_accept(
peer_socket, [&](error_code const &error) {
accept_handler(error, peer_socket, buf, acceptor);
});
} else
std::cerr << "Read handler error: " << error.message()
<< std::endl;
}
}
void accept_handler(error_code const &error,
tcp::socket &peer_socket,
std::array<char, buf_len> &buf,
tcp::acceptor &acceptor) {
if (!error) {
peer_socket.async_read_some(
buffer(buf, buf_len), [&](error_code const &error,
std::size_t bytes_transferred) {
read_handler(error, bytes_transferred, peer_socket, buf,
acceptor);
});
} else
std::cerr << "Accept handler error: " << error.message()
<< std::endl;
}
int main() {
io_context ctx;
tcp::socket peer_socket(ctx);
tcp::acceptor acceptor(ctx, tcp::endpoint(tcp::v4(), 6666));
std::array<char, buf_len> buf;
acceptor.async_accept(peer_socket, [&](error_code const &error) {
accept_handler(error, peer_socket, buf, acceptor);
});
try {
ctx.run();
} catch (std::exception &e) {
std::cerr << "Exception: " << e.what() << "\n";
}
}
//
// echo_server_coroutine.cpp
//
#include <boost/asio.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#include <iostream>
using boost::asio::buffer;
using boost::asio::detached;
using boost::asio::io_context;
using boost::asio::use_awaitable;
using boost::asio::experimental::as_tuple;
using boost::asio::ip::tcp;
using boost::system::error_code;
boost::asio::awaitable<void> echo(tcp::socket peer_socket,
tcp::acceptor acceptor) {
std::array<char, 1000> buf;
for (;;) {
co_await acceptor.async_accept(peer_socket, use_awaitable);
for (;;) {
auto [error, len] = co_await peer_socket.async_read_some(
buffer(buf), as_tuple(use_awaitable));
if (error == boost::asio::error::eof)
break;
co_await async_write(peer_socket, buffer(buf, len),
use_awaitable);
}
peer_socket.close();
}
}
int main() {
try {
io_context ctx;
tcp::socket socket(ctx);
tcp::acceptor acceptor(ctx, tcp::endpoint(tcp::v4(), 6666));
co_spawn(ctx, echo(std::move(socket), std::move(acceptor)),
detached);
ctx.run();
} catch (std::exception &e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
}
-- Connect to the peer and send messages read from stdin
function sender (host, port)
while true do
local remote, err = socket.connect(host, port)
while not remote do
coroutine.yield()
remote, err = socket.connect(host, port)
end
print("Connected to " .. host .. ":" .. port)
while err ~= "closed" do
-- Read from stdin (non-blocking - 1s timeout)
local ret = require "posix".rpoll(0, 1000)
if (ret == 1) then
local message = io.read()
if (message ~= "") then
_, err = remote:send(message .. "\n")
end
else -- read timeout: update connection status
_, err = remote:send("\0")
end
coroutine.yield()
end
end
end
-- Receive messages from our peer and print them
function receiver (port)
local server = assert(socket.bind("*", port))
server:settimeout(0.1) -- set non-blocking (100 ms timeout)
while true do
local _, port = server:getsockname()
print("Waiting for connection on port " .. port);
local client, err = server:accept()
if (not client and err == "timeout") then
coroutine.yield()
else
local peer_ip, peer_port = client:getpeername()
client:send("Connected to LuaChat!\n")
client:settimeout(0.1)
while err ~= "closed" do
local line
line, err = client:receive("*l")
if not err then
print(
string.format("%s:%d> %s", peer_ip, peer_port, line)
)
else
coroutine.yield()
end
end
end
end
end
function dispatcher (coroutines)
while true do
if next(coroutines) == nil then break end -- no more coroutines
for name, co in pairs(coroutines) do
local status, res = coroutine.resume(co)
if res then -- coroutine has returned a result (i.e. finished)
if type(res) == "string" then -- runtime error
print("Lua coroutine '" .. name ..
"' has exited with error: " .. res)
else
print("Lua coroutine '" .. name .. "' exited")
end
coroutines[name] = nil
end
end
end
end
local port, remote_ip, remote_port = tonumber(arg[1]),
arg[2],
tonumber(arg[3])
print(
string.format("Starting LuaChat:\n" ..
" local port: %d\n" ..
" remote IP: %s\n" ..
" remote port: %d\n\n", port, remote_ip, remote_port)
)
-- Create co-routines
local coroutines = {}
coroutines["receiver"] = coroutine.create(receiver)
coroutine.resume(coroutines["receiver"], port)
coroutines["sender"] = coroutine.create(sender)
coroutine.resume(coroutines["sender"], remote_ip, remote_port)
-- Run the main loop
dispatcher(coroutines)
co_await
operator
co_await async_write(..., use_awaitable);
await_ready
: is suspend required?await_suspend
: schedule resumeawait_resume
: co_await
return resultstd::promise
await_transform
method:promise_type
co_await
on certain typesco_yield
//
// card_dealer.cpp
// ---------------
//
#include <coroutine>
#include <array>
#include <random>
#include <string>
#include <iostream>
template <typename T> struct generator {
struct promise_type;
using coroutine_handle = std::coroutine_handle<promise_type>
struct promise_type {
T current_value;
auto get_return_object() {
return generator{coroutine_handle::from_promise(*this)};
}
// Start 'lazily' i.e. suspend ('eagerly' == 'suspend_never').
auto initial_suspend() { return std::suspend_always{}; }
// Opportunity to publish results, signal completion etc.
// Suspend so that destroy() is called via RAII from outside
// the coroutine.
auto final_suspend() noexcept { return std::suspend_always{}; }
void unhandled_exception() { std::terminate(); }
auto yield_value(T const &value) {
current_value = value;
return std::suspend_always{};
}
};
bool next() {
return coroutine ? (coroutine.resume(), !coroutine.done())
: false;
}
T value() const { return coroutine.promise().current_value; }
// Range support, class design etc.
~generator() {
if (coroutine)
coroutine.destroy();
}
private:
generator(coroutine_handle h) : coroutine(h) {}
coroutine_handle coroutine;
};
generator<std::string> card_dealer(int deck_size) {
std::default_random_engine rng;
std::uniform_int_distribution<int> card(0, 12);
std::uniform_int_distribution<int> suit(0, 3);
std::array<std::string, 13> cards = {
"Ace", "2", "3", "4", "5", "6", "7",
"8", "9", "10", "Jack", "Queen", "King"};
std::array<std::string, 4> suits = {"Clubs", "Diamonds",
"Spades", "Hearts"};
for (int i = 0; i < deck_size; i++)
co_yield(cards[card(rng)] + " of " + suits[suit(rng)]);
}
int main() {
generator<std::string> dealer = card_dealer(100);
while (dealer.next())
std::cout << dealer.value() << std::endl;
}
co_await
class chat {
public:
chat(char *addr, char *port, char *remote_addr,
char *remote_port) {
tcp::resolver resolver(ctx);
basic_resolver_entry<tcp> listen_endpoint;
basic_resolver_entry<tcp> remote_endpoint;
listen_endpoint = *resolver.resolve(addr, port, passive);
remote_endpoint = *resolver.resolve(remote_addr, remote_port);
co_spawn(ctx, sender(remote_endpoint), detached);
co_spawn(ctx, receiver(listen_endpoint), detached);
ctx.run();
}
private:
awaitable<void> sender(tcp::endpoint remote) {
// ...
}
awaitable<void> receiver(tcp::endpoint listen) {
// ...
}
// Member variables (shared between coroutines)
io_context ctx;
bool connected = false;
};
awaitable<void> sender(tcp::endpoint remote) {
for (;;) {
tcp::socket remote_sock(ctx);
auto [error] = co_await remote_sock.async_connect(
remote, as_tuple(use_awaitable));
if (!error) {
std::cout << "Connected to: " << remote << std::endl;
connected = true;
} else {
std::cout << "Could not connect to: " << remote
<< " - retrying in 1s " << std::endl;
steady_timer timer(
co_await boost::asio::this_coro::executor);
timer.expires_after(std::chrono::seconds(1));
co_await timer.async_wait(use_awaitable);
continue;
}
std::string data;
while (connected) {
// Read a string from stdin (non-blocking)
struct pollfd input[1] = {{.fd = 0, .events = POLLIN}};
if (poll(input, 1, 100 /* timeout in ms */)) {
char c;
while (std::cin.get(c) && c != '\n')
data += c;
data += "\r\n";
}
co_await async_write(remote_sock, buffer(data),
as_tuple(use_awaitable));
data.clear();
}
}
}
awaitable<void> receiver(tcp::endpoint listen) {
tcp::acceptor acceptor(ctx, listen);
for (;;) {
auto [error, client] =
co_await acceptor.async_accept(as_tuple(use_awaitable));
if (!error) {
std::string data;
for (;;) {
auto [error, len] = co_await async_read_until(
client, dynamic_buffer(data), boost::regex("\r\n"),
as_tuple(use_awaitable));
if (error == boost::asio::error::eof) {
// remote has disconnected
connected = false;
break;
}
std::cout << client.remote_endpoint() << "> " << data;
data.clear();
}
} else {
std::cerr << "Accept failed: " << error.message() << "\n";
}
}
}
#include <boost/asio.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#include <boost/regex.hpp>
#include <iostream>
using boost::asio::awaitable;
using boost::asio::buffer;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::dynamic_buffer;
using boost::asio::io_context;
using boost::asio::steady_timer;
using boost::asio::use_awaitable;
using boost::asio::experimental::as_tuple;
using boost::asio::ip::basic_resolver_entry;
using boost::asio::ip::tcp;
using boost::asio::ip::tcp::resolver::passive;
using boost::system::error_code;
class chat {
public:
chat(char *addr, char *port, char *remote_addr,
char *remote_port) {
tcp::resolver resolver(ctx);
basic_resolver_entry<tcp> listen_endpoint;
basic_resolver_entry<tcp> remote_endpoint;
listen_endpoint = *resolver.resolve(addr, port, passive);
remote_endpoint = *resolver.resolve(remote_addr, remote_port);
co_spawn(ctx, sender(remote_endpoint), detached);
co_spawn(ctx, receiver(listen_endpoint), detached);
ctx.run();
}
private:
awaitable<void> sender(tcp::endpoint remote) {
for (;;) {
tcp::socket remote_sock(ctx);
auto [error] = co_await remote_sock.async_connect(
remote, as_tuple(use_awaitable));
if (!error) {
std::cout << "Connected to: " << remote << std::endl;
connected = true;
} else {
std::cout << "Could not connect to: " << remote
<< " - retrying in 1s " << std::endl;
steady_timer timer(
co_await boost::asio::this_coro::executor);
timer.expires_after(std::chrono::seconds(1));
co_await timer.async_wait(use_awaitable);
continue;
}
std::string data;
while (connected) {
// Read a string from stdin (non-blocking)
struct pollfd input[1] = {{.fd = 0, .events = POLLIN}};
if (poll(input, 1, 100 /* timeout in ms */)) {
char c;
while (std::cin.get(c) && c != '\n')
data += c;
data += "\r\n";
}
co_await async_write(remote_sock, buffer(data),
as_tuple(use_awaitable));
data.clear();
}
}
}
awaitable<void> receiver(tcp::endpoint listen) {
tcp::acceptor acceptor(ctx, listen);
for (;;) {
auto [error, client] =
co_await acceptor.async_accept(as_tuple(use_awaitable));
if (!error) {
std::string data;
for (;;) {
auto [error, len] = co_await async_read_until(
client, dynamic_buffer(data), boost::regex("\r\n"),
as_tuple(use_awaitable));
if (error == boost::asio::error::eof) {
// remote has disconnected
connected = false;
break;
}
std::cout << client.remote_endpoint() << "> " << data;
data.clear();
}
} else {
std::cerr << "Accept failed: " << error.message() << "\n";
}
}
}
// Member variables (shared between coroutines)
io_context ctx;
bool connected = false;
};
int main(int argc, char *argv[]) {
try {
if (argc != 5) {
std::cerr << "Usage: " << argv[0];
std::cerr << " <listen_address> <listen_port>";
std::cerr << " <remote_address> <remote_port>\n";
return 1;
}
chat(argv[1], argv[2], argv[3], argv[4]);
} catch (std::exception &e) {
std::cerr << "Exception: " << e.what() << "\n";
}
}
http://www.james-pascoe.com
james@james-pascoe.com
http://jamespascoe.github.io/accu2022
https://github.com/jamespascoe/accu2022-example-code.git