* An interface for generating database load balancers
* @ingroup Database
*/
-abstract class LBFactory {
+abstract class LBFactory implements ILBFactory {
/** @var ChronologyProtector */
protected $chronProt;
+ /** @var object|string Class name or object With profileIn/profileOut methods */
+ protected $profiler;
/** @var TransactionProfiler */
protected $trxProfiler;
/** @var LoggerInterface */
/** @var WANObjectCache */
protected $wanCache;
- /** @var string Local domain */
- protected $domain;
+ /** @var DatabaseDomain Local domain */
+ protected $localDomain;
/** @var string Local hostname of the app server */
protected $hostname;
+ /** @var array Web request information about the client */
+ protected $requestInfo;
+
/** @var mixed */
protected $ticket;
/** @var string|bool String if a requested DBO_TRX transaction round is active */
/** @var callable[] */
protected $replicationWaitCallbacks = [];
- const SHUTDOWN_NO_CHRONPROT = 0; // don't save DB positions at all
- const SHUTDOWN_CHRONPROT_ASYNC = 1; // save DB positions, but don't wait on remote DCs
- const SHUTDOWN_CHRONPROT_SYNC = 2; // save DB positions, waiting on all DCs
+ /** @var bool Whether this PHP instance is for a CLI script */
+ protected $cliMode;
+ /** @var string Agent name for query profiling */
+ protected $agent;
private static $loggerFields =
[ 'replLogger', 'connLogger', 'queryLogger', 'perfLogger' ];
- /**
- * @TODO: document base params here
- * @param array $conf
- */
public function __construct( array $conf ) {
- $this->domain = isset( $conf['domain'] ) ? $conf['domain'] : '';
+ $this->localDomain = isset( $conf['localDomain'] )
+ ? DatabaseDomain::newFromId( $conf['localDomain'] )
+ : DatabaseDomain::newUnspecified();
+
if ( isset( $conf['readOnlyReason'] ) && is_string( $conf['readOnlyReason'] ) ) {
$this->readOnlyReason = $conf['readOnlyReason'];
}
$this->errorLogger = isset( $conf['errorLogger'] )
? $conf['errorLogger']
: function ( Exception $e ) {
- trigger_error( E_WARNING, $e->getMessage() );
+ trigger_error( E_WARNING, get_class( $e ) . ': ' . $e->getMessage() );
};
- $this->hostname = isset( $conf['hostname'] )
- ? $conf['hostname']
- : gethostname();
- $this->chronProt = isset( $conf['chronProt'] )
- ? $conf['chronProt']
- : $this->newChronologyProtector();
+ $this->profiler = isset( $params['profiler'] ) ? $params['profiler'] : null;
$this->trxProfiler = isset( $conf['trxProfiler'] )
? $conf['trxProfiler']
: new TransactionProfiler();
+ $this->requestInfo = [
+ 'IPAddress' => isset( $_SERVER[ 'REMOTE_ADDR' ] ) ? $_SERVER[ 'REMOTE_ADDR' ] : '',
+ 'UserAgent' => isset( $_SERVER['HTTP_USER_AGENT'] ) ? $_SERVER['HTTP_USER_AGENT'] : '',
+ 'ChronologyProtection' => 'true'
+ ];
+
+ $this->cliMode = isset( $params['cliMode'] ) ? $params['cliMode'] : PHP_SAPI === 'cli';
+ $this->hostname = isset( $conf['hostname'] ) ? $conf['hostname'] : gethostname();
+ $this->agent = isset( $params['agent'] ) ? $params['agent'] : '';
+
$this->ticket = mt_rand();
}
- /**
- * Disables all load balancers. All connections are closed, and any attempt to
- * open a new connection will result in a DBAccessError.
- * @see LoadBalancer::disable()
- */
public function destroy() {
$this->shutdown( self::SHUTDOWN_NO_CHRONPROT );
$this->forEachLBCallMethod( 'disable' );
}
- /**
- * Create a new load balancer object. The resulting object will be untracked,
- * not chronology-protected, and the caller is responsible for cleaning it up.
- *
- * @param bool|string $domain Wiki ID, or false for the current wiki
- * @return ILoadBalancer
- */
- abstract public function newMainLB( $domain = false );
-
- /**
- * Get a cached (tracked) load balancer object.
- *
- * @param bool|string $domain Wiki ID, or false for the current wiki
- * @return ILoadBalancer
- */
- abstract public function getMainLB( $domain = false );
-
- /**
- * Create a new load balancer for external storage. The resulting object will be
- * untracked, not chronology-protected, and the caller is responsible for
- * cleaning it up.
- *
- * @param string $cluster External storage cluster, or false for core
- * @param bool|string $domain Wiki ID, or false for the current wiki
- * @return ILoadBalancer
- */
- abstract protected function newExternalLB( $cluster, $domain = false );
-
- /**
- * Get a cached (tracked) load balancer for external storage
- *
- * @param string $cluster External storage cluster, or false for core
- * @param bool|string $domain Wiki ID, or false for the current wiki
- * @return ILoadBalancer
- */
- abstract public function getExternalLB( $cluster, $domain = false );
-
- /**
- * Execute a function for each tracked load balancer
- * The callback is called with the load balancer as the first parameter,
- * and $params passed as the subsequent parameters.
- *
- * @param callable $callback
- * @param array $params
- */
- abstract public function forEachLB( $callback, array $params = [] );
-
- /**
- * Prepare all tracked load balancers for shutdown
- * @param integer $mode One of the class SHUTDOWN_* constants
- * @param callable|null $workCallback Work to mask ChronologyProtector writes
- */
public function shutdown(
$mode = self::SHUTDOWN_CHRONPROT_SYNC, callable $workCallback = null
) {
+ $chronProt = $this->getChronologyProtector();
if ( $mode === self::SHUTDOWN_CHRONPROT_SYNC ) {
- $this->shutdownChronologyProtector( $this->chronProt, $workCallback, 'sync' );
+ $this->shutdownChronologyProtector( $chronProt, $workCallback, 'sync' );
} elseif ( $mode === self::SHUTDOWN_CHRONPROT_ASYNC ) {
- $this->shutdownChronologyProtector( $this->chronProt, null, 'async' );
+ $this->shutdownChronologyProtector( $chronProt, null, 'async' );
}
$this->commitMasterChanges( __METHOD__ ); // sanity
);
}
- /**
- * Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshot
- *
- * @param string $fname Caller name
- * @since 1.28
- */
public function flushReplicaSnapshots( $fname = __METHOD__ ) {
$this->forEachLBCallMethod( 'flushReplicaSnapshots', [ $fname ] );
}
- /**
- * Commit on all connections. Done for two reasons:
- * 1. To commit changes to the masters.
- * 2. To release the snapshot on all connections, master and replica DB.
- * @param string $fname Caller name
- * @param array $options Options map:
- * - maxWriteDuration: abort if more than this much time was spent in write queries
- */
public function commitAll( $fname = __METHOD__, array $options = [] ) {
$this->commitMasterChanges( $fname, $options );
$this->forEachLBCallMethod( 'commitAll', [ $fname ] );
}
- /**
- * Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
- *
- * The DBO_TRX setting will be reverted to the default in each of these methods:
- * - commitMasterChanges()
- * - rollbackMasterChanges()
- * - commitAll()
- *
- * This allows for custom transaction rounds from any outer transaction scope.
- *
- * @param string $fname
- * @throws DBTransactionError
- * @since 1.28
- */
public function beginMasterChanges( $fname = __METHOD__ ) {
if ( $this->trxRoundId !== false ) {
throw new DBTransactionError(
$this->forEachLBCallMethod( 'beginMasterChanges', [ $fname ] );
}
- /**
- * Commit changes on all master connections
- * @param string $fname Caller name
- * @param array $options Options map:
- * - maxWriteDuration: abort if more than this much time was spent in write queries
- * @throws Exception
- */
public function commitMasterChanges( $fname = __METHOD__, array $options = [] ) {
if ( $this->trxRoundId !== false && $this->trxRoundId !== $fname ) {
throw new DBTransactionError(
}
}
- /**
- * Rollback changes on all master connections
- * @param string $fname Caller name
- * @since 1.23
- */
public function rollbackMasterChanges( $fname = __METHOD__ ) {
$this->trxRoundId = false;
$this->forEachLBCallMethod( 'suppressTransactionEndCallbacks' );
}
}
- /**
- * Determine if any master connection has pending changes
- * @return bool
- * @since 1.23
- */
public function hasMasterChanges() {
$ret = false;
$this->forEachLB( function ( ILoadBalancer $lb ) use ( &$ret ) {
return $ret;
}
- /**
- * Detemine if any lagged replica DB connection was used
- * @return bool
- * @since 1.28
- */
public function laggedReplicaUsed() {
$ret = false;
$this->forEachLB( function ( ILoadBalancer $lb ) use ( &$ret ) {
return $ret;
}
- /**
- * Determine if any master connection has pending/written changes from this request
- * @param float $age How many seconds ago is "recent" [defaults to LB lag wait timeout]
- * @return bool
- * @since 1.27
- */
public function hasOrMadeRecentMasterChanges( $age = null ) {
$ret = false;
$this->forEachLB( function ( ILoadBalancer $lb ) use ( $age, &$ret ) {
return $ret;
}
- /**
- * Waits for the replica DBs to catch up to the current master position
- *
- * Use this when updating very large numbers of rows, as in maintenance scripts,
- * to avoid causing too much lag. Of course, this is a no-op if there are no replica DBs.
- *
- * By default this waits on all DB clusters actually used in this request.
- * This makes sense when lag being waiting on is caused by the code that does this check.
- * In that case, setting "ifWritesSince" can avoid the overhead of waiting for clusters
- * that were not changed since the last wait check. To forcefully wait on a specific cluster
- * for a given wiki, use the 'wiki' parameter. To forcefully wait on an "external" cluster,
- * use the "cluster" parameter.
- *
- * Never call this function after a large DB write that is *still* in a transaction.
- * It only makes sense to call this after the possible lag inducing changes were committed.
- *
- * @param array $opts Optional fields that include:
- * - wiki : wait on the load balancer DBs that handles the given wiki
- * - cluster : wait on the given external load balancer DBs
- * - timeout : Max wait time. Default: ~60 seconds
- * - ifWritesSince: Only wait if writes were done since this UNIX timestamp
- * @throws DBReplicationWaitError If a timeout or error occured waiting on a DB cluster
- * @since 1.27
- */
public function waitForReplication( array $opts = [] ) {
$opts += [
- 'wiki' => false,
+ 'domain' => false,
'cluster' => false,
'timeout' => 60,
'ifWritesSince' => null
];
+ if ( $opts['domain'] === false && isset( $opts['wiki'] ) ) {
+ $opts['domain'] = $opts['wiki']; // b/c
+ }
+
// Figure out which clusters need to be checked
/** @var ILoadBalancer[] $lbs */
$lbs = [];
if ( $opts['cluster'] !== false ) {
$lbs[] = $this->getExternalLB( $opts['cluster'] );
- } elseif ( $opts['wiki'] !== false ) {
- $lbs[] = $this->getMainLB( $opts['wiki'] );
+ } elseif ( $opts['domain'] !== false ) {
+ $lbs[] = $this->getMainLB( $opts['domain'] );
} else {
$this->forEachLB( function ( ILoadBalancer $lb ) use ( &$lbs ) {
$lbs[] = $lb;
$failed = [];
foreach ( $lbs as $i => $lb ) {
if ( $masterPositions[$i] ) {
- // The DBMS may not support getMasterPos() or the whole
- // load balancer might be fake (e.g. $wgAllDBsAreLocalhost).
+ // The DBMS may not support getMasterPos()
if ( !$lb->waitForAll( $masterPositions[$i], $opts['timeout'] ) ) {
$failed[] = $lb->getServerName( $lb->getWriterIndex() );
}
}
}
- /**
- * Add a callback to be run in every call to waitForReplication() before waiting
- *
- * Callbacks must clear any transactions that they start
- *
- * @param string $name Callback name
- * @param callable|null $callback Use null to unset a callback
- * @since 1.28
- */
public function setWaitForReplicationListener( $name, callable $callback = null ) {
if ( $callback ) {
$this->replicationWaitCallbacks[$name] = $callback;
}
}
- /**
- * Get a token asserting that no transaction writes are active
- *
- * @param string $fname Caller name (e.g. __METHOD__)
- * @return mixed A value to pass to commitAndWaitForReplication()
- * @since 1.28
- */
public function getEmptyTransactionTicket( $fname ) {
if ( $this->hasMasterChanges() ) {
- $this->queryLogger->error( __METHOD__ . ": $fname does not have outer scope." );
+ $this->queryLogger->error( __METHOD__ . ": $fname does not have outer scope.\n" .
+ ( new RuntimeException() )->getTraceAsString() );
+
return null;
}
return $this->ticket;
}
- /**
- * Convenience method for safely running commitMasterChanges()/waitForReplication()
- *
- * This will commit and wait unless $ticket indicates it is unsafe to do so
- *
- * @param string $fname Caller name (e.g. __METHOD__)
- * @param mixed $ticket Result of getEmptyTransactionTicket()
- * @param array $opts Options to waitForReplication()
- * @throws DBReplicationWaitError
- * @since 1.28
- */
public function commitAndWaitForReplication( $fname, $ticket, array $opts = [] ) {
if ( $ticket !== $this->ticket ) {
- $this->perfLogger->error( __METHOD__ . ": $fname does not have outer scope." );
+ $this->perfLogger->error( __METHOD__ . ": $fname does not have outer scope.\n" .
+ ( new RuntimeException() )->getTraceAsString() );
+
return;
}
}
}
- /**
- * @param string $dbName DB master name (e.g. "db1052")
- * @return float|bool UNIX timestamp when client last touched the DB or false if not recent
- * @since 1.28
- */
public function getChronologyProtectorTouched( $dbName ) {
- return $this->chronProt->getTouched( $dbName );
+ return $this->getChronologyProtector()->getTouched( $dbName );
}
- /**
- * Disable the ChronologyProtector for all load balancers
- *
- * This can be called at the start of special API entry points
- *
- * @since 1.27
- */
public function disableChronologyProtection() {
- $this->chronProt->setEnabled( false );
+ $this->getChronologyProtector()->setEnabled( false );
}
/**
* @return ChronologyProtector
*/
- protected function newChronologyProtector() {
- $chronProt = new ChronologyProtector(
+ protected function getChronologyProtector() {
+ if ( $this->chronProt ) {
+ return $this->chronProt;
+ }
+
+ $this->chronProt = new ChronologyProtector(
$this->memCache,
[
- 'ip' => isset( $_SERVER[ 'REMOTE_ADDR' ] ) ? $_SERVER[ 'REMOTE_ADDR' ] : '',
- 'agent' => isset( $_SERVER['HTTP_USER_AGENT'] ) ? $_SERVER['HTTP_USER_AGENT'] : ''
+ 'ip' => $this->requestInfo['IPAddress'],
+ 'agent' => $this->requestInfo['UserAgent'],
],
isset( $_GET['cpPosTime'] ) ? $_GET['cpPosTime'] : null
);
- $chronProt->setLogger( $this->replLogger );
- if ( PHP_SAPI === 'cli' ) {
- $chronProt->setEnabled( false );
+ $this->chronProt->setLogger( $this->replLogger );
+
+ if ( $this->cliMode ) {
+ $this->chronProt->setEnabled( false );
+ } elseif ( $this->requestInfo['ChronologyProtection'] === 'false' ) {
+ // Request opted out of using position wait logic. This is useful for requests
+ // done by the job queue or background ETL that do not have a meaningful session.
+ $this->chronProt->setWaitEnabled( false );
}
- return $chronProt;
+ $this->replLogger->debug( __METHOD__ . ': using request info ' .
+ json_encode( $this->requestInfo, JSON_PRETTY_PRINT ) );
+
+ return $this->chronProt;
}
/**
*/
final protected function baseLoadBalancerParams() {
return [
- 'localDomain' => $this->domain,
+ 'localDomain' => $this->localDomain,
'readOnlyReason' => $this->readOnlyReason,
'srvCache' => $this->srvCache,
'wanCache' => $this->wanCache,
+ 'profiler' => $this->profiler,
'trxProfiler' => $this->trxProfiler,
'queryLogger' => $this->queryLogger,
'connLogger' => $this->connLogger,
'replLogger' => $this->replLogger,
'errorLogger' => $this->errorLogger,
- 'hostname' => $this->hostname
+ 'hostname' => $this->hostname,
+ 'cliMode' => $this->cliMode,
+ 'agent' => $this->agent
];
}
}
}
- /**
- * Define a new local domain (for testing)
- *
- * Caller should make sure no local connection are open to the old local domain
- *
- * @param string $domain
- * @since 1.28
- */
- public function setDomainPrefix( $domain ) {
- $this->domain = $domain;
+ public function setDomainPrefix( $prefix ) {
+ $this->localDomain = new DatabaseDomain(
+ $this->localDomain->getDatabase(),
+ null,
+ $prefix
+ );
+
+ $this->forEachLB( function( ILoadBalancer $lb ) use ( $prefix ) {
+ $lb->setDomainPrefix( $prefix );
+ } );
}
- /**
- * Close all open database connections on all open load balancers.
- * @since 1.28
- */
public function closeAll() {
$this->forEachLBCallMethod( 'closeAll', [] );
}
+
+ public function setAgentName( $agent ) {
+ $this->agent = $agent;
+ }
+
+ public function appendPreShutdownTimeAsQuery( $url, $time ) {
+ $usedCluster = 0;
+ $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$usedCluster ) {
+ $usedCluster |= ( $lb->getServerCount() > 1 );
+ } );
+
+ if ( !$usedCluster ) {
+ return $url; // no master/replica clusters touched
+ }
+
+ return strpos( $url, '?' ) === false ? "$url?cpPosTime=$time" : "$url&cpPosTime=$time";
+ }
+
+ public function setRequestInfo( array $info ) {
+ $this->requestInfo = $info + $this->requestInfo;
+ }
+
+ function __destruct() {
+ $this->destroy();
+ }
}