From 7c821caef52897e7513a3ccb0a1d89b5b5cbb746 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Tue, 21 Apr 2015 23:13:31 -0700 Subject: [PATCH] Added $wgJobSerialCommitThreshold setting * This is used to avoid lag by certain jobs Bug: T95501 Change-Id: Id707c9a840fa23d56407e03aaae4e25149a1f906 --- includes/DefaultSettings.php | 15 +++++++ includes/db/Database.php | 19 ++++++-- includes/db/DatabaseMysqlBase.php | 4 ++ includes/db/LoadBalancer.php | 3 +- includes/jobqueue/JobRunner.php | 75 +++++++++++++++++++++++++++---- 5 files changed, 103 insertions(+), 13 deletions(-) diff --git a/includes/DefaultSettings.php b/includes/DefaultSettings.php index 3cfeb8c2fe..e323ec60f3 100644 --- a/includes/DefaultSettings.php +++ b/includes/DefaultSettings.php @@ -6451,6 +6451,21 @@ $wgJobTypesExcludedFromDefaultQueue = array( 'AssembleUploadChunks', 'PublishSta */ $wgJobBackoffThrottling = array(); +/** + * Make job runners commit changes for slave-lag prone jobs one job at a time. + * This is useful if there are many job workers that race on slave lag checks. + * If set, jobs taking this many seconds of DB write time have serialized commits. + * + * Note that affected jobs may have worse lock contention. Also, if they affect + * several DBs at once they may have a smaller chance of being atomic due to the + * possibility of connection loss while queueing up to commit. Affected jobs may + * also fail due to the commit lock acquisition timeout. + * + * @var float|bool + * @since 1.26 + */ +$wgJobSerialCommitThreshold = false; + /** * Map of job types to configuration arrays. * This determines which queue class and storage system is used for each job type. diff --git a/includes/db/Database.php b/includes/db/Database.php index 8c1ebf91cc..2c1ebeaa03 100644 --- a/includes/db/Database.php +++ b/includes/db/Database.php @@ -4235,7 +4235,7 @@ abstract class DatabaseBase implements IDatabase { } /** - * Check to see if a named lock is available. This is non-blocking. + * Check to see if a named lock is available (non-blocking) * * @param string $lockName Name of lock to poll * @param string $method Name of method calling us @@ -4249,8 +4249,7 @@ abstract class DatabaseBase implements IDatabase { /** * Acquire a named lock * - * Abstracted from Filestore::lock() so child classes can implement for - * their own needs. + * Named locks are not related to transactions * * @param string $lockName Name of lock to aquire * @param string $method Name of method calling us @@ -4262,7 +4261,9 @@ abstract class DatabaseBase implements IDatabase { } /** - * Release a lock. + * Release a lock + * + * Named locks are not related to transactions * * @param string $lockName Name of lock to release * @param string $method Name of method calling us @@ -4275,6 +4276,16 @@ abstract class DatabaseBase implements IDatabase { return true; } + /** + * Check to see if a named lock used by lock() use blocking queues + * + * @return bool + * @since 1.26 + */ + public function namedLocksEnqueue() { + return false; + } + /** * Lock specific tables * diff --git a/includes/db/DatabaseMysqlBase.php b/includes/db/DatabaseMysqlBase.php index aac95a8cca..64917ccfd8 100644 --- a/includes/db/DatabaseMysqlBase.php +++ b/includes/db/DatabaseMysqlBase.php @@ -873,6 +873,10 @@ abstract class DatabaseMysqlBase extends DatabaseBase { return ( $row->lockstatus == 1 ); } + public function namedLocksEnqueue() { + return true; + } + /** * @param array $read * @param array $write diff --git a/includes/db/LoadBalancer.php b/includes/db/LoadBalancer.php index 624f46bc3b..be67d75763 100644 --- a/includes/db/LoadBalancer.php +++ b/includes/db/LoadBalancer.php @@ -845,8 +845,9 @@ class LoadBalancer { /** * @return int + * @since 1.26 */ - private function getWriterIndex() { + public function getWriterIndex() { return 0; } diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php index 94254239c1..2311ea2a07 100644 --- a/includes/jobqueue/JobRunner.php +++ b/includes/jobqueue/JobRunner.php @@ -35,6 +35,11 @@ class JobRunner implements LoggerAwareInterface { /** @var callable|null Debug output handler */ protected $debug; + /** + * @var LoggerInterface $logger + */ + protected $logger; + /** * @param callable $debug Optional debug output handler */ @@ -42,13 +47,9 @@ class JobRunner implements LoggerAwareInterface { $this->debug = $debug; } - /** - * @var LoggerInterface $logger - */ - protected $logger; - /** * @param LoggerInterface $logger + * @return void */ public function setLogger( LoggerInterface $logger ) { $this->logger = $logger; @@ -183,7 +184,7 @@ class JobRunner implements LoggerAwareInterface { ++$jobsRun; $status = $job->run(); $error = $job->getLastError(); - wfGetLBFactory()->commitMasterChanges(); + $this->commitMasterChanges( $job ); } catch ( Exception $e ) { MWExceptionHandler::rollbackMasterChangesAndLog( $e ); $status = false; @@ -304,7 +305,6 @@ class JobRunner implements LoggerAwareInterface { * @return array Map of (job type => backoff expiry timestamp) */ private function loadBackoffs( array $backoffs, $mode = 'wait' ) { - $file = wfTempDir() . '/mw-runJobs-backoffs.json'; if ( is_file( $file ) ) { $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0; @@ -342,7 +342,6 @@ class JobRunner implements LoggerAwareInterface { * @return array The new backoffs account for $backoffs and the latest file data */ private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) { - if ( !$deltas ) { return $this->loadBackoffs( $backoffs, $mode ); } @@ -409,4 +408,64 @@ class JobRunner implements LoggerAwareInterface { call_user_func_array( $this->debug, array( wfTimestamp( TS_DB ) . " $msg\n" ) ); } } + + /** + * Commit any DB master changes from a job on all load balancers + * + * @param Job $job + * @throws DBError + */ + private function commitMasterChanges( Job $job ) { + global $wgJobSerialCommitThreshold; + + $lb = wfGetLB( wfWikiID() ); + if ( $wgJobSerialCommitThreshold !== false ) { + // Generally, there is one master connection to the local DB + $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() ); + } else { + $dbwSerial = false; + } + + if ( !$dbwSerial + || !$dbwSerial->namedLocksEnqueue() + || $dbwSerial->pendingWriteQueryDuration() < $wgJobSerialCommitThreshold + ) { + // Writes are all to foreign DBs, named locks don't form queues, + // or $wgJobSerialCommitThreshold is not reached; commit changes now + wfGetLBFactory()->commitMasterChanges(); + return; + } + + $ms = intval( 1000 * $dbwSerial->pendingWriteQueryDuration() ); + $msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of writes]"; + $this->logger->info( $msg ); + $this->debugCallback( $msg ); + + // Wait for an exclusive lock to commit + if ( !$dbwSerial->lock( 'jobrunner-serial-commit', __METHOD__, 30 ) ) { + // This will trigger a rollback in the main loop + throw new DBError( $dbwSerial, "Timed out waiting on commit queue." ); + } + // Wait for the generic slave to catch up + $pos = $lb->getMasterPos(); + if ( $pos ) { + $lb->waitForOne( $pos ); + } + + // Re-ping all masters with transactions. This throws DBError if some + // connection died while waiting on locks/slaves, triggering a rollback. + wfGetLBFactory()->forEachLB( function( LoadBalancer $lb ) { + $lb->forEachOpenConnection( function( DatabaseBase $conn ) { + if ( $conn->writesOrCallbacksPending() ) { + $conn->query( "SELECT 1" ); + } + } ); + } ); + + // Actually commit the DB master changes + wfGetLBFactory()->commitMasterChanges(); + + // Release the lock + $dbwSerial->unlock( 'jobrunner-serial-commit', __METHOD__ ); + } } -- 2.20.1