From: Aaron Schulz
Date: Tue, 8 Apr 2014 01:53:38 +0000 (-0700)
Subject: Improved HashRing usage in JobQueueFederated
X-Git-Tag: 1.31.0-rc.0~16151
X-Git-Url: https://git.cyclocoop.org/%7B%24admin_url%7Dmembres/%7B%7B%20url_for%28%27vote%27%2C%20idvote=vote.voteid%29%20%7D%7D?a=commitdiff_plain;h=f7f710287b701dc148f56a4a3ba6f88db3506893;p=lhc%2Fweb%2Fwiklou.git

Improved HashRing usage in JobQueueFederated

* Added proper ejection and caching in HashRing to avoid rebuilding
  the hash all the time (or doing manual caching).
* Made JobQueueFederated blacklist failing servers for a few seconds.
* Also made the JobQueueFederated root job methods properly respect
  the weights when they fail over.

Change-Id: Ifa4c03272c1777cfff2523ab21f780074ddcf359
---

diff --git a/includes/jobqueue/JobQueueFederated.php b/includes/jobqueue/JobQueueFederated.php
index f2599ae349..05ded4eb0d 100644
--- a/includes/jobqueue/JobQueueFederated.php
+++ b/includes/jobqueue/JobQueueFederated.php
@@ -47,14 +47,12 @@
  * @since 1.22
  */
 class JobQueueFederated extends JobQueue {
-	/** @var array (partition name => weight) reverse sorted by weight */
-	protected $partitionMap = array();
-
-	/** @var array (partition name => JobQueue) reverse sorted by weight */
-	protected $partitionQueues = array();
-
+	/** @var HashRing */
+	protected $partitionRing;
 	/** @var HashRing */
 	protected $partitionPushRing;
+	/** @var array (partition name => JobQueue) reverse sorted by weight */
+	protected $partitionQueues = array();
 
 	/** @var BagOStuff */
 	protected $cache;
@@ -97,10 +95,10 @@ class JobQueueFederated extends JobQueue {
 			? $params['maxPartitionsTry']
 			: 2;
 		// Get the full partition map
-		$this->partitionMap = $params['partitionsBySection'][$section];
-		arsort( $this->partitionMap, SORT_NUMERIC );
+		$partitionMap = $params['partitionsBySection'][$section];
+		arsort( $partitionMap, SORT_NUMERIC );
 		// Get the partitions jobs can actually be pushed to
-		$partitionPushMap = $this->partitionMap;
+		$partitionPushMap = $partitionMap;
 		if ( isset( $params['partitionsNoPush'] ) ) {
 			foreach ( $params['partitionsNoPush'] as $partition ) {
 				unset( $partitionPushMap[$partition] );
@@ -114,17 +112,23 @@ class JobQueueFederated extends JobQueue {
 			unset( $baseConfig[$o] ); // partition queue doesn't care about this
 		}
 		// Get the partition queue objects
-		foreach ( $this->partitionMap as $partition => $w ) {
+		foreach ( $partitionMap as $partition => $w ) {
 			if ( !isset( $params['configByPartition'][$partition] ) ) {
 				throw new MWException( "No configuration for partition '$partition'." );
 			}
 			$this->partitionQueues[$partition] = JobQueue::factory(
 				$baseConfig + $params['configByPartition'][$partition] );
 		}
+		// Ring of all partitions
+		$this->partitionRing = new HashRing( $partitionMap );
 		// Get the ring of partitions to push jobs into
-		$this->partitionPushRing = new HashRing( $partitionPushMap );
+		if ( count( $partitionPushMap ) === count( $partitionMap ) ) {
+			$this->partitionPushRing = clone $this->partitionRing; // faster
+		} else {
+			$this->partitionPushRing = new HashRing( $partitionPushMap );
+		}
 		// Aggregate cache some per-queue values if there are multiple partition queues
-		$this->cache = count( $this->partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
+		$this->cache = count( $partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
 	}
 
 	protected function supportedOrders() {
@@ -218,6 +222,11 @@ class JobQueueFederated extends JobQueue {
 		// Retry to insert any remaning jobs again, ignoring the bad partitions.
 		$jobsLeft = $jobs;
 		for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
+			try {
+				$partitionRing->getLiveRing();
+			} catch ( UnexpectedValueException $e ) {
+				break; // all servers down; nothing to insert to
+			}
 			$jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags );
 		}
 		if ( count( $jobsLeft ) ) {
@@ -246,7 +255,7 @@ class JobQueueFederated extends JobQueue {
 		foreach ( $jobs as $key => $job ) {
 			if ( $job->ignoreDuplicates() ) {
 				$sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
-				$uJobsByPartition[$partitionRing->getLocation( $sha1 )][] = $job;
+				$uJobsByPartition[$partitionRing->getLiveLocation( $sha1 )][] = $job;
 				unset( $jobs[$key] );
 			}
 		}
@@ -275,8 +284,7 @@ class JobQueueFederated extends JobQueue {
 				$key = $this->getCacheKey( 'empty' );
 				$this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
 			} else {
-				$partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist
-				if ( !$partitionRing ) {
+				if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
 					throw new JobQueueError( "Could not insert job(s), no partitions available." );
 				}
 				$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
@@ -285,7 +293,7 @@ class JobQueueFederated extends JobQueue {
 
 		// Insert the jobs that are not de-duplicated into the queues...
 		foreach ( $nuJobBatches as $jobBatch ) {
-			$partition = ArrayUtils::pickRandom( $partitionRing->getLocationWeights() );
+			$partition = ArrayUtils::pickRandom( $partitionRing->getLiveLocationWeights() );
 			$queue = $this->partitionQueues[$partition];
 			try {
 				$ok = true;
@@ -298,8 +306,7 @@ class JobQueueFederated extends JobQueue {
 				$key = $this->getCacheKey( 'empty' );
 				$this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
 			} else {
-				$partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist
-				if ( !$partitionRing ) {
+				if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
 					throw new JobQueueError( "Could not insert job(s), no partitions available." );
 				}
 				$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
@@ -317,7 +324,7 @@ class JobQueueFederated extends JobQueue {
 			return false;
 		}
 
-		$partitionsTry = $this->partitionMap; // (partition => weight)
+		$partitionsTry = $this->partitionRing->getLiveLocationWeights(); // (partition => weight)
 		$failed = 0;
 
 		while ( count( $partitionsTry ) ) {
@@ -360,12 +367,14 @@ class JobQueueFederated extends JobQueue {
 
 	protected function doIsRootJobOldDuplicate( Job $job ) {
 		$params = $job->getRootJobParams();
-		$partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
+		$signature = $params['rootJobSignature'];
+		$partition = $this->partitionPushRing->getLiveLocation( $signature );
 		try {
-			return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job );
+			return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
 		} catch ( JobQueueError $e ) {
-			if ( isset( $partitions[1] ) ) { // check fallback partition
-				return $this->partitionQueues[$partitions[1]]->doIsRootJobOldDuplicate( $job );
+			if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) {
+				$partition = $this->partitionPushRing->getLiveLocation( $signature );
+				return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
 			}
 		}
 
@@ -374,12 +383,14 @@ class JobQueueFederated extends JobQueue {
 
 	protected function doDeduplicateRootJob( Job $job ) {
 		$params = $job->getRootJobParams();
-		$partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
+		$signature = $params['rootJobSignature'];
+		$partition = $this->partitionPushRing->getLiveLocation( $signature );
 		try {
-			return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job );
+			return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
 		} catch ( JobQueueError $e ) {
-			if ( isset( $partitions[1] ) ) { // check fallback partition
-				return $this->partitionQueues[$partitions[1]]->doDeduplicateRootJob( $job );
+			if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) {
+				$partition = $this->partitionPushRing->getLiveLocation( $signature );
+				return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
 			}
 		}
 
@@ -470,7 +481,7 @@ class JobQueueFederated extends JobQueue {
 
 	public function getCoalesceLocationInternal() {
 		return "JobQueueFederated:wiki:{$this->wiki}" .
-			sha1( serialize( array_keys( $this->partitionMap ) ) );
+			sha1( serialize( array_keys( $this->partitionQueues ) ) );
 	}
 
 	protected function doGetSiblingQueuesWithJobs( array $types ) {
diff --git a/includes/libs/HashRing.php b/includes/libs/HashRing.php
index 6925c7fb6d..df990981df 100644
--- a/includes/libs/HashRing.php
+++ b/includes/libs/HashRing.php
@@ -32,6 +32,13 @@ class HashRing {
 	/** @var Array (location => (start, end)) */
 	protected $ring = array();
 
+	/** @var HashRing|bool|null Ring without ejected locations; null if stale, false if empty */
+	protected $liveRing;
+	/** @var Array (location => UNIX timestamp) */
+	protected $ejectionExpiries = array();
+	/** @var integer|float UNIX timestamp of the earliest ejection expiry (INF if none) */
+	protected $ejectionNextExpiry = INF;
+
 	const RING_SIZE = 268435456; // 2^28
 
 	/**
@@ -84,7 +91,7 @@ class HashRing {
 	}
 
 	/**
-	 * Get the location of an item on the ring, as well as the next clockwise locations
+	 * Get the location of an item on the ring, as well as the next locations
 	 *
 	 * @param string $item
 	 * @param integer $limit Maximum number of locations to return
@@ -138,10 +145,96 @@ class HashRing {
 	public function newWithoutLocation( $location ) {
 		$map = $this->sourceMap;
 		unset( $map[$location] );
-		if ( count( $map ) ) {
-			return new self( $map );
+
+		return count( $map ) ? new self( $map ) : false;
+	}
+
+	/**
+	 * Remove a location from the "live" hash ring
+	 *
+	 * @param string $location
+	 * @param integer $ttl Seconds
+	 * @return bool Whether some non-ejected locations are left
+	 * @throws UnexpectedValueException If the location is not in the ring
+	 */
+	public function ejectFromLiveRing( $location, $ttl ) {
+		if ( !isset( $this->sourceMap[$location] ) ) {
+			throw new UnexpectedValueException( "No location '$location' in the ring." );
+		}
+		$expiry = time() + $ttl;
+		$this->liveRing = null; // stale
+		$this->ejectionExpiries[$location] = $expiry;
+		$this->ejectionNextExpiry = min( $expiry, $this->ejectionNextExpiry );
+
+		return ( count( $this->ejectionExpiries ) < count( $this->sourceMap ) );
+	}
+
+	/**
+	 * Get the "live" hash ring (which does not include ejected locations)
+	 *
+	 * @return HashRing
+	 * @throws UnexpectedValueException
+	 */
+	public function getLiveRing() {
+		$now = time();
+		if ( $this->liveRing === null || $this->ejectionNextExpiry <= $now ) {
+			$this->ejectionExpiries = array_filter(
+				$this->ejectionExpiries,
+				function( $expiry ) use ( $now ) {
+					return ( $expiry > $now );
+				}
+			);
+			if ( count( $this->ejectionExpiries ) ) {
+				$map = array_diff_key( $this->sourceMap, $this->ejectionExpiries );
+				$this->liveRing = count( $map ) ? new self( $map ) : false;
+
+				$this->ejectionNextExpiry = min( $this->ejectionExpiries );
+			} else { // common case; avoid recalculating ring
+				$this->liveRing = clone $this;
+				$this->liveRing->ejectionExpiries = array();
+				$this->liveRing->ejectionNextExpiry = INF;
+				$this->liveRing->liveRing = null;
+
+				$this->ejectionNextExpiry = INF;
+			}
+		}
+		if ( !$this->liveRing ) {
+			throw new UnexpectedValueException( "The live ring is currently empty." );
 		}
 
-		return false;
+		return $this->liveRing;
+	}
+
+	/**
+	 * Get the location of an item on the "live" ring
+	 *
+	 * @param string $item
+	 * @return string Location
+	 * @throws UnexpectedValueException
+	 */
+	public function getLiveLocation( $item ) {
+		return $this->getLiveRing()->getLocation( $item );
+	}
+
+	/**
+	 * Get the location of an item on the "live" ring, as well as the next locations
+	 *
+	 * @param string $item
+	 * @param integer $limit Maximum number of locations to return
+	 * @return array List of locations
+	 * @throws UnexpectedValueException
+	 */
+	public function getLiveLocations( $item, $limit ) {
+		return $this->getLiveRing()->getLocations( $item, $limit );
+	}
+
+	/**
+	 * Get the map of "live" locations to weight (ignores 0-weight items)
+	 *
+	 * @return array
+	 * @throws UnexpectedValueException
+	 */
+	public function getLiveLocationWeights() {
+		return $this->getLiveRing()->getLocationWeights();
 	}
 }