Improved HashRing usage in JobQueueFederated
author Aaron Schulz <aschulz@wikimedia.org>
Tue, 8 Apr 2014 01:53:38 +0000 (18:53 -0700)
committer Ori.livneh <ori@wikimedia.org>
Fri, 18 Apr 2014 20:28:48 +0000 (20:28 +0000)
* Added proper ejection and caching in HashRing to avoid rebuilding
  the hash all the time (or doing manual caching).
* Made JobQueueFederated blacklist failing servers for a few seconds.
* Also made the JobQueueFederated root job methods properly respect
  the weights when they fail over.

Change-Id: Ifa4c03272c1777cfff2523ab21f780074ddcf359

includes/jobqueue/JobQueueFederated.php
includes/libs/HashRing.php

index f2599ae..05ded4e 100644 (file)
  * @since 1.22
  */
 class JobQueueFederated extends JobQueue {
-       /** @var array (partition name => weight) reverse sorted by weight */
-       protected $partitionMap = array();
-
-       /** @var array (partition name => JobQueue) reverse sorted by weight */
-       protected $partitionQueues = array();
-
+       /** @var HashRing */
+       protected $partitionRing;
        /** @var HashRing */
        protected $partitionPushRing;
+       /** @var array (partition name => JobQueue) reverse sorted by weight */
+       protected $partitionQueues = array();
 
        /** @var BagOStuff */
        protected $cache;
@@ -97,10 +95,10 @@ class JobQueueFederated extends JobQueue {
                        ? $params['maxPartitionsTry']
                        : 2;
                // Get the full partition map
-               $this->partitionMap = $params['partitionsBySection'][$section];
-               arsort( $this->partitionMap, SORT_NUMERIC );
+               $partitionMap = $params['partitionsBySection'][$section];
+               arsort( $partitionMap, SORT_NUMERIC );
                // Get the partitions jobs can actually be pushed to
-               $partitionPushMap = $this->partitionMap;
+               $partitionPushMap = $partitionMap;
                if ( isset( $params['partitionsNoPush'] ) ) {
                        foreach ( $params['partitionsNoPush'] as $partition ) {
                                unset( $partitionPushMap[$partition] );
@@ -114,17 +112,23 @@ class JobQueueFederated extends JobQueue {
                        unset( $baseConfig[$o] ); // partition queue doesn't care about this
                }
                // Get the partition queue objects
-               foreach ( $this->partitionMap as $partition => $w ) {
+               foreach ( $partitionMap as $partition => $w ) {
                        if ( !isset( $params['configByPartition'][$partition] ) ) {
                                throw new MWException( "No configuration for partition '$partition'." );
                        }
                        $this->partitionQueues[$partition] = JobQueue::factory(
                                $baseConfig + $params['configByPartition'][$partition] );
                }
+               // Ring of all partitions
+               $this->partitionRing = new HashRing( $partitionMap );
                // Get the ring of partitions to push jobs into
-               $this->partitionPushRing = new HashRing( $partitionPushMap );
+               if ( count( $partitionPushMap ) === count( $partitionMap ) ) {
+                       $this->partitionPushRing = clone $this->partitionRing; // faster
+               } else {
+                       $this->partitionPushRing = new HashRing( $partitionPushMap );
+               }
                // Aggregate cache some per-queue values if there are multiple partition queues
-               $this->cache = count( $this->partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
+               $this->cache = count( $partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
        }
 
        protected function supportedOrders() {
@@ -218,6 +222,11 @@ class JobQueueFederated extends JobQueue {
                // Retry to insert any remaning jobs again, ignoring the bad partitions.
                $jobsLeft = $jobs;
                for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
+                       try {
+                               $partitionRing->getLiveRing();
+                       } catch ( UnexpectedValueException $e ) {
+                               break; // all servers down; nothing to insert to
+                       }
                        $jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags );
                }
                if ( count( $jobsLeft ) ) {
@@ -246,7 +255,7 @@ class JobQueueFederated extends JobQueue {
                foreach ( $jobs as $key => $job ) {
                        if ( $job->ignoreDuplicates() ) {
                                $sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
-                               $uJobsByPartition[$partitionRing->getLocation( $sha1 )][] = $job;
+                               $uJobsByPartition[$partitionRing->getLiveLocation( $sha1 )][] = $job;
                                unset( $jobs[$key] );
                        }
                }
@@ -275,8 +284,7 @@ class JobQueueFederated extends JobQueue {
                                $key = $this->getCacheKey( 'empty' );
                                $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
                        } else {
-                               $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist
-                               if ( !$partitionRing ) {
+                               if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
                                        throw new JobQueueError( "Could not insert job(s), no partitions available." );
                                }
                                $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
@@ -285,7 +293,7 @@ class JobQueueFederated extends JobQueue {
 
                // Insert the jobs that are not de-duplicated into the queues...
                foreach ( $nuJobBatches as $jobBatch ) {
-                       $partition = ArrayUtils::pickRandom( $partitionRing->getLocationWeights() );
+                       $partition = ArrayUtils::pickRandom( $partitionRing->getLiveLocationWeights() );
                        $queue = $this->partitionQueues[$partition];
                        try {
                                $ok = true;
@@ -298,8 +306,7 @@ class JobQueueFederated extends JobQueue {
                                $key = $this->getCacheKey( 'empty' );
                                $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
                        } else {
-                               $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist
-                               if ( !$partitionRing ) {
+                               if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
                                        throw new JobQueueError( "Could not insert job(s), no partitions available." );
                                }
                                $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
@@ -317,7 +324,7 @@ class JobQueueFederated extends JobQueue {
                        return false;
                }
 
-               $partitionsTry = $this->partitionMap; // (partition => weight)
+               $partitionsTry = $this->partitionRing->getLiveLocationWeights(); // (partition => weight)
 
                $failed = 0;
                while ( count( $partitionsTry ) ) {
@@ -360,12 +367,14 @@ class JobQueueFederated extends JobQueue {
 
        protected function doIsRootJobOldDuplicate( Job $job ) {
                $params = $job->getRootJobParams();
-               $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
+               $sigature = $params['rootJobSignature'];
+               $partition = $this->partitionPushRing->getLiveLocation( $sigature );
                try {
-                       return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job );
+                       return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
                } catch ( JobQueueError $e ) {
-                       if ( isset( $partitions[1] ) ) { // check fallback partition
-                               return $this->partitionQueues[$partitions[1]]->doIsRootJobOldDuplicate( $job );
+                       if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) {
+                               $partition = $this->partitionPushRing->getLiveLocation( $sigature );
+                               return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
                        }
                }
 
@@ -374,12 +383,14 @@ class JobQueueFederated extends JobQueue {
 
        protected function doDeduplicateRootJob( Job $job ) {
                $params = $job->getRootJobParams();
-               $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
+               $sigature = $params['rootJobSignature'];
+               $partition = $this->partitionPushRing->getLiveLocation( $sigature );
                try {
-                       return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job );
+                       return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
                } catch ( JobQueueError $e ) {
-                       if ( isset( $partitions[1] ) ) { // check fallback partition
-                               return $this->partitionQueues[$partitions[1]]->doDeduplicateRootJob( $job );
+                       if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) {
+                               $partition = $this->partitionPushRing->getLiveLocation( $sigature );
+                               return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
                        }
                }
 
@@ -470,7 +481,7 @@ class JobQueueFederated extends JobQueue {
 
        public function getCoalesceLocationInternal() {
                return "JobQueueFederated:wiki:{$this->wiki}" .
-                       sha1( serialize( array_keys( $this->partitionMap ) ) );
+                       sha1( serialize( array_keys( $this->partitionQueues ) ) );
        }
 
        protected function doGetSiblingQueuesWithJobs( array $types ) {
index 6925c7f..df99098 100644 (file)
@@ -32,6 +32,13 @@ class HashRing {
        /** @var Array (location => (start, end)) */
        protected $ring = array();
 
+       /** @var Array (location => (start, end)) */
+       protected $liveRing;
+       /** @var Array (location => UNIX timestamp) */
+       protected $ejectionExpiries = array();
+       /** @var integer UNIX timestamp */
+       protected $ejectionNextExpiry = INF;
+
        const RING_SIZE = 268435456; // 2^28
 
        /**
@@ -84,7 +91,7 @@ class HashRing {
        }
 
        /**
-        * Get the location of an item on the ring, as well as the next clockwise locations
+        * Get the location of an item on the ring, as well as the next locations
         *
         * @param string $item
         * @param integer $limit Maximum number of locations to return
@@ -138,10 +145,95 @@ class HashRing {
        public function newWithoutLocation( $location ) {
                $map = $this->sourceMap;
                unset( $map[$location] );
-               if ( count( $map ) ) {
-                       return new self( $map );
+
+               return count( $map ) ? new self( $map ) : false; // false if no locations remain
+       }
+
+       /**
+        * Eject a location from the "live" hash ring until its TTL expires
+        *
+        * @param string $location Must exist in the ring, else UnexpectedValueException is thrown
+        * @param integer $ttl Seconds the location stays ejected, counted from now
+        * @return bool Whether some non-ejected locations are left
+        */
+       public function ejectFromLiveRing( $location, $ttl ) {
+               if ( !isset( $this->sourceMap[$location] ) ) {
+                       throw new UnexpectedValueException( "No location '$location' in the ring." );
+               }
+               $expiry = time() + $ttl;
+               $this->liveRing = null; // stale
+               $this->ejectionExpiries[$location] = $expiry;
+               $this->ejectionNextExpiry = min( $expiry, $this->ejectionNextExpiry ); // earliest pending expiry
+
+               return ( count( $this->ejectionExpiries ) < count( $this->sourceMap ) );
+       }
+
+       /**
+        * Get the "live" hash ring (the ring without the currently ejected locations)
+        *
+        * @return HashRing
+        * @throws UnexpectedValueException If every location is currently ejected
+        */
+       public function getLiveRing() {
+               $now = time();
+               if ( $this->liveRing === null || $this->ejectionNextExpiry <= $now ) { // stale or an ejection expired
+                       $this->ejectionExpiries = array_filter(
+                               $this->ejectionExpiries,
+                               function( $expiry ) use ( $now ) {
+                                       return ( $expiry > $now );
+                               }
+                       );
+                       if ( count( $this->ejectionExpiries ) ) {
+                               $map = array_diff_key( $this->sourceMap, $this->ejectionExpiries );
+                               $this->liveRing = count( $map ) ? new self( $map ) : false; // false: all ejected
+
+                               $this->ejectionNextExpiry = min( $this->ejectionExpiries );
+                       } else { // common case; avoid recalculating ring
+                               $this->liveRing = clone $this;
+                               $this->liveRing->ejectionExpiries = array();
+                               $this->liveRing->ejectionNextExpiry = INF;
+                               $this->liveRing->liveRing = null;
+
+                               $this->ejectionNextExpiry = INF;
+                       }
+               }
+               if ( !$this->liveRing ) {
+                       throw new UnexpectedValueException( "The live ring is currently empty." );
                }
 
-               return false;
+               return $this->liveRing;
+       }
+
+       /**
+        * Get the location of an item on the "live" ring
+        *
+        * @param string $item
+        * @return string Location, as resolved by getLocation() on the live ring
+        * @throws UnexpectedValueException If the live ring is empty
+        */
+       public function getLiveLocation( $item ) {
+               return $this->getLiveRing()->getLocation( $item );
+       }
+
+       /**
+        * Get the location of an item on the "live" ring, as well as the next locations
+        *
+        * @param string $item
+        * @param integer $limit Maximum number of locations to return
+        * @return array List of locations
+        * @throws UnexpectedValueException If the live ring is empty
+        */
+       public function getLiveLocations( $item, $limit ) {
+               return $this->getLiveRing()->getLocations( $item, $limit );
+       }
+
+       /**
+        * Get the map of "live" locations to weight (ignores 0-weight items)
+        *
+        * @return array Result of getLocationWeights() on the live ring
+        * @throws UnexpectedValueException If the live ring is empty
+        */
+       public function getLiveLocationWeights() {
+               return $this->getLiveRing()->getLocationWeights();
        }
 }