引言
在使用 SQLite 数据库时,我们经常会遇到并发事务的问题。本文将通过一个具体的实验来深入探讨 SQLite 的锁机制,特别是关于读操作的锁获取策略。
SQLite 中的读操作锁策略
SQLite 中的读操作获取锁的策略会根据不同的上下文而变化:
1. 事务外的读操作
在自动提交(autocommit)模式下:
- 单条 SELECT 语句不会获取 SHARED 锁
- 读取完成后立即释放所有锁
2. 显式事务内的读操作
在显式事务中:
- BEGIN TRANSACTION 后的读操作会获取 SHARED 锁
- SHARED 锁会持续到事务结束
- 多个事务可以同时持有 SHARED 锁
3. 不同隔离级别的影响
- SERIALIZABLE(默认):读操作获取并保持 SHARED 锁
- READ UNCOMMITTED:读操作不获取锁,可能读到未提交数据
实验验证
让我们通过一个具体的实验来验证这些行为。实验代码模拟了两个并发事务:事务 A 进行读写操作,事务 B 进行读操作。
有一个事务 A,事务 A 开启之后,先进行了一些查询操作,然后进行了 update 操作,接着又进行了查询操作,但在 commit 之前,事务 B 开始进行了查询操作。
假设在事务 B 长期持有 shared 锁不释放,事务 A 也无法提交成功。
完整代码
import logging
import sqlite3
import threading
import time
from contextlib import contextmanager
# 配置日志
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(threadName)s - %(message)s"
)
logger = logging.getLogger(__name__)
class TestDB:
def __init__(self, db_path: str = "test.db"):
self.db_path = db_path
self.init_db()
def init_db(self):
"""初始化数据库和测试数据"""
with self.get_connection() as conn:
cursor = conn.cursor()
# 创建测试表
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS test_table (
id INTEGER PRIMARY KEY,
value TEXT
)
"""
)
# 插入测试数据
cursor.execute("DELETE FROM test_table")
cursor.execute(
"INSERT INTO test_table (id, value) VALUES (1, 'test_value')"
)
conn.commit()
@contextmanager
def get_connection(self) -> sqlite3.Connection:
"""获取数据库连接的上下文管理器"""
conn = sqlite3.connect(self.db_path, timeout=1) # 设置较短的超时时间
conn.execute("PRAGMA busy_timeout = 5000")
try:
yield conn
finally:
conn.close()
def transaction_a(db: TestDB, event: threading.Event):
"""模拟事务A的操作"""
try:
with db.get_connection() as conn:
logger.info("事务A开始")
conn.execute("BEGIN") # 立即获取写锁
# 查询数据
logger.info("事务A: 执行查询")
cursor = conn.cursor()
cursor.execute("SELECT * FROM test_table WHERE id = 1")
result = cursor.fetchone()
logger.info(f"事务A: 查询结果 = {result}")
# 通知事务B开始
event.set()
# 故意等待一段时间, 等待 B 事务获取 shared 锁
time.sleep(1)
# 更新数据
logger.info("事务A: 执行更新")
cursor.execute(
"UPDATE test_table SET value = 'updated_by_A' WHERE id = 1"
)
logger.info("事务A: 更新完成")
# 再次查询
logger.info("事务A: 再次查询")
cursor.execute("SELECT * FROM test_table WHERE id = 1")
result = cursor.fetchone()
logger.info(f"事务A: 查询结果 = {result}")
# 提交事务
logger.info("事务A: 尝试提交")
conn.commit()
logger.info("事务A: 提交成功")
except Exception as e:
logger.error(f"事务A发生错误: {e}")
raise
def transaction_b(db: TestDB, event: threading.Event):
"""模拟事务B的操作"""
try:
# 等待事务A的信号
event.wait()
with db.get_connection() as conn:
logger.info("事务B开始")
conn.execute("BEGIN")
logger.info("事务B: 执行查询")
cursor = conn.cursor()
cursor.execute("SELECT * FROM test_table WHERE id = 1")
result = cursor.fetchone()
logger.info(f"事务B: 查询结果 = {result}")
# 持续保持共享锁一段时间
logger.info("事务B: 保持共享锁10秒")
time.sleep(10)
# 提交事务
logger.info("事务B: 尝试提交")
conn.commit()
logger.info("事务B: 提交成功")
except Exception as e:
logger.error(f"事务B发生错误: {e}")
raise
def main():
# 创建数据库实例
db = TestDB()
# 创建事件用于同步两个事务
event = threading.Event()
# 创建并启动两个线程
thread_a = threading.Thread(
target=transaction_a, args=(db, event), name="TransactionA"
)
thread_b = threading.Thread(
target=transaction_b, args=(db, event), name="TransactionB"
)
thread_a.start()
thread_b.start()
# 等待两个线程结束
thread_a.join()
thread_b.join()
# 验证最终状态
with db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM test_table WHERE id = 1")
result = cursor.fetchone()
logger.info(f"最终数据库状态: {result}")
if __name__ == "__main__":
main()
运行结果
2024-12-26 13:05:03,328 - TransactionA - 事务A开始
2024-12-26 13:05:03,328 - TransactionA - 事务A: 执行查询
2024-12-26 13:05:03,328 - TransactionA - 事务A: 查询结果 = (1, 'test_value')
2024-12-26 13:05:03,328 - TransactionB - 事务B开始
2024-12-26 13:05:03,329 - TransactionB - 事务B: 执行查询
2024-12-26 13:05:03,329 - TransactionB - 事务B: 查询结果 = (1, 'test_value')
2024-12-26 13:05:03,329 - TransactionB - 事务B: 保持共享锁10秒
2024-12-26 13:05:04,329 - TransactionA - 事务A: 执行更新
2024-12-26 13:05:04,330 - TransactionA - 事务A: 更新完成
2024-12-26 13:05:04,330 - TransactionA - 事务A: 再次查询
2024-12-26 13:05:04,330 - TransactionA - 事务A: 查询结果 = (1, 'updated_by_A')
2024-12-26 13:05:04,331 - TransactionA - 事务A: 尝试提交
2024-12-26 13:05:09,332 - TransactionA - 事务A发生错误: database is locked
Exception in thread TransactionA:
Traceback (most recent call last):
File "/opt/vcpkg/installed/x64-linux/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
self.run()
File "/opt/vcpkg/installed/x64-linux/lib/python3.10/threading.py", line 953, in run
self._target(*self._args, **self._kwargs)
File "/home/xyz/max-hmi-server/test.py", line 84, in transaction_a
conn.commit()
sqlite3.OperationalError: database is locked
2024-12-26 13:05:13,338 - TransactionB - 事务B: 尝试提交
2024-12-26 13:05:13,339 - TransactionB - 事务B: 提交成功
2024-12-26 13:05:13,340 - MainThread - 最终数据库状态: (1, 'test_value')
运行结果显示:
- 事务 A 能够执行 UPDATE 操作而不被阻塞
- 但在尝试 COMMIT 时失败(database is locked)
- 最终数据库状态保持不变(test_value)
这验证了以下关键点:
- UPDATE 操作不会立即尝试获取 EXCLUSIVE 锁
- 更改首先在内存中记录
- 只有在 COMMIT 阶段才会尝试获取 EXCLUSIVE 锁
- 如果其他事务持有 SHARED 锁,COMMIT 将被阻塞直到超时
时序图
让我分析这个场景的锁升级过程:
sequenceDiagram
participant A as 事务A
participant B as 事务B
A->>A: BEGIN
A->>A: 查询操作(获取SHARED)
B->>B: BEGIN
B->>B: 查询操作(获取SHARED)
A->>A: UPDATE(在内存中记录更改)
Note right of A: UPDATE 操作不需要立即获取 EXCLUSIVE 锁
A->>A: COMMIT(尝试获取EXCLUSIVE)
A->>A: 等待其他SHARED锁释放...
A->>A: 等待超时后抛出"database is locked"
A->>A: ROLLBACK
B->>B: 继续持有SHARED锁
B->>B: COMMIT(成功)
总结
我们可以得出以下结论:
- SQLite 的锁升级是延迟的,直到真正需要写入数据库文件时才发生
- 写操作的内存更改和实际的文件写入是分离的
- 事务的 COMMIT 阶段是最关键的锁竞争点
这种设计既保证了数据的一致性,又提供了较好的并发性能。