From 4122e53845a3c27ed72734d3fb60ffbe27f543b0 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Fri, 29 Mar 2019 19:02:15 -0700 Subject: [PATCH] jobqueue: fix DBO_TRX logic in JobQueueDB for avoiding transactions Various methods were missing the flag setting logic and tests could fail or have "outer scope" warnings in the logs for sqlite. Change-Id: Ia0607d189a307667297f06109a34363c92e37d92 --- includes/jobqueue/JobQueueDB.php | 64 +++++++++++++++++++++++++------- 1 file changed, 50 insertions(+), 14 deletions(-) diff --git a/includes/jobqueue/JobQueueDB.php b/includes/jobqueue/JobQueueDB.php index 495be73981..74a6559ee5 100644 --- a/includes/jobqueue/JobQueueDB.php +++ b/includes/jobqueue/JobQueueDB.php @@ -83,6 +83,8 @@ class JobQueueDB extends JobQueue { */ protected function doIsEmpty() { $dbr = $this->getReplicaDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbr ); try { $found = $dbr->selectField( // unclaimed job 'job', '1', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__ @@ -106,8 +108,10 @@ class JobQueueDB extends JobQueue { return $size; } + $dbr = $this->getReplicaDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbr ); try { - $dbr = $this->getReplicaDB(); $size = (int)$dbr->selectField( 'job', 'COUNT(*)', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__ @@ -137,6 +141,8 @@ class JobQueueDB extends JobQueue { } $dbr = $this->getReplicaDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbr ); try { $count = (int)$dbr->selectField( 'job', 'COUNT(*)', [ 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ], @@ -168,6 +174,8 @@ class JobQueueDB extends JobQueue { } $dbr = $this->getReplicaDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbr ); try { $count = (int)$dbr->selectField( 'job', 'COUNT(*)', [ @@ -195,6 +203,8 @@ class JobQueueDB extends JobQueue { */ protected function doBatchPush( array $jobs, $flags ) { $dbw = $this->getMasterDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbw ); // In general, there will be two cases here: // a) sqlite; DB connection is probably a regular round-aware handle. // If the connection is busy with a transaction, then defer the job writes @@ -283,15 +293,12 @@ class JobQueueDB extends JobQueue { */ protected function doPop() { $dbw = $this->getMasterDB(); - try { - $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting - $dbw->clearFlag( DBO_TRX ); // make each query its own transaction - $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) { - $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting - } ); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbw ); + $job = false; // job popped off + try { $uuid = wfRandomString( 32 ); // pop attempt - $job = false; // job popped off do { // retry when our row is invalid or deleted as a duplicate // Try to reserve a row in the DB... if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) { @@ -337,6 +344,8 @@ class JobQueueDB extends JobQueue { */ protected function claimRandom( $uuid, $rand, $gte ) { $dbw = $this->getMasterDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbw ); // Check cache to see if the queue has <= OFFSET items $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) ); @@ -414,6 +423,8 @@ class JobQueueDB extends JobQueue { */ protected function claimOldest( $uuid ) { $dbw = $this->getMasterDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbw ); $row = false; // the row acquired do { @@ -478,13 +489,9 @@ class JobQueueDB extends JobQueue { } $dbw = $this->getMasterDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbw ); try { - $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting - $dbw->clearFlag( DBO_TRX ); // make each query its own transaction - $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) { - $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting - } ); - // Delete a row with a single DELETE without holding row locks over RTTs... $dbw->delete( 'job', [ 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ], __METHOD__ ); @@ -515,6 +522,9 @@ class JobQueueDB extends JobQueue { // maintained. Having only the de-duplication registration succeed would cause // jobs to become no-ops without any actual jobs that made them redundant. $dbw = $this->getMasterDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbw ); + $cache = $this->dupCache; $dbw->onTransactionCommitOrIdle( function () use ( $cache, $params, $key ) { @@ -538,6 +548,8 @@ class JobQueueDB extends JobQueue { */ protected function doDelete() { $dbw = $this->getMasterDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbw ); try { $dbw->delete( 'job', [ 'job_cmd' => $this->type ] ); } catch ( DBError $e ) { @@ -594,6 +606,8 @@ class JobQueueDB extends JobQueue { */ protected function getJobIterator( array $conds ) { $dbr = $this->getReplicaDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbr ); try { return new MappedIterator( $dbr->select( 'job', self::selectFields(), $conds ), @@ -626,6 +640,8 @@ class JobQueueDB extends JobQueue { protected function doGetSiblingQueuesWithJobs( array $types ) { $dbr = $this->getReplicaDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbr ); // @note: this does not check whether the jobs are claimed or not. // This is useful so JobQueueGroup::pop() also sees queues that only // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue @@ -643,6 +659,9 @@ class JobQueueDB extends JobQueue { protected function doGetSiblingQueueSizes( array $types ) { $dbr = $this->getReplicaDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbr ); + $res = $dbr->select( 'job', [ 'job_cmd', 'COUNT(*) AS count' ], [ 'job_cmd' => $types ], __METHOD__, [ 'GROUP BY' => 'job_cmd' ] ); @@ -663,6 +682,8 @@ class JobQueueDB extends JobQueue { $now = time(); $count = 0; // affected rows $dbw = $this->getMasterDB(); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scope = $this->getScopedNoTrxFlag( $dbw ); try { if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) { @@ -821,6 +842,21 @@ class JobQueueDB extends JobQueue { } } + /** + * @param IDatabase $db + * @return ScopedCallback + */ + private function getScopedNoTrxFlag( IDatabase $db ) { + $autoTrx = $db->getFlag( DBO_TRX ); // get current setting + $db->clearFlag( DBO_TRX ); // make each query its own transaction + + return new ScopedCallback( function () use ( $db, $autoTrx ) { + if ( $autoTrx ) { + $db->setFlag( DBO_TRX ); // restore old setting + } + } ); + } + /** * @param string $property * @return string -- 2.20.1