/** @var BagOStuff */
protected $dupCache;
+ /** @var JobQueueAggregator */
+ protected $aggr;
const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
throw new MWException( __CLASS__ . " does not support delayed jobs." );
}
$this->dupCache = wfGetCache( CACHE_ANYTHING );
+ $this->aggr = isset( $params['aggregator'] )
+ ? $params['aggregator']
+ : new JobQueueAggregatorNull( array() );
}
/**
* @throws JobQueueError
*/
final public function push( $jobs, $flags = 0 ) {
- $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
+ $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+ $this->batchPush( $jobs, $flags );
}
/**
}
$this->doBatchPush( $jobs, $flags );
+ $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
}
/**
$job = $this->doPop();
+ if ( !$job ) {
+ $this->aggr->notifyQueueEmpty( $this->wiki, $this->type );
+ }
+
// Flag this job as an old duplicate based on its "root" job...
try {
if ( $job && $this->isRootJobOldDuplicate( $job ) ) {