Merge "Add method parameter type documentation"
[lhc/web/wiklou.git] / includes / job / JobQueueFederated.php
index 19de8bb..d788c98 100644 (file)
@@ -138,9 +138,13 @@ class JobQueueFederated extends JobQueue {
                }
 
                foreach ( $this->partitionQueues as $queue ) {
-                       if ( !$queue->doIsEmpty() ) {
-                               $this->cache->add( $key, 'false', self::CACHE_TTL_LONG );
-                               return false;
+                       try {
+                               if ( !$queue->doIsEmpty() ) {
+                                       $this->cache->add( $key, 'false', self::CACHE_TTL_LONG );
+                                       return false;
+                               }
+                       } catch ( JobQueueError $e ) {
+                               wfDebugLog( 'exception', $e->getLogMessage() );
                        }
                }
 
@@ -179,7 +183,11 @@ class JobQueueFederated extends JobQueue {
 
                $count = 0;
                foreach ( $this->partitionQueues as $queue ) {
-                       $count += $queue->$method();
+                       try {
+                               $count += $queue->$method();
+                       } catch ( JobQueueError $e ) {
+                               wfDebugLog( 'exception', $e->getLogMessage() );
+                       }
                }
 
                $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
@@ -244,7 +252,13 @@ class JobQueueFederated extends JobQueue {
                // Insert the de-duplicated jobs into the queues...
                foreach ( $uJobsByPartition as $partition => $jobBatch ) {
                        $queue = $this->partitionQueues[$partition];
-                       if ( $queue->doBatchPush( $jobBatch, $flags ) ) {
+                       try {
+                               $ok = $queue->doBatchPush( $jobBatch, $flags );
+                       } catch ( JobQueueError $e ) {
+                               $ok = false;
+                               wfDebugLog( 'exception', $e->getLogMessage() );
+                       }
+                       if ( $ok ) {
                                $key = $this->getCacheKey( 'empty' );
                                $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
                        } else {
@@ -259,7 +273,13 @@ class JobQueueFederated extends JobQueue {
                                $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
                        } else {
                                $queue = $this->partitionQueues[$partition];
-                               if ( $queue->doBatchPush( $jobBatch, $flags ) ) {
+                               try {
+                                       $ok = $queue->doBatchPush( $jobBatch, $flags );
+                               } catch ( JobQueueError $e ) {
+                                       $ok = false;
+                                       wfDebugLog( 'exception', $e->getLogMessage() );
+                               }
+                               if ( $ok ) {
                                        $key = $this->getCacheKey( 'empty' );
                                        $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
                                } else {
@@ -288,7 +308,12 @@ class JobQueueFederated extends JobQueue {
                                break; // all partitions at 0 weight
                        }
                        $queue = $this->partitionQueues[$partition];
-                       $job = $queue->pop();
+                       try {
+                               $job = $queue->pop();
+                       } catch ( JobQueueError $e ) {
+                               $job = false;
+                               wfDebugLog( 'exception', $e->getLogMessage() );
+                       }
                        if ( $job ) {
                                $job->metadata['QueuePartition'] = $partition;
                                return $job;
@@ -336,13 +361,21 @@ class JobQueueFederated extends JobQueue {
 
        protected function doDelete() {
                foreach ( $this->partitionQueues as $queue ) {
-                       $queue->doDelete();
+                       try {
+                               $queue->doDelete();
+                       } catch ( JobQueueError $e ) {
+                               wfDebugLog( 'exception', $e->getLogMessage() );
+                       }
                }
        }
 
        protected function doWaitForBackups() {
                foreach ( $this->partitionQueues as $queue ) {
-                       $queue->waitForBackups();
+                       try {
+                               $queue->waitForBackups();
+                       } catch ( JobQueueError $e ) {
+                               wfDebugLog( 'exception', $e->getLogMessage() );
+                       }
                }
        }
 
@@ -388,6 +421,38 @@ class JobQueueFederated extends JobQueue {
                return $iterator;
        }
 
+       public function getCoalesceLocationInternal() {
+               return "JobQueueFederated:wiki:" . $this->wiki;
+       }
+
+       protected function doGetSiblingQueuesWithJobs( array $types ) {
+               $result = array();
+               foreach ( $this->partitionQueues as $queue ) {
+                       $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
+                       if ( is_array( $nonEmpty ) ) {
+                               $result = array_merge( $result, $nonEmpty );
+                       } else {
+                               return null; // not supported on all partitions; bail
+                       }
+               }
+               return array_values( array_unique( $result ) );
+       }
+
+       protected function doGetSiblingQueueSizes( array $types ) {
+               $result = array();
+               foreach ( $this->partitionQueues as $queue ) {
+                       $sizes = $queue->doGetSiblingQueueSizes( $types );
+                       if ( is_array( $sizes ) ) {
+                               foreach ( $sizes as $type => $size ) {
+                                       $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size;
+                               }
+                       } else {
+                               return null; // not supported on all partitions; bail
+                       }
+               }
+               return $result;
+       }
+
        public function setTestingPrefix( $key ) {
                foreach ( $this->partitionQueues as $queue ) {
                        $queue->setTestingPrefix( $key );