/ 我的程序人生 数据库 / 33浏览

SQLite 锁机制:读锁策略与并发事务分析

引言

在使用 SQLite 数据库时,我们经常会遇到并发事务的问题。本文将通过一个具体的实验来深入探讨 SQLite 的锁机制,特别是关于读操作的锁获取策略。

SQLite 中的读操作锁策略

SQLite 中的读操作获取锁的策略会根据不同的上下文而变化:

1. 事务外的读操作

在自动提交(autocommit)模式下:

  • 单条 SELECT 语句不会获取 SHARED 锁
  • 读取完成后立即释放所有锁

2. 显式事务内的读操作

在显式事务中:

  • BEGIN TRANSACTION 后的读操作会获取 SHARED 锁
  • SHARED 锁会持续到事务结束
  • 多个事务可以同时持有 SHARED 锁

3. 不同隔离级别的影响

  • SERIALIZABLE(默认):读操作获取并保持 SHARED 锁
  • READ UNCOMMITTED:读操作不获取锁,可能读到未提交数据

实验验证

让我们通过一个具体的实验来验证这些行为。实验代码模拟了两个并发事务:事务 A 进行读写操作,事务 B 进行读操作。

有一个事务 A,事务 A 开启之后,先进行了一些查询操作,然后进行了 update 操作,接着又进行了查询操作,但在 commit 之前,事务 B 开始进行了查询操作。

假设在事务 B 长期持有 shared 锁不释放,事务 A 也无法提交成功。

完整代码

import logging
import sqlite3
import threading
import time
from contextlib import contextmanager

# 配置日志
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(threadName)s - %(message)s"
)
logger = logging.getLogger(__name__)


