* It now works for users without using sessions.
Sessions should not be cluttered with things
unrelated to authentication and tokens.
* Public services doing api.php requests on behalf
of a users only need to set XFF headers (as normal)
for position wait logic to trigger. They can opt out
of ChronologyProtector via a new HTTP header
"ChronologyProtection: false".
* Requests across subdomains, such as the SUL2 handshake
for CentralAuth on account creation, now have position
wait logic applied. This helps avoid anomolies were a
row just written in the last request may not be seen.
* Use merge() to avoid rolling back master positions if
the user has multiple tabs open and doing writes at once.
* $_SESSION global state is gone from ChronologyProtector.
* Cleaned up post-send LBFactory::shutdown() logic for
avoiding master position writes with an explicit flag.
* Use 'replication' debug log group in more places.
Bug: T111264
Change-Id: Ib25d05994d62b25c2f89e67b7f51009c54f4bca8
// Commit and close up!
$factory = wfGetLBFactory();
$factory->commitMasterChanges();
- $factory->shutdown();
+ $factory->shutdown( LBFactory::SHUTDOWN_NO_CHRONPROT );
wfDebug( "Request ended normally\n" );
}
* Kind of like Hawking's [[Chronology Protection Agency]].
*/
class ChronologyProtector {
- /** @var array (DB master name => position) */
- protected $startupPositions = array();
+ /** @var BagOStuff */
+ protected $store;
- /** @var array (DB master name => position) */
- protected $shutdownPositions = array();
+ /** @var string Storage key name */
+ protected $key;
+ /** @var array Map of (ip: <IP>, agent: <user-agent>) */
+ protected $client;
+ /** @var bool Whether to no-op all method calls */
+ protected $enabled = true;
+ /** @var bool Whether to check and wait on positions */
+ protected $wait = true;
- /** @var bool Whether the session data was loaded */
+ /** @var bool Whether the client data was loaded */
protected $initialized = false;
+ /** @var DBMasterPos[] Map of (DB master name => position) */
+ protected $startupPositions = array();
+ /** @var DBMasterPos[] Map of (DB master name => position) */
+ protected $shutdownPositions = array();
+
+ /**
+ * @param BagOStuff $store
+ * @param array $client Map of (ip: <IP>, agent: <user-agent>)
+ * @since 1.27
+ */
+ public function __construct( BagOStuff $store, array $client ) {
+ $this->store = $store;
+ $this->client = $client;
+ $this->key = $store->makeGlobalKey(
+ 'ChronologyProtector',
+ md5( $client['ip'] . "\n" . $client['agent'] )
+ );
+ }
+
+ /**
+ * @param bool $enabled Whether to no-op all method calls
+ * @since 1.27
+ */
+ public function setEnabled( $enabled ) {
+ $this->enabled = $enabled;
+ }
+
+ /**
+ * @param bool $enabled Whether to check and wait on positions
+ * @since 1.27
+ */
+ public function setWaitEnabled( $enabled ) {
+ $this->wait = $enabled;
+ }
/**
* Initialise a LoadBalancer to give it appropriate chronology protection.
*
- * If the session has a previous master position recorded, this will try to
+ * If the stash has a previous master position recorded, this will try to
* make sure that the next query to a slave of that master will see changes up
* to that position by delaying execution. The delay may timeout and allow stale
* data if no non-lagged slaves are available.
* @return void
*/
public function initLB( LoadBalancer $lb ) {
- if ( $lb->getServerCount() <= 1 ) {
- return; // non-replicated setup
- }
- if ( !$this->initialized ) {
- $this->initialized = true;
- if ( isset( $_SESSION[__CLASS__] ) && is_array( $_SESSION[__CLASS__] ) ) {
- $this->startupPositions = $_SESSION[__CLASS__];
- }
+ if ( !$this->enabled || $lb->getServerCount() <= 1 ) {
+ return; // non-replicated setup or disabled
}
- $masterName = $lb->getServerName( 0 );
+
+ $this->initPositions();
+
+ $masterName = $lb->getServerName( $lb->getWriterIndex() );
if ( !empty( $this->startupPositions[$masterName] ) ) {
$info = $lb->parentInfo();
$pos = $this->startupPositions[$masterName];
- wfDebug( __METHOD__ . ": LB '" . $info['id'] . "' waiting for master pos $pos\n" );
+ wfDebugLog( 'replication', __METHOD__ .
+ ": LB '" . $info['id'] . "' waiting for master pos $pos\n" );
$lb->waitFor( $pos );
}
}
* @return void
*/
public function shutdownLB( LoadBalancer $lb ) {
- if ( session_id() == '' || $lb->getServerCount() <= 1 ) {
- return; // don't start a session; don't bother with non-replicated setups
- }
- $masterName = $lb->getServerName( 0 );
- if ( isset( $this->shutdownPositions[$masterName] ) ) {
- return; // already done
+ if ( !$this->enabled || $lb->getServerCount() <= 1 ) {
+ return; // non-replicated setup or disabled
}
- // Only save the position if writes have been done on the connection
- $db = $lb->getAnyOpenConnection( 0 );
+
$info = $lb->parentInfo();
+ $masterName = $lb->getServerName( $lb->getWriterIndex() );
+
+ // Only save the position if writes have been done on the connection
+ $db = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
if ( !$db || !$db->doneWrites() ) {
- wfDebug( __METHOD__ . ": LB {$info['id']}, no writes done\n" );
+ wfDebugLog( 'replication', __METHOD__ . ": LB {$info['id']}, no writes done\n" );
- return;
+ return; // nothing to do
}
+
$pos = $db->getMasterPos();
- wfDebug( __METHOD__ . ": LB {$info['id']} has master pos $pos\n" );
+ wfDebugLog( 'replication', __METHOD__ . ": LB {$info['id']} has master pos $pos\n" );
$this->shutdownPositions[$masterName] = $pos;
}
* Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now.
* May commit chronology data to persistent storage.
*
- * @return void
+ * @return array Empty on success; returns the (db name => position) map on failure
*/
public function shutdown() {
- if ( session_id() != '' && count( $this->shutdownPositions ) ) {
- wfDebug( __METHOD__ . ": saving master pos for " .
- count( $this->shutdownPositions ) . " master(s)\n" );
- $_SESSION[__CLASS__] = $this->shutdownPositions;
+ if ( !$this->enabled || !count( $this->shutdownPositions ) ) {
+ return true; // nothing to save
+ }
+
+ wfDebugLog( 'replication',
+ __METHOD__ . ": saving master pos for " .
+ implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
+ );
+
+ $shutdownPositions = $this->shutdownPositions;
+ $ok = $this->store->merge(
+ $this->key,
+ function ( $store, $key, $curValue ) use ( $shutdownPositions ) {
+ /** @var $curPositions DBMasterPos[] */
+ if ( $curValue === false ) {
+ $curPositions = $shutdownPositions;
+ } else {
+ $curPositions = $curValue['positions'];
+ // Use the newest positions for each DB master
+ foreach ( $shutdownPositions as $db => $pos ) {
+ if ( !isset( $curPositions[$db] )
+ || $pos->asOfTime() > $curPositions[$db]->asOfTime()
+ ) {
+ $curPositions[$db] = $pos;
+ }
+ }
+ }
+
+ return array( 'positions' => $curPositions );
+ },
+ BagOStuff::TTL_MINUTE,
+ 10,
+ BagOStuff::WRITE_SYNC // visible in all datacenters
+ );
+
+ if ( !$ok ) {
+ // Raced out too many times or stash is down
+ wfDebugLog( 'replication',
+ __METHOD__ . ": failed to save master pos for " .
+ implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
+ );
+
+ return $this->shutdownPositions;
+ }
+
+ return array();
+ }
+
+ /**
+ * Load in previous master positions for the client
+ */
+ protected function initPositions() {
+ if ( $this->initialized ) {
+ return;
+ }
+
+ $this->initialized = true;
+ if ( $this->wait ) {
+ $data = $this->store->get( $this->key );
+ $this->startupPositions = $data ? $data['positions'] : array();
+
+ wfDebugLog( 'replication', __METHOD__ . ": key is {$this->key} (read)\n" );
+ } else {
+ $this->startupPositions = array();
+
+ wfDebugLog( 'replication', __METHOD__ . ": key is {$this->key} (unread)\n" );
}
}
}
/** @var string|bool Reason all LBs are read-only or false if not */
protected $readOnlyReason = false;
+ const SHUTDOWN_NO_CHRONPROT = 1; // don't save ChronologyProtector positions (for async code)
+
/**
* Construct a factory based on a configuration array (typically from $wgLBFactoryConf)
* @param array $conf
/**
* Prepare all tracked load balancers for shutdown
+ * @param integer $flags Supports SHUTDOWN_* flags
* STUB
*/
- public function shutdown() {
+ public function shutdown( $flags = 0 ) {
}
/**
} );
return $ret;
}
+
+ /**
+ * @return ChronologyProtector
+ */
+ protected function newChronologyProtector() {
+ $request = RequestContext::getMain()->getRequest();
+ $chronProt = new ChronologyProtector(
+ ObjectCache::getMainStashInstance(),
+ array(
+ 'ip' => $request->getIP(),
+ 'agent' => $request->getHeader( 'User-Agent' )
+ )
+ );
+ if ( PHP_SAPI === 'cli' ) {
+ $chronProt->setEnabled( false );
+ } elseif ( $request->getHeader( 'ChronologyProtection' ) === 'false' ) {
+ // Request opted out of using position wait logic. This is useful for requests
+ // done by the job queue or background ETL that do not have a meaningful session.
+ $chronProt->setWaitEnabled( false );
+ }
+
+ return $chronProt;
+ }
+
+ /**
+ * @param ChronologyProtector $cp
+ */
+ protected function shutdownChronologyProtector( ChronologyProtector $cp ) {
+ // Get all the master positions needed
+ $this->forEachLB( function ( LoadBalancer $lb ) use ( $cp ) {
+ $cp->shutdownLB( $lb );
+ } );
+ // Write them to the stash
+ $unsavedPositions = $cp->shutdown();
+ // If the positions failed to write to the stash, at least wait on local datacenter
+ // slaves to catch up before responding. Even if there are several DCs, this increases
+ // the chance that the user will see their own changes immediately afterwards. As long
+ // as the sticky DC cookie applies (same domain), this is not even an issue.
+ $this->forEachLB( function ( LoadBalancer $lb ) use ( $unsavedPositions ) {
+ $masterName = $lb->getServerName( $lb->getWriterIndex() );
+ if ( isset( $unsavedPositions[$masterName] ) ) {
+ $lb->waitForAll( $unsavedPositions[$masterName] );
+ }
+ } );
+ }
}
/**
* @ingroup Database
*/
class LBFactoryMulti extends LBFactory {
- // Required settings
+ /** @var ChronologyProtector */
+ private $chronProt;
/** @var array A map of database names to section names */
private $sectionsByDB;
public function __construct( array $conf ) {
parent::__construct( $conf );
- $this->chronProt = new ChronologyProtector;
$this->conf = $conf;
$required = array( 'sectionsByDB', 'sectionLoads', 'serverTemplate' );
$optional = array( 'groupLoadsBySection', 'groupLoadsByDB', 'hostsByName',
$this->$key = $conf[$key];
}
}
+
+ $this->chronProt = $this->newChronologyProtector();
}
/**
}
}
- public function shutdown() {
- foreach ( $this->mainLBs as $lb ) {
- $this->chronProt->shutdownLB( $lb );
- }
- foreach ( $this->extLBs as $extLB ) {
- $this->chronProt->shutdownLB( $extLB );
+ public function shutdown( $flags = 0 ) {
+ if ( !( $flags & self::SHUTDOWN_NO_CHRONPROT ) ) {
+ $this->shutdownChronologyProtector( $this->chronProt );
}
- $this->chronProt->shutdown();
- $this->commitMasterChanges();
+ $this->commitMasterChanges(); // sanity
}
}
public function __construct( array $conf ) {
parent::__construct( $conf );
- $this->chronProt = new ChronologyProtector;
$this->loadMonitorClass = isset( $conf['loadMonitorClass'] )
? $conf['loadMonitorClass']
: null;
+
+ $this->chronProt = $this->newChronologyProtector();
}
/**
}
}
- public function shutdown() {
- if ( $this->mainLB ) {
- $this->chronProt->shutdownLB( $this->mainLB );
- }
- foreach ( $this->extLBs as $extLB ) {
- $this->chronProt->shutdownLB( $extLB );
+ public function shutdown( $flags = 0 ) {
+ if ( !( $flags & self::SHUTDOWN_NO_CHRONPROT ) ) {
+ $this->shutdownChronologyProtector( $this->chronProt );
}
- $this->chronProt->shutdown();
- $this->commitMasterChanges();
+ $this->commitMasterChanges(); // sanity
}
}
/** @var DBMasterPos $knownReachedPos */
$knownReachedPos = $this->srvCache->get( $key );
if ( $knownReachedPos && $knownReachedPos->hasReached( $this->mWaitForPos ) ) {
- wfDebugLog( 'replication', __METHOD__ . ": Slave $server known to be caught up.\n" );
+ wfDebugLog( 'replication', __METHOD__ .
+ ": slave $server known to be caught up (pos >= $knownReachedPos).\n" );
return true;
}
} else {
$conn = $this->openConnection( $index, '' );
if ( !$conn ) {
- wfDebugLog( 'replication', __METHOD__ . ": failed to open connection to $server\n" );
+ wfDebugLog( 'replication', __METHOD__ . ": failed to connect to $server\n" );
return false;
}
# (a) Check the local APC cache
$value = $this->srvCache->get( $key );
if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl ) ) {
- wfDebugLog( 'replication', __FUNCTION__ . ": got lag times ($key) from local cache" );
+ wfDebugLog( 'replication', __METHOD__ . ": got lag times ($key) from local cache" );
return $value['lagTimes']; // cache hit
}
$staleValue = $value ?: false;
$value = $this->mainCache->get( $key );
if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl ) ) {
$this->srvCache->set( $key, $value, $staleTTL );
- wfDebugLog( 'replication', __FUNCTION__ . ": got lag times ($key) from main cache" );
+ wfDebugLog( 'replication', __METHOD__ . ": got lag times ($key) from main cache" );
return $value['lagTimes']; // cache hit
}
$value = array( 'lagTimes' => $lagTimes, 'timestamp' => microtime( true ) );
$this->mainCache->set( $key, $value, $staleTTL );
$this->srvCache->set( $key, $value, $staleTTL );
- wfDebugLog( 'replication', __FUNCTION__ . ": re-calculated lag times ($key)" );
+ wfDebugLog( 'replication', __METHOD__ . ": re-calculated lag times ($key)" );
return $value['lagTimes'];
}
$factory->shutdown();
$lb->closeAll();
}
+
+ public function testChronologyProtector() {
+ // (a) First HTTP request
+ $mPos = new MySQLMasterPos( 'db1034-bin.000976', '843431247' );
+
+ $mockDB = $this->getMockBuilder( 'DatabaseMysql' )
+ ->disableOriginalConstructor()
+ ->getMock();
+ $mockDB->expects( $this->any() )
+ ->method( 'doneWrites' )->will( $this->returnValue( true ) );
+ $mockDB->expects( $this->any() )
+ ->method( 'getMasterPos' )->will( $this->returnValue( $mPos ) );
+
+ $lb = $this->getMockBuilder( 'LoadBalancer' )
+ ->disableOriginalConstructor()
+ ->getMock();
+ $lb->expects( $this->any() )
+ ->method( 'getConnection' )->will( $this->returnValue( $mockDB ) );
+ $lb->expects( $this->any() )
+ ->method( 'getServerCount' )->will( $this->returnValue( 2 ) );
+ $lb->expects( $this->any() )
+ ->method( 'parentInfo' )->will( $this->returnValue( array( 'id' => "main-DEFAULT" ) ) );
+ $lb->expects( $this->any() )
+ ->method( 'getAnyOpenConnection' )->will( $this->returnValue( $mockDB ) );
+
+ $bag = new HashBagOStuff();
+ $cp = new ChronologyProtector(
+ $bag,
+ array(
+ 'ip' => '127.0.0.1',
+ 'agent' => "Totally-Not-FireFox"
+ )
+ );
+
+ $mockDB->expects( $this->exactly( 2 ) )->method( 'doneWrites' );
+
+ // Nothing to wait for
+ $cp->initLB( $lb );
+ // Record in stash
+ $cp->shutdownLB( $lb );
+ $cp->shutdown();
+
+ // (b) Second HTTP request
+ $cp = new ChronologyProtector(
+ $bag,
+ array(
+ 'ip' => '127.0.0.1',
+ 'agent' => "Totally-Not-FireFox"
+ )
+ );
+
+ $lb->expects( $this->once() )
+ ->method( 'waitFor' )->with( $this->equalTo( $mPos ) );
+
+ // Wait
+ $cp->initLB( $lb );
+ // Record in stash
+ $cp->shutdownLB( $lb );
+ $cp->shutdown();
+ }
}