Skip to content

异步编程

Rust 的异步编程能够让你以一种高效且安全的方式处理 I/O 密集型或高并发任务,如 Web 服务、网络通信等.在 Rust 中,异步靠 async/await 语法、FutureStream 特性实现.

async、await 和 Future

关键字说明
async声明异步代码块、函数或闭包,返回一个实现了 Future trait 的类型
await等待一个 Future 完成并获取其结果,只能在异步上下文中使用
Future一个"现在也许还没准备好,但将来某时刻会准备好的值"(类似 JS 中的 Promise

在 Rust 里,async 只是在"打包"逻辑,.await 才是"按下了播放键".只 asyncawait 的代码是不会执行的.

async 语法

Rust 中的 async 关键字可以用来定义异步函数、异步块和异步闭包.

1.async fn 异步函数

定义一个异步函数,返回一个实现了 Future trait 的类型.

rust
async fn say_hello() {
    println!("Hello from async!");
}

调用 async fn 会立即返回一个 Future,实际的执行要靠 runtime 调度,可以通过 .await 获得最终结果.

2.async { ... } 异步块

定义一个异步块,返回一个实现了 Future trait 的匿名类型.

rust
let future = async {
    println!("Hello from async block!");
};

异步块可以直接在需要 future 的地方使用,也可以赋值给变量.

3.async move { ... } 异步闭包

捕获外部环境变量并获得所有权,返回一个实现了 Future trait 的匿名类型.

rust
let name = String::from("Alice");
let future = async move {
    println!("Hello, {}!", name);
};

move 关键字确保异步闭包捕获的变量被移动到闭包内部,使其在异步执行时仍然有效.

await 语法

  • await 用于等待一个 Future 完成并获取其结果,必须在异步上下文中使用.
  • 与其他语言不同,Rust 的 await 使用后缀 .await 语法,这样可以非常方便地进行链式调用来组合多个 Future.

1.在异步函数中使用 await

rust
async fn learn_async() -> u32 {
    7
}

async fn async_main() {
    let value = learn_async().await;
    println!("value = {}", value);
}

2.在异步块中使用 await

rust
async fn learn_async() -> u32 {
    7
}

let future = async {
    let value = learn_async().await;
    println!("value = {}", value);
};

3.在异步闭包中使用 await

rust
async fn learn_async() -> u32 {
    7
}

let future = async move {
    let value = learn_async().await;
    println!("value = {}", value);
};

Future Trait

Future 是一个代表"还未完成但最终会产生某个值"的 trait.标准库定义如下:

rust
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;

    fn poll(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}
  • 编译器将 async 代码转换为实现了 Future trait 的等效代码,类似 for 循环被转换为使用 Iterator trait 的代码.
  • 开发者可以为自定义数据类型手动实现 Future trait,以便它们能够与 async/await 语法一起使用.
  • poll 方法是 Future trait 的核心,它会被运行时调用来检查 future 是否完成:完成则返回 Poll::Ready(value),否则返回 Poll::Pending.

异步运行时(Runtime)

异步代码本身不会自动执行async 块/函数只会返回一个 Future,必须由运行时不断 poll 才会推进执行.

async 块/函数调用 → 返回 Future → 运行时调度 poll → 任务推进

⚠️ 标准库只提供 Future trait,不提供完整运行时.

main() 不能直接写成裸 async fn main() 作为程序入口,因为 Rust 的可执行程序入口要求 main 是一个同步函数,返回 ()Result<(), E>,不能返回 Future.

常见做法是使用运行时宏(如 #[tokio::main])或显式 block_on,把异步入口包装成同步 main.

使用 trpl

trpl 常用于《The Rust Programming Language》配套示例,便于学习 async 概念.

  • trpl::block_on:阻塞当前线程直到 future 完成(多线程运行时).
  • trpl::block_on 只能等待一个 future;若需等待多个,可结合 trpl::jointrpl::join! 使用.

在 trpl 库源码中,trpl::run 已被重命名为 trpl::block_on,保留 run 仅为兼容旧版教程代码.

rust
fn main() {
    // trpl::block_on() 中只能有一个 future,也就是一个 async 块或 async fn 调用
    // 但是这个 future 内部可以包含任意数量的 await 调用
    trpl::block_on(async {
        async_main().await;
    });
}

async fn async_main() {
    println!("Hello from trpl!");
}

可简化为:

rust
fn main() {
    // 简写:直接传入异步函数返回的 Future
    trpl::block_on(async_main());
}

async fn async_main() {
    println!("Hello from trpl!");
}

使用 Tokio

rust
#[tokio::main] // 宏会生成一个 Tokio 运行时,并将 main 包装为异步入口
async fn main() {
    async_main().await;
}

async fn async_main() {
    println!("Hello from async world!");
}

#[tokio::main] 近似等价于:

rust
fn main() {
    // 1. 初始化运行时
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    // 2. 将 main 内部逻辑作为异步任务并在运行时中执行
    rt.block_on(async {
        async_main().await;
    });
}

async fn async_main() {
    println!("Hello from async world!");
}

使用 async-std

rust
#[async_std::main]
async fn main() {
    async_main().await;
}

async fn async_main() {
    println!("Hello from async-std!");
}

控制权还给运行时

await 让出控制权

在每个 await 点,如果被等待的 future 还没准备好,Rust 就会暂停当前任务并切换到其他任务.await 点之间的所有内容都是同步执行的.

⚠️ 如果在一个 async 代码块中做了大量工作却没有任何 await 点,这个 future 就会阻止其他 future 取得进展(称为"任务饥饿").

rust
use std::{thread, time::Duration};

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

fn main() {
    trpl::block_on(async {
        // 没有 await,两者顺序串行执行
        slow("x", 1000);
        slow("y", 1000);

        let a = async {
            println!("'a' started.");
            slow("a", 30);
            slow("a", 10);
            slow("a", 20);
            trpl::sleep(Duration::from_millis(50)).await; // 让出控制权,切换到 b
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            slow("b", 10);
            slow("b", 15);
            slow("b", 350);
            trpl::sleep(Duration::from_millis(50)).await; // 让出控制权,切换到 a
            println!("'b' finished.");
        };

        // select 等待 a、b 中任意一个完成,另一个被丢弃(适合竞速/超时场景)
        trpl::select(a, b).await;
    });
}

await 就像一个"让步点":告诉运行时"我现在需要等待,你可以先处理其他任务,等我准备好了再回来."

主动让出控制权

使用 trpl::yield_now 主动让出执行权:

rust
use std::{thread, time::Duration};

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

fn main() {
    trpl::block_on(async {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::yield_now().await; // 主动让出,让 b 运行
            slow("a", 10);
            trpl::yield_now().await;
            slow("a", 20);
            trpl::yield_now().await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::yield_now().await; // 主动让出,让 a 运行
            slow("b", 10);
            trpl::yield_now().await;
            slow("b", 15);
            trpl::yield_now().await;
            slow("b", 350);
            trpl::yield_now().await;
            println!("'b' finished.");
        };

        trpl::select(a, b).await;
    });
}

并发与 async

在 Rust 中,async 主要提供的是并发(concurrency)能力:在一个线程内交替推进多个任务,而不是为每个任务都创建一个线程.

创建并发任务

trpl::spawn_task:在 trpl::block_on 创建的异步环境中创建一个新的异步任务并立即激活它.

rust
use std::time::Duration;

fn main() {
    trpl::block_on(async {
        // spawn_task 会创建一个新的任务并立即激活它
        // 同样需要主动阻塞当前任务,让 spawn_task 生成的任务有机会运行
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
        // 主 async 块结束后,spawn_task 生成的任务也会被关闭
    });
}

spawn_task 生成的任务会随主任务结束而关闭.若要等待其完成,需使用 handle.await.

阻塞主 async 块

  • spawn_task 返回一个句柄(handle),可以通过 .await 等待该任务完成并获取结果.
rust
use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }

        // 阻塞主 async 块,等待 spawn_task 任务完成
        handle.await.unwrap();
    });
}

