Use transaction listener to run DeferredUpdates in CLI mode
authorAaron Schulz <aschulz@wikimedia.org>
Sun, 28 Aug 2016 16:06:57 +0000 (09:06 -0700)
committerKrinkle <krinklemail@gmail.com>
Thu, 1 Sep 2016 03:30:00 +0000 (03:30 +0000)
This sets triggers on master position waits typically called
after commitMasterChanges() or in commitAndWaitForReplication().

Change-Id: I127a8fe3cfc319abfa84fcd221ee2dae191c6d3b

includes/deferred/DeferredUpdates.php
maintenance/Maintenance.php
maintenance/doMaintenance.php

index ee14e1a..5622f95 100644 (file)
@@ -19,6 +19,7 @@
  *
  * @file
  */
+use MediaWiki\MediaWikiServices;
 
 /**
  * Class for managing the deferred updates
@@ -46,6 +47,8 @@ class DeferredUpdates {
        const PRESEND = 1; // for updates that should run before flushing output buffer
        const POSTSEND = 2; // for updates that should run after flushing output buffer
 
+       const BIG_QUEUE_SIZE = 100;
+
        /**
         * Add an update to the deferred list
         *
@@ -181,6 +184,71 @@ class DeferredUpdates {
                }
        }
 
+       /**
+        * Run all deferred updates immediately if there are no DB writes active
+        *
+        * If $mode is 'run' but there are busy databates, EnqueueableDataUpdate
+        * tasks will be enqueued anyway for the sake of progress.
+        *
+        * @param string $mode Use "enqueue" to use the job queue when possible
+        * @return bool Whether updates were allowed to run
+        * @since 1.28
+        */
+       public static function tryOpportunisticExecute( $mode = 'run' ) {
+               static $recursionGuard = false;
+               if ( $recursionGuard ) {
+                       return false; // COMMITs trigger inside update loop and inside some updates
+               }
+
+               try {
+                       $recursionGuard = true;
+                       if ( !self::getBusyDbConnections() ) {
+                               self::doUpdates( $mode );
+                               return true;
+                       }
+
+                       if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
+                               // If we cannot run the updates with outer transaction context, try to
+                               // at least enqueue all the updates that support queueing to job queue
+                               self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
+                               self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
+                       }
+
+                       return !self::pendingUpdatesCount();
+               } finally {
+                       $recursionGuard = false;
+               }
+       }
+
+       /**
+        * Enqueue a job for each EnqueueableDataUpdate item and return the other items
+        *
+        * @param DeferrableUpdate[] $updates A list of deferred update instances
+        * @return DeferrableUpdate[] Remaining updates that do not support being queued
+        */
+       private static function enqueueUpdates( array $updates ) {
+               $remaining = [];
+
+               foreach ( $updates as $update ) {
+                       if ( $update instanceof EnqueueableDataUpdate ) {
+                               $spec = $update->getAsJobSpecification();
+                               JobQueueGroup::singleton( $spec['wiki'] )->push( $spec['job'] );
+                       } else {
+                               $remaining[] = $update;
+                       }
+               }
+
+               return $remaining;
+       }
+
+       /**
+        * @return integer Number of enqueued updates
+        * @since 1.28
+        */
+       public static function pendingUpdatesCount() {
+               return count( self::$preSendUpdates ) + count( self::$postSendUpdates );
+       }
+
        /**
         * Clear all pending updates without performing them. Generally, you don't
         * want or need to call this. Unit tests need it though.
@@ -189,4 +257,44 @@ class DeferredUpdates {
                self::$preSendUpdates = [];
                self::$postSendUpdates = [];
        }
+
+       /**
+        * Set the rollback/commit watcher on a DB to trigger update runs when safe
+        *
+        * @TODO: use this to replace DB logic in push()
+        * @param LoadBalancer $lb
+        * @since 1.28
+        */
+       public static function installDBListener( LoadBalancer $lb ) {
+               static $triggers = [ IDatabase::TRIGGER_COMMIT, IDatabase::TRIGGER_ROLLBACK ];
+               // Hook into active master connections to find a moment where no writes are pending
+               $lb->setTransactionListener(
+                       __METHOD__,
+                       function ( $trigger, IDatabase $conn ) use ( $triggers ) {
+                               global $wgCommandLineMode;
+
+                               if ( $wgCommandLineMode && in_array( $trigger, $triggers ) ) {
+                                       DeferredUpdates::tryOpportunisticExecute();
+                               }
+                       }
+               );
+       }
+
+       /**
+        * @return IDatabase[] Connection where commit() cannot be called yet
+        */
+       private static function getBusyDbConnections() {
+               $connsBusy = [];
+
+               $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+               $lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( &$connsBusy ) {
+                       $lb->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$connsBusy ) {
+                               if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
+                                       $connsBusy[] = $conn;
+                               }
+                       } );
+               } );
+
+               return $connsBusy;
+       }
 }
index ab316c0..343687e 100644 (file)
@@ -37,6 +37,7 @@ define( 'DO_MAINTENANCE', RUN_MAINTENANCE_IF_MAIN ); // original name, harmless
 $maintClass = false;
 
 use MediaWiki\Logger\LoggerFactory;
+use MediaWiki\MediaWikiServices;
 
 /**
  * Abstract maintenance class for quickly writing and churning out
@@ -548,6 +549,19 @@ abstract class Maintenance {
 
        }
 
+       /**
+        * Set triggers like when to try to run deferred updates
+        * @since 1.28
+        */
+       public function setTriggers() {
+               // Hook into period lag checks which often happen in long-running scripts
+               $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+               $lbFactory->setWaitForReplicationListener(
+                       __METHOD__,
+                       [ 'DeferredUpdates', 'tryOpportunisticExecute' ]
+               );
+       }
+
        /**
         * Run a child maintenance script. Pass all of the current arguments
         * to it.
index 1272ca2..95bd089 100644 (file)
@@ -102,6 +102,10 @@ $maintenance->setConfig( ConfigFactory::getDefaultInstance()->makeConfig( 'main'
 // Sanity-check required extensions are installed
 $maintenance->checkRequiredExtensions();
 
+// A good time when no DBs have writes pending is around lag checks.
+// This avoids having long running scripts just OOM and lose all the updates.
+$maintenance->setTriggers();
+
 // Do the work
 $maintenance->execute();