X-Git-Url: https://git.cyclocoop.org/%28%28?a=blobdiff_plain;f=includes%2Fjobqueue%2FJobRunner.php;h=5f48dca3da913845240eb4b5375ac044efd785e5;hb=1b8eac21abd41d0d9ddae8a4358e2aa933b0d959;hp=1350958b7859477eb1950f853d5f7fa7a2b94f8f;hpb=45693871ad1d22c2248f32be82adde59d40ae3a5;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php index 1350958b78..5f48dca3da 100644 --- a/includes/jobqueue/JobRunner.php +++ b/includes/jobqueue/JobRunner.php @@ -21,6 +21,7 @@ * @ingroup JobQueue */ +use MediaWiki\MediaWikiServices; use MediaWiki\Logger\LoggerFactory; use Psr\Log\LoggerAwareInterface; use Psr\Log\LoggerInterface; @@ -122,7 +123,8 @@ class JobRunner implements LoggerAwareInterface { } // Flush any pending DB writes for sanity - wfGetLBFactory()->commitAll( __METHOD__ ); + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + $lbFactory->commitAll( __METHOD__ ); // Catch huge single updates that lead to slave lag $trxProfiler = Profiler::instance()->getTransactionProfiler(); @@ -176,9 +178,11 @@ class JobRunner implements LoggerAwareInterface { $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait ); } + $lbFactory->commitMasterChanges( __METHOD__ ); // flush any JobQueueDB writes $info = $this->executeJob( $job, $stats, $popTime ); if ( $info['status'] !== false || !$job->allowRetries() ) { $group->ack( $job ); // succeeded or job cannot be retried + $lbFactory->commitMasterChanges( __METHOD__ ); // flush any JobQueueDB writes } // Back off of certain jobs for a while (for throttling and for errors) @@ -212,7 +216,7 @@ class JobRunner implements LoggerAwareInterface { $timePassed = microtime( true ) - $lastCheckTime; if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) { try { - wfGetLBFactory()->waitForReplication( [ + $lbFactory->waitForReplication( [ 'ifWritesSince' => $lastCheckTime, 'timeout' => self::MAX_ALLOWED_LAG ] ); @@ -257,6 +261,7 @@ class JobRunner implements LoggerAwareInterface { $msg = $job->toString() . " STARTING"; $this->logger->debug( $msg ); $this->debugCallback( $msg ); + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); // Run the job... $rssStart = $this->getMaxRssKb(); @@ -284,7 +289,7 @@ class JobRunner implements LoggerAwareInterface { // Commit all outstanding connections that are in a transaction // to get a fresh repeatable read snapshot on every connection. // Note that jobs are still responsible for handling slave lag. - wfGetLBFactory()->commitAll( __METHOD__ ); + $lbFactory->commitAll( __METHOD__ ); // Clear out title cache data from prior snapshots LinkCache::singleton()->clear(); $timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 ); @@ -307,7 +312,7 @@ class JobRunner implements LoggerAwareInterface { $stats->timing( "jobqueue.run.$jType", $timeMs ); // Track RSS increases for jobs (in case of memory leaks) if ( $rssStart && $rssEnd ) { - $stats->increment( "jobqueue.rss_delta.$jType", $rssEnd - $rssStart ); + $stats->updateCount( "jobqueue.rss_delta.$jType", $rssEnd - $rssStart ); } if ( $status === false ) { @@ -522,23 +527,12 @@ class JobRunner implements LoggerAwareInterface { // This will trigger a rollback in the main loop throw new DBError( $dbwSerial, "Timed out waiting on commit queue." ); } - // Wait for the generic slave to catch up + // Wait for the slave DBs to catch up $pos = $lb->getMasterPos(); if ( $pos ) { - $lb->waitForOne( $pos ); + $lb->waitForAll( $pos ); } - $fname = __METHOD__; - // Re-ping all masters with transactions. This throws DBError if some - // connection died while waiting on locks/slaves, triggering a rollback. - wfGetLBFactory()->forEachLB( function( LoadBalancer $lb ) use ( $fname ) { - $lb->forEachOpenConnection( function( IDatabase $conn ) use ( $fname ) { - if ( $conn->writesOrCallbacksPending() ) { - $conn->query( "SELECT 1", $fname ); - } - } ); - } ); - // Actually commit the DB master changes wfGetLBFactory()->commitMasterChanges( __METHOD__ );