* mw.Api has a new option, useUS, to use U+001F (Unit Separator) when
appropriate for sending multi-valued parameters. This defaults to true when
the mw.Api instance seems to be for the local wiki.
+* After a client performs an action which alters a database that has replica databases,
+ MediaWiki will wait for the replica databases to synchronize with the master database
+ while it renders the HTML output. However, if the output is a redirect, it will instead
+ alter the redirect URL to include a ?cpPosTime parameter that triggers the database
+ synchronization when the URL is followed by the client.
=== External library changes in 1.28 ===
/**
* @see MediaWiki::preOutputCommit()
+ * @param callable $postCommitWork [default: null]
* @since 1.26
*/
- public function doPreOutputCommit() {
- self::preOutputCommit( $this->context );
+ public function doPreOutputCommit( callable $postCommitWork = null ) {
+ self::preOutputCommit( $this->context, $postCommitWork );
}
/**
* the user can receive a response (in case commit fails)
*
* @param IContextSource $context
+ * @param callable $postCommitWork [default: null]
* @since 1.27
*/
- public static function preOutputCommit( IContextSource $context ) {
+ public static function preOutputCommit(
+ IContextSource $context, callable $postCommitWork = null
+ ) {
// Either all DBs should commit or none
ignore_user_abort( true );
$config = $context->getConfig();
-
+ $request = $context->getRequest();
+ $output = $context->getOutput();
$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+
// Commit all changes
$lbFactory->commitMasterChanges(
__METHOD__,
// Abort if any transaction was too big
[ 'maxWriteDuration' => $config->get( 'MaxUserDBWriteDuration' ) ]
);
+ wfDebug( __METHOD__ . ': primary transaction round committed' );
+ // Run updates that need to block the user or affect output (this is the last chance)
DeferredUpdates::doUpdates( 'enqueue', DeferredUpdates::PRESEND );
wfDebug( __METHOD__ . ': pre-send deferred updates completed' );
- // Record ChronologyProtector positions
- $lbFactory->shutdown();
- wfDebug( __METHOD__ . ': all transactions committed' );
+ // Decide when clients block on ChronologyProtector DB position writes
+ if (
+ $request->wasPosted() &&
+ $output->getRedirect() &&
+ $lbFactory->hasOrMadeRecentMasterChanges( INF ) &&
+ self::isWikiClusterURL( $output->getRedirect(), $context )
+ ) {
+ // OutputPage::output() will be fast; $postCommitWork will not be useful for
+ // masking the latency of syncing DB positions accross all datacenters synchronously.
+ // Instead, make use of the RTT time of the client follow redirects.
+ $flags = $lbFactory::SHUTDOWN_CHRONPROT_ASYNC;
+ // Client's next request should see 1+ positions with this DBMasterPos::asOf() time
+ $safeUrl = $lbFactory->appendPreShutdownTimeAsQuery(
+ $output->getRedirect(),
+ microtime( true )
+ );
+ $output->redirect( $safeUrl );
+ } else {
+ // OutputPage::output() is fairly slow; run it in $postCommitWork to mask
+ // the latency of syncing DB positions accross all datacenters synchronously
+ $flags = $lbFactory::SHUTDOWN_CHRONPROT_SYNC;
+ }
+ // Record ChronologyProtector positions for DBs affected in this request at this point
+ $lbFactory->shutdown( $flags, $postCommitWork );
+ wfDebug( __METHOD__ . ': LBFactory shutdown completed' );
// Set a cookie to tell all CDN edge nodes to "stick" the user to the DC that handles this
// POST request (e.g. the "master" data center). Also have the user briefly bypass CDN so
// ChronologyProtector works for cacheable URLs.
- $request = $context->getRequest();
if ( $request->wasPosted() && $lbFactory->hasOrMadeRecentMasterChanges() ) {
$expires = time() + $config->get( 'DataCenterUpdateStickTTL' );
$options = [ 'prefix' => '' ];
// also intimately related to the value of $wgCdnReboundPurgeDelay.
if ( $lbFactory->laggedReplicaUsed() ) {
$maxAge = $config->get( 'CdnMaxageLagged' );
- $context->getOutput()->lowerCdnMaxage( $maxAge );
+ $output->lowerCdnMaxage( $maxAge );
$request->response()->header( "X-Database-Lagged: true" );
wfDebugLog( 'replication', "Lagged DB used; CDN cache TTL limited to $maxAge seconds" );
}
// Avoid long-term cache pollution due to message cache rebuild timeouts (T133069)
if ( MessageCache::singleton()->isDisabled() ) {
$maxAge = $config->get( 'CdnMaxageSubstitute' );
- $context->getOutput()->lowerCdnMaxage( $maxAge );
+ $output->lowerCdnMaxage( $maxAge );
$request->response()->header( "X-Response-Substitute: true" );
}
}
+ /**
+ * @param string $url
+ * @param IContextSource $context
+ * @return bool Whether $url is to something on this wiki farm
+ */
+ private function isWikiClusterURL( $url, IContextSource $context ) {
+ static $relevantKeys = [ 'host' => true, 'port' => true ];
+
+ $infoCandidate = wfParseUrl( $url );
+ if ( $infoCandidate === false ) {
+ return false;
+ }
+
+ $infoCandidate = array_intersect_key( $infoCandidate, $relevantKeys );
+ $clusterHosts = array_merge(
+ // Local wiki host (the most common case)
+ [ $context->getConfig()->get( 'CanonicalServer' ) ],
+ // Any local/remote wiki virtual hosts for this wiki farm
+ $context->getConfig()->get( 'LocalVirtualHosts' )
+ );
+
+ foreach ( $clusterHosts as $clusterHost ) {
+ $infoHost = array_intersect_key( wfParseUrl( $clusterHost ), $relevantKeys );
+ if ( $infoCandidate === $infoHost ) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
/**
* This function does work that can be done *after* the
* user gets the HTTP response so they don't block on it
// Show visible profiling data if enabled (which cannot be post-send)
Profiler::instance()->logDataPageOutputOnly();
- $that = $this;
- $callback = function () use ( $that, $mode ) {
+ $callback = function () use ( $mode ) {
try {
- $that->restInPeace( $mode );
+ $this->restInPeace( $mode );
} catch ( Exception $e ) {
MWExceptionHandler::handleException( $e );
}
private function main() {
global $wgTitle;
+ $output = $this->context->getOutput();
$request = $this->context->getRequest();
// Send Ajax requests to the Ajax dispatcher.
$dispatcher = new AjaxDispatcher( $this->config );
$dispatcher->performAction( $this->context->getUser() );
+
return;
}
// Setup dummy Title, otherwise OutputPage::redirect will fail
$title = Title::newFromText( 'REDIR', NS_MAIN );
$this->context->setTitle( $title );
- $output = $this->context->getOutput();
// Since we only do this redir to change proto, always send a vary header
$output->addVaryHeader( 'X-Forwarded-Proto' );
$output->redirect( $redirUrl );
$output->output();
+
return;
}
}
if ( $cache->isCacheGood( /* Assume up to date */ ) ) {
// Check incoming headers to see if client has this cached
$timestamp = $cache->cacheTimestamp();
- if ( !$this->context->getOutput()->checkLastModified( $timestamp ) ) {
+ if ( !$output->checkLastModified( $timestamp ) ) {
$cache->loadFromFileCache( $this->context );
}
// Do any stats increment/watchlist stuff
// Assume we're viewing the latest revision (this should always be the case with file cache)
$this->context->getWikiPage()->doViewUpdates( $this->context->getUser() );
// Tell OutputPage that output is taken care of
- $this->context->getOutput()->disable();
+ $output->disable();
+
return;
}
}
// Actually do the work of the request and build up any output
$this->performRequest();
+ // GUI-ify and stash the page output in MediaWiki::doPreOutputCommit() while
+ // ChronologyProtector synchronizes DB positions or slaves accross all datacenters.
+ $buffer = null;
+ $outputWork = function () use ( $output, &$buffer ) {
+ if ( $buffer === null ) {
+ $buffer = $output->output( true );
+ }
+
+ return $buffer;
+ };
+
// Now commit any transactions, so that unreported errors after
// output() don't roll back the whole DB transaction and so that
// we avoid having both success and error text in the response
- $this->doPreOutputCommit();
+ $this->doPreOutputCommit( $outputWork );
- // Output everything!
- $this->context->getOutput()->output();
+ // Now send the actual output
+ print $outputWork();
}
/**
/**
* Finally, all the text has been munged and accumulated into
* the object, let's actually output it:
+ *
+ * @param bool $return Set to true to get the result as a string rather than sending it
+ * @return string|null
+ * @throws Exception
+ * @throws FatalError
+ * @throws MWException
*/
- public function output() {
+ public function output( $return = false ) {
if ( $this->mDoNothing ) {
- return;
+ return $return ? '' : null;
}
$response = $this->getRequest()->response();
}
}
- return;
+ return $return ? '' : null;
} elseif ( $this->mStatusCode ) {
$response->statusHeader( $this->mStatusCode );
}
$this->sendCacheControl();
- ob_end_flush();
-
+ if ( $return ) {
+ return ob_get_clean();
+ } else {
+ ob_end_flush();
+ return null;
+ }
}
/**
protected $key;
/** @var string Hash of client parameters */
protected $clientId;
+ /** @var float|null Minimum UNIX timestamp of 1+ expected startup positions */
+ protected $waitForPosTime;
+ /** @var int Max seconds to wait on positions to appear */
+ protected $waitForPosTimeout = self::POS_WAIT_TIMEOUT;
/** @var bool Whether to no-op all method calls */
protected $enabled = true;
/** @var bool Whether to check and wait on positions */
/** @var float[] Map of (DB master name => 1) */
protected $shutdownTouchDBs = [];
+ /** @var integer Seconds to store positions */
+ const POSITION_TTL = 60;
+ /** @var integer Max time to wait for positions to appear */
+ const POS_WAIT_TIMEOUT = 5;
+
/**
* @param BagOStuff $store
* @param array $client Map of (ip: <IP>, agent: <user-agent>)
+ * @param float $posTime UNIX timestamp
* @since 1.27
*/
- public function __construct( BagOStuff $store, array $client ) {
+ public function __construct( BagOStuff $store, array $client, $posTime = null ) {
$this->store = $store;
$this->clientId = md5( $client['ip'] . "\n" . $client['agent'] );
$this->key = $store->makeGlobalKey( __CLASS__, $this->clientId );
+ $this->waitForPosTime = $posTime;
}
/**
* Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now.
* May commit chronology data to persistent storage.
*
+ * @param callable|null $workCallback Work to do instead of waiting on syncing positions
+ * @param string $mode One of (sync, async); whether to wait on remote datacenters
* @return DBMasterPos[] Empty on success; returns the (db name => position) map on failure
*/
- public function shutdown() {
+ public function shutdown( callable $workCallback = null, $mode = 'sync' ) {
if ( !$this->enabled ) {
return [];
}
+ $store = $this->store;
// Some callers might want to know if a user recently touched a DB.
// These writes do not need to block on all datacenters receiving them.
foreach ( $this->shutdownTouchDBs as $dbName => $unused ) {
- $this->store->set(
+ $store->set(
$this->getTouchedKey( $this->store, $dbName ),
microtime( true ),
- BagOStuff::TTL_DAY
+ $store::TTL_DAY
);
}
// CP-protected writes should overwhemingly go to the master datacenter, so get DC-local
// lock to merge the values. Use a DC-local get() and a synchronous all-DC set(). This
// makes it possible for the BagOStuff class to write in parallel to all DCs with one RTT.
- if ( $this->store->lock( $this->key, 3 ) ) {
- $ok = $this->store->set(
+ if ( $store->lock( $this->key, 3 ) ) {
+ if ( $workCallback ) {
+ // Let the store run the work before blocking on a replication sync barrier. By the
+ // time it's done with the work, the barrier should be fast if replication caught up.
+ $store->addBusyCallback( $workCallback );
+ }
+ $ok = $store->set(
$this->key,
- self::mergePositions( $this->store->get( $this->key ), $this->shutdownPositions ),
- BagOStuff::TTL_MINUTE,
- BagOStuff::WRITE_SYNC
+ self::mergePositions( $store->get( $this->key ), $this->shutdownPositions ),
+ self::POSITION_TTL,
+ ( $mode === 'sync' ) ? $store::WRITE_SYNC : 0
);
- $this->store->unlock( $this->key );
+ $store->unlock( $this->key );
} else {
$ok = false;
}
if ( !$ok ) {
+ $bouncedPositions = $this->shutdownPositions;
// Raced out too many times or stash is down
wfDebugLog( 'replication',
__METHOD__ . ": failed to save master pos for " .
implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
);
-
- return $this->shutdownPositions;
+ } elseif ( $mode === 'sync' &&
+ $store->getQoS( $store::ATTR_SYNCWRITES ) < $store::QOS_SYNCWRITES_BE
+ ) {
+ // Positions may not be in all datacenters, force LBFactory to play it safe
+ wfDebugLog( 'replication',
+ __METHOD__ . ": store does not report ability to sync replicas. " );
+ $bouncedPositions = $this->shutdownPositions;
+ } else {
+ $bouncedPositions = [];
}
- return [];
+ return $bouncedPositions;
}
/**
$this->initialized = true;
if ( $this->wait ) {
- $data = $this->store->get( $this->key );
+ // If there is an expectation to see master positions with a certain min
+ // timestamp, then block until they appear, or until a timeout is reached.
+ if ( $this->waitForPosTime ) {
+ $data = null;
+ $loop = new WaitConditionLoop(
+ function () use ( &$data ) {
+ $data = $this->store->get( $this->key );
+
+ return ( self::minPosTime( $data ) >= $this->waitForPosTime )
+ ? WaitConditionLoop::CONDITION_REACHED
+ : WaitConditionLoop::CONDITION_CONTINUE;
+ },
+ $this->waitForPosTimeout
+ );
+ $result = $loop->invoke();
+ $waitedMs = $loop->getLastWaitTime() * 1e3;
+
+ if ( $result == $loop::CONDITION_REACHED ) {
+ $msg = "expected and found pos time {$this->waitForPosTime} ({$waitedMs}ms)";
+ } else {
+ $msg = "expected but missed pos time {$this->waitForPosTime} ({$waitedMs}ms)";
+ }
+ wfDebugLog( 'replication', $msg );
+ } else {
+ $data = $this->store->get( $this->key );
+ }
+
$this->startupPositions = $data ? $data['positions'] : [];
wfDebugLog( 'replication', __METHOD__ . ": key is {$this->key} (read)\n" );
} else {
}
}
+ /**
+ * @param array|bool $data
+ * @return float|null
+ */
+ private static function minPosTime( $data ) {
+ if ( !isset( $data['positions'] ) ) {
+ return null;
+ }
+
+ $min = null;
+ foreach ( $data['positions'] as $pos ) {
+ /** @var DBMasterPos $pos */
+ $min = $min ? min( $pos->asOfTime(), $min ) : $pos->asOfTime();
+ }
+
+ return $min;
+ }
+
/**
* @param array|bool $curValue
* @param DBMasterPos[] $shutdownPositions
/** @var callable[] */
protected $replicationWaitCallbacks = [];
- const SHUTDOWN_NO_CHRONPROT = 1; // don't save ChronologyProtector positions (for async code)
+ const SHUTDOWN_NO_CHRONPROT = 0; // don't save DB positions at all
+ const SHUTDOWN_CHRONPROT_ASYNC = 1; // save DB positions, but don't wait on remote DCs
+ const SHUTDOWN_CHRONPROT_SYNC = 2; // save DB positions, waiting on all DCs
/**
* Construct a factory based on a configuration array (typically from $wgLBFactoryConf)
* @see LoadBalancer::disable()
*/
public function destroy() {
- $this->shutdown();
+ $this->shutdown( self::SHUTDOWN_NO_CHRONPROT );
$this->forEachLBCallMethod( 'disable' );
}
/**
* Prepare all tracked load balancers for shutdown
- * @param integer $flags Supports SHUTDOWN_* flags
- */
- public function shutdown( $flags = 0 ) {
- if ( !( $flags & self::SHUTDOWN_NO_CHRONPROT ) ) {
- $this->shutdownChronologyProtector( $this->chronProt );
+ * @param integer $mode One of the class SHUTDOWN_* constants
+ * @param callable|null $workCallback Work to mask ChronologyProtector writes
+ */
+ public function shutdown(
+ $mode = self::SHUTDOWN_CHRONPROT_SYNC, callable $workCallback = null
+ ) {
+ if ( $mode === self::SHUTDOWN_CHRONPROT_SYNC ) {
+ $this->shutdownChronologyProtector( $this->chronProt, $workCallback, 'sync' );
+ } elseif ( $mode === self::SHUTDOWN_CHRONPROT_ASYNC ) {
+ $this->shutdownChronologyProtector( $this->chronProt, null, 'async' );
}
+
$this->commitMasterChanges( __METHOD__ ); // sanity
}
/**
* Determine if any master connection has pending/written changes from this request
+ * @param float $age How many seconds ago is "recent" [defaults to LB lag wait timeout]
* @return bool
* @since 1.27
*/
- public function hasOrMadeRecentMasterChanges() {
+ public function hasOrMadeRecentMasterChanges( $age = null ) {
$ret = false;
- $this->forEachLB( function ( LoadBalancer $lb ) use ( &$ret ) {
- $ret = $ret || $lb->hasOrMadeRecentMasterChanges();
+ $this->forEachLB( function ( LoadBalancer $lb ) use ( $age, &$ret ) {
+ $ret = $ret || $lb->hasOrMadeRecentMasterChanges( $age );
} );
return $ret;
}
ObjectCache::getMainStashInstance(),
[
'ip' => $request->getIP(),
- 'agent' => $request->getHeader( 'User-Agent' )
- ]
+ 'agent' => $request->getHeader( 'User-Agent' ),
+ ],
+ $request->getFloat( 'cpPosTime', null )
);
if ( PHP_SAPI === 'cli' ) {
$chronProt->setEnabled( false );
}
/**
+ * Get and record all of the staged DB positions into persistent memory storage
+ *
* @param ChronologyProtector $cp
+ * @param callable|null $workCallback Work to do instead of waiting on syncing positions
+ * @param string $mode One of (sync, async); whether to wait on remote datacenters
*/
- protected function shutdownChronologyProtector( ChronologyProtector $cp ) {
- // Get all the master positions needed
+ protected function shutdownChronologyProtector(
+ ChronologyProtector $cp, $workCallback, $mode
+ ) {
+ // Record all the master positions needed
$this->forEachLB( function ( LoadBalancer $lb ) use ( $cp ) {
$cp->shutdownLB( $lb );
} );
- // Write them to the stash
- $unsavedPositions = $cp->shutdown();
+ // Write them to the persistent stash. Try to do something useful by running $work
+ // while ChronologyProtector waits for the stash write to replicate to all DCs.
+ $unsavedPositions = $cp->shutdown( $workCallback, $mode );
+ if ( $unsavedPositions && $workCallback ) {
+ // Invoke callback in case it did not cache the result yet
+ $workCallback(); // work now to block for less time in waitForAll()
+ }
// If the positions failed to write to the stash, at least wait on local datacenter
// replica DBs to catch up before responding. Even if there are several DCs, this increases
// the chance that the user will see their own changes immediately afterwards. As long
}
}
+ /**
+ * Append ?cpPosTime parameter to a URL for ChronologyProtector purposes if needed
+ *
+ * Note that unlike cookies, this works accross domains
+ *
+ * @param string $url
+ * @param float $time UNIX timestamp just before shutdown() was called
+ * @return string
+ * @since 1.28
+ */
+ public function appendPreShutdownTimeAsQuery( $url, $time ) {
+ $usedCluster = 0;
+ $this->forEachLB( function ( LoadBalancer $lb ) use ( &$usedCluster ) {
+ $usedCluster |= ( $lb->getServerCount() > 1 );
+ } );
+
+ if ( !$usedCluster ) {
+ return $url; // no master/replica clusters touched
+ }
+
+ return wfAppendQuery( $url, [ 'cpPosTime' => $time ] );
+ }
+
/**
* Close all open database connections on all open load balancers.
* @since 1.28
public function closeAll() {
$this->forEachLBCallMethod( 'closeAll', [] );
}
-
}
// Medium attributes constants related to emulation or media type
const ATTR_EMULATION = 1;
const QOS_EMULATION_SQL = 1;
+ // Medium attributes constants related to replica consistency
+ const ATTR_SYNCWRITES = 2; // SYNC_WRITES flag support
+ const QOS_SYNCWRITES_NONE = 1; // replication only supports eventual consistency or less
+ const QOS_SYNCWRITES_BE = 2; // best effort synchronous with limited retries
+ const QOS_SYNCWRITES_QC = 3; // write quorum applied directly to state machines where R+W > N
+ const QOS_SYNCWRITES_SS = 4; // strict-serializable, nodes refuse reads if possible stale
// Generic "unknown" value that is useful for comparisons (e.g. always good enough)
const QOS_UNKNOWN = INF;
}
/** @var MemcachedClient|Memcached */
protected $client;
+ function __construct( array $params ) {
+ parent::__construct( $params );
+
+ $this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_BE; // unreliable
+ }
+
/**
* Fill in some defaults for missing keys in $params.
*
}
// Make sure URL ends with /
$this->url = rtrim( $params['url'], '/' ) . '/';
+ // Default config, R+W > N; no locks on reads though; writes go straight to state-machine
+ $this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_QC;
}
/**
} else {
$this->automaticFailover = true;
}
+
+ $this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_NONE;
}
protected function doGet( $key, $flags = 0 ) {
parent::__construct( $params );
$this->attrMap[self::ATTR_EMULATION] = self::QOS_EMULATION_SQL;
+ $this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_NONE;
if ( isset( $params['servers'] ) ) {
$this->serverInfos = [];
// Default to using the main wiki's database servers
$this->serverInfos = false;
$this->numServers = 1;
+ $this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_BE;
}
if ( isset( $params['purgePeriod'] ) ) {
$this->purgePeriod = intval( $params['purgePeriod'] );
// Commit and close up!
$factory = wfGetLBFactory();
$factory->commitMasterChanges( 'doMaintenance' );
-$factory->shutdown();
+$factory->shutdown( $factory::SHUTDOWN_NO_CHRONPROT );