异步编程
Rust 的异步编程能够让你以一种高效且安全的方式处理 I/O 密集型或高并发任务,如 Web 服务、网络通信等.在 Rust 中,异步靠 async/await 语法、Future 和 Stream 特性实现.
async、await 和 Future
| 关键字 | 说明 |
|---|---|
async | 声明异步代码块、函数或闭包,返回一个实现了 Future trait 的类型 |
await | 等待一个 Future 完成并获取其结果,只能在异步上下文中使用 |
Future | 一个"现在也许还没准备好,但将来某时刻会准备好的值"(类似 JS 中的 Promise) |
在 Rust 里,
async只是在"打包"逻辑,.await才是"按下了播放键".只async不await的代码是不会执行的.
async 语法
Rust 中的 async 关键字可以用来定义异步函数、异步块和异步闭包.
1.async fn 异步函数
定义一个异步函数,返回一个实现了 Future trait 的类型.
async fn say_hello() {
println!("Hello from async!");
}调用
async fn会立即返回一个Future,实际的执行要靠 runtime 调度,可以通过.await获得最终结果.
2.async { ... } 异步块
定义一个异步块,返回一个实现了 Future trait 的匿名类型.
let future = async {
println!("Hello from async block!");
};异步块可以直接在需要 future 的地方使用,也可以赋值给变量.
3.async move { ... } 异步闭包
捕获外部环境变量并获得所有权,返回一个实现了 Future trait 的匿名类型.
let name = String::from("Alice");
let future = async move {
println!("Hello, {}!", name);
};
move关键字确保异步闭包捕获的变量被移动到闭包内部,使其在异步执行时仍然有效.
await 语法
await用于等待一个Future完成并获取其结果,必须在异步上下文中使用.- 与其他语言不同,Rust 的
await使用后缀.await语法,这样可以非常方便地进行链式调用来组合多个Future.
1.在异步函数中使用 await
async fn learn_async() -> u32 {
7
}
async fn async_main() {
let value = learn_async().await;
println!("value = {}", value);
}2.在异步块中使用 await
async fn learn_async() -> u32 {
7
}
let future = async {
let value = learn_async().await;
println!("value = {}", value);
};3.在异步闭包中使用 await
async fn learn_async() -> u32 {
7
}
let future = async move {
let value = learn_async().await;
println!("value = {}", value);
};Future Trait
Future 是一个代表"还未完成但最终会产生某个值"的 trait.标准库定义如下:
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Future {
type Output;
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output>;
}- 编译器将
async代码转换为实现了Futuretrait 的等效代码,类似for循环被转换为使用Iteratortrait 的代码. - 开发者可以为自定义数据类型手动实现
Futuretrait,以便它们能够与async/await语法一起使用. poll方法是Futuretrait 的核心,它会被运行时调用来检查 future 是否完成:完成则返回Poll::Ready(value),否则返回Poll::Pending.
异步运行时(Runtime)
异步代码本身不会自动执行;async 块/函数只会返回一个 Future,必须由运行时不断 poll 才会推进执行.
async 块/函数调用 → 返回 Future → 运行时调度 poll → 任务推进⚠️ 标准库只提供
Futuretrait,不提供完整运行时.
main()不能直接写成裸async fn main()作为程序入口,因为 Rust 的可执行程序入口要求main是一个同步函数,返回()或Result<(), E>,不能返回Future.常见做法是使用运行时宏(如
#[tokio::main])或显式block_on,把异步入口包装成同步main.
使用 trpl
trpl 常用于《The Rust Programming Language》配套示例,便于学习 async 概念.
trpl::block_on:阻塞当前线程直到 future 完成(多线程运行时).trpl::block_on只能等待一个 future;若需等待多个,可结合trpl::join或trpl::join!使用.
在 trpl 库源码中,
trpl::run已被重命名为trpl::block_on,保留run仅为兼容旧版教程代码.
fn main() {
// trpl::block_on() 中只能有一个 future,也就是一个 async 块或 async fn 调用
// 但是这个 future 内部可以包含任意数量的 await 调用
trpl::block_on(async {
async_main().await;
});
}
async fn async_main() {
println!("Hello from trpl!");
}可简化为:
fn main() {
// 简写:直接传入异步函数返回的 Future
trpl::block_on(async_main());
}
async fn async_main() {
println!("Hello from trpl!");
}使用 Tokio
#[tokio::main] // 宏会生成一个 Tokio 运行时,并将 main 包装为异步入口
async fn main() {
async_main().await;
}
async fn async_main() {
println!("Hello from async world!");
}#[tokio::main] 近似等价于:
fn main() {
// 1. 初始化运行时
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
// 2. 将 main 内部逻辑作为异步任务并在运行时中执行
rt.block_on(async {
async_main().await;
});
}
async fn async_main() {
println!("Hello from async world!");
}使用 async-std
#[async_std::main]
async fn main() {
async_main().await;
}
async fn async_main() {
println!("Hello from async-std!");
}控制权还给运行时
await 让出控制权
在每个 await 点,如果被等待的 future 还没准备好,Rust 就会暂停当前任务并切换到其他任务.await 点之间的所有内容都是同步执行的.
⚠️ 如果在一个 async 代码块中做了大量工作却没有任何
await点,这个 future 就会阻止其他 future 取得进展(称为"任务饥饿").
use std::{thread, time::Duration};
fn slow(name: &str, ms: u64) {
thread::sleep(Duration::from_millis(ms));
println!("'{name}' ran for {ms}ms");
}
fn main() {
trpl::block_on(async {
// 没有 await,两者顺序串行执行
slow("x", 1000);
slow("y", 1000);
let a = async {
println!("'a' started.");
slow("a", 30);
slow("a", 10);
slow("a", 20);
trpl::sleep(Duration::from_millis(50)).await; // 让出控制权,切换到 b
println!("'a' finished.");
};
let b = async {
println!("'b' started.");
slow("b", 75);
slow("b", 10);
slow("b", 15);
slow("b", 350);
trpl::sleep(Duration::from_millis(50)).await; // 让出控制权,切换到 a
println!("'b' finished.");
};
// select 等待 a、b 中任意一个完成,另一个被丢弃(适合竞速/超时场景)
trpl::select(a, b).await;
});
}
await就像一个"让步点":告诉运行时"我现在需要等待,你可以先处理其他任务,等我准备好了再回来."
主动让出控制权
使用 trpl::yield_now 主动让出执行权:
use std::{thread, time::Duration};
fn slow(name: &str, ms: u64) {
thread::sleep(Duration::from_millis(ms));
println!("'{name}' ran for {ms}ms");
}
fn main() {
trpl::block_on(async {
let a = async {
println!("'a' started.");
slow("a", 30);
trpl::yield_now().await; // 主动让出,让 b 运行
slow("a", 10);
trpl::yield_now().await;
slow("a", 20);
trpl::yield_now().await;
println!("'a' finished.");
};
let b = async {
println!("'b' started.");
slow("b", 75);
trpl::yield_now().await; // 主动让出,让 a 运行
slow("b", 10);
trpl::yield_now().await;
slow("b", 15);
trpl::yield_now().await;
slow("b", 350);
trpl::yield_now().await;
println!("'b' finished.");
};
trpl::select(a, b).await;
});
}并发与 async
在 Rust 中,async 主要提供的是并发(concurrency)能力:在一个线程内交替推进多个任务,而不是为每个任务都创建一个线程.
创建并发任务
trpl::spawn_task:在 trpl::block_on 创建的异步环境中创建一个新的异步任务并立即激活它.
use std::time::Duration;
fn main() {
trpl::block_on(async {
// spawn_task 会创建一个新的任务并立即激活它
// 同样需要主动阻塞当前任务,让 spawn_task 生成的任务有机会运行
trpl::spawn_task(async {
for i in 1..10 {
println!("hi number {i} from the first task!");
trpl::sleep(Duration::from_millis(500)).await;
}
});
for i in 1..5 {
println!("hi number {i} from the second task!");
trpl::sleep(Duration::from_millis(500)).await;
}
// 主 async 块结束后,spawn_task 生成的任务也会被关闭
});
}
spawn_task生成的任务会随主任务结束而关闭.若要等待其完成,需使用handle.await.
阻塞主 async 块
spawn_task返回一个句柄(handle),可以通过.await等待该任务完成并获取结果.
use std::time::Duration;
fn main() {
trpl::block_on(async {
let handle = trpl::spawn_task(async {
for i in 1..10 {
println!("hi number {i} from the first task!");
trpl::sleep(Duration::from_millis(500)).await;
}
});
for i in 1..5 {
println!("hi number {i} from the second task!");
trpl::sleep(Duration::from_millis(500)).await;
}
// 阻塞主 async 块,等待 spawn_task 任务完成
handle.await.unwrap();
});
}
handle.await.unwrap()类似多线程中的join,等待任务完成并传播 panic.
并发多个任务
| API | 说明 |
|---|---|
trpl::join(a, b) | 同时等待 2 个 future 完成 |
trpl::join3(a, b, c) | 同时等待 3 个 future 完成 |
trpl::join!(...) | 同时等待任意数量的 future 完成(类似 Promise.all) |
use std::time::Duration;
fn main() {
trpl::block_on(async {
let a = async { /* 任务 A */ };
let b = async { /* 任务 B */ };
// 串行:总耗时 = A + B
// a.await;
// b.await;
// 并发:总耗时 ≈ max(A, B)
trpl::join(a, b).await;
});
}顺序执行 vs 并发执行
| 方式 | 行为 |
|---|---|
trpl::block_on | 顶级运行环境,程序通常只有一个 |
trpl::spawn_task | 子任务,必须在异步环境内部调用 |
trpl::spawn_task并不是立即执行,依旧需要运行时调度,让spawn_task生成的任务有机会运行.
// 顺序执行:等 a 跑完再跑 b
trpl::block_on(async {
task_a().await;
task_b().await;
});
// 并发执行:a、b 交替推进
trpl::block_on(async {
let handle = trpl::spawn_task(task_a());
task_b().await;
let _ = handle.await; // 确保 task_a 也完成
});
// 上面 trpl::spawn_task 并发上等同于,但底层机制不同
trpl::block_on(async {
trpl::join(task_a(), task_b()).await;
});并发还是并行? 取决于运行时和核心数:单核时交替执行(并发),多核时可能真正同时执行(并行).
Tokio 并发示例
use tokio::time::{sleep, Duration};
async fn task_a() -> &'static str {
sleep(Duration::from_millis(100)).await;
"A done"
}
async fn task_b() -> &'static str {
sleep(Duration::from_millis(50)).await;
"B done"
}
#[tokio::main]
async fn main() {
let (a, b) = tokio::join!(task_a(), task_b());
println!("{}, {}", a, b);
}
tokio::join!并发:总耗时 ≈ max(task_a, task_b)
| 宏/函数 | 行为 | 类比 JS |
|---|---|---|
join! | 全部完成后返回 | Promise.all |
select! | 谁先完成处理谁,其余丢弃 | Promise.race |
并发消息传递
创建异步通道
- 使用
trpl::channel创建一个异步通道,发送者(tx)和接收者(rx)可以在不同的任务中使用. rx必须使用mut声明,因为它需要可变引用来接收消息.send方法不需要.await,也不会阻塞程序.recv是异步方法,需要使用.await来等待它们完成.
use trpl::channel;
fn main() {
trpl::block_on(async {
// 创建一个异步通道,发送者 tx 和接收者 rx
let (tx, mut rx) = channel::<u32>();
// 使用 move 确保 tx 在运行完成后被释放,从而确保信道关闭
// 让 rx.recv() 能正确返回 None 结束循环
trpl::spawn_task(async move {
for i in 1..=5 {
tx.send(i).unwrap(); // 发送消息到通道,等待发送完成
trpl::sleep(std::time::Duration::from_millis(500)).await;
}
});
// 接收消息并打印,等待消息到达
while let Some(value) = rx.recv().await {
println!("Received: {}", value);
}
});
}
send和recv都是异步方法,需要使用.await等待完成.rx必须声明为mut.
多生产者单消费者(MPSC)
trpl::channel 默认为多生产者单消费者模式,发送者可通过 .clone() 在多个任务中使用.
use trpl::channel;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = channel::<u32>();
let tx_clone = tx.clone(); // 克隆发送者以供第二个任务使用
trpl::spawn_task(async move {
for i in 1..=10 {
tx.send(i).unwrap(); // 发送消息到通道1,等待发送完成
trpl::sleep(std::time::Duration::from_millis(500)).await;
}
});
trpl::spawn_task(async move {
for i in 1..=5 {
tx_clone.send(i).unwrap(); // 发送消息到通道2,等待发送完成
trpl::sleep(std::time::Duration::from_millis(500)).await;
}
});
// 接收消息并打印,等待消息到达
while let Some(value) = rx.recv().await {
println!("Received: {}", value);
}
});
}多生产者多消费者(广播)
trpl::broadcast_channel 允许多个接收者订阅同一个发送者的消息.
use trpl::broadcast_channel;
fn main() {
trpl::block_on(async {
let (tx, mut rx1) = broadcast_channel::<u32>();
let tx_clone = tx.clone(); // 克隆发送者以供第二个任务使用
let mut rx2 = tx.subscribe(); // 创建第二个接收者
trpl::spawn_task(async move {
for i in 1..=5 {
tx.send(i).unwrap(); // 发送消息到通道1,等待发送完成
trpl::sleep(std::time::Duration::from_millis(500)).await;
}
});
trpl::spawn_task(async move {
for i in 6..=10 {
tx_clone.send(i).unwrap(); // 发送消息到通道2,等待发送完成
trpl::sleep(std::time::Duration::from_millis(500)).await;
}
});
// 接收者1,接收消息并打印,等待消息到达
trpl::spawn_task(async move {
while let Some(value) = rx1.recv().await {
println!("Receiver 1 got: {}", value);
}
});
// 接收者2,接收消息并打印,等待消息到达
trpl::spawn_task(async move {
while let Some(value) = rx2.recv().await {
println!("Receiver 2 got: {}", value);
}
});
});
}Stream:按顺序出现的 Future
| 概念 | 说明 |
|---|---|
Future | 表示一个将来值 |
Stream | 表示一系列将来值(异步版迭代器) |
Iterator::next()—— 同步取值:"我已经有这堆数据了,一个接一个拿."StreamExt::next().await—— 异步取值:"数据随时间慢慢产生,没产生时我可以去干别的."
Ext是extension的缩写.StreamExt是为Stream提供实用方法的扩展 trait,这是 Rust 社区中非常常见的模式.
use trpl::StreamExt;
fn main() {
trpl::block_on(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let iter = values.iter().map(|n| n * 2);
// stream_from_iter 把普通迭代器转换成 Stream
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
})
}
Streamtrait 将Iterator和Future的特征结合在一起;StreamExt在其上提供了更高层的 API(包括next方法).
Tokio 版本:
use tokio_stream::{self as stream, StreamExt};
#[tokio::main]
async fn main() {
let mut s = stream::iter(vec![10, 20, 30]);
while let Some(item) = s.next().await {
println!("{}", item);
}
}典型 Stream 来源:
- 网络消息流(WebSocket、SSE)
- 文件/套接字分块读取
- 定时器事件序列
深入 async 相关的 Traits
Future Trait
定义
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Future {
type Output;
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output>;
}Output:future 最终解析成的值的类型(类似Iterator的Item).poll:运行时调用此方法推进 future;完成返回Poll::Ready(value),未完成返回Poll::Pending.
返回值 Poll
pub enum Poll<T> {
Ready(T), // future 已完成,结果可用
Pending, // future 还未完成,稍后再试
}Unpin 与 !Unpin
大多数类型可以自由移动,它们自动实现 Unpin(编译器派生).只有包含自引用的类型才是 !Unpin(不可移动).
| Trait | 含义 | 典型例子 |
|---|---|---|
Unpin | 可以安全移动,Pin 对其无实际约束 | 基本类型、String、Vec<T> 等 |
!Unpin | 不能安全移动,必须用 Pin 保护其内存地址 | async 块生成的 future |
手动标记 !Unpin:
use std::marker::PhantomPinned;
struct SelfReferential {
data: String,
ptr: *const String, // 指向自身 data 字段
_pin: PhantomPinned, // 标记此类型为 !Unpin
}
PhantomPinned是一个零大小类型,专门用于让结构体退出Unpin的自动实现.
操作 Pin 包裹的值:
use std::pin::pin;
// pin! 宏将值固定在栈上,返回 Pin<&mut T>
let mut fut = pin!(async {
println!("pinned future");
});
// 对于 Unpin 类型,可以直接获取可变引用
let mut s = String::from("hello");
let pinned = Pin::new(&mut s);
let inner: &mut String = Pin::into_inner(pinned); // Unpin 类型可以解包
// 对于 !Unpin 类型,无法调用 Pin::into_inner实践规则:
- 绝大多数使用场景不需要手动处理
Pin,async/await编译器会自动处理. - 当你需要把多个
Future存入集合(如Vec)时,才需要显式使用Pin<Box<dyn Future>>或pin!宏.
// 堆上 pin(动态分发)
let fut: Pin<Box<dyn Future<Output = ()>>> = Box::pin(async {
println!("heap pinned");
});
// 栈上 pin(零分配)
let fut = pin!(async {
println!("stack pinned");
});Pin 与 Unpin
Rust 为 async 代码块生成的 future 可能包含自引用(某字段引用自身其他字段).若 future 在执行过程中被移动,这些指针就会变成悬垂指针,导致内存安全问题.
| 类型/Trait | 作用 |
|---|---|
Pin<P> | 保证被包裹的值在内存中地址不变,防止自引用失效 |
Unpin | 标记该类型可以安全移动,无需 Pin 保护 |
将 future pin 住以存入向量:
use std::pin::{Pin, pin};
let tx1_fut = pin!(async move {
// ...
});
let rx_fut = pin!(async {
// ...
});
let tx_fut = pin!(async move {
// ...
});
let futures: Vec<Pin<&mut dyn Future<Output = ()>>> =
vec![tx1_fut, rx_fut, tx_fut];
// 使用 join_all 等待所有 future 完成
trpl::join_all(futures).await;Stream Trait
Stream 是异步版的 Iterator,随时间产生多个值.
use std::pin::Pin;
use std::task::{Context, Poll};
trait Stream {
type Item;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>>;
}
poll_next既像Future::poll(轮询就绪状态),又像Iterator::next(逐条产生值). 返回Poll<Option<Self::Item>>:
- 外层
Poll:是否就绪- 内层
Option:是否还有更多值(None表示流结束)
Future、任务和线程
| 概念 | 说明 |
|---|---|
| Future | 惰性计算描述,本身不自动执行 |
| 任务(Task) | 运行时调度的执行单元(把 future 提交给 runtime) |
| 线程(Thread) | 操作系统调度单元 |
执行流程:
编写 async fn → 得到 Future → 提交给运行时(如 tokio::spawn)→ 变成 Task → 运行时在线程上调度如何选择:
- CPU 密集型(大量计算、可并行拆分)→ 优先使用线程
- I/O 密集型(网络请求、文件读写、高并发连接)→ 优先使用 async
结论:
async不是"自动多线程".是否多线程、如何调度,取决于所用运行时的配置.
在线程中发送消息,在 async 中接收:
use std::{thread, time::Duration};
fn main() {
let (tx, mut rx) = trpl::channel();
// 在新线程中同步发送消息
thread::spawn(move || {
for i in 1..=10 {
tx.send(i).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
// 在异步环境中接收消息
trpl::block_on(async {
while let Some(message) = rx.recv().await {
println!("{message}");
}
});
}