*
* If used for performance, then $wgMainCacheType should be set to memcached/redis.
* Note that "fifo" cannot be used for the ordering, since the data is distributed.
- * One can still use "timestamp" instead, as in "roughly timestamp ordered".
+ * One can still use "timestamp" instead, as in "roughly timestamp ordered". Also,
+ * queue classes used by this should ignore down servers (with TTL) to avoid slowness.
*
* @ingroup JobQueue
* @since 1.22
*/
class JobQueueFederated extends JobQueue {
- /** @var Array (wiki ID => section name) */
- protected $sectionsByWiki = array();
- /** @var Array (section name => (partition name => weight)) */
- protected $partitionsBySection = array();
- /** @var Array (section name => config array) */
- protected $configByPartition = array();
- /** @var Array (partition names => integer) */
- protected $partitionsNoPush = array();
-
- /** @var HashRing */
- protected $partitionRing;
+ /** @var Array (partition name => weight) */
+ protected $partitionMap = array();
/** @var Array (partition name => JobQueue) */
protected $partitionQueues = array();
+ /** @var HashRing */
+ protected $partitionPushRing;
/** @var BagOStuff */
protected $cache;
*/
protected function __construct( array $params ) {
parent::__construct( $params );
- $this->sectionsByWiki = isset( $params['sectionsByWiki'] )
- ? $params['sectionsByWiki']
- : array(); // all in "default" section
- $this->partitionsBySection = $params['partitionsBySection'];
- $this->configByPartition = $params['configByPartition'];
+ $section = isset( $params['sectionsByWiki'][$this->wiki] )
+ ? $params['sectionsByWiki'][$this->wiki]
+ : 'default';
+ if ( !isset( $params['partitionsBySection'][$section] ) ) {
+ throw new MWException( "No configuration for section '$section'." );
+ }
+ $this->partitionMap = $params['partitionsBySection'][$section];
+ $partitionPushMap = $this->partitionMap;
if ( isset( $params['partitionsNoPush'] ) ) {
- $this->partitionsNoPush = array_flip( $params['partitionsNoPush'] );
+ foreach ( $params['partitionsNoPush'] as $partition ) {
+ unset( $partitionPushMap[$partition] );
+ }
}
+ // Get the config to pass to merge into each partition queue config
$baseConfig = $params;
foreach ( array( 'class', 'sectionsByWiki',
'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o )
{
unset( $baseConfig[$o] );
}
- foreach ( $this->getPartitionMap() as $partition => $w ) {
- if ( !isset( $this->configByPartition[$partition] ) ) {
+ // Get the partition queue objects
+ foreach ( $this->partitionMap as $partition => $w ) {
+ if ( !isset( $params['configByPartition'][$partition] ) ) {
throw new MWException( "No configuration for partition '$partition'." );
}
$this->partitionQueues[$partition] = JobQueue::factory(
- $baseConfig + $this->configByPartition[$partition]
- );
+ $baseConfig + $params['configByPartition'][$partition] );
}
// Get the ring of partitions to push job de-duplication information into
- $partitionsTry = array_diff_key(
- $this->getPartitionMap(),
- $this->partitionsNoPush
- ); // (partition => weight)
- $this->partitionRing = new HashRing( $partitionsTry );
+ $this->partitionPushRing = new HashRing( $partitionPushMap );
// Aggregate cache some per-queue values if there are multiple partition queues
- $this->cache = $this->isFederated() ? wfGetMainCache() : new EmptyBagOStuff();
+ $this->cache = count( $this->partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
}
protected function supportedOrders() {
if ( !count( $jobs ) ) {
return true; // nothing to do
}
-
- $partitionsTry = array_diff_key(
- $this->getPartitionMap(),
- $this->partitionsNoPush
- ); // (partition => weight)
-
+ // Local ring variable that may be changed to point to a new ring on failure
+ $partitionRing = $this->partitionPushRing;
// Try to insert the jobs and update $partitionsTry on any failures
- $jobsLeft = $this->tryJobInsertions( $jobs, $partitionsTry, $flags );
+ $jobsLeft = $this->tryJobInsertions( $jobs, $partitionRing, $flags );
if ( count( $jobsLeft ) ) { // some jobs failed to insert?
// Try to insert the remaning jobs once more, ignoring the bad partitions
- return !count( $this->tryJobInsertions( $jobsLeft, $partitionsTry, $flags ) );
- } else {
- return true;
+ return !count( $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags ) );
}
+ return true;
}
/**
* @param array $jobs
- * @param array $partitionsTry
+ * @param HashRing $partitionRing
* @param integer $flags
* @return array List of Job object that could not be inserted
*/
- protected function tryJobInsertions( array $jobs, array &$partitionsTry, $flags ) {
- if ( !count( $partitionsTry ) ) {
- return $jobs; // can't insert anything
- }
-
+ protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
$jobsLeft = array();
- $partitionRing = new HashRing( $partitionsTry );
// Because jobs are spread across partitions, per-job de-duplication needs
// to use a consistent hash to avoid allowing duplicate jobs per partition.
// When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded.
$key = $this->getCacheKey( 'empty' );
$this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
} else {
- unset( $partitionsTry[$partition] ); // blacklist partition
+ $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist
+ if ( !$partitionRing ) {
+ throw new JobQueueError( "Could not insert job(s), all partitions are down." );
+ }
$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
}
}
+
// Insert the jobs that are not de-duplicated into the queues...
foreach ( $nuJobBatches as $jobBatch ) {
- $partition = ArrayUtils::pickRandom( $partitionsTry );
- if ( $partition === false ) { // all partitions at 0 weight?
- $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
+ $partition = ArrayUtils::pickRandom( $partitionRing->getLocationWeights() );
+ $queue = $this->partitionQueues[$partition];
+ try {
+ $ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
+ } catch ( JobQueueError $e ) {
+ $ok = false;
+ wfDebugLog( 'exception', $e->getLogMessage() );
+ }
+ if ( $ok ) {
+ $key = $this->getCacheKey( 'empty' );
+ $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
} else {
- $queue = $this->partitionQueues[$partition];
- try {
- $ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
- } catch ( JobQueueError $e ) {
- $ok = false;
- wfDebugLog( 'exception', $e->getLogMessage() );
- }
- if ( $ok ) {
- $key = $this->getCacheKey( 'empty' );
- $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
- } else {
- unset( $partitionsTry[$partition] ); // blacklist partition
- $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
+ $partitionRing = $partitionRing->newWithoutLocation( $partition ); // blacklist
+ if ( !$partitionRing ) {
+ throw new JobQueueError( "Could not insert job(s), all partitions are down." );
}
+ $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
}
}
return false;
}
- $partitionsTry = $this->getPartitionMap(); // (partition => weight)
+ $partitionsTry = $this->partitionMap; // (partition => weight)
while ( count( $partitionsTry ) ) {
$partition = ArrayUtils::pickRandom( $partitionsTry );
protected function doIsRootJobOldDuplicate( Job $job ) {
$params = $job->getRootJobParams();
- $partitions = $this->partitionRing->getLocations( $params['rootJobSignature'], 2 );
+ $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
try {
return $this->partitionQueues[$partitions[0]]->doIsRootJobOldDuplicate( $job );
} catch ( MWException $e ) {
protected function doDeduplicateRootJob( Job $job ) {
$params = $job->getRootJobParams();
- $partitions = $this->partitionRing->getLocations( $params['rootJobSignature'], 2 );
+ $partitions = $this->partitionPushRing->getLocations( $params['rootJobSignature'], 2 );
try {
return $this->partitionQueues[$partitions[0]]->doDeduplicateRootJob( $job );
} catch ( MWException $e ) {
}
public function getCoalesceLocationInternal() {
- return "JobQueueFederated:wiki:" . $this->wiki;
+ return "JobQueueFederated:wiki:{$this->wiki}" .
+ sha1( serialize( array_keys( $this->partitionMap ) ) );
}
protected function doGetSiblingQueuesWithJobs( array $types ) {
$result = array();
foreach ( $this->partitionQueues as $queue ) {
- $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
- if ( is_array( $nonEmpty ) ) {
- $result = array_merge( $result, $nonEmpty );
- } else {
- return null; // not supported on all partitions; bail
+ try {
+ $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
+ if ( is_array( $nonEmpty ) ) {
+ $result = array_merge( $result, $nonEmpty );
+ } else {
+ return null; // not supported on all partitions; bail
+ }
+ } catch ( JobQueueError $e ) {
+ wfDebugLog( 'exception', $e->getLogMessage() );
}
}
return array_values( array_unique( $result ) );
protected function doGetSiblingQueueSizes( array $types ) {
$result = array();
foreach ( $this->partitionQueues as $queue ) {
- $sizes = $queue->doGetSiblingQueueSizes( $types );
- if ( is_array( $sizes ) ) {
- foreach ( $sizes as $type => $size ) {
- $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size;
+ try {
+ $sizes = $queue->doGetSiblingQueueSizes( $types );
+ if ( is_array( $sizes ) ) {
+ foreach ( $sizes as $type => $size ) {
+ $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size;
+ }
+ } else {
+ return null; // not supported on all partitions; bail
}
- } else {
- return null; // not supported on all partitions; bail
+ } catch ( JobQueueError $e ) {
+ wfDebugLog( 'exception', $e->getLogMessage() );
}
}
return $result;
}
}
- /**
- * @return Array Map of (partition name => weight)
- */
- protected function getPartitionMap() {
- $section = isset( $this->sectionsByWiki[$this->wiki] )
- ? $this->sectionsByWiki[$this->wiki]
- : 'default';
- if ( !isset( $this->partitionsBySection[$section] ) ) {
- throw new MWException( "No configuration for section '$section'." );
- }
- return $this->partitionsBySection[$section];
- }
-
- /**
- * @return bool The queue is actually split up across multiple queue partitions
- */
- protected function isFederated() {
- return ( count( $this->getPartitionMap() ) > 1 );
- }
-
/**
* @return string
*/