foreach ( $props as $prop ) {
$keys[] = $this->getQueueKey( $prop );
}
- $res = ( $conn->delete( $keys ) !== false );
+ return ( $conn->delete( $keys ) !== false );
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );
}
$conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ),
function( $uid ) use ( $that, $conn ) {
return $that->getJobFromUidInternal( $uid, $conn );
- }
+ },
+ array( 'accept' => function ( $job ) { return is_object( $job ); } )
);
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );
$conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ),
function( $uid ) use ( $that, $conn ) {
return $that->getJobFromUidInternal( $uid, $conn );
- }
+ },
+ array( 'accept' => function ( $job ) { return is_object( $job ); } )
);
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );
}
}
+ public function getCoalesceLocationInternal() {
+ return "RedisServer:" . $this->server;
+ }
+
+ protected function doGetSiblingQueuesWithJobs( array $types ) {
+ return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) );
+ }
+
+ protected function doGetSiblingQueueSizes( array $types ) {
+ $sizes = array(); // (type => size)
+ $types = array_values( $types ); // reindex
+ try {
+ $conn = $this->getConnection();
+ $conn->multi( Redis::PIPELINE );
+ foreach ( $types as $type ) {
+ $conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) );
+ }
+ $res = $conn->exec();
+ if ( is_array( $res ) ) {
+ foreach ( $res as $i => $size ) {
+ $sizes[$types[$i]] = $size;
+ }
+ }
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $this->server, $conn, $e );
+ }
+ return $sizes;
+ }
+
/**
* This function should not be called outside JobQueueRedis
*
* @param $uid string
* @param $conn RedisConnRef
- * @return Job
+ * @return Job|bool Returns false if the job does not exist
* @throws MWException
*/
public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
try {
+ $data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid );
+ if ( $data === false ) {
+ return false; // not found
+ }
$item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) );
if ( !is_array( $item ) ) { // this shouldn't happen
throw new MWException( "Could not find job with ID '$uid'." );
protected function getConnection() {
$conn = $this->redisPool->getConnection( $this->server );
if ( !$conn ) {
- throw new MWException( "Unable to connect to redis server." );
+ throw new JobQueueConnectionError( "Unable to connect to redis server." );
}
return $conn;
}
*/
protected function throwRedisException( $server, RedisConnRef $conn, $e ) {
$this->redisPool->handleException( $server, $conn, $e );
- throw new MWException( "Redis server error: {$e->getMessage()}\n" );
+ throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
}
/**
* @param $prop string
+ * @param $type string|null
* @return string
*/
- private function getQueueKey( $prop ) {
+ private function getQueueKey( $prop, $type = null ) {
+ $type = is_string( $type ) ? $type : $this->type;
list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
if ( strlen( $this->key ) ) { // namespaced queue (for testing)
- return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $this->key, $prop );
+ return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop );
} else {
- return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop );
+ return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop );
}
}