From 6c73b32fd5b2a0c17d0fb32935804b6457ccd992 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Tue, 6 Sep 2016 15:25:53 -0700 Subject: [PATCH] Convert JobRunner to using beginMasterChanges() This lets the runJobs.php $wgCommandLineMode hack be removed. Some fixes based on unit tests: * Only call applyTransactionRoundFlags() for master connections for transaction rounds from beginMasterChanges(). * Also cleaned up the commitAndWaitForReplication() reset logic. * Removed deprecated DataUpdate::doUpdate() calls from jobs since they cannot nest in a transaction round. Change-Id: Ia9b91f539dc11a5c05bdac4bcd99d6615c4dc48d --- includes/db/loadbalancer/LBFactory.php | 5 +++-- includes/db/loadbalancer/LoadBalancer.php | 6 +++--- includes/jobqueue/JobRunner.php | 20 +++++++++++--------- includes/jobqueue/jobs/DeleteLinksJob.php | 2 +- includes/jobqueue/jobs/RefreshLinksJob.php | 4 +++- maintenance/runJobs.php | 8 -------- 6 files changed, 21 insertions(+), 24 deletions(-) diff --git a/includes/db/loadbalancer/LBFactory.php b/includes/db/loadbalancer/LBFactory.php index 2d91bb7484..aba3fb8ed5 100644 --- a/includes/db/loadbalancer/LBFactory.php +++ b/includes/db/loadbalancer/LBFactory.php @@ -537,12 +537,13 @@ abstract class LBFactory implements DestructibleService { return; } - $fnameEffective = $fname; // The transaction owner and any caller with the empty transaction ticket can commit // so that getEmptyTransactionTicket() callers don't risk seeing DBTransactionError. if ( $this->trxRoundId !== false && $fname !== $this->trxRoundId ) { $this->trxLogger->info( "$fname: committing on behalf of {$this->trxRoundId}." ); $fnameEffective = $this->trxRoundId; + } else { + $fnameEffective = $fname; } $this->commitMasterChanges( $fnameEffective ); @@ -550,7 +551,7 @@ abstract class LBFactory implements DestructibleService { // If a nested caller committed on behalf of $fname, start another empty $fname // transaction, leaving the caller with the same empty transaction state as before. if ( $fnameEffective !== $fname ) { - $this->beginMasterChanges( $fname ); + $this->beginMasterChanges( $fnameEffective ); } } diff --git a/includes/db/loadbalancer/LoadBalancer.php b/includes/db/loadbalancer/LoadBalancer.php index 71286a976f..cb22c361d2 100644 --- a/includes/db/loadbalancer/LoadBalancer.php +++ b/includes/db/loadbalancer/LoadBalancer.php @@ -864,11 +864,11 @@ class LoadBalancer { $this->getLazyConnectionRef( DB_MASTER, [], $db->getWikiID() ) ); $db->setTransactionProfiler( $this->trxProfiler ); - if ( $this->trxRoundId !== false ) { - $this->applyTransactionRoundFlags( $db ); - } if ( $server['serverIndex'] === $this->getWriterIndex() ) { + if ( $this->trxRoundId !== false ) { + $this->applyTransactionRoundFlags( $db ); + } foreach ( $this->trxRecurringCallbacks as $name => $callback ) { $db->setTransactionListener( $name, $callback ); } diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php index dfaf972527..7a6b5fc138 100644 --- a/includes/jobqueue/JobRunner.php +++ b/includes/jobqueue/JobRunner.php @@ -181,7 +181,7 @@ class JobRunner implements LoggerAwareInterface { $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait ); } - $info = $this->executeJob( $job, $stats, $popTime ); + $info = $this->executeJob( $job, $lbFactory, $stats, $popTime ); if ( $info['status'] !== false || !$job->allowRetries() ) { $group->ack( $job ); // succeeded or job cannot be retried $lbFactory->commitMasterChanges( __METHOD__ ); // flush any JobQueueDB writes @@ -254,27 +254,28 @@ class JobRunner implements LoggerAwareInterface { /** * @param Job $job + * @param LBFactory $lbFactory * @param StatsdDataFactory $stats * @param float $popTime * @return array Map of status/error/timeMs */ - private function executeJob( Job $job, $stats, $popTime ) { + private function executeJob( Job $job, LBFactory $lbFactory, $stats, $popTime ) { $jType = $job->getType(); $msg = $job->toString() . " STARTING"; $this->logger->debug( $msg ); $this->debugCallback( $msg ); - $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); // Run the job... $rssStart = $this->getMaxRssKb(); $jobStartTime = microtime( true ); try { + $fnameTrxOwner = get_class( $job ) . '::run'; // give run() outer scope + $lbFactory->beginMasterChanges( $fnameTrxOwner ); $status = $job->run(); $error = $job->getLastError(); - $this->commitMasterChanges( $lbFactory, $job ); - + $this->commitMasterChanges( $lbFactory, $job, $fnameTrxOwner ); + // Run any deferred update tasks; doUpdates() manages transactions itself DeferredUpdates::doUpdates(); - $this->commitMasterChanges( $lbFactory, $job ); } catch ( Exception $e ) { MWExceptionHandler::rollbackMasterChangesAndLog( $e ); $status = false; @@ -497,9 +498,10 @@ class JobRunner implements LoggerAwareInterface { * * @param LBFactory $lbFactory * @param Job $job + * @param string $fnameTrxOwner * @throws DBError */ - private function commitMasterChanges( LBFactory $lbFactory, Job $job ) { + private function commitMasterChanges( LBFactory $lbFactory, Job $job, $fnameTrxOwner ) { global $wgJobSerialCommitThreshold; $lb = $lbFactory->getMainLB( wfWikiID() ); @@ -521,7 +523,7 @@ class JobRunner implements LoggerAwareInterface { } if ( !$dbwSerial ) { - $lbFactory->commitMasterChanges( __METHOD__ ); + $lbFactory->commitMasterChanges( $fnameTrxOwner ); return; } @@ -542,7 +544,7 @@ class JobRunner implements LoggerAwareInterface { } // Actually commit the DB master changes - $lbFactory->commitMasterChanges( __METHOD__ ); + $lbFactory->commitMasterChanges( $fnameTrxOwner ); // Release the lock $dbwSerial->unlock( 'jobrunner-serial-commit', __METHOD__ ); diff --git a/includes/jobqueue/jobs/DeleteLinksJob.php b/includes/jobqueue/jobs/DeleteLinksJob.php index 8d565bd1d9..5c0f89f73f 100644 --- a/includes/jobqueue/jobs/DeleteLinksJob.php +++ b/includes/jobqueue/jobs/DeleteLinksJob.php @@ -59,7 +59,7 @@ class DeleteLinksJob extends Job { $update = new LinksDeletionUpdate( $page, $pageId, $timestamp ); $update->setTransactionTicket( $factory->getEmptyTransactionTicket( __METHOD__ ) ); - DataUpdate::runUpdates( [ $update ] ); + $update->doUpdate(); return true; } diff --git a/includes/jobqueue/jobs/RefreshLinksJob.php b/includes/jobqueue/jobs/RefreshLinksJob.php index b0dcd57442..a337da4857 100644 --- a/includes/jobqueue/jobs/RefreshLinksJob.php +++ b/includes/jobqueue/jobs/RefreshLinksJob.php @@ -263,7 +263,9 @@ class RefreshLinksJob extends Job { } } - DataUpdate::runUpdates( $updates ); + foreach ( $updates as $update ) { + $update->doUpdate(); + } InfoAction::invalidateCache( $title ); diff --git a/maintenance/runJobs.php b/maintenance/runJobs.php index c3c2391545..2e011fecde 100644 --- a/maintenance/runJobs.php +++ b/maintenance/runJobs.php @@ -53,8 +53,6 @@ class RunJobs extends Maintenance { } public function execute() { - global $wgCommandLineMode; - if ( $this->hasOption( 'procs' ) ) { $procs = intval( $this->getOption( 'procs' ) ); if ( $procs < 1 || $procs > 1000 ) { @@ -70,10 +68,6 @@ class RunJobs extends Maintenance { $outputJSON = ( $this->getOption( 'result' ) === 'json' ); $wait = $this->hasOption( 'wait' ); - // Enable DBO_TRX for atomicity; JobRunner manages transactions - // and works well in web server mode already (@TODO: this is a hack) - $wgCommandLineMode = false; - $runner = new JobRunner( LoggerFactory::getInstance( 'runJobs' ) ); if ( !$outputJSON ) { $runner->setDebugHandler( [ $this, 'debugInternal' ] ); @@ -111,8 +105,6 @@ class RunJobs extends Maintenance { sleep( 1 ); } - - $wgCommandLineMode = true; } /** -- 2.20.1