From: Aaron Schulz Date: Fri, 31 May 2013 03:46:33 +0000 (-0700) Subject: jobqueue: made federated queue use HashRing for root job de-duplication X-Git-Tag: 1.31.0-rc.0~19116 X-Git-Url: https://git.cyclocoop.org/%242?a=commitdiff_plain;h=44802e5594ea9ebae03eac75fb77b1f738f1eeae;p=lhc%2Fweb%2Fwiklou.git jobqueue: made federated queue use HashRing for root job de-duplication * This can spread entries out across job servers rather than always $wgMemc. * Also made the sectionsByWiki config default to an empty array. Change-Id: I176ff02eb4f05a1ea7d3bf93e0a10e074bb27d11 --- diff --git a/includes/job/JobQueueFederated.php b/includes/job/JobQueueFederated.php index db5b6862ad..19de8bb529 100644 --- a/includes/job/JobQueueFederated.php +++ b/includes/job/JobQueueFederated.php @@ -55,6 +55,8 @@ class JobQueueFederated extends JobQueue { /** @var Array (partition names => integer) */ protected $partitionsNoPush = array(); + /** @var HashRing */ + protected $partitionRing; /** @var Array (partition name => JobQueue) */ protected $partitionQueues = array(); /** @var BagOStuff */ @@ -80,7 +82,9 @@ class JobQueueFederated extends JobQueue { */ protected function __construct( array $params ) { parent::__construct( $params ); - $this->sectionsByWiki = $params['sectionsByWiki']; + $this->sectionsByWiki = isset( $params['sectionsByWiki'] ) + ? $params['sectionsByWiki'] + : array(); // all in "default" section $this->partitionsBySection = $params['partitionsBySection']; $this->configByPartition = $params['configByPartition']; if ( isset( $params['partitionsNoPush'] ) ) { @@ -100,6 +104,12 @@ class JobQueueFederated extends JobQueue { $baseConfig + $this->configByPartition[$partition] ); } + // Get the ring of partitions to push job de-duplication information into + $partitionsTry = array_diff_key( + $this->getPartitionMap(), + $this->partitionsNoPush + ); // (partition => weight) + $this->partitionRing = new HashRing( $partitionsTry ); // Aggregate cache some per-queue values if there are multiple partition queues $this->cache = $this->isFederated() ? wfGetMainCache() : new EmptyBagOStuff(); } @@ -298,6 +308,32 @@ class JobQueueFederated extends JobQueue { return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job ); } + protected function doIsRootJobOldDuplicate( Job $job ) { + $params = $job->getRootJobParams(); + $partitions = $this->partitionRing->getLocations( $params['rootJobSignature'], 2 ); + try { + return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job ); + } catch ( MWException $e ) { + if ( isset( $partitions[1] ) ) { // check fallback partition + return $this->partitionQueues[$partitions[1]]->doIsRootJobOldDuplicate( $job ); + } + } + return false; + } + + protected function doDeduplicateRootJob( Job $job ) { + $params = $job->getRootJobParams(); + $partitions = $this->partitionRing->getLocations( $params['rootJobSignature'], 2 ); + try { + return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job ); + } catch ( MWException $e ) { + if ( isset( $partitions[1] ) ) { // check fallback partition + return $this->partitionQueues[$partitions[1]]->doDeduplicateRootJob( $job ); + } + } + return false; + } + protected function doDelete() { foreach ( $this->partitionQueues as $queue ) { $queue->doDelete();