* This also simplifies MediaWiki::doJobs().
Change-Id: I58ee2313453c64d4e8d91f3a65181aaa9c2e847f
/**
* Output a <link rel="canonical"> tag on every page indicating the canonical
- * server which should be used, i.e. $wgServer or $wgCanonicalServer. Since
+ * server which should be used, i.e. $wgServer or $wgCanonicalServer. Since
* detection of the current server is unreliable, the link is sent
* unconditionally.
*/
/**
- * Jobs that must be explicitly requested, i.e. aren't run by job runners unless special flags are set.
+ * Jobs that must be explicitly requested, i.e. aren't run by job runners unless
+ * special flags are set. The values here are keys of $wgJobClasses.
*
* These can be:
* - Very long-running jobs.
}
$group = JobQueueGroup::singleton();
- $types = $group->getDefaultQueueTypes();
- shuffle( $types ); // avoid starvation
-
- // Scan the queues for a job N times...
do {
- $jobFound = false; // found a job in any queue?
- // Find a queue with a job on it and run it...
- foreach ( $types as $i => $type ) {
- $queue = $group->get( $type );
- if ( $queue->isEmpty() ) {
- unset( $types[$i] ); // don't keep checking this queue
- continue;
- }
- $job = $queue->pop();
- if ( $job ) {
- $jobFound = true;
- $output = $job->toString() . "\n";
- $t = - microtime( true );
- $success = $job->run();
- $queue->ack( $job ); // done
- $t += microtime( true );
- $t = round( $t * 1000 );
- if ( !$success ) {
- $output .= "Error: " . $job->getLastError() . ", Time: $t ms\n";
- } else {
- $output .= "Success, Time: $t ms\n";
- }
- wfDebugLog( 'jobqueue', $output );
- break;
+ $job = $group->pop( JobQueueGroup::USE_CACHE ); // job from any queue
+ if ( $job ) {
+ $output = $job->toString() . "\n";
+ $t = - microtime( true );
+ $success = $job->run();
+ $group->ack( $job ); // done
+ $t += microtime( true );
+ $t = round( $t * 1000 );
+ if ( !$success ) {
+ $output .= "Error: " . $job->getLastError() . ", Time: $t ms\n";
} else {
- unset( $types[$i] ); // don't keep checking this queue
+ $output .= "Success, Time: $t ms\n";
}
+ wfDebugLog( 'jobqueue', $output );
}
- } while ( --$n && $jobFound );
+ } while ( --$n && $job );
}
}
/** @var Array */
protected static $instances = array();
+ /** @var ProcessCacheLRU */
+ protected $cache;
+
protected $wiki; // string; wiki ID
- const TYPE_DEFAULT = 1; // integer; job not in $wgJobTypesExcludedFromDefaultQueue
+ const TYPE_DEFAULT = 1; // integer; jobs popped by default
const TYPE_ANY = 2; // integer; any job
+ const USE_CACHE = 1; // integer; use process cache
+
+ const PROC_CACHE_TTL = 15; // integer; seconds
+
/**
* @param $wiki string Wiki ID
*/
protected function __construct( $wiki ) {
$this->wiki = $wiki;
+ $this->cache = new ProcessCacheLRU( 1 );
}
/**
return self::$instances[$wiki];
}
+ /**
+ * Destroy the singleton instances
+ *
+ * @return void
+ */
+ public static function destroySingletons() {
+ self::$instances = array();
+ }
+
/**
* @param $type string
* @return JobQueue Job queue object for a given queue type
}
}
+ if ( $this->cache->has( 'queues-ready', 'list' ) ) {
+ $list = $this->cache->get( 'queues-ready', 'list' );
+ if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
+ $this->cache->clear( 'queues-ready' );
+ }
+ }
+
return $ok;
}
/**
* Pop a job off one of the job queues
*
- * @param $type integer JobQueueGroup::TYPE_* constant
+ * @param $queueType integer JobQueueGroup::TYPE_* constant
+ * @param $flags integer Bitfield of JobQueueGroup::USE_* constants
* @return Job|bool Returns false on failure
*/
- public function pop( $type = self::TYPE_DEFAULT ) {
- $types = ( $type == self::TYPE_DEFAULT )
- ? $this->getDefaultQueueTypes()
- : $this->getQueueTypes();
+ public function pop( $queueType = self::TYPE_DEFAULT, $flags = 0 ) {
+ if ( $flags & self::USE_CACHE ) {
+ if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
+ $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
+ }
+ $types = $this->cache->get( 'queues-ready', 'list' );
+ } else {
+ $types = $this->getQueuesWithJobs();
+ }
+
+ if ( $queueType == self::TYPE_DEFAULT ) {
+ $types = array_intersect( $types, $this->getDefaultQueueTypes() );
+ }
shuffle( $types ); // avoid starvation
foreach ( $types as $type ) { // for each queue...
$job = $this->get( $type )->pop();
- if ( $job ) {
- return $job; // found
+ if ( $job ) { // found
+ return $job;
+ } else { // not found
+ $this->cache->clear( 'queues-ready' );
}
}
}
return $types;
}
+
+ /**
+ * @return Array List of default job types that have non-empty queues
+ */
+ public function getDefaultQueuesWithJobs() {
+ $types = array();
+ foreach ( $this->getDefaultQueueTypes() as $type ) {
+ if ( !$this->get( $type )->isEmpty() ) {
+ $types[] = $type;
+ }
+ }
+ return $types;
+ }
}
$group = JobQueueGroup::singleton();
do {
$job = ( $type === false )
- ? $group->pop() // job from any queue
+ ? $group->pop( JobQueueGroup::TYPE_DEFAULT, JobQueueGroup::USE_CACHE )
: $group->get( $type )->pop(); // job from a single queue
if ( $job ) { // found a job
// Perform the job (logging success/failure and runtime)...
$status = $template->doEditContent( new WikitextContent( '[[Category:Solved bugs]]' ), 'Add a category through a template', 0, false, $user );
// Run the job queue
+ JobQueueGroup::destroySingletons();
$jobs = new RunJobs;
$jobs->loadParamsAndArgs( null, array( 'quiet' => true ), null );
$jobs->execute();