From 6eef765b851295f1e94c0317aebb2d743dcdf3de Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Mon, 14 Jan 2013 14:06:11 -0800 Subject: [PATCH] [JobQueue] Optimized JobQueueGroup::pop(). * This also simplifies MediaWiki::doJobs(). Change-Id: I58ee2313453c64d4e8d91f3a65181aaa9c2e847f --- includes/DefaultSettings.php | 5 +- includes/Wiki.php | 43 ++++-------- includes/job/JobQueueGroup.php | 65 ++++++++++++++++--- maintenance/runJobs.php | 2 +- .../includes/TemplateCategoriesTest.php | 1 + 5 files changed, 75 insertions(+), 41 deletions(-) diff --git a/includes/DefaultSettings.php b/includes/DefaultSettings.php index f41c7aa0d7..c13dbfd84a 100644 --- a/includes/DefaultSettings.php +++ b/includes/DefaultSettings.php @@ -2830,7 +2830,7 @@ $wgShowRollbackEditCount = 10; /** * Output a tag on every page indicating the canonical - * server which should be used, i.e. $wgServer or $wgCanonicalServer. Since + * server which should be used, i.e. $wgServer or $wgCanonicalServer. Since * detection of the current server is unreliable, the link is sent * unconditionally. */ @@ -5453,7 +5453,8 @@ $wgJobClasses = array( /** - * Jobs that must be explicitly requested, i.e. aren't run by job runners unless special flags are set. + * Jobs that must be explicitly requested, i.e. aren't run by job runners unless + * special flags are set. The values here are keys of $wgJobClasses. * * These can be: * - Very long-running jobs. diff --git a/includes/Wiki.php b/includes/Wiki.php index 2b12dd76a2..760424199a 100644 --- a/includes/Wiki.php +++ b/includes/Wiki.php @@ -616,39 +616,22 @@ class MediaWiki { } $group = JobQueueGroup::singleton(); - $types = $group->getDefaultQueueTypes(); - shuffle( $types ); // avoid starvation - - // Scan the queues for a job N times... do { - $jobFound = false; // found a job in any queue? - // Find a queue with a job on it and run it... - foreach ( $types as $i => $type ) { - $queue = $group->get( $type ); - if ( $queue->isEmpty() ) { - unset( $types[$i] ); // don't keep checking this queue - continue; - } - $job = $queue->pop(); - if ( $job ) { - $jobFound = true; - $output = $job->toString() . "\n"; - $t = - microtime( true ); - $success = $job->run(); - $queue->ack( $job ); // done - $t += microtime( true ); - $t = round( $t * 1000 ); - if ( !$success ) { - $output .= "Error: " . $job->getLastError() . ", Time: $t ms\n"; - } else { - $output .= "Success, Time: $t ms\n"; - } - wfDebugLog( 'jobqueue', $output ); - break; + $job = $group->pop( JobQueueGroup::USE_CACHE ); // job from any queue + if ( $job ) { + $output = $job->toString() . "\n"; + $t = - microtime( true ); + $success = $job->run(); + $group->ack( $job ); // done + $t += microtime( true ); + $t = round( $t * 1000 ); + if ( !$success ) { + $output .= "Error: " . $job->getLastError() . ", Time: $t ms\n"; } else { - unset( $types[$i] ); // don't keep checking this queue + $output .= "Success, Time: $t ms\n"; } + wfDebugLog( 'jobqueue', $output ); } - } while ( --$n && $jobFound ); + } while ( --$n && $job ); } } diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php index b9cad7fc48..eaa68d5c38 100644 --- a/includes/job/JobQueueGroup.php +++ b/includes/job/JobQueueGroup.php @@ -31,16 +31,24 @@ class JobQueueGroup { /** @var Array */ protected static $instances = array(); + /** @var ProcessCacheLRU */ + protected $cache; + protected $wiki; // string; wiki ID - const TYPE_DEFAULT = 1; // integer; job not in $wgJobTypesExcludedFromDefaultQueue + const TYPE_DEFAULT = 1; // integer; jobs popped by default const TYPE_ANY = 2; // integer; any job + const USE_CACHE = 1; // integer; use process cache + + const PROC_CACHE_TTL = 15; // integer; seconds + /** * @param $wiki string Wiki ID */ protected function __construct( $wiki ) { $this->wiki = $wiki; + $this->cache = new ProcessCacheLRU( 1 ); } /** @@ -55,6 +63,15 @@ class JobQueueGroup { return self::$instances[$wiki]; } + /** + * Destroy the singleton instances + * + * @return void + */ + public static function destroySingletons() { + self::$instances = array(); + } + /** * @param $type string * @return JobQueue Job queue object for a given queue type @@ -99,25 +116,44 @@ class JobQueueGroup { } } + if ( $this->cache->has( 'queues-ready', 'list' ) ) { + $list = $this->cache->get( 'queues-ready', 'list' ); + if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) { + $this->cache->clear( 'queues-ready' ); + } + } + return $ok; } /** * Pop a job off one of the job queues * - * @param $type integer JobQueueGroup::TYPE_* constant + * @param $queueType integer JobQueueGroup::TYPE_* constant + * @param $flags integer Bitfield of JobQueueGroup::USE_* constants * @return Job|bool Returns false on failure */ - public function pop( $type = self::TYPE_DEFAULT ) { - $types = ( $type == self::TYPE_DEFAULT ) - ? $this->getDefaultQueueTypes() - : $this->getQueueTypes(); + public function pop( $queueType = self::TYPE_DEFAULT, $flags = 0 ) { + if ( $flags & self::USE_CACHE ) { + if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) { + $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() ); + } + $types = $this->cache->get( 'queues-ready', 'list' ); + } else { + $types = $this->getQueuesWithJobs(); + } + + if ( $queueType == self::TYPE_DEFAULT ) { + $types = array_intersect( $types, $this->getDefaultQueueTypes() ); + } shuffle( $types ); // avoid starvation foreach ( $types as $type ) { // for each queue... $job = $this->get( $type )->pop(); - if ( $job ) { - return $job; // found + if ( $job ) { // found + return $job; + } else { // not found + $this->cache->clear( 'queues-ready' ); } } @@ -179,4 +215,17 @@ class JobQueueGroup { } return $types; } + + /** + * @return Array List of default job types that have non-empty queues + */ + public function getDefaultQueuesWithJobs() { + $types = array(); + foreach ( $this->getDefaultQueueTypes() as $type ) { + if ( !$this->get( $type )->isEmpty() ) { + $types[] = $type; + } + } + return $types; + } } diff --git a/maintenance/runJobs.php b/maintenance/runJobs.php index f06e6b0eac..d401dec32c 100644 --- a/maintenance/runJobs.php +++ b/maintenance/runJobs.php @@ -74,7 +74,7 @@ class RunJobs extends Maintenance { $group = JobQueueGroup::singleton(); do { $job = ( $type === false ) - ? $group->pop() // job from any queue + ? $group->pop( JobQueueGroup::TYPE_DEFAULT, JobQueueGroup::USE_CACHE ) : $group->get( $type )->pop(); // job from a single queue if ( $job ) { // found a job // Perform the job (logging success/failure and runtime)... diff --git a/tests/phpunit/includes/TemplateCategoriesTest.php b/tests/phpunit/includes/TemplateCategoriesTest.php index 03b94aea7f..a793babbba 100644 --- a/tests/phpunit/includes/TemplateCategoriesTest.php +++ b/tests/phpunit/includes/TemplateCategoriesTest.php @@ -23,6 +23,7 @@ class TemplateCategoriesTest extends MediaWikiLangTestCase { $status = $template->doEditContent( new WikitextContent( '[[Category:Solved bugs]]' ), 'Add a category through a template', 0, false, $user ); // Run the job queue + JobQueueGroup::destroySingletons(); $jobs = new RunJobs; $jobs->loadParamsAndArgs( null, array( 'quiet' => true ), null ); $jobs->execute(); -- 2.20.1