[JobQueue] Optimized JobQueueGroup::pop().
authorAaron Schulz <aschulz@wikimedia.org>
Mon, 14 Jan 2013 22:06:11 +0000 (14:06 -0800)
committerAaron Schulz <aschulz@wikimedia.org>
Tue, 15 Jan 2013 00:40:27 +0000 (16:40 -0800)
* This also simplifies MediaWiki::doJobs().

Change-Id: I58ee2313453c64d4e8d91f3a65181aaa9c2e847f

includes/DefaultSettings.php
includes/Wiki.php
includes/job/JobQueueGroup.php
maintenance/runJobs.php
tests/phpunit/includes/TemplateCategoriesTest.php

index f41c7aa..c13dbfd 100644 (file)
@@ -2830,7 +2830,7 @@ $wgShowRollbackEditCount = 10;
 
 /**
  * Output a <link rel="canonical"> tag on every page indicating the canonical
- * server which should be used, i.e. $wgServer or $wgCanonicalServer. Since 
+ * server which should be used, i.e. $wgServer or $wgCanonicalServer. Since
  * detection of the current server is unreliable, the link is sent
  * unconditionally.
  */
@@ -5453,7 +5453,8 @@ $wgJobClasses = array(
 
 /**
 
- * Jobs that must be explicitly requested, i.e. aren't run by job runners unless special flags are set.
+ * Jobs that must be explicitly requested, i.e. aren't run by job runners unless
+ * special flags are set. The values here are keys of $wgJobClasses.
  *
  * These can be:
  * - Very long-running jobs.
index 2b12dd7..7604241 100644 (file)
@@ -616,39 +616,22 @@ class MediaWiki {
                }
 
                $group = JobQueueGroup::singleton();
-               $types = $group->getDefaultQueueTypes();
-               shuffle( $types ); // avoid starvation
-
-               // Scan the queues for a job N times...
                do {
-                       $jobFound = false; // found a job in any queue?
-                       // Find a queue with a job on it and run it...
-                       foreach ( $types as $i => $type ) {
-                               $queue = $group->get( $type );
-                               if ( $queue->isEmpty() ) {
-                                       unset( $types[$i] ); // don't keep checking this queue
-                                       continue;
-                               }
-                               $job = $queue->pop();
-                               if ( $job ) {
-                                       $jobFound = true;
-                                       $output = $job->toString() . "\n";
-                                       $t = - microtime( true );
-                                       $success = $job->run();
-                                       $queue->ack( $job ); // done
-                                       $t += microtime( true );
-                                       $t = round( $t * 1000 );
-                                       if ( !$success ) {
-                                               $output .= "Error: " . $job->getLastError() . ", Time: $t ms\n";
-                                       } else {
-                                               $output .= "Success, Time: $t ms\n";
-                                       }
-                                       wfDebugLog( 'jobqueue', $output );
-                                       break;
+                       $job = $group->pop( JobQueueGroup::USE_CACHE ); // job from any queue
+                       if ( $job ) {
+                               $output = $job->toString() . "\n";
+                               $t = - microtime( true );
+                               $success = $job->run();
+                               $group->ack( $job ); // done
+                               $t += microtime( true );
+                               $t = round( $t * 1000 );
+                               if ( !$success ) {
+                                       $output .= "Error: " . $job->getLastError() . ", Time: $t ms\n";
                                } else {
-                                       unset( $types[$i] ); // don't keep checking this queue
+                                       $output .= "Success, Time: $t ms\n";
                                }
+                               wfDebugLog( 'jobqueue', $output );
                        }
-               } while ( --$n && $jobFound );
+               } while ( --$n && $job );
        }
 }
index b9cad7f..eaa68d5 100644 (file)
@@ -31,16 +31,24 @@ class JobQueueGroup {
        /** @var Array */
        protected static $instances = array();
 
+       /** @var ProcessCacheLRU */
+       protected $cache;
+
        protected $wiki; // string; wiki ID
 
-       const TYPE_DEFAULT = 1; // integer; job not in $wgJobTypesExcludedFromDefaultQueue
+       const TYPE_DEFAULT = 1; // integer; jobs popped by default
        const TYPE_ANY     = 2; // integer; any job
 
+       const USE_CACHE = 1; // integer; use process cache
+
+       const PROC_CACHE_TTL = 15; // integer; seconds
+
        /**
         * @param $wiki string Wiki ID
         */
        protected function __construct( $wiki ) {
                $this->wiki = $wiki;
+               $this->cache = new ProcessCacheLRU( 1 );
        }
 
        /**
@@ -55,6 +63,15 @@ class JobQueueGroup {
                return self::$instances[$wiki];
        }
 
+       /**
+        * Destroy the singleton instances
+        *
+        * @return void
+        */
+       public static function destroySingletons() {
+               self::$instances = array();
+       }
+
        /**
         * @param $type string
         * @return JobQueue Job queue object for a given queue type
@@ -99,25 +116,44 @@ class JobQueueGroup {
                        }
                }
 
+               if ( $this->cache->has( 'queues-ready', 'list' ) ) {
+                       $list = $this->cache->get( 'queues-ready', 'list' );
+                       if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
+                               $this->cache->clear( 'queues-ready' );
+                       }
+               }
+
                return $ok;
        }
 
        /**
         * Pop a job off one of the job queues
         *
-        * @param $type integer JobQueueGroup::TYPE_* constant
+        * @param $queueType integer JobQueueGroup::TYPE_* constant
+        * @param $flags integer Bitfield of JobQueueGroup::USE_* constants
         * @return Job|bool Returns false on failure
         */
-       public function pop( $type = self::TYPE_DEFAULT ) {
-               $types = ( $type == self::TYPE_DEFAULT )
-                       ? $this->getDefaultQueueTypes()
-                       : $this->getQueueTypes();
+       public function pop( $queueType = self::TYPE_DEFAULT, $flags = 0 ) {
+               if ( $flags & self::USE_CACHE ) {
+                       if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
+                               $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
+                       }
+                       $types = $this->cache->get( 'queues-ready', 'list' );
+               } else {
+                       $types = $this->getQueuesWithJobs();
+               }
+
+               if ( $queueType == self::TYPE_DEFAULT ) {
+                       $types = array_intersect( $types, $this->getDefaultQueueTypes() );
+               }
                shuffle( $types ); // avoid starvation
 
                foreach ( $types as $type ) { // for each queue...
                        $job = $this->get( $type )->pop();
-                       if ( $job ) {
-                               return $job; // found
+                       if ( $job ) { // found
+                               return $job;
+                       } else { // not found
+                               $this->cache->clear( 'queues-ready' );
                        }
                }
 
@@ -179,4 +215,17 @@ class JobQueueGroup {
                }
                return $types;
        }
+
+       /**
+        * @return Array List of default job types that have non-empty queues
+        */
+       public function getDefaultQueuesWithJobs() {
+               $types = array();
+               foreach ( $this->getDefaultQueueTypes() as $type ) {
+                       if ( !$this->get( $type )->isEmpty() ) {
+                               $types[] = $type;
+                       }
+               }
+               return $types;
+       }
 }
index f06e6b0..d401dec 100644 (file)
@@ -74,7 +74,7 @@ class RunJobs extends Maintenance {
                $group = JobQueueGroup::singleton();
                do {
                        $job = ( $type === false )
-                               ? $group->pop() // job from any queue
+                               ? $group->pop( JobQueueGroup::TYPE_DEFAULT, JobQueueGroup::USE_CACHE )
                                : $group->get( $type )->pop(); // job from a single queue
                        if ( $job ) { // found a job
                                // Perform the job (logging success/failure and runtime)...
index 03b94ae..a793bab 100644 (file)
@@ -23,6 +23,7 @@ class TemplateCategoriesTest extends MediaWikiLangTestCase {
                $status = $template->doEditContent( new WikitextContent( '[[Category:Solved bugs]]' ), 'Add a category through a template', 0, false, $user );
 
                // Run the job queue
+               JobQueueGroup::destroySingletons();
                $jobs = new RunJobs;
                $jobs->loadParamsAndArgs( null, array( 'quiet' => true ), null );
                $jobs->execute();