jobqueue: various cleanups to JobQueueFederated
authorAaron Schulz <aschulz@wikimedia.org>
Wed, 19 Jun 2013 01:35:14 +0000 (18:35 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Fri, 18 Oct 2013 00:39:02 +0000 (17:39 -0700)
* The relevant partition information is now stored in
  the fields instead of the general information for all wikis.
* Also improved hash ring caching and made getCoalesceLocationInternal()
  more robust.
* Improved availability of sibling queue methods.

Change-Id: I7ac268bb1db4615e0f735576c506593d5688e17e

includes/HashRing.php
includes/job/JobQueueFederated.php

index cd39ad8..930f8c0 100644 (file)
@@ -27,6 +27,8 @@
  * @since 1.22
  */
 class HashRing {
+       /** @var Array (location => weight) */
+       protected $sourceMap = array();
        /** @var Array (location => (start, end)) */
        protected $ring = array();
 
@@ -40,6 +42,7 @@ class HashRing {
                if ( !count( $map ) ) {
                        throw new MWException( "Ring is empty or all weights are zero." );
                }
+               $this->sourceMap = $map;
                // Sort the locations based on the hash of their names
                $hashes = array();
                foreach ( $map as $location => $weight ) {
@@ -112,4 +115,28 @@ class HashRing {
                }
                return $locations;
        }
+
+       /**
+        * Get the map of locations to weight (ignores 0-weight items)
+        *
+        * @return array
+        */
+       public function getLocationWeights() {
+               return $this->sourceMap;
+       }
+
+       /**
+        * Get a new hash ring with a location removed from the ring
+        *
+        * @param string $location
+        * @return HashRing|bool Returns false if no non-zero weighted spots are left
+        */
+       public function newWithoutLocation( $location ) {
+               $map = $this->sourceMap;
+               unset( $map[$location] );
+               if ( count( $map ) ) {
+                       return new self( $map );
+               }
+               return false;
+       }
 }
index 4257da4..ce2f3c7 100644 (file)
  *
  * If used for performance, then $wgMainCacheType should be set to memcached/redis.
  * Note that "fifo" cannot be used for the ordering, since the data is distributed.
- * One can still use "timestamp" instead, as in "roughly timestamp ordered".
+ * One can still use "timestamp" instead, as in "roughly timestamp ordered". Also,
+ * queue classes used by this should ignore down servers (with TTL) to avoid slowness.
  *
  * @ingroup JobQueue
  * @since 1.22
  */
 class JobQueueFederated extends JobQueue {
-       /** @var Array (wiki ID => section name) */
-       protected $sectionsByWiki = array();
-       /** @var Array (section name => (partition name => weight)) */
-       protected $partitionsBySection = array();
-       /** @var Array (section name => config array) */
-       protected $configByPartition = array();
-       /** @var Array (partition names => integer) */
-       protected $partitionsNoPush = array();
-
-       /** @var HashRing */
-       protected $partitionRing;
+       /** @var Array (partition name => weight) */
+       protected $partitionMap = array();
        /** @var Array (partition name => JobQueue) */
        protected $partitionQueues = array();
+       /** @var HashRing */
+       protected $partitionPushRing;
        /** @var BagOStuff */
        protected $cache;
 
@@ -82,36 +76,38 @@ class JobQueueFederated extends JobQueue {
         */
        protected function __construct( array $params ) {
                parent::__construct( $params );
-               $this->sectionsByWiki = isset( $params['sectionsByWiki'] )
-                       ? $params['sectionsByWiki']
-                       : array(); // all in "default" section
-               $this->partitionsBySection = $params['partitionsBySection'];
-               $this->configByPartition = $params['configByPartition'];
+               $section = isset( $params['sectionsByWiki'][$this->wiki] )
+                       ? $params['sectionsByWiki'][$this->wiki]
+                       : 'default';
+               if ( !isset( $params['partitionsBySection'][$section] ) ) {
+                       throw new MWException( "No configuration for section '$section'." );
+               }
+               $this->partitionMap = $params['partitionsBySection'][$section];
+               $partitionPushMap = $this->partitionMap;
                if ( isset( $params['partitionsNoPush'] ) ) {
-                       $this->partitionsNoPush = array_flip( $params['partitionsNoPush'] );
+                       foreach ( $params['partitionsNoPush'] as $partition ) {
+                               unset( $partitionPushMap[$partition] );
+                       }
                }
+               // Get the config to pass to merge into each partition queue config
                $baseConfig = $params;
                foreach ( array( 'class', 'sectionsByWiki',
                        'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o )
                {
                        unset( $baseConfig[$o] );
                }
-               foreach ( $this->getPartitionMap() as $partition => $w ) {
-                       if ( !isset( $this->configByPartition[$partition] ) ) {
+               // Get the partition queue objects
+               foreach ( $this->partitionMap as $partition => $w ) {
+                       if ( !isset( $params['configByPartition'][$partition] ) ) {
                                throw new MWException( "No configuration for partition '$partition'." );
                        }
                        $this->partitionQueues[$partition] = JobQueue::factory(
-                               $baseConfig + $this->configByPartition[$partition]
-                       );
+                               $baseConfig + $params['configByPartition'][$partition] );
                }
                // Get the ring of partitions to push job de-duplication information into
-               $partitionsTry = array_diff_key(
-                       $this->getPartitionMap(),
-                       $this->partitionsNoPush
-               ); // (partition => weight)
-               $this->partitionRing = new HashRing( $partitionsTry );
+               $this->partitionPushRing = new HashRing( $partitionPushMap );
                // Aggregate cache some per-queue values if there are multiple partition queues
-               $this->cache = $this->isFederated() ? wfGetMainCache() : new EmptyBagOStuff();
+               $this->cache = count( $this->partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
        }
 
        protected function supportedOrders() {
@@ -198,36 +194,26 @@ class JobQueueFederated extends JobQueue {
                if ( !count( $jobs ) ) {
                        return true; // nothing to do
                }
-
-               $partitionsTry = array_diff_key(
-                       $this->getPartitionMap(),
-                       $this->partitionsNoPush
-               ); // (partition => weight)
-
+               // Local ring variable that may be changed to point to a new ring on failure
+               $partitionRing = $this->partitionPushRing;
                // Try to insert the jobs and update $partitionsTry on any failures
-               $jobsLeft = $this->tryJobInsertions( $jobs, $partitionsTry, $flags );
+               $jobsLeft = $this->tryJobInsertions( $jobs, $partitionRing, $flags );
                if ( count( $jobsLeft ) ) { // some jobs failed to insert?
                        // Try to insert the remaning jobs once more, ignoring the bad partitions
-                       return !count( $this->tryJobInsertions( $jobsLeft, $partitionsTry, $flags ) );
-               } else {
-                       return true;
+                       return !count( $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags ) );
                }
+               return true;
        }
 
        /**
         * @param array $jobs
-        * @param array $partitionsTry
+        * @param HashRing $partitionRing
         * @param integer $flags
         * @return array List of Job object that could not be inserted
         */
-       protected function tryJobInsertions( array $jobs, array &$partitionsTry, $flags ) {
-               if ( !count( $partitionsTry ) ) {
-                       return $jobs; // can't insert anything
-               }
-
+       protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
                $jobsLeft = array();
 
-               $partitionRing = new HashRing( $partitionsTry );
                // Because jobs are spread across partitions, per-job de-duplication needs
                // to use a consistent hash to avoid allowing duplicate jobs per partition.
                // When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded.
@@ -262,30 +248,33 @@ class JobQueueFederated extends JobQueue {
                                $key = $this->getCacheKey( 'empty' );
                                $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
                        } else {
-                               unset( $partitionsTry[$partition] ); // blacklist partition
+                               $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist
+                               if ( !$partitionRing ) {
+                                       throw new JobQueueError( "Could not insert job(s), all partitions are down." );
+                               }
                                $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
                        }
                }
+
                // Insert the jobs that are not de-duplicated into the queues...
                foreach ( $nuJobBatches as $jobBatch ) {
-                       $partition = ArrayUtils::pickRandom( $partitionsTry );
-                       if ( $partition === false ) { // all partitions at 0 weight?
-                               $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
+                       $partition = ArrayUtils::pickRandom( $partitionRing->getLocationWeights() );
+                       $queue = $this->partitionQueues[$partition];
+                       try {
+                               $ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
+                       } catch ( JobQueueError $e ) {
+                               $ok = false;
+                               wfDebugLog( 'exception', $e->getLogMessage() );
+                       }
+                       if ( $ok ) {
+                               $key = $this->getCacheKey( 'empty' );
+                               $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
                        } else {
-                               $queue = $this->partitionQueues[$partition];
-                               try {
-                                       $ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
-                               } catch ( JobQueueError $e ) {
-                                       $ok = false;
-                                       wfDebugLog( 'exception', $e->getLogMessage() );
-                               }
-                               if ( $ok ) {
-                                       $key = $this->getCacheKey( 'empty' );
-                                       $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
-                               } else {
-                                       unset( $partitionsTry[$partition] ); // blacklist partition
-                                       $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
+                               $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist
+                               if ( !$partitionRing ) {
+                                       throw new JobQueueError( "Could not insert job(s), all partitions are down." );
                                }
+                               $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
                        }
                }
 
@@ -300,7 +289,7 @@ class JobQueueFederated extends JobQueue {
                        return false;
                }
 
-               $partitionsTry = $this->getPartitionMap(); // (partition => weight)
+               $partitionsTry = $this->partitionMap; // (partition => weight)
 
                while ( count( $partitionsTry ) ) {
                        $partition = ArrayUtils::pickRandom( $partitionsTry );
@@ -335,7 +324,7 @@ class JobQueueFederated extends JobQueue {
 
        protected function doIsRootJobOldDuplicate( Job $job ) {
                $params = $job->getRootJobParams();
-               $partitions = $this->partitionRing->getLocations( $params['rootJobSignature'], 2 );
+               $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
                try {
                        return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job );
                } catch ( MWException $e ) {
@@ -348,7 +337,7 @@ class JobQueueFederated extends JobQueue {
 
        protected function doDeduplicateRootJob( Job $job ) {
                $params = $job->getRootJobParams();
-               $partitions = $this->partitionRing->getLocations( $params['rootJobSignature'], 2 );
+               $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
                try {
                        return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job );
                } catch ( MWException $e ) {
@@ -422,17 +411,22 @@ class JobQueueFederated extends JobQueue {
        }
 
        public function getCoalesceLocationInternal() {
-               return "JobQueueFederated:wiki:" . $this->wiki;
+               return "JobQueueFederated:wiki:{$this->wiki}" .
+                       sha1( serialize( array_keys( $this->partitionMap ) ) );
        }
 
        protected function doGetSiblingQueuesWithJobs( array $types ) {
                $result = array();
                foreach ( $this->partitionQueues as $queue ) {
-                       $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
-                       if ( is_array( $nonEmpty ) ) {
-                               $result = array_merge( $result, $nonEmpty );
-                       } else {
-                               return null; // not supported on all partitions; bail
+                       try {
+                               $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
+                               if ( is_array( $nonEmpty ) ) {
+                                       $result = array_merge( $result, $nonEmpty );
+                               } else {
+                                       return null; // not supported on all partitions; bail
+                               }
+                       } catch ( JobQueueError $e ) {
+                               wfDebugLog( 'exception', $e->getLogMessage() );
                        }
                }
                return array_values( array_unique( $result ) );
@@ -441,13 +435,17 @@ class JobQueueFederated extends JobQueue {
        protected function doGetSiblingQueueSizes( array $types ) {
                $result = array();
                foreach ( $this->partitionQueues as $queue ) {
-                       $sizes = $queue->doGetSiblingQueueSizes( $types );
-                       if ( is_array( $sizes ) ) {
-                               foreach ( $sizes as $type => $size ) {
-                                       $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size;
+                       try {
+                               $sizes = $queue->doGetSiblingQueueSizes( $types );
+                               if ( is_array( $sizes ) ) {
+                                       foreach ( $sizes as $type => $size ) {
+                                               $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size;
+                                       }
+                               } else {
+                                       return null; // not supported on all partitions; bail
                                }
-                       } else {
-                               return null; // not supported on all partitions; bail
+                       } catch ( JobQueueError $e ) {
+                               wfDebugLog( 'exception', $e->getLogMessage() );
                        }
                }
                return $result;
@@ -459,26 +457,6 @@ class JobQueueFederated extends JobQueue {
                }
        }
 
-       /**
-        * @return Array Map of (partition name => weight)
-        */
-       protected function getPartitionMap() {
-               $section = isset( $this->sectionsByWiki[$this->wiki] )
-                       ? $this->sectionsByWiki[$this->wiki]
-                       : 'default';
-               if ( !isset( $this->partitionsBySection[$section] ) ) {
-                       throw new MWException( "No configuration for section '$section'." );
-               }
-               return $this->partitionsBySection[$section];
-       }
-
-       /**
-        * @return bool The queue is actually split up across multiple queue partitions
-        */
-       protected function isFederated() {
-               return ( count( $this->getPartitionMap() ) > 1 );
-       }
-
        /**
         * @return string
         */