);
$class = MWLBFactory::getLBFactoryClass( $lbConf );
- return new $class( $lbConf );
+ $instance = new $class( $lbConf );
+ MWLBFactory::setCacheUsageCallbacks( $instance, $services );
+
+ return $instance;
},
'DBLoadBalancer' => function( MediaWikiServices $services ) {
return $lbConf;
}
+ /**
+ * @param LBFactory $lbf New LBFactory instance that will be bound to $services
+ * @param MediaWikiServices $services
+ */
+ public static function setCacheUsageCallbacks( LBFactory $lbf, MediaWikiServices $services ) {
+ // Account for lag and pending updates by default in cache generator callbacks
+ $wCache = $services->getMainWANObjectCache();
+ $wCache->setDefaultCacheSetOptionCallbacks(
+ function () use ( $lbf ) {
+ return $lbf->declareUsageSectionStart();
+ },
+ function ( $id ) use ( $lbf ) {
+ $info = $lbf->declareUsageSectionEnd( $id );
+
+ return $info['cacheSetOptions'] ?: [];
+ }
+ );
+ }
+
/**
* Returns the LBFactory class to use and the load balancer configuration.
*
/** @var mixed[] Temporary warm-up cache */
private $warmupCache = [];
+ /** @var callable Callback used in generating default options in getWithSetCallback() */
+ private $sowSetOptsCallback;
+ /** @var callable Callback used in generating default options in getWithSetCallback() */
+ private $reapSetOptsCallback;
+
/** Max time expected to pass between delete() and DB commit finishing */
const MAX_COMMIT_DELAY = 3;
/** Max replication+snapshot lag before applying TTL_LAGGED or disallowing set() */
? $params['relayers']['purge']
: new EventRelayerNull( [] );
$this->setLogger( isset( $params['logger'] ) ? $params['logger'] : new NullLogger() );
+ $this->sowSetOptsCallback = function () {
+ return null; // no-op
+ };
+ $this->reapSetOptsCallback = function () {
+ return []; // no-op
+ };
}
public function setLogger( LoggerInterface $logger ) {
$setOpts = [];
++$this->callbackDepth;
try {
+ $tag = call_user_func( $this->sowSetOptsCallback );
$value = call_user_func_array( $callback, [ $cValue, &$ttl, &$setOpts, $asOf ] );
+ $setOptDefaults = call_user_func( $this->reapSetOptsCallback, $tag );
} finally {
--$this->callbackDepth;
}
$setOpts['lockTSE'] = $lockTSE;
// Use best known "since" timestamp if not provided
$setOpts += [ 'since' => $preCallbackTime ];
+ // Use default "lag" and "pending" values if not set
+ $setOpts += $setOptDefaults;
// Update the cache; this will fail if the key is tombstoned
$this->set( $key, $value, $ttl, $setOpts );
}
return (int)min( $maxTTL, max( $minTTL, $factor * $age ) );
}
+ /**
+ * Set the callbacks that provide the fallback values for cache set options
+ *
+ * The $reap callback returns default values to use for the "lag", "since", and "pending"
+ * options used by WANObjectCache::set(). It takes the ID from $sow as the sole parameter.
+ * An empty array should be returned if there is no usage to base the return value on.
+ *
+ * @param callable $sow Function that starts recording and returns an ID
+ * @param callable $reap Function that takes an ID, stops recording, and returns the options
+ * @since 1.28
+ */
+ public function setDefaultCacheSetOptionCallbacks( callable $sow, callable $reap ) {
+ $this->sowSetOptsCallback = $sow;
+ $this->reapSetOptsCallback = $reap;
+ }
+
/**
* Do the actual async bus purge of a key
*
return $this->__call( __FUNCTION__, func_get_args() );
}
+ public function declareUsageSectionStart( $id ) {
+ return $this->__call( __FUNCTION__, func_get_args() );
+ }
+
+ public function declareUsageSectionEnd( $id ) {
+ return $this->__call( __FUNCTION__, func_get_args() );
+ }
+
/**
* Clean up the connection when out of scope
*/
protected $cliMode;
/** @var string Agent name for query profiling */
protected $agent;
+ /** @var array[] Map of (section ID => info map) for usage section IDs */
+ protected $usageSectionInfo = [];
/** @var BagOStuff APC cache */
protected $srvCache;
}
private function doProfiledQuery( $sql, $commentedSql, $isWrite, $fname ) {
+ // Update usage information for all active usage tracking sections
+ foreach ( $this->usageSectionInfo as $id => &$info ) {
+ if ( $isWrite ) {
+ ++$info['writeQueries'];
+ } else {
+ ++$info['readQueries'];
+ }
+ if ( $info['cacheSetOptions'] === null ) {
+ $info['cacheSetOptions'] = self::getCacheSetOptions( $this );
+ }
+ }
+ unset( $info ); // destroy any reference
+
$isMaster = !is_null( $this->getLBInfo( 'master' ) );
- # generalizeSQL() will probably cut down the query to reasonable
- # logging size most of the time. The substr is really just a sanity check.
+ // generalizeSQL() will probably cut down the query to reasonable
+ // logging size most of the time. The substr is really just a sanity check.
if ( $isMaster ) {
$queryProf = 'query-m: ' . substr( self::generalizeSQL( $sql ), 0, 255 );
} else {
$queryProf = 'query: ' . substr( self::generalizeSQL( $sql ), 0, 255 );
}
- # Include query transaction state
+ // Include query transaction state
$queryProf .= $this->mTrxShortId ? " [TRX#{$this->mTrxShortId}]" : "";
$startTime = microtime( true );
* @since 1.27
*/
public static function getCacheSetOptions( IDatabase $db1 ) {
- $res = [ 'lag' => 0, 'since' => INF, 'pending' => false ];
+ $opts = [ 'lag' => 0, 'since' => INF, 'pending' => false ];
foreach ( func_get_args() as $db ) {
/** @var IDatabase $db */
- $status = $db->getSessionLagStatus();
- if ( $status['lag'] === false ) {
- $res['lag'] = false;
- } elseif ( $res['lag'] !== false ) {
- $res['lag'] = max( $res['lag'], $status['lag'] );
- }
- $res['since'] = min( $res['since'], $status['since'] );
- $res['pending'] = $res['pending'] ?: $db->writesPending();
+ $dbOpts = $db->getSessionLagStatus();
+ $dbOpts['pending'] = $db->writesPending();
+ $opts = self::mergeCacheSetOptions( $opts, $dbOpts );
}
- return $res;
+ return $opts;
+ }
+
+ /**
+ * @param array $base Map in the format of getCacheSetOptions() results
+ * @param array $other Map in the format of getCacheSetOptions() results
+ * @return array Pessimistically merged result of $base/$other in the format of $base
+ * @since 1.28
+ */
+ public static function mergeCacheSetOptions( array $base, array $other ) {
+ if ( $other['lag'] === false ) {
+ $base['lag'] = false;
+ } elseif ( $base['lag'] !== false ) {
+ $base['lag'] = max( $base['lag'], $other['lag'] );
+ }
+ $base['since'] = min( $base['since'], $other['since'] );
+ $base['pending'] = $base['pending'] ?: $other['pending'];
+
+ return $base;
}
public function getLag() {
$this->tableAliases = $aliases;
}
+ public function declareUsageSectionStart( $id ) {
+ $this->usageSectionInfo[$id] = [
+ 'readQueries' => 0,
+ 'writeQueries' => 0,
+ 'cacheSetOptions' => null
+ ];
+ }
+
+ public function declareUsageSectionEnd( $id ) {
+ if ( !isset( $this->usageSectionInfo[$id] ) ) {
+ throw new InvalidArgumentException( "No section with ID '$id'" );
+ }
+
+ $info = $this->usageSectionInfo[$id];
+ unset( $this->usageSectionInfo[$id] );
+
+ return $info;
+ }
+
/**
* @return bool Whether a DB user is required to access the DB
* @since 1.28
* @since 1.28
*/
public function setTableAliases( array $aliases );
+
+ /**
+ * Mark the beginning of a new section to track database usage information for
+ *
+ * @param string|integer Section ID
+ */
+ public function declareUsageSectionStart( $id );
+
+ /**
+ * End a section started by declareUsageSectionStart() and return the information map
+ *
+ * The map includes information about activity during the section:
+ * - readQueries: number of read queries issued.
+ * - writeQueries: number of write queries issued.
+ * - cacheSetOptions: result of getCacheSetOptions() before the first query.
+ * This is null if no actual queries took place in the section interval.
+ * @param integer|string $id Section ID passed to declareUsageSectionStart() earlier
+ * @return array
+ */
+ public function declareUsageSectionEnd( $id );
}
* - ChronologyProtection : cookie/header value specifying ChronologyProtector usage
*/
public function setRequestInfo( array $info );
+
+ /**
+ * Mark the beginning of a new section to track database usage information for
+ *
+ * This returns an ID which can be passed to declareUsageSectionEnd() to indicate
+ * the end of the section. If $id is provided, the returned ID equals $id.
+ * @param string|integer Section ID to use instead of auto-generated ID [optional]
+ * @return string|integer
+ */
+ public function declareUsageSectionStart( $id = null );
+
+ /**
+ * End a section started by declareUsageSectionStart() and return the information map
+ *
+ * The map includes information about activity during the section:
+ * - readQueries: number of read queries issued.
+ * - writeQueries: number of write queries issued.
+ * - cacheSetOptions: result of pessimistically merging the result of getCacheSetOptions()
+ * on each DB handle before the first query of the respective handle. This is null if
+ * no actual queries took place in the section interval.
+ *
+ * This can be called before cache value generation functions commence queries
+ * and then passed the caching storage layer to detect and avoid lag race conditions.
+ *
+ * @param integer|string $id Section ID passed to declareUsageSectionStart() earlier
+ * @return array
+ */
+ public function declareUsageSectionEnd( $id );
}
/** @var array Web request information about the client */
protected $requestInfo;
+ /** @var bool[] Map of (section ID => true) for usage section IDs */
+ protected $usageSections = [];
+
/** @var mixed */
protected $ticket;
/** @var string|bool String if a requested DBO_TRX transaction round is active */
}
/**
+ * Method called whenever a new LoadBalancer is created
+ *
* @param ILoadBalancer $lb
*/
protected function initLoadBalancer( ILoadBalancer $lb ) {
if ( $this->trxRoundId !== false ) {
$lb->beginMasterChanges( $this->trxRoundId ); // set DBO_TRX
}
+ foreach ( $this->usageSections as $id => $unused ) {
+ $lb->declareUsageSectionStart( $id );
+ }
}
public function setDomainPrefix( $prefix ) {
$this->requestInfo = $info + $this->requestInfo;
}
+ public function declareUsageSectionStart( $id = null ) {
+ static $nextId = 1;
+ if ( $id === null ) {
+ $id = $nextId;
+ ++$nextId;
+ }
+ // Handle existing load balancers
+ $this->forEachLB( function ( ILoadBalancer $lb ) use ( $id ) {
+ $lb->declareUsageSectionStart( $id );
+ } );
+ // Remember to set this for new load balancers
+ $this->usageSections[$id] = true;
+
+ return $id;
+ }
+
+ public function declareUsageSectionEnd( $id ) {
+ $info = [ 'readQueries' => 0, 'writeQueries' => 0, 'cacheSetOptions' => null ];
+ $this->forEachLB( function ( ILoadBalancer $lb ) use ( $id, &$info ) {
+ $lbInfo = $lb->declareUsageSectionEnd( $id );
+ $info['readQueries'] += $lbInfo['readQueries'];
+ $info['writeQueries'] += $lbInfo['writeQueries'];
+ $dbCacheOpts = $lbInfo['cacheSetOptions'];
+ if ( $dbCacheOpts ) {
+ $info['cacheSetOptions'] = $info['cacheSetOptions']
+ ? Database::mergeCacheSetOptions( $info['cacheSetOptions'], $dbCacheOpts )
+ : $dbCacheOpts;
+ }
+ } );
+ unset( $this->usageSections[$id] );
+
+ return $info;
+ }
+
/**
* Make PHP ignore user aborts/disconnects until the returned
* value leaves scope. This returns null and does nothing in CLI mode.
* @param array[] $aliases Map of (table => (dbname, schema, prefix) map)
*/
public function setTableAliases( array $aliases );
+
+ /**
+ * Mark the beginning of a new section to track database usage information for
+ *
+ * This returns an ID which can be passed to declareUsageSectionEnd() to indicate
+ * the end of the section. If $id is provided, the returned ID equals $id.
+ * @param string|integer Section ID to use instead of auto-generated ID [optional]
+ * @return string|integer
+ */
+ public function declareUsageSectionStart( $id = null );
+
+ /**
+ * End a section started by declareUsageSectionStart() and return the information map
+ *
+ * The map includes information about activity during the section:
+ * - readQueries: number of read queries issued.
+ * - writeQueries: number of write queries issued.
+ * - cacheSetOptions: result of pessimistically merging the result of getCacheSetOptions()
+ * on each DB handle before the first query of the respective handle. This is null if
+ * no actual queries took place in the section interval.
+ *
+ * This can be called before cache value generation functions commence queries
+ * and then passed the caching storage layer to detect and avoid lag race conditions.
+ *
+ * @param integer|string $id Section ID passed to declareUsageSectionStart() earlier
+ * @return array
+ */
+ public function declareUsageSectionEnd( $id );
}
/** @var string Current server name */
private $host;
/** @var bool Whether this PHP instance is for a CLI script */
- protected $cliMode;
+ private $cliMode;
/** @var string Agent name for query profiling */
- protected $agent;
+ private $agent;
+ /** @var bool[] Map of (section ID => true) for usage section IDs */
+ private $usageSections = [];
/** @var callable Exception logger */
private $errorLogger;
}
}
+ foreach ( $this->usageSections as $id => $unused ) {
+ $db->declareUsageSectionStart( $id );
+ }
+
return $db;
}
} );
}
+ public function declareUsageSectionStart( $id = null ) {
+ static $nextId = 1;
+ if ( $id === null ) {
+ $id = $nextId;
+ ++$nextId;
+ }
+ // Handle existing connections
+ $this->forEachOpenConnection( function ( IDatabase $db ) use ( $id ) {
+ $db->declareUsageSectionStart( $id );
+ } );
+ // Remember to set this for new connections
+ $this->usageSections[$id] = true;
+
+ return $id;
+ }
+
+ public function declareUsageSectionEnd( $id ) {
+ $info = [ 'readQueries' => 0, 'writeQueries' => 0, 'cacheSetOptions' => null ];
+ $this->forEachOpenConnection( function ( IDatabase $db ) use ( $id, &$info ) {
+ $dbInfo = $db->declareUsageSectionEnd( $id );
+ $info['readQueries'] += $dbInfo['readQueries'];
+ $info['writeQueries'] += $dbInfo['writeQueries'];
+ $dbCacheOpts = $dbInfo['cacheSetOptions'];
+ if ( $dbCacheOpts ) {
+ $info['cacheSetOptions'] = $info['cacheSetOptions']
+ ? Database::mergeCacheSetOptions( $info['cacheSetOptions'], $dbCacheOpts )
+ : $dbCacheOpts;
+ }
+ } );
+ unset( $this->usageSections[$id] );
+
+ return $info;
+ }
+
/**
* Make PHP ignore user aborts/disconnects until the returned
* value leaves scope. This returns null and does nothing in CLI mode.
protected function reallyOpenConnection( array $server, $dbNameOverride = false ) {
return $this->db;
}
+
+ public function forEachOpenConnection( $callback, array $params = [] ) {
+ $mergedParams = array_merge( [ $this->db ], $params );
+ call_user_func_array( $callback, $mergedParams );
+ }
+
+ public function forEachOpenMasterConnection( $callback, array $params = [] ) {
+ return $this->forEachOpenConnection( $callback, $params );
+ }
+
+ public function forEachOpenReplicaConnection( $callback, array $params = [] ) {
+ return $this->forEachOpenConnection( $callback, $params );
+ }
}
$cp->shutdown();
}
- private function newLBFactoryMulti( array $baseOverride = [], array $serverOverride = [] ) {
- global $wgDBserver, $wgDBuser, $wgDBpassword, $wgDBname, $wgDBtype, $wgSQLiteDataDir;
-
- return new LBFactoryMulti( $baseOverride + [
- 'sectionsByDB' => [],
- 'sectionLoads' => [
- 'DEFAULT' => [
- 'test-db1' => 1,
- ],
- ],
- 'serverTemplate' => $serverOverride + [
- 'dbname' => $wgDBname,
- 'user' => $wgDBuser,
- 'password' => $wgDBpassword,
- 'type' => $wgDBtype,
- 'dbDirectory' => $wgSQLiteDataDir,
- 'flags' => DBO_DEFAULT
- ],
- 'hostsByName' => [
- 'test-db1' => $wgDBserver,
- ],
- 'loadMonitorClass' => 'LoadMonitorNull',
- 'localDomain' => wfWikiID()
- ] );
- }
-
public function testNiceDomains() {
global $wgDBname, $wgDBtype;
$factory->destroy();
}
+ /**
+ * @covers LBFactory::declareUsageSectionStart()
+ * @covers LBFactory::declareUsageSectionEnd()
+ * @covers LoadBalancer::declareUsageSectionStart()
+ * @covers LoadBalancer::declareUsageSectionEnd()
+ */
+ public function testUsageInfo() {
+ $wallTime = microtime( true );
+
+ $mockDB = $this->getMockBuilder( 'DatabaseMysql' )
+ ->disableOriginalConstructor()
+ ->setMethods( [
+ 'doQuery',
+ 'affectedRows',
+ 'getLag',
+ 'assertOpen',
+ 'getSessionLagStatus',
+ 'getApproximateLagStatus'
+ ] )
+ ->getMock();
+ $mockDB->method( 'doQuery' )->willReturn( new FakeResultWrapper( [] ) );
+ $mockDB->method( 'affectedRows' )->willReturn( 0 );
+ $mockDB->method( 'getLag' )->willReturn( 3 );
+ $mockDB->method( 'getSessionLagStatus' )->willReturn( [
+ 'lag' => 3, 'since' => $wallTime
+ ] );
+ $mockDB->method( 'getApproximateLagStatus' )->willReturn( [
+ 'lag' => 3, 'since' => $wallTime
+ ] );
+ $mockDBProbe = TestingAccessWrapper::newFromObject( $mockDB );
+ $mockDBProbe->profiler = new ProfilerStub( [] );
+ $mockDBProbe->trxProfiler = new TransactionProfiler();
+ $mockDBProbe->connLogger = new \Psr\Log\NullLogger();
+ $mockDBProbe->queryLogger = new \Psr\Log\NullLogger();
+ $lbFactory = new LBFactorySingle( [
+ 'connection' => $mockDB
+ ] );
+ $mockDB->setLBInfo( 'replica', true );
+
+ $id = $lbFactory->declareUsageSectionStart( 'test' );
+ $mockDB->query( "SELECT 1" );
+ $mockDB->query( "SELECT 1" );
+ $mockDB->query( "SELECT 1" );
+ $info = $lbFactory->declareUsageSectionEnd( $id );
+
+ $this->assertEquals( 3, $info['readQueries'] );
+ $this->assertEquals( 0, $info['writeQueries'] );
+ $this->assertEquals( false, $info['cacheSetOptions']['pending'] );
+ $this->assertEquals( 3, $info['cacheSetOptions']['lag'] );
+ $this->assertGreaterThanOrEqual( $wallTime - 10, $info['cacheSetOptions']['since'] );
+ $this->assertLessThan( $wallTime + 10, $info['cacheSetOptions']['since'] );
+
+ $mockDB->begin();
+ $mockDB->query( "UPDATE x SET y=1" );
+ $id = $lbFactory->declareUsageSectionStart( 'k' );
+ $mockDB->query( "UPDATE x SET y=2" );
+ $mockDB->commit();
+ $info = $lbFactory->declareUsageSectionEnd( $id );
+
+ $this->assertEquals( 2, $info['readQueries'] ); // +1 for ping()
+ $this->assertEquals( 1, $info['writeQueries'] );
+ $this->assertEquals( true, $info['cacheSetOptions']['pending'] );
+ $this->assertEquals( 3, $info['cacheSetOptions']['lag'] );
+ $this->assertGreaterThanOrEqual( $wallTime - 10, $info['cacheSetOptions']['since'] );
+ $this->assertLessThan( $wallTime + 10, $info['cacheSetOptions']['since'] );
+ }
+
+ private function newLBFactoryMulti( array $baseOverride = [], array $serverOverride = [] ) {
+ global $wgDBserver, $wgDBuser, $wgDBpassword, $wgDBname, $wgDBtype, $wgSQLiteDataDir;
+
+ return new LBFactoryMulti( $baseOverride + [
+ 'sectionsByDB' => [],
+ 'sectionLoads' => [
+ 'DEFAULT' => [
+ 'test-db1' => 1,
+ ],
+ ],
+ 'serverTemplate' => $serverOverride + [
+ 'dbname' => $wgDBname,
+ 'user' => $wgDBuser,
+ 'password' => $wgDBpassword,
+ 'type' => $wgDBtype,
+ 'dbDirectory' => $wgSQLiteDataDir,
+ 'flags' => DBO_DEFAULT
+ ],
+ 'hostsByName' => [
+ 'test-db1' => $wgDBserver,
+ ],
+ 'loadMonitorClass' => 'LoadMonitorNull',
+ 'localDomain' => wfWikiID()
+ ] );
+ }
+
private function quoteTable( Database $db, $table ) {
if ( $db->getType() === 'sqlite' ) {
return $table;
[ null, 86400, 800, .2, 800 ]
];
}
+
+ public function testDefaultCacheOptions() {
+ $wCache = clone $this->cache;
+ $key = wfRandomString();
+
+ $called = false;
+ $infos = [];
+ $wCache->setDefaultCacheSetOptionCallbacks(
+ function () use ( &$infos ) {
+ $infos['sometag'] = [ 'since' => 1999, 'lag' => 4, 'pending' => false ];
+
+ return 'sometag';
+ },
+ function ( $tag ) use ( &$infos, &$called ) {
+ $res = $infos[$tag];
+ unset( $infos[$tag] );
+ $called = true;
+
+ return $res;
+ }
+ );
+
+ $callback = function () {
+ return 42;
+ };
+
+ $value = $wCache->getWithSetCallback( $key, 5, $callback );
+
+ $this->assertEquals( 42, $value, 'Correct value' );
+ $this->assertTrue( $called, 'Options callback ran' );
+ }
}