Decouple ChronologyProtector from user sessions
authorAaron Schulz <aschulz@wikimedia.org>
Sun, 18 Oct 2015 19:53:40 +0000 (12:53 -0700)
committerTim Starling <tstarling@wikimedia.org>
Thu, 12 Nov 2015 23:11:18 +0000 (23:11 +0000)
* It now works for users without using sessions.
  Sessions should not be cluttered with things
  unrelated to authentication and tokens.
* Public services doing api.php requests on behalf
  of a users only need to set XFF headers (as normal)
  for position wait logic to trigger. They can opt out
  of ChronologyProtector via a new HTTP header
  "ChronologyProtection: false".
* Requests across subdomains, such as the SUL2 handshake
  for CentralAuth on account creation, now have position
  wait logic applied. This helps avoid anomolies were a
  row just written in the last request may not be seen.
* Use merge() to avoid rolling back master positions if
  the user has multiple tabs open and doing writes at once.
* $_SESSION global state is gone from ChronologyProtector.
* Cleaned up post-send LBFactory::shutdown() logic for
  avoiding master position writes with an explicit flag.
* Use 'replication' debug log group in more places.

Bug: T111264
Change-Id: Ib25d05994d62b25c2f89e67b7f51009c54f4bca8

includes/MediaWiki.php
includes/db/ChronologyProtector.php
includes/db/loadbalancer/LBFactory.php
includes/db/loadbalancer/LBFactoryMulti.php
includes/db/loadbalancer/LBFactorySimple.php
includes/db/loadbalancer/LoadBalancer.php
includes/db/loadbalancer/LoadMonitorMySQL.php
tests/phpunit/includes/db/LBFactoryTest.php

index d048b57..d5aac07 100644 (file)
@@ -710,7 +710,7 @@ class MediaWiki {
                // Commit and close up!
                $factory = wfGetLBFactory();
                $factory->commitMasterChanges();
-               $factory->shutdown();
+               $factory->shutdown( LBFactory::SHUTDOWN_NO_CHRONPROT );
 
                wfDebug( "Request ended normally\n" );
        }
