From ac202927d4ea104124fb50ca0b9fdd588cf45111 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Thu, 25 May 2017 12:37:08 -0700 Subject: [PATCH] Add $wgMaxJobDBWriteDuration setting for avoiding replication lag This is similar to $wgMaxUserDBWriteDuration except for jobs. Also use the Config class in JobRunner instead of globals. Bug: T95501 Change-Id: I4949bb99c26451429c7acf82ecc4444bf9fb835f --- includes/DefaultSettings.php | 9 ++++++++ includes/jobqueue/JobRunner.php | 37 +++++++++++++++++++++------------ 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/includes/DefaultSettings.php b/includes/DefaultSettings.php index 19c585d14c..5b833477a6 100644 --- a/includes/DefaultSettings.php +++ b/includes/DefaultSettings.php @@ -8560,6 +8560,15 @@ $wgPopularPasswordFile = __DIR__ . '/../serialized/commonpasswords.cdb'; */ $wgMaxUserDBWriteDuration = false; +/** + * Max time (in seconds) a job-generated transaction can spend in writes. + * If exceeded, the transaction is rolled back with an error instead of being committed. + * + * @var int|bool Disabled if false + * @since 1.30 + */ +$wgMaxJobDBWriteDuration = false; + /** * Mapping of event channels (or channel categories) to EventRelayer configuration. 
* diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php index 6415533508..18cabc5df1 100644 --- a/includes/jobqueue/JobRunner.php +++ b/includes/jobqueue/JobRunner.php @@ -38,6 +38,8 @@ use Wikimedia\Rdbms\DBReplicationWaitError; * @since 1.24 */ class JobRunner implements LoggerAwareInterface { + /** @var Config */ + protected $config; /** @var callable|null Debug output handler */ protected $debug; @@ -74,6 +76,7 @@ class JobRunner implements LoggerAwareInterface { $logger = LoggerFactory::getInstance( 'runJobs' ); } $this->setLogger( $logger ); + $this->config = MediaWikiServices::getInstance()->getMainConfig(); } /** @@ -101,7 +104,8 @@ class JobRunner implements LoggerAwareInterface { * @return array Summary response that can easily be JSON serialized */ public function run( array $options ) { - global $wgJobClasses, $wgTrxProfilerLimits; + $jobClasses = $this->config->get( 'JobClasses' ); + $profilerLimits = $this->config->get( 'TrxProfilerLimits' ); $response = [ 'jobs' => [], 'reached' => 'none-ready' ]; @@ -111,7 +115,7 @@ class JobRunner implements LoggerAwareInterface { $noThrottle = isset( $options['throttle'] ) && !$options['throttle']; // Bail if job type is invalid - if ( $type !== false && !isset( $wgJobClasses[$type] ) ) { + if ( $type !== false && !isset( $jobClasses[$type] ) ) { $response['reached'] = 'none-possible'; return $response; } @@ -136,7 +140,7 @@ class JobRunner implements LoggerAwareInterface { // Catch huge single updates that lead to replica DB lag $trxProfiler = Profiler::instance()->getTransactionProfiler(); $trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) ); - $trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ ); + $trxProfiler->setExpectations( $profilerLimits['JobRunner'], __METHOD__ ); // Some jobs types should not run until a certain timestamp $backoffs = []; // map of (type => UNIX expiry) @@ -360,15 +364,13 @@ class JobRunner implements 
LoggerAwareInterface { * @see $wgJobBackoffThrottling */ private function getBackoffTimeToWait( Job $job ) { - global $wgJobBackoffThrottling; + $throttling = $this->config->get( 'JobBackoffThrottling' ); - if ( !isset( $wgJobBackoffThrottling[$job->getType()] ) || - $job instanceof DuplicateJob // no work was done - ) { + if ( !isset( $throttling[$job->getType()] ) || $job instanceof DuplicateJob ) { return 0; // not throttled } - $itemsPerSecond = $wgJobBackoffThrottling[$job->getType()]; + $itemsPerSecond = $throttling[$job->getType()]; if ( $itemsPerSecond <= 0 ) { return 0; // not throttled } @@ -516,17 +518,17 @@ class JobRunner implements LoggerAwareInterface { * @throws DBError */ private function commitMasterChanges( LBFactory $lbFactory, Job $job, $fnameTrxOwner ) { - global $wgJobSerialCommitThreshold; + $syncThreshold = $this->config->get( 'JobSerialCommitThreshold' ); $time = false; $lb = $lbFactory->getMainLB( wfWikiID() ); - if ( $wgJobSerialCommitThreshold !== false && $lb->getServerCount() > 1 ) { + if ( $syncThreshold !== false && $lb->getServerCount() > 1 ) { // Generally, there is one master connection to the local DB $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() ); // We need natively blocking fast locks if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) { $time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY ); - if ( $time < $wgJobSerialCommitThreshold ) { + if ( $time < $syncThreshold ) { $dbwSerial = false; } } else { @@ -538,7 +540,12 @@ class JobRunner implements LoggerAwareInterface { } if ( !$dbwSerial ) { - $lbFactory->commitMasterChanges( $fnameTrxOwner ); + $lbFactory->commitMasterChanges( + $fnameTrxOwner, + // Abort if any transaction was too big + [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ] + ); + return; } @@ -563,7 +570,11 @@ class JobRunner implements LoggerAwareInterface { } // Actually commit the DB master changes - $lbFactory->commitMasterChanges( 
$fnameTrxOwner ); + $lbFactory->commitMasterChanges( + $fnameTrxOwner, + // Abort if any transaction was too big + [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ] + ); ScopedCallback::consume( $unlocker ); } } -- 2.20.1