*
* @file
*/
+use MediaWiki\MediaWikiServices;
/**
* Class for managing the deferred updates
const PRESEND = 1; // for updates that should run before flushing output buffer
const POSTSEND = 2; // for updates that should run after flushing output buffer
+ const BIG_QUEUE_SIZE = 100;
+
/**
* Add an update to the deferred list
*
}
}
+ /**
+ * Run all deferred updates immediately if there are no DB writes active
+ *
+ * If $mode is 'run' but there are busy databates, EnqueueableDataUpdate
+ * tasks will be enqueued anyway for the sake of progress.
+ *
+ * @param string $mode Use "enqueue" to use the job queue when possible
+ * @return bool Whether updates were allowed to run
+ * @since 1.28
+ */
+ public static function tryOpportunisticExecute( $mode = 'run' ) {
+ static $recursionGuard = false;
+ if ( $recursionGuard ) {
+ return false; // COMMITs trigger inside update loop and inside some updates
+ }
+
+ try {
+ $recursionGuard = true;
+ if ( !self::getBusyDbConnections() ) {
+ self::doUpdates( $mode );
+ return true;
+ }
+
+ if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
+ // If we cannot run the updates with outer transaction context, try to
+ // at least enqueue all the updates that support queueing to job queue
+ self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
+ self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
+ }
+
+ return !self::pendingUpdatesCount();
+ } finally {
+ $recursionGuard = false;
+ }
+ }
+
+ /**
+ * Enqueue a job for each EnqueueableDataUpdate item and return the other items
+ *
+ * @param DeferrableUpdate[] $updates A list of deferred update instances
+ * @return DeferrableUpdate[] Remaining updates that do not support being queued
+ */
+ private static function enqueueUpdates( array $updates ) {
+ $remaining = [];
+
+ foreach ( $updates as $update ) {
+ if ( $update instanceof EnqueueableDataUpdate ) {
+ $spec = $update->getAsJobSpecification();
+ JobQueueGroup::singleton( $spec['wiki'] )->push( $spec['job'] );
+ } else {
+ $remaining[] = $update;
+ }
+ }
+
+ return $remaining;
+ }
+
+ /**
+ * @return integer Number of enqueued updates
+ * @since 1.28
+ */
+ public static function pendingUpdatesCount() {
+ return count( self::$preSendUpdates ) + count( self::$postSendUpdates );
+ }
+
/**
* Clear all pending updates without performing them. Generally, you don't
* want or need to call this. Unit tests need it though.
self::$preSendUpdates = [];
self::$postSendUpdates = [];
}
+
+ /**
+ * Set the rollback/commit watcher on a DB to trigger update runs when safe
+ *
+ * @TODO: use this to replace DB logic in push()
+ * @param LoadBalancer $lb
+ * @since 1.28
+ */
+ public static function installDBListener( LoadBalancer $lb ) {
+ static $triggers = [ IDatabase::TRIGGER_COMMIT, IDatabase::TRIGGER_ROLLBACK ];
+ // Hook into active master connections to find a moment where no writes are pending
+ $lb->setTransactionListener(
+ __METHOD__,
+ function ( $trigger, IDatabase $conn ) use ( $triggers ) {
+ global $wgCommandLineMode;
+
+ if ( $wgCommandLineMode && in_array( $trigger, $triggers ) ) {
+ DeferredUpdates::tryOpportunisticExecute();
+ }
+ }
+ );
+ }
+
+ /**
+ * @return IDatabase[] Connection where commit() cannot be called yet
+ */
+ private static function getBusyDbConnections() {
+ $connsBusy = [];
+
+ $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+ $lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( &$connsBusy ) {
+ $lb->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$connsBusy ) {
+ if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
+ $connsBusy[] = $conn;
+ }
+ } );
+ } );
+
+ return $connsBusy;
+ }
}