使用 sqlite3_unlock_notify() API
/* This example uses the pthreads API */ #include <pthread.h> /* ** A pointer to an instance of this structure is passed as the user-context ** pointer when registering for an unlock-notify callback. */ typedef struct UnlockNotification UnlockNotification; struct UnlockNotification { int fired; /* True after unlock event has occurred */ pthread_cond_t cond; /* Condition variable to wait on */ pthread_mutex_t mutex; /* Mutex to protect structure */ }; /* ** This function is an unlock-notify callback registered with SQLite. */ static void unlock_notify_cb(void **apArg, int nArg){ int i; for(i=0; i<nArg; i++){ UnlockNotification *p = (UnlockNotification *)apArg[i]; pthread_mutex_lock(&p->mutex); p->fired = 1; pthread_cond_signal(&p->cond); pthread_mutex_unlock(&p->mutex); } } /* ** This function assumes that an SQLite API call (either sqlite3_prepare_v2() ** or sqlite3_step()) has just returned SQLITE_LOCKED. The argument is the ** associated database connection. ** ** This function calls sqlite3_unlock_notify() to register for an ** unlock-notify callback, then blocks until that callback is delivered ** and returns SQLITE_OK. The caller should then retry the failed operation. ** ** Or, if sqlite3_unlock_notify() indicates that to block would deadlock ** the system, then this function returns SQLITE_LOCKED immediately. In ** this case the caller should not retry the operation and should roll ** back the current transaction (if any). */ static int wait_for_unlock_notify(sqlite3 *db){ int rc; UnlockNotification un; /* Initialize the UnlockNotification structure. */ un.fired = 0; pthread_mutex_init(&un.mutex, 0); pthread_cond_init(&un.cond, 0); /* Register for an unlock-notify callback. */ rc = sqlite3_unlock_notify(db, unlock_notify_cb, (void *)&un); assert( rc==SQLITE_LOCKED || rc==SQLITE_OK ); /* The call to sqlite3_unlock_notify() always returns either SQLITE_LOCKED ** or SQLITE_OK. ** ** If SQLITE_LOCKED was returned, then the system is deadlocked. In this ** case this function needs to return SQLITE_LOCKED to the caller so ** that the current transaction can be rolled back. Otherwise, block ** until the unlock-notify callback is invoked, then return SQLITE_OK. */ if( rc==SQLITE_OK ){ pthread_mutex_lock(&un.mutex); if( !un.fired ){ pthread_cond_wait(&un.cond, &un.mutex); } pthread_mutex_unlock(&un.mutex); } /* Destroy the mutex and condition variables. */ pthread_cond_destroy(&un.cond); pthread_mutex_destroy(&un.mutex); return rc; } /* ** This function is a wrapper around the SQLite function sqlite3_step(). ** It functions in the same way as step(), except that if a required ** shared-cache lock cannot be obtained, this function may block waiting for ** the lock to become available. In this scenario the normal API step() ** function always returns SQLITE_LOCKED. ** ** If this function returns SQLITE_LOCKED, the caller should rollback ** the current transaction (if any) and try again later. Otherwise, the ** system may become deadlocked. */ int sqlite3_blocking_step(sqlite3_stmt *pStmt){ int rc; while( SQLITE_LOCKED==(rc = sqlite3_step(pStmt)) ){ rc = wait_for_unlock_notify(sqlite3_db_handle(pStmt)); if( rc!=SQLITE_OK ) break; sqlite3_reset(pStmt); } return rc; } /* ** This function is a wrapper around the SQLite function sqlite3_prepare_v2(). ** It functions in the same way as prepare_v2(), except that if a required ** shared-cache lock cannot be obtained, this function may block waiting for ** the lock to become available. In this scenario the normal API prepare_v2() ** function always returns SQLITE_LOCKED. ** ** If this function returns SQLITE_LOCKED, the caller should rollback ** the current transaction (if any) and try again later. Otherwise, the ** system may become deadlocked. */ int sqlite3_blocking_prepare_v2( sqlite3 *db, /* Database handle. */ const char *zSql, /* UTF-8 encoded SQL statement. */ int nSql, /* Length of zSql in bytes. */ sqlite3_stmt **ppStmt, /* OUT: A pointer to the prepared statement */ const char **pz /* OUT: End of parsed string */ ){ int rc; while( SQLITE_LOCKED==(rc = sqlite3_prepare_v2(db, zSql, nSql, ppStmt, pz)) ){ rc = wait_for_unlock_notify(db); if( rc!=SQLITE_OK ) break; } return rc; }
当两个或多个连接以共享缓存模式访问同一个数据库时,将使用对各个表的读写(共享和排他)锁来确保并发执行的事务保持隔离。在写入表之前,必须在该表上获得一个写(独占)锁。在读取之前,必须获得读取(共享)锁。连接在结束其事务时释放所有持有的表锁。如果连接无法获得所需的锁,则对sqlite3_step()的调用将返回 SQLITE_LOCKED。
虽然不太常见,但如果无法获得每个附加数据库的sqlite_schema 表上的读锁,对sqlite3_prepare()或 sqlite3_prepare_v2()的调用也可能返回 SQLITE_LOCKED。这些 API 需要读取 sqlite_schema 表中包含的模式数据,以便将 SQL 语句编译为sqlite3_stmt*对象。
本文介绍了一种使用 SQLite sqlite3_unlock_notify() 接口的技术,这样对sqlite3_step()和sqlite3_prepare_v2()的调用就会 阻塞,直到所需的锁可用,而不是立即返回 SQLITE_LOCKED。如果出现在左侧的 sqlite3_blocking_step() 或 sqlite3_blocking_prepare_v2() 函数返回 SQLITE_LOCKED,这表明阻塞会导致系统死锁。
sqlite3_unlock_notify() API 仅在使用定义的预处理器符号 SQLITE_ENABLE_UNLOCK_NOTIFY 编译库时可用 ,记录在此处。本文不能替代阅读完整的 API 文档!
sqlite3_unlock_notify()接口设计用于为每个数据库连接分配单独线程 的系统。实现中没有任何东西可以阻止单个线程运行多个数据库连接。然而,sqlite3_unlock_notify() 接口一次只能在一个连接上工作,所以这里介绍的锁解析逻辑将只在每个线程的一个数据库连接上工作。
sqlite3_unlock_notify() API
在调用sqlite3_step()或sqlite3_prepare_v2()返回 SQLITE_LOCKED 后,可以调用sqlite3_unlock_notify() API 来注册解锁通知回调。在持有阻止调用sqlite3_step()或sqlite3_prepare_v2()成功的表锁的数据库连接完成其事务并释放所有锁之后,SQLite 调用解锁通知回调。例如,如果对 sqlite3_step() 的调用是尝试从表 X 读取,而其他某个连接 Y 持有表 X 的写锁,则 sqlite3_step() 将返回 SQLITE_LOCKED。如果sqlite3_unlock_notify()然后调用,解锁通知回调将在连接 Y 的事务结束后调用。解锁通知回调正在等待的连接,在本例中为连接 Y,被称为“阻塞连接”。
如果对尝试写入数据库表的 sqlite3_step() 的调用返回 SQLITE_LOCKED,则可能有多个其他连接在相关数据库表上持有读锁。在这种情况下,SQLite 只是任意选择其他连接之一,并在该连接的事务完成时发出解锁通知回调。无论对 sqlite3_step() 的调用是否被一个或多个连接阻塞,当发出相应的解锁通知回调时,不能保证所需的锁可用,只能保证它可能可用。
发出解锁通知回调时,它是从调用与阻塞连接关联的 sqlite3_step()(或 sqlite3_close())中发出的。从解锁通知回调中调用任何 sqlite3_XXX() API 函数都是非法的。预期的用途是解锁通知回调将向其他等待线程发出信号或安排稍后发生的某些操作。
sqlite3_blocking_step()函数使用的算法如下:
在提供的语句句柄上调用 sqlite3_step()。如果调用返回 SQLITE_LOCKED 以外的任何内容,则将此值返回给调用者。否则,继续。
在与提供的语句句柄关联的数据库连接句柄上调用sqlite3_unlock_notify()以注册解锁通知回调。如果对 unlock_notify() 的调用返回 SQLITE_LOCKED,则将此值返回给调用者。
阻塞直到解锁通知回调被另一个线程调用。
在语句句柄上调用 sqlite3_reset()。由于 SQLITE_LOCKED 错误可能只发生在第一次调用 sqlite3_step() 时(不可能一次调用 sqlite3_step() 返回 SQLITE_ROW 然后再返回下一个 SQLITE_LOCKED),语句句柄可以在此时重置而不影响结果从调用者的角度来看查询。如果此时未调用 sqlite3_reset(),下一次调用 sqlite3_step() 将返回 SQLITE_MISUSE。
返回步骤 1。
sqlite3_blocking_prepare_v2() 函数使用的算法类似,只是省略了第 4 步(重置语句句柄)。
作家饥饿
多个连接可能同时持有读锁。如果许多线程正在获取重叠的读锁,则可能是至少一个线程始终持有一个读锁。然后等待写锁的表将永远等待。这种情况称为“作家饥饿”。
SQLite 帮助应用程序避免编写器饥饿。在尝试获取表上的写锁失败后(因为一个或多个其他连接持有读锁),在共享缓存上打开新事务的所有尝试都会失败,直到以下情况之一为真:
- 当前作者结束其交易,或者
- 共享高速缓存上打开的读取事务数降至零。
尝试打开新的读取事务失败会将 SQLITE_LOCKED 返回给调用者。如果调用者随后调用sqlite3_unlock_notify()来注册解锁通知回调,则阻塞连接是当前在共享缓存上具有打开的写事务的连接。这可以防止写者饥饿,因为如果没有新的读事务可以打开并且假设所有现有的读事务最终结束,写者最终将有机会获得所需的写锁。
pthread API
当sqlite3_unlock_notify()被 wait_for_unlock_notify() 调用时,阻止 sqlite3_step() 或 sqlite3_prepare_v2() 调用成功的阻塞连接可能已经完成了它的事务。在这种情况下,在 sqlite3_unlock_notify()返回之前立即调用解锁通知回调。或者,在调用sqlite3_unlock_notify()之后但在线程开始等待异步信号之前, 第二个线程可能会调用解锁通知回调 。
究竟如何处理这种潜在的竞争条件取决于应用程序使用的线程和同步原语接口。此示例使用 pthreads,现代类 UNIX 系统(包括 Linux)提供的接口。
pthreads 接口提供了 pthread_cond_wait() 函数。此函数允许调用者同时释放互斥量并开始等待异步信号。使用这个函数、一个“触发”标志和一个互斥锁,可以消除上述竞争条件,如下所示:
当调用 unlock-notify 回调时,可能在调用sqlite3_unlock_notify()的线程开始等待异步信号之前,它会执行以下操作:
- 获得互斥量。
- 将“解雇”标志设置为真。
- 尝试向等待线程发出信号。
- 释放互斥体。
当 wait_for_unlock_notify() 线程准备好开始等待解锁通知回调到达时,它:
- 获得互斥量。
- 检查是否设置了“已解雇”标志。如果是,则解锁通知回调已被调用。释放互斥量并继续。
- 以原子方式释放互斥量并开始等待异步信号。当信号到达时,继续。
这样,当 wait_for_unlock_notify() 线程开始阻塞时,解锁通知回调是否已经被调用或正在被调用并不重要。
可能的改进
本文中的代码至少可以通过两种方式进行改进:
- 它可以管理线程优先级。
- 它可以处理在删除表或索引时可能发生的 SQLITE_LOCKED 的特殊情况。
即使sqlite3_unlock_notify()函数只允许调用者指定单个用户上下文指针,解锁通知回调也会传递一个此类上下文指针的数组。这是因为如果当一个阻塞连接结束它的事务时,如果注册了多个解锁通知来调用同一个 C 函数,上下文指针将被编组到一个数组中并发出一个回调。如果每个线程都被分配了一个优先级,那么不是像这个实现那样只是以任意顺序向线程发出信号,而是可以在较低优先级线程之前向较高优先级线程发出信号。
如果执行了“DROP TABLE”或“DROP INDEX”SQL 命令,并且同一数据库连接当前有一个或多个正在执行的 SELECT 语句,则返回 SQLITE_LOCKED。如果 在这种情况下调用了sqlite3_unlock_notify(),那么将立即调用指定的回调。重新尝试“DROP TABLE”或“DROP INDEX”语句将返回另一个 SQLITE_LOCKED 错误。在左侧显示的 sqlite3_blocking_step() 的实现中,这可能会导致无限循环。
调用者可以通过使用扩展错误代码 来区分这种特殊的“DROP TABLE|INDEX”情况和其他情况。适合调用sqlite3_unlock_notify()时,扩展错误码为SQLITE_LOCKED_SHAREDCACHE。否则,在“DROP TABLE|INDEX”的情况下,它只是简单的 SQLITE_LOCKED。另一种解决方案可能是限制可以重新尝试任何单个查询的次数(比如 100)。虽然这可能不如人们希望的那样有效,但这种情况不太可能经常发生。