From dc0cdc8a4dd0265317ce9a93a0579a9f84419c87 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Sun, 28 Aug 2016 09:23:52 -0700 Subject: [PATCH] Make DeferredUpdates able to run DataUpdates * Also make ErrorPageError exceptions display themselves in PRESEND mode. Before they were always suppressed. * Make DataUpdate::runUpdates() simply wrap DeferredUpdates::execute(). * Remove unused installDBListener() method, which was basically moved to Maintenance. * Enable DBO_TRX for DeferredUpdates::execute() in CLI mode * Also perform sub-DeferrableUpdate jobs right after their parent for better transaction locality. * Made rollbackMasterChangesAndLog() clear all master transactions/rounds, even if there are no changes yet. This keeps the state cleaner for continuing. * For sanity, avoid calling acquirePageLock() in link updates unless the transaction ticket is set. These locks are already redundant and weaker in range than the locks the Job classes that run them get. This helps guard against DBTransactionError. * Renamed $type to $stage to be more clear about the order. Change-Id: I1e90b56cc80041d70fb9158ac4f027285ad0f2c9 --- includes/api/ApiPurge.php | 4 +- includes/deferred/DataUpdate.php | 44 +--- includes/deferred/DeferredUpdates.php | 194 +++++++++++------- includes/deferred/LinksDeletionUpdate.php | 11 +- includes/deferred/LinksUpdate.php | 10 +- includes/exception/MWExceptionHandler.php | 7 +- tests/phpunit/includes/EditPageTest.php | 2 + .../includes/deferred/DeferredUpdatesTest.php | 86 +++++++- 8 files changed, 220 insertions(+), 138 deletions(-) diff --git a/includes/api/ApiPurge.php b/includes/api/ApiPurge.php index 822369afbb..f671103fb5 100644 --- a/includes/api/ApiPurge.php +++ b/includes/api/ApiPurge.php @@ -91,7 +91,9 @@ class ApiPurge extends ApiBase { # Update the links tables $updates = $content->getSecondaryDataUpdates( $title, null, $forceRecursiveLinkUpdate, $p_result ); - DataUpdate::runUpdates( $updates ); + foreach ( $updates as $update ) { + DeferredUpdates::addUpdate( $update, DeferredUpdates::PRESEND ); + } $r['linkupdate'] = true; diff --git a/includes/deferred/DataUpdate.php b/includes/deferred/DataUpdate.php index cad89b1479..8d26460c9a 100644 --- a/includes/deferred/DataUpdate.php +++ b/includes/deferred/DataUpdate.php @@ -44,50 +44,12 @@ abstract class DataUpdate implements DeferrableUpdate { /** * Convenience method, calls doUpdate() on every DataUpdate in the array. * - * This methods supports transactions logic by first calling beginTransaction() - * on all updates in the array, then calling doUpdate() on each, and, if all goes well, - * then calling commitTransaction() on each update. If an error occurs, - * rollbackTransaction() will be called on any update object that had beginTransaction() - * called but not yet commitTransaction(). - * - * This allows for limited transactional logic across multiple backends for storing - * secondary data. - * * @param DataUpdate[] $updates A list of DataUpdate instances * @param string $mode Use "enqueue" to use the job queue when possible [Default: run] - * @throws Exception|null + * @throws Exception + * @deprecated Since 1.28 Use DeferredUpdates::execute() */ public static function runUpdates( array $updates, $mode = 'run' ) { - if ( $mode === 'enqueue' ) { - // When possible, push updates as jobs instead of calling doUpdate() - $updates = self::enqueueUpdates( $updates ); - } - - foreach ( $updates as $update ) { - $update->doUpdate(); - } - } - - /** - * Enqueue jobs for every DataUpdate that support enqueueUpdate() - * and return the remaining DataUpdate objects (those that do not) - * - * @param DataUpdate[] $updates A list of DataUpdate instances - * @return DataUpdate[] - * @since 1.27 - */ - protected static function enqueueUpdates( array $updates ) { - $remaining = []; - - foreach ( $updates as $update ) { - if ( $update instanceof EnqueueableDataUpdate ) { - $spec = $update->getAsJobSpecification(); - JobQueueGroup::singleton( $spec['wiki'] )->push( $spec['job'] ); - } else { - $remaining[] = $update; - } - } - - return $remaining; + DeferredUpdates::execute( $updates, $mode, DeferredUpdates::ALL ); } } diff --git a/includes/deferred/DeferredUpdates.php b/includes/deferred/DeferredUpdates.php index 5918770d5f..24c7930464 100644 --- a/includes/deferred/DeferredUpdates.php +++ b/includes/deferred/DeferredUpdates.php @@ -30,8 +30,11 @@ use MediaWiki\MediaWikiServices; * run synchronously. If such an update works via queueing, it will be more likely to complete by * the time the client makes their next request after this one. * - * In CLI mode, updates are only deferred until the current wiki has no DB write transaction - * active within this request. + * In CLI mode, updates run immediately if no DB writes are pending. Otherwise, they run when: + * - a) Any waitForReplication() call if no writes are pending on any DB + * - b) A commit happens on Maintenance::getDB( DB_MASTER ) if no writes are pending on any DB + * - c) EnqueueableDataUpdate tasks may enqueue on commit of Maintenance::getDB( DB_MASTER ) + * - d) At the completion of Maintenance::execute() * * When updates are deferred, they use a FIFO queue (one for pre-send and one for post-send). * @@ -43,24 +46,33 @@ class DeferredUpdates { /** @var DeferrableUpdate[] Updates to be deferred until after request end */ private static $postSendUpdates = []; - const ALL = 0; // all updates + const ALL = 0; // all updates; in web requests, use only after flushing the output buffer const PRESEND = 1; // for updates that should run before flushing output buffer const POSTSEND = 2; // for updates that should run after flushing output buffer const BIG_QUEUE_SIZE = 100; + /** @var array|null Information about the current execute() call or null if not running */ + private static $executeContext; + /** * Add an update to the deferred list to be run later by execute() * * In CLI mode, callback magic will also be used to run updates when safe * * @param DeferrableUpdate $update Some object that implements doUpdate() - * @param integer $type DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27) + * @param integer $stage DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27) */ - public static function addUpdate( DeferrableUpdate $update, $type = self::POSTSEND ) { + public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) { global $wgCommandLineMode; - if ( $type === self::PRESEND ) { + // This is a sub-DeferredUpdate, run it right after its parent update + if ( self::$executeContext && self::$executeContext['stage'] >= $stage ) { + self::$executeContext['subqueue'][] = $update; + return; + } + + if ( $stage === self::PRESEND ) { self::push( self::$preSendUpdates, $update ); } else { self::push( self::$postSendUpdates, $update ); @@ -80,31 +92,37 @@ class DeferredUpdates { * @see MWCallableUpdate::__construct() * * @param callable $callable - * @param integer $type DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27) + * @param integer $stage DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27) * @param IDatabase|null $dbw Abort if this DB is rolled back [optional] (since 1.28) */ public static function addCallableUpdate( - $callable, $type = self::POSTSEND, IDatabase $dbw = null + $callable, $stage = self::POSTSEND, IDatabase $dbw = null ) { - self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $type ); + self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $stage ); } /** * Do any deferred updates and clear the list * * @param string $mode Use "enqueue" to use the job queue when possible [Default: "run"] - * @param integer $type DeferredUpdates constant (PRESEND, POSTSEND, or ALL) (since 1.27) + * @param integer $stage DeferredUpdates constant (PRESEND, POSTSEND, or ALL) (since 1.27) */ - public static function doUpdates( $mode = 'run', $type = self::ALL ) { - if ( $type === self::ALL || $type == self::PRESEND ) { - self::execute( self::$preSendUpdates, $mode ); + public static function doUpdates( $mode = 'run', $stage = self::ALL ) { + $stageEffective = ( $stage === self::ALL ) ? self::POSTSEND : $stage; + + if ( $stage === self::ALL || $stage === self::PRESEND ) { + self::execute( self::$preSendUpdates, $mode, $stageEffective ); } - if ( $type === self::ALL || $type == self::POSTSEND ) { - self::execute( self::$postSendUpdates, $mode ); + if ( $stage === self::ALL || $stage == self::POSTSEND ) { + self::execute( self::$postSendUpdates, $mode, $stageEffective ); } } + /** + * @param DeferrableUpdate[] $queue + * @param DeferrableUpdate $update + */ private static function push( array &$queue, DeferrableUpdate $update ) { if ( $update instanceof MergeableUpdate ) { $class = get_class( $update ); // fully-qualified class @@ -120,57 +138,101 @@ class DeferredUpdates { } } - public static function execute( array &$queue, $mode ) { - $stats = \MediaWiki\MediaWikiServices::getInstance()->getStatsdDataFactory(); + /** + * @param DeferrableUpdate[] &$queue List of DeferrableUpdate objects + * @param string $mode Use "enqueue" to use the job queue when possible + * @param integer $stage Class constant (PRESEND, POSTSEND) (since 1.28) + * @throws ErrorPageError Happens on top-level calls + * @throws Exception Happens on second-level calls + */ + public static function execute( array &$queue, $mode, $stage ) { + $services = MediaWikiServices::getInstance(); + $stats = $services->getStatsdDataFactory(); + $lbFactory = $services->getDBLoadBalancerFactory(); $method = RequestContext::getMain()->getRequest()->getMethod(); - $updates = $queue; // snapshot of queue - // Keep doing rounds of updates until none get enqueued - while ( count( $updates ) ) { + /** @var ErrorPageError $reportableError */ + $reportableError = null; + /** @var DeferrableUpdate[] $updates Snapshot of queue */ + $updates = $queue; + + // Keep doing rounds of updates until none get enqueued... + while ( $updates ) { $queue = []; // clear the queue - /** @var DataUpdate[] $dataUpdates */ - $dataUpdates = []; - /** @var DeferrableUpdate[] $otherUpdates */ - $otherUpdates = []; - foreach ( $updates as $update ) { - if ( $update instanceof DataUpdate ) { - $dataUpdates[] = $update; - } else { - $otherUpdates[] = $update; + + if ( $mode === 'enqueue' ) { + try { + // Push enqueuable updates to the job queue and get the rest + $updates = self::enqueueUpdates( $updates ); + } catch ( Exception $e ) { + // Let other updates have a chance to run if this failed + MWExceptionHandler::rollbackMasterChangesAndLog( $e ); } + } - $name = $update instanceof DeferrableCallback - ? get_class( $update ) . '-' . $update->getOrigin() - : get_class( $update ); + // Order will be DataUpdate followed by generic DeferrableUpdate tasks + $updatesByType = [ 'data' => [], 'generic' => [] ]; + foreach ( $updates as $du ) { + $updatesByType[$du instanceof DataUpdate ? 'data' : 'generic'][] = $du; + $name = ( $du instanceof DeferrableCallback ) + ? get_class( $du ) . '-' . $du->getOrigin() + : get_class( $du ); $stats->increment( 'deferred_updates.' . $method . '.' . $name ); } - // Delegate DataUpdate execution to the DataUpdate class - try { - DataUpdate::runUpdates( $dataUpdates, $mode ); - } catch ( Exception $e ) { - // Let the other updates occur if these had to rollback - MWExceptionHandler::logException( $e ); - } - // Execute the non-DataUpdate tasks - foreach ( $otherUpdates as $update ) { - try { - $update->doUpdate(); - wfGetLBFactory()->commitMasterChanges( __METHOD__ ); - } catch ( Exception $e ) { - // We don't want exceptions thrown during deferred updates to - // be reported to the user since the output is already sent - if ( !$e instanceof ErrorPageError ) { - MWExceptionHandler::logException( $e ); + // Execute all remaining tasks... + foreach ( $updatesByType as $updatesForType ) { + foreach ( $updatesForType as $update ) { + self::$executeContext = [ + 'update' => $update, + 'stage' => $stage, + 'subqueue' => [] + ]; + /** @var DeferrableUpdate $update */ + $guiError = self::runUpdate( $update, $lbFactory, $stage ); + $reportableError = $reportableError ?: $guiError; + // Do the subqueue updates for $update until there are none + while ( self::$executeContext['subqueue'] ) { + $subUpdate = reset( self::$executeContext['subqueue'] ); + $firstKey = key( self::$executeContext['subqueue'] ); + unset( self::$executeContext['subqueue'][$firstKey] ); + + $guiError = self::runUpdate( $subUpdate, $lbFactory, $stage ); + $reportableError = $reportableError ?: $guiError; } - // Make sure incomplete transactions are not committed and end any - // open atomic sections so that other DB updates have a chance to run - wfGetLBFactory()->rollbackMasterChanges( __METHOD__ ); + self::$executeContext = null; } } $updates = $queue; // new snapshot of queue (check for new entries) } + + if ( $reportableError ) { + throw $reportableError; // throw the first of any GUI errors + } + } + + /** + * @param DeferrableUpdate $update + * @param LBFactory $lbFactory + * @param integer $stage + * @return ErrorPageError|null + */ + private static function runUpdate( DeferrableUpdate $update, LBFactory $lbFactory, $stage ) { + $guiError = null; + try { + $lbFactory->beginMasterChanges( __METHOD__ ); + $update->doUpdate(); + $lbFactory->commitMasterChanges( __METHOD__ ); + } catch ( Exception $e ) { + // Reporting GUI exceptions does not work post-send + if ( $e instanceof ErrorPageError && $stage === self::PRESEND ) { + $guiError = $e; + } + MWExceptionHandler::rollbackMasterChangesAndLog( $e ); + } + + return $guiError; } /** @@ -184,6 +246,12 @@ class DeferredUpdates { * @since 1.28 */ public static function tryOpportunisticExecute( $mode = 'run' ) { + // execute() loop is already running + if ( self::$executeContext ) { + return false; + } + + // Avoiding running updates without them having outer scope if ( !self::getBusyDbConnections() ) { self::doUpdates( $mode ); return true; @@ -237,28 +305,6 @@ class DeferredUpdates { self::$postSendUpdates = []; } - /** - * Set the rollback/commit watcher on a DB to trigger update runs when safe - * - * @TODO: use this to replace DB logic in push() - * @param LoadBalancer $lb - * @since 1.28 - */ - public static function installDBListener( LoadBalancer $lb ) { - static $triggers = [ IDatabase::TRIGGER_COMMIT, IDatabase::TRIGGER_ROLLBACK ]; - // Hook into active master connections to find a moment where no writes are pending - $lb->setTransactionListener( - __METHOD__, - function ( $trigger, IDatabase $conn ) use ( $triggers ) { - global $wgCommandLineMode; - - if ( $wgCommandLineMode && in_array( $trigger, $triggers ) ) { - DeferredUpdates::tryOpportunisticExecute(); - } - } - ); - } - /** * @return IDatabase[] Connection where commit() cannot be called yet */ diff --git a/includes/deferred/LinksDeletionUpdate.php b/includes/deferred/LinksDeletionUpdate.php index d0b12a0416..41591665ed 100644 --- a/includes/deferred/LinksDeletionUpdate.php +++ b/includes/deferred/LinksDeletionUpdate.php @@ -64,9 +64,12 @@ class LinksDeletionUpdate extends DataUpdate implements EnqueueableDataUpdate { // Page may already be deleted, so don't just getId() $id = $this->pageId; - // Make sure all links update threads see the changes of each other. - // This handles the case when updates have to batched into several COMMITs. - $scopedLock = LinksUpdate::acquirePageLock( $this->getDB(), $id ); + + if ( $this->ticket ) { + // Make sure all links update threads see the changes of each other. + // This handles the case when updates have to batched into several COMMITs. + $scopedLock = LinksUpdate::acquirePageLock( $this->getDB(), $id ); + } $title = $this->page->getTitle(); $dbw = $this->getDB(); // convenience @@ -189,7 +192,7 @@ class LinksDeletionUpdate extends DataUpdate implements EnqueueableDataUpdate { } } - // Commit and release the lock + // Commit and release the lock (if set) ScopedCallback::consume( $scopedLock ); } diff --git a/includes/deferred/LinksUpdate.php b/includes/deferred/LinksUpdate.php index 69f8d133bf..e24a9c0737 100644 --- a/includes/deferred/LinksUpdate.php +++ b/includes/deferred/LinksUpdate.php @@ -162,14 +162,16 @@ class LinksUpdate extends DataUpdate implements EnqueueableDataUpdate { * @note: this is managed by DeferredUpdates::execute(). Do not run this in a transaction. */ public function doUpdate() { - // Make sure all links update threads see the changes of each other. - // This handles the case when updates have to batched into several COMMITs. - $scopedLock = self::acquirePageLock( $this->getDB(), $this->mId ); + if ( $this->ticket ) { + // Make sure all links update threads see the changes of each other. + // This handles the case when updates have to batched into several COMMITs. + $scopedLock = self::acquirePageLock( $this->getDB(), $this->mId ); + } Hooks::run( 'LinksUpdate', [ &$this ] ); $this->doIncrementalUpdate(); - // Commit and release the lock + // Commit and release the lock (if set) ScopedCallback::consume( $scopedLock ); // Run post-commit hooks without DBO_TRX $this->getDB()->onTransactionIdle( function() { diff --git a/includes/exception/MWExceptionHandler.php b/includes/exception/MWExceptionHandler.php index 7d8b244ba4..9c83d3c365 100644 --- a/includes/exception/MWExceptionHandler.php +++ b/includes/exception/MWExceptionHandler.php @@ -19,6 +19,7 @@ */ use MediaWiki\Logger\LoggerFactory; +use MediaWiki\MediaWikiServices; /** * Handler class for MWExceptions @@ -136,16 +137,16 @@ class MWExceptionHandler { * @param Exception|Throwable $e */ public static function rollbackMasterChangesAndLog( $e ) { - $factory = wfGetLBFactory(); - if ( $factory->hasMasterChanges() ) { + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + if ( $lbFactory->hasMasterChanges() ) { $logger = LoggerFactory::getInstance( 'Bug56269' ); $logger->warning( 'Exception thrown with an uncommited database transaction: ' . self::getLogMessage( $e ), self::getLogContext( $e ) ); - $factory->rollbackMasterChanges( __METHOD__ ); } + $lbFactory->rollbackMasterChanges( __METHOD__ ); } /** diff --git a/tests/phpunit/includes/EditPageTest.php b/tests/phpunit/includes/EditPageTest.php index c259a51781..5a01dc006f 100644 --- a/tests/phpunit/includes/EditPageTest.php +++ b/tests/phpunit/includes/EditPageTest.php @@ -348,6 +348,8 @@ class EditPageTest extends MediaWikiLangTestCase { wfGetDB( DB_MASTER )->commit( __METHOD__ ); + $this->assertEquals( 0, DeferredUpdates::pendingUpdatesCount(), 'No deferred updates' ); + if ( $expectedCode != EditPage::AS_BLANK_ARTICLE ) { $latest = $page->getLatest(); $page->doDeleteArticleReal( $pageTitle ); diff --git a/tests/phpunit/includes/deferred/DeferredUpdatesTest.php b/tests/phpunit/includes/deferred/DeferredUpdatesTest.php index ecc67b73ef..4227693a2e 100644 --- a/tests/phpunit/includes/deferred/DeferredUpdatesTest.php +++ b/tests/phpunit/includes/deferred/DeferredUpdatesTest.php @@ -5,10 +5,15 @@ class DeferredUpdatesTest extends MediaWikiTestCase { $this->setMwGlobals( 'wgCommandLineMode', false ); $updates = [ - '1' => 'deferred update 1', - '2' => 'deferred update 2', - '3' => 'deferred update 3', - '2-1' => 'deferred update 1 within deferred update 2', + '1' => "deferred update 1;\n", + '2' => "deferred update 2;\n", + '2-1' => "deferred update 1 within deferred update 2;\n", + '2-2' => "deferred update 2 within deferred update 2;\n", + '3' => "deferred update 3;\n", + '3-1' => "deferred update 1 within deferred update 3;\n", + '3-2' => "deferred update 2 within deferred update 3;\n", + '3-1-1' => "deferred update 1 within deferred update 1 within deferred update 3;\n", + '3-2-1' => "deferred update 1 within deferred update 2 with deferred update 3;\n", ]; DeferredUpdates::addCallableUpdate( function () use ( $updates ) { @@ -23,14 +28,41 @@ class DeferredUpdatesTest extends MediaWikiTestCase { echo $updates['2-1']; } ); + DeferredUpdates::addCallableUpdate( + function () use ( $updates ) { + echo $updates['2-2']; + } + ); } ); DeferredUpdates::addCallableUpdate( function () use ( $updates ) { - echo $updates[3]; + echo $updates['3']; + DeferredUpdates::addCallableUpdate( + function () use ( $updates ) { + echo $updates['3-1']; + DeferredUpdates::addCallableUpdate( + function () use ( $updates ) { + echo $updates['3-1-1']; + } + ); + } + ); + DeferredUpdates::addCallableUpdate( + function () use ( $updates ) { + echo $updates['3-2']; + DeferredUpdates::addCallableUpdate( + function () use ( $updates ) { + echo $updates['3-2-1']; + } + ); + } + ); } ); + $this->assertEquals( 3, DeferredUpdates::pendingUpdatesCount() ); + $this->expectOutputString( implode( '', $updates ) ); DeferredUpdates::doUpdates(); @@ -63,13 +95,20 @@ class DeferredUpdatesTest extends MediaWikiTestCase { public function testDoUpdatesCLI() { $this->setMwGlobals( 'wgCommandLineMode', true ); - $updates = [ - '1' => 'deferred update 1', - '2' => 'deferred update 2', - '2-1' => 'deferred update 1 within deferred update 2', - '3' => 'deferred update 3', + '1' => "deferred update 1;\n", + '2' => "deferred update 2;\n", + '2-1' => "deferred update 1 within deferred update 2;\n", + '2-2' => "deferred update 2 within deferred update 2;\n", + '3' => "deferred update 3;\n", + '3-1' => "deferred update 1 within deferred update 3;\n", + '3-2' => "deferred update 2 within deferred update 3;\n", + '3-1-1' => "deferred update 1 within deferred update 1 within deferred update 3;\n", + '3-2-1' => "deferred update 1 within deferred update 2 with deferred update 3;\n", ]; + + wfGetLBFactory()->commitMasterChanges( __METHOD__ ); // clear anything + DeferredUpdates::addCallableUpdate( function () use ( $updates ) { echo $updates['1']; @@ -83,11 +122,36 @@ class DeferredUpdatesTest extends MediaWikiTestCase { echo $updates['2-1']; } ); + DeferredUpdates::addCallableUpdate( + function () use ( $updates ) { + echo $updates['2-2']; + } + ); } ); DeferredUpdates::addCallableUpdate( function () use ( $updates ) { - echo $updates[3]; + echo $updates['3']; + DeferredUpdates::addCallableUpdate( + function () use ( $updates ) { + echo $updates['3-1']; + DeferredUpdates::addCallableUpdate( + function () use ( $updates ) { + echo $updates['3-1-1']; + } + ); + } + ); + DeferredUpdates::addCallableUpdate( + function () use ( $updates ) { + echo $updates['3-2']; + DeferredUpdates::addCallableUpdate( + function () use ( $updates ) { + echo $updates['3-2-1']; + } + ); + } + ); } ); -- 2.20.1