Async from scratch 1: What's in a Future, anyway?
There are a lot of guides about how to use async Rust from a "user's perspective", but I think it's also worth understanding how it works: what those async blocks actually mean, and why you get all those weird pinning errors.
This is the first post in a series where we're going to slowly build our way up to reinventing the modern async Rust environment, in an attempt to explain the whys and the hows. It's not going to end up being a competitor to Tokio or anything, but hopefully it should make understanding it a bit less daunting afterwards.
I'm aiming this series at people who've written a trait and an async fn (or two), but don't worry if "polling", "pinning", or "wakers" mean nothing to you. That's what we're going to try to untangle, one step at a time!
Now... If you've written any async Rust code, it probably looked something like this:
async fn trick_or_treat() {
    for house in &STREET {
        match demand_treat(house).await {
            Ok(candy) => eat(candy).await,
            Err(_) => play_trick(house).await,
        }
    }
}
But, uh, what does that do? Why do I need to await things, how is an async fn different from any other fn, and what does any of that actually... do, anyway?
In the Future...
Well, to understand that, we're going to need to rewind the tape a bit. We're going to have to meet a trait that you probably haven't really seen before. We're going to have to deal with... Future.
Just like Add defines whether a + b is valid, Future defines "something that can be .await-ed".[1] It looks like this:
use std::{task::{Context, Poll}, pin::Pin};
trait Future {
    type Output;
    fn poll(
        self: Pin<&mut Self>,
        context: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}
..Y'know, for a trait with only one function, that's a pretty spicy signature. It could even be called a bit overwhelming, especially if you're new to Rust in general.
But most of that doesn't matter for the basic idea, so we can make a few simplifications: dropping the Pin, dropping the Context, and turning Output into a generic parameter. Don't worry, we'll get back to all of them later. For now, we can just pretend that it looks like this instead:
use std::task::Poll;
trait SimpleFuture<Output> {
    fn poll(&mut self) -> Poll<Output>;
}
So what does this (Simple)Future::poll thing do?
Let's take a stroll down to the poll box
At its core, a Future is a function call that can pause itself when it needs to wait for something.[2] poll asks the Future to try to continue, returning Poll::Ready if it was able to finish, or Poll::Pending if it had to pause itself again.[3]
This can start out pretty simple. We could have a Future that is always ready to produce some extremely random numbers:
struct FairDice;
impl SimpleFuture<u8> for FairDice {
    fn poll(&mut self) -> Poll<u8> {
        Poll::Ready(4) // chosen by fair dice roll
    }
}
We could also just wait forever, grabbing some breathing room:
struct LookBusy;
impl SimpleFuture<()> for LookBusy {
    fn poll(&mut self) -> Poll<()> {
        Poll::Pending
    }
}
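To see the two outcomes side by side, here's a small sketch that polls each of them by hand. poll_repeatedly is a hypothetical helper made up for this illustration; it just collects whatever each call returned.

```rust
use std::task::Poll;

trait SimpleFuture<Output> {
    fn poll(&mut self) -> Poll<Output>;
}

struct FairDice;

impl SimpleFuture<u8> for FairDice {
    fn poll(&mut self) -> Poll<u8> {
        Poll::Ready(4) // chosen by fair dice roll
    }
}

struct LookBusy;

impl SimpleFuture<()> for LookBusy {
    fn poll(&mut self) -> Poll<()> {
        Poll::Pending
    }
}

/// Hypothetical helper: poll `fut` `times` times and collect every result.
fn poll_repeatedly<Output, F: SimpleFuture<Output>>(fut: &mut F, times: usize) -> Vec<Poll<Output>> {
    (0..times).map(|_| fut.poll()).collect()
}
```

FairDice is done on its very first poll, while LookBusy answers Pending no matter how many times we ask.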
These have all been pretty trivial, but to be able to pause things midway, we'll need to save whatever context has to survive until the next poll.
This is where our Future becomes relevant as a type, and not just a marker for which poll function to call. We could have a Future that needs to be polled 10 times before it completes:
struct Stubborn {
    counter: u8,
}
impl SimpleFuture<()> for Stubborn {
    fn poll(&mut self) -> Poll<()> {
        self.counter += 1;
        if self.counter == 10 {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}
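Since Stubborn keeps its counter in the struct itself, that state survives between calls. A quick sketch that polls it by hand (readiness_trace is a hypothetical helper) shows the first nine polls returning Pending and the tenth returning Ready. It also hints at why callers shouldn't poll a finished Future again: this particular implementation keeps counting past 10, so the eleventh poll quietly goes back to Pending.

```rust
use std::task::Poll;

trait SimpleFuture<Output> {
    fn poll(&mut self) -> Poll<Output>;
}

struct Stubborn {
    counter: u8,
}

impl SimpleFuture<()> for Stubborn {
    fn poll(&mut self) -> Poll<()> {
        self.counter += 1;
        if self.counter == 10 {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

/// Hypothetical helper: poll `fut` `times` times, recording whether
/// each individual call reported Ready.
fn readiness_trace<Output, F: SimpleFuture<Output>>(fut: &mut F, times: usize) -> Vec<bool> {
    (0..times).map(|_| fut.poll().is_ready()).collect()
}
```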
Or a wrapper that delegates to another Future:
struct LoadedDice {
    inner: FairDice,
}
impl SimpleFuture<u8> for LoadedDice {
    fn poll(&mut self) -> Poll<u8> {
        match self.inner.poll() {
            Poll::Ready(x) => Poll::Ready(x + 1),
            Poll::Pending => Poll::Pending,
        }
    }
}
Now.. writing all those "match poll, if pending then return, if ready then continue" blocks can also get pretty tedious. Thankfully, Rust provides the ready! macro that does it for us.[4] The example above could also be written like this:
use std::task::ready;
struct LoadedDice {
    inner: FairDice,
}
impl SimpleFuture<u8> for LoadedDice {
    fn poll(&mut self) -> Poll<u8> {
        let x = ready!(self.inner.poll());
        Poll::Ready(x + 1)
    }
}
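There's no magic in ready!: it's an ordinary macro. Here's a sketch of a hand-rolled equivalent, named my_ready! (a hypothetical name, to avoid clashing with the real one), which expands to the same match we wrote out by hand in the first LoadedDice:

```rust
use std::task::Poll;

trait SimpleFuture<Output> {
    fn poll(&mut self) -> Poll<Output>;
}

/// Hypothetical stand-in for std::task::ready!: unwrap a Ready value,
/// or return Pending from the *enclosing* function.
macro_rules! my_ready {
    ($e:expr) => {
        match $e {
            Poll::Ready(value) => value,
            Poll::Pending => return Poll::Pending,
        }
    };
}

struct FairDice;

impl SimpleFuture<u8> for FairDice {
    fn poll(&mut self) -> Poll<u8> {
        Poll::Ready(4)
    }
}

struct LoadedDice {
    inner: FairDice,
}

impl SimpleFuture<u8> for LoadedDice {
    fn poll(&mut self) -> Poll<u8> {
        let x = my_ready!(self.inner.poll());
        Poll::Ready(x + 1)
    }
}
```

Because the expansion contains a return, the macro only makes sense inside functions that themselves return a Poll, much like ? needs a compatible return type.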
But eventually we'll want to be able to await multiple times, and to save stuff between them. For example, we might want to sum up pairs of our dice:
async fn fair_dice() -> u8 {
    4 // still guaranteed to be completely fair
}
async fn fair_dice_pair() -> u8 {
    let first_dice = fair_dice().await;
    let second_dice = fair_dice().await;
    first_dice + second_dice
}
We can do this by saving the shared state in an enum instead, with a variant for each await point. This kind of rearrangement is called a "state machine", and it's also effectively what async fn does for us behind the scenes. That ends up looking like this:
enum FairDicePair {
    Init,
    RollingFirstDice {
        first_dice: FairDice,
    },
    RollingSecondDice {
        first_dice: u8,
        second_dice: FairDice,
    }
}
impl SimpleFuture<u8> for FairDicePair {
    fn poll(&mut self) -> Poll<u8> {
        // The loop lets us continue running the state machine
        // until one of the ready! clauses pauses us.
        loop {
            match self {
                Self::Init => {
                    *self = Self::RollingFirstDice {
                        first_dice: FairDice,
                    };
                },
                Self::RollingFirstDice { first_dice } => {
                    // Every time we're poll()ed, we'll do _everything_ up to the
                    // next ready! again (poll() is just another method, after all),
                    // so it should be the first (non-trivial) thing we do every time
                    // it's called.
                    let first_dice = ready!(first_dice.poll());
                    *self = Self::RollingSecondDice {
                        first_dice,
                        second_dice: FairDice,
                    }
                }
                Self::RollingSecondDice { first_dice, second_dice } => {
                    let second_dice = ready!(second_dice.poll());
                    return Poll::Ready(*first_dice + second_dice)
                }
            }
        }
    }
}
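The loop starts to matter once the inner Futures actually pause. Here's a sketch of the same state machine shape wrapped around two Stubborns, roughly what an async fn awaiting two of them in a row might turn into (StubbornPair and polls_until_ready are hypothetical names for this illustration). Because the loop moves straight on to the second Stubborn within the same poll call as soon as the first one finishes, the whole thing completes after 19 polls rather than 20. It also adds a Done state, so that polling it after completion panics instead of misbehaving silently.

```rust
use std::task::{ready, Poll};

trait SimpleFuture<Output> {
    fn poll(&mut self) -> Poll<Output>;
}

struct Stubborn {
    counter: u8,
}

impl SimpleFuture<()> for Stubborn {
    fn poll(&mut self) -> Poll<()> {
        self.counter += 1;
        if self.counter == 10 {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

// Hypothetical hand-written version of an async fn that awaits
// one Stubborn after another.
enum StubbornPair {
    Init,
    WaitingForFirst { first: Stubborn },
    WaitingForSecond { second: Stubborn },
    // Extra state so that polling after completion fails loudly.
    Done,
}

impl SimpleFuture<()> for StubbornPair {
    fn poll(&mut self) -> Poll<()> {
        loop {
            match self {
                Self::Init => *self = Self::WaitingForFirst { first: Stubborn { counter: 0 } },
                Self::WaitingForFirst { first } => {
                    ready!(first.poll());
                    // No return here: the loop moves straight on to the second one.
                    *self = Self::WaitingForSecond { second: Stubborn { counter: 0 } };
                }
                Self::WaitingForSecond { second } => {
                    ready!(second.poll());
                    *self = Self::Done;
                    return Poll::Ready(());
                }
                Self::Done => panic!("StubbornPair polled after completion"),
            }
        }
    }
}

/// Count how many polls it takes until `fut` reports Ready.
fn polls_until_ready<Output, F: SimpleFuture<Output>>(fut: &mut F) -> usize {
    let mut polls = 0;
    loop {
        polls += 1;
        if fut.poll().is_ready() {
            return polls;
        }
    }
}
```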
This is.. just a bit.. more verbose.
But on the flip side, a raw poll lets us do things that async fn can't really express. For example, we can build a timeout that only lets us poll a wrapped Future a limited number of times:[5]
struct Timeout {
    inner: Stubborn,
    polls_left: u8,
}
struct TimedOut;
impl SimpleFuture<Result<(), TimedOut>> for Timeout {
    fn poll(&mut self) -> Poll<Result<(), TimedOut>> {
        match self.polls_left.checked_sub(1) {
            Some(x) => self.polls_left = x,
            None => return Poll::Ready(Err(TimedOut)),
        }
        let inner = ready!(self.inner.poll());
        Poll::Ready(Ok(inner))
    }
}
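A quick sketch to check where the cut-off falls: since Stubborn needs exactly 10 polls, a budget of 10 is just enough, while a budget of 9 runs out and reports TimedOut on the tenth poll. finishes_in_time is a hypothetical helper; it busy-polls in a loop, the same way the run_future in the next section does.

```rust
use std::task::{ready, Poll};

trait SimpleFuture<Output> {
    fn poll(&mut self) -> Poll<Output>;
}

struct Stubborn {
    counter: u8,
}

impl SimpleFuture<()> for Stubborn {
    fn poll(&mut self) -> Poll<()> {
        self.counter += 1;
        if self.counter == 10 {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

struct Timeout {
    inner: Stubborn,
    polls_left: u8,
}

struct TimedOut;

impl SimpleFuture<Result<(), TimedOut>> for Timeout {
    fn poll(&mut self) -> Poll<Result<(), TimedOut>> {
        match self.polls_left.checked_sub(1) {
            Some(x) => self.polls_left = x,
            None => return Poll::Ready(Err(TimedOut)),
        }
        let inner = ready!(self.inner.poll());
        Poll::Ready(Ok(inner))
    }
}

/// Hypothetical helper: busy-poll a fresh Timeout around a fresh Stubborn,
/// and report whether the Stubborn finished before the budget ran out.
fn finishes_in_time(max_polls: u8) -> bool {
    let mut fut = Timeout {
        inner: Stubborn { counter: 0 },
        polls_left: max_polls,
    };
    loop {
        if let Poll::Ready(result) = fut.poll() {
            return result.is_ok();
        }
    }
}
```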
Let's dance run
So.. we've defined our (Simple)Future. A few, in fact. But they're not really worth much unless we can actually run them. How do we do that?
Simple. We just keep calling poll until it returns Ready.[6]
fn run_future<Output, F: SimpleFuture<Output>>(mut fut: F) -> Output {
    loop {
        if let Poll::Ready(out) = fut.poll() {
            return out;
        }
    }
}
For example:
println!("=> {}", run_future(FairDice));
=> 4
Now, this does have a catch. Just a tiny one. A teeny-tiny one. A teeny tiny toy catch.
While waiting for our Future to complete, we're wasting a lot of CPU cycles just calling poll over and over.[7] That's not ideal, but for now, let's just put a pin in that. We'll come back to it soon enough.
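To put a number on that waste, here's a sketch of run_future instrumented to also report how many times it called poll (run_future_counting is a hypothetical name). Every one of those calls except the last one accomplished nothing; for something like LookBusy, the loop would never end at all.

```rust
use std::task::Poll;

trait SimpleFuture<Output> {
    fn poll(&mut self) -> Poll<Output>;
}

struct FairDice;

impl SimpleFuture<u8> for FairDice {
    fn poll(&mut self) -> Poll<u8> {
        Poll::Ready(4)
    }
}

struct Stubborn {
    counter: u8,
}

impl SimpleFuture<()> for Stubborn {
    fn poll(&mut self) -> Poll<()> {
        self.counter += 1;
        if self.counter == 10 {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

/// Hypothetical variant of run_future: same busy loop, but it also
/// returns how many times poll was called before the output arrived.
fn run_future_counting<Output, F: SimpleFuture<Output>>(mut fut: F) -> (Output, u32) {
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.poll() {
            return (out, polls);
        }
    }
}
```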
Enter the combinatrix
As we can see, trying to write all of our logic as a poll quickly grows out of control, but sometimes we do need to express things that regular sequences of function calls.. can't.[8]
Is there a way to let us combine them, so we can use whatever fits the job best?
Well, yes. We can write combinators, generalizing our special logic into new building blocks that our async fn can then reuse.
For example, our Timeout can be changed to accept any arbitrary Future, instead of only Stubborn:
struct Timeout<F> {
    inner: F,
    polls_left: u8,
}
struct TimedOut;
impl<F, Output> SimpleFuture<Result<Output, TimedOut>> for Timeout<F>
where
    F: SimpleFuture<Output>,
{
    fn poll(&mut self) -> Poll<Result<Output, TimedOut>> {
        match self.polls_left.checked_sub(1) {
            Some(x) => self.polls_left = x,
            None => return Poll::Ready(Err(TimedOut)),
        }
        let inner = ready!(self.inner.poll());
        Poll::Ready(Ok(inner))
    }
}
fn with_timeout<F, Output>(inner: F, max_polls: u8) -> impl SimpleFuture<Result<Output, TimedOut>>
where
    // Without this bound, Timeout<F> wouldn't implement SimpleFuture at all
    F: SimpleFuture<Output>,
{
    Timeout {
        inner,
        polls_left: max_polls,
    }
}
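The same trick works for LoadedDice: its "add one to whatever the inner Future produces" logic generalizes into a combinator that applies any function to the output. The Map type and map function here are a hypothetical sketch in the same style as with_timeout. One wrinkle: because SimpleFuture takes its output as a generic parameter rather than an associated type, Map has to mention the input type In in its own definition (here via PhantomData), or the compiler rejects the impl for having an unconstrained type parameter. That's one hint at why the real Future uses an associated type Output instead.

```rust
use std::marker::PhantomData;
use std::task::{ready, Poll};

trait SimpleFuture<Output> {
    fn poll(&mut self) -> Poll<Output>;
}

struct FairDice;

impl SimpleFuture<u8> for FairDice {
    fn poll(&mut self) -> Poll<u8> {
        Poll::Ready(4)
    }
}

/// Hypothetical combinator: wait for `inner`, then pass its output through `f`.
struct Map<Fut, Func, In> {
    inner: Fut,
    // Wrapped in an Option so the FnOnce can be moved out when it's called.
    f: Option<Func>,
    // `In` only appears in the trait bounds below, so it must also be
    // mentioned here, or the impl's `In` would be unconstrained.
    _input: PhantomData<fn(In)>,
}

impl<Fut, Func, In, Out> SimpleFuture<Out> for Map<Fut, Func, In>
where
    Fut: SimpleFuture<In>,
    Func: FnOnce(In) -> Out,
{
    fn poll(&mut self) -> Poll<Out> {
        let value = ready!(self.inner.poll());
        let f = self.f.take().expect("Map polled after completion");
        Poll::Ready(f(value))
    }
}

fn map<Fut, Func, In, Out>(inner: Fut, f: Func) -> impl SimpleFuture<Out>
where
    Fut: SimpleFuture<In>,
    Func: FnOnce(In) -> Out,
{
    Map { inner, f: Some(f), _input: PhantomData }
}
```

With that, LoadedDice becomes map(FairDice, |x: u8| x + 1), and the same combinator can just as easily turn the roll into a String.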
We could then use it in our async fn by wrapping the sub-Future before await-ing it:[9]
use std::future::Future;
// Placeholder stand-ins (not real implementations) so this compiles against the real Future trait:
async fn send_email(target: &str, msg: &str) {}
struct TimedOut;
async fn with_timeout<F: Future>(inner: F, max_polls: u8) -> Result<F::Output, TimedOut> { Ok(inner.await) }
async fn send_email_with_retry() {
    for _ in 0..5 {
        if with_timeout(send_email("nat@nullable.se", "message"), 10).await.is_ok() {
            return;
        }
    }
    panic!("repeatedly timed out trying to send email, giving up...");
}
Input, output
We've spent some time working out how to combine our Futures... but they don't really.. do anything yet. If a Future runs in the forest computer, but nobody was around to run it.. we haven't really done much more than burn some electricity.
To be useful, we'll need to be able to interact with external systems: network calls, and so on.
Let's try reading something from a TCP socket, for example. We'll set up a server that sends our luggage code whenever we connect. For safekeeping, of course.
let listener = std::net::TcpListener::bind("127.0.0.1:9191").unwrap();
std::thread::spawn(move || {
    use std::{io::Write, time::Duration};
    let (mut conn, _) = listener.accept().unwrap();
    // Delay the reply, so that the client actually has to wait for it
    std::thread::sleep(Duration::from_millis(200));
    conn.write_all(&[1, 2, 3, 4, 5]).unwrap();
});
To do this, we'll need to do a few things:
1. Create the socket (this happens implicitly in Rust's API)
2. Connect to the remote destination
3. Configure the socket to be non-blocking (since otherwise the receive itself would just wait for the message, preventing any other Futures from running on the same thread)[10]
4. Try to read the message
5. If the read returned WouldBlock, return Pending and retry from step 4 on the next poll
Putting it together looks something like this:
use std::{io::Read, net::TcpStream};
struct TcpRead<'a> {
    socket: &'a mut TcpStream,
    buffer: &'a mut [u8],
}
impl<'a> SimpleFuture<usize> for TcpRead<'a> {
    fn poll(&mut self) -> Poll<usize> {
        match self.socket.read(self.buffer) {
            // No data yet: pause, and try again on the next poll
            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => Poll::Pending,
            // Got data (or a real error, which we just unwrap for now)
            size => Poll::Ready(size.unwrap()),
        }
    }
}
let luggage_code_server_address = "127.0.0.1:9191";
let mut socket = TcpStream::connect(luggage_code_server_address).unwrap();
socket.set_nonblocking(true).unwrap();
let mut buffer = [0; 16];
let received = run_future(TcpRead {
    socket: &mut socket,
    buffer: &mut buffer,
});
println!("=> The luggage code is {:?}", &buffer[..received]);
=> The luggage code is [1, 2, 3, 4, 5]
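The WouldBlock-to-Pending translation isn't specific to TCP. As a sketch, here is TcpRead generalized over anything that implements std::io::Read, and changed to hand real I/O errors back to the caller as an io::Result rather than unwrapping them. ReadFuture and the Flaky reader (a fake "socket" that reports WouldBlock a couple of times before its data arrives) are hypothetical, so the example runs without a network.

```rust
use std::io::{self, Read};
use std::task::Poll;

trait SimpleFuture<Output> {
    fn poll(&mut self) -> Poll<Output>;
}

/// Hypothetical generalization of TcpRead: works with any non-blocking
/// Read, and returns real errors instead of panicking on them.
struct ReadFuture<'a, R> {
    reader: &'a mut R,
    buffer: &'a mut [u8],
}

impl<'a, R: Read> SimpleFuture<io::Result<usize>> for ReadFuture<'a, R> {
    fn poll(&mut self) -> Poll<io::Result<usize>> {
        match self.reader.read(self.buffer) {
            // Nothing available yet: pause and retry on the next poll.
            Err(err) if err.kind() == io::ErrorKind::WouldBlock => Poll::Pending,
            // Either data or a genuine error: we're done either way.
            result => Poll::Ready(result),
        }
    }
}

/// Hypothetical stand-in for a non-blocking socket.
struct Flaky {
    not_ready_for: u32,
    data: Vec<u8>,
}

impl Read for Flaky {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.not_ready_for > 0 {
            self.not_ready_for -= 1;
            return Err(io::Error::from(io::ErrorKind::WouldBlock));
        }
        let n = self.data.len().min(buf.len());
        buf[..n].copy_from_slice(&self.data[..n]);
        self.data.drain(..n);
        Ok(n)
    }
}

fn run_future<Output, F: SimpleFuture<Output>>(mut fut: F) -> Output {
    loop {
        if let Poll::Ready(out) = fut.poll() {
            return out;
        }
    }
}
```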
Until next time...
Hopefully, you now have a bit of a handle on the general idea of how Futures interact. We've seen how to define, run, and combine them, and used them to communicate with a network service.
But as I mentioned, we've only really talked about our simplified SimpleFuture variant. Through the rest of the series, I'll focus on pulling back those curtains, one by one, until we arrive back at the real Future trait.
First up, our SimpleFuture is pretty wasteful, since we have to keep polling constantly rather than only when there's something useful for us to do. The solution to that is called a waker. But that's a topic for next time...
Footnotes
[1] Well actually, .await is defined by IntoFuture.. but that's just a thin conversion wrapper.
[2] Like waiting for a timer, receiving a message over the network, that sort of thing.
[3] If that sounds like an Option.. It basically is! Except the code often becomes clearer when our types embed the meaning that they represent. An Option could be None for many reasons, but a Pending is always a work in progress.
[4] If this reminds you of the ? operator.. Yeah, this is another parallel.
[5] In reality, you'd want to use time instead of trying to count poll calls.. but dealing with time brings in more moving parts that I don't want to deal with right now.
[6] What happens if you call poll again after that point is unspecified (though, since poll isn't an unsafe fn, it must never cause undefined behavior), but usually it'll either panic or keep returning Pending forever.
[7] Someone once said something about the sanity that that would imply...
[8] And even those regular sequences need to call into primitives that actually do things eventually. Futures don't just come fully formed out of the ether, after all.
[9] In our imaginary world where Rust supports await-ing SimpleFuture rather than Future, anyway.
[10] Yes, in reality this should happen before connecting, since we don't want to keep our thread busy while waiting for that. For now, let's pretend that the connection already existed. For complicated reasons that are out of scope for right now, the Rust standard library doesn't really handle asynchronous connect operations. Tokio does this for you.