/** @var array (partition name => JobQueue) reverse sorted by weight */
protected $partitionQueues = array();
- /** @var BagOStuff */
- protected $cache;
-
/** @var int Maximum number of partitions to try */
protected $maxPartitionsTry;
- const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
- const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
-
/**
* @param array $params Possible keys:
* - sectionsByWiki : A map of wiki IDs to section names.
} else {
$this->partitionPushRing = new HashRing( $partitionPushMap );
}
- // Aggregate cache some per-queue values if there are multiple partition queues
- $this->cache = count( $partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
}
protected function supportedOrders() {
}
protected function doIsEmpty() {
- $key = $this->getCacheKey( 'empty' );
-
- $isEmpty = $this->cache->get( $key );
- if ( $isEmpty === 'true' ) {
- return true;
- } elseif ( $isEmpty === 'false' ) {
- return false;
- }
-
$empty = true;
$failed = 0;
foreach ( $this->partitionQueues as $queue ) {
}
$this->throwErrorIfAllPartitionsDown( $failed );
- $this->cache->add( $key, $empty ? 'true' : 'false', self::CACHE_TTL_LONG );
return $empty;
}
* @return int
*/
protected function getCrossPartitionSum( $type, $method ) {
- $key = $this->getCacheKey( $type );
-
- $count = $this->cache->get( $key );
- if ( $count !== false ) {
- return $count;
- }
-
+ $count = 0;
$failed = 0;
foreach ( $this->partitionQueues as $queue ) {
try {
}
$this->throwErrorIfAllPartitionsDown( $failed );
- $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
-
return $count;
}
$ok = false;
MWExceptionHandler::logException( $e );
}
- if ( $ok ) {
- $key = $this->getCacheKey( 'empty' );
- $this->cache->set( $key, 'false', self::CACHE_TTL_LONG );
- } else {
+ if ( !$ok ) {
if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
throw new JobQueueError( "Could not insert job(s), no partitions available." );
}
$ok = false;
MWExceptionHandler::logException( $e );
}
- if ( $ok ) {
- $key = $this->getCacheKey( 'empty' );
- $this->cache->set( $key, 'false', self::CACHE_TTL_LONG );
- } else {
+ if ( !$ok ) {
if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
throw new JobQueueError( "Could not insert job(s), no partitions available." );
}
}
$this->throwErrorIfAllPartitionsDown( $failed );
- $key = $this->getCacheKey( 'empty' );
- $this->cache->set( $key, 'true', self::CACHE_TTL_LONG );
-
return false;
}
'abandonedcount'
);
- foreach ( $types as $type ) {
- $this->cache->delete( $this->getCacheKey( $type ) );
- }
-
/** @var JobQueue $queue */
foreach ( $this->partitionQueues as $queue ) {
$queue->doFlushCaches();
$queue->setTestingPrefix( $key );
}
}
-
- /**
- * @param string $property
- * @return string
- */
- private function getCacheKey( $property ) {
- list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
-
- return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
- }
}