}
foreach ( $this->partitionQueues as $queue ) {
- if ( !$queue->doIsEmpty() ) {
- $this->cache->add( $key, 'false', self::CACHE_TTL_LONG );
- return false;
+ try {
+ if ( !$queue->doIsEmpty() ) {
+ $this->cache->add( $key, 'false', self::CACHE_TTL_LONG );
+ return false;
+ }
+ } catch ( JobQueueError $e ) {
+ wfDebugLog( 'exception', $e->getLogMessage() );
}
}
$count = 0;
foreach ( $this->partitionQueues as $queue ) {
- $count += $queue->$method();
+ try {
+ $count += $queue->$method();
+ } catch ( JobQueueError $e ) {
+ wfDebugLog( 'exception', $e->getLogMessage() );
+ }
}
$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
// Insert the de-duplicated jobs into the queues...
foreach ( $uJobsByPartition as $partition => $jobBatch ) {
$queue = $this->partitionQueues[$partition];
- if ( $queue->doBatchPush( $jobBatch, $flags ) ) {
+ try {
+ $ok = $queue->doBatchPush( $jobBatch, $flags );
+ } catch ( JobQueueError $e ) {
+ $ok = false;
+ wfDebugLog( 'exception', $e->getLogMessage() );
+ }
+ if ( $ok ) {
$key = $this->getCacheKey( 'empty' );
$this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
} else {
$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
} else {
$queue = $this->partitionQueues[$partition];
- if ( $queue->doBatchPush( $jobBatch, $flags ) ) {
+ try {
+ $ok = $queue->doBatchPush( $jobBatch, $flags );
+ } catch ( JobQueueError $e ) {
+ $ok = false;
+ wfDebugLog( 'exception', $e->getLogMessage() );
+ }
+ if ( $ok ) {
$key = $this->getCacheKey( 'empty' );
$this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
} else {
break; // all partitions at 0 weight
}
$queue = $this->partitionQueues[$partition];
- $job = $queue->pop();
+ try {
+ $job = $queue->pop();
+ } catch ( JobQueueError $e ) {
+ $job = false;
+ wfDebugLog( 'exception', $e->getLogMessage() );
+ }
if ( $job ) {
$job->metadata['QueuePartition'] = $partition;
return $job;
protected function doDelete() {
foreach ( $this->partitionQueues as $queue ) {
- $queue->doDelete();
+ try {
+ $queue->doDelete();
+ } catch ( JobQueueError $e ) {
+ wfDebugLog( 'exception', $e->getLogMessage() );
+ }
}
}
protected function doWaitForBackups() {
foreach ( $this->partitionQueues as $queue ) {
- $queue->waitForBackups();
+ try {
+ $queue->waitForBackups();
+ } catch ( JobQueueError $e ) {
+ wfDebugLog( 'exception', $e->getLogMessage() );
+ }
}
}
return $iterator;
}
+ public function getCoalesceLocationInternal() {
+ return "JobQueueFederated:wiki:" . $this->wiki;
+ }
+
+ protected function doGetSiblingQueuesWithJobs( array $types ) {
+ $result = array();
+ foreach ( $this->partitionQueues as $queue ) {
+ $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
+ if ( is_array( $nonEmpty ) ) {
+ $result = array_merge( $result, $nonEmpty );
+ } else {
+ return null; // not supported on all partitions; bail
+ }
+ }
+ return array_values( array_unique( $result ) );
+ }
+
+ protected function doGetSiblingQueueSizes( array $types ) {
+ $result = array();
+ foreach ( $this->partitionQueues as $queue ) {
+ $sizes = $queue->doGetSiblingQueueSizes( $types );
+ if ( is_array( $sizes ) ) {
+ foreach ( $sizes as $type => $size ) {
+ $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size;
+ }
+ } else {
+ return null; // not supported on all partitions; bail
+ }
+ }
+ return $result;
+ }
+
public function setTestingPrefix( $key ) {
foreach ( $this->partitionQueues as $queue ) {
$queue->setTestingPrefix( $key );