/** @var BagOStuff */
protected $dupCache;
+ /** @var JobQueueAggregator */
+ protected $aggr;
const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
throw new MWException( __CLASS__ . " does not support delayed jobs." );
}
$this->dupCache = wfGetCache( CACHE_ANYTHING );
+ $this->aggr = isset( $params['aggregator'] )
+ ? $params['aggregator']
+ : new JobQueueAggregatorNull( array() );
}
/**
* @throws JobQueueError
*/
final public function push( $jobs, $flags = 0 ) {
- $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
+ $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+ $this->batchPush( $jobs, $flags );
}
/**
}
$this->doBatchPush( $jobs, $flags );
+ $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
}
/**
$job = $this->doPop();
+ if ( !$job ) {
+ $this->aggr->notifyQueueEmpty( $this->wiki, $this->type );
+ }
+
// Flag this job as an old duplicate based on its "root" job...
try {
if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
$affected = $dbw->affectedRows();
$count += $affected;
JobQueue::incrStats( 'job-recycle', $this->type, $affected, $this->wiki );
+ // The tasks recycled jobs or release delayed jobs into the queue
$this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
+ $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
}
}
} else {
$conf = $conf + $wgJobTypeConf['default'];
}
+ $conf['aggregator'] = JobQueueAggregator::singleton();
return JobQueue::factory( $conf );
}
foreach ( $jobsByType as $type => $jobs ) {
$this->get( $type )->push( $jobs );
- JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
}
if ( $this->cache->has( 'queues-ready', 'list' ) ) {
if ( is_string( $qtype ) ) { // specific job type
if ( !in_array( $qtype, $blacklist ) ) {
$job = $this->get( $qtype )->pop();
- if ( !$job ) {
- JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
- }
}
} else { // any job in the "default" jobs types
if ( $flags & self::USE_CACHE ) {
if ( $job ) { // found
break;
} else { // not found
- JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type );
$this->cache->clear( 'queues-ready' );
}
}
}
}
}
- // The tasks may have recycled jobs or release delayed jobs into the queue
- if ( isset( $tasksRun[$type] ) && !$queue->isEmpty() ) {
- JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
- }
}
if ( $count === 0 ) {
/**
* @param array $params
*/
- protected function __construct( array $params ) {
+ public function __construct( array $params ) {
}
/**
* If a hostname is specified but no port, the standard port number
* 6379 will be used. Required.
*/
- protected function __construct( array $params ) {
+ public function __construct( array $params ) {
parent::__construct( $params );
$this->servers = isset( $params['redisServers'] )
? $params['redisServers']