From aa37656c36bd732d01b28a3a0658a0fd4632c46f Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Sat, 19 Oct 2013 12:48:12 -0700 Subject: [PATCH] Added "maxPartitionsTry" option to JobQueueFederated * This makes the retry logic more configurable * Also throw an error if some jobs are not inserted to match the other queues. Change-Id: I363e4813ee89468cde4349ba81ea458ff4ffe446 --- includes/job/JobQueueFederated.php | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/includes/job/JobQueueFederated.php b/includes/job/JobQueueFederated.php index d3ce164af9..36f4959ecd 100644 --- a/includes/job/JobQueueFederated.php +++ b/includes/job/JobQueueFederated.php @@ -56,6 +56,8 @@ class JobQueueFederated extends JobQueue { /** @var BagOStuff */ protected $cache; + protected $maxPartitionsTry; // integer; maximum number of partitions to try + const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date @@ -72,6 +74,10 @@ class JobQueueFederated extends JobQueue { * the federated queue itself (e.g. 'order' and 'claimTTL'). * - partitionsNoPush : List of partition names that can handle pop() but not push(). * This can be used to migrate away from a certain partition. + * - maxPartitionsTry : Maximum number of times to attempt job insertion using + * different partition queues. This improves availability + * during failure, at the cost of added latency and somewhat + * less reliable job de-duplication mechanisms. * @param array $params */ protected function __construct( array $params ) { @@ -82,6 +88,9 @@ class JobQueueFederated extends JobQueue { if ( !isset( $params['partitionsBySection'][$section] ) ) { throw new MWException( "No configuration for section '$section'." ); } + $this->maxPartitionsTry = isset( $params['maxPartitionsTry'] ) + ? $params['maxPartitionsTry'] + : 2; // Get the full partition map $this->partitionMap = $params['partitionsBySection'][$section]; arsort( $this->partitionMap, SORT_NUMERIC ); @@ -94,10 +103,10 @@ class JobQueueFederated extends JobQueue { } // Get the config to pass to merge into each partition queue config $baseConfig = $params; - foreach ( array( 'class', 'sectionsByWiki', + foreach ( array( 'class', 'sectionsByWiki', 'maxPartitionsTry', 'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o ) { - unset( $baseConfig[$o] ); + unset( $baseConfig[$o] ); // partition queue doesn't care about this } // Get the partition queue objects foreach ( $this->partitionMap as $partition => $w ) { @@ -194,16 +203,17 @@ class JobQueueFederated extends JobQueue { } protected function doBatchPush( array $jobs, $flags ) { - if ( !count( $jobs ) ) { - return true; // nothing to do - } // Local ring variable that may be changed to point to a new ring on failure $partitionRing = $this->partitionPushRing; - // Try to insert the jobs and update $partitionsTry on any failures - $jobsLeft = $this->tryJobInsertions( $jobs, $partitionRing, $flags ); - if ( count( $jobsLeft ) ) { // some jobs failed to insert? - // Try to insert the remaning jobs once more, ignoring the bad partitions - return !count( $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags ) ); + // Try to insert the jobs and update $partitionsTry on any failures. + // Retry to insert any remaning jobs again, ignoring the bad partitions. + $jobsLeft = $jobs; + for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) { + $jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags ); + } + if ( count( $jobsLeft ) ) { + throw new JobQueueError( + "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." ); } return true; } -- 2.20.1