From: Aaron Schulz Date: Fri, 15 May 2015 23:23:51 +0000 (-0700) Subject: Added JobQueueGroup::lazyPush method X-Git-Tag: 1.31.0-rc.0~11374 X-Git-Url: http://git.cyclocoop.org/%24self?a=commitdiff_plain;h=7e97ce6a4402edae8bd626a9a6aa39060095aa42;p=lhc%2Fweb%2Fwiklou.git Added JobQueueGroup::lazyPush method * Made use of this in triggerOpportunisticLinksUpdate() * This will defer and better batch job insertion * Lazy job insertion and other deferred updates make use of register_postsend_function if present * Also cleaned up some return types and exceptions in JobQueueGroup Bug: T99302 Change-Id: I3a3968d75cb37563f970be08e63f31a090e0e037 --- diff --git a/includes/MediaWiki.php b/includes/MediaWiki.php index b1d0738c13..84001ffd4e 100644 --- a/includes/MediaWiki.php +++ b/includes/MediaWiki.php @@ -422,8 +422,7 @@ class MediaWiki { } /** - * Run the current MediaWiki instance - * index.php just calls this + * Run the current MediaWiki instance; index.php just calls this */ public function run() { try { @@ -437,9 +436,30 @@ class MediaWiki { wfGetLBFactory()->commitMasterChanges(); $e->report(); // display the GUI error } - if ( function_exists( 'fastcgi_finish_request' ) ) { - fastcgi_finish_request(); - } + } catch ( Exception $e ) { + MWExceptionHandler::handleException( $e ); + } + + if ( function_exists( 'register_postsend_function' ) ) { + // https://github.com/facebook/hhvm/issues/1230 + register_postsend_function( array( $this, 'postSendUpdates' ) ); + } elseif ( function_exists( 'fastcgi_finish_request' ) ) { + fastcgi_finish_request(); + $this->postSendUpdates(); + } else { + $this->postSendUpdates(); + } + } + + /** + * This function does work that can be done *after* the + * user gets the HTTP response so they don't block on it + * + * @since 1.26 + */ + public function postSendUpdates() { + try { + JobQueueGroup::singleton()->pushLazyJobs(); $this->triggerJobs(); $this->restInPeace(); } catch ( Exception $e ) { @@ -605,6 +625,9 @@ class MediaWiki { // Do any deferred jobs DeferredUpdates::doUpdates( 'commit' ); + // Make sure any lazy jobs are pushed + JobQueueGroup::singleton()->pushLazyJobs(); + // Log profiling data, e.g. in the database or UDP wfLogProfilingData(); diff --git a/includes/jobqueue/JobQueueGroup.php b/includes/jobqueue/JobQueueGroup.php index fdf7b876cc..72d2537ee1 100644 --- a/includes/jobqueue/JobQueueGroup.php +++ b/includes/jobqueue/JobQueueGroup.php @@ -40,6 +40,9 @@ class JobQueueGroup { /** @var array Map of (bucket => (queue => JobQueue, types => list of types) */ protected $coalescedQueues; + /** @var Job[] */ + protected $bufferedJobs = array(); + const TYPE_DEFAULT = 1; // integer; jobs popped by default const TYPE_ANY = 2; // integer; any job @@ -100,13 +103,13 @@ class JobQueueGroup { } /** - * Insert jobs into the respective queues of with the belong. + * Insert jobs into the respective queues of which they belong * * This inserts the jobs into the queue specified by $wgJobTypeConf * and updates the aggregate job queue information cache as needed. * - * @param Job|Job[] $jobs A single Job or a list of Jobs - * @throws MWException + * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs + * @throws InvalidArgumentException * @return void */ public function push( $jobs ) { @@ -115,13 +118,11 @@ class JobQueueGroup { return; } + $this->assertValidJobs( $jobs ); + $jobsByType = array(); // (job type => list of jobs) foreach ( $jobs as $job ) { - if ( $job instanceof IJobSpecification ) { - $jobsByType[$job->getType()][] = $job; - } else { - throw new MWException( "Attempted to push a non-Job object into a queue." ); - } + $jobsByType[$job->getType()][] = $job; } foreach ( $jobsByType as $type => $jobs ) { @@ -136,6 +137,41 @@ class JobQueueGroup { } } + /** + * Buffer jobs for insertion via push() or call it now if in CLI mode + * + * Note that MediaWiki::restInPeace() calls pushLazyJobs() + * + * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs + * @return void + * @since 1.26 + */ + public function lazyPush( $jobs ) { + if ( PHP_SAPI === 'cli' ) { + $this->push( $jobs ); + return; + } + + $jobs = is_array( $jobs ) ? $jobs : array( $jobs ); + + // Throw errors now instead of on push(), when other jobs may be buffered + $this->assertValidJobs( $jobs ); + + $this->bufferedJobs = array_merge( $this->bufferedJobs, $jobs ); + } + + /** + * Push all jobs buffered via lazyPush() into their respective queues + * + * @return void + * @since 1.26 + */ + public function pushLazyJobs() { + $this->push( $this->bufferedJobs ); + + $this->bufferedJobs = array(); + } + /** * Pop a job off one of the job queues * @@ -188,10 +224,10 @@ class JobQueueGroup { * Acknowledge that a job was completed * * @param Job $job - * @return bool + * @return void */ public function ack( Job $job ) { - return $this->get( $job->getType() )->ack( $job ); + $this->get( $job->getType() )->ack( $job ); } /** @@ -211,7 +247,6 @@ class JobQueueGroup { * This does nothing for certain queue classes. * * @return void - * @throws MWException */ public function waitForBackups() { global $wgJobTypeConf; @@ -364,4 +399,24 @@ class JobQueueGroup { } } } + + /** + * @param array $jobs + * @throws InvalidArgumentException + */ + private function assertValidJobs( array $jobs ) { + foreach ( $jobs as $job ) { // sanity checks + if ( !( $job instanceof IJobSpecification ) ) { + throw new InvalidArgumentException( "Expected IJobSpecification objects" ); + } + } + } + + function __destruct() { + $n = count( $this->bufferedJobs ); + if ( $n > 0 ) { + trigger_error( __METHOD__ . ": $n buffered job(s) never inserted." ); + $this->pushLazyJobs(); // try to do it now + } + } } diff --git a/includes/page/WikiPage.php b/includes/page/WikiPage.php index 468f898eef..914522ffbb 100644 --- a/includes/page/WikiPage.php +++ b/includes/page/WikiPage.php @@ -3416,7 +3416,7 @@ class WikiPage implements Page, IDBAccessObject { $params['isOpportunistic'] = true; $params['rootJobTimestamp'] = $parserOutput->getCacheTime(); - JobQueueGroup::singleton()->push( EnqueueJob::newFromLocalJobs( + JobQueueGroup::singleton()->lazyPush( EnqueueJob::newFromLocalJobs( new JobSpecification( 'refreshLinks', $params, array( 'removeDuplicates' => true ), $this->mTitle ) ) );