So. You've read my last post. You got inspired. Excited, even. Deployed SimpleFuture to production. Spun up a few worker threads to share the load. Called it a Friday. This is Rust, after all. What could go wrong?

...aaand then someone took a look at the CPU usage.

A screenshot of htop showing the process "target/release/awesome-luggage-code-server" keeping 24 cores busy.
Oops. Good thing we're not paying for those CPU hours anyway, right?

Maybe we should look into some of those asterisks we left unresolved, after all. We won't get through all of them today[1], but we've got to start somewhere.

When is poll o'clock, anyway?

So this is the part where I start to pull back the curtain, and unravel the first lie: that poll is only responsible for one job (attempting to make progress).

It actually has a secret second job: to ensure that whatever is running the Future is notified the next time it would make sense to poll it again. This is where wakers (and, by extension, the Context that I handwaved away before) come in.[2] It looks, roughly[3], like this:

use std::sync::Arc;

trait Wake: Send + Sync {
    // If you haven't seen `self: Foo<Self>` before, it lets you define methods that apply to certain wrapper types instead.
    // If it helps, `&self` is the same as `self: &Self`.
    fn wake(self: Arc<Self>);
}
struct Context {
    waker: Arc<dyn Wake>,
    // and some other stuff we don't really care about right now
}

The Future is responsible for ensuring that wake is called once there is something new to do, and the runtime is free to not bother polling the Future until that happens.
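For comparison, here is roughly what that contract looks like with the standard library's real types from std::task (the Waker from footnote 3): a Wake trait with the same wake(self: Arc<Self>) method, a Waker built from an Arc of anything that implements it, and a Context wrapping a &Waker. This is a minimal sketch; CountingWaker and wake_twice are hypothetical names, and the waker just counts how often it was woken:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// Hypothetical waker that only counts how many times it has been woken.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Builds a real std::task::Context around a CountingWaker, wakes it twice
/// (the way a Future would), and returns the number of wakes observed.
fn wake_twice() -> usize {
    let counter = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    // std implements From<Arc<W>> for Waker, for any W: Wake + Send + Sync + 'static.
    let waker = Waker::from(counter.clone());
    let cx = Context::from_waker(&waker);

    // Wake without giving up our handle to the waker...
    cx.waker().wake_by_ref();
    // ...or clone it (say, to hand it to another thread) and consume the clone.
    cx.waker().clone().wake();

    counter.wakes.load(Ordering::SeqCst)
}

fn main() {
    println!("woken {} times", wake_twice());
}
```

The default Wake::wake_by_ref just calls wake on a clone of the Arc, so both calls end up in the same method here.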

To manage this, we'll need to change our (Simple)Future trait to propagate the context:

use std::task::Poll;

trait SleepyFuture<Output> {
    fn poll(
        &mut self,
        // Our new and shiny
        context: &mut Context,
    ) -> Poll<Output>;
}

We've got to walk (er, sleep) before we can run

Now, our old runner is still basically legal.[4] We could just keep polling constantly and provide a no-op Wake to shut the compiler up. It's always fine to poll our Future without it having been awoken.. the Future just can't rely on that happening: if it returns Pending without arranging a wake, it may never be polled again.

struct InsomniacWaker;
impl Wake for InsomniacWaker {
    fn wake(self: Arc<Self>) {
        // Who needs to wake up if you never managed to fall asleep?
    }
}

fn insomniac_runner<Output, F: SleepyFuture<Output>>(mut fut: F) -> Output {
    let mut context = Context {
        waker: Arc::new(InsomniacWaker),
    };
    loop {
        if let Poll::Ready(out) = fut.poll(&mut context) {
            return out;
        }
    }
}

But... that's not particularly useful. We're passing around the context now, but.. we're still burning all that CPU time.
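To put a number on "burning all that CPU time", here's a hedged sketch that counts how often the insomniac runner polls a Future that is just waiting for a background thread. Wake, Context, SleepyFuture, InsomniacWaker and insomniac_runner are the post's items, repeated so this compiles on its own; WaitForFlag and count_insomniac_polls are hypothetical. WaitForFlag never arranges a wake at all, so it only finishes because this runner polls it regardless:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;

// The post's simplified Wake, Context and SleepyFuture, repeated so this compiles on its own.
trait Wake: Send + Sync {
    fn wake(self: Arc<Self>);
}
struct Context {
    waker: Arc<dyn Wake>,
}
trait SleepyFuture<Output> {
    fn poll(&mut self, context: &mut Context) -> Poll<Output>;
}

// InsomniacWaker and insomniac_runner, as in the post.
struct InsomniacWaker;
impl Wake for InsomniacWaker {
    fn wake(self: Arc<Self>) {}
}
fn insomniac_runner<Output, F: SleepyFuture<Output>>(mut fut: F) -> Output {
    let mut context = Context { waker: Arc::new(InsomniacWaker) };
    loop {
        if let Poll::Ready(out) = fut.poll(&mut context) {
            return out;
        }
    }
}

/// Hypothetical Future: ready once a background thread has set `done`.
/// It never arranges a wake, and counts how often it gets polled.
struct WaitForFlag {
    done: Arc<AtomicBool>,
    polls: u64,
}
impl SleepyFuture<u64> for WaitForFlag {
    fn poll(&mut self, _context: &mut Context) -> Poll<u64> {
        self.polls += 1;
        if self.done.load(Ordering::SeqCst) {
            Poll::Ready(self.polls)
        } else {
            Poll::Pending
        }
    }
}

/// Runs WaitForFlag on the insomniac runner while another thread sets the
/// flag after `delay`, and returns how many polls that took.
fn count_insomniac_polls(delay: Duration) -> u64 {
    let done = Arc::new(AtomicBool::new(false));
    let setter = done.clone();
    std::thread::spawn(move || {
        std::thread::sleep(delay);
        setter.store(true, Ordering::SeqCst);
    });
    insomniac_runner(WaitForFlag { done, polls: 0 })
}

fn main() {
    println!("polled {} times", count_insomniac_polls(Duration::from_millis(50)));
}
```

The exact count depends on your machine, but every one of those polls is a trip around a busy loop.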

Instead, we should provide a Waker that pauses the thread when there is nothing to do:

use std::sync::{Condvar, Mutex};

#[derive(Default)]
struct SleepWaker {
    awoken: Mutex<bool>,
    wakeup_cond: Condvar,
}
impl SleepWaker {
    fn sleep_until_awoken(&self) {
        let mut awoken = self.wakeup_cond
            .wait_while(
                self.awoken.lock().unwrap(),
                |awoken| !*awoken,
            )
            .unwrap();
        *awoken = false;
    }
}
impl Wake for SleepWaker {
    fn wake(self: Arc<Self>) {
        *self.awoken.lock().unwrap() = true;
        self.wakeup_cond.notify_one();
    }
}

Condvars are a whole rabbit hole of their own, but the idea here is basically that Condvar::wait_while re-runs some test on a Mutex-locked value every time notify_one is called (as well as on the initial wait_while call, and after any spurious wakeups), but unlocks the Mutex in between.[5] sleep_until_awoken waits for wake to be called, and then resets itself so that it's ready for the next call.[6]

Now we just need to change our runner to call sleep_until_awoken between each poll:

fn run_sleepy_future<Output, F: SleepyFuture<Output>>(mut fut: F) -> Output {
    let waker = Arc::<SleepWaker>::default();
    let mut context = Context { waker: waker.clone() };
    loop {
        match fut.poll(&mut context) {
            Poll::Ready(out) => return out,
            Poll::Pending => waker.sleep_until_awoken(),
        }
    }
}

Just to be sure.. let's try it out before continuing, to make sure that our wakeup works and that we're actually sleeping when we can:

struct ImmediatelyAwoken(bool);
impl SleepyFuture<()> for ImmediatelyAwoken {
    fn poll(&mut self, context: &mut Context) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            context.waker.clone().wake();
            Poll::Pending
        }
    }
}

struct BackgroundAwoken(bool);
impl SleepyFuture<()> for BackgroundAwoken {
    fn poll(&mut self, context: &mut Context) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            let waker = context.waker.clone();
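            // `move` hands our clone of the waker to the new thread
            // (spawn requires a 'static closure, so it can't just borrow it).
            std::thread::spawn(move || {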
                std::thread::sleep(std::time::Duration::from_millis(200));
                waker.wake();
            });
            Poll::Pending
        }
    }
}

let before_immediate = std::time::Instant::now();
run_sleepy_future(ImmediatelyAwoken(false));
println!("=> immediate: {:?}", before_immediate.elapsed());

let before_background = std::time::Instant::now();
run_sleepy_future(BackgroundAwoken(false));
println!("=> background: {:?}", before_background.elapsed());
=> immediate: 7µs
=> background: 200.148889ms

Whew! That looks reasonable to me, at least. Let's move on, before the eye of Sauron (er, insomnia) sees us...

A screenshot of a Detector Tower from the video game Helldivers 2, affectionately known as an "Eye of Sauron" for resembling a mechanical version of the Lord of the Rings "character".
We have enough problems as it is.

Return of the combinators

This also "just works" for most combinators, as long as they make sure to pass the Context down the tree. Here's the with_timeout example from last time; all we need to change is adding the context arguments and search/replacing[7] SimpleFuture -> SleepyFuture:

use std::task::ready;

struct Timeout<F> {
    inner: F,
    polls_left: u8,
}
#[derive(Debug)]
struct TimedOut;
impl<F, Output> SleepyFuture<Result<Output, TimedOut>> for Timeout<F>
where
    F: SleepyFuture<Output>,
{
    fn poll(
        &mut self,
        context: &mut Context,
    ) -> Poll<Result<Output, TimedOut>> {
        match self.polls_left.checked_sub(1) {
            Some(x) => self.polls_left = x,
            None => return Poll::Ready(Err(TimedOut)),
        }
        let inner = ready!(self.inner.poll(context));
        Poll::Ready(Ok(inner))
    }
}
fn with_timeout<F, Output>(
    inner: F,
    max_polls: u8,
) -> impl SleepyFuture<Result<Output, TimedOut>>
where
    F: SleepyFuture<Output>,
{
    Timeout {
        inner,
        polls_left: max_polls,
    }
}
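To see why passing the Context down is enough, consider a combinator with more than one child. Here's a hedged sketch of a hypothetical Join (not something the post defines) that polls both children with the same Context, so whichever child wakes up gets the whole tree polled again. The post's items are repeated (lightly condensed) so it compiles on its own, and Delayed is a hypothetical leaf that a background thread wakes:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::task::Poll;
use std::time::Duration;

// The post's Wake, Context, SleepyFuture, SleepWaker and run_sleepy_future,
// repeated (lightly condensed) so this compiles on its own.
trait Wake: Send + Sync { fn wake(self: Arc<Self>); }
struct Context { waker: Arc<dyn Wake> }
trait SleepyFuture<Output> { fn poll(&mut self, context: &mut Context) -> Poll<Output>; }
#[derive(Default)]
struct SleepWaker { awoken: Mutex<bool>, wakeup_cond: Condvar }
impl SleepWaker {
    fn sleep_until_awoken(&self) {
        let guard = self.awoken.lock().unwrap();
        *self.wakeup_cond.wait_while(guard, |awoken| !*awoken).unwrap() = false;
    }
}
impl Wake for SleepWaker {
    fn wake(self: Arc<Self>) {
        *self.awoken.lock().unwrap() = true;
        self.wakeup_cond.notify_one();
    }
}
fn run_sleepy_future<Output, F: SleepyFuture<Output>>(mut fut: F) -> Output {
    let waker = Arc::<SleepWaker>::default();
    let mut context = Context { waker: waker.clone() };
    loop {
        match fut.poll(&mut context) {
            Poll::Ready(out) => return out,
            Poll::Pending => waker.sleep_until_awoken(),
        }
    }
}

/// Hypothetical leaf future: the first poll starts a thread that sleeps for
/// `delay`, marks the future as done, and then wakes the Context it was polled with.
struct Delayed { delay: Duration, value: u32, done: Option<Arc<AtomicBool>> }
impl SleepyFuture<u32> for Delayed {
    fn poll(&mut self, context: &mut Context) -> Poll<u32> {
        match &self.done {
            Some(done) if done.load(Ordering::SeqCst) => return Poll::Ready(self.value),
            Some(_) => return Poll::Pending, // our runner never swaps wakers, so no re-register
            None => {}
        }
        let done = Arc::new(AtomicBool::new(false));
        self.done = Some(done.clone());
        let (waker, delay) = (context.waker.clone(), self.delay);
        std::thread::spawn(move || {
            std::thread::sleep(delay);
            done.store(true, Ordering::SeqCst);
            waker.wake();
        });
        Poll::Pending
    }
}

/// Hypothetical combinator: polls both children with the *same* Context, so a
/// wake from either child gets the whole Join polled again.
struct Join<A, B, OA, OB> { a: A, b: B, a_out: Option<OA>, b_out: Option<OB> }
impl<A: SleepyFuture<OA>, B: SleepyFuture<OB>, OA, OB> SleepyFuture<(OA, OB)> for Join<A, B, OA, OB> {
    fn poll(&mut self, context: &mut Context) -> Poll<(OA, OB)> {
        // Pass the Context down to each child that hasn't finished yet.
        if self.a_out.is_none() {
            if let Poll::Ready(out) = self.a.poll(context) { self.a_out = Some(out); }
        }
        if self.b_out.is_none() {
            if let Poll::Ready(out) = self.b.poll(context) { self.b_out = Some(out); }
        }
        if self.a_out.is_some() && self.b_out.is_some() {
            return Poll::Ready((self.a_out.take().unwrap(), self.b_out.take().unwrap()));
        }
        Poll::Pending
    }
}

fn join_delayed(a_ms: u64, b_ms: u64) -> (u32, u32) {
    let delayed = |ms, value| Delayed { delay: Duration::from_millis(ms), value, done: None };
    let fut: Join<Delayed, Delayed, u32, u32> =
        Join { a: delayed(a_ms, 1), b: delayed(b_ms, 2), a_out: None, b_out: None };
    run_sleepy_future(fut)
}

fn main() {
    println!("{:?}", join_delayed(100, 50));
}
```

Join has to tolerate a child being polled when it wasn't the one that woke up; that's exactly the kind of extra poll the contract already allows.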

Sleepy I/O (or: Showing our Interest)

But this has all been (relatively) easy mode. It's all useless if we aren't actually woken up for our I/O routines. Sadly... operating systems don't officially support our (or Rust's) Wake trait out of the box.

Building on our old TcpRead example from last time, the Future itself is still pretty simple:

use std::{io::Read, net::TcpStream};

fn wake_when_readable(
    socket: &mut std::net::TcpStream,
    context: &mut Context,
) { todo!() }

struct TcpRead<'a> {
    socket: &'a mut TcpStream,
    buffer: &'a mut [u8],
}
impl SleepyFuture<usize> for TcpRead<'_> {
    fn poll(&mut self, context: &mut Context) -> Poll<usize> {
        match self.socket.read(self.buffer) {
            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                wake_when_readable(self.socket, context);
                Poll::Pending
            }
            size => Poll::Ready(size.unwrap()),
        }
    }
}
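Everything here hinges on a non-blocking socket reporting ErrorKind::WouldBlock instead of blocking. A std-only sketch of that behaviour, using a throwaway listener on an OS-assigned local port (nonblocking_read_demo is a hypothetical helper):

```rust
use std::io::{ErrorKind, Read, Write};
use std::net::{TcpListener, TcpStream};

/// Connects a client to a throwaway local listener, then returns what a
/// non-blocking read reports before the peer has sent anything, and the
/// bytes that can be read once it has.
fn nonblocking_read_demo() -> (ErrorKind, [u8; 5]) {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let mut client = TcpStream::connect(listener.local_addr().unwrap()).unwrap();
    let (mut peer, _) = listener.accept().unwrap();

    client.set_nonblocking(true).unwrap();
    let mut buffer = [0u8; 5];
    // No data yet: instead of blocking, read fails with WouldBlock.
    // This is the case where TcpRead has to arrange a wake and return Pending.
    let before = client.read(&mut buffer).unwrap_err().kind();

    peer.write_all(&[1, 2, 3, 4, 5]).unwrap();
    // For this demo only, switch back to blocking mode so we can simply wait
    // for all five bytes instead of looping on WouldBlock ourselves.
    client.set_nonblocking(false).unwrap();
    client.read_exact(&mut buffer).unwrap();
    (before, buffer)
}

fn main() {
    let (before, after) = nonblocking_read_demo();
    println!("before: {before:?}, after: {after:?}");
}
```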

But.. uh.. how on earth do we define wake_when_readable? That.. is going to have to depend on your operating system, and is going outside of what the Rust standard library really provides for us.

Here in Linux-land[8], the[9] API for this is epoll. It still blocks, but it lets us ask the operating system to unpark us when any of a set of "files"[10] are ready. In the Rust world, we can access this using the nix crate, which provides a safe but otherwise 1:1 mapping to the system API.[11]

The epoll API is fairly simple to use: we need to create an Epoll, register the events[12] that we care about, and then wait for some events to occur. wait returns when any of the registered event(s) have occurred. When we're done, we unregister the event.

Now, in theory, we could wait from our main loop. It's not like it has anything better to do while it's waiting anyway. But wakes could come from anywhere, not just direct I/O events.[13] And we need to handle all of them. So that's out.

So, instead, we'll shove this off to a secondary I/O driver thread, which translates our epoll events into wakes. Which we already know how to handle![14]

use nix::sys::epoll::{Epoll, EpollCreateFlags, EpollEvent};
use std::{collections::BTreeMap, sync::LazyLock};

static EPOLL: LazyLock<Epoll> =
    LazyLock::new(|| Epoll::new(EpollCreateFlags::EPOLL_CLOEXEC).unwrap());
static REGISTERED_WAKERS: Mutex<BTreeMap<u64, Arc<dyn Wake>>> = Mutex::new(BTreeMap::new());

fn io_driver() {
    let mut events = [EpollEvent::empty(); 16];
    loop {
        let event_count = EPOLL.wait(&mut events, 1000u16).unwrap();
        let wakers = REGISTERED_WAKERS.lock().unwrap();
        for event in &events[..event_count] {
            let waker_id = event.data();
            if let Some(waker) = wakers.get(&waker_id) {
                waker.clone().wake();
            } else {
                // This could also be an "innocent" race condition,
                // if the event is delivered just as we're deregistering a waker.
                println!("=> (Waker {waker_id} not found, bug?)")
            }
        }
    }
}
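The epoll half needs Linux and nix, but the dispatch logic can be tried out on its own. Here's a hedged std-only sketch where an mpsc channel stands in for EPOLL.wait (a substitution for the demo, not how epoll delivers events), and the registry is passed in as an Arc rather than living in a static; simulated_io_driver, Registry, CountingWaker and demo are all hypothetical names:

```rust
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};

// The post's simplified Wake trait, repeated so this compiles on its own.
trait Wake: Send + Sync {
    fn wake(self: Arc<Self>);
}

// Waker IDs -> wakers, shared via Arc instead of a `static` to keep the demo self-contained.
type Registry = Arc<Mutex<BTreeMap<u64, Arc<dyn Wake>>>>;

/// Same job as io_driver, except the "events" are waker IDs arriving on a
/// channel instead of from EPOLL.wait. Runs until every sender is dropped,
/// then returns how many IDs had no registered waker.
fn simulated_io_driver(events: Receiver<u64>, wakers: Registry) -> usize {
    let mut unknown = 0;
    for waker_id in events {
        match wakers.lock().unwrap().get(&waker_id) {
            Some(waker) => waker.clone().wake(),
            None => unknown += 1,
        }
    }
    unknown
}

/// Hypothetical waker that just counts its wakes.
struct CountingWaker(AtomicUsize);
impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Registers one waker under ID 7, "delivers" three events, and returns
/// (wakes seen by that waker, events with no registered waker).
fn demo() -> (usize, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let registry: Registry = Arc::new(Mutex::new(BTreeMap::new()));
    registry.lock().unwrap().insert(7, counter.clone());

    let (tx, rx) = channel();
    let driver = {
        let registry = registry.clone();
        std::thread::spawn(move || simulated_io_driver(rx, registry))
    };
    tx.send(7).unwrap(); // "the file behind waker 7 is ready"
    tx.send(7).unwrap();
    tx.send(42).unwrap(); // nobody registered 42
    drop(tx); // hang up, so the driver's loop ends

    let unknown = driver.join().unwrap();
    (counter.0.load(Ordering::SeqCst), unknown)
}

fn main() {
    let (wakes, unknown) = demo();
    println!("waker 7 woken {wakes} times, {unknown} unknown event(s)");
}
```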

Then, we need some way to register an interest in a "file" (and unregister it when it isn't needed anymore). This just ensures that it'll be seen by our io_driver:

use nix::sys::epoll::EpollFlags;
use std::{ops::RangeFrom, os::fd::AsFd};

// We need some unique ID for each reason to be awoken..
// In reality you'd probably want some way to reuse these.
static NEXT_WAKER_ID: Mutex<RangeFrom<u64>> = Mutex::new(0..);

struct Interest<T: AsFd> {
    // Interest needs to own the file (or borrow it),
    // to make sure that the file stays alive for as long as our interest does.
    fd: T,
    registered_waker_id: Option<u64>,
}
impl<T: AsFd> Interest<T> {
    fn new(fd: T) -> Self {
        Interest {
            fd,
            registered_waker_id: None,
        }
    }

    fn register(&mut self, mut flags: EpollFlags, context: &mut Context) {
        let is_new = self.registered_waker_id.is_none();
        let id = *self
            .registered_waker_id
            .get_or_insert_with(|| NEXT_WAKER_ID.lock().unwrap().next().unwrap());
        REGISTERED_WAKERS
            .lock()
            .unwrap()
            .insert(id, context.waker.clone());
        // It's enough to get awoken once - if the Future is still interested then it should call `register`
        // to renew its interest.
        flags |= EpollFlags::EPOLLONESHOT;
        let mut event = EpollEvent::new(flags, id);
        if is_new {
            EPOLL.add(&self.fd, event).unwrap()
        } else {
            EPOLL.modify(&self.fd, &mut event).unwrap()
        }
    }
}
impl<T: AsFd> Drop for Interest<T> {
    fn drop(&mut self) {
        if let Some(id) = self.registered_waker_id {
            // what if we have multiple interests open on the same fd? (read+write? multiple reads?)
            EPOLL.delete(&self.fd).unwrap();
            REGISTERED_WAKERS.lock().unwrap().remove(&id).unwrap();
        }
    }
}
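The comment on NEXT_WAKER_ID notes that in reality you'd probably want to reuse IDs. One simple way, sketched under the assumption that IDs only need to be unique among currently registered wakers, is a free list. WakerIds and WAKER_IDS are hypothetical; Interest's Drop would call free(id) after deleting the fd from EPOLL and removing the waker from REGISTERED_WAKERS:

```rust
use std::sync::Mutex;

/// Hypothetical waker-ID allocator: reuses IDs handed back via `free`,
/// and only mints a fresh one when the free list is empty.
struct WakerIds {
    next: u64,
    free: Vec<u64>,
}
impl WakerIds {
    const fn new() -> Self {
        WakerIds { next: 0, free: Vec::new() }
    }
    fn alloc(&mut self) -> u64 {
        self.free.pop().unwrap_or_else(|| {
            let id = self.next;
            self.next += 1;
            id
        })
    }
    /// A stale event for a freed ID may still reach the driver and wake the
    /// ID's next owner; that is just a spurious wake, which Futures must tolerate anyway.
    fn free(&mut self, id: u64) {
        self.free.push(id);
    }
}

// Could stand in for the NEXT_WAKER_ID static, with the same locking pattern.
static WAKER_IDS: Mutex<WakerIds> = Mutex::new(WakerIds::new());

fn main() {
    let a = WAKER_IDS.lock().unwrap().alloc();
    let b = WAKER_IDS.lock().unwrap().alloc();
    WAKER_IDS.lock().unwrap().free(a);
    let c = WAKER_IDS.lock().unwrap().alloc(); // reuses `a`
    println!("a={a} b={b} c={c}");
}
```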

Now we can slot this all into our TcpRead.. we'll need to change it slightly to keep the Interest's state, but.. it should still be recognizable enough:

use std::{io::Read, net::TcpStream};

struct TcpRead<'a> {
    socket: Interest<&'a mut TcpStream>,
    buffer: &'a mut [u8],
}
impl SleepyFuture<usize> for TcpRead<'_> {
    fn poll(&mut self, context: &mut Context) -> Poll<usize> {
        match self.socket.fd.read(self.buffer) {
            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                // EPOLLIN is the event for when we're allowed to read
                // from the "file".
                self.socket.register(EpollFlags::EPOLLIN, context);
                Poll::Pending
            }
            size => Poll::Ready(size.unwrap()),
        }
    }
}

Finally, we can put all the parts back together, and test it all against our old friend, the luggage code server:

std::thread::spawn(io_driver);

let luggage_code_server_address = "127.0.0.1:9191";
let mut socket = TcpStream::connect(luggage_code_server_address).unwrap();
socket.set_nonblocking(true).unwrap();
let mut buffer = [0; 16];
let received = run_sleepy_future(
    // Let's limit the number of poll()s to make sure we're not cheating!
    // This is just for demonstration; remember that the runtime is /allowed/
    // to poll as often as it wants to.
    with_timeout(
        TcpRead {
            socket: Interest::new(&mut socket),
            buffer: &mut buffer,
        },
        // One initial poll to establish interest, then one poll once the data is ready for us.
        2,
    ),
).unwrap();
println!("=> The luggage code is {:?}", &buffer[..received]);
=> The luggage code is [1, 2, 3, 4, 5]

Success! We're back to where we started.. but at least our computer[15] can rest a bit easier.
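If you don't have last post's luggage code server running, a hypothetical stand-in is easy enough to sketch with the standard library. We don't know exactly how the original server behaves; this one accepts a single connection, waits a bit so that the client's first non-blocking read hits WouldBlock, sends the same bytes as in the output above, and hangs up:

```rust
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::thread::JoinHandle;
use std::time::Duration;

/// Hypothetical stand-in for the luggage code server: accepts one connection,
/// waits for `delay`, sends the code, then closes the connection.
fn spawn_fake_luggage_server(addr: &str, delay: Duration) -> (SocketAddr, JoinHandle<()>) {
    let listener = TcpListener::bind(addr).unwrap();
    let local_addr = listener.local_addr().unwrap();
    let handle = std::thread::spawn(move || {
        let (mut conn, _) = listener.accept().unwrap();
        std::thread::sleep(delay);
        conn.write_all(&[1, 2, 3, 4, 5]).unwrap();
        // `conn` is dropped here, which closes the connection.
    });
    (local_addr, handle)
}

/// A plain blocking client, just to check the server on its own.
fn fetch_code(addr: SocketAddr) -> Vec<u8> {
    let mut socket = TcpStream::connect(addr).unwrap();
    let mut code = Vec::new();
    socket.read_to_end(&mut code).unwrap(); // stops at EOF, when the server hangs up
    code
}

fn main() {
    // Port 0 lets the OS pick a free port for this self-check.
    let (addr, server) = spawn_fake_luggage_server("127.0.0.1:0", Duration::from_millis(200));
    println!("code from {addr}: {:?}", fetch_code(addr));
    server.join().unwrap();
}
```

Binding it to 127.0.0.1:9191 instead of port 0 should let the TcpRead example above connect to it unchanged.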

Another wrap... for now

Hopefully, you have a bit more of an idea about what that weird Context thing is now.

But we're still not quite back at the real Future trait[16]. So.. the next entry will be about just that: clearing up the remaining concepts we need to understand to be able to read the Future.[17]

1

It's a series for a reason, after all.

2

Not to be confused with the quakers or shakers.

3

Actually, Context contains a Waker instead. It's close enough to our Arc<dyn Wake>, but doesn't require using Arc if you're okay maintaining your own vtable. If the word "vtable" tells you nothing, just implement Wake and be done with it. If the word "vtable" does tell you something... you should probably still just implement Wake and be done with it.

4

It's not self-plagiarism if we cite it! Oh, and I guess it fulfills the type contracts, too...

5

Otherwise, we'd never be able to modify the Mutex-guarded value!

6

If this looks like we're just reimplementing Condvar.. we are, kind of. Except Condvar::wait isn't specified to return immediately if notify_one was called before wait. That's a problem for us; it would silently prevent the Future from waking itself during the poll.

7

I hear verbing is so hot this year. Can we verb a nouned verb?

8

Cross-platform support is left as an exercise for the reader. Enjoy!

9

It's not the only API; there are others. But it's the one that hits the "standard" tradeoff of being neither too slow nor too experimental. Maybe io_uring will be everywhere in a few years, when you're the one writing the "Everything natkr got wrong" article. When you do, please send it to me; I look forward to reading it!

10

In the Unixy sense where "everything" is a "file", including network sockets.

11

For anyone following along at home, I'm going to be using nix v0.29.0, since that's the latest version when I'm writing this.

12

It would've been nice if we could link specifically to the list of event flags, but alas...

13

Timers, background threads, and so on...

14

Sometimes this is called a reactor.

15

And power bill.

16

It'd sure be helpful if it ever felt like standing up.

17

Spoiler: This means Pin and associated types.