From 37042262e32eba32b6c354f77d8cac98651fb6f7 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Mon, 16 Feb 2015 15:34:53 -0800 Subject: [PATCH] Moved some JobQueueAggregator logic out of JobQueueGroup Change-Id: I28ba1a25db225d4cf5f503a6c0f4405f13118151 --- includes/jobqueue/JobQueue.php | 13 ++++++++++++- includes/jobqueue/JobQueueDB.php | 2 ++ includes/jobqueue/JobQueueGroup.php | 10 +--------- includes/jobqueue/aggregator/JobQueueAggregator.php | 2 +- .../jobqueue/aggregator/JobQueueAggregatorRedis.php | 2 +- 5 files changed, 17 insertions(+), 12 deletions(-) diff --git a/includes/jobqueue/JobQueue.php b/includes/jobqueue/JobQueue.php index 53fcaeeda0..1a730d3049 100644 --- a/includes/jobqueue/JobQueue.php +++ b/includes/jobqueue/JobQueue.php @@ -49,6 +49,8 @@ abstract class JobQueue { /** @var BagOStuff */ protected $dupCache; + /** @var JobQueueAggregator */ + protected $aggr; const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions @@ -76,6 +78,9 @@ abstract class JobQueue { throw new MWException( __CLASS__ . " does not support delayed jobs." ); } $this->dupCache = wfGetCache( CACHE_ANYTHING ); + $this->aggr = isset( $params['aggregator'] ) + ? $params['aggregator'] + : new JobQueueAggregatorNull( array() ); } /** @@ -298,7 +303,8 @@ abstract class JobQueue { * @throws JobQueueError */ final public function push( $jobs, $flags = 0 ) { - $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags ); + $jobs = is_array( $jobs ) ? $jobs : array( $jobs ); + $this->batchPush( $jobs, $flags ); } /** @@ -327,6 +333,7 @@ abstract class JobQueue { } $this->doBatchPush( $jobs, $flags ); + $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type ); } /** @@ -356,6 +363,10 @@ abstract class JobQueue { $job = $this->doPop(); + if ( !$job ) { + $this->aggr->notifyQueueEmpty( $this->wiki, $this->type ); + } + // Flag this job as an old duplicate based on its "root" job... try { if ( $job && $this->isRootJobOldDuplicate( $job ) ) { diff --git a/includes/jobqueue/JobQueueDB.php b/includes/jobqueue/JobQueueDB.php index 5e8399c914..d5f47ffda0 100644 --- a/includes/jobqueue/JobQueueDB.php +++ b/includes/jobqueue/JobQueueDB.php @@ -686,7 +686,9 @@ class JobQueueDB extends JobQueue { $affected = $dbw->affectedRows(); $count += $affected; JobQueue::incrStats( 'job-recycle', $this->type, $affected, $this->wiki ); + // The tasks recycled jobs or release delayed jobs into the queue $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG ); + $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type ); } } diff --git a/includes/jobqueue/JobQueueGroup.php b/includes/jobqueue/JobQueueGroup.php index dbb85d7327..ebd547a0e5 100644 --- a/includes/jobqueue/JobQueueGroup.php +++ b/includes/jobqueue/JobQueueGroup.php @@ -94,6 +94,7 @@ class JobQueueGroup { } else { $conf = $conf + $wgJobTypeConf['default']; } + $conf['aggregator'] = JobQueueAggregator::singleton(); return JobQueue::factory( $conf ); } @@ -125,7 +126,6 @@ class JobQueueGroup { foreach ( $jobsByType as $type => $jobs ) { $this->get( $type )->push( $jobs ); - JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type ); } if ( $this->cache->has( 'queues-ready', 'list' ) ) { @@ -153,9 +153,6 @@ class JobQueueGroup { if ( is_string( $qtype ) ) { // specific job type if ( !in_array( $qtype, $blacklist ) ) { $job = $this->get( $qtype )->pop(); - if ( !$job ) { - JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype ); - } } } else { // any job in the "default" jobs types if ( $flags & self::USE_CACHE ) { @@ -179,7 +176,6 @@ class JobQueueGroup { if ( $job ) { // found break; } else { // not found - JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type ); $this->cache->clear( 'queues-ready' ); } } @@ -381,10 +377,6 @@ class JobQueueGroup { } } } - // The tasks may have recycled jobs or release delayed jobs into the queue - if ( isset( $tasksRun[$type] ) && !$queue->isEmpty() ) { - JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type ); - } } if ( $count === 0 ) { diff --git a/includes/jobqueue/aggregator/JobQueueAggregator.php b/includes/jobqueue/aggregator/JobQueueAggregator.php index 4c2dfadf30..febc277a51 100644 --- a/includes/jobqueue/aggregator/JobQueueAggregator.php +++ b/includes/jobqueue/aggregator/JobQueueAggregator.php @@ -34,7 +34,7 @@ abstract class JobQueueAggregator { /** * @param array $params */ - protected function __construct( array $params ) { + public function __construct( array $params ) { } /** diff --git a/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php b/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php index db9e764c3b..847dd6f4b7 100644 --- a/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php +++ b/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php @@ -44,7 +44,7 @@ class JobQueueAggregatorRedis extends JobQueueAggregator { * If a hostname is specified but no port, the standard port number * 6379 will be used. Required. */ - protected function __construct( array $params ) { + public function __construct( array $params ) { parent::__construct( $params ); $this->servers = isset( $params['redisServers'] ) ? $params['redisServers'] -- 2.20.1