* @throws JobQueueError
*/
protected function doGetAcquiredCount() {
- if ( $this->claimTTL <= 0 ) {
- return 0; // no acknowledgements
- }
$conn = $this->getConnection();
try {
$conn->multi( Redis::PIPELINE );
* @throws JobQueueError
*/
protected function doGetAbandonedCount() {
- if ( $this->claimTTL <= 0 ) {
- return 0; // no acknowledgements
- }
$conn = $this->getConnection();
try {
return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
$conn = $this->getConnection();
try {
do {
- if ( $this->claimTTL > 0 ) {
- // Keep the claimed job list down for high-traffic queues
- if ( mt_rand( 0, 99 ) == 0 ) {
- $this->recyclePruneAndUndelayJobs();
- }
- $blob = $this->popAndAcquireBlob( $conn );
- } else {
- $blob = $this->popAndDeleteBlob( $conn );
+ // Keep the claimed job list down for high-traffic queues
+ if ( !$this->daemonized && mt_rand( 0, 99 ) == 0 ) {
+ $this->recyclePruneAndUndelayJobs();
}
+ $blob = $this->popAndAcquireBlob( $conn );
if ( !is_string( $blob ) ) {
break; // no jobs; nothing to do
}
return $job;
}
- /**
- * @param RedisConnRef $conn
- * @return array Serialized string or false
- * @throws RedisException
- */
- protected function popAndDeleteBlob( RedisConnRef $conn ) {
- static $script =
-<<<LUA
- local kUnclaimed, kSha1ById, kIdBySha1, kData = unpack(KEYS)
- -- Pop an item off the queue
- local id = redis.call('rpop',kUnclaimed)
- if not id then return false end
- -- Get the job data and remove it
- local item = redis.call('hGet',kData,id)
- redis.call('hDel',kData,id)
- -- Allow new duplicates of this job
- local sha1 = redis.call('hGet',kSha1ById,id)
- if sha1 then redis.call('hDel',kIdBySha1,sha1) end
- redis.call('hDel',kSha1ById,id)
- -- Return the job data
- return item
-LUA;
- return $conn->luaEval( $script,
- array(
- $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
- $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
- $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
- $this->getQueueKey( 'h-data' ), # KEYS[4]
- ),
- 4 # number of first argument(s) that are keys
- );
- }
-
/**
* @param RedisConnRef $conn
* @return array Serialized string or false
if ( !isset( $job->metadata['uuid'] ) ) {
throw new MWException( "Job of type '{$job->getType()}' has no UUID." );
}
- if ( $this->claimTTL > 0 ) {
- $conn = $this->getConnection();
- try {
- static $script =
+
+ $conn = $this->getConnection();
+ try {
+ static $script =
<<<LUA
- local kClaimed, kAttempts, kData = unpack(KEYS)
- -- Unmark the job as claimed
- redis.call('zRem',kClaimed,ARGV[1])
- redis.call('hDel',kAttempts,ARGV[1])
- -- Delete the job data itself
- return redis.call('hDel',kData,ARGV[1])
+ local kClaimed, kAttempts, kData = unpack(KEYS)
+ -- Unmark the job as claimed
+ redis.call('zRem',kClaimed,ARGV[1])
+ redis.call('hDel',kAttempts,ARGV[1])
+ -- Delete the job data itself
+ return redis.call('hDel',kData,ARGV[1])
LUA;
- $res = $conn->luaEval( $script,
- array(
- $this->getQueueKey( 'z-claimed' ), # KEYS[1]
- $this->getQueueKey( 'h-attempts' ), # KEYS[2]
- $this->getQueueKey( 'h-data' ), # KEYS[3]
- $job->metadata['uuid'] # ARGV[1]
- ),
- 3 # number of first argument(s) that are keys
- );
-
- if ( !$res ) {
- wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
-
- return false;
- }
- } catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
+ $res = $conn->luaEval( $script,
+ array(
+ $this->getQueueKey( 'z-claimed' ), # KEYS[1]
+ $this->getQueueKey( 'h-attempts' ), # KEYS[2]
+ $this->getQueueKey( 'h-data' ), # KEYS[3]
+ $job->metadata['uuid'] # ARGV[1]
+ ),
+ 3 # number of first argument(s) that are keys
+ );
+
+ if ( !$res ) {
+ wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
+
+ return false;
}
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
}
return true;
}
$periods = array( 3600 ); // standard cleanup (useful on config change)
if ( $this->claimTTL > 0 ) {
- $periods[] = ceil( $this->claimTTL / 2 ); // avoid bad timing
+ $periods[] = ceil( $this->claimTTL / 2 ); // halved to avoid bad timing
}
if ( $this->checkDelay ) {
$periods[] = 300; // 5 minutes