handle.await.unwrap() 类似多线程中的 join,等待任务完成并传播 panic.

并发多个任务

API说明
trpl::join(a, b)同时等待 2 个 future 完成
trpl::join3(a, b, c)同时等待 3 个 future 完成
trpl::join!(...)同时等待任意数量的 future 完成(类似 Promise.all
rust
use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let a = async { /* 任务 A */ };
        let b = async { /* 任务 B */ };

        // 串行:总耗时 = A + B
        // a.await;
        // b.await;

        // 并发:总耗时 ≈ max(A, B)
        trpl::join(a, b).await;
    });
}

顺序执行 vs 并发执行

方式行为
trpl::block_on顶级运行环境,程序通常只有一个
trpl::spawn_task子任务,必须在异步环境内部调用
  • trpl::spawn_task 并不是立即执行,依旧需要运行时调度,让 spawn_task 生成的任务有机会运行.
rust
// 顺序执行:等 a 跑完再跑 b
trpl::block_on(async {
    task_a().await;
    task_b().await;
});

// 并发执行:a、b 交替推进
trpl::block_on(async {
    let handle = trpl::spawn_task(task_a());
    task_b().await;
    let _ = handle.await; // 确保 task_a 也完成
});

// 上面 trpl::spawn_task 并发上等同于,但底层机制不同
trpl::block_on(async {
    trpl::join(task_a(), task_b()).await;
});

并发还是并行? 取决于运行时和核心数:单核时交替执行(并发),多核时可能真正同时执行(并行).

