From: Aaron Schulz Date: Fri, 8 Nov 2013 21:51:37 +0000 (-0800) Subject: Error handling tweaks in JobQueueFederated X-Git-Tag: 1.31.0-rc.0~17596 X-Git-Url: http://git.cyclocoop.org/%24action?a=commitdiff_plain;h=09180fd6a3dc27a887f47212fcf34acd3a89100a;p=lhc%2Fweb%2Fwiklou.git Error handling tweaks in JobQueueFederated * Throw errors in more cases when all partitions cannot be reached Change-Id: I86ba1076430be8e4be0606294c370fadb0c43d63 --- diff --git a/includes/job/JobQueueFederated.php b/includes/job/JobQueueFederated.php index 589bed6e29..f4caac6f37 100644 --- a/includes/job/JobQueueFederated.php +++ b/includes/job/JobQueueFederated.php @@ -150,21 +150,20 @@ class JobQueueFederated extends JobQueue { return false; } + $empty = true; + $failed = 0; foreach ( $this->partitionQueues as $queue ) { try { - if ( !$queue->doIsEmpty() ) { - $this->cache->add( $key, 'false', self::CACHE_TTL_LONG ); - - return false; - } + $empty = $empty && $queue->doIsEmpty(); } catch ( JobQueueError $e ) { + ++$failed; MWExceptionHandler::logException( $e ); } } + $this->throwErrorIfAllPartitionsDown( $failed ); - $this->cache->add( $key, 'true', self::CACHE_TTL_LONG ); - - return true; + $this->cache->add( $key, $empty ? 'true' : 'false', self::CACHE_TTL_LONG ); + return !$empty; } protected function doGetSize() { @@ -196,14 +195,16 @@ class JobQueueFederated extends JobQueue { return $count; } - $count = 0; + $failed = 0; foreach ( $this->partitionQueues as $queue ) { try { $count += $queue->$method(); } catch ( JobQueueError $e ) { + ++$failed; MWExceptionHandler::logException( $e ); } } + $this->throwErrorIfAllPartitionsDown( $failed ); $this->cache->set( $key, $count, self::CACHE_TTL_SHORT ); @@ -275,7 +276,7 @@ class JobQueueFederated extends JobQueue { } else { $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist if ( !$partitionRing ) { - throw new JobQueueError( "Could not insert job(s), all partitions are down." ); + throw new JobQueueError( "Could not insert job(s), no partitions available." ); } $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted } @@ -297,7 +298,7 @@ class JobQueueFederated extends JobQueue { } else { $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist if ( !$partitionRing ) { - throw new JobQueueError( "Could not insert job(s), all partitions are down." ); + throw new JobQueueError( "Could not insert job(s), no partitions available." ); } $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted } @@ -316,6 +317,7 @@ class JobQueueFederated extends JobQueue { $partitionsTry = $this->partitionMap; // (partition => weight) + $failed = 0; while ( count( $partitionsTry ) ) { $partition = ArrayUtils::pickRandom( $partitionsTry ); if ( $partition === false ) { @@ -327,8 +329,9 @@ class JobQueueFederated extends JobQueue { try { $job = $queue->pop(); } catch ( JobQueueError $e ) { - $job = false; + ++$failed; MWExceptionHandler::logException( $e ); + $job = false; } if ( $job ) { $job->metadata['QueuePartition'] = $partition; @@ -338,6 +341,7 @@ class JobQueueFederated extends JobQueue { unset( $partitionsTry[$partition] ); // blacklist partition } } + $this->throwErrorIfAllPartitionsDown( $failed ); $this->cache->set( $key, 'true', JobQueueDB::CACHE_TTL_LONG ); @@ -381,25 +385,32 @@ class JobQueueFederated extends JobQueue { } protected function doDelete() { + $failed = 0; /** @var JobQueue $queue */ foreach ( $this->partitionQueues as $queue ) { try { $queue->doDelete(); } catch ( JobQueueError $e ) { + ++$failed; MWExceptionHandler::logException( $e ); } } + $this->throwErrorIfAllPartitionsDown( $failed ); + return true; } protected function doWaitForBackups() { + $failed = 0; /** @var JobQueue $queue */ foreach ( $this->partitionQueues as $queue ) { try { $queue->waitForBackups(); } catch ( JobQueueError $e ) { + ++$failed; MWExceptionHandler::logException( $e ); } } + $this->throwErrorIfAllPartitionsDown( $failed ); } protected function doGetPeriodicTasks() { @@ -463,6 +474,7 @@ class JobQueueFederated extends JobQueue { protected function doGetSiblingQueuesWithJobs( array $types ) { $result = array(); + $failed = 0; /** @var JobQueue $queue */ foreach ( $this->partitionQueues as $queue ) { try { @@ -476,16 +488,18 @@ class JobQueueFederated extends JobQueue { break; // short-circuit } } catch ( JobQueueError $e ) { + ++$failed; MWExceptionHandler::logException( $e ); } } + $this->throwErrorIfAllPartitionsDown( $failed ); return array_values( $result ); } protected function doGetSiblingQueueSizes( array $types ) { $result = array(); - + $failed = 0; /** @var JobQueue $queue */ foreach ( $this->partitionQueues as $queue ) { try { @@ -498,13 +512,28 @@ class JobQueueFederated extends JobQueue { return null; // not supported on all partitions; bail } } catch ( JobQueueError $e ) { + ++$failed; MWExceptionHandler::logException( $e ); } } + $this->throwErrorIfAllPartitionsDown( $failed ); return $result; } + /** + * Throw an error if no partitions available + * + * @param int $down The number of up partitions down + * @return void + * @throws JobQueueError + */ + protected function throwErrorIfAllPartitionsDown( $down ) { + if ( $down >= count( $this->partitionQueues ) ) { + throw new JobQueueError( 'No queue partitions available.' ); + } + } + public function setTestingPrefix( $key ) { /** @var JobQueue $queue */ foreach ( $this->partitionQueues as $queue ) {