Tokio:应该使用哪种 mutex?
编辑
26
2025-03-15
基本概念
互斥锁(Mutex)是一种同步原语,用于保护共享数据,确保在任何时刻只有一个线程可以访问这些数据。
两种互斥锁的主要区别
1. 阻塞行为
-
标准库互斥锁(
std::sync::Mutex
):- 当你调用
.lock()
时,如果锁已被占用,当前线程会阻塞直到获得锁 - 阻塞意味着线程会暂停执行,操作系统会将 CPU 时间分配给其他线程
- 这会阻塞整个线程,包括该线程上的所有其他任务
- 当你调用
-
异步互斥锁(
tokio::sync::Mutex
):- 当你调用
.lock().await
时,如果锁已被占用,当前任务会挂起而不是阻塞 - 挂起意味着当前任务暂停执行,但线程可以继续执行其他任务
- 这只会挂起当前异步任务,不会阻塞整个线程
- 当你调用
2. 跨越.await 点
-
标准库互斥锁:
- 不能安全地跨越
.await
点持有锁 - 如果你在持有锁的同时执行
.await
操作,可能导致死锁
- 不能安全地跨越
-
异步互斥锁:
- 专门设计为可以跨越
.await
点持有锁 - 允许你在持有锁的同时执行其他异步操作
- 专门设计为可以跨越
3. 性能开销
-
标准库互斥锁:
- 通常性能开销较小
- 适合短时间持有的场景
-
异步互斥锁:
- 由于支持异步操作,性能开销较大
- 适合需要在持有锁的同时执行异步操作的场景
示例说明
示例 1:标准库互斥锁的基本使用
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
// 创建一个被Arc包装的Mutex
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
// 克隆Arc以在多个线程间共享
let counter = Arc::clone(&counter);
// 创建新线程
let handle = thread::spawn(move || {
// 获取锁 - 如果锁被占用,这里会阻塞线程
let mut num = counter.lock().unwrap();
// 修改受保护的数据
*num += 1;
// 锁在这里自动释放(当num离开作用域时)
});
handles.push(handle);
}
// 等待所有线程完成
for handle in handles {
handle.join().unwrap();
}
// 打印最终结果
println!("最终计数: {}", *counter.lock().unwrap());
}
示例 2:异步互斥锁的基本使用
use tokio::sync::Mutex;
use std::sync::Arc;
#[tokio::main]
async fn main() {
// 创建一个被Arc包装的异步Mutex
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
// 克隆Arc以在多个任务间共享
let counter = Arc::clone(&counter);
// 创建新的异步任务
let handle = tokio::spawn(async move {
// 获取锁 - 如果锁被占用,这里会挂起当前任务,但不会阻塞线程
let mut num = counter.lock().await;
// 修改受保护的数据
*num += 1;
// 锁在这里自动释放(当num离开作用域时)
});
handles.push(handle);
}
// 等待所有任务完成
for handle in handles {
handle.await.unwrap();
}
// 打印最终结果
println!("最终计数: {}", *counter.lock().await);
}
示例 3:展示跨越.await 点的区别
// 使用标准库Mutex - 可能导致死锁!
use std::sync::{Arc, Mutex};
#[tokio::main]
async fn main() {
let data = Arc::new(Mutex::new(0));
tokio::spawn(async move {
// 获取锁
let mut lock = data.lock().unwrap();
// 危险! 在持有锁的同时执行异步操作
// 如果另一个任务也尝试获取这个锁并在此之前获得了执行权
// 那么当前任务恢复执行时可能无法继续(死锁)
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
// 修改数据
*lock += 1;
});
}
// 使用异步Mutex - 安全地跨越.await点
use tokio::sync::Mutex;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let data = Arc::new(Mutex::new(0));
tokio::spawn(async move {
// 获取锁
let mut lock = data.lock().await;
// 安全! 异步Mutex设计为可以跨越.await点
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
// 修改数据
*lock += 1;
});
}
何时使用哪种互斥锁
使用标准库互斥锁的场景:
- 保护纯内存数据(非 IO 资源)
- 锁持有时间短
- 不需要在持有锁时执行异步操作
- 对性能要求高
使用异步互斥锁的场景:
- 需要在持有锁的同时执行异步操作
- 保护 IO 资源(如数据库连接)
- 锁可能被长时间持有
- 不希望阻塞整个线程
最佳实践
-
默认选择标准库互斥锁:除非有特殊需求,优先使用标准库互斥锁,因为它性能更好。
-
包装模式:将互斥锁包装在结构体中,提供方法来操作内部数据:
struct Counter {
count: Arc<Mutex<i32>>,
}
impl Counter {
fn new() -> Self {
Self {
count: Arc::new(Mutex::new(0)),
}
}
fn increment(&self) {
let mut count = self.count.lock().unwrap();
*count += 1;
}
fn get_count(&self) -> i32 {
*self.count.lock().unwrap()
}
}
- 对于 IO 资源,考虑使用专门的管理任务:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// 创建通道
let (tx, mut rx) = mpsc::channel(100);
// 启动管理任务
tokio::spawn(async move {
// 假设这是一个数据库连接
let db_connection = establish_connection().await;
// 处理请求
while let Some(request) = rx.recv().await {
// 使用连接处理请求
process_request(&db_connection, request).await;
}
});
// 发送请求
tx.send(Request::new()).await.unwrap();
}
- 0
- 0
-
分享