Tokio 并发示例

rust
use tokio::time::{sleep, Duration};

async fn task_a() -> &'static str {
    sleep(Duration::from_millis(100)).await;
    "A done"
}

async fn task_b() -> &'static str {
    sleep(Duration::from_millis(50)).await;
    "B done"
}

#[tokio::main]
async fn main() {
    let (a, b) = tokio::join!(task_a(), task_b());
    println!("{}, {}", a, b);
}

tokio::join! 并发:总耗时 ≈ max(task_a, task_b)

宏/函数行为类比 JS
join!全部完成后返回Promise.all
select!谁先完成处理谁,其余丢弃Promise.race

并发消息传递

创建异步通道

  • 使用trpl::channel 创建一个异步通道,发送者(tx)和接收者(rx)可以在不同的任务中使用.
  • rx 必须使用 mut 声明,因为它需要可变引用来接收消息.
  • send 方法不需要 .await,也不会阻塞程序.
  • recv 是异步方法,需要使用 .await 来等待它们完成.
rust
use trpl::channel;

fn main() {
    trpl::block_on(async {
        // 创建一个异步通道,发送者 tx 和接收者 rx
        let (tx, mut rx) = channel::<u32>();

        // 使用 move 确保 tx 在运行完成后被释放,从而确保信道关闭
        // 让 rx.recv() 能正确返回 None 结束循环
        trpl::spawn_task(async move {
            for i in 1..=5 {
                tx.send(i).unwrap();  // 发送消息到通道,等待发送完成
                trpl::sleep(std::time::Duration::from_millis(500)).await;
            }
        });

        // 接收消息并打印,等待消息到达
        while let Some(value) = rx.recv().await {
            println!("Received: {}", value);
        }
    });
}

sendrecv 都是异步方法,需要使用 .await 等待完成.rx 必须声明为 mut.

多生产者单消费者(MPSC)

trpl::channel 默认为多生产者单消费者模式,发送者可通过 .clone() 在多个任务中使用.

rust
use trpl::channel;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = channel::<u32>();
        let tx_clone = tx.clone(); // 克隆发送者以供第二个任务使用

        trpl::spawn_task(async move {
            for i in 1..=10 {
                tx.send(i).unwrap(); // 发送消息到通道1,等待发送完成
                trpl::sleep(std::time::Duration::from_millis(500)).await;
            }
        });

        trpl::spawn_task(async move {
            for i in 1..=5 {
                tx_clone.send(i).unwrap(); // 发送消息到通道2,等待发送完成
                trpl::sleep(std::time::Duration::from_millis(500)).await;
            }
        });

        // 接收消息并打印,等待消息到达
        while let Some(value) = rx.recv().await {
            println!("Received: {}", value);
        }
    });
}

多生产者多消费者(广播)

trpl::broadcast_channel 允许多个接收者订阅同一个发送者的消息.

rust
use trpl::broadcast_channel;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx1) = broadcast_channel::<u32>();
        let tx_clone = tx.clone(); // 克隆发送者以供第二个任务使用
        let mut rx2 = tx.subscribe(); // 创建第二个接收者

        trpl::spawn_task(async move {
            for i in 1..=5 {
                tx.send(i).unwrap(); // 发送消息到通道1,等待发送完成
                trpl::sleep(std::time::Duration::from_millis(500)).await;
            }
        });

        trpl::spawn_task(async move {
            for i in 6..=10 {
                tx_clone.send(i).unwrap(); // 发送消息到通道2,等待发送完成
                trpl::sleep(std::time::Duration::from_millis(500)).await;
            }
        });

        // 接收者1,接收消息并打印,等待消息到达
        trpl::spawn_task(async move {
            while let Some(value) = rx1.recv().await {
                println!("Receiver 1 got: {}", value);
            }
        });

        // 接收者2,接收消息并打印,等待消息到达
        trpl::spawn_task(async move {
            while let Some(value) = rx2.recv().await {
                println!("Receiver 2 got: {}", value);
            }
        });
    });
}

Stream:按顺序出现的 Future

概念说明
Future表示一个将来值
Stream表示一系列将来值(异步版迭代器)
  • Iterator::next() —— 同步取值:"我已经有这堆数据了,一个接一个拿."
  • StreamExt::next().await —— 异步取值:"数据随时间慢慢产生,没产生时我可以去干别的."

Extextension 的缩写.StreamExt 是为 Stream 提供实用方法的扩展 trait,这是 Rust 社区中非常常见的模式.

rust
use trpl::StreamExt;

fn main() {
    trpl::block_on(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);

        // stream_from_iter 把普通迭代器转换成 Stream
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    })
}

Stream trait 将 IteratorFuture 的特征结合在一起;StreamExt 在其上提供了更高层的 API(包括 next 方法).

Tokio 版本:

