Error handling tweaks in JobQueueFederated
authorAaron Schulz <aschulz@wikimedia.org>
Fri, 8 Nov 2013 21:51:37 +0000 (13:51 -0800)
committerTim Starling <tstarling@wikimedia.org>
Tue, 17 Dec 2013 23:34:17 +0000 (23:34 +0000)
* Throw errors in more cases when all partitions cannot be reached

Change-Id: I86ba1076430be8e4be0606294c370fadb0c43d63

includes/job/JobQueueFederated.php

index 589bed6..f4caac6 100644 (file)
@@ -150,21 +150,20 @@ class JobQueueFederated extends JobQueue {
                        return false;
                }
 
+               $empty = true;
+               $failed = 0;
                foreach ( $this->partitionQueues as $queue ) {
                        try {
-                               if ( !$queue->doIsEmpty() ) {
-                                       $this->cache->add( $key, 'false', self::CACHE_TTL_LONG );
-
-                                       return false;
-                               }
+                               $empty = $empty && $queue->doIsEmpty();
                        } catch ( JobQueueError $e ) {
+                               ++$failed;
                                MWExceptionHandler::logException( $e );
                        }
                }
+               $this->throwErrorIfAllPartitionsDown( $failed );
 
-               $this->cache->add( $key, 'true', self::CACHE_TTL_LONG );
-
-               return true;
+               $this->cache->add( $key, $empty ? 'true' : 'false', self::CACHE_TTL_LONG );
+               return !$empty;
        }
 
        protected function doGetSize() {
@@ -196,14 +195,16 @@ class JobQueueFederated extends JobQueue {
                        return $count;
                }
 
-               $count = 0;
+               $failed = 0;
                foreach ( $this->partitionQueues as $queue ) {
                        try {
                                $count += $queue->$method();
                        } catch ( JobQueueError $e ) {
+                               ++$failed;
                                MWExceptionHandler::logException( $e );
                        }
                }
+               $this->throwErrorIfAllPartitionsDown( $failed );
 
                $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
 
@@ -275,7 +276,7 @@ class JobQueueFederated extends JobQueue {
                        } else {
                                $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist
                                if ( !$partitionRing ) {
-                                       throw new JobQueueError( "Could not insert job(s), all partitions are down." );
+                                       throw new JobQueueError( "Could not insert job(s), no partitions available." );
                                }
                                $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
                        }
@@ -297,7 +298,7 @@ class JobQueueFederated extends JobQueue {
                        } else {
                                $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist
                                if ( !$partitionRing ) {
-                                       throw new JobQueueError( "Could not insert job(s), all partitions are down." );
+                                       throw new JobQueueError( "Could not insert job(s), no partitions available." );
                                }
                                $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
                        }
@@ -316,6 +317,7 @@ class JobQueueFederated extends JobQueue {
 
                $partitionsTry = $this->partitionMap; // (partition => weight)
 
+               $failed = 0;
                while ( count( $partitionsTry ) ) {
                        $partition = ArrayUtils::pickRandom( $partitionsTry );
                        if ( $partition === false ) {
@@ -327,8 +329,9 @@ class JobQueueFederated extends JobQueue {
                        try {
                                $job = $queue->pop();
                        } catch ( JobQueueError $e ) {
-                               $job = false;
+                               ++$failed;
                                MWExceptionHandler::logException( $e );
+                               $job = false;
                        }
                        if ( $job ) {
                                $job->metadata['QueuePartition'] = $partition;
@@ -338,6 +341,7 @@ class JobQueueFederated extends JobQueue {
                                unset( $partitionsTry[$partition] ); // blacklist partition
                        }
                }
+               $this->throwErrorIfAllPartitionsDown( $failed );
 
                $this->cache->set( $key, 'true', JobQueueDB::CACHE_TTL_LONG );
 
@@ -381,25 +385,32 @@ class JobQueueFederated extends JobQueue {
        }
 
        protected function doDelete() {
+               $failed = 0;
                /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
                        try {
                                $queue->doDelete();
                        } catch ( JobQueueError $e ) {
+                               ++$failed;
                                MWExceptionHandler::logException( $e );
                        }
                }
+               $this->throwErrorIfAllPartitionsDown( $failed );
+               return true;
        }
 
        protected function doWaitForBackups() {
+               $failed = 0;
                /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
                        try {
                                $queue->waitForBackups();
                        } catch ( JobQueueError $e ) {
+                               ++$failed;
                                MWExceptionHandler::logException( $e );
                        }
                }
+               $this->throwErrorIfAllPartitionsDown( $failed );
        }
 
        protected function doGetPeriodicTasks() {
@@ -463,6 +474,7 @@ class JobQueueFederated extends JobQueue {
        protected function doGetSiblingQueuesWithJobs( array $types ) {
                $result = array();
 
+               $failed = 0;
                /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
                        try {
@@ -476,16 +488,18 @@ class JobQueueFederated extends JobQueue {
                                        break; // short-circuit
                                }
                        } catch ( JobQueueError $e ) {
+                               ++$failed;
                                MWExceptionHandler::logException( $e );
                        }
                }
+               $this->throwErrorIfAllPartitionsDown( $failed );
 
                return array_values( $result );
        }
 
        protected function doGetSiblingQueueSizes( array $types ) {
                $result = array();
-
+               $failed = 0;
                /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
                        try {
@@ -498,13 +512,28 @@ class JobQueueFederated extends JobQueue {
                                        return null; // not supported on all partitions; bail
                                }
                        } catch ( JobQueueError $e ) {
+                               ++$failed;
                                MWExceptionHandler::logException( $e );
                        }
                }
+               $this->throwErrorIfAllPartitionsDown( $failed );
 
                return $result;
        }
 
+       /**
+        * Throw an error if no partitions available
+        *
+        * @param int $down The number of up partitions down
+        * @return void
+        * @throws JobQueueError
+        */
+       protected function throwErrorIfAllPartitionsDown( $down ) {
+               if ( $down >= count( $this->partitionQueues ) ) {
+                       throw new JobQueueError( 'No queue partitions available.' );
+               }
+       }
+
        public function setTestingPrefix( $key ) {
                /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {