无畏并发
Rust 的"无畏并发"(Fearless Concurrency)是指利用编译器的强大检查机制,让开发者可以在编译期就发现大部分并发相关的致命 bug(如数据竞争、死锁等),即使新手也能写出安全可靠的多线程代码.
创建子线程
Rust 标准库提供了线程支持(std::thread).通过 thread::spawn 可以轻松创建多线程,让多个任务"几乎同时"运行.
示例:
use std::thread;
fn main() {
// thread::spawn 函数传递闭包,闭包内为子线程执行的代码
let handle = thread::spawn(|| {
for i in 1..5 {
println!("子线程:{}", i);
}
});
for i in 1..5 {
println!("主线程:{}", i);
}
handle.join().unwrap(); // 等待子线程结束,确保子线程执行完毕
}注意:
join方法会阻塞主线程,直到子线程执行完毕.
转移所有权
默认情况下,线程的闭包会捕获外部变量,使用 move 强制所有权转移进子线程,防止数据竞争,避免子线程获取到已经失效的变量.
use std::thread;
fn main() {
let v = vec![1,2,3];
let handle = thread::spawn(move || {
println!("子线程收到的数据: {:?}", v);
});
handle.join().unwrap();
}消息传递通信
Rust 推崇"数据转移而不是共享".通过通道(channel)在线程间传递所有权,实现线程安全通信.
信道(channel)
表示数据从一个线程发送到另一个线程的通道.
信道有两个组成部分:一个发送端(transmitter)和一个接收端(receiver).发送端位于上游位置,在这里可以比作将橡皮鸭放入河中,接收端则位于下游,橡皮鸭最终会漂流至此.代码中的一部分调用发送端的方法以及希望发送的数据,另一部分则检查接收端收到的消息.当发送端或接收端任一被丢弃时可以认为信道被关闭(closed)了.
创建信道
使用 mpsc::channel 函数创建信道
mpsc是 多生产者,单消费者(multiple producer, single consumer)的缩写,表示一个信道可以有多个发送端和单个接收端.tx和rx分别表示 发送端(transmitter)和 接收端(receiver)
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}send方法发送一个值到通道中,如果通道已经关闭,则返回错误,会转移所有权.recv方法从通道中接收一个值,如果当前没有值,则阻塞等待.try_recv方法从通道中接收一个值,如果当前没有值,则不阻塞,立即返回错误.
发送多个值
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
}创建多个生产者
使用 clone 方法创建多个发送端,每个发送端可以发送不同的值.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
}共享状态并发
通过共享内存的方式来实现并发(多所有权),需要使用互斥锁(Mutex)和原子引用计数(Arc)来实现多线程安全地共享和修改一份数据.
互斥锁(Mutex)
互斥锁(Mutex)是一种用于保护共享资源的同步原语.它确保同一时刻最多只有一个线程能访问被保护的数据.
互斥锁的实现原理是:
- 当一个线程想要访问共享资源时,它必须先获取互斥锁.
- 当一个线程访问共享资源时,其他线程必须等待.
- 当一个线程访问完共享资源后,它必须释放互斥锁.
作为一个现实中互斥器的例子,想象一下在某个会议的一次小组座谈会中,只有一个麦克风.如果一位成员要发言,他必须请求或表示希望使用麦克风.得到了麦克风后,他可以畅所欲言,讲完后再将麦克风交给下一位希望讲话的成员.如果一位成员结束发言后忘记将麦克风交还,其他人将无法发言.如果对共享麦克风的管理出现了问题,座谈会将无法正常进行!
Mutex::new(value)创建一个新的互斥锁,并初始化值.lock()方法获取互斥锁,如果当前互斥锁被其他线程持有,则当前线程会阻塞等待.try_lock()方法尝试获取互斥锁,如果当前互斥锁被其他线程持有,则当前线程会立即返回错误.unlock()方法释放互斥锁.
use std::sync::Mutex;
fn main() {
let m = Mutex::new(5);
{
let mut num = m.lock().unwrap();
*num = 6;
} // 这里 num 离开作用域,自动释放锁
println!("m = {m:?}");
}死锁:两个线程互相等待对方释放锁.
Rust 只负责"线程安全",无法自动避免死锁,因此建议只持有锁的代码尽量短小,或者使用
try_lock()防止长时间阻塞.
原子引用计数(Arc)
原子引用计数(Arc)是一种用于实现多线程安全地共享一份数据的机制.它使用原子操作来修改引用计数,有轻微性能开销,但保证多线程安全.
字母
a代表 原子性(atomic),所以这是一个原子引用计数(atomically reference counted)指针.
Arc::new(value)创建一个新的原子引用计数指针,并初始化值.Arc::clone(&arc)方法创建一个新的原子引用计数指针,并增加引用计数.Arc::strong_count(&arc)方法获取当前的强引用计数.Arc::weak_count(&arc)方法获取当前的弱引用计数.
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let data = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let data = Arc::clone(&data);
let handle = thread::spawn(move || {
let mut num = data.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("最终结果: {}", *data.lock().unwrap());
}单/多线程的修改和共享
RefCell<T>/Rc<T> 和 Mutex<T>/Arc<T> 是两种不同的并发安全机制.
RefCell<T>/Rc<T>是用于单线程安全地修改和共享一份数据.Mutex<T>/Arc<T>是用于多线程安全地修改和共享一份数据.
RefCell<T> vs Mutex<T>
| 特性 | RefCell<T> | Mutex<T> |
|---|---|---|
| 作用域 | 单线程内部可变 | 多线程互斥访问 |
| 线程安全 | ❌ 非线程安全(只能单线程用) | ✅ 线程安全 |
| 可变性 | 运行时内可变性 | 互斥修改(必须加锁) |
| 检查机制 | 借用规则运行时检查(panic) | 上锁失败会阻塞/阻塞失败可报错 |
| 应用场景 | 适用于结构体内部、Mock 测试等 | 多线程环境下共享和修改资源 |
| 典型组合 | Rc<RefCell<T>> | Arc<Mutex<T>> |
Rc<T> vs Arc<T>
| 特性 | Rc<T> | Arc<T> |
|---|---|---|
| 作用域 | 单线程的引用计数 | 多线程的引用计数 |
| 线程安全 | ❌ 非线程安全 | ✅ 线程安全(原子操作实现) |
| 主要用途 | 单线程下多个所有者共享不可变数据 | 多线程下多个所有者共享不可变数据 |
| 可变数据 | 搭配 RefCell<T> 使用 | 搭配 Mutex<T>/RwLock<T> 使用 |
| 典型组合 | Rc<RefCell<T>> | Arc<Mutex<T>>, Arc<RwLock<T>> |
| 性能 | 性能高(无原子操作) | 性能略低(原子操作以保证安全) |
原子类型(Atomic Types)
当数据极其简单,比如 usize,可以使用原子操作(不用锁,效率高,保证线程安全).
AtomicUsize::new(value)创建一个新的原子usize变量,并初始化值.fetch_add(value, ordering)方法以原子方式将当前值增加指定值,并返回之前的值.load(ordering)方法以原子方式获取当前值.store(value, ordering)方法以原子方式设置当前值.
Ordering参数指定内存顺序,常用Ordering::SeqCst保持全局一致性.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
fn main() {
let counter = AtomicUsize::new(0);
let mut handles = vec![];
for _ in 0..10 {
let c = &counter;
let handle = thread::spawn(move || {
c.fetch_add(1, Ordering::SeqCst);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("计数器结果: {}", counter.load(Ordering::SeqCst));
}线程安全 trait:Send 和 Sync
Send 和 Sync 的定义
- Send 表示一个类型的所有权可在线程间转移.大多数 Rust 类型都是 Send.
- Sync 表示一个类型的引用可被多线程安全共享.比如
Mutex<T>、Arc<T>是Sync.
对于任意类型
T:T: Sync⇔&T: Send如果
T实现了Sync的话&T就实现了Send.如果
&T实现了Send的话T就实现了Sync.
Send 示例:
let x = 5;
std::thread::spawn(move || {
println!("x = {x}"); // 这里 x 被"搬到"新线程了,所有权转移给了新线程,x实现了Send trait,所以可以在线程间转移
});Sync 示例:
use std::sync::Arc;
use std::thread;
fn main() {
let data = Arc::new(vec![1, 2, 3, 4, 5]);
let mut handles = vec![];
for i in 0..3 {
let data = Arc::clone(&data); // Vec<T>实现了Sync trait,所以引用可以在线程间共享
let handle = thread::spawn(move || {
println!("线程 {} 读取数据: {:?}", i, data);
});
handles.push(handle);
}
for h in handles {
h.join().unwrap();
}
}编译器自动推导和检查 Send/Sync.如果尝试线程间传递非 Send 类型,会直接报错.
Rc<T>和RefCell<T>不是Send和Sync
Send/Sync 类型表
| 类型 | Send | Sync | 说明 |
|---|---|---|---|
i32 | ✅ | ✅ | 基本类型 |
String | ✅ | ✅ | 堆数据但安全 |
Vec<T> | ✅ | ✅ | 只读安全 |
Arc<T> | ✅ | ✅ | 线程安全引用计数 |
Mutex<T> | ✅ | ✅ | 互斥锁保护数据 |
Rc<T> | ❌ | ❌ | 非线程安全引用计数 |
RefCell<T> | ❌ | ❌ | 运行时借用检查 |
Cell<T> | ✅ | ❌ | 内部可变,不可共享 |
使用线程池
如 rayon、threadpool
大量小任务时,不建议反复新建线程.可以使用第三方库线程池高效分配任务.
rayon 并行 map:
use rayon::prelude::*;
fn main() {
let v = vec![1, 2, 3, 4, 5];
let sum: i32 = v.par_iter().map(|x| x * x).sum();
println!("平方和: {}", sum);
}依赖添加:
[dependencies]
rayon = "1.5"总结
Rust 的并发模型设计宗旨:"要么不能编译,要么不会出错."让并发安全成为习惯,让 bug 和 panic 成为极小概率事件.这就是无畏并发!