* @since 1.22
*/
class JobQueueFederated extends JobQueue {
- /** @var array (partition name => weight) reverse sorted by weight */
- protected $partitionMap = array();
-
- /** @var array (partition name => JobQueue) reverse sorted by weight */
- protected $partitionQueues = array();
-
+ /** @var HashRing */
+ protected $partitionRing;
/** @var HashRing */
protected $partitionPushRing;
+ /** @var array (partition name => JobQueue) reverse sorted by weight */
+ protected $partitionQueues = array();
/** @var BagOStuff */
protected $cache;
? $params['maxPartitionsTry']
: 2;
// Get the full partition map
- $this->partitionMap = $params['partitionsBySection'][$section];
- arsort( $this->partitionMap, SORT_NUMERIC );
+ $partitionMap = $params['partitionsBySection'][$section];
+ arsort( $partitionMap, SORT_NUMERIC );
// Get the partitions jobs can actually be pushed to
- $partitionPushMap = $this->partitionMap;
+ $partitionPushMap = $partitionMap;
if ( isset( $params['partitionsNoPush'] ) ) {
foreach ( $params['partitionsNoPush'] as $partition ) {
unset( $partitionPushMap[$partition] );
unset( $baseConfig[$o] ); // partition queue doesn't care about this
}
// Get the partition queue objects
- foreach ( $this->partitionMap as $partition => $w ) {
+ foreach ( $partitionMap as $partition => $w ) {
if ( !isset( $params['configByPartition'][$partition] ) ) {
throw new MWException( "No configuration for partition '$partition'." );
}
$this->partitionQueues[$partition] = JobQueue::factory(
$baseConfig + $params['configByPartition'][$partition] );
}
+ // Ring of all partitions
+ $this->partitionRing = new HashRing( $partitionMap );
// Get the ring of partitions to push jobs into
- $this->partitionPushRing = new HashRing( $partitionPushMap );
+ if ( count( $partitionPushMap ) === count( $partitionMap ) ) {
+ $this->partitionPushRing = clone $this->partitionRing; // faster
+ } else {
+ $this->partitionPushRing = new HashRing( $partitionPushMap );
+ }
// Aggregate cache some per-queue values if there are multiple partition queues
- $this->cache = count( $this->partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
+ $this->cache = count( $partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
}
protected function supportedOrders() {
// Retry to insert any remaning jobs again, ignoring the bad partitions.
$jobsLeft = $jobs;
for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
+ try {
+ $partitionRing->getLiveRing();
+ } catch ( UnexpectedValueException $e ) {
+ break; // all servers down; nothing to insert to
+ }
$jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags );
}
if ( count( $jobsLeft ) ) {
foreach ( $jobs as $key => $job ) {
if ( $job->ignoreDuplicates() ) {
$sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
- $uJobsByPartition[$partitionRing->getLocation( $sha1 )][] = $job;
+ $uJobsByPartition[$partitionRing->getLiveLocation( $sha1 )][] = $job;
unset( $jobs[$key] );
}
}
$key = $this->getCacheKey( 'empty' );
$this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
} else {
- $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist
- if ( !$partitionRing ) {
+ if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
throw new JobQueueError( "Could not insert job(s), no partitions available." );
}
$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
// Insert the jobs that are not de-duplicated into the queues...
foreach ( $nuJobBatches as $jobBatch ) {
- $partition = ArrayUtils::pickRandom( $partitionRing->getLocationWeights() );
+ $partition = ArrayUtils::pickRandom( $partitionRing->getLiveLocationWeights() );
$queue = $this->partitionQueues[$partition];
try {
$ok = true;
$key = $this->getCacheKey( 'empty' );
$this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
} else {
- $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist
- if ( !$partitionRing ) {
+ if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
throw new JobQueueError( "Could not insert job(s), no partitions available." );
}
$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
return false;
}
- $partitionsTry = $this->partitionMap; // (partition => weight)
+ $partitionsTry = $this->partitionRing->getLiveLocationWeights(); // (partition => weight)
$failed = 0;
while ( count( $partitionsTry ) ) {
protected function doIsRootJobOldDuplicate( Job $job ) {
$params = $job->getRootJobParams();
- $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
+ $sigature = $params['rootJobSignature'];
+ $partition = $this->partitionPushRing->getLiveLocation( $sigature );
try {
- return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job );
+ return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
} catch ( JobQueueError $e ) {
- if ( isset( $partitions[1] ) ) { // check fallback partition
- return $this->partitionQueues[$partitions[1]]->doIsRootJobOldDuplicate( $job );
+ if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) {
+ $partition = $this->partitionPushRing->getLiveLocation( $sigature );
+ return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
}
}
protected function doDeduplicateRootJob( Job $job ) {
$params = $job->getRootJobParams();
- $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
+ $sigature = $params['rootJobSignature'];
+ $partition = $this->partitionPushRing->getLiveLocation( $sigature );
try {
- return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job );
+ return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
} catch ( JobQueueError $e ) {
- if ( isset( $partitions[1] ) ) { // check fallback partition
- return $this->partitionQueues[$partitions[1]]->doDeduplicateRootJob( $job );
+ if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) {
+ $partition = $this->partitionPushRing->getLiveLocation( $sigature );
+ return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
}
}
public function getCoalesceLocationInternal() {
return "JobQueueFederated:wiki:{$this->wiki}" .
- sha1( serialize( array_keys( $this->partitionMap ) ) );
+ sha1( serialize( array_keys( $this->partitionQueues ) ) );
}
protected function doGetSiblingQueuesWithJobs( array $types ) {
/** @var Array (location => (start, end)) */
protected $ring = array();
+ /** @var Array (location => (start, end)) */
+ protected $liveRing;
+ /** @var Array (location => UNIX timestamp) */
+ protected $ejectionExpiries = array();
+ /** @var integer UNIX timestamp */
+ protected $ejectionNextExpiry = INF;
+
const RING_SIZE = 268435456; // 2^28
/**
}
/**
- * Get the location of an item on the ring, as well as the next clockwise locations
+ * Get the location of an item on the ring, as well as the next locations
*
* @param string $item
* @param integer $limit Maximum number of locations to return
public function newWithoutLocation( $location ) {
$map = $this->sourceMap;
unset( $map[$location] );
- if ( count( $map ) ) {
- return new self( $map );
+
+ return count( $map ) ? new self( $map ) : false;
+ }
+
+ /**
+ * Remove a location from the "live" hash ring
+ *
+ * @param string $location
+ * @param integer $ttl Seconds
+ * @return bool Whether some non-ejected locations are left
+ */
+ public function ejectFromLiveRing( $location, $ttl ) {
+ if ( !isset( $this->sourceMap[$location] ) ) {
+ throw new UnexpectedValueException( "No location '$location' in the ring." );
+ }
+ $expiry = time() + $ttl;
+ $this->liveRing = null; // stale
+ $this->ejectionExpiries[$location] = $expiry;
+ $this->ejectionNextExpiry = min( $expiry, $this->ejectionNextExpiry );
+
+ return ( count( $this->ejectionExpiries ) < count( $this->sourceMap ) );
+ }
+
+ /**
+ * Get the "live" hash ring (which does not include ejected locations)
+ *
+ * @return HashRing
+ * @throws UnexpectedValueException
+ */
+ public function getLiveRing() {
+ $now = time();
+ if ( $this->liveRing === null || $this->ejectionNextExpiry <= $now ) {
+ $this->ejectionExpiries = array_filter(
+ $this->ejectionExpiries,
+ function( $expiry ) use ( $now ) {
+ return ( $expiry > $now );
+ }
+ );
+ if ( count( $this->ejectionExpiries ) ) {
+ $map = array_diff_key( $this->sourceMap, $this->ejectionExpiries );
+ $this->liveRing = count( $map ) ? new self( $map ) : false;
+
+ $this->ejectionNextExpiry = min( $this->ejectionExpiries );
+ } else { // common case; avoid recalculating ring
+ $this->liveRing = clone $this;
+ $this->liveRing->ejectionExpiries = array();
+ $this->liveRing->ejectionNextExpiry = INF;
+ $this->liveRing->liveRing = null;
+
+ $this->ejectionNextExpiry = INF;
+ }
+ }
+ if ( !$this->liveRing ) {
+ throw UnexpectedValueException( "The live ring is currently empty." );
}
- return false;
+ return $this->liveRing;
+ }
+
+ /**
+ * Get the location of an item on the "live" ring
+ *
+ * @param string $item
+ * @return string Location
+ * @throws UnexpectedValueException
+ */
+ public function getLiveLocation( $item ) {
+ return $this->getLiveRing()->getLocation( $item );
+ }
+
+ /**
+ * Get the location of an item on the "live" ring, as well as the next locations
+ *
+ * @param string $item
+ * @param integer $limit Maximum number of locations to return
+ * @return array List of locations
+ * @throws UnexpectedValueException
+ */
+ public function getLiveLocations( $item ) {
+ return $this->getLiveRing()->getLocations( $item );
+ }
+
+ /**
+ * Get the map of "live" locations to weight (ignores 0-weight items)
+ *
+ * @return array
+ * @throws UnexpectedValueException
+ */
+ public function getLiveLocationWeights() {
+ return $this->getLiveRing()->getLocationWeights();
}
}