*
* @file
*/
+use MediaWiki\MediaWikiServices;
/**
* Class to handle enqueueing of background jobs
/** @var ProcessCacheLRU */
protected $cache;
- /** @var string Wiki ID */
- protected $wiki;
+ /** @var string Wiki domain ID */
+ protected $domain;
/** @var string|bool Read only rationale (or false if r/w) */
protected $readOnlyReason;
/** @var bool Whether the wiki is not recognized in configuration */
- protected $invalidWiki = false;
+ protected $invalidDomain = false;
/** @var array Map of (bucket => (queue => JobQueue, types => list of types) */
protected $coalescedQueues;
const CACHE_VERSION = 1; // integer; cache version
/**
- * @param string $wiki Wiki ID
+ * @param string $domain Wiki domain ID
* @param string|bool $readOnlyReason Read-only reason or false
*/
- protected function __construct( $wiki, $readOnlyReason ) {
- $this->wiki = $wiki;
+ protected function __construct( $domain, $readOnlyReason ) {
+ $this->domain = $domain;
$this->readOnlyReason = $readOnlyReason;
$this->cache = new MapCacheLRU( 10 );
}
/**
- * @param bool|string $wiki Wiki ID
+ * @param bool|string $domain Wiki domain ID
* @return JobQueueGroup
*/
- public static function singleton( $wiki = false ) {
+ public static function singleton( $domain = false ) {
global $wgLocalDatabases;
- $wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
+ if ( $domain === false ) {
+ $domain = WikiMap::getCurrentWikiDbDomain()->getId();
+ }
- if ( !isset( self::$instances[$wiki] ) ) {
- self::$instances[$wiki] = new self( $wiki, wfConfiguredReadOnlyReason() );
+ if ( !isset( self::$instances[$domain] ) ) {
+ self::$instances[$domain] = new self( $domain, wfConfiguredReadOnlyReason() );
// Make sure jobs are not getting pushed to bogus wikis. This can confuse
// the job runner system into spawning endless RPC requests that fail (T171371).
- if ( !WikiMap::isCurrentWikiId( $wiki ) && !in_array( $wiki, $wgLocalDatabases ) ) {
- self::$instances[$wiki]->invalidWiki = true;
+ $wikiId = WikiMap::getWikiIdFromDbDomain( $domain );
+ if (
+ !WikiMap::isCurrentWikiDbDomain( $domain ) &&
+ !in_array( $wikiId, $wgLocalDatabases )
+ ) {
+ self::$instances[$domain]->invalidDomain = true;
}
}
- return self::$instances[$wiki];
+ return self::$instances[$domain];
}
/**
public function get( $type ) {
global $wgJobTypeConf;
- $conf = [ 'wiki' => $this->wiki, 'type' => $type ];
- $conf += $wgJobTypeConf[$type] ?? $wgJobTypeConf['default'];
- $conf['aggregator'] = JobQueueAggregator::singleton();
+ $conf = [ 'domain' => $this->domain, 'type' => $type ];
+ if ( isset( $wgJobTypeConf[$type] ) ) {
+ $conf = $conf + $wgJobTypeConf[$type];
+ } else {
+ $conf = $conf + $wgJobTypeConf['default'];
+ }
if ( !isset( $conf['readOnlyReason'] ) ) {
$conf['readOnlyReason'] = $this->readOnlyReason;
}
public function push( $jobs ) {
global $wgJobTypesExcludedFromDefaultQueue;
- if ( $this->invalidWiki ) {
+ if ( $this->invalidDomain ) {
// Do not enqueue job that cannot be run (T171371)
- $e = new LogicException( "Domain '{$this->wiki}' is not recognized." );
+ $e = new LogicException( "Domain '{$this->domain}' is not recognized." );
MWExceptionHandler::logException( $e );
return;
}
$jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
- if ( !count( $jobs ) ) {
+ if ( $jobs === [] ) {
return;
}
$cache = ObjectCache::getLocalClusterInstance();
$cache->set(
- $cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', self::TYPE_ANY ),
+ $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_ANY ),
'true',
15
);
if ( array_diff( array_keys( $jobsByType ), $wgJobTypesExcludedFromDefaultQueue ) ) {
$cache->set(
- $cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', self::TYPE_DEFAULT ),
+ $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ),
'true',
15
);
* @since 1.26
*/
public function lazyPush( $jobs ) {
- if ( $this->invalidWiki ) {
+ if ( $this->invalidDomain ) {
// Do not enqueue job that cannot be run (T171371)
- throw new LogicException( "Domain '{$this->wiki}' is not recognized." );
+ throw new LogicException( "Domain '{$this->domain}' is not recognized." );
}
if ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' ) {
// Throw errors now instead of on push(), when other jobs may be buffered
$this->assertValidJobs( $jobs );
- DeferredUpdates::addUpdate( new JobQueueEnqueueUpdate( $this->wiki, $jobs ) );
+ DeferredUpdates::addUpdate( new JobQueueEnqueueUpdate( $this->domain, $jobs ) );
}
/**
*/
public function queuesHaveJobs( $type = self::TYPE_ANY ) {
$cache = ObjectCache::getLocalClusterInstance();
- $key = $cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', $type );
+ $key = $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', $type );
$value = $cache->get( $key );
if ( $value === false ) {
/**
* Get the list of job types that have non-empty queues
*
- * @return array List of job types that have non-empty queues
+ * @return string[] List of job types that have non-empty queues
*/
public function getQueuesWithJobs() {
$types = [];
foreach ( $this->getCoalescedQueues() as $info ) {
- $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
+ /** @var JobQueue $queue */
+ $queue = $info['queue'];
+ $nonEmpty = $queue->getSiblingQueuesWithJobs( $this->getQueueTypes() );
if ( is_array( $nonEmpty ) ) { // batching features supported
$types = array_merge( $types, $nonEmpty );
} else { // we have to go through the queues in the bucket one-by-one
/**
* Get the size of the queus for a list of job types
*
- * @return array Map of (job type => size)
+ * @return int[] Map of (job type => size)
*/
public function getQueueSizes() {
$sizeMap = [];
foreach ( $this->getCoalescedQueues() as $info ) {
- $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
+ /** @var JobQueue $queue */
+ $queue = $info['queue'];
+ $sizes = $queue->getSiblingQueueSizes( $this->getQueueTypes() );
if ( is_array( $sizes ) ) { // batching features supported
$sizeMap = $sizeMap + $sizes;
} else { // we have to go through the queues in the bucket one-by-one
}
/**
- * @return array
+ * @return JobQueue[]
*/
protected function getCoalescedQueues() {
global $wgJobTypeConf;
$this->coalescedQueues = [];
foreach ( $wgJobTypeConf as $type => $conf ) {
$queue = JobQueue::factory(
- [ 'wiki' => $this->wiki, 'type' => 'null' ] + $conf );
+ [ 'domain' => $this->domain, 'type' => 'null' ] + $conf );
$loc = $queue->getCoalesceLocationInternal();
if ( !isset( $this->coalescedQueues[$loc] ) ) {
$this->coalescedQueues[$loc]['queue'] = $queue;
*/
private function getCachedConfigVar( $name ) {
// @TODO: cleanup this whole method with a proper config system
- if ( WikiMap::isCurrentWikiId( $this->wiki ) ) {
+ if ( WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
return $GLOBALS[$name]; // common case
} else {
- $wiki = $this->wiki;
- $cache = ObjectCache::getMainWANInstance();
+ $wiki = WikiMap::getWikiIdFromDbDomain( $this->domain );
+ $cache = MediaWikiServices::getInstance()->getMainWANObjectCache();
$value = $cache->getWithSetCallback(
- $cache->makeGlobalKey( 'jobqueue', 'configvalue', $wiki, $name ),
+ $cache->makeGlobalKey( 'jobqueue', 'configvalue', $this->domain, $name ),
$cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY ),
function () use ( $wiki, $name ) {
global $wgConf;
-
+ // @TODO: use the full domain ID here
return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
},
[ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ]