*/
class LoadBalancer implements ILoadBalancer {
/** @var array[] Map of (server index => server config array) */
- private $mServers;
+ private $servers;
/** @var Database[][][] Map of (connection category => server index => IDatabase[]) */
- private $mConns;
+ private $conns;
/** @var float[] Map of (server index => weight) */
- private $mLoads;
+ private $loads;
/** @var array[] Map of (group => server index => weight) */
- private $mGroupLoads;
+ private $groupLoads;
/** @var bool Whether to disregard replica DB lag as a factor in replica DB selection */
- private $mAllowLagged;
+ private $allowLagged;
/** @var int Seconds to spend waiting on replica DB lag to resolve */
private $waitTimeout;
/** @var array The LoadMonitor configuration */
/** @var Database DB connection object that caused a problem */
private $errorConnection;
/** @var int The generic (not query grouped) replica DB index (of $mServers) */
- private $mReadIndex;
+ private $readIndex;
/** @var bool|DBMasterPos False if not set */
- private $mWaitForPos;
+ private $waitForPos;
/** @var bool Whether the generic reader fell back to a lagged replica DB */
private $laggedReplicaMode = false;
/** @var bool Whether the generic reader fell back to a lagged replica DB */
private $allReplicasDownMode = false;
/** @var string The last DB selection or connection error */
- private $mLastError = 'Unknown error';
+ private $lastError = 'Unknown error';
/** @var string|bool Reason the LB is read-only or false if not */
private $readOnlyReason = false;
/** @var int Total connections opened */
if ( !isset( $params['servers'] ) ) {
throw new InvalidArgumentException( __CLASS__ . ': missing servers parameter' );
}
- $this->mServers = $params['servers'];
- foreach ( $this->mServers as $i => $server ) {
+ $this->servers = $params['servers'];
+ foreach ( $this->servers as $i => $server ) {
if ( $i == 0 ) {
- $this->mServers[$i]['master'] = true;
+ $this->servers[$i]['master'] = true;
} else {
- $this->mServers[$i]['replica'] = true;
+ $this->servers[$i]['replica'] = true;
}
}
? $params['waitTimeout']
: self::MAX_WAIT_DEFAULT;
- $this->mReadIndex = -1;
- $this->mConns = [
+ $this->readIndex = -1;
+ $this->conns = [
// Connection were transaction rounds may be applied
self::KEY_LOCAL => [],
self::KEY_FOREIGN_INUSE => [],
self::KEY_FOREIGN_INUSE_NOROUND => [],
self::KEY_FOREIGN_FREE_NOROUND => []
];
- $this->mLoads = [];
- $this->mWaitForPos = false;
- $this->mAllowLagged = false;
+ $this->loads = [];
+ $this->waitForPos = false;
+ $this->allowLagged = false;
if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
$this->readOnlyReason = $params['readOnlyReason'];
$this->loadMonitorConfig += [ 'lagWarnThreshold' => $this->maxLag ];
foreach ( $params['servers'] as $i => $server ) {
- $this->mLoads[$i] = $server['load'];
+ $this->loads[$i] = $server['load'];
if ( isset( $server['groupLoads'] ) ) {
foreach ( $server['groupLoads'] as $group => $ratio ) {
- if ( !isset( $this->mGroupLoads[$group] ) ) {
- $this->mGroupLoads[$group] = [];
+ if ( !isset( $this->groupLoads[$group] ) ) {
+ $this->groupLoads[$group] = [];
}
- $this->mGroupLoads[$group][$i] = $ratio;
+ $this->groupLoads[$group][$i] = $ratio;
}
}
}
foreach ( $lags as $i => $lag ) {
if ( $i != 0 ) {
# How much lag this server nominally is allowed to have
- $maxServerLag = isset( $this->mServers[$i]['max lag'] )
- ? $this->mServers[$i]['max lag']
+ $maxServerLag = isset( $this->servers[$i]['max lag'] )
+ ? $this->servers[$i]['max lag']
: $this->maxLag; // default
# Constrain that futher by $maxLag argument
$maxServerLag = min( $maxServerLag, $maxLag );
}
public function getReaderIndex( $group = false, $domain = false ) {
- if ( count( $this->mServers ) == 1 ) {
+ if ( count( $this->servers ) == 1 ) {
// Skip the load balancing if there's only one server
return $this->getWriterIndex();
- } elseif ( $group === false && $this->mReadIndex >= 0 ) {
+ } elseif ( $group === false && $this->readIndex >= 0 ) {
// Shortcut if the generic reader index was already cached
- return $this->mReadIndex;
+ return $this->readIndex;
}
if ( $group !== false ) {
// Use the server weight array for this load group
- if ( isset( $this->mGroupLoads[$group] ) ) {
- $loads = $this->mGroupLoads[$group];
+ if ( isset( $this->groupLoads[$group] ) ) {
+ $loads = $this->groupLoads[$group];
} else {
// No loads for this group, return false and the caller can use some other group
$this->connLogger->info( __METHOD__ . ": no loads for group $group" );
}
} else {
// Use the generic load group
- $loads = $this->mLoads;
+ $loads = $this->loads;
}
// Scale the configured load ratios according to each server's load and state
return false;
}
- if ( $this->mWaitForPos && $i != $this->getWriterIndex() ) {
+ if ( $this->waitForPos && $i != $this->getWriterIndex() ) {
// Before any data queries are run, wait for the server to catch up to the
// specified position. This is used to improve session consistency. Note that
// when LoadBalancer::waitFor() sets mWaitForPos, the waiting triggers here,
}
}
- if ( $this->mReadIndex <= 0 && $this->mLoads[$i] > 0 && $group === false ) {
+ if ( $this->readIndex <= 0 && $this->loads[$i] > 0 && $group === false ) {
// Cache the generic reader index for future ungrouped DB_REPLICA handles
- $this->mReadIndex = $i;
+ $this->readIndex = $i;
// Record if the generic reader index is in "lagged replica DB" mode
if ( $laggedReplicaMode ) {
$this->laggedReplicaMode = true;
// Quickly look through the available servers for a server that meets criteria...
$currentLoads = $loads;
while ( count( $currentLoads ) ) {
- if ( $this->mAllowLagged || $laggedReplicaMode ) {
+ if ( $this->allowLagged || $laggedReplicaMode ) {
$i = ArrayUtils::pickRandom( $currentLoads );
} else {
$i = false;
- if ( $this->mWaitForPos && $this->mWaitForPos->asOfTime() ) {
+ if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
// ChronologyProtecter sets mWaitForPos for session consistency.
// This triggers doWait() after connect, so it's especially good to
// avoid lagged servers so as to avoid excessive delay in that method.
- $ago = microtime( true ) - $this->mWaitForPos->asOfTime();
+ $ago = microtime( true ) - $this->waitForPos->asOfTime();
// Aim for <= 1 second of waiting (being too picky can backfire)
$i = $this->getRandomNonLagged( $currentLoads, $domain, $ago + 1 );
}
}
public function waitFor( $pos ) {
- $oldPos = $this->mWaitForPos;
+ $oldPos = $this->waitForPos;
try {
- $this->mWaitForPos = $pos;
+ $this->waitForPos = $pos;
// If a generic reader connection was already established, then wait now
- $i = $this->mReadIndex;
+ $i = $this->readIndex;
if ( $i > 0 ) {
if ( !$this->doWait( $i ) ) {
$this->laggedReplicaMode = true;
}
public function waitForOne( $pos, $timeout = null ) {
- $oldPos = $this->mWaitForPos;
+ $oldPos = $this->waitForPos;
try {
- $this->mWaitForPos = $pos;
+ $this->waitForPos = $pos;
- $i = $this->mReadIndex;
+ $i = $this->readIndex;
if ( $i <= 0 ) {
// Pick a generic replica DB if there isn't one yet
- $readLoads = $this->mLoads;
+ $readLoads = $this->loads;
unset( $readLoads[$this->getWriterIndex()] ); // replica DBs only
$readLoads = array_filter( $readLoads ); // with non-zero load
$i = ArrayUtils::pickRandom( $readLoads );
}
} finally {
# Restore the old position, as this is not used for lag-protection but for throttling
- $this->mWaitForPos = $oldPos;
+ $this->waitForPos = $oldPos;
}
return $ok;
public function waitForAll( $pos, $timeout = null ) {
$timeout = $timeout ?: $this->waitTimeout;
- $oldPos = $this->mWaitForPos;
+ $oldPos = $this->waitForPos;
try {
- $this->mWaitForPos = $pos;
- $serverCount = count( $this->mServers );
+ $this->waitForPos = $pos;
+ $serverCount = count( $this->servers );
$ok = true;
for ( $i = 1; $i < $serverCount; $i++ ) {
- if ( $this->mLoads[$i] > 0 ) {
+ if ( $this->loads[$i] > 0 ) {
$start = microtime( true );
$ok = $this->doWait( $i, true, $timeout ) && $ok;
$timeout -= ( microtime( true ) - $start );
}
} finally {
# Restore the old position, as this is not used for lag-protection but for throttling
- $this->mWaitForPos = $oldPos;
+ $this->waitForPos = $oldPos;
}
return $ok;
return;
}
- if ( !$this->mWaitForPos || $pos->hasReached( $this->mWaitForPos ) ) {
- $this->mWaitForPos = $pos;
+ if ( !$this->waitForPos || $pos->hasReached( $this->waitForPos ) ) {
+ $this->waitForPos = $pos;
}
}
* @return IDatabase|bool
*/
public function getAnyOpenConnection( $i ) {
- foreach ( $this->mConns as $connsByServer ) {
+ foreach ( $this->conns as $connsByServer ) {
if ( !empty( $connsByServer[$i] ) ) {
/** @var IDatabase[] $serverConns */
$serverConns = $connsByServer[$i];
$knownReachedPos = $this->srvCache->get( $key );
if (
$knownReachedPos instanceof DBMasterPos &&
- $knownReachedPos->hasReached( $this->mWaitForPos )
+ $knownReachedPos->hasReached( $this->waitForPos )
) {
$this->replLogger->debug(
__METHOD__ .
[ 'dbserver' => $server ]
);
- $result = $conn->masterPosWait( $this->mWaitForPos, $timeout );
+ $result = $conn->masterPosWait( $this->waitForPos, $timeout );
if ( $result === null ) {
$this->replLogger->warning(
__METHOD__ . ': Errored out waiting on {host} pos {pos}',
[
'host' => $server,
- 'pos' => $this->mWaitForPos,
+ 'pos' => $this->waitForPos,
'trace' => ( new RuntimeException() )->getTraceAsString()
]
);
__METHOD__ . ': Timed out waiting on {host} pos {pos}',
[
'host' => $server,
- 'pos' => $this->mWaitForPos,
+ 'pos' => $this->waitForPos,
'trace' => ( new RuntimeException() )->getTraceAsString()
]
);
$this->replLogger->info( __METHOD__ . ": Done" );
$ok = true;
// Remember that the DB reached this point
- $this->srvCache->set( $key, $this->mWaitForPos, BagOStuff::TTL_DAY );
+ $this->srvCache->set( $key, $this->waitForPos, BagOStuff::TTL_DAY );
}
if ( $close ) {
# Operation-based index
if ( $i == self::DB_REPLICA ) {
- $this->mLastError = 'Unknown error'; // reset error string
+ $this->lastError = 'Unknown error'; // reset error string
# Try the general server pool if $groups are unavailable.
$i = ( $groups === [ false ] )
? false // don't bother with this if that is what was tried above
: $this->getReaderIndex( false, $domain );
# Couldn't find a working server in getReaderIndex()?
if ( $i === false ) {
- $this->mLastError = 'No working replica DB server: ' . $this->mLastError;
+ $this->lastError = 'No working replica DB server: ' . $this->lastError;
// Throw an exception
$this->reportConnectionError();
return null; // not reached
}
$domain = $conn->getDomainID();
- if ( !isset( $this->mConns[$connInUseKey][$serverIndex][$domain] ) ) {
+ if ( !isset( $this->conns[$connInUseKey][$serverIndex][$domain] ) ) {
throw new InvalidArgumentException( __METHOD__ .
": connection $serverIndex/$domain not found; it may have already been freed." );
- } elseif ( $this->mConns[$connInUseKey][$serverIndex][$domain] !== $conn ) {
+ } elseif ( $this->conns[$connInUseKey][$serverIndex][$domain] !== $conn ) {
throw new InvalidArgumentException( __METHOD__ .
": connection $serverIndex/$domain mismatched; it may have already been freed." );
}
$conn->setLBInfo( 'foreignPoolRefCount', --$refCount );
if ( $refCount <= 0 ) {
- $this->mConns[$connFreeKey][$serverIndex][$domain] = $conn;
- unset( $this->mConns[$connInUseKey][$serverIndex][$domain] );
- if ( !$this->mConns[$connInUseKey][$serverIndex] ) {
- unset( $this->mConns[$connInUseKey][$serverIndex] ); // clean up
+ $this->conns[$connFreeKey][$serverIndex][$domain] = $conn;
+ unset( $this->conns[$connInUseKey][$serverIndex][$domain] );
+ if ( !$this->conns[$connInUseKey][$serverIndex] ) {
+ unset( $this->conns[$connInUseKey][$serverIndex] ); // clean up
}
$this->connLogger->debug( __METHOD__ . ": freed connection $serverIndex/$domain" );
} else {
} else {
// Connection is to the local domain
$connKey = $autoCommit ? self::KEY_LOCAL_NOROUND : self::KEY_LOCAL;
- if ( isset( $this->mConns[$connKey][$i][0] ) ) {
- $conn = $this->mConns[$connKey][$i][0];
+ if ( isset( $this->conns[$connKey][$i][0] ) ) {
+ $conn = $this->conns[$connKey][$i][0];
} else {
- if ( !isset( $this->mServers[$i] ) || !is_array( $this->mServers[$i] ) ) {
+ if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
throw new InvalidArgumentException( "No server with index '$i'." );
}
// Open a new connection
- $server = $this->mServers[$i];
+ $server = $this->servers[$i];
$server['serverIndex'] = $i;
$server['autoCommitOnly'] = $autoCommit;
if ( $this->localDomain->getDatabase() !== null ) {
$host = $this->getServerName( $i );
if ( $conn->isOpen() ) {
$this->connLogger->debug( "Connected to database $i at '$host'." );
- $this->mConns[$connKey][$i][0] = $conn;
+ $this->conns[$connKey][$i][0] = $conn;
} else {
$this->connLogger->warning( "Failed to connect to database $i at '$host'." );
$this->errorConnection = $conn;
$connInUseKey = self::KEY_FOREIGN_INUSE;
}
- if ( isset( $this->mConns[$connInUseKey][$i][$domain] ) ) {
+ if ( isset( $this->conns[$connInUseKey][$i][$domain] ) ) {
// Reuse an in-use connection for the same domain
- $conn = $this->mConns[$connInUseKey][$i][$domain];
+ $conn = $this->conns[$connInUseKey][$i][$domain];
$this->connLogger->debug( __METHOD__ . ": reusing connection $i/$domain" );
- } elseif ( isset( $this->mConns[$connFreeKey][$i][$domain] ) ) {
+ } elseif ( isset( $this->conns[$connFreeKey][$i][$domain] ) ) {
// Reuse a free connection for the same domain
- $conn = $this->mConns[$connFreeKey][$i][$domain];
- unset( $this->mConns[$connFreeKey][$i][$domain] );
- $this->mConns[$connInUseKey][$i][$domain] = $conn;
+ $conn = $this->conns[$connFreeKey][$i][$domain];
+ unset( $this->conns[$connFreeKey][$i][$domain] );
+ $this->conns[$connInUseKey][$i][$domain] = $conn;
$this->connLogger->debug( __METHOD__ . ": reusing free connection $i/$domain" );
- } elseif ( !empty( $this->mConns[$connFreeKey][$i] ) ) {
+ } elseif ( !empty( $this->conns[$connFreeKey][$i] ) ) {
// Reuse a free connection from another domain
- $conn = reset( $this->mConns[$connFreeKey][$i] );
- $oldDomain = key( $this->mConns[$connFreeKey][$i] );
+ $conn = reset( $this->conns[$connFreeKey][$i] );
+ $oldDomain = key( $this->conns[$connFreeKey][$i] );
if ( strlen( $dbName ) && !$conn->selectDB( $dbName ) ) {
- $this->mLastError = "Error selecting database '$dbName' on server " .
+ $this->lastError = "Error selecting database '$dbName' on server " .
$conn->getServer() . " from client host {$this->host}";
$this->errorConnection = $conn;
$conn = false;
} else {
$conn->tablePrefix( $prefix );
- unset( $this->mConns[$connFreeKey][$i][$oldDomain] );
+ unset( $this->conns[$connFreeKey][$i][$oldDomain] );
// Note that if $domain is an empty string, getDomainID() might not match it
- $this->mConns[$connInUseKey][$i][$conn->getDomainId()] = $conn;
+ $this->conns[$connInUseKey][$i][$conn->getDomainId()] = $conn;
$this->connLogger->debug( __METHOD__ .
": reusing free connection from $oldDomain for $domain" );
}
} else {
- if ( !isset( $this->mServers[$i] ) || !is_array( $this->mServers[$i] ) ) {
+ if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
throw new InvalidArgumentException( "No server with index '$i'." );
}
// Open a new connection
- $server = $this->mServers[$i];
+ $server = $this->servers[$i];
$server['serverIndex'] = $i;
$server['foreignPoolRefCount'] = 0;
$server['foreign'] = true;
} else {
$conn->tablePrefix( $prefix ); // as specified
// Note that if $domain is an empty string, getDomainID() might not match it
- $this->mConns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
+ $this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
$this->connLogger->debug( __METHOD__ . ": opened new connection for $i/$domain" );
}
}
$conn = $this->errorConnection; // the connection which caused the error
$context = [
'method' => __METHOD__,
- 'last_error' => $this->mLastError,
+ 'last_error' => $this->lastError,
];
if ( $conn instanceof IDatabase ) {
);
// throws DBConnectionError
- $conn->reportConnectionError( "{$this->mLastError} ({$context['db_server']})" );
+ $conn->reportConnectionError( "{$this->lastError} ({$context['db_server']})" );
} else {
// No last connection, probably due to all servers being too busy
$this->connLogger->error(
);
// If all servers were busy, mLastError will contain something sensible
- throw new DBConnectionError( null, $this->mLastError );
+ throw new DBConnectionError( null, $this->lastError );
}
}
}
public function haveIndex( $i ) {
- return array_key_exists( $i, $this->mServers );
+ return array_key_exists( $i, $this->servers );
}
public function isNonZeroLoad( $i ) {
- return array_key_exists( $i, $this->mServers ) && $this->mLoads[$i] != 0;
+ return array_key_exists( $i, $this->servers ) && $this->loads[$i] != 0;
}
public function getServerCount() {
- return count( $this->mServers );
+ return count( $this->servers );
}
public function getServerName( $i ) {
- if ( isset( $this->mServers[$i]['hostName'] ) ) {
- $name = $this->mServers[$i]['hostName'];
- } elseif ( isset( $this->mServers[$i]['host'] ) ) {
- $name = $this->mServers[$i]['host'];
+ if ( isset( $this->servers[$i]['hostName'] ) ) {
+ $name = $this->servers[$i]['hostName'];
+ } elseif ( isset( $this->servers[$i]['host'] ) ) {
+ $name = $this->servers[$i]['host'];
} else {
$name = '';
}
}
public function getServerType( $i ) {
- return isset( $this->mServers[$i]['type'] ) ? $this->mServers[$i]['type'] : 'unknown';
+ return isset( $this->servers[$i]['type'] ) ? $this->servers[$i]['type'] : 'unknown';
}
public function getMasterPos() {
# master (however unlikely that may be), then we can fetch the position from the replica DB.
$masterConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
if ( !$masterConn ) {
- $serverCount = count( $this->mServers );
+ $serverCount = count( $this->servers );
for ( $i = 1; $i < $serverCount; $i++ ) {
$conn = $this->getAnyOpenConnection( $i );
if ( $conn ) {
$conn->close();
} );
- $this->mConns = [
+ $this->conns = [
self::KEY_LOCAL => [],
self::KEY_FOREIGN_INUSE => [],
self::KEY_FOREIGN_FREE => [],
public function closeConnection( IDatabase $conn ) {
$serverIndex = $conn->getLBInfo( 'serverIndex' ); // second index level of mConns
- foreach ( $this->mConns as $type => $connsByServer ) {
+ foreach ( $this->conns as $type => $connsByServer ) {
if ( !isset( $connsByServer[$serverIndex] ) ) {
continue;
}
if ( $conn === $trackedConn ) {
$host = $this->getServerName( $i );
$this->connLogger->debug( "Closing connection to database $i at '$host'." );
- unset( $this->mConns[$type][$serverIndex][$i] );
+ unset( $this->conns[$type][$serverIndex][$i] );
--$this->connsOpened;
break 2;
}
public function allowLagged( $mode = null ) {
if ( $mode === null ) {
- return $this->mAllowLagged;
+ return $this->allowLagged;
}
- $this->mAllowLagged = $mode;
+ $this->allowLagged = $mode;
- return $this->mAllowLagged;
+ return $this->allowLagged;
}
public function pingAll() {
}
public function forEachOpenConnection( $callback, array $params = [] ) {
- foreach ( $this->mConns as $connsByServer ) {
+ foreach ( $this->conns as $connsByServer ) {
foreach ( $connsByServer as $serverConns ) {
foreach ( $serverConns as $conn ) {
$mergedParams = array_merge( [ $conn ], $params );
public function forEachOpenMasterConnection( $callback, array $params = [] ) {
$masterIndex = $this->getWriterIndex();
- foreach ( $this->mConns as $connsByServer ) {
+ foreach ( $this->conns as $connsByServer ) {
if ( isset( $connsByServer[$masterIndex] ) ) {
/** @var IDatabase $conn */
foreach ( $connsByServer[$masterIndex] as $conn ) {
}
public function forEachOpenReplicaConnection( $callback, array $params = [] ) {
- foreach ( $this->mConns as $connsByServer ) {
+ foreach ( $this->conns as $connsByServer ) {
foreach ( $connsByServer as $i => $serverConns ) {
if ( $i === $this->getWriterIndex() ) {
continue; // skip master
$lagTimes = $this->getLagTimes( $domain );
foreach ( $lagTimes as $i => $lag ) {
- if ( $this->mLoads[$i] > 0 && $lag > $maxLag ) {
+ if ( $this->loads[$i] > 0 && $lag > $maxLag ) {
$maxLag = $lag;
- $host = $this->mServers[$i]['host'];
+ $host = $this->servers[$i]['host'];
$maxIndex = $i;
}
}
$knownLagTimes = []; // map of (server index => 0 seconds)
$indexesWithLag = [];
- foreach ( $this->mServers as $i => $server ) {
+ foreach ( $this->servers as $i => $server ) {
if ( empty( $server['is static'] ) ) {
$indexesWithLag[] = $i; // DB server might have replication lag
} else {