/** @var Array (partition names => integer) */
protected $partitionsNoPush = array();
+ /** @var HashRing */
+ protected $partitionRing;
/** @var Array (partition name => JobQueue) */
protected $partitionQueues = array();
/** @var BagOStuff */
*/
protected function __construct( array $params ) {
parent::__construct( $params );
- $this->sectionsByWiki = $params['sectionsByWiki'];
+ $this->sectionsByWiki = isset( $params['sectionsByWiki'] )
+ ? $params['sectionsByWiki']
+ : array(); // all in "default" section
$this->partitionsBySection = $params['partitionsBySection'];
$this->configByPartition = $params['configByPartition'];
if ( isset( $params['partitionsNoPush'] ) ) {
$baseConfig + $this->configByPartition[$partition]
);
}
+ // Get the ring of partitions to push job de-duplication information into
+ $partitionsTry = array_diff_key(
+ $this->getPartitionMap(),
+ $this->partitionsNoPush
+ ); // (partition => weight)
+ $this->partitionRing = new HashRing( $partitionsTry );
// Aggregate cache some per-queue values if there are multiple partition queues
$this->cache = $this->isFederated() ? wfGetMainCache() : new EmptyBagOStuff();
}
return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job );
}
+ protected function doIsRootJobOldDuplicate( Job $job ) {
+ $params = $job->getRootJobParams();
+ $partitions = $this->partitionRing->getLocations( $params['rootJobSignature'], 2 );
+ try {
+ return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job );
+ } catch ( MWException $e ) {
+ if ( isset( $partitions[1] ) ) { // check fallback partition
+ return $this->partitionQueues[$partitions[1]]->doIsRootJobOldDuplicate( $job );
+ }
+ }
+ return false;
+ }
+
+ protected function doDeduplicateRootJob( Job $job ) {
+ $params = $job->getRootJobParams();
+ $partitions = $this->partitionRing->getLocations( $params['rootJobSignature'], 2 );
+ try {
+ return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job );
+ } catch ( MWException $e ) {
+ if ( isset( $partitions[1] ) ) { // check fallback partition
+ return $this->partitionQueues[$partitions[1]]->doDeduplicateRootJob( $job );
+ }
+ }
+ return false;
+ }
+
protected function doDelete() {
foreach ( $this->partitionQueues as $queue ) {
$queue->doDelete();