index 6840d17..1ef26d2 100644 (file)
  * Kind of like Hawking's [[Chronology Protection Agency]].
  */
 class ChronologyProtector {
-       /** @var array (DB master name => position) */
-       protected $startupPositions = array();
+       /** @var BagOStuff */
+       protected $store;
 
-       /** @var array (DB master name => position) */
-       protected $shutdownPositions = array();
+       /** @var string Storage key name */
+       protected $key;
+       /** @var array Map of (ip: <IP>, agent: <user-agent>) */
+       protected $client;
+       /** @var bool Whether to no-op all method calls */
+       protected $enabled = true;
+       /** @var bool Whether to check and wait on positions */
+       protected $wait = true;
 
-       /** @var bool Whether the session data was loaded */
+       /** @var bool Whether the client data was loaded */
        protected $initialized = false;
+       /** @var DBMasterPos[] Map of (DB master name => position) */
+       protected $startupPositions = array();
+       /** @var DBMasterPos[] Map of (DB master name => position) */
+       protected $shutdownPositions = array();
+
+       /**
+        * @param BagOStuff $store
+        * @param array $client Map of (ip: <IP>, agent: <user-agent>)
+        * @since 1.27
+        */
+       public function __construct( BagOStuff $store, array $client ) {
+               $this->store = $store;
+               $this->client = $client;
+               $this->key = $store->makeGlobalKey(
+                       'ChronologyProtector',
+                       md5( $client['ip'] . "\n" . $client['agent'] )
+               );
+       }
+
+       /**
+        * @param bool $enabled Whether to no-op all method calls
+        * @since 1.27
+        */
+       public function setEnabled( $enabled ) {
+               $this->enabled = $enabled;
+       }
+
+       /**
+        * @param bool $enabled Whether to check and wait on positions
+        * @since 1.27
+        */
+       public function setWaitEnabled( $enabled ) {
+               $this->wait = $enabled;
+       }
 
        /**
         * Initialise a LoadBalancer to give it appropriate chronology protection.
         *
-        * If the session has a previous master position recorded, this will try to
+        * If the stash has a previous master position recorded, this will try to
         * make sure that the next query to a slave of that master will see changes up
         * to that position by delaying execution. The delay may timeout and allow stale
         * data if no non-lagged slaves are available.
@@ -47,20 +87,18 @@ class ChronologyProtector {
         * @return void
         */
        public function initLB( LoadBalancer $lb ) {
-               if ( $lb->getServerCount() <= 1 ) {
-                       return; // non-replicated setup
-               }
-               if ( !$this->initialized ) {
-                       $this->initialized = true;
-                       if ( isset( $_SESSION[__CLASS__] ) && is_array( $_SESSION[__CLASS__] ) ) {
-                               $this->startupPositions = $_SESSION[__CLASS__];
-                       }
+               if ( !$this->enabled || $lb->getServerCount() <= 1 ) {
+                       return; // non-replicated setup or disabled
                }
-               $masterName = $lb->getServerName( 0 );
+
+               $this->initPositions();
+
+               $masterName = $lb->getServerName( $lb->getWriterIndex() );
                if ( !empty( $this->startupPositions[$masterName] ) ) {
                        $info = $lb->parentInfo();
                        $pos = $this->startupPositions[$masterName];
-                       wfDebug( __METHOD__ . ": LB '" . $info['id'] . "' waiting for master pos $pos\n" );
+                       wfDebugLog( 'replication', __METHOD__ .
+                               ": LB '" . $info['id'] . "' waiting for master pos $pos\n" );
                        $lb->waitFor( $pos );
                }
        }
@@ -73,23 +111,23 @@ class ChronologyProtector {
         * @return void
         */
        public function shutdownLB( LoadBalancer $lb ) {
-               if ( session_id() == '' || $lb->getServerCount() <= 1 ) {
-                       return; // don't start a session; don't bother with non-replicated setups
-               }
-               $masterName = $lb->getServerName( 0 );
-               if ( isset( $this->shutdownPositions[$masterName] ) ) {
-                       return; // already done
+               if ( !$this->enabled || $lb->getServerCount() <= 1 ) {
+                       return; // non-replicated setup or disabled
                }
-               // Only save the position if writes have been done on the connection
-               $db = $lb->getAnyOpenConnection( 0 );
+
                $info = $lb->parentInfo();
+               $masterName = $lb->getServerName( $lb->getWriterIndex() );
+
+               // Only save the position if writes have been done on the connection
+               $db = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
                if ( !$db || !$db->doneWrites() ) {
-                       wfDebug( __METHOD__ . ": LB {$info['id']}, no writes done\n" );
+                       wfDebugLog( 'replication', __METHOD__ . ": LB {$info['id']}, no writes done\n" );
 
-                       return;
+                       return; // nothing to do
                }
+
                $pos = $db->getMasterPos();
-               wfDebug( __METHOD__ . ": LB {$info['id']} has master pos $pos\n" );
+               wfDebugLog( 'replication', __METHOD__ . ": LB {$info['id']} has master pos $pos\n" );
                $this->shutdownPositions[$masterName] = $pos;
        }
 
@@ -97,13 +135,75 @@ class ChronologyProtector {
         * Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now.
         * May commit chronology data to persistent storage.
         *
-        * @return void
+        * @return array Empty on success; returns the (db name => position) map on failure
         */
        public function shutdown() {
-               if ( session_id() != '' && count( $this->shutdownPositions ) ) {
-                       wfDebug( __METHOD__ . ": saving master pos for " .
-                               count( $this->shutdownPositions ) . " master(s)\n" );
-                       $_SESSION[__CLASS__] = $this->shutdownPositions;
+               if ( !$this->enabled || !count( $this->shutdownPositions ) ) {
+                       return true; // nothing to save
+               }
+
+               wfDebugLog( 'replication',
+                       __METHOD__ . ": saving master pos for " .
+                       implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
+               );
+
+               $shutdownPositions = $this->shutdownPositions;
+               $ok = $this->store->merge(
+                       $this->key,
+                       function ( $store, $key, $curValue ) use ( $shutdownPositions ) {
+                               /** @var $curPositions DBMasterPos[] */
+                               if ( $curValue === false ) {
+                                       $curPositions = $shutdownPositions;
+                               } else {
+                                       $curPositions = $curValue['positions'];
+                                       // Use the newest positions for each DB master
+                                       foreach ( $shutdownPositions as $db => $pos ) {
+                                               if ( !isset( $curPositions[$db] )
+                                                       || $pos->asOfTime() > $curPositions[$db]->asOfTime()
+                                               ) {
+                                                       $curPositions[$db] = $pos;
+                                               }
+                                       }
+                               }
+
+                               return array( 'positions' => $curPositions );
+                       },
+                       BagOStuff::TTL_MINUTE,
+                       10,
+                       BagOStuff::WRITE_SYNC // visible in all datacenters
+               );
+
+               if ( !$ok ) {
+                       // Raced out too many times or stash is down
+                       wfDebugLog( 'replication',
+                               __METHOD__ . ": failed to save master pos for " .
+                               implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
+                       );
+
+                       return $this->shutdownPositions;
+               }
+
+               return array();
+       }
+
+       /**
+        * Load in previous master positions for the client
+        */
+       protected function initPositions() {
+               if ( $this->initialized ) {
+                       return;
+               }
+
+               $this->initialized = true;
+               if ( $this->wait ) {
+                       $data = $this->store->get( $this->key );
+                       $this->startupPositions = $data ? $data['positions'] : array();
+
+                       wfDebugLog( 'replication', __METHOD__ . ": key is {$this->key} (read)\n" );
+               } else {
+                       $this->startupPositions = array();
+
+                       wfDebugLog( 'replication', __METHOD__ . ": key is {$this->key} (unread)\n" );
                }
        }
 }
index 86f0110..8d298e0 100644 (file)
@@ -32,6 +32,8 @@ abstract class LBFactory {
        /** @var string|bool Reason all LBs are read-only or false if not */
        protected $readOnlyReason = false;
 
+       const SHUTDOWN_NO_CHRONPROT = 1; // don't save ChronologyProtector positions (for async code)
+
        /**
         * Construct a factory based on a configuration array (typically from $wgLBFactoryConf)
         * @param array $conf
@@ -170,9 +172,10 @@ abstract class LBFactory {
 
        /**
         * Prepare all tracked load balancers for shutdown
+        * @param integer $flags Supports SHUTDOWN_* flags
         * STUB
         */
-       public function shutdown() {
+       public function shutdown( $flags = 0 ) {
        }
 
        /**
@@ -254,6 +257,51 @@ abstract class LBFactory {
                } );
                return $ret;
        }
+
+       /**
+        * @return ChronologyProtector
+        */
+       protected function newChronologyProtector() {
+               $request = RequestContext::getMain()->getRequest();
+               $chronProt = new ChronologyProtector(
+                       ObjectCache::getMainStashInstance(),
+                       array(
+                               'ip' => $request->getIP(),
+                               'agent' => $request->getHeader( 'User-Agent' )
+                       )
+               );
+               if ( PHP_SAPI === 'cli' ) {
+                       $chronProt->setEnabled( false );
+               } elseif ( $request->getHeader( 'ChronologyProtection' ) === 'false' ) {
+                       // Request opted out of using position wait logic. This is useful for requests
+                       // done by the job queue or background ETL that do not have a meaningful session.
+                       $chronProt->setWaitEnabled( false );
+               }
+
+               return $chronProt;
+       }
+
+       /**
+        * @param ChronologyProtector $cp
+        */
+       protected function shutdownChronologyProtector( ChronologyProtector $cp ) {
+               // Get all the master positions needed
+               $this->forEachLB( function ( LoadBalancer $lb ) use ( $cp ) {
+                       $cp->shutdownLB( $lb );
+               } );
+               // Write them to the stash
+               $unsavedPositions = $cp->shutdown();
+               // If the positions failed to write to the stash, at least wait on local datacenter
+               // slaves to catch up before responding. Even if there are several DCs, this increases
+               // the chance that the user will see their own changes immediately afterwards. As long
+               // as the sticky DC cookie applies (same domain), this is not even an issue.
+               $this->forEachLB( function ( LoadBalancer $lb ) use ( $unsavedPositions ) {
+                       $masterName = $lb->getServerName( $lb->getWriterIndex() );
+                       if ( isset( $unsavedPositions[$masterName] ) ) {
+                               $lb->waitForAll( $unsavedPositions[$masterName] );
+                       }
+               } );
+       }
 }
 
 /**
index 089dfd3..556fd30 100644 (file)
@@ -76,7 +76,8 @@
  * @ingroup Database
  */
 class LBFactoryMulti extends LBFactory {
-       // Required settings
+       /** @var ChronologyProtector */
+       private $chronProt;
 
        /** @var array A map of database names to section names */
        private $sectionsByDB;
@@ -160,7 +161,6 @@ class LBFactoryMulti extends LBFactory {
        public function __construct( array $conf ) {
                parent::__construct( $conf );
 
-               $this->chronProt = new ChronologyProtector;
                $this->conf = $conf;
                $required = array( 'sectionsByDB', 'sectionLoads', 'serverTemplate' );
                $optional = array( 'groupLoadsBySection', 'groupLoadsByDB', 'hostsByName',
@@ -180,6 +180,8 @@ class LBFactoryMulti extends LBFactory {
                                $this->$key = $conf[$key];
                        }
                }
+
+               $this->chronProt = $this->newChronologyProtector();
        }
 
        /**
@@ -401,14 +403,10 @@ class LBFactoryMulti extends LBFactory {
                }
        }
 
-       public function shutdown() {
-               foreach ( $this->mainLBs as $lb ) {
-                       $this->chronProt->shutdownLB( $lb );
-               }
-               foreach ( $this->extLBs as $extLB ) {
-                       $this->chronProt->shutdownLB( $extLB );
+       public function shutdown( $flags = 0 ) {
+               if ( !( $flags & self::SHUTDOWN_NO_CHRONPROT ) ) {
+                       $this->shutdownChronologyProtector( $this->chronProt );
                }
-               $this->chronProt->shutdown();
-               $this->commitMasterChanges();
+               $this->commitMasterChanges(); // sanity
        }
 }
index 353c47a..b9c6cd1 100644 (file)
@@ -38,10 +38,11 @@ class LBFactorySimple extends LBFactory {
        public function __construct( array $conf ) {
                parent::__construct( $conf );
 
-               $this->chronProt = new ChronologyProtector;
                $this->loadMonitorClass = isset( $conf['loadMonitorClass'] )
                        ? $conf['loadMonitorClass']
                        : null;
+
+               $this->chronProt = $this->newChronologyProtector();
        }
 
        /**
@@ -159,14 +160,10 @@ class LBFactorySimple extends LBFactory {
                }
        }
 
-       public function shutdown() {
-               if ( $this->mainLB ) {
-                       $this->chronProt->shutdownLB( $this->mainLB );
-               }
-               foreach ( $this->extLBs as $extLB ) {
-                       $this->chronProt->shutdownLB( $extLB );
+       public function shutdown( $flags = 0 ) {
+               if ( !( $flags & self::SHUTDOWN_NO_CHRONPROT ) ) {
+                       $this->shutdownChronologyProtector( $this->chronProt );
                }
-               $this->chronProt->shutdown();
-               $this->commitMasterChanges();
+               $this->commitMasterChanges(); // sanity
        }
 }
index 052a0fa..afd4ba8 100644 (file)
@@ -455,7 +455,8 @@ class LoadBalancer {
                /** @var DBMasterPos $knownReachedPos */
                $knownReachedPos = $this->srvCache->get( $key );
                if ( $knownReachedPos && $knownReachedPos->hasReached( $this->mWaitForPos ) ) {
-                       wfDebugLog( 'replication', __METHOD__ . ": Slave $server known to be caught up.\n" );
+                       wfDebugLog( 'replication', __METHOD__ .
+                               ": slave $server known to be caught up (pos >= $knownReachedPos).\n" );
                        return true;
                }
 
@@ -469,7 +470,7 @@ class LoadBalancer {
                        } else {
                                $conn = $this->openConnection( $index, '' );
                                if ( !$conn ) {
-                                       wfDebugLog( 'replication', __METHOD__ . ": failed to open connection to $server\n" );
+                                       wfDebugLog( 'replication', __METHOD__ . ": failed to connect to $server\n" );
 
                                        return false;
                                }
index 59d6ef6..02e75d3 100644 (file)
@@ -58,7 +58,7 @@ class LoadMonitorMySQL implements LoadMonitor {
                # (a) Check the local APC cache
                $value = $this->srvCache->get( $key );
                if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl ) ) {
-                       wfDebugLog( 'replication', __FUNCTION__ . ": got lag times ($key) from local cache" );
+                       wfDebugLog( 'replication', __METHOD__ . ": got lag times ($key) from local cache" );
                        return $value['lagTimes']; // cache hit
                }
                $staleValue = $value ?: false;
@@ -67,7 +67,7 @@ class LoadMonitorMySQL implements LoadMonitor {
                $value = $this->mainCache->get( $key );
                if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl ) ) {
                        $this->srvCache->set( $key, $value, $staleTTL );
-                       wfDebugLog( 'replication', __FUNCTION__ . ": got lag times ($key) from main cache" );
+                       wfDebugLog( 'replication', __METHOD__ . ": got lag times ($key) from main cache" );
 
                        return $value['lagTimes']; // cache hit
                }
@@ -106,7 +106,7 @@ class LoadMonitorMySQL implements LoadMonitor {
                $value = array( 'lagTimes' => $lagTimes, 'timestamp' => microtime( true ) );
                $this->mainCache->set( $key, $value, $staleTTL );
                $this->srvCache->set( $key, $value, $staleTTL );
-               wfDebugLog( 'replication', __FUNCTION__ . ": re-calculated lag times ($key)" );
+               wfDebugLog( 'replication', __METHOD__ . ": re-calculated lag times ($key)" );
 
                return $value['lagTimes'];
        }
index cb2d7db..519d3a0 100644 (file)
@@ -146,4 +146,64 @@ class LBFactoryTest extends MediaWikiTestCase {
                $factory->shutdown();
                $lb->closeAll();
        }
+
+       public function testChronologyProtector() {
+               // (a) First HTTP request
+               $mPos = new MySQLMasterPos( 'db1034-bin.000976', '843431247' );
+
+               $mockDB = $this->getMockBuilder( 'DatabaseMysql' )
+                       ->disableOriginalConstructor()
+                       ->getMock();
+               $mockDB->expects( $this->any() )
+                       ->method( 'doneWrites' )->will( $this->returnValue( true ) );
+               $mockDB->expects( $this->any() )
+                       ->method( 'getMasterPos' )->will( $this->returnValue( $mPos ) );
+
+               $lb = $this->getMockBuilder( 'LoadBalancer' )
+                       ->disableOriginalConstructor()
+                       ->getMock();
+               $lb->expects( $this->any() )
+                       ->method( 'getConnection' )->will( $this->returnValue( $mockDB ) );
+               $lb->expects( $this->any() )
+                       ->method( 'getServerCount' )->will( $this->returnValue( 2 ) );
+               $lb->expects( $this->any() )
+                       ->method( 'parentInfo' )->will( $this->returnValue( array( 'id' => "main-DEFAULT" ) ) );
+               $lb->expects( $this->any() )
+                       ->method( 'getAnyOpenConnection' )->will( $this->returnValue( $mockDB ) );
+
+               $bag = new HashBagOStuff();
+               $cp = new ChronologyProtector(
+                       $bag,
+                       array(
+                               'ip' => '127.0.0.1',
+                               'agent' => "Totally-Not-FireFox"
+                       )
+               );
+
+               $mockDB->expects( $this->exactly( 2 ) )->method( 'doneWrites' );
+
+               // Nothing to wait for
+               $cp->initLB( $lb );
+               // Record in stash
+               $cp->shutdownLB( $lb );
+               $cp->shutdown();
+
+               // (b) Second HTTP request
+               $cp = new ChronologyProtector(
+                       $bag,
+                       array(
+                               'ip' => '127.0.0.1',
+                               'agent' => "Totally-Not-FireFox"
+                       )
+               );
+
+               $lb->expects( $this->once() )
+                       ->method( 'waitFor' )->with( $this->equalTo( $mPos ) );
+
+               // Wait
+               $cp->initLB( $lb );
+               // Record in stash
+               $cp->shutdownLB( $lb );
+               $cp->shutdown();
+       }
 }