'JavaScriptMinifier' => __DIR__ . '/includes/libs/JavaScriptMinifier.php',
'Job' => __DIR__ . '/includes/jobqueue/Job.php',
'JobQueue' => __DIR__ . '/includes/jobqueue/JobQueue.php',
- 'JobQueueAggregator' => __DIR__ . '/includes/jobqueue/aggregator/JobQueueAggregator.php',
- 'JobQueueAggregatorNull' => __DIR__ . '/includes/jobqueue/aggregator/JobQueueAggregatorNull.php',
- 'JobQueueAggregatorRedis' => __DIR__ . '/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php',
'JobQueueConnectionError' => __DIR__ . '/includes/jobqueue/exception/JobQueueConnectionError.php',
'JobQueueDB' => __DIR__ . '/includes/jobqueue/JobQueueDB.php',
'JobQueueEnqueueUpdate' => __DIR__ . '/includes/deferred/JobQueueEnqueueUpdate.php',
'default' => [ 'class' => JobQueueDB::class, 'order' => 'random', 'claimTTL' => 3600 ],
];
-/**
- * Which aggregator to use for tracking which queues have jobs.
- * These settings should be global to all wikis.
- */
-$wgJobQueueAggregator = [
- 'class' => JobQueueAggregatorNull::class
-];
-
/**
* Whether to include the number of jobs that are queued
* for the API's maxlag parameter.
/** @var BagOStuff */
protected $dupCache;
- /** @var JobQueueAggregator */
- protected $aggr;
const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
}
$this->dupCache = wfGetCache( CACHE_ANYTHING );
- $this->aggr = $params['aggregator'] ?? new JobQueueAggregatorNull( [] );
$this->readOnlyReason = $params['readOnlyReason'] ?? false;
}
}
$this->doBatchPush( $jobs, $flags );
- $this->aggr->notifyQueueNonEmpty( $this->domain, $this->type );
foreach ( $jobs as $job ) {
if ( $job->isRootJob() ) {
$job = $this->doPop();
- if ( !$job ) {
- $this->aggr->notifyQueueEmpty( $this->domain, $this->type );
- }
-
// Flag this job as an old duplicate based on its "root" job...
try {
if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
$affected = $dbw->affectedRows();
$count += $affected;
JobQueue::incrStats( 'recycles', $this->type, $affected );
- $this->aggr->notifyQueueNonEmpty( $this->domain, $this->type );
}
}
} else {
$conf = $conf + $wgJobTypeConf['default'];
}
- $conf['aggregator'] = JobQueueAggregator::singleton();
if ( !isset( $conf['readOnlyReason'] ) ) {
$conf['readOnlyReason'] = $this->readOnlyReason;
}
+++ /dev/null
-<?php
-/**
- * Job queue aggregator code.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- */
-
-/**
- * Class to handle tracking information about all queues
- *
- * @ingroup JobQueue
- * @since 1.21
- */
-abstract class JobQueueAggregator {
- /** @var JobQueueAggregator */
- protected static $instance = null;
-
- /**
- * @param array $params
- */
- public function __construct( array $params ) {
- }
-
- /**
- * @throws MWException
- * @return JobQueueAggregator
- */
- final public static function singleton() {
- global $wgJobQueueAggregator;
-
- if ( !isset( self::$instance ) ) {
- $class = $wgJobQueueAggregator['class'];
- $obj = new $class( $wgJobQueueAggregator );
- if ( !( $obj instanceof JobQueueAggregator ) ) {
- throw new MWException( "Class '$class' is not a JobQueueAggregator class." );
- }
- self::$instance = $obj;
- }
-
- return self::$instance;
- }
-
- /**
- * Destroy the singleton instance
- *
- * @return void
- */
- final public static function destroySingleton() {
- self::$instance = null;
- }
-
- /**
- * Mark a queue as being empty
- *
- * @param string $wiki
- * @param string $type
- * @return bool Success
- */
- final public function notifyQueueEmpty( $wiki, $type ) {
- $ok = $this->doNotifyQueueEmpty( $wiki, $type );
-
- return $ok;
- }
-
- /**
- * @see JobQueueAggregator::notifyQueueEmpty()
- * @param string $wiki
- * @param string $type
- * @return bool
- */
- abstract protected function doNotifyQueueEmpty( $wiki, $type );
-
- /**
- * Mark a queue as being non-empty
- *
- * @param string $wiki
- * @param string $type
- * @return bool Success
- */
- final public function notifyQueueNonEmpty( $wiki, $type ) {
- $ok = $this->doNotifyQueueNonEmpty( $wiki, $type );
-
- return $ok;
- }
-
- /**
- * @see JobQueueAggregator::notifyQueueNonEmpty()
- * @param string $wiki
- * @param string $type
- * @return bool
- */
- abstract protected function doNotifyQueueNonEmpty( $wiki, $type );
-
- /**
- * Get the list of all of the queues with jobs
- *
- * @return array (job type => (list of wiki IDs))
- */
- final public function getAllReadyWikiQueues() {
- $res = $this->doGetAllReadyWikiQueues();
-
- return $res;
- }
-
- /**
- * @see JobQueueAggregator::getAllReadyWikiQueues()
- */
- abstract protected function doGetAllReadyWikiQueues();
-
- /**
- * Purge all of the aggregator information
- *
- * @return bool Success
- */
- final public function purge() {
- $res = $this->doPurge();
-
- return $res;
- }
-
- /**
- * @see JobQueueAggregator::purge()
- */
- abstract protected function doPurge();
-
- /**
- * Get all databases that have a pending job.
- * This poll all the queues and is this expensive.
- *
- * @return array (job type => (list of wiki IDs))
- */
- protected function findPendingWikiQueues() {
- global $wgLocalDatabases;
-
- $pendingDBs = []; // (job type => (db list))
- foreach ( $wgLocalDatabases as $wikiId ) {
- foreach ( JobQueueGroup::singleton( $wikiId )->getQueuesWithJobs() as $type ) {
- $pendingDBs[$type][] = $wikiId;
- }
- }
-
- return $pendingDBs;
- }
-}
+++ /dev/null
-<?php
-/**
- * Job queue aggregator code.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- */
-
-/**
- * @ingroup JobQueue
- */
-class JobQueueAggregatorNull extends JobQueueAggregator {
- protected function doNotifyQueueEmpty( $wiki, $type ) {
- return true;
- }
-
- protected function doNotifyQueueNonEmpty( $wiki, $type ) {
- return true;
- }
-
- protected function doGetAllReadyWikiQueues() {
- return [];
- }
-
- protected function doPurge() {
- return true;
- }
-}
+++ /dev/null
-<?php
-/**
- * Job queue aggregator code that uses PhpRedis.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- */
-use Psr\Log\LoggerInterface;
-
-/**
- * Class to handle tracking information about all queues using PhpRedis
- *
- * The mediawiki/services/jobrunner background service must be set up and running.
- *
- * @ingroup JobQueue
- * @ingroup Redis
- * @since 1.21
- */
-class JobQueueAggregatorRedis extends JobQueueAggregator {
- /** @var RedisConnectionPool */
- protected $redisPool;
- /** @var LoggerInterface */
- protected $logger;
- /** @var array List of Redis server addresses */
- protected $servers;
-
- /**
- * @param array $params Possible keys:
- * - redisConfig : An array of parameters to RedisConnectionPool::__construct().
- * - redisServers : Array of server entries, the first being the primary and the
- * others being fallback servers. Each entry is either a hostname/port
- * combination or the absolute path of a UNIX socket.
- * If a hostname is specified but no port, the standard port number
- * 6379 will be used. Required.
- */
- public function __construct( array $params ) {
- parent::__construct( $params );
- $this->servers = $params['redisServers'] ?? [ $params['redisServer'] ]; // b/c
- $params['redisConfig']['serializer'] = 'none';
- $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
- $this->logger = \MediaWiki\Logger\LoggerFactory::getInstance( 'redis' );
- }
-
- protected function doNotifyQueueEmpty( $wiki, $type ) {
- return true; // managed by the service
- }
-
- protected function doNotifyQueueNonEmpty( $wiki, $type ) {
- return true; // managed by the service
- }
-
- protected function doGetAllReadyWikiQueues() {
- $conn = $this->getConnection();
- if ( !$conn ) {
- return [];
- }
- try {
- $map = $conn->hGetAll( $this->getReadyQueueKey() );
-
- if ( is_array( $map ) && isset( $map['_epoch'] ) ) {
- unset( $map['_epoch'] ); // ignore
- $pendingDBs = []; // (type => list of wikis)
- foreach ( $map as $key => $time ) {
- list( $type, $wiki ) = $this->decodeQueueName( $key );
- $pendingDBs[$type][] = $wiki;
- }
- } else {
- throw new UnexpectedValueException(
- "No queue listing found; make sure redisJobChronService is running."
- );
- }
-
- return $pendingDBs;
- } catch ( RedisException $e ) {
- $this->redisPool->handleError( $conn, $e );
-
- return [];
- }
- }
-
- protected function doPurge() {
- return true; // fully and only refreshed by the service
- }
-
- /**
- * Get a connection to the server that handles all sub-queues for this queue
- *
- * @return RedisConnRef|bool Returns false on failure
- * @throws MWException
- */
- protected function getConnection() {
- $conn = false;
- foreach ( $this->servers as $server ) {
- $conn = $this->redisPool->getConnection( $server, $this->logger );
- if ( $conn ) {
- break;
- }
- }
-
- return $conn;
- }
-
- /**
- * @return string
- */
- private function getReadyQueueKey() {
- return "jobqueue:aggregator:h-ready-queues:v2"; // global
- }
-
- /**
- * @param string $name
- * @return string[]
- */
- private function decodeQueueName( $name ) {
- list( $type, $wiki ) = explode( '/', $name, 2 );
-
- return [ rawurldecode( $type ), rawurldecode( $wiki ) ];
- }
-}