X-Git-Url: https://git.cyclocoop.org/?a=blobdiff_plain;f=includes%2Fjobqueue%2FJobQueueGroup.php;h=5bd1cc94a858b843af4a8c82b70de10014f380cd;hb=d85d5a3755bf022c615e815557f9b1ddac47dda8;hp=ebd547a0e55a74f2132864db77c138b3e7ae11fd;hpb=b15cd2ee45c5d1dbd58ea3df0becedc43a74ed94;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/jobqueue/JobQueueGroup.php b/includes/jobqueue/JobQueueGroup.php index ebd547a0e5..5bd1cc94a8 100644 --- a/includes/jobqueue/JobQueueGroup.php +++ b/includes/jobqueue/JobQueueGroup.php @@ -28,7 +28,7 @@ * @since 1.21 */ class JobQueueGroup { - /** @var array */ + /** @var JobQueueGroup[] */ protected static $instances = array(); /** @var ProcessCacheLRU */ @@ -40,6 +40,9 @@ class JobQueueGroup { /** @var array Map of (bucket => (queue => JobQueue, types => list of types) */ protected $coalescedQueues; + /** @var Job[] */ + protected $bufferedJobs = array(); + const TYPE_DEFAULT = 1; // integer; jobs popped by default const TYPE_ANY = 2; // integer; any job @@ -100,13 +103,13 @@ class JobQueueGroup { } /** - * Insert jobs into the respective queues of with the belong. + * Insert jobs into the respective queues of which they belong * * This inserts the jobs into the queue specified by $wgJobTypeConf * and updates the aggregate job queue information cache as needed. * - * @param Job|Job[] $jobs A single Job or a list of Jobs - * @throws MWException + * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs + * @throws InvalidArgumentException * @return void */ public function push( $jobs ) { @@ -115,13 +118,11 @@ class JobQueueGroup { return; } + $this->assertValidJobs( $jobs ); + $jobsByType = array(); // (job type => list of jobs) foreach ( $jobs as $job ) { - if ( $job instanceof IJobSpecification ) { - $jobsByType[$job->getType()][] = $job; - } else { - throw new MWException( "Attempted to push a non-Job object into a queue." ); - } + $jobsByType[$job->getType()][] = $job; } foreach ( $jobsByType as $type => $jobs ) { @@ -136,6 +137,42 @@ class JobQueueGroup { } } + /** + * Buffer jobs for insertion via push() or call it now if in CLI mode + * + * Note that MediaWiki::restInPeace() calls pushLazyJobs() + * + * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs + * @return void + * @since 1.26 + */ + public function lazyPush( $jobs ) { + if ( PHP_SAPI === 'cli' ) { + $this->push( $jobs ); + return; + } + + $jobs = is_array( $jobs ) ? $jobs : array( $jobs ); + + // Throw errors now instead of on push(), when other jobs may be buffered + $this->assertValidJobs( $jobs ); + + $this->bufferedJobs = array_merge( $this->bufferedJobs, $jobs ); + } + + /** + * Push all jobs buffered via lazyPush() into their respective queues + * + * @return void + * @since 1.26 + */ + public static function pushLazyJobs() { + foreach ( self::$instances as $group ) { + $group->push( $group->bufferedJobs ); + $group->bufferedJobs = array(); + } + } + /** * Pop a job off one of the job queues * @@ -188,10 +225,10 @@ class JobQueueGroup { * Acknowledge that a job was completed * * @param Job $job - * @return bool + * @return void */ public function ack( Job $job ) { - return $this->get( $job->getType() )->ack( $job ); + $this->get( $job->getType() )->ack( $job ); } /** @@ -211,7 +248,6 @@ class JobQueueGroup { * This does nothing for certain queue classes. * * @return void - * @throws MWException */ public function waitForBackups() { global $wgJobTypeConf; @@ -341,69 +377,6 @@ class JobQueueGroup { return $this->coalescedQueues; } - /** - * Execute any due periodic queue maintenance tasks for all queues. - * - * A task is "due" if the time ellapsed since the last run is greater than - * the defined run period. Concurrent calls to this function will cause tasks - * to be attempted twice, so they may need their own methods of mutual exclusion. - * - * @return int Number of tasks run - */ - public function executeReadyPeriodicTasks() { - global $wgMemc; - - list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); - $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' ); - $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp) - - $count = 0; - $tasksRun = array(); // (queue => task => UNIX timestamp) - foreach ( $this->getQueueTypes() as $type ) { - $queue = $this->get( $type ); - foreach ( $queue->getPeriodicTasks() as $task => $definition ) { - if ( $definition['period'] <= 0 ) { - continue; // disabled - } elseif ( !isset( $lastRuns[$type][$task] ) - || $lastRuns[$type][$task] < ( time() - $definition['period'] ) - ) { - try { - if ( call_user_func( $definition['callback'] ) !== null ) { - $tasksRun[$type][$task] = time(); - ++$count; - } - } catch ( JobQueueError $e ) { - MWExceptionHandler::logException( $e ); - } - } - } - } - - if ( $count === 0 ) { - return $count; // nothing to update - } - - $wgMemc->merge( $key, function ( $cache, $key, $lastRuns ) use ( $tasksRun ) { - if ( is_array( $lastRuns ) ) { - foreach ( $tasksRun as $type => $tasks ) { - foreach ( $tasks as $task => $timestamp ) { - if ( !isset( $lastRuns[$type][$task] ) - || $timestamp > $lastRuns[$type][$task] - ) { - $lastRuns[$type][$task] = $timestamp; - } - } - } - } else { - $lastRuns = $tasksRun; - } - - return $lastRuns; - } ); - - return $count; - } - /** * @param string $name * @return mixed @@ -427,4 +400,24 @@ class JobQueueGroup { } } } + + /** + * @param array $jobs + * @throws InvalidArgumentException + */ + private function assertValidJobs( array $jobs ) { + foreach ( $jobs as $job ) { // sanity checks + if ( !( $job instanceof IJobSpecification ) ) { + throw new InvalidArgumentException( "Expected IJobSpecification objects" ); + } + } + } + + function __destruct() { + $n = count( $this->bufferedJobs ); + if ( $n > 0 ) { + $type = implode( ', ', array_unique( array_map( 'get_class', $this->bufferedJobs ) ) ); + trigger_error( __METHOD__ . ": $n buffered job(s) of type(s) $type never inserted." ); + } + } }