class TestDB:
    def __init__(self, db_path: str = "test.db"):
        self.db_path = db_path
        self.init_db()

    def init_db(self):
        """初始化数据库和测试数据"""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            # 创建测试表
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS test_table (
                    id INTEGER PRIMARY KEY,
                    value TEXT
                )
            """
            )
            # 插入测试数据
            cursor.execute("DELETE FROM test_table")
            cursor.execute(
                "INSERT INTO test_table (id, value) VALUES (1, 'test_value')"
            )
            conn.commit()

    @contextmanager
    def get_connection(self) -> sqlite3.Connection:
        """获取数据库连接的上下文管理器"""
        conn = sqlite3.connect(self.db_path, timeout=1)  # 设置较短的超时时间
        conn.execute("PRAGMA busy_timeout = 5000")
        try:
            yield conn
        finally:
            conn.close()


def transaction_a(db: TestDB, event: threading.Event):
    """模拟事务A的操作"""
    try:
        with db.get_connection() as conn:
            logger.info("事务A开始")
            conn.execute("BEGIN")  # 立即获取写锁

            # 查询数据
            logger.info("事务A: 执行查询")
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM test_table WHERE id = 1")
            result = cursor.fetchone()
            logger.info(f"事务A: 查询结果 = {result}")

            # 通知事务B开始
            event.set()
            # 故意等待一段时间, 等待 B 事务获取 shared 锁
            time.sleep(1)

            # 更新数据
            logger.info("事务A: 执行更新")
            cursor.execute(
                "UPDATE test_table SET value = 'updated_by_A' WHERE id = 1"
            )
            logger.info("事务A: 更新完成")

            # 再次查询
            logger.info("事务A: 再次查询")
            cursor.execute("SELECT * FROM test_table WHERE id = 1")
            result = cursor.fetchone()
            logger.info(f"事务A: 查询结果 = {result}")

            # 提交事务
            logger.info("事务A: 尝试提交")
            conn.commit()
            logger.info("事务A: 提交成功")

    except Exception as e:
        logger.error(f"事务A发生错误: {e}")
        raise


def transaction_b(db: TestDB, event: threading.Event):
    """模拟事务B的操作"""
    try:
        # 等待事务A的信号
        event.wait()

        with db.get_connection() as conn:
            logger.info("事务B开始")
            conn.execute("BEGIN")

            logger.info("事务B: 执行查询")
            cursor = conn.cursor()

            cursor.execute("SELECT * FROM test_table WHERE id = 1")
            result = cursor.fetchone()
            logger.info(f"事务B: 查询结果 = {result}")

            # 持续保持共享锁一段时间
            logger.info("事务B: 保持共享锁10秒")
            time.sleep(10)

            # 提交事务
            logger.info("事务B: 尝试提交")
            conn.commit()
            logger.info("事务B: 提交成功")

    except Exception as e:
        logger.error(f"事务B发生错误: {e}")
        raise


def main():
    # 创建数据库实例
    db = TestDB()

    # 创建事件用于同步两个事务
    event = threading.Event()

    # 创建并启动两个线程
    thread_a = threading.Thread(
        target=transaction_a, args=(db, event), name="TransactionA"
    )
    thread_b = threading.Thread(
        target=transaction_b, args=(db, event), name="TransactionB"
    )

    thread_a.start()
    thread_b.start()

    # 等待两个线程结束
    thread_a.join()
    thread_b.join()

    # 验证最终状态
    with db.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM test_table WHERE id = 1")
        result = cursor.fetchone()
        logger.info(f"最终数据库状态: {result}")


if __name__ == "__main__":
    main()

运行结果

2024-12-26 13:05:03,328 - TransactionA - 事务A开始
2024-12-26 13:05:03,328 - TransactionA - 事务A: 执行查询
2024-12-26 13:05:03,328 - TransactionA - 事务A: 查询结果 = (1, 'test_value')
2024-12-26 13:05:03,328 - TransactionB - 事务B开始
2024-12-26 13:05:03,329 - TransactionB - 事务B: 执行查询
2024-12-26 13:05:03,329 - TransactionB - 事务B: 查询结果 = (1, 'test_value')
2024-12-26 13:05:03,329 - TransactionB - 事务B: 保持共享锁10秒
2024-12-26 13:05:04,329 - TransactionA - 事务A: 执行更新
2024-12-26 13:05:04,330 - TransactionA - 事务A: 更新完成
2024-12-26 13:05:04,330 - TransactionA - 事务A: 再次查询
2024-12-26 13:05:04,330 - TransactionA - 事务A: 查询结果 = (1, 'updated_by_A')
2024-12-26 13:05:04,331 - TransactionA - 事务A: 尝试提交
2024-12-26 13:05:09,332 - TransactionA - 事务A发生错误: database is locked
Exception in thread TransactionA:
Traceback (most recent call last):
  File "/opt/vcpkg/installed/x64-linux/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/opt/vcpkg/installed/x64-linux/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/home/xyz/max-hmi-server/test.py", line 84, in transaction_a
    conn.commit()
sqlite3.OperationalError: database is locked
2024-12-26 13:05:13,338 - TransactionB - 事务B: 尝试提交
2024-12-26 13:05:13,339 - TransactionB - 事务B: 提交成功
2024-12-26 13:05:13,340 - MainThread - 最终数据库状态: (1, 'test_value')

运行结果显示:

  1. 事务 A 能够执行 UPDATE 操作而不被阻塞
  2. 但在尝试 COMMIT 时失败(database is locked)
  3. 最终数据库状态保持不变(test_value)

这验证了以下关键点:

  1. UPDATE 操作不会立即尝试获取 EXCLUSIVE 锁
  2. 更改首先在内存中记录
  3. 只有在 COMMIT 阶段才会尝试获取 EXCLUSIVE 锁
  4. 如果其他事务持有 SHARED 锁,COMMIT 将被阻塞直到超时

时序图

让我分析这个场景的锁升级过程:

sequenceDiagram
    participant A as 事务A
    participant B as 事务B

    A->>A: BEGIN
    A->>A: 查询操作(获取SHARED)
    B->>B: BEGIN
    B->>B: 查询操作(获取SHARED)
    
    A->>A: UPDATE(在内存中记录更改)
    Note right of A: UPDATE 操作不需要立即获取 EXCLUSIVE 锁
    
    A->>A: COMMIT(尝试获取EXCLUSIVE)
    A->>A: 等待其他SHARED锁释放...
    A->>A: 等待超时后抛出"database is locked"
    A->>A: ROLLBACK

    B->>B: 继续持有SHARED锁
    B->>B: COMMIT(成功)
                        

总结

我们可以得出以下结论:

  1. SQLite 的锁升级是延迟的,直到真正需要写入数据库文件时才发生
  2. 写操作的内存更改和实际的文件写入是分离的
  3. 事务的 COMMIT 阶段是最关键的锁竞争点

这种设计既保证了数据的一致性,又提供了较好的并发性能。

使用 GoReleaser 发布 Rust 二进制文件
使用 GoReleaser 发布 Rust 二进制文件
解决在 Windows 上 openssl-sys 构建失败的问题
解决 Rust 测试中的并行执行冲突:保护共享资源的策略
解决 Rust 测试中的并行执行冲突:保护共享资源的策略
Rust 中的跨平台开发:处理平台特定代码和未使用代码警告
Rust 中的跨平台开发:处理平台特定代码和未使用代码警告
ncdu:高效的磁盘使用分析工具
ncdu:高效的磁盘使用分析工具
Barrier - 在多台计算机之间共享键鼠及扩展屏幕
Barrier - 在多台计算机之间共享键鼠及扩展屏幕