*
* This is a faster and less resource-intensive job queue than JobQueueDB.
* All data for a queue using this class is placed into one redis server.
+ * The mediawiki/services/jobrunner background service must be set up and running.
*
- * There are eight main redis keys used to track jobs:
+ * There are eight main redis keys (per queue) used to track jobs:
* - l-unclaimed : A list of job IDs used for ready unclaimed jobs
* - z-claimed : A sorted set of (job ID, UNIX timestamp as score) used for job retries
* - z-abandoned : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs
* entry and every h-sha1ById entry must refer to an ID that is in l-unclaimed. If a job has its
* ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID.
*
+ * The following keys are used to track queue states:
+ * - s-queuesWithJobs : A set of all queues with non-abandoned jobs
+ *
+ * The background service takes care of undelaying, recycling, and pruning jobs as well as
+ * removing s-queuesWithJobs entries as queues empty.
+ *
* Additionally, "rootjob:*" keys track "root jobs" used for additional de-duplication.
* Aside from root job keys, all keys have no expiry, and are only removed when jobs are run.
* All the keys are prefixed with the relevant wiki ID information.
$failed += count( $itemBatch );
}
}
- if ( $failed > 0 ) {
- wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." );
-
- throw new RedisException( "Could not insert {$failed} {$this->type} job(s)." );
- }
JobQueue::incrStats( 'inserts', $this->type, count( $items ) );
+ JobQueue::incrStats( 'inserts_actual', $this->type, $pushed );
JobQueue::incrStats( 'dupe_inserts', $this->type,
count( $items ) - $failed - $pushed );
+ if ( $failed > 0 ) {
+ $err = "Could not insert {$failed} {$this->type} job(s).";
+ wfDebugLog( 'JobQueueRedis', $err );
+ throw new RedisException( $err );
+ }
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
* @throws RedisException
*/
protected function pushBlobs( RedisConnRef $conn, array $items ) {
- $args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
+ $args = array( $this->encodeQueueName() );
+ // Next args come in 4s ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
foreach ( $items as $item ) {
$args[] = (string)$item['uuid'];
$args[] = (string)$item['sha1'];
}
static $script =
<<<LUA
- local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData = unpack(KEYS)
- if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end
+ local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData, kQwJobs = unpack(KEYS)
+ -- First argument is the queue ID
+ local queueId = ARGV[1]
+ -- Next arguments all come in 4s (one per job)
+ local variadicArgCount = #ARGV - 1
+ if variadicArgCount % 4 ~= 0 then
+ return redis.error_reply('Unmatched arguments')
+ end
+ -- Insert each job into this queue as needed
local pushed = 0
- for i = 1,#ARGV,4 do
+ for i = 2,#ARGV,4 do
local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then
if 1*rtimestamp > 0 then
pushed = pushed + 1
end
end
+ -- Mark this queue as having jobs
+ redis.call('sAdd',kQwJobs,queueId)
return pushed
LUA;
return $conn->luaEval( $script,
$this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
$this->getQueueKey( 'z-delayed' ), # KEYS[4]
$this->getQueueKey( 'h-data' ), # KEYS[5]
+ $this->getGlobalKey( 's-queuesWithJobs' ), # KEYS[6]
),
$args
),
- 5 # number of first argument(s) that are keys
+ 6 # number of first argument(s) that are keys
);
}
break; // no jobs; nothing to do
}
- JobQueue::incrStats( 'job-pop', $this->type );
+ JobQueue::incrStats( 'pops', $this->type );
$item = $this->unserialize( $blob );
if ( $item === false ) {
wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
static $script =
<<<LUA
local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
+ local rTime = unpack(ARGV)
-- Pop an item off the queue
local id = redis.call('rPop',kUnclaimed)
- if not id then return false end
+ if not id then
+ return false
+ end
-- Allow new duplicates of this job
local sha1 = redis.call('hGet',kSha1ById,id)
if sha1 then redis.call('hDel',kIdBySha1,sha1) end
redis.call('hDel',kSha1ById,id)
-- Mark the jobs as claimed and return it
- redis.call('zAdd',kClaimed,ARGV[1],id)
+ redis.call('zAdd',kClaimed,rTime,id)
redis.call('hIncrBy',kAttempts,id,1)
return redis.call('hGet',kData,id)
LUA;
static $script =
<<<LUA
local kClaimed, kAttempts, kData = unpack(KEYS)
+ local uuid = unpack(ARGV)
-- Unmark the job as claimed
- redis.call('zRem',kClaimed,ARGV[1])
- redis.call('hDel',kAttempts,ARGV[1])
+ redis.call('zRem',kClaimed,uuid)
+ redis.call('hDel',kAttempts,uuid)
-- Delete the job data itself
- return redis.call('hDel',kData,ARGV[1])
+ return redis.call('hDel',kData,uuid)
LUA;
$res = $conn->luaEval( $script,
array(
return false;
}
- JobQueue::incrStats( 'job-ack', $this->type );
+ JobQueue::incrStats( 'acks', $this->type );
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
$keys[] = $this->getQueueKey( $prop );
}
- return ( $conn->delete( $keys ) !== false );
+ $ok = ( $conn->delete( $keys ) !== false );
+ $conn->sRem( $this->getGlobalKey( 's-queuesWithJobs' ), $this->encodeQueueName() );
+
+ return $ok;
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
$job = Job::factory( $item['type'], $title, $item['params'] );
$job->metadata['uuid'] = $item['uuid'];
$job->metadata['timestamp'] = $item['timestamp'];
+ // Add in attempt count for debugging at showJobs.php
+ $job->metadata['attempts'] = $conn->hGet( $this->getQueueKey( 'h-attempts' ), $uid );
return $job;
} catch ( RedisException $e ) {
}
}
+ /**
+ * List all queues on this Redis server that currently have non-abandoned
+ * jobs, as recorded in the global "s-queuesWithJobs" set maintained by
+ * pushBlobs() and pruned by the background jobrunner service.
+ *
+ * @return array List of (type, wiki) tuples for queues with non-abandoned jobs
+ * @throws JobQueueConnectionError
+ * @throws JobQueueError
+ */
+ public function getServerQueuesWithJobs() {
+ $queues = array();
+
+ $conn = $this->getConnection();
+ try {
+ // Each set member is a JSON-encoded pair; see encodeQueueName()
+ $set = $conn->sMembers( $this->getGlobalKey( 's-queuesWithJobs' ) );
+ foreach ( $set as $queue ) {
+ $queues[] = $this->decodeQueueName( $queue );
+ }
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+
+ return $queues;
+ }
+
/**
* @param IJobSpecification $job
* @return array
// Additional job metadata
'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
'sha1' => $job->ignoreDuplicates()
- ? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
+ ? Wikimedia\base_convert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
: '',
'timestamp' => time() // UNIX timestamp
);
throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
}
+ /**
+ * Encode this queue's identity as a JSON [type, wiki] pair; used as the
+ * member value in the global "s-queuesWithJobs" set. Inverse of
+ * decodeQueueName().
+ *
+ * @return string JSON
+ */
+ private function encodeQueueName() {
+ return json_encode( array( $this->type, $this->wiki ) );
+ }
+
+ /**
+ * Decode a queue name from the "s-queuesWithJobs" set back into its
+ * components; inverse of encodeQueueName(). json_decode() of a JSON
+ * array yields a plain PHP array here.
+ *
+ * @param string $name JSON
+ * @return array (type, wiki)
+ */
+ private function decodeQueueName( $name ) {
+ return json_decode( $name );
+ }
+
+ /**
+ * Get a global (non-wiki-specific) Redis key for the given name,
+ * e.g. "global:jobqueue:s-queuesWithJobs".
+ *
+ * @param string $name
+ * @return string
+ * @throws InvalidArgumentException If any key part contains invalid characters
+ */
+ private function getGlobalKey( $name ) {
+ $parts = array( 'global', 'jobqueue', $name );
+ foreach ( $parts as $part ) {
+ // Anchored pattern so that EVERY character must be in [a-zA-Z0-9_-];
+ // an unanchored /[a-zA-Z0-9_-]+/ would accept any part containing
+ // at least one valid character (e.g. "bad:part!").
+ if ( !preg_match( '/^[a-zA-Z0-9_-]+$/', $part ) ) {
+ throw new InvalidArgumentException( "Key part characters are out of range." );
+ }
+ }
+
+ return implode( ':', $parts );
+ }
+
/**
* @param string $prop
* @param string|null $type
private function getQueueKey( $prop, $type = null ) {
$type = is_string( $type ) ? $type : $this->type;
list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
- if ( strlen( $this->key ) ) { // namespaced queue (for testing)
- return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop );
- } else {
- return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop );
- }
- }
- /**
- * @param string $key
- * @return void
- */
- public function setTestingPrefix( $key ) {
- $this->key = $key;
+ // The testing-prefix branch (and setTestingPrefix) are removed here;
+ // queue keys now always use the standard wiki-prefixed layout.
+ return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop );
}
}