* @since 1.22
*/
class JobQueueFederated extends JobQueue {
- /** @var Array (partition name => weight) */
+ /** @var Array (partition name => weight) reverse sorted by weight */
protected $partitionMap = array();
- /** @var Array (partition name => JobQueue) */
+ /** @var Array (partition name => JobQueue) reverse sorted by weight */
protected $partitionQueues = array();
/** @var HashRing */
protected $partitionPushRing;
/** @var BagOStuff */
protected $cache;
+ protected $maxPartitionsTry; // integer; maximum number of partitions to try
+
const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
* the federated queue itself (e.g. 'order' and 'claimTTL').
* - partitionsNoPush : List of partition names that can handle pop() but not push().
* This can be used to migrate away from a certain partition.
+ * - maxPartitionsTry : Maximum number of times to attempt job insertion using
+ * different partition queues. This improves availability
+ * during failure, at the cost of added latency and somewhat
+ * less reliable job de-duplication mechanisms.
* @param array $params
*/
protected function __construct( array $params ) {
if ( !isset( $params['partitionsBySection'][$section] ) ) {
throw new MWException( "No configuration for section '$section'." );
}
+ $this->maxPartitionsTry = isset( $params['maxPartitionsTry'] )
+ ? $params['maxPartitionsTry']
+ : 2;
+ // Get the full partition map
$this->partitionMap = $params['partitionsBySection'][$section];
+ arsort( $this->partitionMap, SORT_NUMERIC );
+ // Get the partitions jobs can actually be pushed to
$partitionPushMap = $this->partitionMap;
if ( isset( $params['partitionsNoPush'] ) ) {
foreach ( $params['partitionsNoPush'] as $partition ) {
}
// Get the config to pass to merge into each partition queue config
$baseConfig = $params;
- foreach ( array( 'class', 'sectionsByWiki',
+ foreach ( array( 'class', 'sectionsByWiki', 'maxPartitionsTry',
'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o )
{
- unset( $baseConfig[$o] );
+ unset( $baseConfig[$o] ); // partition queue doesn't care about this
}
// Get the partition queue objects
foreach ( $this->partitionMap as $partition => $w ) {
$this->partitionQueues[$partition] = JobQueue::factory(
$baseConfig + $params['configByPartition'][$partition] );
}
- // Get the ring of partitions to push job de-duplication information into
+ // Get the ring of partitions to push jobs into
$this->partitionPushRing = new HashRing( $partitionPushMap );
// Aggregate cache some per-queue values if there are multiple partition queues
$this->cache = count( $this->partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
return false;
}
} catch ( JobQueueError $e ) {
- wfDebugLog( 'exception', $e->getLogMessage() );
+ MWExceptionHandler::logException( $e );
}
}
try {
$count += $queue->$method();
} catch ( JobQueueError $e ) {
- wfDebugLog( 'exception', $e->getLogMessage() );
+ MWExceptionHandler::logException( $e );
}
}
}
protected function doBatchPush( array $jobs, $flags ) {
- if ( !count( $jobs ) ) {
- return true; // nothing to do
- }
// Local ring variable that may be changed to point to a new ring on failure
$partitionRing = $this->partitionPushRing;
- // Try to insert the jobs and update $partitionsTry on any failures
- $jobsLeft = $this->tryJobInsertions( $jobs, $partitionRing, $flags );
- if ( count( $jobsLeft ) ) { // some jobs failed to insert?
- // Try to insert the remaning jobs once more, ignoring the bad partitions
- return !count( $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags ) );
+ // Try to insert the jobs and update $partitionsTry on any failures.
+ // Retry to insert any remaning jobs again, ignoring the bad partitions.
+ $jobsLeft = $jobs;
+ for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
+ $jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags );
+ }
+ if ( count( $jobsLeft ) ) {
+ throw new JobQueueError(
+ "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
}
return true;
}
$ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
} catch ( JobQueueError $e ) {
$ok = false;
- wfDebugLog( 'exception', $e->getLogMessage() );
+ MWExceptionHandler::logException( $e );
}
if ( $ok ) {
$key = $this->getCacheKey( 'empty' );
$ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
} catch ( JobQueueError $e ) {
$ok = false;
- wfDebugLog( 'exception', $e->getLogMessage() );
+ MWExceptionHandler::logException( $e );
}
if ( $ok ) {
$key = $this->getCacheKey( 'empty' );
$job = $queue->pop();
} catch ( JobQueueError $e ) {
$job = false;
- wfDebugLog( 'exception', $e->getLogMessage() );
+ MWExceptionHandler::logException( $e );
}
if ( $job ) {
$job->metadata['QueuePartition'] = $partition;
$partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
try {
return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job );
- } catch ( MWException $e ) {
+ } catch ( JobQueueError $e ) {
if ( isset( $partitions[1] ) ) { // check fallback partition
return $this->partitionQueues[$partitions[1]]->doIsRootJobOldDuplicate( $job );
}
$partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
try {
return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job );
- } catch ( MWException $e ) {
+ } catch ( JobQueueError $e ) {
if ( isset( $partitions[1] ) ) { // check fallback partition
return $this->partitionQueues[$partitions[1]]->doDeduplicateRootJob( $job );
}
try {
$queue->doDelete();
} catch ( JobQueueError $e ) {
- wfDebugLog( 'exception', $e->getLogMessage() );
+ MWExceptionHandler::logException( $e );
}
}
}
try {
$queue->waitForBackups();
} catch ( JobQueueError $e ) {
- wfDebugLog( 'exception', $e->getLogMessage() );
+ MWExceptionHandler::logException( $e );
}
}
}
try {
$nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
if ( is_array( $nonEmpty ) ) {
- $result = array_merge( $result, $nonEmpty );
+ $result = array_unique( array_merge( $result, $nonEmpty ) );
} else {
return null; // not supported on all partitions; bail
}
+ if ( count( $result ) == count( $types ) ) {
+ break; // short-circuit
+ }
} catch ( JobQueueError $e ) {
- wfDebugLog( 'exception', $e->getLogMessage() );
+ MWExceptionHandler::logException( $e );
}
}
- return array_values( array_unique( $result ) );
+ return array_values( $result );
}
protected function doGetSiblingQueueSizes( array $types ) {
return null; // not supported on all partitions; bail
}
} catch ( JobQueueError $e ) {
- wfDebugLog( 'exception', $e->getLogMessage() );
+ MWExceptionHandler::logException( $e );
}
}
return $result;