protected $server;
/** @var string Compression method to use */
protected $compression;
- /** @var bool */
- protected $daemonized;
const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
$this->server = $params['redisServer'];
$this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none';
$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
- $this->daemonized = !empty( $params['daemonized'] );
+ if ( empty( $params['daemonized'] ) ) {
+ throw new Exception(
+ "Non-daemonized mode is no longer supported. Please install the " .
+ "mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." );
+ }
$this->checkDelay = true; // always enabled
}
protected function doPop() {
$job = false;
- // Push ready delayed jobs into the queue every 10 jobs to spread the load.
- // This is also done as a periodic task, but we don't want too much done at once.
- if ( !$this->daemonized && mt_rand( 0, 9 ) == 0 ) {
- $this->recyclePruneAndUndelayJobs();
- }
-
$conn = $this->getConnection();
try {
do {
- // Keep the claimed job list down for high-traffic queues
- if ( !$this->daemonized && mt_rand( 0, 99 ) == 0 ) {
- $this->recyclePruneAndUndelayJobs();
- }
$blob = $this->popAndAcquireBlob( $conn );
if ( !is_string( $blob ) ) {
break; // no jobs; nothing to do
continue;
}
- // If $item is invalid, recyclePruneAndUndelayJobs() will cleanup as needed
+ // If $item is invalid, the runner loop recyling will cleanup as needed
$job = $this->getJobFromFields( $item ); // may be false
} while ( !$job ); // job may be false if invalid
} catch ( RedisException $e ) {
}
}
- /**
- * Recycle or destroy any jobs that have been claimed for too long
- * and release any ready delayed jobs into the queue
- *
- * @return int Number of jobs recycled/deleted/undelayed
- * @throws MWException|JobQueueError
- */
- public function recyclePruneAndUndelayJobs() {
- $count = 0;
- // For each job item that can be retried, we need to add it back to the
- // main queue and remove it from the list of currenty claimed job items.
- // For those that cannot, they are marked as dead and kept around for
- // investigation and manual job restoration but are eventually deleted.
- $conn = $this->getConnection();
- try {
- $now = time();
- static $script =
-<<<LUA
- local kClaimed, kAttempts, kUnclaimed, kData, kAbandoned, kDelayed = unpack(KEYS)
- local released,abandoned,pruned,undelayed = 0,0,0,0
- -- Get all non-dead jobs that have an expired claim on them.
- -- The score for each item is the last claim timestamp (UNIX).
- local staleClaims = redis.call('zRangeByScore',kClaimed,0,ARGV[1])
- for k,id in ipairs(staleClaims) do
- local timestamp = redis.call('zScore',kClaimed,id)
- local attempts = redis.call('hGet',kAttempts,id)
- if attempts < ARGV[3] then
- -- Claim expired and retries left: re-enqueue the job
- redis.call('lPush',kUnclaimed,id)
- released = released + 1
- else
- -- Claim expired and no retries left: mark the job as dead
- redis.call('zAdd',kAbandoned,timestamp,id)
- abandoned = abandoned + 1
- end
- redis.call('zRem',kClaimed,id)
- end
- -- Get all of the dead jobs that have been marked as dead for too long.
- -- The score for each item is the last claim timestamp (UNIX).
- local deadClaims = redis.call('zRangeByScore',kAbandoned,0,ARGV[2])
- for k,id in ipairs(deadClaims) do
- -- Stale and out of retries: remove any traces of the job
- redis.call('zRem',kAbandoned,id)
- redis.call('hDel',kAttempts,id)
- redis.call('hDel',kData,id)
- pruned = pruned + 1
- end
- -- Get the list of ready delayed jobs, sorted by readiness (UNIX timestamp)
- local ids = redis.call('zRangeByScore',kDelayed,0,ARGV[4])
- -- Migrate the jobs from the "delayed" set to the "unclaimed" list
- for k,id in ipairs(ids) do
- redis.call('lPush',kUnclaimed,id)
- redis.call('zRem',kDelayed,id)
- end
- undelayed = #ids
- return {released,abandoned,pruned,undelayed}
-LUA;
- $res = $conn->luaEval( $script,
- array(
- $this->getQueueKey( 'z-claimed' ), # KEYS[1]
- $this->getQueueKey( 'h-attempts' ), # KEYS[2]
- $this->getQueueKey( 'l-unclaimed' ), # KEYS[3]
- $this->getQueueKey( 'h-data' ), # KEYS[4]
- $this->getQueueKey( 'z-abandoned' ), # KEYS[5]
- $this->getQueueKey( 'z-delayed' ), # KEYS[6]
- $now - $this->claimTTL, # ARGV[1]
- $now - self::MAX_AGE_PRUNE, # ARGV[2]
- $this->maxTries, # ARGV[3]
- $now # ARGV[4]
- ),
- 6 # number of first argument(s) that are keys
- );
- if ( $res ) {
- list( $released, $abandoned, $pruned, $undelayed ) = $res;
- $count += $released + $pruned + $undelayed;
- JobQueue::incrStats( 'job-recycle', $this->type, $released, $this->wiki );
- JobQueue::incrStats( 'job-abandon', $this->type, $abandoned, $this->wiki );
- JobQueue::incrStats( 'job-undelay', $this->type, $undelayed, $this->wiki );
- }
- } catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
- }
-
- return $count;
- }
-
/**
* @return array
*/
protected function doGetPeriodicTasks() {
- if ( $this->daemonized ) {
- return array(); // managed in the runner loop
- }
- $periods = array( 300 ); // 5 min; delayed/stale jobs
- if ( $this->claimTTL > 0 ) {
- $periods[] = ceil( $this->claimTTL / 2 ); // halved to avoid bad timing
- }
- $period = min( $periods );
- $period = max( $period, 30 ); // sanity
-
- return array(
- 'recyclePruneAndUndelayJobs' => array(
- 'callback' => array( $this, 'recyclePruneAndUndelayJobs' ),
- 'period' => $period,
- )
- );
+ return array(); // managed in the runner loop
}
/**