Added JobQueueGroup::lazyPush method
authorAaron Schulz <aschulz@wikimedia.org>
Fri, 15 May 2015 23:23:51 +0000 (16:23 -0700)
committerOri.livneh <ori@wikimedia.org>
Mon, 18 May 2015 16:26:59 +0000 (16:26 +0000)
* Made use of this in triggerOpportunisticLinksUpdate()
* This will defer and better batch job insertion
* Lazy job insertion and other deferred updates
  make use of register_postsend_function if present
* Also cleaned up some return types and exceptions
  in JobQueueGroup

Bug: T99302
Change-Id: I3a3968d75cb37563f970be08e63f31a090e0e037

includes/MediaWiki.php
includes/jobqueue/JobQueueGroup.php
includes/page/WikiPage.php

index b1d0738..84001ff 100644 (file)
@@ -422,8 +422,7 @@ class MediaWiki {
        }
 
        /**
-        * Run the current MediaWiki instance
-        * index.php just calls this
+        * Run the current MediaWiki instance; index.php just calls this
         */
        public function run() {
                try {
@@ -437,9 +436,30 @@ class MediaWiki {
                                wfGetLBFactory()->commitMasterChanges();
                                $e->report(); // display the GUI error
                        }
-                       if ( function_exists( 'fastcgi_finish_request' ) ) {
-                               fastcgi_finish_request();
-                       }
+               } catch ( Exception $e ) {
+                       MWExceptionHandler::handleException( $e );
+               }
+
+               if ( function_exists( 'register_postsend_function' ) ) {
+                       // https://github.com/facebook/hhvm/issues/1230
+                       register_postsend_function( array( $this, 'postSendUpdates' ) );
+               } elseif ( function_exists( 'fastcgi_finish_request' ) ) {
+                       fastcgi_finish_request();
+                       $this->postSendUpdates();
+               } else {
+                       $this->postSendUpdates();
+               }
+       }
+
+       /**
+        * This function does work that can be done *after* the
+        * user gets the HTTP response so they don't block on it
+        *
+        * @since 1.26
+        */
+       public function postSendUpdates() {
+               try {
+                       JobQueueGroup::singleton()->pushLazyJobs();
                        $this->triggerJobs();
                        $this->restInPeace();
                } catch ( Exception $e ) {
@@ -605,6 +625,9 @@ class MediaWiki {
                // Do any deferred jobs
                DeferredUpdates::doUpdates( 'commit' );
 
+               // Make sure any lazy jobs are pushed
+               JobQueueGroup::singleton()->pushLazyJobs();
+
                // Log profiling data, e.g. in the database or UDP
                wfLogProfilingData();
 
index fdf7b87..72d2537 100644 (file)
@@ -40,6 +40,9 @@ class JobQueueGroup {
        /** @var array Map of (bucket => (queue => JobQueue, types => list of types) */
        protected $coalescedQueues;
 
+       /** @var Job[] */
+       protected $bufferedJobs = array();
+
        const TYPE_DEFAULT = 1; // integer; jobs popped by default
        const TYPE_ANY = 2; // integer; any job
 
@@ -100,13 +103,13 @@ class JobQueueGroup {
        }
 
        /**
-        * Insert jobs into the respective queues of with the belong.
+        * Insert jobs into the respective queues of which they belong
         *
         * This inserts the jobs into the queue specified by $wgJobTypeConf
         * and updates the aggregate job queue information cache as needed.
         *
-        * @param Job|Job[] $jobs A single Job or a list of Jobs
-        * @throws MWException
+        * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs
+        * @throws InvalidArgumentException
         * @return void
         */
        public function push( $jobs ) {
@@ -115,13 +118,11 @@ class JobQueueGroup {
                        return;
                }
 
+               $this->assertValidJobs( $jobs );
+
                $jobsByType = array(); // (job type => list of jobs)
                foreach ( $jobs as $job ) {
-                       if ( $job instanceof IJobSpecification ) {
-                               $jobsByType[$job->getType()][] = $job;
-                       } else {
-                               throw new MWException( "Attempted to push a non-Job object into a queue." );
-                       }
+                       $jobsByType[$job->getType()][] = $job;
                }
 
                foreach ( $jobsByType as $type => $jobs ) {
@@ -136,6 +137,41 @@ class JobQueueGroup {
                }
        }
 
+       /**
+        * Buffer jobs for insertion via push() or call it now if in CLI mode
+        *
+        * Note that MediaWiki::restInPeace() calls pushLazyJobs()
+        *
+        * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs
+        * @return void
+        * @since 1.26
+        */
+       public function lazyPush( $jobs ) {
+               if ( PHP_SAPI === 'cli' ) {
+                       $this->push( $jobs );
+                       return;
+               }
+
+               $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+
+               // Throw errors now instead of on push(), when other jobs may be buffered
+               $this->assertValidJobs( $jobs );
+
+               $this->bufferedJobs = array_merge( $this->bufferedJobs, $jobs );
+       }
+
+       /**
+        * Push all jobs buffered via lazyPush() into their respective queues
+        *
+        * @return void
+        * @since 1.26
+        */
+       public function pushLazyJobs() {
+               $this->push( $this->bufferedJobs );
+
+               $this->bufferedJobs = array();
+       }
+
        /**
         * Pop a job off one of the job queues
         *
@@ -188,10 +224,10 @@ class JobQueueGroup {
         * Acknowledge that a job was completed
         *
         * @param Job $job
-        * @return bool
+        * @return void
         */
        public function ack( Job $job ) {
-               return $this->get( $job->getType() )->ack( $job );
+               $this->get( $job->getType() )->ack( $job );
        }
 
        /**
@@ -211,7 +247,6 @@ class JobQueueGroup {
         * This does nothing for certain queue classes.
         *
         * @return void
-        * @throws MWException
         */
        public function waitForBackups() {
                global $wgJobTypeConf;
@@ -364,4 +399,24 @@ class JobQueueGroup {
                        }
                }
        }
+
+       /**
+        * @param array $jobs
+        * @throws InvalidArgumentException
+        */
+       private function assertValidJobs( array $jobs ) {
+               foreach ( $jobs as $job ) { // sanity checks
+                       if ( !( $job instanceof IJobSpecification ) ) {
+                               throw new InvalidArgumentException( "Expected IJobSpecification objects" );
+                       }
+               }
+       }
+
+       function __destruct() {
+               $n = count( $this->bufferedJobs );
+               if ( $n > 0 ) {
+                       trigger_error( __METHOD__ . ": $n buffered job(s) never inserted." );
+                       $this->pushLazyJobs(); // try to do it now
+               }
+       }
 }
index 468f898..914522f 100644 (file)
@@ -3416,7 +3416,7 @@ class WikiPage implements Page, IDBAccessObject {
                        $params['isOpportunistic'] = true;
                        $params['rootJobTimestamp'] = $parserOutput->getCacheTime();
 
-                       JobQueueGroup::singleton()->push( EnqueueJob::newFromLocalJobs(
+                       JobQueueGroup::singleton()->lazyPush( EnqueueJob::newFromLocalJobs(
                                new JobSpecification( 'refreshLinks', $params,
                                        array( 'removeDuplicates' => true ), $this->mTitle )
                        ) );