$this->servers = isset( $params['redisServers'] )
? $params['redisServers']
: array( $params['redisServer'] ); // b/c
+ $params['redisConfig']['serializer'] = 'none';
$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
}
return array();
}
try {
- $conn->multi( Redis::PIPELINE );
- $conn->exists( $this->getReadyQueueKey() );
- $conn->hGetAll( $this->getReadyQueueKey() );
- list( $exists, $map ) = $conn->exec();
+ $map = $conn->hGetAll( $this->getReadyQueueKey() );
- if ( $exists ) { // cache hit
+ if ( is_array( $map ) && isset( $map['_epoch'] ) ) {
+ unset( $map['_epoch'] ); // ignore
$pendingDBs = array(); // (type => list of wikis)
foreach ( $map as $key => $time ) {
list( $type, $wiki ) = $this->dencQueueName( $key );
$pendingDBs[$type][] = $wiki;
}
- } else { // cache miss
+ } else {
// Avoid duplicated effort
$rand = wfRandomString( 32 );
$conn->multi( Redis::MULTI );
$conn->multi( Redis::PIPELINE );
$now = time();
- $map = array();
+ $map = array( '_epoch' => time() ); // dummy key for empty Redis collections
foreach ( $pendingDBs as $type => $wikis ) {
$conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' );
foreach ( $wikis as $wiki ) {
* @return string
*/
private function getReadyQueueKey() {
- return "jobqueue:aggregator:h-ready-queues:v1"; // global
+ return "jobqueue:aggregator:h-ready-queues:v2"; // global
}
/**
* @return string
*/
private function getQueueTypesKey() {
- return "jobqueue:aggregator:h-queue-types:v1"; // global
+ return "jobqueue:aggregator:h-queue-types:v2"; // global
}
/**