*/
class LoadBalancer implements ILoadBalancer {
/** @var array[] Map of (server index => server config array) */
- private $mServers;
+ private $servers;
/** @var Database[][][] Map of (connection category => server index => IDatabase[]) */
- private $mConns;
+ private $conns;
/** @var float[] Map of (server index => weight) */
- private $mLoads;
+ private $loads;
/** @var array[] Map of (group => server index => weight) */
- private $mGroupLoads;
+ private $groupLoads;
/** @var bool Whether to disregard replica DB lag as a factor in replica DB selection */
- private $mAllowLagged;
+ private $allowLagged;
/** @var int Seconds to spend waiting on replica DB lag to resolve */
- private $mWaitTimeout;
+ private $waitTimeout;
/** @var array The LoadMonitor configuration */
private $loadMonitorConfig;
/** @var array[] $aliases Map of (table => (dbname, schema, prefix) map) */
/** @var Database DB connection object that caused a problem */
private $errorConnection;
/** @var int The generic (not query grouped) replica DB index (of $mServers) */
- private $mReadIndex;
+ private $readIndex;
/** @var bool|DBMasterPos False if not set */
- private $mWaitForPos;
+ private $waitForPos;
/** @var bool Whether the generic reader fell back to a lagged replica DB */
private $laggedReplicaMode = false;
/** @var bool Whether the generic reader fell back to a lagged replica DB */
private $allReplicasDownMode = false;
/** @var string The last DB selection or connection error */
- private $mLastError = 'Unknown error';
+ private $lastError = 'Unknown error';
/** @var string|bool Reason the LB is read-only or false if not */
private $readOnlyReason = false;
/** @var int Total connections opened */
/** @var int Default 'maxLag' when unspecified */
const MAX_LAG_DEFAULT = 10;
+ /** @var int Default 'waitTimeout' when unspecified */
+ const MAX_WAIT_DEFAULT = 10;
/** @var int Seconds to cache master server read-only status */
const TTL_CACHE_READONLY = 5;
if ( !isset( $params['servers'] ) ) {
throw new InvalidArgumentException( __CLASS__ . ': missing servers parameter' );
}
- $this->mServers = $params['servers'];
- foreach ( $this->mServers as $i => $server ) {
+ $this->servers = $params['servers'];
+ foreach ( $this->servers as $i => $server ) {
if ( $i == 0 ) {
- $this->mServers[$i]['master'] = true;
+ $this->servers[$i]['master'] = true;
} else {
- $this->mServers[$i]['replica'] = true;
+ $this->servers[$i]['replica'] = true;
}
}
: DatabaseDomain::newUnspecified();
$this->setLocalDomain( $localDomain );
- $this->mWaitTimeout = isset( $params['waitTimeout'] ) ? $params['waitTimeout'] : 10;
+ $this->waitTimeout = isset( $params['waitTimeout'] )
+ ? $params['waitTimeout']
+ : self::MAX_WAIT_DEFAULT;
- $this->mReadIndex = -1;
- $this->mConns = [
+ $this->readIndex = -1;
+ $this->conns = [
// Connection were transaction rounds may be applied
self::KEY_LOCAL => [],
self::KEY_FOREIGN_INUSE => [],
self::KEY_FOREIGN_INUSE_NOROUND => [],
self::KEY_FOREIGN_FREE_NOROUND => []
];
- $this->mLoads = [];
- $this->mWaitForPos = false;
- $this->mAllowLagged = false;
+ $this->loads = [];
+ $this->waitForPos = false;
+ $this->allowLagged = false;
if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
$this->readOnlyReason = $params['readOnlyReason'];
$this->loadMonitorConfig += [ 'lagWarnThreshold' => $this->maxLag ];
foreach ( $params['servers'] as $i => $server ) {
- $this->mLoads[$i] = $server['load'];
+ $this->loads[$i] = $server['load'];
if ( isset( $server['groupLoads'] ) ) {
foreach ( $server['groupLoads'] as $group => $ratio ) {
- if ( !isset( $this->mGroupLoads[$group] ) ) {
- $this->mGroupLoads[$group] = [];
+ if ( !isset( $this->groupLoads[$group] ) ) {
+ $this->groupLoads[$group] = [];
}
- $this->mGroupLoads[$group][$i] = $ratio;
+ $this->groupLoads[$group][$i] = $ratio;
}
}
}
foreach ( $lags as $i => $lag ) {
if ( $i != 0 ) {
# How much lag this server nominally is allowed to have
- $maxServerLag = isset( $this->mServers[$i]['max lag'] )
- ? $this->mServers[$i]['max lag']
+ $maxServerLag = isset( $this->servers[$i]['max lag'] )
+ ? $this->servers[$i]['max lag']
: $this->maxLag; // default
# Constrain that futher by $maxLag argument
$maxServerLag = min( $maxServerLag, $maxLag );
$host = $this->getServerName( $i );
if ( $lag === false && !is_infinite( $maxServerLag ) ) {
$this->replLogger->error(
- "Server {host} is not replicating?", [ 'host' => $host ] );
+ __METHOD__ .
+ ": server {host} is not replicating?", [ 'host' => $host ] );
unset( $loads[$i] );
} elseif ( $lag > $maxServerLag ) {
$this->replLogger->info(
- "Server {host} has {lag} seconds of lag (>= {maxlag})",
+ __METHOD__ .
+ ": server {host} has {lag} seconds of lag (>= {maxlag})",
[ 'host' => $host, 'lag' => $lag, 'maxlag' => $maxServerLag ]
);
unset( $loads[$i] );
}
public function getReaderIndex( $group = false, $domain = false ) {
- if ( count( $this->mServers ) == 1 ) {
+ if ( count( $this->servers ) == 1 ) {
// Skip the load balancing if there's only one server
return $this->getWriterIndex();
- } elseif ( $group === false && $this->mReadIndex >= 0 ) {
+ } elseif ( $group === false && $this->readIndex >= 0 ) {
// Shortcut if the generic reader index was already cached
- return $this->mReadIndex;
+ return $this->readIndex;
}
if ( $group !== false ) {
// Use the server weight array for this load group
- if ( isset( $this->mGroupLoads[$group] ) ) {
- $loads = $this->mGroupLoads[$group];
+ if ( isset( $this->groupLoads[$group] ) ) {
+ $loads = $this->groupLoads[$group];
} else {
// No loads for this group, return false and the caller can use some other group
$this->connLogger->info( __METHOD__ . ": no loads for group $group" );
}
} else {
// Use the generic load group
- $loads = $this->mLoads;
+ $loads = $this->loads;
}
// Scale the configured load ratios according to each server's load and state
return false;
}
- if ( $this->mWaitForPos && $i != $this->getWriterIndex() ) {
+ if ( $this->waitForPos && $i != $this->getWriterIndex() ) {
// Before any data queries are run, wait for the server to catch up to the
// specified position. This is used to improve session consistency. Note that
// when LoadBalancer::waitFor() sets mWaitForPos, the waiting triggers here,
}
}
- if ( $this->mReadIndex <= 0 && $this->mLoads[$i] > 0 && $group === false ) {
+ if ( $this->readIndex <= 0 && $this->loads[$i] > 0 && $group === false ) {
// Cache the generic reader index for future ungrouped DB_REPLICA handles
- $this->mReadIndex = $i;
+ $this->readIndex = $i;
// Record if the generic reader index is in "lagged replica DB" mode
if ( $laggedReplicaMode ) {
$this->laggedReplicaMode = true;
// Quickly look through the available servers for a server that meets criteria...
$currentLoads = $loads;
while ( count( $currentLoads ) ) {
- if ( $this->mAllowLagged || $laggedReplicaMode ) {
+ if ( $this->allowLagged || $laggedReplicaMode ) {
$i = ArrayUtils::pickRandom( $currentLoads );
} else {
$i = false;
- if ( $this->mWaitForPos && $this->mWaitForPos->asOfTime() ) {
+ if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
// ChronologyProtecter sets mWaitForPos for session consistency.
// This triggers doWait() after connect, so it's especially good to
// avoid lagged servers so as to avoid excessive delay in that method.
- $ago = microtime( true ) - $this->mWaitForPos->asOfTime();
+ $ago = microtime( true ) - $this->waitForPos->asOfTime();
// Aim for <= 1 second of waiting (being too picky can backfire)
$i = $this->getRandomNonLagged( $currentLoads, $domain, $ago + 1 );
}
}
if ( $i === false && count( $currentLoads ) != 0 ) {
// All replica DBs lagged. Switch to read-only mode
- $this->replLogger->error( "All replica DBs lagged. Switch to read-only mode" );
+ $this->replLogger->error(
+ __METHOD__ . ": all replica DBs lagged. Switch to read-only mode" );
$i = ArrayUtils::pickRandom( $currentLoads );
$laggedReplicaMode = true;
}
// If all servers were down, quit now
if ( !count( $currentLoads ) ) {
- $this->connLogger->error( "All servers down" );
+ $this->connLogger->error( __METHOD__ . ": all servers down" );
}
return [ $i, $laggedReplicaMode ];
}
public function waitFor( $pos ) {
- $oldPos = $this->mWaitForPos;
+ $oldPos = $this->waitForPos;
try {
- $this->mWaitForPos = $pos;
+ $this->waitForPos = $pos;
// If a generic reader connection was already established, then wait now
- $i = $this->mReadIndex;
+ $i = $this->readIndex;
if ( $i > 0 ) {
if ( !$this->doWait( $i ) ) {
$this->laggedReplicaMode = true;
}
public function waitForOne( $pos, $timeout = null ) {
- $oldPos = $this->mWaitForPos;
+ $oldPos = $this->waitForPos;
try {
- $this->mWaitForPos = $pos;
+ $this->waitForPos = $pos;
- $i = $this->mReadIndex;
+ $i = $this->readIndex;
if ( $i <= 0 ) {
// Pick a generic replica DB if there isn't one yet
- $readLoads = $this->mLoads;
+ $readLoads = $this->loads;
unset( $readLoads[$this->getWriterIndex()] ); // replica DBs only
$readLoads = array_filter( $readLoads ); // with non-zero load
$i = ArrayUtils::pickRandom( $readLoads );
}
} finally {
# Restore the old position, as this is not used for lag-protection but for throttling
- $this->mWaitForPos = $oldPos;
+ $this->waitForPos = $oldPos;
}
return $ok;
}
public function waitForAll( $pos, $timeout = null ) {
- $timeout = $timeout ?: $this->mWaitTimeout;
+ $timeout = $timeout ?: $this->waitTimeout;
- $oldPos = $this->mWaitForPos;
+ $oldPos = $this->waitForPos;
try {
- $this->mWaitForPos = $pos;
- $serverCount = count( $this->mServers );
+ $this->waitForPos = $pos;
+ $serverCount = count( $this->servers );
$ok = true;
for ( $i = 1; $i < $serverCount; $i++ ) {
- if ( $this->mLoads[$i] > 0 ) {
+ if ( $this->loads[$i] > 0 ) {
$start = microtime( true );
- $ok = $this->doWait( $i, true, max( 1, (int)$timeout ) ) && $ok;
+ $ok = $this->doWait( $i, true, $timeout ) && $ok;
$timeout -= ( microtime( true ) - $start );
if ( $timeout <= 0 ) {
break; // timeout reached
}
} finally {
# Restore the old position, as this is not used for lag-protection but for throttling
- $this->mWaitForPos = $oldPos;
+ $this->waitForPos = $oldPos;
}
return $ok;
return;
}
- if ( !$this->mWaitForPos || $pos->hasReached( $this->mWaitForPos ) ) {
- $this->mWaitForPos = $pos;
+ if ( !$this->waitForPos || $pos->hasReached( $this->waitForPos ) ) {
+ $this->waitForPos = $pos;
}
}
* @return IDatabase|bool
*/
public function getAnyOpenConnection( $i ) {
- foreach ( $this->mConns as $connsByServer ) {
+ foreach ( $this->conns as $connsByServer ) {
if ( !empty( $connsByServer[$i] ) ) {
/** @var IDatabase[] $serverConns */
$serverConns = $connsByServer[$i];
* Wait for a given replica DB to catch up to the master pos stored in $this
* @param int $index Server index
* @param bool $open Check the server even if a new connection has to be made
- * @param int $timeout Max seconds to wait; default is mWaitTimeout
+ * @param int $timeout Max seconds to wait; default is "waitTimeout" given to __construct()
* @return bool
*/
protected function doWait( $index, $open = false, $timeout = null ) {
- $close = false; // close the connection afterwards
+ $timeout = max( 1, $timeout ?: $this->waitTimeout );
// Check if we already know that the DB has reached this point
$server = $this->getServerName( $index );
$knownReachedPos = $this->srvCache->get( $key );
if (
$knownReachedPos instanceof DBMasterPos &&
- $knownReachedPos->hasReached( $this->mWaitForPos )
+ $knownReachedPos->hasReached( $this->waitForPos )
) {
- $this->replLogger->debug( __METHOD__ .
+ $this->replLogger->debug(
+ __METHOD__ .
': replica DB {dbserver} known to be caught up (pos >= $knownReachedPos).',
- [ 'dbserver' => $server ] );
+ [ 'dbserver' => $server ]
+ );
return true;
}
// Find a connection to wait on, creating one if needed and allowed
+ $close = false; // close the connection afterwards
$conn = $this->getAnyOpenConnection( $index );
if ( !$conn ) {
if ( !$open ) {
- $this->replLogger->debug( __METHOD__ . ': no connection open for {dbserver}',
- [ 'dbserver' => $server ] );
+ $this->replLogger->debug(
+ __METHOD__ . ': no connection open for {dbserver}',
+ [ 'dbserver' => $server ]
+ );
return false;
} else {
$conn = $this->openConnection( $index, self::DOMAIN_ANY );
if ( !$conn ) {
- $this->replLogger->warning( __METHOD__ . ': failed to connect to {dbserver}',
- [ 'dbserver' => $server ] );
+ $this->replLogger->warning(
+ __METHOD__ . ': failed to connect to {dbserver}',
+ [ 'dbserver' => $server ]
+ );
return false;
}
}
}
- $this->replLogger->info( __METHOD__ . ': Waiting for replica DB {dbserver} to catch up...',
- [ 'dbserver' => $server ] );
- $timeout = $timeout ?: $this->mWaitTimeout;
- $result = $conn->masterPosWait( $this->mWaitForPos, $timeout );
+ $this->replLogger->info(
+ __METHOD__ .
+ ': waiting for replica DB {dbserver} to catch up...',
+ [ 'dbserver' => $server ]
+ );
+
+ $result = $conn->masterPosWait( $this->waitForPos, $timeout );
- if ( $result == -1 || is_null( $result ) ) {
- // Timed out waiting for replica DB, use master instead
+ if ( $result === null ) {
+ $this->replLogger->warning(
+ __METHOD__ . ': Errored out waiting on {host} pos {pos}',
+ [
+ 'host' => $server,
+ 'pos' => $this->waitForPos,
+ 'trace' => ( new RuntimeException() )->getTraceAsString()
+ ]
+ );
+ $ok = false;
+ } elseif ( $result == -1 ) {
$this->replLogger->warning(
__METHOD__ . ': Timed out waiting on {host} pos {pos}',
- [ 'host' => $server, 'pos' => $this->mWaitForPos ]
+ [
+ 'host' => $server,
+ 'pos' => $this->waitForPos,
+ 'trace' => ( new RuntimeException() )->getTraceAsString()
+ ]
);
$ok = false;
} else {
- $this->replLogger->info( __METHOD__ . ": Done" );
+ $this->replLogger->debug( __METHOD__ . ": done waiting" );
$ok = true;
// Remember that the DB reached this point
- $this->srvCache->set( $key, $this->mWaitForPos, BagOStuff::TTL_DAY );
+ $this->srvCache->set( $key, $this->waitForPos, BagOStuff::TTL_DAY );
}
if ( $close ) {
# Operation-based index
if ( $i == self::DB_REPLICA ) {
- $this->mLastError = 'Unknown error'; // reset error string
+ $this->lastError = 'Unknown error'; // reset error string
# Try the general server pool if $groups are unavailable.
$i = ( $groups === [ false ] )
? false // don't bother with this if that is what was tried above
: $this->getReaderIndex( false, $domain );
# Couldn't find a working server in getReaderIndex()?
if ( $i === false ) {
- $this->mLastError = 'No working replica DB server: ' . $this->mLastError;
+ $this->lastError = 'No working replica DB server: ' . $this->lastError;
// Throw an exception
$this->reportConnectionError();
return null; // not reached
return $conn;
}
- public function reuseConnection( $conn ) {
+ public function reuseConnection( IDatabase $conn ) {
$serverIndex = $conn->getLBInfo( 'serverIndex' );
$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
if ( $serverIndex === null || $refCount === null ) {
} elseif ( $conn instanceof DBConnRef ) {
// DBConnRef already handles calling reuseConnection() and only passes the live
// Database instance to this method. Any caller passing in a DBConnRef is broken.
- $this->connLogger->error( __METHOD__ . ": got DBConnRef instance.\n" .
+ $this->connLogger->error(
+ __METHOD__ . ": got DBConnRef instance.\n" .
( new RuntimeException() )->getTraceAsString() );
return;
}
$domain = $conn->getDomainID();
- if ( !isset( $this->mConns[$connInUseKey][$serverIndex][$domain] ) ) {
+ if ( !isset( $this->conns[$connInUseKey][$serverIndex][$domain] ) ) {
throw new InvalidArgumentException( __METHOD__ .
": connection $serverIndex/$domain not found; it may have already been freed." );
- } elseif ( $this->mConns[$connInUseKey][$serverIndex][$domain] !== $conn ) {
+ } elseif ( $this->conns[$connInUseKey][$serverIndex][$domain] !== $conn ) {
throw new InvalidArgumentException( __METHOD__ .
": connection $serverIndex/$domain mismatched; it may have already been freed." );
}
$conn->setLBInfo( 'foreignPoolRefCount', --$refCount );
if ( $refCount <= 0 ) {
- $this->mConns[$connFreeKey][$serverIndex][$domain] = $conn;
- unset( $this->mConns[$connInUseKey][$serverIndex][$domain] );
- if ( !$this->mConns[$connInUseKey][$serverIndex] ) {
- unset( $this->mConns[$connInUseKey][$serverIndex] ); // clean up
+ $this->conns[$connFreeKey][$serverIndex][$domain] = $conn;
+ unset( $this->conns[$connInUseKey][$serverIndex][$domain] );
+ if ( !$this->conns[$connInUseKey][$serverIndex] ) {
+ unset( $this->conns[$connInUseKey][$serverIndex] ); // clean up
}
$this->connLogger->debug( __METHOD__ . ": freed connection $serverIndex/$domain" );
} else {
} else {
// Connection is to the local domain
$connKey = $autoCommit ? self::KEY_LOCAL_NOROUND : self::KEY_LOCAL;
- if ( isset( $this->mConns[$connKey][$i][0] ) ) {
- $conn = $this->mConns[$connKey][$i][0];
+ if ( isset( $this->conns[$connKey][$i][0] ) ) {
+ $conn = $this->conns[$connKey][$i][0];
} else {
- if ( !isset( $this->mServers[$i] ) || !is_array( $this->mServers[$i] ) ) {
+ if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
throw new InvalidArgumentException( "No server with index '$i'." );
}
// Open a new connection
- $server = $this->mServers[$i];
+ $server = $this->servers[$i];
$server['serverIndex'] = $i;
$server['autoCommitOnly'] = $autoCommit;
if ( $this->localDomain->getDatabase() !== null ) {
$conn = $this->reallyOpenConnection( $server, $this->localDomain );
$host = $this->getServerName( $i );
if ( $conn->isOpen() ) {
- $this->connLogger->debug( "Connected to database $i at '$host'." );
- $this->mConns[$connKey][$i][0] = $conn;
+ $this->connLogger->debug(
+ __METHOD__ . ": connected to database $i at '$host'." );
+ $this->conns[$connKey][$i][0] = $conn;
} else {
- $this->connLogger->warning( "Failed to connect to database $i at '$host'." );
+ $this->connLogger->warning(
+ __METHOD__ . ": failed to connect to database $i at '$host'." );
$this->errorConnection = $conn;
$conn = false;
}
$connInUseKey = self::KEY_FOREIGN_INUSE;
}
- if ( isset( $this->mConns[$connInUseKey][$i][$domain] ) ) {
+ if ( isset( $this->conns[$connInUseKey][$i][$domain] ) ) {
// Reuse an in-use connection for the same domain
- $conn = $this->mConns[$connInUseKey][$i][$domain];
+ $conn = $this->conns[$connInUseKey][$i][$domain];
$this->connLogger->debug( __METHOD__ . ": reusing connection $i/$domain" );
- } elseif ( isset( $this->mConns[$connFreeKey][$i][$domain] ) ) {
+ } elseif ( isset( $this->conns[$connFreeKey][$i][$domain] ) ) {
// Reuse a free connection for the same domain
- $conn = $this->mConns[$connFreeKey][$i][$domain];
- unset( $this->mConns[$connFreeKey][$i][$domain] );
- $this->mConns[$connInUseKey][$i][$domain] = $conn;
+ $conn = $this->conns[$connFreeKey][$i][$domain];
+ unset( $this->conns[$connFreeKey][$i][$domain] );
+ $this->conns[$connInUseKey][$i][$domain] = $conn;
$this->connLogger->debug( __METHOD__ . ": reusing free connection $i/$domain" );
- } elseif ( !empty( $this->mConns[$connFreeKey][$i] ) ) {
+ } elseif ( !empty( $this->conns[$connFreeKey][$i] ) ) {
// Reuse a free connection from another domain
- $conn = reset( $this->mConns[$connFreeKey][$i] );
- $oldDomain = key( $this->mConns[$connFreeKey][$i] );
+ $conn = reset( $this->conns[$connFreeKey][$i] );
+ $oldDomain = key( $this->conns[$connFreeKey][$i] );
if ( strlen( $dbName ) && !$conn->selectDB( $dbName ) ) {
- $this->mLastError = "Error selecting database '$dbName' on server " .
+ $this->lastError = "Error selecting database '$dbName' on server " .
$conn->getServer() . " from client host {$this->host}";
$this->errorConnection = $conn;
$conn = false;
} else {
$conn->tablePrefix( $prefix );
- unset( $this->mConns[$connFreeKey][$i][$oldDomain] );
+ unset( $this->conns[$connFreeKey][$i][$oldDomain] );
// Note that if $domain is an empty string, getDomainID() might not match it
- $this->mConns[$connInUseKey][$i][$conn->getDomainId()] = $conn;
+ $this->conns[$connInUseKey][$i][$conn->getDomainId()] = $conn;
$this->connLogger->debug( __METHOD__ .
": reusing free connection from $oldDomain for $domain" );
}
} else {
- if ( !isset( $this->mServers[$i] ) || !is_array( $this->mServers[$i] ) ) {
+ if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
throw new InvalidArgumentException( "No server with index '$i'." );
}
// Open a new connection
- $server = $this->mServers[$i];
+ $server = $this->servers[$i];
$server['serverIndex'] = $i;
$server['foreignPoolRefCount'] = 0;
$server['foreign'] = true;
} else {
$conn->tablePrefix( $prefix ); // as specified
// Note that if $domain is an empty string, getDomainID() might not match it
- $this->mConns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
+ $this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
$this->connLogger->debug( __METHOD__ . ": opened new connection for $i/$domain" );
}
}
throw new DBAccessError();
}
- if ( $domainOverride->getDatabase() !== null ) {
+ // Handle $domainOverride being a specified or an unspecified domain
+ if ( $domainOverride->getDatabase() === null ) {
+ // Normally, an RDBMS requires a DB name specified on connection and the $server
+ // configuration array is assumed to already specify an appropriate DB name.
+ if ( $server['type'] === 'mysql' ) {
+ // For MySQL, DATABASE and SCHEMA are synonyms, connections need not specify a DB,
+ // and the DB name in $server might not exist due to legacy reasons (the default
+ // domain used to ignore the local LB domain, even when mismatched).
+ $server['dbname'] = null;
+ }
+ } else {
$server['dbname'] = $domainOverride->getDatabase();
$server['schema'] = $domainOverride->getSchema();
}
$conn = $this->errorConnection; // the connection which caused the error
$context = [
'method' => __METHOD__,
- 'last_error' => $this->mLastError,
+ 'last_error' => $this->lastError,
];
if ( $conn instanceof IDatabase ) {
$context['db_server'] = $conn->getServer();
$this->connLogger->warning(
- "Connection error: {last_error} ({db_server})",
+ __METHOD__ . ": connection error: {last_error} ({db_server})",
$context
);
// throws DBConnectionError
- $conn->reportConnectionError( "{$this->mLastError} ({$context['db_server']})" );
+ $conn->reportConnectionError( "{$this->lastError} ({$context['db_server']})" );
} else {
// No last connection, probably due to all servers being too busy
$this->connLogger->error(
- "LB failure with no last connection. Connection error: {last_error}",
+ __METHOD__ .
+ ": LB failure with no last connection. Connection error: {last_error}",
$context
);
// If all servers were busy, mLastError will contain something sensible
- throw new DBConnectionError( null, $this->mLastError );
+ throw new DBConnectionError( null, $this->lastError );
}
}
}
public function haveIndex( $i ) {
- return array_key_exists( $i, $this->mServers );
+ return array_key_exists( $i, $this->servers );
}
public function isNonZeroLoad( $i ) {
- return array_key_exists( $i, $this->mServers ) && $this->mLoads[$i] != 0;
+ return array_key_exists( $i, $this->servers ) && $this->loads[$i] != 0;
}
public function getServerCount() {
- return count( $this->mServers );
+ return count( $this->servers );
}
public function getServerName( $i ) {
- if ( isset( $this->mServers[$i]['hostName'] ) ) {
- $name = $this->mServers[$i]['hostName'];
- } elseif ( isset( $this->mServers[$i]['host'] ) ) {
- $name = $this->mServers[$i]['host'];
+ if ( isset( $this->servers[$i]['hostName'] ) ) {
+ $name = $this->servers[$i]['hostName'];
+ } elseif ( isset( $this->servers[$i]['host'] ) ) {
+ $name = $this->servers[$i]['host'];
} else {
$name = '';
}
}
public function getServerType( $i ) {
- return isset( $this->mServers[$i]['type'] ) ? $this->mServers[$i]['type'] : 'unknown';
+ return isset( $this->servers[$i]['type'] ) ? $this->servers[$i]['type'] : 'unknown';
}
public function getMasterPos() {
# master (however unlikely that may be), then we can fetch the position from the replica DB.
$masterConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
if ( !$masterConn ) {
- $serverCount = count( $this->mServers );
+ $serverCount = count( $this->servers );
for ( $i = 1; $i < $serverCount; $i++ ) {
$conn = $this->getAnyOpenConnection( $i );
if ( $conn ) {
public function closeAll() {
$this->forEachOpenConnection( function ( IDatabase $conn ) {
$host = $conn->getServer();
- $this->connLogger->debug( "Closing connection to database '$host'." );
+ $this->connLogger->debug(
+ __METHOD__ . ": closing connection to database '$host'." );
$conn->close();
} );
- $this->mConns = [
+ $this->conns = [
self::KEY_LOCAL => [],
self::KEY_FOREIGN_INUSE => [],
self::KEY_FOREIGN_FREE => [],
public function closeConnection( IDatabase $conn ) {
$serverIndex = $conn->getLBInfo( 'serverIndex' ); // second index level of mConns
- foreach ( $this->mConns as $type => $connsByServer ) {
+ foreach ( $this->conns as $type => $connsByServer ) {
if ( !isset( $connsByServer[$serverIndex] ) ) {
continue;
}
foreach ( $connsByServer[$serverIndex] as $i => $trackedConn ) {
if ( $conn === $trackedConn ) {
$host = $this->getServerName( $i );
- $this->connLogger->debug( "Closing connection to database $i at '$host'." );
- unset( $this->mConns[$type][$serverIndex][$i] );
+ $this->connLogger->debug(
+ __METHOD__ . ": closing connection to database $i at '$host'." );
+ unset( $this->conns[$type][$serverIndex][$i] );
--$this->connsOpened;
break 2;
}
$this->trxRoundId = false;
$this->forEachOpenMasterConnection(
function ( IDatabase $conn ) use ( $fname, $restore ) {
- if ( $conn->writesOrCallbacksPending() ) {
+ if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
$conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
}
if ( $restore ) {
}
public function hasOrMadeRecentMasterChanges( $age = null ) {
- $age = ( $age === null ) ? $this->mWaitTimeout : $age;
+ $age = ( $age === null ) ? $this->waitTimeout : $age;
return ( $this->hasMasterChanges()
|| $this->lastMasterChangeTimestamp() > microtime( true ) - $age );
public function allowLagged( $mode = null ) {
if ( $mode === null ) {
- return $this->mAllowLagged;
+ return $this->allowLagged;
}
- $this->mAllowLagged = $mode;
+ $this->allowLagged = $mode;
- return $this->mAllowLagged;
+ return $this->allowLagged;
}
public function pingAll() {
}
public function forEachOpenConnection( $callback, array $params = [] ) {
- foreach ( $this->mConns as $connsByServer ) {
+ foreach ( $this->conns as $connsByServer ) {
foreach ( $connsByServer as $serverConns ) {
foreach ( $serverConns as $conn ) {
$mergedParams = array_merge( [ $conn ], $params );
public function forEachOpenMasterConnection( $callback, array $params = [] ) {
$masterIndex = $this->getWriterIndex();
- foreach ( $this->mConns as $connsByServer ) {
+ foreach ( $this->conns as $connsByServer ) {
if ( isset( $connsByServer[$masterIndex] ) ) {
/** @var IDatabase $conn */
foreach ( $connsByServer[$masterIndex] as $conn ) {
}
public function forEachOpenReplicaConnection( $callback, array $params = [] ) {
- foreach ( $this->mConns as $connsByServer ) {
+ foreach ( $this->conns as $connsByServer ) {
foreach ( $connsByServer as $i => $serverConns ) {
if ( $i === $this->getWriterIndex() ) {
continue; // skip master
$lagTimes = $this->getLagTimes( $domain );
foreach ( $lagTimes as $i => $lag ) {
- if ( $this->mLoads[$i] > 0 && $lag > $maxLag ) {
+ if ( $this->loads[$i] > 0 && $lag > $maxLag ) {
$maxLag = $lag;
- $host = $this->mServers[$i]['host'];
+ $host = $this->servers[$i]['host'];
$maxIndex = $i;
}
}
$knownLagTimes = []; // map of (server index => 0 seconds)
$indexesWithLag = [];
- foreach ( $this->mServers as $i => $server ) {
+ foreach ( $this->servers as $i => $server ) {
if ( empty( $server['is static'] ) ) {
$indexesWithLag[] = $i; // DB server might have replication lag
} else {
/**
* @param IDatabase $conn
* @param DBMasterPos|bool $pos
- * @param int $timeout
+ * @param int|null $timeout
* @return bool
*/
- public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = 10 ) {
+ public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = null ) {
+ $timeout = max( 1, $timeout ?: $this->waitTimeout );
+
if ( $this->getServerCount() <= 1 || !$conn->getLBInfo( 'replica' ) ) {
return true; // server is not a replica DB
}
if ( $pos instanceof DBMasterPos ) {
$result = $conn->masterPosWait( $pos, $timeout );
if ( $result == -1 || is_null( $result ) ) {
- $msg = __METHOD__ . ': Timed out waiting on {host} pos {pos}';
- $this->replLogger->warning( $msg,
- [ 'host' => $conn->getServer(), 'pos' => $pos ] );
+ $msg = __METHOD__ . ': timed out waiting on {host} pos {pos}';
+ $this->replLogger->warning( $msg, [
+ 'host' => $conn->getServer(),
+ 'pos' => $pos,
+ 'trace' => ( new RuntimeException() )->getTraceAsString()
+ ] );
$ok = false;
} else {
- $this->replLogger->info( __METHOD__ . ': Done' );
+ $this->replLogger->debug( __METHOD__ . ': done waiting' );
$ok = true;
}
} else {
$ok = false; // something is misconfigured
- $this->replLogger->error( 'Could not get master pos for {host}',
- [ 'host' => $conn->getServer() ] );
+ $this->replLogger->error(
+ __METHOD__ . ': could not get master pos for {host}',
+ [
+ 'host' => $conn->getServer(),
+ 'trace' => ( new RuntimeException() )->getTraceAsString()
+ ]
+ );
}
return $ok;