Rust Async programming (Tokio)
When a program encounters an operation that cannot complete immediately (e.g. establishing a TCP connection), the thread blocks and waits until the operation finishes. While blocked, the thread cannot do other work.
In async programming, operations that cannot complete immediately are suspended and control returns to the executor; the thread is not blocked. The executor can poll other tasks while a suspended task waits for I/O, timers, or other readiness events. When the operation completes, the task is resumed and continues from where it left off. This is cooperative scheduling: tasks yield at await points, not via preemption.
async fn looks like a regular function but is compiled at compile time into a state machine that implements the Future trait. An async fn therefore returns impl Future<Output = T>.async fn constructs a Future value but does not execute the body. Work happens only when the Future is polled (e.g. via .await inside an async context, or by an executor)..await awaits a Future to complete. It yields the current task back to the executor (cooperative), allowing the executor to poll other tasks. .await does not preempt the thread — it only yields at defined suspension points.Futures. Without an executor, Futures do nothing.Example:
async fn compute() -> u32 {
// async body is compiled into a state machine
42
}
#[tokio::main]
async fn main() {
let fut = compute(); // constructs a Future (no work yet)
let value = fut.await; // runtime polls and runs it to completion
println!("{}", value);
}
#[tokio::main] macro transforms an async fn main() into a synchronous fn main() that initializes a Tokio runtime and runs the async main function. Tokio supports both runtimes: multi-threaded (default) and single-threaded ("current_thread"). You can configure the runtime:
#[tokio::main(flavor = "current_thread")] for a single-threaded runtime.#[tokio::main(flavor = "multi_thread", worker_threads = N)] for multi-thread.Tokio tasks are lightweight asynchronous units of work scheduled by the runtime. Create one with tokio::spawn(async { ... }). tokio::spawn returns a JoinHandle<T>. Awaiting the JoinHandle yields a Result<T, tokio::task::JoinError>: Ok contains the task's return value; Err means the task panicked or was cancelled (for example, due to runtime shutdown).
Tasks are scheduled onto the runtime's worker threads; on the default multi-threaded runtime a task may run on a different thread than the spawner.
On a multi-threaded runtime the future passed to tokio::spawn must be Send. For non-Send futures use spawn_local or run on a current-thread runtime. Tasks are Send when all data that is held across .await calls is Send. This is a bit subtle. When .await is called, the task yields back to the scheduler. The next time the task is executed, it resumes from the point it last yielded. To make this work, all state that is used after .await must be saved by the task. If this state is Send, i.e. can be moved across threads, then the task itself can be moved across threads. Note: only state that lives across .await points must be Send; temporaries dropped before an .await need not be.
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
// runs concurrently (requires Send on multi-threaded runtime)
1 + 2
});
let res = handle.await.unwrap();
println!("{}", res);
}
let result = tokio::task::spawn_blocking(|| {
// expensive synchronous work
expensive_sync_work()
}).await.unwrap();
let (a, b) = tokio::join!(fut1(), fut2()); // waits for both
tokio::select! {
res = fut1() => { /* fut1 finished first */ }
res = fut2() => { /* fut2 finished first */ }
}
Guidance and caveats:
spawn_blocking for blocking work.tokio::spawn requires Send futures on the multi-threaded runtime; use spawn_local or a current-thread runtime for !Send futures..await only yields at suspension points in the Future; long-running synchronous loops inside async tasks will block progress if they don't yield.Guard the shared state using Mutex
Spawn a task to manage the state and use message passing to operate on it
A synchronous mutex will block the current thread when waiting to acquire the lock. This, in turn, will block other tasks from processing. Switching to tokio::sync::Mutex will cause the task to yield control back to the executor, but this will usually not help with performance as the asynchronous mutex uses a synchronous mutex internally.
In std::sync::Mutex, std::sync::MutexGuard type is not Send. This means that you can't send a mutex lock to another thread, and the error happens because the Tokio runtime can move a task between threads at every .await. To avoid this, you should restructure your code such that the mutex lock's destructor runs before the .await.
The tokio::sync::Mutex type provided by Tokio can be used as an alternative. The primary feature of the Tokio mutex is that it can be held across an .await without any issues. That said, an asynchronous mutex is more expensive than an ordinary mutex, and it is typically better to use one of the two other approaches.
Using a blocking mutex to guard short critical sections is an acceptable strategy when contention is minimal. When a lock is contended, the thread executing the task must block and wait on the mutex. This will not only block the current task but it will also block all other tasks scheduled on the current thread (performance penalty and against ideas of async).
By default, the Tokio runtime uses a multi-threaded scheduler. Tasks are scheduled on any number of threads managed by the runtime. If a large number of tasks are scheduled to execute and they all require access to the mutex, then there will be contention. On the other hand, if the current_thread runtime flavor is used, then the mutex will never be contended.
Tokio provides a number of channels, each serving a different purpose.
mpsc: multi-producer, single-consumer channel. Many values can be sent.
oneshot: single-producer, single consumer channel. A single value can be sent.
broadcast: multi-producer, multi-consumer. Many values can be sent. Each receiver sees every value.
watch: multi-producer, multi-consumer. Many values can be sent, but no history is kept. Receivers only see the most recent value.
Send from multiple tasks by cloning Sender in mpsc
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
let tx2 = tx.clone();
tokio::spawn(async move {
tx.send("sending from first handle").await.unwrap();
});
tokio::spawn(async move {
tx2.send("sending from second handle").await.unwrap();
});
while let Some(message) = rx.recv().await {
println!("GOT = {}", message);
}
}
oneshotuse tokio::sync::oneshot;
let (tx, rx) = oneshot::channel();
use tokio::sync::oneshot;
use bytes::Bytes;
/// Multiple different commands are multiplexed over a single channel.
#[derive(Debug)]
enum Command {
Get {
key: String,
resp: Responder<Option<Bytes>>,
},
Set {
key: String,
val: Bytes,
resp: Responder<()>,
},
}
/// Provided by the requester and used by the manager task to send
/// the command response back to the requester.
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
let t1 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Get {
key: "foo".to_string(),
resp: resp_tx,
};
// Send the GET request
tx.send(cmd).await.unwrap();
// Await the response
let res = resp_rx.await;
println!("GOT = {:?}", res);
});
let t2 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
resp: resp_tx,
};
// Send the SET request
tx2.send(cmd).await.unwrap();
// Await the response
let res = resp_rx.await;
println!("GOT = {:?}", res);
});
I/O in Tokio operates in much the same way as in std, but asynchronously. There is a trait for reading (AsyncRead) and a trait for writing (AsyncWrite). Specific types implement these traits as appropriate (TcpStream, File, Stdout). AsyncReadand AsyncWrite are also implemented by a number of data structures, such as Vec<u8> and &[u8]. This allows using byte arrays where a reader or writer is expected.
async fn → compiled to a state machine that returns a lazy Future..await yields the current task (cooperative) and allows the executor to run others.tokio::spawn for concurrent tasks and tokio::task::spawn_blocking for blocking work..await.This document was drafted with the assistance of GitHub Copilot.