'TrackBlobs' => __DIR__ . '/maintenance/storage/trackBlobs.php',
'TrackingCategories' => __DIR__ . '/includes/TrackingCategories.php',
'TraditionalImageGallery' => __DIR__ . '/includes/gallery/TraditionalImageGallery.php',
+ 'TransactionRoundAwareUpdate' => __DIR__ . '/includes/deferred/TransactionRoundAwareUpdate.php',
'TransactionRoundDefiningUpdate' => __DIR__ . '/includes/deferred/TransactionRoundDefiningUpdate.php',
'TransformParameterError' => __DIR__ . '/includes/media/TransformParameterError.php',
'TransformTooBigImageAreaError' => __DIR__ . '/includes/media/TransformTooBigImageAreaError.php',
use Wikimedia\Rdbms\IDatabase;
use MediaWiki\MediaWikiServices;
use Wikimedia\Rdbms\LBFactory;
+use Wikimedia\Rdbms\ILBFactory;
use Wikimedia\Rdbms\LoadBalancer;
/**
// Normally, these use the subqueue, but that isn't true for MergeableUpdate items.
do {
if ( $stage === self::ALL || $stage === self::PRESEND ) {
- self::execute( self::$preSendUpdates, $mode, $stageEffective );
+ self::handleUpdateQueue( self::$preSendUpdates, $mode, $stageEffective );
}
if ( $stage === self::ALL || $stage == self::POSTSEND ) {
- self::execute( self::$postSendUpdates, $mode, $stageEffective );
+ self::handleUpdateQueue( self::$postSendUpdates, $mode, $stageEffective );
}
} while ( $stage === self::ALL && self::$preSendUpdates );
}
}
/**
- * Immediately run/queue a list of updates
+ * Immediately run or enqueue a list of updates
*
* @param DeferrableUpdate[] &$queue List of DeferrableUpdate objects
- * @param string $mode Use "enqueue" to use the job queue when possible
+ * @param string $mode Either "run" or "enqueue" (to use the job queue when possible)
* @param int $stage Class constant (PRESEND, POSTSEND) (since 1.28)
* @throws ErrorPageError Happens on top-level calls
* @throws Exception Happens on second-level calls
*/
- protected static function execute( array &$queue, $mode, $stage ) {
+ protected static function handleUpdateQueue( array &$queue, $mode, $stage ) {
$services = MediaWikiServices::getInstance();
$stats = $services->getStatsdDataFactory();
$lbFactory = $services->getDBLoadBalancerFactory();
self::$executeContext = [ 'stage' => $stage, 'subqueue' => [] ];
try {
/** @var DeferrableUpdate $update */
- $guiError = self::runUpdate( $update, $lbFactory, $mode, $stage );
+ $guiError = self::handleUpdate( $update, $lbFactory, $mode, $stage );
$reportableError = $reportableError ?: $guiError;
// Do the subqueue updates for $update until there are none
while ( self::$executeContext['subqueue'] ) {
$subUpdate->setTransactionTicket( $ticket );
}
- $guiError = self::runUpdate( $subUpdate, $lbFactory, $mode, $stage );
+ $guiError = self::handleUpdate( $subUpdate, $lbFactory, $mode, $stage );
$reportableError = $reportableError ?: $guiError;
}
} finally {
}
/**
+ * Run or enqueue an update
+ *
* @param DeferrableUpdate $update
* @param LBFactory $lbFactory
* @param string $mode
* @param int $stage
* @return ErrorPageError|null
*/
- private static function runUpdate(
+ private static function handleUpdate(
DeferrableUpdate $update, LBFactory $lbFactory, $mode, $stage
) {
$guiError = null;
$spec = $update->getAsJobSpecification();
$domain = $spec['domain'] ?? $spec['wiki'];
JobQueueGroup::singleton( $domain )->push( $spec['job'] );
- } elseif ( $update instanceof TransactionRoundDefiningUpdate ) {
- $update->doUpdate();
} else {
- // Run the bulk of the update now
- $fnameTrxOwner = get_class( $update ) . '::doUpdate';
- $lbFactory->beginMasterChanges( $fnameTrxOwner );
- $update->doUpdate();
- $lbFactory->commitMasterChanges( $fnameTrxOwner );
+ self::attemptUpdate( $update, $lbFactory );
}
} catch ( Exception $e ) {
// Reporting GUI exceptions does not work post-send
if ( $e instanceof ErrorPageError && $stage === self::PRESEND ) {
$guiError = $e;
}
- MWExceptionHandler::rollbackMasterChangesAndLog( $e );
+ $lbFactory->rollbackMasterChanges( __METHOD__ );
// VW-style hack to work around T190178, so we can make sure
// PageMetaDataUpdater doesn't throw exceptions.
return $guiError;
}
+ /**
+ * Attempt to run an update with the appropriate transaction round state it expects
+ *
+ * DeferredUpdate classes that wrap the execution of bundles of other DeferredUpdate
+ * instances can use this method to run the updates. Any such wrapper class should
+ * always use TRX_ROUND_ABSENT itself.
+ *
+ * @param DeferrableUpdate $update
+ * @param ILBFactory $lbFactory
+ * @since 1.34
+ */
+ public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) {
+ if (
+ $update instanceof TransactionRoundAwareUpdate &&
+ $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT
+ ) {
+ $update->doUpdate();
+ } else {
+ // Run the bulk of the update now
+ $fnameTrxOwner = get_class( $update ) . '::doUpdate';
+ $lbFactory->beginMasterChanges( $fnameTrxOwner );
+ $update->doUpdate();
+ $lbFactory->commitMasterChanges( $fnameTrxOwner );
+ }
+ }
+
/**
* Run all deferred updates immediately if there are no DB writes active
*
--- /dev/null
+<?php
+
+/**
+ * Deferrable update that specifies whether it must run outside of any explicit
+ * LBFactory transaction round or must run inside of a round owned by doUpdate().
+ *
+ * @since 1.34
+ */
+interface TransactionRoundAwareUpdate {
+ /** @var int No transaction round should be used */
+ const TRX_ROUND_ABSENT = 1;
+ /** @var int A transaction round owned by self::doUpdate should be used */
+ const TRX_ROUND_PRESENT = 2;
+
+ /**
+ * @return int One of the class TRX_ROUND_* constants
+ */
+ public function getTransactionRoundRequirement();
+}
*
* @since 1.31
*/
-class TransactionRoundDefiningUpdate implements DeferrableUpdate, DeferrableCallback {
+class TransactionRoundDefiningUpdate
+ implements DeferrableUpdate, DeferrableCallback, TransactionRoundAwareUpdate
+{
/** @var callable|null */
private $callback;
/** @var string */
public function getOrigin() {
return $this->fname;
}
+
+ /**
+ * @return int One of the class TRX_ROUND_* constants
+ * @since 1.34
+ */
+ final public function getTransactionRoundRequirement() {
+ return self::TRX_ROUND_ABSENT;
+ }
}
use BagOStuff;
/**
- * Class for ensuring a consistent ordering of events as seen by the user, despite replication.
+ * Helper class for mitigating DB replication lag in order to provide "session consistency"
+ *
+ * This helps to ensure a consistent ordering of events as seen by an client
+ *
* Kind of like Hawking's [[Chronology Protection Agency]].
*/
class ChronologyProtector implements LoggerAwareInterface {
if ( isset( $client['clientId'] ) ) {
$this->clientId = $client['clientId'];
} else {
- $this->clientId = strlen( $secret )
+ $this->clientId = ( $secret != '' )
? hash_hmac( 'md5', $client['ip'] . "\n" . $client['agent'], $secret )
: md5( $client['ip'] . "\n" . $client['agent'] );
}
}
/**
- * Initialise a ILoadBalancer to give it appropriate chronology protection.
+ * Apply the "session consistency" DB replication position to a new ILoadBalancer
*
- * If the stash has a previous master position recorded, this will try to
- * make sure that the next query to a replica DB of that master will see changes up
+ * If the stash has a previous master position recorded, this will try to make
+ * sure that the next query to a replica DB of that master will see changes up
* to that position by delaying execution. The delay may timeout and allow stale
* data if no non-lagged replica DBs are available.
*
+ * This method should only be called from LBFactory.
+ *
* @param ILoadBalancer $lb
* @return void
*/
- public function initLB( ILoadBalancer $lb ) {
- if ( !$this->enabled || $lb->getServerCount() <= 1 ) {
- return; // non-replicated setup or disabled
+ public function applySessionReplicationPosition( ILoadBalancer $lb ) {
+ if ( !$this->enabled ) {
+ return; // disabled
}
- $this->initPositions();
-
$masterName = $lb->getServerName( $lb->getWriterIndex() );
- if (
- isset( $this->startupPositions[$masterName] ) &&
- $this->startupPositions[$masterName] instanceof DBMasterPos
- ) {
- $pos = $this->startupPositions[$masterName];
- $this->logger->debug( __METHOD__ . ": LB for '$masterName' set to pos $pos\n" );
+ $startupPositions = $this->getStartupMasterPositions();
+
+ $pos = $startupPositions[$masterName] ?? null;
+ if ( $pos instanceof DBMasterPos ) {
+ $this->logger->debug( __METHOD__ . ": pos for DB '$masterName' set to '$pos'\n" );
$lb->waitFor( $pos );
}
}
/**
- * Notify the ChronologyProtector that the ILoadBalancer is about to shut
- * down. Saves replication positions.
+ * Save the "session consistency" DB replication position for an end-of-life ILoadBalancer
+ *
+ * This saves the replication position of the master DB if this request made writes to it.
+ *
+ * This method should only be called from LBFactory.
*
* @param ILoadBalancer $lb
* @return void
*/
- public function shutdownLB( ILoadBalancer $lb ) {
+ public function storeSessionReplicationPosition( ILoadBalancer $lb ) {
if ( !$this->enabled ) {
- return; // not enabled
+ return; // disabled
} elseif ( !$lb->hasOrMadeRecentMasterChanges( INF ) ) {
// Only save the position if writes have been done on the connection
return;
}
if ( $this->shutdownPositions === [] ) {
+ $this->logger->debug( __METHOD__ . ": no master positions to save\n" );
+
return []; // nothing to save
}
- $this->logger->debug( __METHOD__ . ": saving master pos for " .
+ $this->logger->debug(
+ __METHOD__ . ": saving master pos for " .
implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
);
/**
* Load in previous master positions for the client
*/
- protected function initPositions() {
+ protected function getStartupMasterPositions() {
if ( $this->initialized ) {
- return;
+ return $this->startupPositions;
}
$this->initialized = true;
+ $this->logger->debug( __METHOD__ . ": client ID is {$this->clientId} (read)\n" );
+
if ( $this->wait ) {
// If there is an expectation to see master positions from a certain write
// index or higher, then block until it appears, or until a timeout is reached.
$this->startupPositions = [];
$this->logger->debug( __METHOD__ . ": key is {$this->key} (unread)\n" );
}
+
+ return $this->startupPositions;
}
/**
[
'ip' => $this->requestInfo['IPAddress'],
'agent' => $this->requestInfo['UserAgent'],
- 'clientId' => $this->requestInfo['ChronologyClientId']
+ 'clientId' => $this->requestInfo['ChronologyClientId'] ?: null
],
$this->requestInfo['ChronologyPositionIndex'],
$this->secret
) {
// Record all the master positions needed
$this->forEachLB( function ( ILoadBalancer $lb ) use ( $cp ) {
- $cp->shutdownLB( $lb );
+ $cp->storeSessionReplicationPosition( $lb );
} );
// Write them to the persistent stash. Try to do something useful by running $work
// while ChronologyProtector waits for the stash write to replicate to all DCs.
'chronologyCallback' => function ( ILoadBalancer $lb ) {
// Defer ChronologyProtector construction in case setRequestInfo() ends up
// being called later (but before the first connection attempt) (T192611)
- $this->getChronologyProtector()->initLB( $lb );
+ $this->getChronologyProtector()->applySessionReplicationPosition( $lb );
},
'roundStage' => $initStage
];
$mockDB2->expects( $this->exactly( 1 ) )->method( 'lastDoneWrites' );
// Nothing to wait for on first HTTP request start
- $cp->initLB( $lb1 );
- $cp->initLB( $lb2 );
+ $cp->applySessionReplicationPosition( $lb1 );
+ $cp->applySessionReplicationPosition( $lb2 );
// Record positions in stash on first HTTP request end
- $cp->shutdownLB( $lb1 );
- $cp->shutdownLB( $lb2 );
+ $cp->storeSessionReplicationPosition( $lb1 );
+ $cp->storeSessionReplicationPosition( $lb2 );
$cpIndex = null;
$cp->shutdown( null, 'sync', $cpIndex );
);
// Wait for last positions to be reached on second HTTP request start
- $cp->initLB( $lb1 );
- $cp->initLB( $lb2 );
+ $cp->applySessionReplicationPosition( $lb1 );
+ $cp->applySessionReplicationPosition( $lb2 );
// Shutdown (nothing to record)
- $cp->shutdownLB( $lb1 );
- $cp->shutdownLB( $lb2 );
+ $cp->storeSessionReplicationPosition( $lb1 );
+ $cp->storeSessionReplicationPosition( $lb2 );
$cpIndex = null;
$cp->shutdown( null, 'sync', $cpIndex );
* @covers DeferredUpdates::addUpdate
* @covers DeferredUpdates::push
* @covers DeferredUpdates::doUpdates
- * @covers DeferredUpdates::execute
- * @covers DeferredUpdates::runUpdate
+ * @covers DeferredUpdates::handleUpdateQueue
+ * @covers DeferredUpdates::attemptUpdate
*/
public function testAddAndRun() {
$update = $this->getMockBuilder( DeferrableUpdate::class )
/**
* @covers DeferredUpdates::doUpdates
- * @covers DeferredUpdates::execute
+ * @covers DeferredUpdates::handleUpdateQueue
* @covers DeferredUpdates::addUpdate
*/
public function testDoUpdatesWeb() {
/**
* @covers DeferredUpdates::doUpdates
- * @covers DeferredUpdates::execute
+ * @covers DeferredUpdates::handleUpdateQueue
* @covers DeferredUpdates::addUpdate
*/
public function testDoUpdatesCLI() {
/**
* @covers DeferredUpdates::doUpdates
- * @covers DeferredUpdates::execute
+ * @covers DeferredUpdates::handleUpdateQueue
* @covers DeferredUpdates::addUpdate
*/
public function testPresendAddOnPostsendRun() {
}
/**
- * @covers DeferredUpdates::runUpdate
+ * @covers DeferredUpdates::attemptUpdate
*/
public function testRunUpdateTransactionScope() {
$this->setMwGlobals( 'wgCommandLineMode', false );
}
/**
- * @covers DeferredUpdates::runUpdate
+ * @covers DeferredUpdates::attemptUpdate
* @covers TransactionRoundDefiningUpdate::getOrigin
*/
public function testRunOuterScopeUpdate() {
$ran = 0;
DeferredUpdates::addUpdate( new TransactionRoundDefiningUpdate(
- function () use ( &$ran, $lbFactory ) {
- $ran++;
- $this->assertFalse( $lbFactory->hasTransactionRound(), 'No transaction' );
- } )
+ function () use ( &$ran, $lbFactory ) {
+ $ran++;
+ $this->assertFalse( $lbFactory->hasTransactionRound(), 'No transaction' );
+ } )
);
DeferredUpdates::doUpdates();
--- /dev/null
+<?php
+
+/**
+ * Holds tests for ChronologyProtector abstract MediaWiki class.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+use Wikimedia\Rdbms\ChronologyProtector;
+
+/**
+ * @group Database
+ * @covers \Wikimedia\Rdbms\ChronologyProtector::__construct
+ * @covers \Wikimedia\Rdbms\ChronologyProtector::getClientId
+ */
+class ChronologyProtectorTest extends PHPUnit\Framework\TestCase {
+ /**
+ * @dataProvider clientIdProvider
+ * @param array $client
+ * @param string $secret
+ * @param string $expectedId
+ */
+ public function testClientId( array $client, $secret, $expectedId ) {
+ $bag = new HashBagOStuff();
+ $cp = new ChronologyProtector( $bag, $client, null, $secret );
+
+ $this->assertEquals( $expectedId, $cp->getClientId() );
+ }
+
+ public function clientIdProvider() {
+ return [
+ [
+ [
+ 'ip' => '127.0.0.1',
+ 'agent' => "Totally-Not-FireFox"
+ ],
+ '',
+ '45e93a9c215c031d38b7c42d8e4700ca',
+ ],
+ [
+ [
+ 'ip' => '127.0.0.7',
+ 'agent' => "Totally-Not-FireFox"
+ ],
+ '',
+ 'b1d604117b51746c35c3df9f293c84dc'
+ ],
+ [
+ [
+ 'ip' => '127.0.0.1',
+ 'agent' => "Totally-FireFox"
+ ],
+ '',
+ '731b4e06a65e2346b497fc811571c4d7'
+ ],
+ [
+ [
+ 'ip' => '127.0.0.1',
+ 'agent' => "Totally-Not-FireFox"
+ ],
+ 'secret',
+ 'defff51ded73cd901253d874c9b2077d'
+ ]
+ ];
+ }
+}