class JobQueueAggregatorRedis extends JobQueueAggregator {
/** @var RedisConnectionPool */
protected $redisPool;
-
/** @var array List of Redis server addresses */
protected $servers;
+ /** @var bool */
+ protected $registeredQueue = false;
/**
* @param array $params Possible keys:
return false;
}
try {
- // Make sure doNotifyQueueNonEmpty() takes precedence to avoid races
- $conn->watch( $this->getReadyQueueKey() );
- $conn->multi()
- ->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) )
- ->exec();
+ $conn->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) );
return true;
} catch ( RedisException $e ) {
}
try {
$conn->multi( Redis::PIPELINE );
- $conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' );
- $conn->sAdd( $this->getWikiSetKey(), $wiki );
+ if ( !$this->registeredQueue ) {
+ // Make sure the queue is registered as existing
+ $conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' );
+ $conn->sAdd( $this->getWikiSetKey(), $wiki );
+ }
$conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() );
$conn->exec();
+ $this->registeredQueue = true;
+
return true;
} catch ( RedisException $e ) {
$this->handleException( $conn, $e );