class JobQueueFederated extends JobQueue {
/** @var HashRing */
protected $partitionRing;
- /** @var HashRing */
- protected $partitionPushRing;
/** @var array (partition name => JobQueue) reverse sorted by weight */
protected $partitionQueues = array();
* These configuration arrays are passed to JobQueue::factory().
* The options set here are overridden by those passed to this
* the federated queue itself (e.g. 'order' and 'claimTTL').
- * - partitionsNoPush : List of partition names that can handle pop() but not push().
- * This can be used to migrate away from a certain partition.
* - maxPartitionsTry : Maximum number of times to attempt job insertion using
* different partition queues. This improves availability
* during failure, at the cost of added latency and somewhat
// Get the full partition map
$partitionMap = $params['partitionsBySection'][$section];
arsort( $partitionMap, SORT_NUMERIC );
- // Get the partitions jobs can actually be pushed to
- $partitionPushMap = $partitionMap;
- if ( isset( $params['partitionsNoPush'] ) ) {
- foreach ( $params['partitionsNoPush'] as $partition ) {
- unset( $partitionPushMap[$partition] );
- }
- }
// Get the config to pass to merge into each partition queue config
$baseConfig = $params;
foreach ( array( 'class', 'sectionsByWiki', 'maxPartitionsTry',
- 'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o
+ 'partitionsBySection', 'configByPartition', ) as $o
) {
unset( $baseConfig[$o] ); // partition queue doesn't care about this
}
}
// Ring of all partitions
$this->partitionRing = new HashRing( $partitionMap );
- // Get the ring of partitions to push jobs into
- if ( count( $partitionPushMap ) === count( $partitionMap ) ) {
- $this->partitionPushRing = clone $this->partitionRing; // faster
- } else {
- $this->partitionPushRing = new HashRing( $partitionPushMap );
- }
}
protected function supportedOrders() {
protected function doBatchPush( array $jobs, $flags ) {
// Local ring variable that may be changed to point to a new ring on failure
- $partitionRing = $this->partitionPushRing;
+ $partitionRing = $this->partitionRing;
// Try to insert the jobs and update $partitionsTry on any failures.
// Retry to insert any remaning jobs again, ignoring the bad partitions.
$jobsLeft = $jobs;
protected function doIsRootJobOldDuplicate( Job $job ) {
$params = $job->getRootJobParams();
$sigature = $params['rootJobSignature'];
- $partition = $this->partitionPushRing->getLiveLocation( $sigature );
+ $partition = $this->partitionRing->getLiveLocation( $sigature );
try {
return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
} catch ( JobQueueError $e ) {
- if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) {
- $partition = $this->partitionPushRing->getLiveLocation( $sigature );
+ if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
+ $partition = $this->partitionRing->getLiveLocation( $sigature );
return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
}
}
protected function doDeduplicateRootJob( Job $job ) {
$params = $job->getRootJobParams();
$sigature = $params['rootJobSignature'];
- $partition = $this->partitionPushRing->getLiveLocation( $sigature );
+ $partition = $this->partitionRing->getLiveLocation( $sigature );
try {
return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
} catch ( JobQueueError $e ) {
- if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) {
- $partition = $this->partitionPushRing->getLiveLocation( $sigature );
+ if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
+ $partition = $this->partitionRing->getLiveLocation( $sigature );
return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
}
}