From f3cfdf0baa036a4b3375857cc74344ec09f6ac6f Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Mon, 22 Aug 2016 22:23:58 -0700 Subject: [PATCH] Remove commit() calls from JobQueueDB These are not safe for the common case where the local DB handle is used for the queue (and other table writes). Change-Id: Ic24a05c18bf31e49bf7e9a3c058deb5d35271511 --- includes/jobqueue/JobQueueDB.php | 2 -- includes/jobqueue/JobRunner.php | 11 ++++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/includes/jobqueue/JobQueueDB.php b/includes/jobqueue/JobQueueDB.php index b9f4592ffc..3a1da13645 100644 --- a/includes/jobqueue/JobQueueDB.php +++ b/includes/jobqueue/JobQueueDB.php @@ -261,7 +261,6 @@ class JobQueueDB extends JobQueue { protected function doPop() { $dbw = $this->getMasterDB(); try { - $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting $dbw->clearFlag( DBO_TRX ); // make each query its own transaction $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) { @@ -457,7 +456,6 @@ class JobQueueDB extends JobQueue { $dbw = $this->getMasterDB(); try { - $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting $dbw->clearFlag( DBO_TRX ); // make each query its own transaction $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) { diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php index ad0abf247d..5f48dca3da 100644 --- a/includes/jobqueue/JobRunner.php +++ b/includes/jobqueue/JobRunner.php @@ -21,6 +21,7 @@ * @ingroup JobQueue */ +use MediaWiki\MediaWikiServices; use MediaWiki\Logger\LoggerFactory; use Psr\Log\LoggerAwareInterface; use Psr\Log\LoggerInterface; @@ -122,7 +123,8 @@ class JobRunner implements LoggerAwareInterface { } // Flush any pending DB writes for sanity - wfGetLBFactory()->commitAll( __METHOD__ ); + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + $lbFactory->commitAll( __METHOD__ ); // Catch huge single updates that lead to slave lag $trxProfiler = Profiler::instance()->getTransactionProfiler(); @@ -176,9 +178,11 @@ class JobRunner implements LoggerAwareInterface { $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait ); } + $lbFactory->commitMasterChanges( __METHOD__ ); // flush any JobQueueDB writes $info = $this->executeJob( $job, $stats, $popTime ); if ( $info['status'] !== false || !$job->allowRetries() ) { $group->ack( $job ); // succeeded or job cannot be retried + $lbFactory->commitMasterChanges( __METHOD__ ); // flush any JobQueueDB writes } // Back off of certain jobs for a while (for throttling and for errors) @@ -212,7 +216,7 @@ class JobRunner implements LoggerAwareInterface { $timePassed = microtime( true ) - $lastCheckTime; if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) { try { - wfGetLBFactory()->waitForReplication( [ + $lbFactory->waitForReplication( [ 'ifWritesSince' => $lastCheckTime, 'timeout' => self::MAX_ALLOWED_LAG ] ); @@ -257,6 +261,7 @@ class JobRunner implements LoggerAwareInterface { $msg = $job->toString() . " STARTING"; $this->logger->debug( $msg ); $this->debugCallback( $msg ); + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); // Run the job... $rssStart = $this->getMaxRssKb(); @@ -284,7 +289,7 @@ class JobRunner implements LoggerAwareInterface { // Commit all outstanding connections that are in a transaction // to get a fresh repeatable read snapshot on every connection. // Note that jobs are still responsible for handling slave lag. - wfGetLBFactory()->commitAll( __METHOD__ ); + $lbFactory->commitAll( __METHOD__ ); // Clear out title cache data from prior snapshots LinkCache::singleton()->clear(); $timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 ); -- 2.20.1