return false;
}
+ $empty = true;
+ $failed = 0;
foreach ( $this->partitionQueues as $queue ) {
try {
- if ( !$queue->doIsEmpty() ) {
- $this->cache->add( $key, 'false', self::CACHE_TTL_LONG );
-
- return false;
- }
+ $empty = $empty && $queue->doIsEmpty();
} catch ( JobQueueError $e ) {
+ ++$failed;
MWExceptionHandler::logException( $e );
}
}
+ $this->throwErrorIfAllPartitionsDown( $failed );
- $this->cache->add( $key, 'true', self::CACHE_TTL_LONG );
-
- return true;
+ $this->cache->add( $key, $empty ? 'true' : 'false', self::CACHE_TTL_LONG );
+ return !$empty;
}
protected function doGetSize() {
return $count;
}
- $count = 0;
+ $failed = 0;
foreach ( $this->partitionQueues as $queue ) {
try {
$count += $queue->$method();
} catch ( JobQueueError $e ) {
+ ++$failed;
MWExceptionHandler::logException( $e );
}
}
+ $this->throwErrorIfAllPartitionsDown( $failed );
$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
} else {
$partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist
if ( !$partitionRing ) {
- throw new JobQueueError( "Could not insert job(s), all partitions are down." );
+ throw new JobQueueError( "Could not insert job(s), no partitions available." );
}
$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
}
} else {
$partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist
if ( !$partitionRing ) {
- throw new JobQueueError( "Could not insert job(s), all partitions are down." );
+ throw new JobQueueError( "Could not insert job(s), no partitions available." );
}
$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
}
$partitionsTry = $this->partitionMap; // (partition => weight)
+ $failed = 0;
while ( count( $partitionsTry ) ) {
$partition = ArrayUtils::pickRandom( $partitionsTry );
if ( $partition === false ) {
try {
$job = $queue->pop();
} catch ( JobQueueError $e ) {
- $job = false;
+ ++$failed;
MWExceptionHandler::logException( $e );
+ $job = false;
}
if ( $job ) {
$job->metadata['QueuePartition'] = $partition;
unset( $partitionsTry[$partition] ); // blacklist partition
}
}
+ $this->throwErrorIfAllPartitionsDown( $failed );
$this->cache->set( $key, 'true', JobQueueDB::CACHE_TTL_LONG );
}
protected function doDelete() {
+ $failed = 0;
/** @var JobQueue $queue */
foreach ( $this->partitionQueues as $queue ) {
try {
$queue->doDelete();
} catch ( JobQueueError $e ) {
+ ++$failed;
MWExceptionHandler::logException( $e );
}
}
+ $this->throwErrorIfAllPartitionsDown( $failed );
+ return true;
}
protected function doWaitForBackups() {
+ $failed = 0;
/** @var JobQueue $queue */
foreach ( $this->partitionQueues as $queue ) {
try {
$queue->waitForBackups();
} catch ( JobQueueError $e ) {
+ ++$failed;
MWExceptionHandler::logException( $e );
}
}
+ $this->throwErrorIfAllPartitionsDown( $failed );
}
protected function doGetPeriodicTasks() {
protected function doGetSiblingQueuesWithJobs( array $types ) {
$result = array();
+ $failed = 0;
/** @var JobQueue $queue */
foreach ( $this->partitionQueues as $queue ) {
try {
break; // short-circuit
}
} catch ( JobQueueError $e ) {
+ ++$failed;
MWExceptionHandler::logException( $e );
}
}
+ $this->throwErrorIfAllPartitionsDown( $failed );
return array_values( $result );
}
protected function doGetSiblingQueueSizes( array $types ) {
$result = array();
-
+ $failed = 0;
/** @var JobQueue $queue */
foreach ( $this->partitionQueues as $queue ) {
try {
return null; // not supported on all partitions; bail
}
} catch ( JobQueueError $e ) {
+ ++$failed;
MWExceptionHandler::logException( $e );
}
}
+ $this->throwErrorIfAllPartitionsDown( $failed );
return $result;
}
+ /**
+ * Throw an error if no partitions available
+ *
+ * @param int $down The number of up partitions down
+ * @return void
+ * @throws JobQueueError
+ */
+ protected function throwErrorIfAllPartitionsDown( $down ) {
+ if ( $down >= count( $this->partitionQueues ) ) {
+ throw new JobQueueError( 'No queue partitions available.' );
+ }
+ }
+
public function setTestingPrefix( $key ) {
/** @var JobQueue $queue */
foreach ( $this->partitionQueues as $queue ) {