/** @var callable[] */
protected $replicationWaitCallbacks = [];
- const SHUTDOWN_NO_CHRONPROT = 1; // don't save ChronologyProtector positions (for async code)
+ const SHUTDOWN_NO_CHRONPROT = 0; // don't save DB positions at all
+ const SHUTDOWN_CHRONPROT_ASYNC = 1; // save DB positions, but don't wait on remote DCs
+ const SHUTDOWN_CHRONPROT_SYNC = 2; // save DB positions, waiting on all DCs
/**
* Construct a factory based on a configuration array (typically from $wgLBFactoryConf)
* @see LoadBalancer::disable()
*/
public function destroy() {
- $this->shutdown();
+ $this->shutdown( self::SHUTDOWN_NO_CHRONPROT );
$this->forEachLBCallMethod( 'disable' );
}
* @deprecated since 1.27, use LBFactory::destroy()
*/
public static function destroyInstance() {
- self::singleton()->destroy();
+ MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->destroy();
}
/**
/**
* Prepare all tracked load balancers for shutdown
- * @param integer $flags Supports SHUTDOWN_* flags
- */
- public function shutdown( $flags = 0 ) {
- if ( !( $flags & self::SHUTDOWN_NO_CHRONPROT ) ) {
- $this->shutdownChronologyProtector( $this->chronProt );
+ * @param integer $mode One of the class SHUTDOWN_* constants
+ * @param callable|null $workCallback Work to mask ChronologyProtector writes
+ */
+ public function shutdown(
+ $mode = self::SHUTDOWN_CHRONPROT_SYNC, callable $workCallback = null
+ ) {
+ if ( $mode === self::SHUTDOWN_CHRONPROT_SYNC ) {
+ $this->shutdownChronologyProtector( $this->chronProt, $workCallback, 'sync' );
+ } elseif ( $mode === self::SHUTDOWN_CHRONPROT_ASYNC ) {
+ $this->shutdownChronologyProtector( $this->chronProt, null, 'async' );
}
+
$this->commitMasterChanges( __METHOD__ ); // sanity
}
/**
* Determine if any master connection has pending/written changes from this request
+ * @param float $age How many seconds ago is "recent" [defaults to LB lag wait timeout]
* @return bool
* @since 1.27
*/
- public function hasOrMadeRecentMasterChanges() {
+ public function hasOrMadeRecentMasterChanges( $age = null ) {
$ret = false;
- $this->forEachLB( function ( LoadBalancer $lb ) use ( &$ret ) {
- $ret = $ret || $lb->hasOrMadeRecentMasterChanges();
+ $this->forEachLB( function ( LoadBalancer $lb ) use ( $age, &$ret ) {
+ $ret = $ret || $lb->hasOrMadeRecentMasterChanges( $age );
} );
return $ret;
}
'ifWritesSince' => null
];
- foreach ( $this->replicationWaitCallbacks as $callback ) {
- $callback();
- }
-
// Figure out which clusters need to be checked
/** @var LoadBalancer[] $lbs */
$lbs = [];
$masterPositions[$i] = $lb->getMasterPos();
}
+ // Run any listener callbacks *after* getting the DB positions. The more
+ // time spent in the callbacks, the less time is spent in waitForAll().
+ foreach ( $this->replicationWaitCallbacks as $callback ) {
+ $callback();
+ }
+
$failed = [];
foreach ( $lbs as $i => $lb ) {
if ( $masterPositions[$i] ) {
ObjectCache::getMainStashInstance(),
[
'ip' => $request->getIP(),
- 'agent' => $request->getHeader( 'User-Agent' )
- ]
+ 'agent' => $request->getHeader( 'User-Agent' ),
+ ],
+ $request->getFloat( 'cpPosTime', null )
);
if ( PHP_SAPI === 'cli' ) {
$chronProt->setEnabled( false );
}
/**
+ * Get and record all of the staged DB positions into persistent memory storage
+ *
* @param ChronologyProtector $cp
+ * @param callable|null $workCallback Work to do instead of waiting on syncing positions
+ * @param string $mode One of (sync, async); whether to wait on remote datacenters
*/
- protected function shutdownChronologyProtector( ChronologyProtector $cp ) {
- // Get all the master positions needed
+ protected function shutdownChronologyProtector(
+ ChronologyProtector $cp, $workCallback, $mode
+ ) {
+ // Record all the master positions needed
$this->forEachLB( function ( LoadBalancer $lb ) use ( $cp ) {
$cp->shutdownLB( $lb );
} );
- // Write them to the stash
- $unsavedPositions = $cp->shutdown();
+ // Write them to the persistent stash. Try to do something useful by running $work
+ // while ChronologyProtector waits for the stash write to replicate to all DCs.
+ $unsavedPositions = $cp->shutdown( $workCallback, $mode );
+ if ( $unsavedPositions && $workCallback ) {
+ // Invoke callback in case it did not cache the result yet
+ $workCallback(); // work now to block for less time in waitForAll()
+ }
// If the positions failed to write to the stash, at least wait on local datacenter
// replica DBs to catch up before responding. Even if there are several DCs, this increases
// the chance that the user will see their own changes immediately afterwards. As long
} );
}
+ /**
+ * Base parameters to LoadBalancer::__construct()
+ */
+ final protected function baseLoadBalancerParams() {
+ return [
+ 'readOnlyReason' => $this->readOnlyReason,
+ 'trxProfiler' => $this->trxProfiler,
+ 'srvCache' => $this->srvCache,
+ 'wanCache' => $this->wanCache,
+ 'localDomain' => wfWikiID(),
+ 'errorLogger' => [ MWExceptionHandler::class, 'logException' ]
+ ];
+ }
+
/**
* @param LoadBalancer $lb
*/
}
}
+ /**
+ * Append ?cpPosTime parameter to a URL for ChronologyProtector purposes if needed
+ *
+ * Note that unlike cookies, this works accross domains
+ *
+ * @param string $url
+ * @param float $time UNIX timestamp just before shutdown() was called
+ * @return string
+ * @since 1.28
+ */
+ public function appendPreShutdownTimeAsQuery( $url, $time ) {
+ $usedCluster = 0;
+ $this->forEachLB( function ( LoadBalancer $lb ) use ( &$usedCluster ) {
+ $usedCluster |= ( $lb->getServerCount() > 1 );
+ } );
+
+ if ( !$usedCluster ) {
+ return $url; // no master/replica clusters touched
+ }
+
+ return wfAppendQuery( $url, [ 'cpPosTime' => $time ] );
+ }
+
/**
* Close all open database connections on all open load balancers.
* @since 1.28
public function closeAll() {
$this->forEachLBCallMethod( 'closeAll', [] );
}
-
}
private $trxRoundId = false;
/** @var array[] Map of (name => callable) */
private $trxRecurringCallbacks = [];
+ /** @var string Local Wiki ID and default for selectDB() calls */
+ private $localDomain;
+ /** @var callable Exception logger */
+ private $errorLogger;
/** @var integer Warn when this many connection are held */
const CONN_HELD_WARN_THRESHOLD = 10;
* - waitTimeout : Maximum time to wait for replicas for consistency [optional]
* - srvCache : BagOStuff object [optional]
* - wanCache : WANObjectCache object [optional]
+ * - localDomain: The wiki ID of the "local"/"current" wiki [optional]
+ * - errorLogger: Callback that takes an Exception and logs it [optional]
* @throws MWException
*/
public function __construct( array $params ) {
$this->mWaitTimeout = isset( $params['waitTimeout'] )
? $params['waitTimeout']
: self::POS_WAIT_TIMEOUT;
+ $this->localDomain = isset( $params['localDomain'] ) ? $params['localDomain'] : '';
$this->mReadIndex = -1;
$this->mWriteIndex = -1;
} else {
$this->trxProfiler = new TransactionProfiler();
}
+
+ $this->errorLogger = isset( $params['errorLogger'] )
+ ? $params['errorLogger']
+ : function ( Exception $e ) {
+ trigger_error( E_WARNING, $e->getMessage() );
+ };
}
/**
* @return bool|int|string
*/
public function getReaderIndex( $group = false, $wiki = false ) {
- global $wgDBtype;
-
- # @todo FIXME: For now, only go through all this for mysql databases
- if ( $wgDBtype != 'mysql' ) {
- return $this->getWriterIndex();
- }
-
if ( count( $this->mServers ) == 1 ) {
# Skip the load balancing if there's only one server
- return 0;
+ return $this->getWriterIndex();
} elseif ( $group === false && $this->mReadIndex >= 0 ) {
# Shortcut if generic reader exists already
return $this->mReadIndex;
throw new MWException( "Empty server array given to LoadBalancer" );
}
- # Scale the configured load ratios according to the dynamic load (if the load monitor supports it)
+ # Scale the configured load ratios according to the dynamic load if supported
$this->getLoadMonitor()->scaleLoads( $nonErrorLoads, $group, $wiki );
$laggedReplicaMode = false;
' with invalid server index' );
}
- if ( $wiki === wfWikiID() ) {
+ if ( $wiki === $this->localDomain ) {
$wiki = false;
}
if ( $this->connsOpened > $oldConnsOpened ) {
$host = $conn->getServer();
$dbname = $conn->getDBname();
- $trxProf = Profiler::instance()->getTransactionProfiler();
- $trxProf->recordConnection( $host, $dbname, $masterOnly );
+ $this->trxProfiler->recordConnection( $host, $dbname, $masterOnly );
}
if ( $masterOnly ) {
* @return DBConnRef
*/
public function getLazyConnectionRef( $db, $groups = [], $wiki = false ) {
+ $wiki = ( $wiki !== false ) ? $wiki : $this->localDomain;
+
return new DBConnRef( $this, [ $db, $groups, $wiki ] );
}
* @return DatabaseBase
*/
private function openForeignConnection( $i, $wiki ) {
- list( $dbName, $prefix ) = wfSplitWikiID( $wiki );
+ list( $dbName, $prefix ) = explode( '-', $wiki, 2 ) + [ '', '' ];
+
if ( isset( $this->mConns['foreignUsed'][$i][$wiki] ) ) {
// Reuse an already-used connection
$conn = $this->mConns['foreignUsed'][$i][$wiki];
try {
$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
} catch ( DBError $e ) {
- MWExceptionHandler::logException( $e );
+ call_user_func( $this->errorLogger, $e );
$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
}
if ( $restore && $conn->getLBInfo( 'master' ) ) {
try {
$conn->flushSnapshot( $fname );
} catch ( DBError $e ) {
- MWExceptionHandler::logException( $e );
+ call_user_func( $this->errorLogger, $e );
$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
}
$conn->setTrxEndCallbackSuppression( false );
$conn->flushSnapshot( $fname );
}
} catch ( DBError $e ) {
- MWExceptionHandler::logException( $e );
+ call_user_func( $this->errorLogger, $e );
$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
}
if ( $restore ) {
*
* @param IDatabase $conn Replica DB
* @param DBMasterPos|bool $pos Master position; default: current position
- * @param integer $timeout Timeout in seconds
+ * @param integer|null $timeout Timeout in seconds [optional]
* @return bool Success
* @since 1.27
*/
- public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = 10 ) {
+ public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = null ) {
if ( $this->getServerCount() == 1 || !$conn->getLBInfo( 'replica' ) ) {
return true; // server is not a replica DB
}
return false; // something is misconfigured
}
+ $timeout = $timeout ?: $this->mWaitTimeout;
$result = $conn->masterPosWait( $pos, $timeout );
if ( $result == -1 || is_null( $result ) ) {
$msg = __METHOD__ . ": Timed out waiting on {$conn->getServer()} pos {$pos}";
}
);
}
+
+ /**
+ * Set a new table prefix for the existing local wiki ID for testing
+ *
+ * @param string $prefix
+ * @since 1.28
+ */
+ public function setDomainPrefix( $prefix ) {
+ list( $dbName, ) = explode( '-', $this->localDomain, 2 );
+
+ $this->localDomain = "{$dbName}-{$prefix}";
+ }
}