// Push ready delayed jobs into the queue every 10 jobs to spread the load.
// This is also done as a periodic task, but we don't want too much done at once.
if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) {
- $this->releaseReadyDelayedJobs();
+ $this->recyclePruneAndUndelayJobs();
}
$conn = $this->getConnection();
if ( $this->claimTTL > 0 ) {
// Keep the claimed job list down for high-traffic queues
if ( mt_rand( 0, 99 ) == 0 ) {
- $this->recycleAndDeleteStaleJobs();
+ $this->recyclePruneAndUndelayJobs();
}
$blob = $this->popAndAcquireBlob( $conn );
} else {
continue;
}
- // If $item is invalid, recycleAndDeleteStaleJobs() will cleanup as needed
+ // If $item is invalid, recyclePruneAndUndelayJobs() will cleanup as needed
$job = $this->getJobFromFields( $item ); // may be false
} while ( !$job ); // job may be false if invalid
} catch ( RedisException $e ) {
}
}
- /**
- * Release any ready delayed jobs into the queue
- *
- * @return int Number of jobs released
- * @throws JobQueueError
- */
- public function releaseReadyDelayedJobs() {
- $count = 0;
-
- $conn = $this->getConnection();
- try {
- static $script =
-<<<LUA
- local kDelayed, kUnclaimed = unpack(KEYS)
- -- Get the list of ready delayed jobs, sorted by readiness
- local ids = redis.call('zRangeByScore',kDelayed,0,ARGV[1])
- -- Migrate the jobs from the "delayed" set to the "unclaimed" list
- for k,id in ipairs(ids) do
- redis.call('lPush',kUnclaimed,id)
- redis.call('zRem',kDelayed,id)
- end
- return #ids
-LUA;
- $count += (int)$conn->luaEval( $script,
- array(
- $this->getQueueKey( 'z-delayed' ), // KEYS[1]
- $this->getQueueKey( 'l-unclaimed' ), // KEYS[2]
- time() // ARGV[1]; max "delay until" UNIX timestamp
- ),
- 2 # first two arguments are keys
- );
- } catch ( RedisException $e ) {
- $this->throwRedisException( $this->server, $conn, $e );
- }
-
- return $count;
- }
-
/**
* Recycle or destroy any jobs that have been claimed for too long
+ * and release any ready delayed jobs into the queue
*
- * @return int Number of jobs recycled/deleted
+ * @return int Number of jobs recycled/deleted/undelayed
* @throws MWException|JobQueueError
*/
- public function recycleAndDeleteStaleJobs() {
- if ( $this->claimTTL <= 0 ) { // sanity
- throw new MWException( "Cannot recycle jobs since acknowledgements are disabled." );
- }
+ public function recyclePruneAndUndelayJobs() {
$count = 0;
// For each job item that can be retried, we need to add it back to the
// main queue and remove it from the list of currenty claimed job items.
$now = time();
static $script =
<<<LUA
- local kClaimed, kAttempts, kUnclaimed, kData, kAbandoned = unpack(KEYS)
- local released,abandoned,pruned = 0,0,0
+ local kClaimed, kAttempts, kUnclaimed, kData, kAbandoned, kDelayed = unpack(KEYS)
+ local released,abandoned,pruned,undelayed = 0,0,0,0
-- Get all non-dead jobs that have an expired claim on them.
-- The score for each item is the last claim timestamp (UNIX).
local staleClaims = redis.call('zRangeByScore',kClaimed,0,ARGV[1])
redis.call('hDel',kData,id)
pruned = pruned + 1
end
- return {released,abandoned,pruned}
+ -- Get the list of ready delayed jobs, sorted by readiness (UNIX timestamp)
+ local ids = redis.call('zRangeByScore',kDelayed,0,ARGV[4])
+ -- Migrate the jobs from the "delayed" set to the "unclaimed" list
+ for k,id in ipairs(ids) do
+ redis.call('lPush',kUnclaimed,id)
+ redis.call('zRem',kDelayed,id)
+ end
+ undelayed = #ids
+ return {released,abandoned,pruned,undelayed}
LUA;
$res = $conn->luaEval( $script,
array(
$this->getQueueKey( 'l-unclaimed' ), # KEYS[3]
$this->getQueueKey( 'h-data' ), # KEYS[4]
$this->getQueueKey( 'z-abandoned' ), # KEYS[5]
+ $this->getQueueKey( 'z-delayed' ), # KEYS[6]
$now - $this->claimTTL, # ARGV[1]
$now - self::MAX_AGE_PRUNE, # ARGV[2]
- $this->maxTries # ARGV[3]
+ $this->maxTries, # ARGV[3]
+ $now # ARGV[4]
),
- 5 # number of first argument(s) that are keys
+ 6 # number of first argument(s) that are keys
);
if ( $res ) {
- list( $released, $abandoned, $pruned ) = $res;
- $count += $released + $pruned;
+ list( $released, $abandoned, $pruned, $undelayed ) = $res;
+ $count += $released + $pruned + $undelayed;
JobQueue::incrStats( 'job-recycle', $this->type, $released );
JobQueue::incrStats( 'job-abandon', $this->type, $abandoned );
}
* @return array
*/
protected function doGetPeriodicTasks() {
- $tasks = array();
+ $periods = array( 3600 ); // standard cleanup (useful on config change)
if ( $this->claimTTL > 0 ) {
- $tasks['recycleAndDeleteStaleJobs'] = array(
- 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
- 'period' => ceil( $this->claimTTL / 2 )
- );
+ $periods[] = ceil( $this->claimTTL / 2 ); // avoid bad timing
}
if ( $this->checkDelay ) {
- $tasks['releaseReadyDelayedJobs'] = array(
- 'callback' => array( $this, 'releaseReadyDelayedJobs' ),
- 'period' => 300 // 5 minutes
- );
+ $periods[] = 300; // 5 minutes
}
-
- return $tasks;
+ return array(
+ 'recyclePruneAndUndelayJobs' => array(
+ 'callback' => array( $this, 'recyclePruneAndUndelayJobs' ),
+ 'period' => max( min( $periods ), 30 ) // sanity
+ )
+ );
}
/**