From fc81192a06372779240c23cd96fb305488925bd5 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Thu, 28 Feb 2013 14:21:28 -0800 Subject: [PATCH] [JobQueue] Added JobQueueFederated class for partitioned queues. * This lets queues be horizontally partitioned onto different servers, with weights assigned to each. The queue classes used by the different partitions can be hetereogenous or homogeneous. * How partitioning is done is setup similar to LBFactory, where wikis belong to sections and sections have config. Change-Id: I44d59b67cf417dca28a3e9b25371dac5a7ffcb47 --- includes/AutoLoader.php | 2 + includes/HashRing.php | 114 +++++++++ includes/job/JobQueue.php | 6 +- includes/job/JobQueueFederated.php | 382 +++++++++++++++++++++++++++++ 4 files changed, 501 insertions(+), 3 deletions(-) create mode 100644 includes/HashRing.php create mode 100644 includes/job/JobQueueFederated.php diff --git a/includes/AutoLoader.php b/includes/AutoLoader.php index 4a8ecaa69e..4e562f60f0 100644 --- a/includes/AutoLoader.php +++ b/includes/AutoLoader.php @@ -109,6 +109,7 @@ $wgAutoloadLocalClasses = array( 'FormOptions' => 'includes/FormOptions.php', 'FormSpecialPage' => 'includes/SpecialPage.php', 'GitInfo' => 'includes/GitInfo.php', + 'HashRing' => 'includes/HashRing.php', 'HashtableReplacer' => 'includes/StringUtils.php', 'HistoryBlob' => 'includes/HistoryBlob.php', 'HistoryBlobCurStub' => 'includes/HistoryBlob.php', @@ -656,6 +657,7 @@ $wgAutoloadLocalClasses = array( 'JobQueueAggregatorRedis' => 'includes/job/JobQueueAggregatorRedis.php', 'JobQueueDB' => 'includes/job/JobQueueDB.php', 'JobQueueGroup' => 'includes/job/JobQueueGroup.php', + 'JobQueueFederated' => 'includes/job/JobQueueFederated.php', 'JobQueueRedis' => 'includes/job/JobQueueRedis.php', # includes/job/jobs diff --git a/includes/HashRing.php b/includes/HashRing.php new file mode 100644 index 0000000000..976d36d827 --- /dev/null +++ b/includes/HashRing.php @@ -0,0 +1,114 @@ + (start, end)) */ + protected $ring = 
/**
 * Convenience class for weighted consistent hash rings.
 *
 * Locations are placed on a ring of RING_SIZE spots in an order determined
 * by the SHA-1 of each location name, with each location covering a
 * contiguous interval proportional to its weight. An item hashes to a spot,
 * and its location is the one whose interval contains that spot.
 */
class HashRing {
	/** @var Array (location => (start, end)); half-closed interval [start, end) */
	protected $ring = array();

	const RING_SIZE = 16777216; // 2^24

	/**
	 * @param array $map (location => weight)
	 * @throws MWException If the map is empty or the total weight is not positive
	 */
	public function __construct( array $map ) {
		$sum = array_sum( $map );
		if ( !count( $map ) || $sum <= 0 ) {
			throw new MWException( "Ring is empty or all weights are zero." );
		}
		// Sort the locations based on the hash of their names
		$hashes = array();
		foreach ( $map as $location => $weight ) {
			$hashes[$location] = sha1( $location );
		}
		uksort( $map, function ( $a, $b ) use ( $hashes ) {
			return strcmp( $hashes[$a], $hashes[$b] );
		} );
		// Fit the map to a weight-proportionate one with a space of size RING_SIZE
		$standardMap = array();
		foreach ( $map as $location => $weight ) {
			$standardMap[$location] = (int)floor( $weight / $sum * self::RING_SIZE );
		}
		// Build a ring of RING_SIZE spots, with each location at a spot in location hash order
		$index = 0;
		foreach ( $standardMap as $location => $weight ) {
			// Location covers half-closed interval [$index, $index + $weight)
			$this->ring[$location] = array( $index, $index + $weight );
			$index += $weight;
		}
		// Make sure the last location covers what is left (floor() rounding slack)
		end( $this->ring );
		$this->ring[key( $this->ring )][1] = self::RING_SIZE;
	}

	/**
	 * Get the location of an item on the ring
	 *
	 * @param string $item
	 * @return string Location
	 */
	public function getLocation( $item ) {
		$locations = $this->getLocations( $item, 1 );
		return $locations[0];
	}

	/**
	 * Get the location of an item on the ring, as well as the next clockwise locations
	 *
	 * @param string $item
	 * @param integer $limit Maximum number of locations to return
	 * @return array List of locations
	 */
	public function getLocations( $item, $limit ) {
		$locations = array();
		$primaryLocation = null;
		$spot = hexdec( substr( sha1( $item ), 0, 6 ) ); // first 24 bits
		foreach ( $this->ring as $location => $range ) {
			if ( count( $locations ) >= $limit ) {
				break;
			}
			// The $primaryLocation is the location the item spot is in.
			// After that is reached, keep appending the next locations.
			if ( ( $range[0] <= $spot && $spot < $range[1] ) || $primaryLocation !== null ) {
				if ( $primaryLocation === null ) {
					$primaryLocation = $location;
				}
				$locations[] = $location;
			}
		}
		// If more locations are requested, wrap-around and keep adding them.
		// NOTE: each() was deprecated in PHP 7.2 and removed in PHP 8.0, so
		// iterate over the keys from the start of the ring instead.
		foreach ( array_keys( $this->ring ) as $location ) {
			if ( count( $locations ) >= $limit || $location === $primaryLocation ) {
				break; // reached the limit or would go in circles
			}
			$locations[] = $location;
		}
		return $locations;
	}
}
/**
 * Class to handle enqueueing and running of background jobs for federated queues.
 *
 * A federated queue is horizontally partitioned across several internal queues
 * ("partitions"), each with a weight, possibly living on different servers and
 * using heterogeneous JobQueue classes. Wikis map to named sections, and each
 * section defines its partition weights, similar to how LBFactory sections work.
 */
class JobQueueFederated extends JobQueue {
	/** @var Array (wiki ID => section name) */
	protected $sectionsByWiki = array();

	/** @var Array (section name => (partition name => weight)) */
	protected $partitionsBySection = array();

	/** @var Array (partition name => config array) */
	protected $configByPartition = array();

	/** @var Array (partition name => 1); push-disabled partitions (inverted list) */
	protected $partitionsNoPush = array();

	/** @var Array (partition name => JobQueue) */
	protected $partitionQueues = array();

	/** @var BagOStuff */
	protected $cache;

	const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
	const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date

	/**
	 * @param array $params Possible keys:
	 *  - sectionsByWiki      : A map of wiki IDs to section names.
	 *                          Wikis will default to using the section "default".
	 *  - partitionsBySection : Map of section names to maps of (partition name => weight).
	 *                          A section called 'default' must be defined if not all wikis
	 *                          have explicitly defined sections.
	 *  - configByPartition   : Map of queue partition names to configuration arrays.
	 *                          These configuration arrays are passed to JobQueue::factory().
	 *                          The options set here are overridden by those passed to
	 *                          the federated queue itself (e.g. 'order' and 'claimTTL').
	 *  - partitionsNoPush    : List of partition names that can handle pop() but not push().
	 *                          This can be used to migrate away from a certain partition.
	 * @throws MWException If a needed partition or section has no configuration
	 */
	protected function __construct( array $params ) {
		parent::__construct( $params );
		$this->sectionsByWiki = $params['sectionsByWiki'];
		$this->partitionsBySection = $params['partitionsBySection'];
		$this->configByPartition = $params['configByPartition'];
		if ( isset( $params['partitionsNoPush'] ) ) {
			$this->partitionsNoPush = array_flip( $params['partitionsNoPush'] );
		}
		// Strip the federation-only settings; everything else is passed down to
		// each partition queue, overriding its per-partition configuration.
		$baseConfig = $params;
		foreach ( array( 'class', 'sectionsByWiki',
			'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o )
		{
			unset( $baseConfig[$o] );
		}
		foreach ( $this->getPartitionMap() as $partition => $w ) {
			if ( !isset( $this->configByPartition[$partition] ) ) {
				throw new MWException( "No configuration for partition '$partition'." );
			}
			$this->partitionQueues[$partition] = JobQueue::factory(
				$baseConfig + $this->configByPartition[$partition]
			);
		}
		// Aggregate cache some per-queue values if there are multiple partition queues
		$this->cache = $this->isFederated() ? wfGetMainCache() : new EmptyBagOStuff();
	}

	protected function supportedOrders() {
		// No FIFO due to partitioning, though "rough timestamp order" is supported
		return array( 'undefined', 'random', 'timestamp' );
	}

	protected function optimalOrder() {
		return 'undefined'; // defer to the partitions
	}

	protected function supportsDelayedJobs() {
		return true; // defer checks to the partitions
	}

	protected function doIsEmpty() {
		$key = $this->getCacheKey( 'empty' );

		$isEmpty = $this->cache->get( $key );
		if ( $isEmpty === 'true' ) {
			return true;
		} elseif ( $isEmpty === 'false' ) {
			return false;
		}

		// Only empty if every partition queue is empty
		foreach ( $this->partitionQueues as $queue ) {
			if ( !$queue->doIsEmpty() ) {
				$this->cache->add( $key, 'false', self::CACHE_TTL_LONG );
				return false;
			}
		}

		$this->cache->add( $key, 'true', self::CACHE_TTL_LONG );
		return true;
	}

	protected function doGetSize() {
		return $this->getCrossPartitionSum( 'size', 'doGetSize' );
	}

	protected function doGetAcquiredCount() {
		return $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' );
	}

	protected function doGetDelayedCount() {
		return $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' );
	}

	protected function doGetAbandonedCount() {
		return $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' );
	}

	/**
	 * Sum a stat over all partition queues, with short-lived caching
	 *
	 * @param string $type Cache key property name
	 * @param string $method JobQueue method to call on each partition queue
	 * @return integer
	 */
	protected function getCrossPartitionSum( $type, $method ) {
		$key = $this->getCacheKey( $type );

		$count = $this->cache->get( $key );
		if ( is_int( $count ) ) {
			return $count;
		}

		$count = 0;
		foreach ( $this->partitionQueues as $queue ) {
			$count += $queue->$method();
		}

		$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
		return $count;
	}

	protected function doBatchPush( array $jobs, $flags ) {
		if ( !count( $jobs ) ) {
			return true; // nothing to do
		}

		$partitionsTry = array_diff_key(
			$this->getPartitionMap(),
			$this->partitionsNoPush
		); // (partition => weight)

		// Try to insert the jobs and update $partitionsTry on any failures
		$jobsLeft = $this->tryJobInsertions( $jobs, $partitionsTry, $flags );
		if ( count( $jobsLeft ) ) { // some jobs failed to insert?
			// Try to insert the remaining jobs once more, ignoring the bad partitions
			return !count( $this->tryJobInsertions( $jobsLeft, $partitionsTry, $flags ) );
		} else {
			return true;
		}
	}

	/**
	 * Attempt to insert jobs, blacklisting failed partitions in $partitionsTry
	 *
	 * @param array $jobs
	 * @param array $partitionsTry Map of (partition => weight); updated by reference
	 * @param integer $flags
	 * @return array List of Job objects that could not be inserted
	 */
	protected function tryJobInsertions( array $jobs, array &$partitionsTry, $flags ) {
		if ( !count( $partitionsTry ) ) {
			return $jobs; // can't insert anything
		}

		$jobsLeft = array();

		$partitionRing = new HashRing( $partitionsTry );
		// Because jobs are spread across partitions, per-job de-duplication needs
		// to use a consistent hash to avoid allowing duplicate jobs per partition.
		// When inserting a batch of de-duplicated jobs, QoS_Atomic is disregarded.
		$uJobsByPartition = array(); // (partition name => job list)
		foreach ( $jobs as $key => $job ) {
			if ( $job->ignoreDuplicates() ) {
				$sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
				$uJobsByPartition[$partitionRing->getLocation( $sha1 )][] = $job;
				unset( $jobs[$key] );
			}
		}
		// Get the batches of jobs that are not de-duplicated
		if ( $flags & self::QoS_Atomic ) {
			$nuJobBatches = array( $jobs ); // all or nothing
		} else {
			// Split the jobs into batches and spread them out over servers if there
			// are many jobs. This helps keep the partitions even. Otherwise, send all
			// the jobs to a single partition queue to avoid the extra connections.
			$nuJobBatches = array_chunk( $jobs, 300 );
		}

		// Insert the de-duplicated jobs into the queues...
		foreach ( $uJobsByPartition as $partition => $jobBatch ) {
			$queue = $this->partitionQueues[$partition];
			if ( $queue->doBatchPush( $jobBatch, $flags ) ) {
				// Use this class's own TTL constant, not JobQueueDB's
				$key = $this->getCacheKey( 'empty' );
				$this->cache->set( $key, 'false', self::CACHE_TTL_LONG );
			} else {
				unset( $partitionsTry[$partition] ); // blacklist partition
				$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
			}
		}
		// Insert the jobs that are not de-duplicated into the queues...
		foreach ( $nuJobBatches as $jobBatch ) {
			$partition = ArrayUtils::pickRandom( $partitionsTry );
			if ( $partition === false ) { // all partitions at 0 weight?
				$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
			} else {
				$queue = $this->partitionQueues[$partition];
				if ( $queue->doBatchPush( $jobBatch, $flags ) ) {
					$key = $this->getCacheKey( 'empty' );
					$this->cache->set( $key, 'false', self::CACHE_TTL_LONG );
				} else {
					unset( $partitionsTry[$partition] ); // blacklist partition
					$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
				}
			}
		}

		return $jobsLeft;
	}

	protected function doPop() {
		$key = $this->getCacheKey( 'empty' );

		$isEmpty = $this->cache->get( $key );
		if ( $isEmpty === 'true' ) {
			return false;
		}

		$partitionsTry = $this->getPartitionMap(); // (partition => weight)

		// Pop from a random partition, skipping any that turn out to be empty
		while ( count( $partitionsTry ) ) {
			$partition = ArrayUtils::pickRandom( $partitionsTry );
			if ( $partition === false ) {
				break; // all partitions at 0 weight
			}
			$queue = $this->partitionQueues[$partition];
			$job = $queue->pop();
			if ( $job ) {
				// Record the partition so that ack() can be routed back to it
				$job->metadata['QueuePartition'] = $partition;
				return $job;
			} else {
				unset( $partitionsTry[$partition] ); // blacklist partition
			}
		}

		// Use this class's own TTL constant, not JobQueueDB's
		$this->cache->set( $key, 'true', self::CACHE_TTL_LONG );
		return false;
	}

	protected function doAck( Job $job ) {
		if ( !isset( $job->metadata['QueuePartition'] ) ) {
			throw new MWException( "The given job has no defined partition name." );
		}
		return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job );
	}

	protected function doWaitForBackups() {
		foreach ( $this->partitionQueues as $queue ) {
			$queue->waitForBackups();
		}
	}

	protected function doGetPeriodicTasks() {
		$tasks = array();
		foreach ( $this->partitionQueues as $partition => $queue ) {
			// Namespace the task names by partition to avoid collisions
			foreach ( $queue->getPeriodicTasks() as $task => $def ) {
				$tasks["{$partition}:{$task}"] = $def;
			}
		}
		return $tasks;
	}

	protected function doFlushCaches() {
		static $types = array(
			'empty',
			'size',
			'acquiredcount',
			'delayedcount',
			'abandonedcount'
		);
		foreach ( $types as $type ) {
			$this->cache->delete( $this->getCacheKey( $type ) );
		}
		foreach ( $this->partitionQueues as $queue ) {
			$queue->doFlushCaches();
		}
	}

	public function getAllQueuedJobs() {
		$iterator = new AppendIterator();
		foreach ( $this->partitionQueues as $queue ) {
			$iterator->append( $queue->getAllQueuedJobs() );
		}
		return $iterator;
	}

	public function getAllDelayedJobs() {
		$iterator = new AppendIterator();
		foreach ( $this->partitionQueues as $queue ) {
			$iterator->append( $queue->getAllDelayedJobs() );
		}
		return $iterator;
	}

	public function setTestingPrefix( $key ) {
		foreach ( $this->partitionQueues as $queue ) {
			$queue->setTestingPrefix( $key );
		}
	}

	/**
	 * @return Array Map of (partition name => weight) for this wiki's section
	 * @throws MWException If the wiki's section has no partition configuration
	 */
	protected function getPartitionMap() {
		$section = isset( $this->sectionsByWiki[$this->wiki] )
			? $this->sectionsByWiki[$this->wiki]
			: 'default';
		if ( !isset( $this->partitionsBySection[$section] ) ) {
			throw new MWException( "No configuration for section '$section'." );
		}
		return $this->partitionsBySection[$section];
	}

	/**
	 * @return bool The queue is actually split up across multiple queue partitions
	 */
	protected function isFederated() {
		return ( count( $this->getPartitionMap() ) > 1 );
	}

	/**
	 * @param string $property
	 * @return string Wiki-specific cache key for the given property
	 */
	private function getCacheKey( $property ) {
		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
		return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
	}
}