/**
* Class to handle tracking information about all queues using PhpRedis
*
+ * The mediawiki/services/jobrunner background service must be set up and running.
+ *
* @ingroup JobQueue
* @ingroup Redis
* @since 1.21
protected $redisPool;
/** @var array List of Redis server addresses */
protected $servers;
- /** @var bool */
- protected $registeredQueue = false;
/**
* @param array $params Possible keys:
}
protected function doNotifyQueueEmpty( $wiki, $type ) {
- $conn = $this->getConnection();
- if ( !$conn ) {
- return false;
- }
- try {
- $conn->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) );
-
- return true;
- } catch ( RedisException $e ) {
- $this->handleException( $conn, $e );
-
- return false;
- }
+ return true; // managed by the service
}
protected function doNotifyQueueNonEmpty( $wiki, $type ) {
- $conn = $this->getConnection();
- if ( !$conn ) {
- return false;
- }
- try {
- $conn->multi( Redis::PIPELINE );
- if ( !$this->registeredQueue ) {
- // Make sure the queue is registered as existing
- $conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' );
- $conn->sAdd( $this->getWikiSetKey(), $wiki );
- }
- $conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() );
- $conn->exec();
-
- $this->registeredQueue = true;
-
- return true;
- } catch ( RedisException $e ) {
- $this->handleException( $conn, $e );
-
- return false;
- }
+ return true; // managed by the service
}
protected function doGetAllReadyWikiQueues() {
unset( $map['_epoch'] ); // ignore
$pendingDBs = array(); // (type => list of wikis)
foreach ( $map as $key => $time ) {
- list( $type, $wiki ) = $this->dencQueueName( $key );
+ list( $type, $wiki ) = $this->decodeQueueName( $key );
$pendingDBs[$type][] = $wiki;
}
} else {
- // Avoid duplicated effort
- $rand = wfRandomString( 32 );
- $conn->multi( Redis::MULTI );
- $conn->setex( "{$rand}:lock", 3600, 1 );
- $conn->renamenx( "{$rand}:lock", $this->getReadyQueueKey() . ":lock" );
- if ( $conn->exec() !== array( true, true ) ) { // lock
- $conn->delete( "{$rand}:lock" );
- return array(); // already in progress
- }
-
- $pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis)
-
- $conn->multi( Redis::PIPELINE );
- $now = time();
- $map = array( '_epoch' => time() ); // dummy key for empty Redis collections
- foreach ( $pendingDBs as $type => $wikis ) {
- $conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' );
- foreach ( $wikis as $wiki ) {
- $map[$this->encQueueName( $type, $wiki )] = $now;
- }
- }
- $conn->hMSet( $this->getReadyQueueKey(), $map );
- $conn->exec();
-
- $conn->delete( $this->getReadyQueueKey() . ":lock" ); // unlock
+ throw new UnexpectedValueException(
+ "No queue listing found; make sure redisJobChronService is running."
+ );
}
return $pendingDBs;
} catch ( RedisException $e ) {
- $this->handleException( $conn, $e );
+ $this->redisPool->handleError( $conn, $e );
return array();
}
}
protected function doPurge() {
- $conn = $this->getConnection();
- if ( !$conn ) {
- return false;
- }
- try {
- $conn->delete( $this->getReadyQueueKey() );
- // leave key at getQueueTypesKey() alone
- } catch ( RedisException $e ) {
- $this->handleException( $conn, $e );
-
- return false;
- }
-
- return true;
+ return true; // fully and only refreshed by the service
}
/**
return $conn;
}
- /**
- * @param RedisConnRef $conn
- * @param RedisException $e
- * @return void
- */
- protected function handleException( RedisConnRef $conn, $e ) {
- $this->redisPool->handleError( $conn, $e );
- }
-
/**
* @return string
*/
return "jobqueue:aggregator:h-ready-queues:v2"; // global
}
- /**
- * @return string
- */
- private function getQueueTypesKey() {
- return "jobqueue:aggregator:h-queue-types:v2"; // global
- }
-
- /**
- * @return string
- */
- private function getWikiSetKey() {
- return "jobqueue:aggregator:s-wikis:v2"; // global
- }
-
- /**
- * @param string $type
- * @param string $wiki
- * @return string
- */
- private function encQueueName( $type, $wiki ) {
- return rawurlencode( $type ) . '/' . rawurlencode( $wiki );
- }
-
/**
* @param string $name
* @return string
*/
- private function dencQueueName( $name ) {
+ private function decodeQueueName( $name ) {
list( $type, $wiki ) = explode( '/', $name, 2 );
return array( rawurldecode( $type ), rawurldecode( $wiki ) );