rust
use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    let mut s = stream::iter(vec![10, 20, 30]);

    while let Some(item) = s.next().await {
        println!("{}", item);
    }
}

典型 Stream 来源:

  • 网络消息流(WebSocket、SSE)
  • 文件/套接字分块读取
  • 定时器事件序列

深入 async 相关的 Traits

Future Trait

定义

rust
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;

    fn poll(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}
  • Output:future 最终解析成的值的类型(类似 IteratorItem).
  • poll:运行时调用此方法推进 future;完成返回 Poll::Ready(value),未完成返回 Poll::Pending.

返回值 Poll

rust
pub enum Poll<T> {
    Ready(T),   // future 已完成,结果可用
    Pending,    // future 还未完成,稍后再试
}

Unpin 与 !Unpin

大多数类型可以自由移动,它们自动实现 Unpin(编译器派生).只有包含自引用的类型才是 !Unpin(不可移动).

Trait含义典型例子
Unpin可以安全移动,Pin 对其无实际约束基本类型、StringVec<T>
!Unpin不能安全移动,必须用 Pin 保护其内存地址async 块生成的 future

手动标记 !Unpin:

rust
use std::marker::PhantomPinned;

struct SelfReferential {
    data: String,
    ptr: *const String, // 指向自身 data 字段
    _pin: PhantomPinned, // 标记此类型为 !Unpin
}

PhantomPinned 是一个零大小类型,专门用于让结构体退出 Unpin 的自动实现.

操作 Pin 包裹的值:

rust
use std::pin::pin;

// pin! 宏将值固定在栈上,返回 Pin<&mut T>
let mut fut = pin!(async {
    println!("pinned future");
});

// 对于 Unpin 类型,可以直接获取可变引用
let mut s = String::from("hello");
let pinned = Pin::new(&mut s);
let inner: &mut String = Pin::into_inner(pinned); // Unpin 类型可以解包

// 对于 !Unpin 类型,无法调用 Pin::into_inner

实践规则:

  • 绝大多数使用场景不需要手动处理 Pin,async/await 编译器会自动处理.
  • 当你需要把多个 Future 存入集合(如 Vec)时,才需要显式使用 Pin<Box<dyn Future>>pin! 宏.
rust
// 堆上 pin(动态分发)
let fut: Pin<Box<dyn Future<Output = ()>>> = Box::pin(async {
    println!("heap pinned");
});

// 栈上 pin(零分配)
let fut = pin!(async {
    println!("stack pinned");
});

Pin 与 Unpin

Rust 为 async 代码块生成的 future 可能包含自引用(某字段引用自身其他字段).若 future 在执行过程中被移动,这些指针就会变成悬垂指针,导致内存安全问题.

类型/Trait作用
Pin<P>保证被包裹的值在内存中地址不变,防止自引用失效
Unpin标记该类型可以安全移动,无需 Pin 保护

将 future pin 住以存入向量:

rust
use std::pin::{Pin, pin};

let tx1_fut = pin!(async move {
    // ...
});

let rx_fut = pin!(async {
    // ...
});

let tx_fut = pin!(async move {
    // ...
});

let futures: Vec<Pin<&mut dyn Future<Output = ()>>> =
    vec![tx1_fut, rx_fut, tx_fut];

// 使用 join_all 等待所有 future 完成
trpl::join_all(futures).await;

Stream Trait

Stream 是异步版的 Iterator,随时间产生多个值.

rust
use std::pin::Pin;
use std::task::{Context, Poll};

trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>>;
}

poll_next 既像 Future::poll(轮询就绪状态),又像 Iterator::next(逐条产生值). 返回 Poll<Option<Self::Item>>:

  • 外层 Poll:是否就绪
  • 内层 Option:是否还有更多值(None 表示流结束)

Future、任务和线程

概念说明
Future惰性计算描述,本身不自动执行
任务(Task)运行时调度的执行单元(把 future 提交给 runtime)
线程(Thread)操作系统调度单元

执行流程:

编写 async fn → 得到 Future → 提交给运行时(如 tokio::spawn)→ 变成 Task → 运行时在线程上调度

如何选择:

  • CPU 密集型(大量计算、可并行拆分)→ 优先使用线程
  • I/O 密集型(网络请求、文件读写、高并发连接)→ 优先使用 async

结论:async 不是"自动多线程".是否多线程、如何调度,取决于所用运行时的配置.

在线程中发送消息,在 async 中接收:

rust
use std::{thread, time::Duration};

fn main() {
    let (tx, mut rx) = trpl::channel();

    // 在新线程中同步发送消息
    thread::spawn(move || {
        for i in 1..=10 {
            tx.send(i).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    // 在异步环境中接收消息
    trpl::block_on(async {
        while let Some(message) = rx.recv().await {
            println!("{message}");
        }
    });
}

基于 MIT 协议发布