Mkdir700's Note

Mkdir700's Note

Tokio:应该使用哪种 mutex?

26
2025-03-15

基本概念

互斥锁(Mutex)是一种同步原语,用于保护共享数据,确保在任何时刻只有一个线程可以访问这些数据。

两种互斥锁的主要区别

1. 阻塞行为

  • 标准库互斥锁(std::sync::Mutex)

    • 当你调用 .lock() 时,如果锁已被占用,当前线程会阻塞直到获得锁
    • 阻塞意味着线程会暂停执行,操作系统会将 CPU 时间分配给其他线程
    • 这会阻塞整个线程,包括该线程上的所有其他任务
  • 异步互斥锁(tokio::sync::Mutex)

    • 当你调用 .lock().await 时,如果锁已被占用,当前任务会挂起而不是阻塞
    • 挂起意味着当前任务暂停执行,但线程可以继续执行其他任务
    • 这只会挂起当前异步任务,不会阻塞整个线程

2. 跨越.await 点

  • 标准库互斥锁

    • 不能安全地跨越 .await 点持有锁
    • 如果你在持有锁的同时执行 .await 操作,可能导致死锁
  • 异步互斥锁

    • 专门设计为可以跨越 .await 点持有锁
    • 允许你在持有锁的同时执行其他异步操作

3. 性能开销

  • 标准库互斥锁

    • 通常性能开销较小
    • 适合短时间持有的场景
  • 异步互斥锁

    • 由于支持异步操作,性能开销较大
    • 适合需要在持有锁的同时执行异步操作的场景

示例说明

示例 1:标准库互斥锁的基本使用

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // 创建一个被Arc包装的Mutex
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        // 克隆Arc以在多个线程间共享
        let counter = Arc::clone(&counter);
        
        // 创建新线程
        let handle = thread::spawn(move || {
            // 获取锁 - 如果锁被占用,这里会阻塞线程
            let mut num = counter.lock().unwrap();
            
            // 修改受保护的数据
            *num += 1;
            
            // 锁在这里自动释放(当num离开作用域时)
        });
        
        handles.push(handle);
    }

    // 等待所有线程完成
    for handle in handles {
        handle.join().unwrap();
    }

    // 打印最终结果
    println!("最终计数: {}", *counter.lock().unwrap());
}

示例 2:异步互斥锁的基本使用

use tokio::sync::Mutex;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    // 创建一个被Arc包装的异步Mutex
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        // 克隆Arc以在多个任务间共享
        let counter = Arc::clone(&counter);
        
        // 创建新的异步任务
        let handle = tokio::spawn(async move {
            // 获取锁 - 如果锁被占用,这里会挂起当前任务,但不会阻塞线程
            let mut num = counter.lock().await;
            
            // 修改受保护的数据
            *num += 1;
            
            // 锁在这里自动释放(当num离开作用域时)
        });
        
        handles.push(handle);
    }

    // 等待所有任务完成
    for handle in handles {
        handle.await.unwrap();
    }

    // 打印最终结果
    println!("最终计数: {}", *counter.lock().await);
}

示例 3:展示跨越.await 点的区别

// 使用标准库Mutex - 可能导致死锁!
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() {
    let data = Arc::new(Mutex::new(0));
    
    tokio::spawn(async move {
        // 获取锁
        let mut lock = data.lock().unwrap();
        
        // 危险! 在持有锁的同时执行异步操作
        // 如果另一个任务也尝试获取这个锁并在此之前获得了执行权
        // 那么当前任务恢复执行时可能无法继续(死锁)
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        
        // 修改数据
        *lock += 1;
    });
}
// 使用异步Mutex - 安全地跨越.await点
use tokio::sync::Mutex;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let data = Arc::new(Mutex::new(0));
    
    tokio::spawn(async move {
        // 获取锁
        let mut lock = data.lock().await;
        
        // 安全! 异步Mutex设计为可以跨越.await点
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        
        // 修改数据
        *lock += 1;
    });
}

何时使用哪种互斥锁

使用标准库互斥锁的场景:

  • 保护纯内存数据(非 IO 资源)
  • 锁持有时间短
  • 不需要在持有锁时执行异步操作
  • 对性能要求高

使用异步互斥锁的场景:

  • 需要在持有锁的同时执行异步操作
  • 保护 IO 资源(如数据库连接)
  • 锁可能被长时间持有
  • 不希望阻塞整个线程

最佳实践

  1. 默认选择标准库互斥锁:除非有特殊需求,优先使用标准库互斥锁,因为它性能更好。

  2. 包装模式:将互斥锁包装在结构体中,提供方法来操作内部数据:

struct Counter {
    count: Arc<Mutex<i32>>,
}

impl Counter {
    fn new() -> Self {
        Self {
            count: Arc::new(Mutex::new(0)),
        }
    }
    
    fn increment(&self) {
        let mut count = self.count.lock().unwrap();
        *count += 1;
    }
    
    fn get_count(&self) -> i32 {
        *self.count.lock().unwrap()
    }
}
  1. 对于 IO 资源,考虑使用专门的管理任务
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // 创建通道
    let (tx, mut rx) = mpsc::channel(100);
    
    // 启动管理任务
    tokio::spawn(async move {
        // 假设这是一个数据库连接
        let db_connection = establish_connection().await;
        
        // 处理请求
        while let Some(request) = rx.recv().await {
            // 使用连接处理请求
            process_request(&db_connection, request).await;
        }
    });
    
    // 发送请求
    tx.send(Request::new()).await.unwrap();
}