Rust Async programming (Tokio)

Rust Asynchronous Programming

1. Sync programming

When a program encounters an operation that cannot complete immediately (e.g. establishing a TCP connection), the thread blocks and waits until the operation finishes. While blocked, the thread cannot do other work.

2. Async programming

In async programming, operations that cannot complete immediately are suspended and control returns to the executor; the thread is not blocked. The executor can poll other tasks while a suspended task waits for I/O, timers, or other readiness events. When the operation completes, the task is resumed and continues from where it left off. This is cooperative scheduling: tasks yield at await points, not via preemption.

async/await

  • An async fn looks like a regular function, but the compiler turns its body into a state machine that implements the Future trait. Calling an async fn therefore returns impl Future<Output = T>.
  • Futures in Rust are lazy: calling an async fn constructs a Future value but does not execute the body. Work happens only when the Future is polled (e.g. via .await inside an async context, or by an executor).
  • .await waits for a Future to complete. If the Future is not ready yet, the current task yields to the executor, which can then poll other tasks. This is cooperative: a task is never preempted, it only yields at these suspension points.
  • Executors/runtimes drive (poll) Futures. Without an executor, Futures do nothing.

Example:

async fn compute() -> u32 {
    // async body is compiled into a state machine
    42
}

#[tokio::main]
async fn main() {
    let fut = compute(); // constructs a Future (no work yet)
    let value = fut.await; // runtime polls and runs it to completion
    println!("{}", value);
}
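
To make the laziness and the role of the executor concrete, here is a minimal std-only sketch. The block_on function and NoopWaker type are hypothetical names for a toy busy-polling executor, not how Tokio is implemented; the point is only that the async body does not run until something polls the Future.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; sufficient for a toy executor that busy-polls.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical minimal executor: polls a single future until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        // A real executor would park until the waker fires instead of spinning.
        std::thread::yield_now();
    }
}

/// Records that the body started running, then returns a value.
async fn compute(started: &Cell<bool>) -> u32 {
    started.set(true);
    42
}

/// Returns (body ran before polling, body ran after polling, value).
fn demo() -> (bool, bool, u32) {
    let started = Cell::new(false);
    let fut = compute(&started); // constructs the Future only
    let before = started.get();
    let value = block_on(fut); // polling drives the body
    (before, started.get(), value)
}

fn main() {
    let (before, after, value) = demo();
    println!("ran before poll: {before}, ran after poll: {after}, value: {value}");
}
```

A production runtime such as Tokio adds wake-up notifications, I/O and timer drivers, and scheduling of many tasks on top of this basic poll loop.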

Tokio

  • Tokio provides a runtime (executor), evented non-blocking I/O, timers, and task utilities.
  • The #[tokio::main] macro transforms an async fn main() into a synchronous fn main() that initializes a Tokio runtime and runs the async body on it. Tokio offers two runtime flavors, multi-threaded (the default) and single-threaded ("current_thread"), selectable through the macro:
    • #[tokio::main(flavor = "current_thread")] for a single-threaded runtime.
    • #[tokio::main(flavor = "multi_thread", worker_threads = N)] for a multi-threaded runtime with N worker threads.
    • Multi-thread: tasks can run on any worker thread and may move between threads at .await points. Single-thread: everything runs on one thread, so there is no parallelism. tokio::spawn requires a Send future in both flavors; !Send futures can run as the top-level future passed to block_on, or via spawn_local inside a LocalSet.

Spawn concurrent tasks:

Tokio tasks are lightweight asynchronous units of work scheduled by the runtime. Create one with tokio::spawn(async { ... }). tokio::spawn returns a JoinHandle<T>. Awaiting the JoinHandle yields a Result<T, tokio::task::JoinError>: Ok contains the task's return value; Err means the task panicked or was cancelled (for example, due to runtime shutdown).

Tasks are scheduled onto the runtime's worker threads; on the default multi-threaded runtime a task may run on a different thread than the spawner.

tokio::spawn requires the future it is given to be Send (and 'static), regardless of the runtime flavor, because on the multi-threaded runtime a task may be moved between threads. For non-Send futures, use tokio::task::spawn_local inside a LocalSet.

A task is Send when all data held across .await calls is Send. This is a bit subtle. When .await is called, the task yields back to the scheduler, and the next time the task is executed it resumes from the point where it last yielded. To make this work, all state that is used after the .await must be saved by the task. If this state is Send, i.e. can be moved across threads, then the task itself can be moved across threads. Only state that lives across .await points must be Send; values dropped before an .await need not be.

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // runs concurrently; the spawned future must be Send + 'static
        1 + 2
    });

    let res = handle.await.unwrap();
    println!("{}", res);
}
  • Run blocking or CPU-bound code on the blocking thread pool:

let result = tokio::task::spawn_blocking(|| {
    // expensive synchronous work (expensive_sync_work is a placeholder)
    expensive_sync_work()
}).await.unwrap();

  • Run multiple futures concurrently and wait for all of them:

let (a, b) = tokio::join!(fut1(), fut2()); // waits for both

  • Wait for the first to complete; the future of the other branch is dropped:

tokio::select! {
    res = fut1() => { /* fut1 finished first; res holds its output */ }
    res = fut2() => { /* fut2 finished first; res holds its output */ }
}

Guidance and caveats:

  • Avoid blocking inside async tasks; blocking will block the executor's worker thread and degrade performance. Use spawn_blocking for blocking work.
  • tokio::spawn requires Send futures on every runtime flavor; for !Send futures use spawn_local inside a LocalSet.
  • .await only yields at suspension points in the Future; long-running synchronous loops inside async tasks will block progress if they don't yield.
  • Use Tokio primitives (non-blocking I/O, timers, channels) to write efficient async code.

Shared State

There are two common ways to share state between tasks:

  1. Guard the shared state with a Mutex

  2. Spawn a task to manage the state and use message passing to operate on it

A synchronous mutex will block the current thread when waiting to acquire the lock. This, in turn, will block other tasks from processing. Switching to tokio::sync::Mutex will cause the task to yield control back to the executor, but this will usually not help with performance as the asynchronous mutex uses a synchronous mutex internally.

std::sync::MutexGuard is not Send, so a guard cannot be moved to another thread. Because the Tokio runtime can move a task between threads at every .await, a task that holds a std MutexGuard across an .await is not Send, and passing it to tokio::spawn fails to compile. To avoid this, restructure the code so that the guard's destructor runs before the .await, for example by confining the lock to an inner block or a non-async helper method.

Alternatively, use tokio::sync::Mutex. Its main feature is that the lock can be held across an .await without any issues. That said, an asynchronous mutex is more expensive than an ordinary mutex, and it is typically better to use one of the other approaches: a blocking mutex whose guard is dropped before the .await, or a dedicated task that owns the state.

Using a blocking mutex to guard short critical sections is an acceptable strategy when contention is minimal. When the lock is contended, the thread executing the task must block and wait for the mutex. This blocks not only the current task but also every other task scheduled on that thread, which costs performance and defeats the purpose of async.

By default, the Tokio runtime uses a multi-threaded scheduler, and tasks may run on any of the runtime's worker threads. If a large number of tasks all need the same mutex, there will be contention. With the current_thread flavor, by contrast, only one task runs at a time, so the mutex is never contended as long as the guard is not held across an .await and no thread outside the runtime takes the lock.

Message Passing

Tokio provides a number of channels, each serving a different purpose.

  • mpsc: multi-producer, single-consumer channel. Many values can be sent.

  • oneshot: single-producer, single-consumer channel. A single value can be sent.

  • broadcast: multi-producer, multi-consumer. Many values can be sent. Each receiver sees every value.

  • watch: multi-producer, multi-consumer. Many values can be sent, but no history is kept. Receivers only see the most recent value.

  • Send from multiple tasks by cloning Sender in mpsc

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
    let tx2 = tx.clone();

    tokio::spawn(async move {
        tx.send("sending from first handle").await.unwrap();
    });

    tokio::spawn(async move {
        tx2.send("sending from second handle").await.unwrap();
    });

    while let Some(message) = rx.recv().await {
        println!("GOT = {}", message);
    }
}
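  • Pass a response back using oneshot:

use tokio::sync::oneshot;

// A oneshot channel carries exactly one value from tx to rx.
let (tx, rx) = oneshot::channel();

Each command carries the Sender half of its own oneshot channel, so the manager task can reply to the requester:

use tokio::sync::oneshot;
use bytes::Bytes;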

/// Multiple different commands are multiplexed over a single channel.
#[derive(Debug)]
enum Command {
    Get {
        key: String,
        resp: Responder<Option<Bytes>>,
    },
    Set {
        key: String,
        val: Bytes,
        resp: Responder<()>,
    },
}

/// Provided by the requester and used by the manager task to send
/// the command response back to the requester.
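type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;

// In the requester tasks below, tx and tx2 are clones of the
// mpsc::Sender<Command> connected to the manager task.
let t1 = tokio::spawn(async move {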
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Get {
        key: "foo".to_string(),
        resp: resp_tx,
    };

    // Send the GET request
    tx.send(cmd).await.unwrap();

    // Await the response
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});

let t2 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: "bar".into(),
        resp: resp_tx,
    };

    // Send the SET request
    tx2.send(cmd).await.unwrap();

    // Await the response
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});

I/O

I/O in Tokio operates in much the same way as in std, but asynchronously. There is a trait for reading (AsyncRead) and a trait for writing (AsyncWrite). Specific types implement these traits as appropriate (TcpStream, File, Stdout). AsyncRead and AsyncWrite are also implemented by some in-memory types: &[u8] implements AsyncRead and Vec<u8> implements AsyncWrite. This allows using byte buffers where a reader or writer is expected.

Summary (key points)

  • async fn → compiled to a state machine that returns a lazy Future.
  • .await yields the current task (cooperative) and allows the executor to run others.
  • A runtime/executor (e.g., Tokio) is required to poll Futures.
  • Use tokio::spawn for concurrent tasks and tokio::task::spawn_blocking for blocking work.
  • Avoid blocking inside async tasks; prefer non-blocking I/O and explicit yielding via .await.

This document was drafted with the assistance of GitHub Copilot.


