Skip to content

无畏并发

Rust 的"无畏并发"(Fearless Concurrency)是指利用编译器的强大检查机制,让开发者可以在编译期就发现大部分并发相关的致命 bug(如数据竞争、死锁等),即使新手也能写出安全可靠的多线程代码.

创建子线程

Rust 标准库提供了线程支持(std::thread).通过 thread::spawn 可以轻松创建多线程,让多个任务"几乎同时"运行.

示例:

rust
use std::thread;

fn main() {
  // thread::spawn 函数传递闭包,闭包内为子线程执行的代码
    let handle = thread::spawn(|| {
        for i in 1..5 {
            println!("子线程:{}", i);
        }
    });

    for i in 1..5 {
        println!("主线程:{}", i);
    }
    handle.join().unwrap(); // 等待子线程结束,确保子线程执行完毕
}

注意:join 方法会阻塞主线程,直到子线程执行完毕.

转移所有权

默认情况下,线程的闭包会捕获外部变量,使用 move 强制所有权转移进子线程,防止数据竞争,避免子线程获取到已经失效的变量.

rust
use std::thread;

fn main() {
    let v = vec![1,2,3];
    let handle = thread::spawn(move || {
        println!("子线程收到的数据: {:?}", v);
    });
    handle.join().unwrap();
}

消息传递通信

Rust 推崇"数据转移而不是共享".通过通道(channel)在线程间传递所有权,实现线程安全通信.

信道(channel)

表示数据从一个线程发送到另一个线程的通道.

信道有两个组成部分:一个发送端(transmitter)和一个接收端(receiver).发送端位于上游位置,在这里可以比作将橡皮鸭放入河中,接收端则位于下游,橡皮鸭最终会漂流至此.代码中的一部分调用发送端的方法以及希望发送的数据,另一部分则检查接收端收到的消息.当发送端或接收端任一被丢弃时可以认为信道被关闭(closed)了.

创建信道

使用 mpsc::channel 函数创建信道

  • mpsc 是 多生产者,单消费者(multiple producer, single consumer)的缩写,表示一个信道可以有多个发送端和单个接收端.
  • txrx 分别表示 发送端(transmitter)和 接收端(receiver)
rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}
  • send 方法发送一个值到通道中,如果通道已经关闭,则返回错误,会转移所有权.
  • recv 方法从通道中接收一个值,如果当前没有值,则阻塞等待.
  • try_recv 方法从通道中接收一个值,如果当前没有值,则不阻塞,立即返回错误.

发送多个值

rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}

创建多个生产者

使用 clone 方法创建多个发送端,每个发送端可以发送不同的值.

rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
  let (tx, rx) = mpsc::channel();
    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}

共享状态并发

通过共享内存的方式来实现并发(多所有权),需要使用互斥锁(Mutex)和原子引用计数(Arc)来实现多线程安全地共享和修改一份数据.

互斥锁(Mutex)

互斥锁(Mutex)是一种用于保护共享资源的同步原语.它确保同一时刻最多只有一个线程能访问被保护的数据.

互斥锁的实现原理是:

  • 当一个线程想要访问共享资源时,它必须先获取互斥锁.
  • 当一个线程访问共享资源时,其他线程必须等待.
  • 当一个线程访问完共享资源后,它必须释放互斥锁.

作为一个现实中互斥器的例子,想象一下在某个会议的一次小组座谈会中,只有一个麦克风.如果一位成员要发言,他必须请求或表示希望使用麦克风.得到了麦克风后,他可以畅所欲言,讲完后再将麦克风交给下一位希望讲话的成员.如果一位成员结束发言后忘记将麦克风交还,其他人将无法发言.如果对共享麦克风的管理出现了问题,座谈会将无法正常进行!

  • Mutex::new(value) 创建一个新的互斥锁,并初始化值.
  • lock() 方法获取互斥锁,如果当前互斥锁被其他线程持有,则当前线程会阻塞等待.
  • try_lock() 方法尝试获取互斥锁,如果当前互斥锁被其他线程持有,则当前线程会立即返回错误.
  • unlock() 方法释放互斥锁.
rust
use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    } // 这里 num 离开作用域,自动释放锁

    println!("m = {m:?}");
}

死锁:两个线程互相等待对方释放锁.

Rust 只负责"线程安全",无法自动避免死锁,因此建议只持有锁的代码尽量短小,或者使用 try_lock() 防止长时间阻塞.

原子引用计数(Arc)

原子引用计数(Arc)是一种用于实现多线程安全地共享一份数据的机制.它使用原子操作来修改引用计数,有轻微性能开销,但保证多线程安全.

字母 a 代表 原子性(atomic),所以这是一个原子引用计数(atomically reference counted)指针.

  • Arc::new(value) 创建一个新的原子引用计数指针,并初始化值.
  • Arc::clone(&arc) 方法创建一个新的原子引用计数指针,并增加引用计数.
  • Arc::strong_count(&arc) 方法获取当前的强引用计数.
  • Arc::weak_count(&arc) 方法获取当前的弱引用计数.
rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let data = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let data = Arc::clone(&data);
        let handle = thread::spawn(move || {
            let mut num = data.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }
    println!("最终结果: {}", *data.lock().unwrap());
}

单/多线程的修改和共享

RefCell<T>/Rc<T>Mutex<T>/Arc<T> 是两种不同的并发安全机制.

  • RefCell<T>/Rc<T> 是用于单线程安全地修改和共享一份数据.
  • Mutex<T>/Arc<T> 是用于多线程安全地修改和共享一份数据.

RefCell<T> vs Mutex<T>

特性RefCell<T>Mutex<T>
作用域单线程内部可变多线程互斥访问
线程安全❌ 非线程安全(只能单线程用)✅ 线程安全
可变性运行时内可变性互斥修改(必须加锁)
检查机制借用规则运行时检查(panic)上锁失败会阻塞/阻塞失败可报错
应用场景适用于结构体内部、Mock 测试等多线程环境下共享和修改资源
典型组合Rc<RefCell<T>>Arc<Mutex<T>>

Rc<T> vs Arc<T>

特性Rc<T>Arc<T>
作用域单线程的引用计数多线程的引用计数
线程安全❌ 非线程安全✅ 线程安全(原子操作实现)
主要用途单线程下多个所有者共享不可变数据多线程下多个所有者共享不可变数据
可变数据搭配 RefCell<T> 使用搭配 Mutex<T>/RwLock<T> 使用
典型组合Rc<RefCell<T>>Arc<Mutex<T>>, Arc<RwLock<T>>
性能性能高(无原子操作)性能略低(原子操作以保证安全)

原子类型(Atomic Types)

当数据极其简单,比如 usize,可以使用原子操作(不用锁,效率高,保证线程安全).

  • AtomicUsize::new(value) 创建一个新的原子 usize 变量,并初始化值.
  • fetch_add(value, ordering) 方法以原子方式将当前值增加指定值,并返回之前的值.
  • load(ordering) 方法以原子方式获取当前值.
  • store(value, ordering) 方法以原子方式设置当前值.

Ordering 参数指定内存顺序,常用 Ordering::SeqCst 保持全局一致性.

rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

fn main() {
    let counter = AtomicUsize::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let c = &counter;
        let handle = thread::spawn(move || {
            c.fetch_add(1, Ordering::SeqCst);
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("计数器结果: {}", counter.load(Ordering::SeqCst));
}

线程安全 trait:Send 和 Sync

SendSync 的定义

  • Send 表示一个类型的所有权可在线程间转移.大多数 Rust 类型都是 Send.
  • Sync 表示一个类型的引用可被多线程安全共享.比如 Mutex<T>Arc<T>Sync.

对于任意类型 T:T: Sync&T: Send

如果 T 实现了 Sync 的话 &T 就实现了 Send.

如果 &T实现了 Send 的话 T 就实现了 Sync.

Send 示例:

rust
 let x = 5;
std::thread::spawn(move || {
    println!("x = {x}"); // 这里 x 被"搬到"新线程了,所有权转移给了新线程,x实现了Send trait,所以可以在线程间转移
});

Sync 示例:

rust
use std::sync::Arc;
use std::thread;

fn main() {
    let data = Arc::new(vec![1, 2, 3, 4, 5]);

    let mut handles = vec![];

    for i in 0..3 {
        let data = Arc::clone(&data); // Vec<T>实现了Sync trait,所以引用可以在线程间共享

        let handle = thread::spawn(move || {
            println!("线程 {} 读取数据: {:?}", i, data);
        });
        handles.push(handle);
    }
    for h in handles {
        h.join().unwrap();
    }
}

编译器自动推导和检查 Send/Sync.如果尝试线程间传递非 Send 类型,会直接报错.

Rc<T>RefCell<T> 不是 SendSync

Send/Sync 类型表

类型SendSync说明
i32基本类型
String堆数据但安全
Vec<T>只读安全
Arc<T>线程安全引用计数
Mutex<T>互斥锁保护数据
Rc<T>非线程安全引用计数
RefCell<T>运行时借用检查
Cell<T>内部可变,不可共享

使用线程池

rayonthreadpool

大量小任务时,不建议反复新建线程.可以使用第三方库线程池高效分配任务.

rayon 并行 map:

rust
use rayon::prelude::*;

fn main() {
    let v = vec![1, 2, 3, 4, 5];
    let sum: i32 = v.par_iter().map(|x| x * x).sum();
    println!("平方和: {}", sum);
}

依赖添加:

toml
[dependencies]
rayon = "1.5"

总结

Rust 的并发模型设计宗旨:"要么不能编译,要么不会出错."让并发安全成为习惯,让 bug 和 panic 成为极小概率事件.这就是无畏并发!

基于 MIT 协议发布