From: Aaron Schulz Date: Wed, 19 Jun 2013 01:35:14 +0000 (-0700) Subject: jobqueue: various cleanups to JobQueueFederated X-Git-Tag: 1.31.0-rc.0~18480 X-Git-Url: https://git.cyclocoop.org/%7B%24www_url%7Dadmin/compta/banques/?a=commitdiff_plain;h=8c6e0901332333de3c92740e4badd4ea2c4f782b;p=lhc%2Fweb%2Fwiklou.git jobqueue: various cleanups to JobQueueFederated * The relevant partition information is now stored in the fields instead of the general information for all wikis. * Also improved hash ring caching and made getCoalesceLocationInternal() more robust. * Improved availability of sibling queue methods. Change-Id: I7ac268bb1db4615e0f735576c506593d5688e17e --- diff --git a/includes/HashRing.php b/includes/HashRing.php index cd39ad8185..930f8c0aa1 100644 --- a/includes/HashRing.php +++ b/includes/HashRing.php @@ -27,6 +27,8 @@ * @since 1.22 */ class HashRing { + /** @var Array (location => weight) */ + protected $sourceMap = array(); /** @var Array (location => (start, end)) */ protected $ring = array(); @@ -40,6 +42,7 @@ class HashRing { if ( !count( $map ) ) { throw new MWException( "Ring is empty or all weights are zero." ); } + $this->sourceMap = $map; // Sort the locations based on the hash of their names $hashes = array(); foreach ( $map as $location => $weight ) { @@ -112,4 +115,28 @@ class HashRing { } return $locations; } + + /** + * Get the map of locations to weight (ignores 0-weight items) + * + * @return array + */ + public function getLocationWeights() { + return $this->sourceMap; + } + + /** + * Get a new hash ring with a location removed from the ring + * + * @param string $location + * @return HashRing|bool Returns false if no non-zero weighted spots are left + */ + public function newWithoutLocation( $location ) { + $map = $this->sourceMap; + unset( $map[$location] ); + if ( count( $map ) ) { + return new self( $map ); + } + return false; + } } diff --git a/includes/job/JobQueueFederated.php b/includes/job/JobQueueFederated.php index 4257da43a0..ce2f3c71b7 100644 --- a/includes/job/JobQueueFederated.php +++ b/includes/job/JobQueueFederated.php @@ -40,25 +40,19 @@ * * If used for performance, then $wgMainCacheType should be set to memcached/redis. * Note that "fifo" cannot be used for the ordering, since the data is distributed. - * One can still use "timestamp" instead, as in "roughly timestamp ordered". + * One can still use "timestamp" instead, as in "roughly timestamp ordered". Also, + * queue classes used by this should ignore down servers (with TTL) to avoid slowness. * * @ingroup JobQueue * @since 1.22 */ class JobQueueFederated extends JobQueue { - /** @var Array (wiki ID => section name) */ - protected $sectionsByWiki = array(); - /** @var Array (section name => (partition name => weight)) */ - protected $partitionsBySection = array(); - /** @var Array (section name => config array) */ - protected $configByPartition = array(); - /** @var Array (partition names => integer) */ - protected $partitionsNoPush = array(); - - /** @var HashRing */ - protected $partitionRing; + /** @var Array (partition name => weight) */ + protected $partitionMap = array(); /** @var Array (partition name => JobQueue) */ protected $partitionQueues = array(); + /** @var HashRing */ + protected $partitionPushRing; /** @var BagOStuff */ protected $cache; @@ -82,36 +76,38 @@ class JobQueueFederated extends JobQueue { */ protected function __construct( array $params ) { parent::__construct( $params ); - $this->sectionsByWiki = isset( $params['sectionsByWiki'] ) - ? $params['sectionsByWiki'] - : array(); // all in "default" section - $this->partitionsBySection = $params['partitionsBySection']; - $this->configByPartition = $params['configByPartition']; + $section = isset( $params['sectionsByWiki'][$this->wiki] ) + ? $params['sectionsByWiki'][$this->wiki] + : 'default'; + if ( !isset( $params['partitionsBySection'][$section] ) ) { + throw new MWException( "No configuration for section '$section'." ); + } + $this->partitionMap = $params['partitionsBySection'][$section]; + $partitionPushMap = $this->partitionMap; if ( isset( $params['partitionsNoPush'] ) ) { - $this->partitionsNoPush = array_flip( $params['partitionsNoPush'] ); + foreach ( $params['partitionsNoPush'] as $partition ) { + unset( $partitionPushMap[$partition] ); + } } + // Get the config to pass to merge into each partition queue config $baseConfig = $params; foreach ( array( 'class', 'sectionsByWiki', 'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o ) { unset( $baseConfig[$o] ); } - foreach ( $this->getPartitionMap() as $partition => $w ) { - if ( !isset( $this->configByPartition[$partition] ) ) { + // Get the partition queue objects + foreach ( $this->partitionMap as $partition => $w ) { + if ( !isset( $params['configByPartition'][$partition] ) ) { throw new MWException( "No configuration for partition '$partition'." ); } $this->partitionQueues[$partition] = JobQueue::factory( - $baseConfig + $this->configByPartition[$partition] - ); + $baseConfig + $params['configByPartition'][$partition] ); } // Get the ring of partitions to push job de-duplication information into - $partitionsTry = array_diff_key( - $this->getPartitionMap(), - $this->partitionsNoPush - ); // (partition => weight) - $this->partitionRing = new HashRing( $partitionsTry ); + $this->partitionPushRing = new HashRing( $partitionPushMap ); // Aggregate cache some per-queue values if there are multiple partition queues - $this->cache = $this->isFederated() ? wfGetMainCache() : new EmptyBagOStuff(); + $this->cache = count( $this->partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff(); } protected function supportedOrders() { @@ -198,36 +194,26 @@ class JobQueueFederated extends JobQueue { if ( !count( $jobs ) ) { return true; // nothing to do } - - $partitionsTry = array_diff_key( - $this->getPartitionMap(), - $this->partitionsNoPush - ); // (partition => weight) - + // Local ring variable that may be changed to point to a new ring on failure + $partitionRing = $this->partitionPushRing; // Try to insert the jobs and update $partitionsTry on any failures - $jobsLeft = $this->tryJobInsertions( $jobs, $partitionsTry, $flags ); + $jobsLeft = $this->tryJobInsertions( $jobs, $partitionRing, $flags ); if ( count( $jobsLeft ) ) { // some jobs failed to insert? // Try to insert the remaning jobs once more, ignoring the bad partitions - return !count( $this->tryJobInsertions( $jobsLeft, $partitionsTry, $flags ) ); - } else { - return true; + return !count( $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags ) ); } + return true; } /** * @param array $jobs - * @param array $partitionsTry + * @param HashRing $partitionRing * @param integer $flags * @return array List of Job object that could not be inserted */ - protected function tryJobInsertions( array $jobs, array &$partitionsTry, $flags ) { - if ( !count( $partitionsTry ) ) { - return $jobs; // can't insert anything - } - + protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) { $jobsLeft = array(); - $partitionRing = new HashRing( $partitionsTry ); // Because jobs are spread across partitions, per-job de-duplication needs // to use a consistent hash to avoid allowing duplicate jobs per partition. // When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded. @@ -262,30 +248,33 @@ class JobQueueFederated extends JobQueue { $key = $this->getCacheKey( 'empty' ); $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG ); } else { - unset( $partitionsTry[$partition] ); // blacklist partition + $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist + if ( !$partitionRing ) { + throw new JobQueueError( "Could not insert job(s), all partitions are down." ); + } $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted } } + // Insert the jobs that are not de-duplicated into the queues... foreach ( $nuJobBatches as $jobBatch ) { - $partition = ArrayUtils::pickRandom( $partitionsTry ); - if ( $partition === false ) { // all partitions at 0 weight? - $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted + $partition = ArrayUtils::pickRandom( $partitionRing->getLocationWeights() ); + $queue = $this->partitionQueues[$partition]; + try { + $ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC ); + } catch ( JobQueueError $e ) { + $ok = false; + wfDebugLog( 'exception', $e->getLogMessage() ); + } + if ( $ok ) { + $key = $this->getCacheKey( 'empty' ); + $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG ); } else { - $queue = $this->partitionQueues[$partition]; - try { - $ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC ); - } catch ( JobQueueError $e ) { - $ok = false; - wfDebugLog( 'exception', $e->getLogMessage() ); - } - if ( $ok ) { - $key = $this->getCacheKey( 'empty' ); - $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG ); - } else { - unset( $partitionsTry[$partition] ); // blacklist partition - $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted + $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist + if ( !$partitionRing ) { + throw new JobQueueError( "Could not insert job(s), all partitions are down." ); } + $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted } } @@ -300,7 +289,7 @@ class JobQueueFederated extends JobQueue { return false; } - $partitionsTry = $this->getPartitionMap(); // (partition => weight) + $partitionsTry = $this->partitionMap; // (partition => weight) while ( count( $partitionsTry ) ) { $partition = ArrayUtils::pickRandom( $partitionsTry ); @@ -335,7 +324,7 @@ class JobQueueFederated extends JobQueue { protected function doIsRootJobOldDuplicate( Job $job ) { $params = $job->getRootJobParams(); - $partitions = $this->partitionRing->getLocations( $params['rootJobSignature'], 2 ); + $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 ); try { return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job ); } catch ( MWException $e ) { @@ -348,7 +337,7 @@ class JobQueueFederated extends JobQueue { protected function doDeduplicateRootJob( Job $job ) { $params = $job->getRootJobParams(); - $partitions = $this->partitionRing->getLocations( $params['rootJobSignature'], 2 ); + $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 ); try { return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job ); } catch ( MWException $e ) { @@ -422,17 +411,22 @@ class JobQueueFederated extends JobQueue { } public function getCoalesceLocationInternal() { - return "JobQueueFederated:wiki:" . $this->wiki; + return "JobQueueFederated:wiki:{$this->wiki}" . + sha1( serialize( array_keys( $this->partitionMap ) ) ); } protected function doGetSiblingQueuesWithJobs( array $types ) { $result = array(); foreach ( $this->partitionQueues as $queue ) { - $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types ); - if ( is_array( $nonEmpty ) ) { - $result = array_merge( $result, $nonEmpty ); - } else { - return null; // not supported on all partitions; bail + try { + $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types ); + if ( is_array( $nonEmpty ) ) { + $result = array_merge( $result, $nonEmpty ); + } else { + return null; // not supported on all partitions; bail + } + } catch ( JobQueueError $e ) { + wfDebugLog( 'exception', $e->getLogMessage() ); } } return array_values( array_unique( $result ) ); @@ -441,13 +435,17 @@ class JobQueueFederated extends JobQueue { protected function doGetSiblingQueueSizes( array $types ) { $result = array(); foreach ( $this->partitionQueues as $queue ) { - $sizes = $queue->doGetSiblingQueueSizes( $types ); - if ( is_array( $sizes ) ) { - foreach ( $sizes as $type => $size ) { - $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size; + try { + $sizes = $queue->doGetSiblingQueueSizes( $types ); + if ( is_array( $sizes ) ) { + foreach ( $sizes as $type => $size ) { + $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size; + } + } else { + return null; // not supported on all partitions; bail } - } else { - return null; // not supported on all partitions; bail + } catch ( JobQueueError $e ) { + wfDebugLog( 'exception', $e->getLogMessage() ); } } return $result; @@ -459,26 +457,6 @@ class JobQueueFederated extends JobQueue { } } - /** - * @return Array Map of (partition name => weight) - */ - protected function getPartitionMap() { - $section = isset( $this->sectionsByWiki[$this->wiki] ) - ? $this->sectionsByWiki[$this->wiki] - : 'default'; - if ( !isset( $this->partitionsBySection[$section] ) ) { - throw new MWException( "No configuration for section '$section'." ); - } - return $this->partitionsBySection[$section]; - } - - /** - * @return bool The queue is actually split up across multiple queue partitions - */ - protected function isFederated() { - return ( count( $this->getPartitionMap() ) > 1 ); - } - /** * @return string */