Async from scratch 2: Wake me maybe
So. You've read my last post. You got inspired. Excited, even. Deployed SimpleFuture to production. Spun up a few worker threads to share the load. Called it a Friday. This is Rust, after all; what could go wrong?
...aaand then someone took a look at the CPU usage.

Maybe we should look into some of those asterisks we left unresolved, after all. We won't get through all of them today1, but we've got to start somewhere.
When is poll o'clock, anyway?
So this is the part where I start to pull back the curtain, and unravel the first lie: that poll is only responsible for one job (attempting to make progress). It actually has a secret second job: to ensure that whatever is running the Future is notified the next time that it would make sense to poll it again. This is where wakers (and, by extension, the Context that I handwaved away before) come in.2 It looks, roughly3, like this:
use std::sync::Arc;
trait Wake: Send + Sync {
// If you haven't seen `self: Foo<Self>` before, it lets you define methods that apply to certain wrapper types instead.
// If it helps, `&self` is the same as `self: &Self`.
fn wake(self: Arc<Self>);
}
struct Context {
waker: Arc<dyn Wake>,
// and some other stuff we don't really care about right now
}
The Future is responsible for ensuring that wake is called once there is something new to do, and the runtime is free to not bother polling the Future until that happens.
To manage this, we'll need to change our (Simple)Future trait to propagate the context:
use std::task::Poll;
trait SleepyFuture<Output> {
fn poll(
&mut self,
// Our new and shiny
context: &mut Context,
) -> Poll<Output>;
}
We've got to walk sleep before we can run
Now, our old runner is still basically legal.4 We could just keep polling constantly and provide a no-op Wake to shut the compiler up. It's always fine to poll our Future without being awoken.. the Future just can't rely on it.
struct InsomniacWaker;
impl Wake for InsomniacWaker {
fn wake(self: Arc<Self>) {
// Who needs to wake up if you never managed to fall asleep?
}
}
fn insomniac_runner<Output, F: SleepyFuture<Output>>(mut fut: F) -> Output {
let mut context = Context {
waker: Arc::new(InsomniacWaker),
};
loop {
if let Poll::Ready(out) = fut.poll(&mut context) {
return out;
}
}
}
But... that's not particularly useful. We're passing around the context now, but.. we're still burning all that CPU time.
Instead, we should provide a Waker that pauses the thread when there is nothing to do:
use std::sync::{Condvar, Mutex};
#[derive(Default)]
struct SleepWaker {
awoken: Mutex<bool>,
wakeup_cond: Condvar,
}
impl SleepWaker {
fn sleep_until_awoken(&self) {
let mut awoken = self.wakeup_cond
.wait_while(
self.awoken.lock().unwrap(),
|awoken| !*awoken,
)
.unwrap();
*awoken = false;
}
}
impl Wake for SleepWaker {
fn wake(self: Arc<Self>) {
*self.awoken.lock().unwrap() = true;
self.wakeup_cond.notify_one();
}
}
Condvars are a whole rabbit hole of their own, but the idea here is basically that Condvar::wait_while runs some test on a Mutex-locked value every time notify_one is called (as well as on the initial wait_while call), but unlocks the Mutex in between5.
sleep_until_awoken waits for wake to be called, and then resets itself so that it's ready for the next call.6
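If you want to convince yourself that the reset actually matters, here's a quick sanity check (not part of the final runner): a wake that lands before we ever go to sleep should make the next sleep_until_awoken return immediately, rather than hang forever.
// Not part of the runner: just checking that an "early" wake is remembered
// by the `awoken` flag, so the following sleep returns right away.
let waker = Arc::new(SleepWaker::default());
waker.clone().wake();
waker.sleep_until_awoken();
println!("=> survived an early wake");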
Now we just need to change our runner to call sleep_until_awoken between each poll:
fn run_sleepy_future<Output, F: SleepyFuture<Output>>(mut fut: F) -> Output {
let waker = Arc::<SleepWaker>::default();
let mut context = Context { waker: waker.clone() };
loop {
match fut.poll(&mut context) {
Poll::Ready(out) => return out,
Poll::Pending => waker.sleep_until_awoken(),
}
}
}
Just to be sure.. let's try it out before continuing, to make sure that our wakeup works and that we're actually sleeping when we can:
struct ImmediatelyAwoken(bool);
impl SleepyFuture<()> for ImmediatelyAwoken {
fn poll(&mut self, context: &mut Context) -> Poll<()> {
if self.0 {
Poll::Ready(())
} else {
self.0 = true;
context.waker.clone().wake();
Poll::Pending
}
}
}
struct BackgroundAwoken(bool);
impl SleepyFuture<()> for BackgroundAwoken {
fn poll(&mut self, context: &mut Context) -> Poll<()> {
if self.0 {
Poll::Ready(())
} else {
self.0 = true;
let waker = context.waker.clone();
std::thread::spawn(|| {
std::thread::sleep(std::time::Duration::from_millis(200));
waker.wake();
});
Poll::Pending
}
}
}
let before_immediate = std::time::Instant::now();
run_sleepy_future(ImmediatelyAwoken(false));
println!("=> immediate: {:?}", before_immediate.elapsed());
let before_background = std::time::Instant::now();
run_sleepy_future(BackgroundAwoken(false));
println!("=> background: {:?}", before_background.elapsed());
=> immediate: 7µs
=> background: 200.148889ms
Whew! That looks reasonable to me, at least. Let's move on, before the
eye of Sauron insomnia sees us...

Return of the combinators
This also "just works" for most combinators, as long as they make sure to pass the Context down the tree. Here's the with_timeout example from last time; all we need to change is adding the context arguments and search/replacing7 SimpleFuture -> SleepyFuture:
use std::task::ready;
struct Timeout<F> {
inner: F,
polls_left: u8,
}
#[derive(Debug)]
struct TimedOut;
impl<F, Output> SleepyFuture<Result<Output, TimedOut>> for Timeout<F>
where
F: SleepyFuture<Output>,
{
fn poll(
&mut self,
context: &mut Context,
) -> Poll<Result<Output, TimedOut>> {
match self.polls_left.checked_sub(1) {
Some(x) => self.polls_left = x,
None => return Poll::Ready(Err(TimedOut)),
}
let inner = ready!(self.inner.poll(context));
Poll::Ready(Ok(inner))
}
}
fn with_timeout<F, Output>(
inner: F,
max_polls: u8,
) -> impl SleepyFuture<Result<Output, TimedOut>>
where
F: SleepyFuture<Output>,
{
Timeout {
inner,
polls_left: max_polls,
}
}
Sleepy I/O (or: Showing our Interest)
But this has all been (relatively) easy mode. It's all useless if we aren't actually woken up for our I/O routines. Sadly... operating systems don't officially support our (or Rust's) Wake trait out of the box.
Building on our old TcpRead example from last time, the Future itself is still pretty simple:
use std::{io::Read, net::TcpStream};
fn wake_when_readable(
socket: &mut std::net::TcpStream,
context: &mut Context,
) { todo!() }
struct TcpRead<'a> {
socket: &'a mut TcpStream,
buffer: &'a mut [u8],
}
impl SleepyFuture<usize> for TcpRead<'_> {
fn poll(&mut self, context: &mut Context) -> Poll<usize> {
match self.socket.read(self.buffer) {
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
wake_when_readable(self.socket, context);
Poll::Pending
}
size => Poll::Ready(size.unwrap()),
}
}
}
But.. uh.. how on earth do we define wake_when_readable? That.. is going to have to depend on your operating system, and goes outside of what the Rust standard library really provides for us.
Here in Linux-land8, the9 API for this is epoll. It still blocks, but it lets us ask the operating system to unpark us when any of a set of "files"10 are ready. In the Rust world, we can access this using the nix crate, which provides a safe but otherwise 1:1 mapping to the system API.11
The epoll API is fairly simple to use: we need to create an Epoll, register the events12 that we care about, and then wait for some events to occur. wait returns when any of the registered event(s) have occurred. When we're done, we unregister the event.
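Squashed into a single blocking helper, that flow looks something like this. (A throwaway sketch rather than anything we'll reuse; the helper name and the 42 tag are made up for illustration.)
use nix::sys::epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags};
// Block the calling thread until `socket` becomes readable (or ~1s passes):
// create an Epoll, register our interest, wait, then unregister.
fn wait_until_readable(socket: &std::net::TcpStream) {
    let epoll = Epoll::new(EpollCreateFlags::EPOLL_CLOEXEC).unwrap();
    // The 42 is an arbitrary tag; it comes back to us via `EpollEvent::data`.
    epoll.add(socket, EpollEvent::new(EpollFlags::EPOLLIN, 42)).unwrap();
    let mut events = [EpollEvent::empty(); 1];
    let ready = epoll.wait(&mut events, 1000u16).unwrap();
    if ready > 0 {
        assert_eq!(events[0].data(), 42);
    }
    epoll.delete(socket).unwrap();
}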
Now, in theory, we could wait from our main loop. It's not like it has anything better to do while it's waiting anyway. But wakes could come from anywhere, not just direct I/O events.13 And we need to handle all of them. So that's out.
So, instead, we'll shove this off to a secondary I/O driver thread, which translates our epoll events into wakes. Which we already know how to handle!14
use nix::sys::epoll::{Epoll, EpollCreateFlags, EpollEvent};
use std::{collections::BTreeMap, sync::LazyLock};
static EPOLL: LazyLock<Epoll> =
LazyLock::new(|| Epoll::new(EpollCreateFlags::EPOLL_CLOEXEC).unwrap());
static REGISTERED_WAKERS: Mutex<BTreeMap<u64, Arc<dyn Wake>>> = Mutex::new(BTreeMap::new());
fn io_driver() {
let mut events = [EpollEvent::empty(); 16];
loop {
let event_count = EPOLL.wait(&mut events, 1000u16).unwrap();
let wakers = REGISTERED_WAKERS.lock().unwrap();
for event in &events[..event_count] {
let waker_id = event.data();
if let Some(waker) = wakers.get(&waker_id) {
waker.clone().wake();
} else {
// This could also be an "innocent" race condition,
// if the event is delivered just as we're deregistering a waker.
println!("=> (Waker {waker_id} not found, bug?)")
}
}
}
}
Then, we need some way to register an interest in a "file" (and unregister it when it isn't needed anymore). This just ensures that it'll be seen by our io_driver:
use nix::sys::epoll::EpollFlags;
use std::{ops::RangeFrom, os::fd::AsFd};
// We need some unique ID for each reason to be awoken..
// In reality you'd probably want some way to reuse these.
static NEXT_WAKER_ID: Mutex<RangeFrom<u64>> = Mutex::new(0..);
struct Interest<T: AsFd> {
// Interest needs to own the file (or borrow it),
// to make sure that the file stays alive for as long as our interest does.
fd: T,
registered_waker_id: Option<u64>,
}
impl<T: AsFd> Interest<T> {
fn new(fd: T) -> Self {
Interest {
fd,
registered_waker_id: None,
}
}
fn register(&mut self, mut flags: EpollFlags, context: &mut Context) {
let is_new = self.registered_waker_id.is_none();
let id = *self
.registered_waker_id
.get_or_insert_with(|| NEXT_WAKER_ID.lock().unwrap().next().unwrap());
REGISTERED_WAKERS
.lock()
.unwrap()
.insert(id, context.waker.clone());
// It's enough to get awoken once - if the Future is still interested then it should call `register`
// to renew its interest.
flags |= EpollFlags::EPOLLONESHOT;
let mut event = EpollEvent::new(flags, id);
if is_new {
EPOLL.add(&self.fd, event).unwrap()
} else {
EPOLL.modify(&self.fd, &mut event).unwrap()
}
}
}
impl<T: AsFd> Drop for Interest<T> {
fn drop(&mut self) {
if let Some(id) = self.registered_waker_id {
// what if we have multiple interests open on the same fd? (read+write? multiple reads?)
EPOLL.delete(&self.fd).unwrap();
REGISTERED_WAKERS.lock().unwrap().remove(&id).unwrap();
}
}
}
Now we can slot this all into our TcpRead.. we'll need to change it slightly to keep the Interest's state, but.. it should still be recognizable enough:
use std::{io::Read, net::TcpStream};
struct TcpRead<'a> {
socket: Interest<&'a mut TcpStream>,
buffer: &'a mut [u8],
}
impl SleepyFuture<usize> for TcpRead<'_> {
fn poll(&mut self, context: &mut Context) -> Poll<usize> {
match self.socket.fd.read(self.buffer) {
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
// EPOLLIN is the event for when we're allowed to read
// from the "file".
self.socket.register(EpollFlags::EPOLLIN, context);
Poll::Pending
}
size => Poll::Ready(size.unwrap()),
}
}
}
Finally, we can put all the parts back together, and test it all against our old friend, the luggage code server:
std::thread::spawn(io_driver);
let luggage_code_server_address = "127.0.0.1:9191";
let mut socket = TcpStream::connect(luggage_code_server_address).unwrap();
socket.set_nonblocking(true).unwrap();
let mut buffer = [0; 16];
let received = run_sleepy_future(
// Let's limit the number of poll()s to make sure we're not cheating!
// This is just for demonstration; remember that the runtime is /allowed/
// to poll as often as it wants to.
with_timeout(
TcpRead {
socket: Interest::new(&mut socket),
buffer: &mut buffer,
},
// One initial poll to establish interest, then one poll once the data is ready for us.
2,
),
).unwrap();
println!("=> The luggage code is {:?}", &buffer[..received]);
=> The luggage code is [1, 2, 3, 4, 5]
Success! We're back to where we started.. but at least our computer15 can rest a bit easier.
Another wrap... for now
Hopefully, you have a bit more of an idea about what that weird Context thing is now.
But we're still not quite back at the real Future trait16. So.. the next entry will be about just that: clearing up the remaining concepts we need to understand to be able to read the Future.17
It's a series for a reason, after all.
Actually, Context contains a Waker instead. It's close enough to our Arc<dyn Wake>, but doesn't require using Arc if you're okay maintaining your own vtable. If the word "vtable" tells you nothing, just implement Wake and be done with it. If the word "vtable" does tell you something... you should probably still just implement Wake and be done with it.
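For the record, with the real std types, "just implement Wake" looks roughly like this (a minimal sketch with a do-nothing waker):
use std::sync::Arc;
use std::task::{Wake, Waker};
// A waker that does nothing, just to show the plumbing.
struct Noop;
impl Wake for Noop {
    fn wake(self: Arc<Self>) {}
}
let waker = Waker::from(Arc::new(Noop));
// ...which is what you'd hand to Future::poll, via the real Context.
let _context = std::task::Context::from_waker(&waker);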
It's not self-plagiarism if we cite it! Oh, and I guess it fulfills the type contracts, too...
Otherwise, we'd never be able to modify the Mutex-guarded value!
If this looks like we're just reimplementing Condvar.. we are, kind of. Except Condvar::wait isn't specified to return immediately if notify_one was called before wait. That's a problem for us; it would silently prevent the Future from waking itself during the poll.
I hear verbing is so hot this year. Can we verb a nouned verb?
Cross-platform support is left as an exercise for the reader.
Enjoy!
Not the only API; there are others. But it's the one that hits the "standard" tradeoff: not too slow, not too experimental. Maybe io_uring will be everywhere in a few years, when you're the one writing the "Everything natkr got wrong" article. When you do, please send it to me, I look forward to reading it!
In the Unixy sense where "everything" is a "file", including network sockets.
For anyone following along at home, I'm going to be using nix
v0.29.0, since that's the latest version when I'm writing this.
It would've been nice if we could link to specifically the list of event flags, but alas...
Timers, background threads, and so on...
Sometimes this is called a reactor.
And power bill.
It'd sure be helpful if it ever felt like standing up.
Spoiler: This means Pin and associated types.