$this->baseLoadBalancerParams(),
[
'servers' => $this->makeServerArray( $template, $loads, $groupLoads ),
- 'loadMonitor' => $this->loadMonitorClass,
+ 'loadMonitor' => [ 'class' => $this->loadMonitorClass ],
'readOnlyReason' => $readOnlyReason
]
) );
: [];
$this->loadMonitorClass = isset( $conf['loadMonitorClass'] )
? $conf['loadMonitorClass']
- : null;
+ : 'LoadMonitor';
}
/**
$this->baseLoadBalancerParams(),
[
'servers' => $servers,
- 'loadMonitor' => $this->loadMonitorClass,
+ 'loadMonitor' => [ 'class' => $this->loadMonitorClass ],
]
) );
$this->initLoadBalancer( $lb );
private $mServers;
/** @var array[] Map of (local/foreignUsed/foreignFree => server index => IDatabase array) */
private $mConns;
- /** @var array Map of (server index => weight) */
+ /** @var float[] Map of (server index => weight) */
private $mLoads;
/** @var array[] Map of (group => server index => weight) */
private $mGroupLoads;
private $mAllowLagged;
/** @var integer Seconds to spend waiting on replica DB lag to resolve */
private $mWaitTimeout;
- /** @var string The LoadMonitor subclass name */
- private $mLoadMonitorClass;
+ /** @var array The LoadMonitor configuration */
+ private $loadMonitorConfig;
/** @var array[] $aliases Map of (table => (dbname, schema, prefix) map) */
private $tableAliases = [];
/** @var ILoadMonitor */
- private $mLoadMonitor;
+ private $loadMonitor;
/** @var BagOStuff */
private $srvCache;
/** @var BagOStuff */
}
if ( isset( $params['loadMonitor'] ) ) {
- $this->mLoadMonitorClass = $params['loadMonitor'];
+ $this->loadMonitorConfig = $params['loadMonitor'];
} else {
- $master = reset( $params['servers'] );
- if ( isset( $master['type'] ) && $master['type'] === 'mysql' ) {
- $this->mLoadMonitorClass = 'LoadMonitorMySQL';
- } else {
- $this->mLoadMonitorClass = 'LoadMonitorNull';
- }
+ $this->loadMonitorConfig = [ 'class' => 'LoadMonitorNull' ];
}
foreach ( $params['servers'] as $i => $server ) {
* @return ILoadMonitor
*/
private function getLoadMonitor() {
- if ( !isset( $this->mLoadMonitor ) ) {
- $class = $this->mLoadMonitorClass;
- $this->mLoadMonitor = new $class( $this, $this->srvCache, $this->memCache );
- $this->mLoadMonitor->setLogger( $this->replLogger );
+ if ( !isset( $this->loadMonitor ) ) {
+ $class = $this->loadMonitorConfig['class'];
+ $this->loadMonitor = new $class(
+ $this, $this->srvCache, $this->memCache, $this->loadMonitorConfig );
+ $this->loadMonitor->setLogger( $this->replLogger );
}
- return $this->mLoadMonitor;
+ return $this->loadMonitor;
}
/**
* @param ILoadBalancer $lb LoadBalancer this instance serves
* @param BagOStuff $sCache Local server memory cache
* @param BagOStuff $cCache Local cluster memory cache
+ * @param array $options Options map
*/
- public function __construct( ILoadBalancer $lb, BagOStuff $sCache, BagOStuff $cCache );
+ public function __construct(
+ ILoadBalancer $lb, BagOStuff $sCache, BagOStuff $cCache, array $options = []
+ );
/**
* Perform pre-connection load ratio adjustment.
- * @param int[] &$loads
+ * @param int[] &$weightByServer Map of (server index => integer weight)
* @param string|bool $group The selected query group. Default: false
* @param string|bool $domain Default: false
*/
- public function scaleLoads( &$loads, $group = false, $domain = false );
+ public function scaleLoads( array &$weightByServer, $group = false, $domain = false );
/**
* Get an estimate of replication lag (in seconds) for each server
*
* @return array Map of (server index => float|int|bool)
*/
- public function getLagTimes( $serverIndexes, $domain );
+ public function getLagTimes( array $serverIndexes, $domain );
/**
* Clear any process and persistent cache of lag times
/** @var LoggerInterface */
protected $replLogger;
- public function __construct( ILoadBalancer $lb, BagOStuff $srvCache, BagOStuff $cache ) {
+ /** @var float Moving average ratio (e.g. 0.1 for 10% weight to new weight) */
+ private $movingAveRatio;
+
+ public function __construct(
+ ILoadBalancer $lb, BagOStuff $srvCache, BagOStuff $cache, array $options = []
+ ) {
$this->parent = $lb;
$this->srvCache = $srvCache;
$this->mainCache = $cache;
$this->replLogger = new \Psr\Log\NullLogger();
+
+ $this->movingAveRatio = isset( $options['movingAveRatio'] )
+ ? $options['movingAveRatio']
+ : 0.1;
}
public function setLogger( LoggerInterface $logger ) {
$this->replLogger = $logger;
}
- public function scaleLoads( &$loads, $group = false, $domain = false ) {
+ public function scaleLoads( array &$weightByServer, $group = false, $domain = false ) {
+ $states = $this->getServerStates( $weightByServer, $domain );
+ $coefficientsByServer = $states['weightScales'];
+ foreach ( $weightByServer as $i => $weight ) {
+ $weightByServer[$i] = $weight * $coefficientsByServer[$i];
+ }
+ }
+
+ public function getLagTimes( array $serverIndexes, $domain ) {
+ $states = $this->getServerStates( $serverIndexes, $domain );
+
+ return $states['lagTimes'];
}
- public function getLagTimes( $serverIndexes, $domain ) {
+ protected function getServerStates( array $serverIndexes, $domain ) {
if ( count( $serverIndexes ) == 1 && reset( $serverIndexes ) == 0 ) {
# Single server only, just return zero without caching
- return [ 0 => 0 ];
+ return [
+ 'lagTimes' => [ $this->parent->getWriterIndex() => 0 ],
+ 'weightScales' => [ $this->parent->getWriterIndex() => 1 ]
+ ];
}
- $key = $this->getLagTimeCacheKey();
+ $key = $this->getCacheKey();
# Randomize TTLs to reduce stampedes (4.0 - 5.0 sec)
$ttl = mt_rand( 4e6, 5e6 ) / 1e6;
# Keep keys around longer as fallbacks
$value = $this->srvCache->get( $key );
if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl ) ) {
$this->replLogger->debug( __METHOD__ . ": got lag times ($key) from local cache" );
- return $value['lagTimes']; // cache hit
+ return $value; // cache hit
}
$staleValue = $value ?: false;
$this->srvCache->set( $key, $value, $staleTTL );
$this->replLogger->debug( __METHOD__ . ": got lag times ($key) from main cache" );
- return $value['lagTimes']; // cache hit
+ return $value; // cache hit
}
$staleValue = $value ?: $staleValue;
} );
} elseif ( $staleValue ) {
# Could not acquire lock but an old cache exists, so use it
- return $staleValue['lagTimes'];
+ return $staleValue;
}
$lagTimes = [];
+ $weightScales = [];
+ $movAveRatio = $this->movingAveRatio;
foreach ( $serverIndexes as $i ) {
if ( $i == $this->parent->getWriterIndex() ) {
$lagTimes[$i] = 0; // master always has no lag
+ $weightScales[$i] = 1.0; // nominal weight
continue;
}
$close = true; // new connection
}
+ $lastWeight = isset( $staleValue['weightScales'][$i] )
+ ? $staleValue['weightScales'][$i]
+ : 1.0;
+ $coefficient = $this->getWeightScale( $i, $conn ?: null );
+ $newWeight = $movAveRatio * $coefficient + ( 1 - $movAveRatio ) * $lastWeight;
+
+ // Scale from 10% to 100% of nominal weight
+ $weightScales[$i] = max( $newWeight, .10 );
+
if ( !$conn ) {
$lagTimes[$i] = false;
$host = $this->parent->getServerName( $i );
- $this->replLogger->error( __METHOD__ . ": host $host (#$i) is unreachable" );
+ $this->replLogger->error( __METHOD__ . ": host $host is unreachable" );
continue;
}
$lagTimes[$i] = $conn->getLag();
if ( $lagTimes[$i] === false ) {
$host = $this->parent->getServerName( $i );
- $this->replLogger->error( __METHOD__ . ": host $host (#$i) is not replicating?" );
+ $this->replLogger->error( __METHOD__ . ": host $host is not replicating?" );
}
if ( $close ) {
}
# Add a timestamp key so we know when it was cached
- $value = [ 'lagTimes' => $lagTimes, 'timestamp' => microtime( true ) ];
+ $value = [
+ 'lagTimes' => $lagTimes,
+ 'weightScales' => $weightScales,
+ 'timestamp' => microtime( true )
+ ];
$this->mainCache->set( $key, $value, $staleTTL );
$this->srvCache->set( $key, $value, $staleTTL );
$this->replLogger->info( __METHOD__ . ": re-calculated lag times ($key)" );
- return $value['lagTimes'];
+ return $value;
+ }
+
+ /**
+ * @param integer $index Server index
+ * @param IDatabase|null $conn Connection handle or null on connection failure
+ * @return float
+ */
+ protected function getWeightScale( $index, IDatabase $conn = null ) {
+ return $conn ? 1.0 : 0.0;
}
public function clearCaches() {
- $key = $this->getLagTimeCacheKey();
+ $key = $this->getCacheKey();
$this->srvCache->delete( $key );
$this->mainCache->delete( $key );
}
- private function getLagTimeCacheKey() {
- $writerIndex = $this->parent->getWriterIndex();
+ private function getCacheKey() {
// Lag is per-server, not per-DB, so key on the master DB name
return $this->srvCache->makeGlobalKey(
'lag-times',
- $this->parent->getServerName( $writerIndex )
+ $this->parent->getServerName( $this->parent->getWriterIndex() )
);
}
}
* @ingroup Database
*/
class LoadMonitorMySQL extends LoadMonitor {
- public function scaleLoads( &$loads, $group = false, $domain = false ) {
- // @TODO: maybe use Threads_running/Threads_created ratio to guess load
- // and Queries/Uptime to guess if a server is warming up the buffer pool
+ /** @var float What buffer pool use ratio counts as "warm" (e.g. 0.5 for 50% usage) */
+ private $warmCacheRatio;
+
+ public function __construct(
+ ILoadBalancer $lb, BagOStuff $srvCache, BagOStuff $cache, array $options = []
+ ) {
+ parent::__construct( $lb, $srvCache, $cache, $options );
+
+ $this->warmCacheRatio = isset( $options['warmCacheRatio'] )
+ ? $options['warmCacheRatio']
+ : 0.0;
+ }
+
+ protected function getWeightScale( $index, IDatabase $conn = null ) {
+ if ( !$conn ) {
+ return 0.0;
+ }
+
+ $weight = 1.0;
+ if ( $this->warmCacheRatio > 0 ) {
+ $res = $conn->query( 'SHOW STATUS', false );
+ $s = $res ? $conn->fetchObject( $res ) : false;
+ if ( $s === false ) {
+ $host = $this->parent->getServerName( $index );
+ $this->replLogger->error( __METHOD__ . ": could not get status for $host" );
+ } else {
+ // http://dev.mysql.com/doc/refman/5.7/en/server-status-variables.html
+ if ( $s->Innodb_buffer_pool_pages_total > 0 ) {
+ $ratio = $s->Innodb_buffer_pool_pages_data / $s->Innodb_buffer_pool_pages_total;
+ } elseif ( $s->Qcache_total_blocks > 0 ) {
+ $ratio = 1.0 - $s->Qcache_free_blocks / $s->Qcache_total_blocks;
+ } else {
+ $ratio = 1.0;
+ }
+ // Stop caring once $ratio >= $this->warmCacheRatio
+ $weight *= min( $ratio / $this->warmCacheRatio, 1.0 );
+ }
+ }
+
+ return $weight;
}
}
use Psr\Log\LoggerInterface;
class LoadMonitorNull implements ILoadMonitor {
- public function __construct( ILoadBalancer $lb, BagOStuff $sCache, BagOStuff $cCache ) {
+ public function __construct(
+ ILoadBalancer $lb, BagOStuff $sCache, BagOStuff $cCache, array $options = []
+ ) {
}
public function setLogger( LoggerInterface $logger ) {
}
- public function scaleLoads( &$loads, $group = false, $domain = false ) {
+ public function scaleLoads( array &$loads, $group = false, $domain = false ) {
}
- public function getLagTimes( $serverIndexes, $domain ) {
+ public function getLagTimes( array $serverIndexes, $domain ) {
return array_fill_keys( $serverIndexes, 0 );
}