* @since 1.21
*/
class JobQueueGroup {
- /** @var Array */
+ /** @var array */
protected static $instances = array();
/** @var ProcessCacheLRU */
protected $cache;
- protected $wiki; // string; wiki ID
+ /** @var string Wiki ID */
+ protected $wiki;
/** @var array Map of (bucket => (queue => JobQueue, types => list of types) */
protected $coalescedQueues;
}
/**
- * @param string $wiki Wiki ID
+ * @param bool|string $wiki Wiki ID
* @return JobQueueGroup
*/
public static function singleton( $wiki = false ) {
if ( !isset( self::$instances[$wiki] ) ) {
self::$instances[$wiki] = new self( $wiki );
}
+
return self::$instances[$wiki];
}
/**
* Get the job queue object for a given queue type
*
- * @param $type string
+ * @param string $type
* @return JobQueue
*/
public function get( $type ) {
* This inserts the jobs into the queue specified by $wgJobTypeConf
* and updates the aggregate job queue information cache as needed.
*
- * @param $jobs Job|array A single Job or a list of Jobs
+ * @param Job|array $jobs A single Job or a list of Jobs
* @throws MWException
* @return bool
*/
public function push( $jobs ) {
$jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+ if ( !count( $jobs ) ) {
+ return true;
+ }
$jobsByType = array(); // (job type => list of jobs)
foreach ( $jobs as $job ) {
* This pops a job off a queue as specified by $wgJobTypeConf and
* updates the aggregate job queue information cache as needed.
*
- * @param $qtype integer|string JobQueueGroup::TYPE_DEFAULT or type string
- * @param $flags integer Bitfield of JobQueueGroup::USE_* constants
+ * @param int|string $qtype JobQueueGroup::TYPE_DEFAULT or type string
+ * @param int $flags Bitfield of JobQueueGroup::USE_* constants
* @return Job|bool Returns false on failure
*/
public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0 ) {
if ( !$job ) {
JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
}
+
return $job;
} else { // any job in the "default" jobs types
if ( $flags & self::USE_CACHE ) {
/**
* Acknowledge that a job was completed
*
- * @param $job Job
+ * @param Job $job
* @return bool
*/
public function ack( Job $job ) {
* Register the "root job" of a given job into the queue for de-duplication.
* This should only be called right *after* all the new jobs have been inserted.
*
- * @param $job Job
+ * @param Job $job
* @return bool
*/
public function deduplicateRootJob( Job $job ) {
/**
* Get the list of job types that have non-empty queues
*
- * @return Array List of job types that have non-empty queues
+ * @return array List of job types that have non-empty queues
*/
public function getQueuesWithJobs() {
$types = array();
}
}
}
+
return $types;
}
/**
* Get the size of the queus for a list of job types
*
- * @return Array Map of (job type => size)
+ * @return array Map of (job type => size)
*/
public function getQueueSizes() {
$sizeMap = array();
}
}
}
+
return $sizeMap;
}
* This is only used for performance, such as to avoid spamming
* the queue with many sub-jobs before they actually get run.
*
- * @param $type string
+ * @param string $type
* @return bool
*/
public function isQueueDeprioritized( $type ) {
return $this->cache->get( 'isDeprioritized', $type );
}
if ( $type === 'refreshLinks2' ) {
- // Don't keep converting refreshLinks2 => refreshLinks jobs if the
+ // Don't keep converting refreshLinksPartition => refreshLinks jobs if the
// later jobs have not been done yet. This helps throttle queue spam.
- $deprioritized = !$this->get( 'refreshLinks' )->isEmpty();
+ // @TODO: this is mostly a WMF-specific hack and should be removed when
+ // refreshLinks2 jobs are drained.
+ $deprioritized = !$this->get( 'refreshLinks' )->getSize() > 10000;
$this->cache->set( 'isDeprioritized', $type, $deprioritized );
+
return $deprioritized;
}
+
return false;
}
* the defined run period. Concurrent calls to this function will cause tasks
* to be attempted twice, so they may need their own methods of mutual exclusion.
*
- * @return integer Number of tasks run
+ * @return int Number of tasks run
*/
public function executeReadyPeriodicTasks() {
global $wgMemc;
if ( $definition['period'] <= 0 ) {
continue; // disabled
} elseif ( !isset( $lastRuns[$type][$task] )
- || $lastRuns[$type][$task] < ( time() - $definition['period'] ) )
- {
+ || $lastRuns[$type][$task] < ( time() - $definition['period'] )
+ ) {
try {
if ( call_user_func( $definition['callback'] ) !== null ) {
$tasksRun[$type][$task] = time();
++$count;
}
} catch ( JobQueueError $e ) {
- wfDebugLog( 'exception', $e->getLogMessage() );
+ MWExceptionHandler::logException( $e );
}
}
}
}
- $wgMemc->merge( $key, function( $cache, $key, $lastRuns ) use ( $tasksRun ) {
+ $wgMemc->merge( $key, function ( $cache, $key, $lastRuns ) use ( $tasksRun ) {
if ( is_array( $lastRuns ) ) {
foreach ( $tasksRun as $type => $tasks ) {
foreach ( $tasks as $task => $timestamp ) {
if ( !isset( $lastRuns[$type][$task] )
- || $timestamp > $lastRuns[$type][$task] )
- {
+ || $timestamp > $lastRuns[$type][$task]
+ ) {
$lastRuns[$type][$task] = $timestamp;
}
}
} else {
$lastRuns = $tasksRun;
}
+
return $lastRuns;
} );
} else {
$value = $wgConf->getConfig( $this->wiki, $name );
$wgMemc->set( $key, array( 'v' => $value ), 86400 + mt_rand( 0, 86400 ) );
+
return $value;
}
}