Rust: Lifecycle of Futures

April 3, 2024

At the heart of Rust asynchronous programming are objects called futures, which represent ongoing operations. User code can easily create one by using library-provided implementations such as std::future::ready() and futures::future::try_join(), or by manually implementing the Future trait. Furthermore, async blocks (async { ... }) and async functions (async fn) allow programmers to implement a future in a procedural fashion (i.e., like normal code).

While the abundance of tools makes futures incredibly easy to use in practice, they have surprising corner cases, which library code, especially unsafe code, should take into consideration.

Meta

The intention of this article is not to discourage the use of async Rust but rather to point out specific corner cases that I feel developers should be aware of when writing intricate algorithms (like my unsuccessful attempt at safely making a reference 'static) and to provide solutions to them. While I do recognize various issues with async Rust, my personal experience is that the composability (as in the ability to apply select or join on arbitrary operations) offered by the async paradigm is indispensable in writing responsive (not just "fast") applications.


Normal Lifecycle

Normally, a future goes through the following lifecycle: creation, pinning, pending, polling, and then completion.

[Diagram: normal future lifecycle. Labels: Created, pin, Pinned, Pending, poll(), Polling, Poll::Pending, Poll::Ready, Complete, drop(), (Dropped)]

Creation

An object of a type implementing Future is created somewhere, just like any other object.

use tokio::time::{sleep, Duration};

let fut = sleep(Duration::from_secs(1));

Pinning

The future is then stored in a pinned location. Once pinned, the future can only be accessed through a pinned reference (Pin<impl DerefMut<Target = T>>). Safe code is prevented from creating a mutable borrow, which enables the future implementation to take advantage of the pinning guarantee.

use std::pin::{Pin, pin};
use tokio::time::Sleep;

// `pin!` macro creates a pinned location on the stack
let mut fut: Pin<&mut Sleep> = pin!(fut);

// Since `Sleep: !Unpin`, safe code is prevented from mutably borrowing it;
// obtaining `&mut Sleep` requires `unsafe`
let _: &mut Sleep = unsafe { fut.as_mut().get_unchecked_mut() };

// Immutably borrowing it is allowed because safe code cannot move it out
// from an immutable reference
let _: &Sleep = &*fut;

The pinning guarantee states that the object remains in a stable (pinned) memory location until the object is dropped. In the above example, the Sleep object is moved into a (logical[1]) stack location, which is deallocated when the scope ends. But before this happens, Sleep's destructor is always called, and there is no way to circumvent this[2].

The marker trait Unpin indicates that the type does not depend on the pinning guarantee. If the future implements Unpin, this pinning step can be omitted, as safe code can create a Pin pointing to it anytime by calling Pin::new().
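As a minimal sketch of the Unpin shortcut, the following polls std::future::Ready (which implements Unpin) through Pin::new() without any separate pinning step. NoopWake and poll_unpin_once are names made up for this illustration; the waker does nothing and exists only so that a Context can be built by hand.

```rust
use std::future::{ready, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing (illustrative helper, not a library type)
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls an `Unpin` future once without a dedicated pinning step
fn poll_unpin_once() -> Poll<u32> {
    // `std::future::Ready<u32>` implements `Unpin`
    let mut fut = ready(7u32);

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // `Pin::new` is a safe function that is only available when the
    // pointee is `Unpin`
    Pin::new(&mut fut).poll(&mut cx)
}
```

Replacing ready(7u32) with a !Unpin future (for example, one produced by an async block) should make the Pin::new call fail to compile, which is exactly the case where pin! or Box::pin is needed.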

Polling and Pending

An executor or another future's poll method calls the future's poll method (Future::poll()) to make progress (check the state of an inner operation and move on to the next step). If it returns Poll::Pending, this means the result is not available yet (the future is in a pending state), and the poll should be retried later. The executor can pass a Waker to get notified when it's meaningful to call the poll method again.

The following code illustrates how the executor calls the poll method:

use futures::task::noop_waker;
use std::{future::Future, pin::{Pin, pin}, task::{Context, Poll}};

/// An impractical `block_on` implementation based on busy polling
pub fn block_on<F: Future>(fut: F) -> F::Output {
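    // Pin the future
    let mut fut: Pin<&mut F> = pin!(fut);

    // No-op waker (bound to a variable so that it outlives `context`)
    let waker = noop_waker();
    let mut context = Context::from_waker(&waker);

    loop {
        // `as_mut` reborrows the pinned reference so it can be polled again
        if let Poll::Ready(output) = fut.as_mut().poll(&mut context) {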
            return output;
        }

        // Still pending... A practical implementation would block the
        // current thread until woken up by a `Waker` passed by `Context`
    }
}

The poll method enforces pinning (section Pinning) by requiring the caller to pass a pinned reference (Pin<&mut Self>).
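For comparison with the busy-polling version, here is a sketch of what "block the current thread until woken up" could look like using only the standard library. ThreadWaker and block_on_parking are illustrative names, and the approach (parking the thread and unparking it from the waker) is just one simple option, not how any particular runtime does it.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// A waker that unparks the thread that is blocked in `block_on_parking`
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// A `block_on` that sleeps between polls instead of spinning
pub fn block_on_parking<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);

    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);

    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            // Sleep until `wake` is called. `park` may also return
            // spuriously, in which case we simply poll again.
            Poll::Pending => thread::park(),
        }
    }
}
```

If the waker fires between the poll and the call to park, the unpark token is remembered and park returns immediately, so a wake-up is not lost.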

Completion

If the poll method returns Poll::Ready, this means the future has completed. The result is contained within Poll::Ready.

Once the future has completed, it's illegal to call the poll method again, and doing so will result in unspecified behavior, such as panicking, blocking forever, or returning a meaningless value.

Futures implementing futures::future::FusedFuture can indicate whether they have completed. Note that this is quite different from FusedIterator, which is a marker trait indicating that after Iterator::next returns None it's guaranteed to return None again.
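To illustrate the idea of a future that can report its own completion, here is a simplified, std-only sketch. This Fuse type is hypothetical and is not the implementation from the futures crate; it is restricted to Unpin inner futures to avoid pin projection, and it chooses to return Poll::Pending on polls after completion.

```rust
use std::future::{ready, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical wrapper that remembers whether the inner future completed
struct Fuse<F>(Option<F>);

impl<F> Fuse<F> {
    fn is_terminated(&self) -> bool {
        self.0.is_none()
    }
}

impl<F: Future + Unpin> Future for Fuse<F> {
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Already completed: never touch the inner future again
        let Some(inner) = self.0.as_mut() else {
            return Poll::Pending;
        };
        match Pin::new(inner).poll(cx) {
            Poll::Ready(output) => {
                self.0 = None;
                Poll::Ready(output)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// No-op waker for polling by hand (illustrative helper)
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn fuse_demo() -> (Poll<u32>, bool, Poll<u32>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut fut = Fuse(Some(ready(3u32)));
    let first = Pin::new(&mut fut).poll(&mut cx);
    let terminated = fut.is_terminated();
    let second = Pin::new(&mut fut).poll(&mut cx);
    (first, terminated, second)
}
```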


Exceptional Cases

A future's life can take unexpected turns.

[Diagram: future lifecycle including exceptional cases. Labels: Created, pin, Pinned, Unpolled, Pending, poll(), Polling, Poll::Pending, Poll::Ready, Complete, (Unspecified), Dropping, drop(), (cancellation), forget(), Forgotten, (Dropped)]

Unpolled

A pinned future can be considered to be in a distinct state before the first poll. This is because some future implementations exert their first significant side effects on the first poll. One example is the future returned by tokio::sync::Notify::notified, which will not insert itself into a receiver list until it's polled.

This first poll behavior is necessary because it's usually the first opportunity for a future to receive a pinned reference to itself (Pin<&mut Self>). Before that happens, future implementations are unable to safely perform address-sensitive operations, such as linking *const Self to a linked list.
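The same "nothing happens until the first poll" behavior can be observed with a plain async block. The sketch below uses only the standard library; NoopWake and side_effect_timing are names invented for this example.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// No-op waker for polling by hand (illustrative helper)
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns whether the side effect was visible (before, after) the first poll
fn side_effect_timing() -> (bool, bool) {
    let started = Cell::new(false);

    // Creating and pinning the future runs none of its body
    let mut fut = pin!(async {
        started.set(true);
    });
    let before_first_poll = started.get();

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // The body only starts running here
    let _ = fut.as_mut().poll(&mut cx);

    (before_first_poll, started.get())
}
```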

Post-Completion Polls

While it's illegal to poll a completed future, doing so must not cause undefined behavior (UB), as Future::poll() is not marked as unsafe. For implementors, this means that unsafe code running in Future::poll() must be designed to avoid UB even if this precondition is violated.

Futures created by async functions and async blocks handle post-completion polls by panicking.
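The panic can be observed directly with a small std-only experiment. This is a sketch that relies on the behavior of current compilers and on panics unwinding (the default); NoopWake and repoll_panics are illustrative names.

```rust
use std::future::Future;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// No-op waker for polling by hand (illustrative helper)
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns `true` if polling a completed async block panicked
fn repoll_panics() -> bool {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut fut = Box::pin(async { 1u32 });

    // The first poll completes the future
    assert_eq!(fut.as_mut().poll(&mut cx), Poll::Ready(1));

    // The second poll violates the contract; catch the resulting panic
    catch_unwind(AssertUnwindSafe(|| {
        let _ = fut.as_mut().poll(&mut cx);
    }))
    .is_err()
}
```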

Cancellation

The future can be dropped before the poll method returns Poll::Ready. This is called cancellation.

use tokio::{io::{stdin, AsyncBufReadExt, BufReader}, time::{timeout, Duration}};

let mut stdin = BufReader::new(stdin());
let mut buf = String::new();

match timeout(Duration::from_secs(10), stdin.read_line(&mut buf)).await {
    // `read_line` completed in time
    Ok(_) => println!("Hello, {buf}!"),
    // `read_line` didn't complete in time and got cancelled
    Err(_) => println!("You're a silent type, aren't you?"),
}

For a manually-defined future, the effect is obvious: all contained values are recursively dropped in declaration order.

use std::marker::PhantomPinned;

// Simplified from `tokio::io::util::read_line::ReadLine`
pub struct ReadLine<'a, R: ?Sized> {
    reader: &'a mut R,
    output: &'a mut String,
    buf: Vec<u8>,
    read: usize,
    _pin: PhantomPinned,
}
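The drop order can be checked with a small experiment. The sketch below is not a real future; ReadLineLike is a made-up struct that merely mirrors the field layout above, with each field replaced by a value that logs its own destruction.

```rust
use std::cell::RefCell;

/// Records its name in a shared log when dropped (illustrative helper)
struct Noisy<'a> {
    name: &'static str,
    log: &'a RefCell<Vec<&'static str>>,
}

impl Drop for Noisy<'_> {
    fn drop(&mut self) {
        self.log.borrow_mut().push(self.name);
    }
}

/// Stand-in for a manually-defined future with several fields
#[allow(dead_code)]
struct ReadLineLike<'a> {
    reader: Noisy<'a>,
    output: Noisy<'a>,
    buf: Noisy<'a>,
}

fn drop_order() -> Vec<&'static str> {
    let log = RefCell::new(Vec::new());
    let fut_like = ReadLineLike {
        reader: Noisy { name: "reader", log: &log },
        output: Noisy { name: "output", log: &log },
        buf: Noisy { name: "buf", log: &log },
    };

    // "Cancel" it: fields are dropped in declaration order
    drop(fut_like);
    log.into_inner()
}
```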

For an async block or function, each suspension point (.await) can be thought of as having an implicit return path that is taken on cancellation. See the following example:

async fn foo(a: A) {
    let b = a.foo().await;
    let c = b.bar().await;
}

// Visualizing the return paths:
fn foo(a: A) -> Result<(), Cancelled> {
    let b = match block_on(a.foo()) {
        Ok(x) => x,
        Err(Cancelled) => return Err(Cancelled), // drop `a`
    };
    let c = match block_on(b.bar()) {
        Ok(x) => x,
        Err(Cancelled) => return Err(Cancelled), // drop `a` and `b`
    };
    // drop `a`, `b`, and `c`
    Ok(())
}
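The implicit return path can be made visible with a drop guard that lives across a suspension point. This is a std-only sketch; SetOnDrop here uses a Cell so that the flag can be read while the future is still alive, and NoopWake and cancel_demo are illustrative names.

```rust
use std::cell::Cell;
use std::future::{pending, Future};
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// Sets the flag when dropped
struct SetOnDrop<'a>(&'a Cell<bool>);

impl Drop for SetOnDrop<'_> {
    fn drop(&mut self) {
        self.0.set(true);
    }
}

/// No-op waker for polling by hand (illustrative helper)
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns the flag's value (while suspended, after cancellation)
fn cancel_demo() -> (bool, bool) {
    let dropped = Cell::new(false);

    let mut fut = Box::pin(async {
        let _guard = SetOnDrop(&dropped);
        pending::<()>().await; // suspension point
    });

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // Run the future up to the suspension point
    assert!(fut.as_mut().poll(&mut cx).is_pending());
    let while_suspended = dropped.get();

    // Cancellation: dropping the future drops `_guard` as well
    drop(fut);

    (while_suspended, dropped.get())
}
```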

Forgotten

If the future's owner gives up the future with forget() instead of dropping it, the future becomes inaccessible, yet it continues to exist.

use futures::task::noop_waker;
use std::{future::{Future, pending}, mem::forget, task::Context};

struct SetOnDrop<'a>(&'a mut bool);

impl Drop for SetOnDrop<'_> {
    fn drop(&mut self) {
        *self.0 = true;
    }
}

async fn foo(x: &mut bool) {
    let _guard = SetOnDrop(x);
    pending().await // <------ SUSPENSION POINT
}

let mut flag = false;

let mut foo_fut = Box::pin(foo(&mut flag));

// Run the future until it encounters the first suspension point
_ = foo_fut.as_mut().poll(&mut Context::from_waker(&noop_waker()));

// Skip the future's destructor, ending the mutable borrow of `flag`
forget(foo_fut);

// Check that `SetOnDrop::drop` didn't run
assert_eq!(flag, false);

A forgotten future continues to exist even after the references contained within it become invalid. The pinning guarantee says the future stays in memory until it's dropped, but doesn't say it remains valid.


Complications

These exceptional cases cause some complications in the implementations or uses of futures. Some can be addressed by proper handling, while others require taking different approaches.

Cancel Safety

It's crucial to note that not all futures are designed to handle cancellation. Even if they are, they might not guarantee to (fully) nullify the effect of the underlying async operations in case of cancellation. Tokio's documentation refers to this guarantee as cancel safety. Cancel safety is a somewhat fuzzy concept, but it broadly means that the future exhibits a desirable cancellation behavior, such as giving the next waiter a turn, not losing received data, or preferably behaving as if it wasn't called at all.

tokio::fs::remove_file is an example of an async function that is not cancel-safe. If you take a look at the source code, it's implemented by starting a background task to call std::fs::remove_file and then await-ing the task's join handle. Cancelling it will only drop the task's join handle and will not stop it, but can prevent the task from starting if it hasn't started yet[3]. The table below summarizes the possible outcomes:

Cancelled? | Deletion Result | Return Value
No         | OK              | Ok(())
No         | Error           | Err(_)
Yes        | OK              | N/A
Yes        | Error           | N/A
Yes        | Cancelled       | N/A

By contrast, tokio::net::UdpSocket::recv guarantees by design that it does not consume an incoming message after cancellation, making itself cancel-safe.

Cancelled? | Receive Result | Return Value
No         | OK             | Ok(_)
No         | Error          | Err(_)
Yes        | Cancelled      | N/A

Cancel safety is not trivial to attain, as it demands that the future implementation commit all irreversible side effects in the last (Poll::Ready-returning) poll, synchronously. Serial composition of futures (FutureExt::then or a series of awaits in an async function) does not preserve this property.
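To see how serial composition breaks cancel safety, consider the following std-only sketch. take_then_wait is a made-up async function that commits an irreversible side effect (removing an item from a queue) before its last suspension point; YieldOnce is a hypothetical stand-in for any operation that takes a while.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Returns `Pending` on the first poll and `Ready` afterwards
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// NOT cancel-safe: the item is removed before the last suspension point
async fn take_then_wait(queue: &RefCell<VecDeque<u32>>) -> Option<u32> {
    let item = queue.borrow_mut().pop_front(); // irreversible side effect
    YieldOnce(false).await; // cancellation here loses `item`
    item
}

/// No-op waker for polling by hand (illustrative helper)
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns what is left in the queue after a cancelled call
fn lost_item_demo() -> Vec<u32> {
    let queue = RefCell::new(VecDeque::from([1, 2]));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    {
        let mut fut = pin!(take_then_wait(&queue));
        assert!(fut.as_mut().poll(&mut cx).is_pending());
        // `fut` is dropped (cancelled) here without ever returning item 1
    }

    queue.into_inner().into_iter().collect()
}
```

Item 1 was neither returned to the caller nor left in the queue, which is the kind of data loss that cancel-safe primitives are designed to rule out.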

Scoped Spawn

A ramification of the forgetting behavior (section Forgotten) is that it's impossible to safely write an async equivalent of scoped threads. To clarify: any attempt to poll a given non-'static future independently of the caller's polls will be unsound.

Scoped Threads

Scoped threads refer to threads capturing non-'static references to outer scopes. This can be safely implemented by forcing join at the end of the scope. The first implementation in Rust was std::thread::JoinGuard in the standard library, which turned out to be unsound ("Leakpocalypse 2k15") and thus didn't make it to Rust 1.0.

// If `std::thread::JoinGuard` existed in modern Rust
use std::{mem::forget, thread};

let mut a = vec![1, 2, 3];

let thread: thread::JoinGuard<'_, ()> = thread::scoped(|| {
    // The scoped threads can borrow from outside the scope
    dbg!(&a);
});

// Skip join
forget(thread);

// UB: Unsynchronized read/write access to `a`
a.push(42);

An actually sound implementation of scoped threads later appeared in crossbeam (crossbeam::scope) and then finally in the standard library (RFC 3151, std::thread::scope).

// Based on the example from `std::thread::scope` documentation
let mut a = vec![1, 2, 3];

// `std::thread::scope` calls this closure, passing `&'scope Scope<'scope, 'env>`,
// by which inner code can spawn threads
std::thread::scope(|s| {
    s.spawn(|| {
        // The scoped threads can borrow from outside the scope (`'env: 'scope`)
        dbg!(&a);
    });

    // When the closure returns, `std::thread::scope` implicitly
    // joins on the spawned threads, ensuring they won't escape the scope
});

// After the scope, we can modify and access our variables again:
a.push(4);

Those sound implementations guarantee that the join always runs by creating the scope inside their own functions and calling a user-provided closure from it, thus avoiding the issue with the early implementation.

// Pseudocode for `std::thread::scope` (panic handling omitted)
pub fn scope<'env, F, T>(f: F) -> T
// `'env` is a caller-chosen lifetime that can be thought of as whatever is
// borrowed by the scoped threads
where
    F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T,
    // `Scope`'s `'env: 'scope` bound is used to simulate the following bound,
    // which is not possible in current Rust:
    // for<'scope where 'env: 'scope> FnOnce(&'scope Scope<'scope>) -> T
{
    let scope = Scope::new();
    let result = f(&scope);
    // Wait for all scoped threads to complete before `'env` ends
    scope.join_all_spawned_threads();
    result
}

impl<'scope, 'env> Scope<'scope, 'env> {
    pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
    where
        F: FnOnce() -> T + Send + 'scope,
        T: Send + 'scope,
    {
        self.spawn_thread_and_keep_track_of_it(f)
    }
}

Scoped Tasks

Now the question is whether we can write an async version of it, such that async user code can spawn non-'static tasks on a global executor and await on all of them. For the sake of simplifying the discussion, we only consider spawning one task. As a starting point, a scoped thread implementation that can spawn only one thread[4] would look like this:

use std::mem::transmute;

// Panic handling omitted
// Uses `Box` to simplify lifetime mutation
pub fn spawn_and_join<'env>(f: Box<dyn FnOnce() + Send + 'env>) {
    // Transmute lifetime
    let f: Box<dyn FnOnce() + Send> = unsafe { transmute(f) };

    // Spawn a thread
    std::thread::spawn(move || f())
        // Wait for the scoped thread to complete before `'env` ends
        .join();
}

Based on the above, an async version could be implemented like this:

pub async fn spawn_and_join<'env>(f: Pin<Box<dyn Future<Output = ()> + Send + 'env>>) {
    // Transmute lifetime
    let f: Pin<Box<dyn Future<Output = ()> + Send>> = unsafe { transmute(f) };

    // Spawn a task
    tokio::spawn(f)
        // Wait for the scoped task to complete before `'env` ends
        .await;
}

The most obvious problem with this implementation is that .await can be skipped by cancellation (section Cancellation). This problem becomes more apparent when it's converted to a manual Future implementation:

pub fn spawn_and_join<'env>(f: Pin<Box<dyn Future<Output = ()> + Send + 'env>>)
    -> impl Future<Output = ()> + Send + 'env
{
    SpawnAndJoin::Init(f)
}

enum SpawnAndJoin<'env> {
    Init(Pin<Box<dyn Future<Output = ()> + Send + 'env>>),
    Joining(JoinHandle<()>),
    Invalid,
}

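impl<'env> Future for SpawnAndJoin<'env> {
    type Output = ();

    // `mut self` and `&mut *self` are fine here because every variant is `Unpin`
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {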
        loop {
            match std::mem::replace(&mut *self, SpawnAndJoin::Invalid) {
                SpawnAndJoin::Init(f) => {
                    // Transmute lifetime
                    // Safety: We'll join on the task before `'env` expires ...right?
                    let f: Pin<Box<dyn Future<Output = ()> + Send>> = unsafe { transmute(f) };

                    // Spawn a task
                    let join_handle = tokio::spawn(f);
                    *self = SpawnAndJoin::Joining(join_handle);
                }
                SpawnAndJoin::Joining(mut join_handle) => {
                    match Pin::new(&mut join_handle).poll(cx) {
                        Poll::Ready(_) => {}
                        Poll::Pending => {
                            *self = SpawnAndJoin::Joining(join_handle);
                            return Poll::Pending;
                        }
                    }

                    return Poll::Ready(());
                }
                SpawnAndJoin::Invalid => panic!("polled after completion"),
            }
        }
    }
}

The owner of SpawnAndJoin could just stop calling poll and drop it, unborrowing 'env, even though the spawned task is still running and possibly dereferencing &'env _.

A possible solution to this problem is to terminate the program on cancellation so that no threads will ever observe expired 'env:

impl<'env> Drop for SpawnAndJoin<'env> {
    fn drop(&mut self) {
        if matches!(self, SpawnAndJoin::Joining(_)) {
            std::process::abort();
        }
    }
}

Problem solved then! Not quite, actually. The owner of SpawnAndJoin could forget it (section Forgotten), skipping the destructor altogether.

As far as I know, it's impossible to guarantee the execution of any piece of code before the unborrow of 'env. The only safe moment to use 'env in SpawnAndJoin is during the calls to its methods (poll and drop) that already take 'env as part of their arguments. When the owner is not calling those methods, it can end 'env anytime.

[Diagram: timeline of Owner, 'env, and SpawnAndJoin<'env>. Labels: poll, spawn, f, drop, forget, use after free]

Remote Polls

What's safe to do is to poll the spawned task only within the original future's poll duration. This may seem like nothing more than select!, FuturesUnordered, or just await-ing the future directly, but it does have some uses when you need to simultaneously access borrowed values (consider &(dyn Trait + Sync)) and a non-Send value bound to a particular event loop thread.

This can be implemented by having SpawnAndJoin::poll poll the scoped task synchronously on the target executor.

[Diagram: Owner, 'env, SpawnAndJoin<'env>, Executor A, and Executor B. Labels: poll, spawn, block_on, Shim task, f, drop, forget]

Foreign Language Interoperation

Rust's novel implementation of an async paradigm poses some challenges in interoperating with other implementations. In this section, we'll use GIO as an example because its Rust bindings are readily available (gio crate and gir-rust-code-generator[5] FFI/API binding generator).

Who Calls Your Code?

What makes Rust's async implementation unique is that the callers of async methods are responsible for driving the callees' progress. Most underlying synchronous function calls (i.e., poll method calls) occur in the same direction as the async calls and can be implemented by direct calls in most cases (hence the "zero-cost" futures). The only time callbacks are involved is when the underlying I/O operation of some future completes and wakes up the executor through Waker (a form of type-erased smart reference to the originating executor) to poll the task again. The future finally completes when it's polled again and returns Poll::Ready from its poll method. This design completely decouples this language feature from any specific event loop implementations and dynamic memory allocators while being a perfect fit for Rust's ownership system.
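The round trip described above (poll, store the waker, wake, poll again) can be sketched with the standard library alone. Everything here is illustrative: Completion plays the role of a leaf future whose "I/O" finishes on another thread, and the loop in run_callback_bridge plays the role of a minimal executor.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// State shared between the future and the thread completing the operation
#[derive(Default)]
struct Shared {
    value: Option<u32>,
    waker: Option<Waker>,
}

/// Leaf future: ready once another thread has stored a value
struct Completion(Arc<Mutex<Shared>>);

impl Future for Completion {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let mut shared = self.0.lock().unwrap();
        match shared.value.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Remember whom to notify when the operation completes
                shared.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Waker that unparks the "executor" thread
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn run_callback_bridge() -> u32 {
    let shared = Arc::new(Mutex::new(Shared::default()));

    // The "I/O operation": completes on another thread, then wakes the executor
    let producer = Arc::clone(&shared);
    let worker = thread::spawn(move || {
        let waker = {
            let mut shared = producer.lock().unwrap();
            shared.value = Some(42);
            shared.waker.take()
        };
        if let Some(waker) = waker {
            waker.wake(); // the only callback-style (indirect) call
        }
    });

    // The "executor": direct `poll` calls, sleeping in between
    let mut fut = pin!(Completion(shared));
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let output = loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => break value,
            Poll::Pending => thread::park(),
        }
    };

    worker.join().unwrap();
    output
}
```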

[Diagram: Executor, Loop, Foo, Bar, and Write<File>. Labels: poll, wake. Legend: direct call, indirect call]

This approach to async implementation is hard to find in other languages/runtimes[6]. Most implementations communicate the completion of async operations by callbacks. For instance, in GIO, callers pass a GAsyncReadyCallback and gpointer (a la Box<dyn FnOnce(&glib::Object, gio::AsyncResult)>) to make an async method call. When the operation is complete, the async callee calls the callback function on the original "main context" (a main loop defined by GLib) and passes a GAsyncResult object. To extract the result from it, the callback function needs to call the paired *_finish method. Unlike Rust futures, the control won't return to the async caller until completion.

[Diagram: Foo, Bar, and GFile. Labels: ready]

GIO async methods can optionally support asynchronous cancellation (see section Asynchronous Cancellation) by taking a parameter of type GCancellable.

Rust to GIO

Rust async methods can call a GIO async method by using a one-shot channel and sending the result back through that channel. The safe API bindings for GIO async methods generated by gir-rust-code-generator accept callback functions in the form of impl FnOnce(_). For example, gio::prelude::FileExt::trash_async (GFile.trash_async) can be called like this:

use futures::channel::oneshot;
use gio::{File, prelude::*};

async fn file_trash(file: File) -> Result<(), glib::Error> {
    let (send, recv) = oneshot::channel();

    file.trash_async(
        glib::Priority::DEFAULT,
        None::<&gio::Cancellable>,
        move |result| _ = send.send(result),
    );

    // This will never panic because `GAsyncReadyCallback` supports no
    // operations other than calling it
    recv.await.expect("callback dropped unexpectedly")
}

Cancellation can be implemented by passing a gio::Cancellable and calling the cancel method later.

async fn file_trash(file: File) -> Result<(), glib::Error> {
    let (send, recv) = oneshot::channel();
    // `mut` is needed because the guard is disarmed below
    let mut cancel_guard = CancelOnDrop(Some(gio::Cancellable::new()));

    file.trash_async(
        glib::Priority::DEFAULT,
        cancel_guard.0.as_ref(),
        move |result| _ = send.send(result),
    );

    // This will never panic because `GAsyncReadyCallback` supports no
    // operations other than calling it
    let result = recv.await.expect("callback dropped unexpectedly");

    // Suppress cancellation
    cancel_guard.0 = None;
    
    result
}

struct CancelOnDrop(Option<gio::Cancellable>);

impl Drop for CancelOnDrop {
    fn drop(&mut self) {
        if let Some(cancellable) = &self.0 {
            cancellable.cancel();
        }
    }
}

Actually, the gio crate provides a gio::GioFuture type implementing the above pattern, and gir-rust-code-generator automatically generates *_future methods for all async methods by default (e.g., gio::prelude::FileExt::trash_future), which means that as an application developer you don't usually have to implement it manually.

async fn file_trash(file: File) -> Result<(), glib::Error> {
    file.trash_future(glib::Priority::DEFAULT).await
}

Note though that synchronous cancellation is not supported in GIO; the operation will continue to run for a while after cancellation and may even complete. See section Asynchronous Cancellation for alternative approaches.

Last but not least, GIO async methods must be called from a thread owning a GLib "main context". Assuming you have a main context and access to it (glib::MainContext) from where the call is made, you can use its methods to call a closure or spawn an async task on it.

GIO to Rust

When implementing a GIO async method, using GTask is the typical approach. It works like this: The *_async method calls g_task_new, passing the caller-provided GAsyncReadyCallback and gpointer values to it. Then, it kicks off whatever operation it's supposed to do, which then passes the result to one of the GTask.return_* methods, which will then call the original GAsyncReadyCallback, passing the GTask itself, upcast to GAsyncResult. The callback function finally calls the corresponding *_finish method, which then downcasts the GAsyncResult back to GTask and extracts the stored result by calling one of the GTask.propagate_* methods.

Implementing in Rust can be done the same way, except that gio provides a type-safe binding for GTask: gio::[Local]Task. See the following example (a complete example can be found on Replit):

// Somewhat based on Vala compiled code as well as
// <https://gitlab.gnome.org/malureau/rdw/-/blob/dc195fe25d2038f6aba6450c1431dc3fdd7c2ac1/rdw4-rdp/src/capi.rs#L34>
use std::ffi::c_void;
use glib::{prelude::*, translate::*};

pub unsafe extern "C" fn foo_async(
    cancellable: *mut gio::ffi::GCancellable,
    callback: gio::ffi::GAsyncReadyCallback,
    user_data: *mut c_void
) {
    let task: gio::Task<i32> = from_glib_full(
        gio::ffi::g_task_new(
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            callback,
            user_data,
        )  
    );

    foo_async_inner(
        from_glib_none(cancellable),
        move |result| task.return_result(result),
    );
}

pub unsafe extern "C" fn foo_finish(
    res: *mut gio::ffi::GAsyncResult,
    out_error: *mut *mut glib::ffi::GError
) -> i32 {
    let task = gio::Task::from_glib_none(res.cast());
    match task.propagate() {
        Ok(value) => value,
        Err(error) => {
            if !out_error.is_null() {
                *out_error = error.into_glib_ptr();
            }
            0
        }
    }
}

fn foo_async_inner(
    cancellable: Option<gio::Cancellable>,
    callback: impl FnOnce(Result<i32, glib::Error>) + Send + 'static,
) {
    todo!()
}

With a safe binding in place, calling a Rust async method from a GIO async method is straightforward: Spawn a task and call the async method in it.

fn foo_async_inner(
    cancellable: Option<gio::Cancellable>,
    callback: impl FnOnce(Result<i32, glib::Error>) + Send + 'static,
) {
    // Spawn a future on the thread-default `GMainContext`
    glib::spawn_future_local(async move {
        callback(foo_async_inner2().await);
    });
}

async fn foo_async_inner2() -> Result<i32, glib::Error> {
    glib::timeout_future_seconds(1).await;
    Ok(42)
}

You can spawn the task on Tokio if you want, although you need an existing runtime instance to spawn the task on (a complete example can be found on Replit):

fn foo_async_inner(
    cancellable: Option<gio::Cancellable>,
    callback: impl FnOnce(Result<i32, glib::Error>) + Send + 'static,
) {
    tokio_runtime().spawn(async move {
        callback(foo_async_inner2().await);
    });
}

async fn foo_async_inner2() -> Result<i32, glib::Error> {
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    Ok(42)
}

fn tokio_runtime() -> tokio::runtime::Handle {
    todo!()
}

To cancel the future on GIO cancellation, wrap it with gio::CancellableFuture:

fn foo_async_inner(
    cancellable: Option<gio::Cancellable>,
    callback: impl FnOnce(Result<i32, glib::Error>) + 'static,
) {
    // Spawn a future on the thread-default `GMainContext`
    glib::spawn_future_local(async move {
        callback(
            gio::CancellableFuture::new(foo_async_inner2(), cancellable.unwrap_or_default())
                .await
                // `gio::Cancelled: Into<glib::Error>`
                .unwrap_or_else(|cancelled| Err(cancelled.into()))
        );
    });
}

Not wrapping specific futures is also an option if their cancellation behaviors are undesirable (see section Cancel Safety).

Macros

Macros should be prepared to be called inside an async block or function that can be subject to cancellation or forgetting.

My crate cryo had a soundness bug in version 0.2.x that only manifested inside an async block or function. The cryo::cryo! macro provided a way to safely create a 'static object referencing a local variable. To this end, it created a hidden local variable of type Cryo<'a> that, when going out of scope, blocked the current thread until all such references had been removed. This worked well for non-async code because local variables containing references with lifetime 'a are guaranteed to be dropped before 'a expires.

// for<'a> fn(&'a str)
fn hoge<'a>(foo: &'a str) {
    cryo!(let cryo: Cryo<_> = foo);
    // Expanded to (with some details omitted):
    //     let cryo = unsafe { Cryo::new(&foo) };   // binding 1
    //     pin_mut!(cryo);                          // binding 2
    //     let cryo = cryo.into_ref();              // binding 3

    let borrow = cryo.borrow();
    std::thread::spawn(move || dbg!(*borrow));

    // When binding 1 (`Cryo<'a>`) goes out of scope, its destructor
    // blocks until the spawned thread drops `borrow`
}

As it turned out, this doesn't hold inside async code. Take a look at the async version of the above code:

// for<'a> fn(&'a str) -> impl Future<Output = ()> + !Unpin + 'a
async fn hoge<'a>(foo: &'a str) {
    cryo!(let cryo: Cryo<_> = &foo);

    let borrow = cryo.borrow();
    std::thread::spawn(move || dbg!(*borrow));

    sleep(Duration::from_secs(1)).await; // <------ SUSPENSION POINT
}

The returned future contains Cryo<'a> in one of its compiler-generated fields. In a normal future lifecycle, this Cryo<'a> would be dropped before 'a expires whether the future completed or got cancelled, as the future's destructor would be called in both cases. However, as described in section Forgotten, it's possible to skip this destructor invocation, breaking cryo!'s safety invariants.

After all, Rust doesn't guarantee that Cryo<'a> is dropped before 'a expires; it only guarantees that the destructor doesn't observe a dangling reference. This problem is actually something that pre-1.0 Rust developers experienced firsthand with std::thread::JoinGuard, when they realized that safe code could skip its destructor by simulating forget, which was considered unsafe back then ("Leakpocalypse 2k15"). My cryo! seemed to work around Leakpocalypse by tucking Cryo behind a shadowed local variable, except this was powerless against async code.


Recommendations

Handling Cancellation

Handling cancellation properly (see sections Cancellation and Cancel Safety) is highly recommended because it can happen for a multitude of causes (such as timeouts or user requests), penetrating through multiple layers of subsystems. Futures not supporting cancellation (not providing cancel safety) should be clearly marked as such.

In async functions and blocks, every suspension point (.await) must be considered as a potential cancellation point. Cancellation is implicitly handled by dropping local variables, similarly to panics. Resources acquired with RAII guards (e.g., File, MutexGuard) are released automatically. For resources or transactions not managed by the RAII pattern, you need to implement cleanup code manually. The scopeguard crate is useful for making one-off RAII guards.

Not supporting cancellation is a valid option. However, even such futures could still encounter cancellation due to user errors or panics. A possible error handling strategy is "poisoning" the relevant state data in those cases so that future operations would fail instead of succeeding with incorrect results.

use tokio::time::{sleep, Duration};

struct Bank {
    /// `None` means a poisoned state
    accounts: Option<Accounts>,
}

struct Accounts {
    a: u32,
    b: u32,
}

impl Bank {
    /// Transfer some amount of money between accounts.
    ///
    /// Cancellation is NOT supported.
    pub async fn transfer(&mut self) {
        let mut accounts = self.accounts.take().expect("state data corrupted");
        accounts.a -= 100;
        sleep(Duration::from_secs(1)).await;  // bathroom break
        accounts.b += 100;
        self.accounts = Some(accounts);
    }
}
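The poisoning effect can be checked without a runtime. The sketch below re-declares Bank with the sleep replaced by std::future::pending (an assumption made so that the example only needs the standard library) and cancels transfer at its suspension point; NoopWake and poisoned_after_cancel are illustrative names.

```rust
use std::future::{pending, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

struct Accounts {
    a: u32,
    b: u32,
}

struct Bank {
    /// `None` means a poisoned state
    accounts: Option<Accounts>,
}

impl Bank {
    /// Cancellation is NOT supported
    async fn transfer(&mut self) {
        let mut accounts = self.accounts.take().expect("state data corrupted");
        accounts.a -= 100;
        pending::<()>().await; // stand-in for the `sleep` call
        accounts.b += 100;
        self.accounts = Some(accounts);
    }
}

/// No-op waker for polling by hand (illustrative helper)
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns `true` if the bank is poisoned after a cancelled transfer
fn poisoned_after_cancel() -> bool {
    let mut bank = Bank {
        accounts: Some(Accounts { a: 500, b: 0 }),
    };
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    {
        let mut fut = pin!(bank.transfer());
        assert!(fut.as_mut().poll(&mut cx).is_pending());
        // `fut` is dropped here, in the middle of the transfer
    }

    // The half-updated `accounts` was dropped with the future, so the next
    // `transfer` call would hit `expect` instead of using inconsistent data
    bank.accounts.is_none()
}
```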

Immediately panicking on unexpected cancellation should be avoided (if possible) as this could lead to a double panic, aborting the entire process.

Avoiding Cancellation

There are times when you are working on an async function that is meant to be cancellable and need to call one that is not cancellable or has an undesirable cancellation behavior (see section Cancel Safety). One solution is calling it from a spawned task (spawned by, e.g., tokio::task::spawn) to shield it from cancellation. The task can continue to run even after the caller is cancelled.

As an example, let's suppose we're implementing a counter backed by a file. The first version looks like this (don't mind excessive unwrapping):

use tokio::{fs::File, io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}};

struct Counter {
    file: File,
}

impl Counter {
    pub fn new(path: &std::path::Path) -> Self { todo!() }

    pub async fn fetch_increment(&mut self) -> u32 {
        let mut buf = [0u8; 4];

        self.file.rewind().await.unwrap();
        self.file.read_exact(&mut buf).await.unwrap();

        let old = u32::from_le_bytes(buf);
        buf = (old + 1).to_le_bytes();

        self.file.rewind().await.unwrap();
        self.file.write_all(&buf).await.unwrap();
        self.file.flush().await.unwrap();

        old
    }
}

We want to make Counter::fetch_increment cancellable. The methods called inside are cancellable, but the result is not what we want. For example, if the write was aborted in the middle, the file could end up with a mix of new and old data. The write needs to be atomic.

We can solve the issue by moving the operation into a spawned task:

use std::sync::Arc;
use tokio::{fs::File, io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, sync::Mutex};

struct Counter {
    file: Arc<Mutex<File>>,
}

impl Counter {
    pub fn new(path: &std::path::Path) -> Self { todo!() }

    pub async fn fetch_increment(&mut self) -> u32 {
        let file = Arc::clone(&self.file);
        tokio::spawn(async move {
            let mut file = file.lock().await;
            let mut buf = [0u8; 4];

            file.rewind().await.unwrap();
            file.read_exact(&mut buf).await.unwrap();

            // The counter is stored as four little-endian bytes
            let old = u32::from_le_bytes(buf);
            buf = (old + 1).to_le_bytes();

            file.rewind().await.unwrap();
            file.write_all(&buf).await.unwrap();
            file.flush().await.unwrap();

            old
        }).await.unwrap()
    }
}

🔗Asynchronous Cancellation

The way futures are normally cancelled (section Cancellation) is called synchronous cancellation because it's implemented by a synchronous call to the future's drop glue, which calls Drop::drop for all contained values. Drop::drop implementations cannot use .await as it's not defined as an async method.

Most async runtimes provide block_on functions to perform async operations in non-async functions. However, calling block_on in Drop::drop implementations should be done with great care as it's prone to deadlocks, unless you are certain that it's never called from a future.

Some variants of block_on, such as Tokio's, panic when such usage is detected. async-std's variant doesn't, and the usage of this pattern in the same library is known to cause a deadlock (async-rs/async-std#900; see footnote 7).

A few examples of deadlocks caused by block_on are shown below:

Deadlock Case 1

This program uses Tokio's current-thread executor. The program spawns an async task of type MyTask to perform some database operations, but then decides to cancel this task. The executor handles the cancellation request by dropping MyTask. MyTask maintains an RAII guard of type Transaction to cancel database transactions in such cases. Cancelling a database transaction requires network operations through TcpStream, so <Transaction as Drop>::drop uses futures::executor::block_on to perform asynchronous operations. This causes a deadlock because the executor is responsible for handling readiness notifications from sockets (which would allow operations in TcpStream to make progress), but it's currently blocked by the call to <MyTask as Drop>::drop.

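[Figure: the executor loop is blocked inside <MyTask as Drop>::drop, which calls <Transaction as Drop>::drop, which calls block_on and waits for the I/O readiness of TcpStream to change. The executor is unable to check I/O readiness until the drop completes.]

Deadlock Case 2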

One shared mutable resource $R$ of type Arc<AsyncMutex<_>> (runtime-agnostic) is shared by two pending futures $F_1$ and $F_2$. $F_1$ and $F_2$ are owned by a task (top-level future) $F_0$ and run concurrently by virtue of join. To handle cancellation, $F_1$ owns an RAII guard that would perform some operation on $R$ when dropped. Since $R$ is protected by an async-aware mutex, the RAII guard's drop implementation needs to use block_on.

The owner decides to cancel task $F_0$. Because of the variable definition order, $F_1$ is dropped before $F_2$. $F_1$'s RAII guard uses block_on to acquire a lock on $R$. It turns out that $R$ is currently locked by $F_2$. If $F_2$ were dropped, it would unlock $R$, but for that to happen, the drop of $F_1$ must complete first. Cancellation is now deadlocked.

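[Figure: the executor loop drops $F_0$, which drops $F_1$ first. $F_1$'s guard is waiting for $F_2$ to release R: Arc<Mutex<_>>, but $F_2$, which holds the lock guard, will only be dropped after $F_1$ is done.]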
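The drop ordering at the core of this case can be reproduced without any runtime. The following is only a sketch under assumptions: `F0`, `F1`, and `F2` are hypothetical stand-in structs rather than real futures, an `AtomicBool` models whether R is locked, and the guard merely records what it observes instead of calling block_on. It relies on the fact that struct fields are dropped in declaration order.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

type Log = Arc<Mutex<Vec<&'static str>>>;

/// Stand-in for F1, whose RAII guard needs R when dropped.
struct F1 {
    r_locked: Arc<AtomicBool>,
    log: Log,
}

impl Drop for F1 {
    fn drop(&mut self) {
        let event = if self.r_locked.load(Ordering::SeqCst) {
            // A real guard would call `block_on(r.lock())` here and hang.
            "F1 dropped: R still locked"
        } else {
            "F1 dropped: R free"
        };
        self.log.lock().unwrap().push(event);
    }
}

/// Stand-in for F2, which holds the lock on R until it is dropped.
struct F2 {
    r_locked: Arc<AtomicBool>,
    log: Log,
}

impl Drop for F2 {
    fn drop(&mut self) {
        self.r_locked.store(false, Ordering::SeqCst);
        self.log.lock().unwrap().push("F2 dropped: R unlocked");
    }
}

/// Stand-in for F0, a join of F1 and F2 (fields drop in declaration order).
#[allow(dead_code)]
struct F0 {
    f1: F1,
    f2: F2,
}

/// Cancels F0 and returns the observed sequence of events.
pub fn cancel_f0() -> Vec<&'static str> {
    let r_locked = Arc::new(AtomicBool::new(true)); // F2 currently holds R
    let log: Log = Arc::default();
    let f0 = F0 {
        f1: F1 { r_locked: Arc::clone(&r_locked), log: Arc::clone(&log) },
        f2: F2 { r_locked, log: Arc::clone(&log) },
    };
    drop(f0);

    // Bind to a local so the mutex guard is released before `log` goes away.
    let events = log.lock().unwrap().clone();
    events
}
```

`cancel_f0()` reports that F1's destructor runs while R is still locked, and that R is unlocked only afterwards, which is exactly the window in which a blocking wait inside F1's destructor could never finish.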

If performing async operations during cancellation is necessary, consider the following approaches instead:

Spawn a Task: drop can initiate (and forget) the async operation by spawning a task.

use tokio::{fs::File, io::AsyncWriteExt};

pub async fn foo(file: File) {
    let mut guard = FooCancelGuard(Some(file));

    // ...async operations

    // Suppress cancellation behavior
    guard.0 = None;
}

struct FooCancelGuard(Option<File>);

impl Drop for FooCancelGuard {
    fn drop(&mut self) {
        if let Some(mut file) = self.0.take() {
            // Fire and forget: the spawned task outlives the cancelled future
            tokio::spawn(async move {
                if let Err(error) = file.flush().await {
                    tracing::error!(?error, "Failed to flush file");
                }
            });
        }
    }
}

Alternatively, using the scopeguard crate:

pub async fn foo(file: File) {
    // The closure runs only if the guard is dropped, i.e., on cancellation
    let guard = scopeguard::guard(file, |mut file| {
        tokio::spawn(async move {
            if let Err(error) = file.flush().await {
                tracing::error!(?error, "Failed to flush file");
            }
        });
    });

    // ...async operations

    // Suppress cancellation behavior
    scopeguard::ScopeGuard::into_inner(guard);
}
  • Pro: Same user-facing API as synchronous cancellation

  • Con: Requires an async runtime instance to spawn a task in

  • Con: Need to target a specific async runtime implementation

  • Con: The users might not be aware that cancellation triggers a background operation

  • Con: Unable to report cancellation result (section Cancel Safety)

Cancellation Request Channel: Introduce a messaging channel to request asynchronous cancellation.

// Before:
/// # Cancellation
///
/// Cancellation is supported.
pub async fn foo() -> u32 {
    todo!()
}

// After:
/// # Cancellation
///
/// Synchronous cancellation is NOT supported. Resolve `cancel_recv` to request
/// asynchronous cancellation. This function will then return `Err(Cancelled)`
/// if the request is accepted.
pub async fn foo(cancel_recv: impl Future<Output = ()>) -> Result<u32, Cancelled> {
    todo!()
}

pub struct Cancelled;
  • Pro: Explicit asynchronous cancellation, which the presence of that function parameter makes clear

  • Pro: Able to report cancellation result (section Cancel Safety)

  • Pro: Compatible with foreign async runtimes (e.g., GCancellable from GIO and CancellationToken from .NET; see section Foreign Language Interoperation)

  • Con: Incompatible with existing code that depends on synchronous cancellation (e.g., tokio::time::timeout)

  • Con: Unintentional synchronous cancellation is not handled
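As an illustration of the "After" signature, here is a minimal sketch using only the standard library. Everything beyond `foo`, `cancel_recv`, and `Cancelled` is an assumption made for the example: the extra `work` parameter stands in for the function's actual operation, `rollback` is a hypothetical async cleanup step, and `poll_once` is a helper that exists only to exercise the function without a runtime. Real code would more likely use a select combinator from its async runtime.

```rust
use std::future::{pending, poll_fn, ready, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Debug, PartialEq)]
pub struct Cancelled;

/// Hypothetical async cleanup; unlike `Drop::drop`, it may use `.await`.
async fn rollback() {}

pub async fn foo(
    work: impl Future<Output = u32>,
    cancel_recv: impl Future<Output = ()>,
) -> Result<u32, Cancelled> {
    let mut work = pin!(work);
    let mut cancel_recv = pin!(cancel_recv);

    // Hand-rolled select: the cancellation request takes priority.
    let outcome = poll_fn(|cx| {
        if cancel_recv.as_mut().poll(cx).is_ready() {
            return Poll::Ready(None);
        }
        work.as_mut().poll(cx).map(Some)
    })
    .await;

    match outcome {
        Some(value) => Ok(value),
        None => {
            // The request was accepted: clean up, then report the result.
            rollback().await;
            Err(Cancelled)
        }
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Test helper: polls a future exactly once with a no-op waker.
pub fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    fut.as_mut().poll(&mut cx)
}

/// Runs `foo` once without and once with a cancellation request.
pub fn demo() -> (Poll<Result<u32, Cancelled>>, Poll<Result<u32, Cancelled>>) {
    (
        poll_once(foo(ready(7), pending::<()>())),
        poll_once(foo(pending::<u32>(), ready(()))),
    )
}
```

In `demo()`, the first call completes with `Ok(7)` because no cancellation is requested, and the second returns `Err(Cancelled)` after running the cleanup step. Dropping the future returned by `foo` would still skip `rollback`, which is the last con listed above.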

🔗Macros

Macros must treat any user code (in the caller or metavariables) as potential suspension points, where any local variables can be forgotten (and have their destructors skipped).

As far as I know, the only way to create a trusted section of user code where shadowed local variables are subject to proper destruction is to wrap it in a closure.
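One possible reading of this advice is sketched below. The `Guard` type and the `with_guard!` macro are hypothetical names invented for the example. The macro runs the caller-supplied expression inside a non-async closure, so the expression cannot contain `.await` (the compiler rejects it), and `return` or `break` inside it cannot jump past the guard either.

```rust
use std::cell::Cell;

/// Hypothetical guard whose destructor must not be skipped.
pub struct Guard<'a>(pub &'a Cell<u32>);

impl Drop for Guard<'_> {
    fn drop(&mut self) {
        // Count how many times the destructor has run.
        self.0.set(self.0.get() + 1);
    }
}

/// Hypothetical macro: evaluates `$body` while a `Guard` is alive.
macro_rules! with_guard {
    ($drops:expr, $body:expr) => {{
        let _guard = Guard($drops);
        // A non-async closure cannot contain `.await`, so `$body` has no
        // suspension point at which `_guard` could be forgotten.
        (|| $body)()
    }};
}

/// Returns the body's value and the number of destructor runs.
pub fn demo() -> (u32, u32) {
    let drops = Cell::new(0);
    let value = with_guard!(&drops, 40 + 2);
    (value, drops.get())
}
```

`demo()` returns `(42, 1)`: the body's value is passed through, and the guard's destructor has run exactly once by the time the macro expression finishes.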


1

"Logical" in two senses: (1) the compiler may change their physical representation as it sees fit, provided that this won't change the program's "observed behavior"; (2) local variables created inside an async closure are stored inside a future object created from the async closure, if they are carried across a suspension point (.await).

2

Well, technically, you could terminate the program to skip the destructor, but then there wouldn't be any code remaining that could observe the pinned location.

3

Async closures and functions do not execute the inner code until they are polled.

4

Spawning one scoped thread is not completely useless; it can be used to recover from stack exhaustion. See LLVM commit 26a92d5.

5

Although "gir" is the official name of this tool, this name is incredibly confusing as it's identical to the name of the very file format it takes as an input. To avoid confusion, I decided to call it "gir-rust-code-generator" in this article, following Debian's lead.

6

In RFC 2592 the author stated that they were not aware of any futures libraries using this technique.

7

async-rs/async-std#900 is reported to only manifest on machines with few CPUs, which might suggest that the issue is only applicable to legacy hardware. However, this can be negated by applications that maximize hardware utilization (consider ripgrep as an example). Furthermore, this assumption about legacy hardware might not hold for embedded systems, where lowering production cost is prioritized over maximizing computing performance. In any case, relying solely on probabilistic measures without understanding the underlying probability is a poor development practice.

8

tokio::task::JoinHandle detaches the associated task when dropped. Other executors may behave differently; for example, futures::future::RemoteHandle instead cancels the task unless you explicitly detach it by calling the forget method. If you are using a runtime other than Tokio, make sure to check the documentation.