From 44f486ffac23cc4e7e1330f5040c5ccaeb491cd1 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Sun, 28 Aug 2016 09:06:57 -0700 Subject: [PATCH] Use transaction listener to run DeferredUpdates in CLI mode This sets triggers on master position waits typically called after commitMasterChanges() or in commitAndWaitForReplication(). Change-Id: I127a8fe3cfc319abfa84fcd221ee2dae191c6d3b --- includes/deferred/DeferredUpdates.php | 108 ++++++++++++++++++++++++++ maintenance/Maintenance.php | 14 ++++ maintenance/doMaintenance.php | 4 + 3 files changed, 126 insertions(+) diff --git a/includes/deferred/DeferredUpdates.php b/includes/deferred/DeferredUpdates.php index ee14e1a89d..5622f9560d 100644 --- a/includes/deferred/DeferredUpdates.php +++ b/includes/deferred/DeferredUpdates.php @@ -19,6 +19,7 @@ * * @file */ +use MediaWiki\MediaWikiServices; /** * Class for managing the deferred updates @@ -46,6 +47,8 @@ class DeferredUpdates { const PRESEND = 1; // for updates that should run before flushing output buffer const POSTSEND = 2; // for updates that should run after flushing output buffer + const BIG_QUEUE_SIZE = 100; + /** * Add an update to the deferred list * @@ -181,6 +184,71 @@ class DeferredUpdates { } } + /** + * Run all deferred updates immediately if there are no DB writes active + * + * If $mode is 'run' but there are busy databates, EnqueueableDataUpdate + * tasks will be enqueued anyway for the sake of progress. + * + * @param string $mode Use "enqueue" to use the job queue when possible + * @return bool Whether updates were allowed to run + * @since 1.28 + */ + public static function tryOpportunisticExecute( $mode = 'run' ) { + static $recursionGuard = false; + if ( $recursionGuard ) { + return false; // COMMITs trigger inside update loop and inside some updates + } + + try { + $recursionGuard = true; + if ( !self::getBusyDbConnections() ) { + self::doUpdates( $mode ); + return true; + } + + if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) { + // If we cannot run the updates with outer transaction context, try to + // at least enqueue all the updates that support queueing to job queue + self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates ); + self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates ); + } + + return !self::pendingUpdatesCount(); + } finally { + $recursionGuard = false; + } + } + + /** + * Enqueue a job for each EnqueueableDataUpdate item and return the other items + * + * @param DeferrableUpdate[] $updates A list of deferred update instances + * @return DeferrableUpdate[] Remaining updates that do not support being queued + */ + private static function enqueueUpdates( array $updates ) { + $remaining = []; + + foreach ( $updates as $update ) { + if ( $update instanceof EnqueueableDataUpdate ) { + $spec = $update->getAsJobSpecification(); + JobQueueGroup::singleton( $spec['wiki'] )->push( $spec['job'] ); + } else { + $remaining[] = $update; + } + } + + return $remaining; + } + + /** + * @return integer Number of enqueued updates + * @since 1.28 + */ + public static function pendingUpdatesCount() { + return count( self::$preSendUpdates ) + count( self::$postSendUpdates ); + } + /** * Clear all pending updates without performing them. Generally, you don't * want or need to call this. Unit tests need it though. @@ -189,4 +257,44 @@ class DeferredUpdates { self::$preSendUpdates = []; self::$postSendUpdates = []; } + + /** + * Set the rollback/commit watcher on a DB to trigger update runs when safe + * + * @TODO: use this to replace DB logic in push() + * @param LoadBalancer $lb + * @since 1.28 + */ + public static function installDBListener( LoadBalancer $lb ) { + static $triggers = [ IDatabase::TRIGGER_COMMIT, IDatabase::TRIGGER_ROLLBACK ]; + // Hook into active master connections to find a moment where no writes are pending + $lb->setTransactionListener( + __METHOD__, + function ( $trigger, IDatabase $conn ) use ( $triggers ) { + global $wgCommandLineMode; + + if ( $wgCommandLineMode && in_array( $trigger, $triggers ) ) { + DeferredUpdates::tryOpportunisticExecute(); + } + } + ); + } + + /** + * @return IDatabase[] Connection where commit() cannot be called yet + */ + private static function getBusyDbConnections() { + $connsBusy = []; + + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + $lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( &$connsBusy ) { + $lb->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$connsBusy ) { + if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) { + $connsBusy[] = $conn; + } + } ); + } ); + + return $connsBusy; + } } diff --git a/maintenance/Maintenance.php b/maintenance/Maintenance.php index ab316c0632..343687e61c 100644 --- a/maintenance/Maintenance.php +++ b/maintenance/Maintenance.php @@ -37,6 +37,7 @@ define( 'DO_MAINTENANCE', RUN_MAINTENANCE_IF_MAIN ); // original name, harmless $maintClass = false; use MediaWiki\Logger\LoggerFactory; +use MediaWiki\MediaWikiServices; /** * Abstract maintenance class for quickly writing and churning out @@ -548,6 +549,19 @@ abstract class Maintenance { } + /** + * Set triggers like when to try to run deferred updates + * @since 1.28 + */ + public function setTriggers() { + // Hook into period lag checks which often happen in long-running scripts + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + $lbFactory->setWaitForReplicationListener( + __METHOD__, + [ 'DeferredUpdates', 'tryOpportunisticExecute' ] + ); + } + /** * Run a child maintenance script. Pass all of the current arguments * to it. diff --git a/maintenance/doMaintenance.php b/maintenance/doMaintenance.php index 1272ca221a..95bd089dd8 100644 --- a/maintenance/doMaintenance.php +++ b/maintenance/doMaintenance.php @@ -102,6 +102,10 @@ $maintenance->setConfig( ConfigFactory::getDefaultInstance()->makeConfig( 'main' // Sanity-check required extensions are installed $maintenance->checkRequiredExtensions(); +// A good time when no DBs have writes pending is around lag checks. +// This avoids having long running scripts just OOM and lose all the updates. +$maintenance->setTriggers(); + // Do the work $maintenance->execute(); -- 2.20.1