* @since 1.22
*/
class JobQueueFederated extends JobQueue {
- /** @var Array (partition name => weight) */
+ /** @var Array (partition name => weight) reverse sorted by weight */
protected $partitionMap = array();
- /** @var Array (partition name => JobQueue) */
+ /** @var Array (partition name => JobQueue) reverse sorted by weight */
protected $partitionQueues = array();
/** @var HashRing */
protected $partitionPushRing;
if ( !isset( $params['partitionsBySection'][$section] ) ) {
throw new MWException( "No configuration for section '$section'." );
}
+ // Get the full partition map
$this->partitionMap = $params['partitionsBySection'][$section];
+ arsort( $this->partitionMap, SORT_NUMERIC );
+ // Get the partitions jobs can actually be pushed to
$partitionPushMap = $this->partitionMap;
if ( isset( $params['partitionsNoPush'] ) ) {
foreach ( $params['partitionsNoPush'] as $partition ) {
$this->partitionQueues[$partition] = JobQueue::factory(
$baseConfig + $params['configByPartition'][$partition] );
}
- // Get the ring of partitions to push job de-duplication information into
+ // Get the ring of partitions to push jobs into
$this->partitionPushRing = new HashRing( $partitionPushMap );
// Aggregate cache some per-queue values if there are multiple partition queues
$this->cache = count( $this->partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
$partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
try {
return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job );
- } catch ( MWException $e ) {
+ } catch ( JobQueueError $e ) {
if ( isset( $partitions[1] ) ) { // check fallback partition
return $this->partitionQueues[$partitions[1]]->doIsRootJobOldDuplicate( $job );
}
$partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
try {
return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job );
- } catch ( MWException $e ) {
+ } catch ( JobQueueError $e ) {
if ( isset( $partitions[1] ) ) { // check fallback partition
return $this->partitionQueues[$partitions[1]]->doDeduplicateRootJob( $job );
}
try {
$nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
if ( is_array( $nonEmpty ) ) {
- $result = array_merge( $result, $nonEmpty );
+ $result = array_unique( array_merge( $result, $nonEmpty ) );
} else {
return null; // not supported on all partitions; bail
}
+ if ( count( $result ) == count( $types ) ) {
+ break; // short-circuit
+ }
} catch ( JobQueueError $e ) {
MWExceptionHandler::logException( $e );
}
}
- return array_values( array_unique( $result ) );
+ return array_values( $result );
}
protected function doGetSiblingQueueSizes( array $types ) {