/**
* Class to handle job queues stored in Redis
*
+ * This is faster, less resource intensive, queue that JobQueueDB.
+ * All data for a queue using this class is placed into one redis server.
+ *
+ * There are seven main redis keys used to track jobs:
+ * - l-unclaimed : A list of job IDs used for push/pop
+ * - z-claimed : A sorted set of (job ID, UNIX timestamp as score) used for job retries
+ * - z-abandoned : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs
+ * - h-idBySha1 : A hash of (SHA1 => job ID) for unclaimed jobs used for de-duplication
+ * - h-sha1Byid : A hash of (job ID => SHA1) for unclaimed jobs used for de-duplication
+ * - h-attempts : A hash of (job ID => attempt count) used for job claiming/retries
+ * - h-data : A hash of (job ID => serialized blobs) for job storage
+ * Any given job ID can be in only one of l-unclaimed, z-claimed, and z-abandoned.
+ * If an ID appears in any of those lists, it should have a h-data entry for its ID.
+ * If a job has a non-empty SHA1 de-duplication value and its ID is in l-unclaimed,
+ * then there should be no other such jobs. Every h-idBySha1 entry has an h-sha1Byid
+ * entry and every h-sha1Byid must refer to an ID that is l-unclaimed. If a job has its
+ * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID.
+ *
+ * Additionally, "rootjob:* keys to track "root jobs" used for additional de-duplication.
+ * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run.
+ * All the keys are prefixed with the relevant wiki ID information.
+ *
+ * This class requires Redis 2.6 as it makes use Lua scripts for fast atomic operations.
+ * Additionally, it should be noted that redis has different persistence modes, such
+ * as rdb snapshots, journaling, and no persistent. Appropriate configuration should be
+ * made on the servers based on what queues are using it and what tolerance they have.
+ *
* @ingroup JobQueue
* @since 1.21
*/
/**
* @params include:
* - redisConfig : An array of parameters to RedisConnectionPool::__construct().
+ * Note that the serializer option is ignored "none" is always used.
* - redisServer : A hostname/port combination or the absolute path of a UNIX socket.
* If a hostname is specified but no port, the standard port number
* 6379 will be used. Required.
*/
public function __construct( array $params ) {
parent::__construct( $params );
+ $params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua
$this->server = $params['redisServer'];
$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
}
* @throws MWException
*/
protected function doIsEmpty() {
- $conn = $this->getConnection();
- try {
- return ( $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ) == 0 );
- } catch ( RedisException $e ) {
- $this->throwRedisException( $this->server, $conn, $e );
- }
+ return $this->doGetSize() == 0;
}
/**
}
$conn = $this->getConnection();
try {
- return $conn->lSize( $this->getQueueKey( 'l-claimed' ) );
+ $conn->multi( Redis::PIPELINE );
+ $conn->zSize( $this->getQueueKey( 'z-claimed' ) );
+ $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
+ return array_sum( $conn->exec() );
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );
}
* @throws MWException
*/
protected function doBatchPush( array $jobs, $flags ) {
- if ( !count( $jobs ) ) {
- return true;
- }
-
- // Convert the jobs into a list of field maps
- $items = array(); // (uid => job fields map)
+ // Convert the jobs into field maps (de-duplicated against each other)
+ $items = array(); // (job ID => job fields map)
foreach ( $jobs as $job ) {
$item = $this->getNewJobFields( $job );
- $items[$item['uid']] = $item;
+ if ( strlen( $item['sha1'] ) ) { // hash identifier => de-duplicate
+ $items[$item['sha1']] = $item;
+ } else {
+ $items[$item['uuid']] = $item;
+ }
}
-
- $dedupUids = array(); // list of uids to check for duplicates
+ // Convert the field maps into serialized blobs
+ $tuples = array();
foreach ( $items as $item ) {
- if ( $this->isHashUid( $item['uid'] ) ) { // hash identifier => de-duplicate
- $dedupUids[] = $item['uid'];
- }
+ $tuples[] = array( $item['uuid'], $item['sha1'], serialize( $item ) );
+ }
+
+ if ( !count( $tuples ) ) {
+ return true; // nothing to do
}
$conn = $this->getConnection();
try {
- // Find which of these jobs are duplicates of unclaimed jobs in the queue...
- if ( count( $dedupUids ) ) {
- $conn->multi( Redis::PIPELINE );
- foreach ( $dedupUids as $uid ) { // check if job data exists
- $conn->exists( $this->prefixWithQueueKey( 'data', $uid ) );
- }
- if ( $this->claimTTL > 0 ) { // check which jobs were claimed
- foreach ( $dedupUids as $uid ) {
- $conn->hExists( $this->prefixWithQueueKey( 'h-meta', $uid ), 'ctime' );
- }
- list( $exists, $claimed ) = array_chunk( $conn->exec(), count( $dedupUids ) );
+ // Actually push the non-duplicate jobs into the queue...
+ if ( $flags & self::QoS_Atomic ) {
+ $batches = array( $tuples ); // all or nothing
+ } else {
+ $batches = array_chunk( $tuples, 500 ); // avoid tying up the server
+ }
+ $failed = 0;
+ $pushed = 0;
+ foreach ( $batches as $tupleBatch ) {
+ $added = $this->pushBlobs( $conn, $tupleBatch );
+ if ( is_int( $added ) ) {
+ $pushed += $added;
} else {
- $exists = $conn->exec();
- $claimed = array(); // no claim system
- }
- // Remove the duplicate jobs to cut down on pushing duplicate uids...
- foreach ( $dedupUids as $k => $uid ) {
- if ( $exists[$k] && empty( $claimed[$k] ) ) {
- unset( $items[$uid] );
- }
+ $failed += count( $tupleBatch );
}
}
- // Actually push the non-duplicate jobs into the queue...
- if ( count( $items ) ) {
- $uids = array_keys( $items );
- $conn->multi( Redis::MULTI ); // begin (atomic trx)
- $conn->mSet( $this->prefixKeysWithQueueKey( 'data', $items ) );
- call_user_func_array(
- array( $conn, 'lPush' ),
- array_merge( array( $this->getQueueKey( 'l-unclaimed' ) ), $uids )
- );
- $res = $conn->exec(); // commit (atomic trx)
- if ( in_array( false, $res, true ) ) {
- wfDebugLog( 'JobQueueRedis', "Could not insert {$this->type} job(s)." );
- return false;
- }
+ if ( $failed > 0 ) {
+ wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." );
+ return false;
}
- wfIncrStats( 'job-insert', count( $items ) );
- wfIncrStats( 'job-insert-duplicate', count( $jobs ) - count( $items ) );
+ wfIncrStats( 'job-insert', count( $tuples ) );
+ wfIncrStats( 'job-insert-duplicate', count( $tuples ) - $failed - $pushed );
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );
}
return true;
}
+ /**
+ * @param RedisConnRef $conn
+ * @param array $tuples List of tuples of (job ID, job SHA1 or '', serialized blob)
+ * @return integer Number of jobs inserted (duplicates are ignored)
+ * @throws RedisException
+ */
+ protected function pushBlobs( RedisConnRef $conn, array $tuples ) {
+ $args = array(); // ([id, sha1, blob [, id, sha1, blob ... ] ] )
+ foreach ( $tuples as $tuple ) {
+ $args[] = $tuple[0]; // id
+ $args[] = $tuple[1]; // sha1
+ $args[] = $tuple[2]; // blob
+ }
+ static $script =
+<<<LUA
+ if #ARGV % 3 ~= 0 then return redis.error_reply('Unmatched arguments') end
+ local pushed = 0
+ for i = 1,#ARGV,3 do
+ local id,sha1,blob = ARGV[i],ARGV[i+1],ARGV[i+2]
+ if sha1 == '' or redis.call('hExists',KEYS[3],sha1) == 0 then
+ redis.call('lPush',KEYS[1],id)
+ if sha1 ~= '' then
+ redis.call('hSet',KEYS[2],id,sha1)
+ redis.call('hSet',KEYS[3],sha1,id)
+ end
+ redis.call('hSet',KEYS[4],id,blob)
+ pushed = pushed + 1
+ end
+ end
+ return pushed
+LUA;
+ return $this->redisEval( $conn, $script,
+ array_merge(
+ array(
+ $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
+ $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
+ $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
+ $this->getQueueKey( 'h-data' ), # KEYS[4]
+ ),
+ $args
+ ),
+ 4 # number of first argument(s) that are keys
+ );
+ }
+
/**
* @see JobQueue::doPop()
* @return Job|bool
protected function doPop() {
$job = false;
- if ( $this->claimTTL <= 0 && mt_rand( 0, 99 ) == 0 ) {
- $this->cleanupClaimedJobs(); // prune jobs and IDs from the "garbage" list
- }
-
$conn = $this->getConnection();
try {
do {
- // Atomically pop an item off the queue and onto the "claimed" list
- $uid = $conn->rpoplpush(
- $this->getQueueKey( 'l-unclaimed' ),
- $this->getQueueKey( 'l-claimed' )
- );
- if ( $uid === false ) {
+ if ( $this->claimTTL > 0 ) {
+ // Keep the claimed job list down for high-traffic queues
+ if ( mt_rand( 0, 99 ) == 0 ) {
+ $this->recycleAndDeleteStaleJobs();
+ }
+ $blob = $this->popAndAcquireBlob( $conn );
+ } else {
+ $blob = $this->popAndDeleteBlob( $conn );
+ }
+ if ( $blob === false ) {
break; // no jobs; nothing to do
}
wfIncrStats( 'job-pop' );
- $conn->multi( Redis::PIPELINE );
- $conn->get( $this->prefixWithQueueKey( 'data', $uid ) );
- if ( $this->claimTTL > 0 ) {
- // Set the claim timestamp metadata. If this step fails, then
- // the timestamp will be assumed to be the current timestamp by
- // recycleAndDeleteStaleJobs() as of the next time that it runs.
- // If two runners claim duplicate jobs, one will abort here.
- $conn->hSetNx( $this->prefixWithQueueKey( 'h-meta', $uid ), 'ctime', time() );
- } else {
- // If this fails, the message key will be deleted in cleanupClaimedJobs().
- // If two runners claim duplicate jobs, one of them will abort here.
- $conn->delete(
- $this->prefixWithQueueKey( 'h-meta', $uid ),
- $this->prefixWithQueueKey( 'data', $uid ) );
- }
- list( $item, $ok ) = $conn->exec();
- if ( $item === false || ( $this->claimTTL && !$ok ) ) {
- wfDebug( "Could not find or delete job $uid; probably was a duplicate." );
- continue; // job was probably a duplicate
+ $item = unserialize( $blob );
+ if ( $item === false ) {
+ wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
+ continue;
}
// If $item is invalid, recycleAndDeleteStaleJobs() will cleanup as needed
return $job;
}
+ /**
+ * @param RedisConnRef $conn
+ * @return array serialized string or false
+ * @throws RedisException
+ */
+ protected function popAndDeleteBlob( RedisConnRef $conn ) {
+ static $script =
+<<<LUA
+ -- Pop an item off the queue
+ local id = redis.call('rpop',KEYS[1])
+ if not id then return false end
+ -- Get the job data and remove it
+ local item = redis.call('hGet',KEYS[4],id)
+ redis.call('hDel',KEYS[4],id)
+ -- Allow new duplicates of this job
+ local sha1 = redis.call('hGet',KEYS[2],id)
+ if sha1 then redis.call('hDel',KEYS[3],sha1) end
+ redis.call('hDel',KEYS[2],id)
+ -- Return the job data
+ return item
+LUA;
+ return $this->redisEval( $conn, $script,
+ array(
+ $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
+ $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
+ $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
+ $this->getQueueKey( 'h-data' ), # KEYS[4]
+ ),
+ 4 # number of first argument(s) that are keys
+ );
+ }
+
+ /**
+ * @param RedisConnRef $conn
+ * @return array serialized string or false
+ * @throws RedisException
+ */
+ protected function popAndAcquireBlob( RedisConnRef $conn ) {
+ static $script =
+<<<LUA
+ -- Pop an item off the queue
+ local id = redis.call('rPop',KEYS[1])
+ if not id then return false end
+ -- Allow new duplicates of this job
+ local sha1 = redis.call('hGet',KEYS[2],id)
+ if sha1 then redis.call('hDel',KEYS[3],sha1) end
+ redis.call('hDel',KEYS[2],id)
+ -- Mark the jobs as claimed and return it
+ redis.call('zAdd',KEYS[4],ARGV[1],id)
+ redis.call('hIncrBy',KEYS[5],id,1)
+ return redis.call('hGet',KEYS[6],id)
+LUA;
+ return $this->redisEval( $conn, $script,
+ array(
+ $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
+ $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
+ $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
+ $this->getQueueKey( 'z-claimed' ), # KEYS[4]
+ $this->getQueueKey( 'h-attempts' ), # KEYS[5]
+ $this->getQueueKey( 'h-data' ), # KEYS[6]
+ time(), # ARGV[1] (injected to be replication-safe)
+ ),
+ 6 # number of first argument(s) that are keys
+ );
+ }
+
/**
* @see JobQueue::doAck()
* @param Job $job
// the job was transformed into a DuplicateJob or anything of the sort.
$item = $job->metadata['sourceFields'];
- $conn->multi( Redis::MULTI ); // begin (atomic trx)
- // Remove the first instance of this job scanning right-to-left.
- // This is O(N) in the worst case, but is likely to be much faster since
- // jobs are pushed to the left and we are starting from the right, where
- // the longest running jobs are likely to be. These should be the first
- // jobs to be acknowledged assuming that job run times are roughly equal.
- $conn->lRem( $this->getQueueKey( 'l-claimed' ), $item['uid'], -1 );
- // Delete the job data and its claim metadata
- $conn->delete(
- $this->prefixWithQueueKey( 'h-meta', $item['uid'] ),
- $this->prefixWithQueueKey( 'data', $item['uid'] ) );
- $res = $conn->exec(); // commit (atomic trx)
-
- if ( in_array( false, $res, true ) ) {
+ static $script =
+<<<LUA
+ -- Unmark the job as claimed
+ redis.call('zRem',KEYS[1],ARGV[1])
+ redis.call('hDel',KEYS[2],ARGV[1])
+ -- Delete the job data itself
+ return redis.call('hDel',KEYS[3],ARGV[1])
+LUA;
+ $res = $this->redisEval( $conn, $script,
+ array(
+ $this->getQueueKey( 'z-claimed' ), # KEYS[1]
+ $this->getQueueKey( 'h-attempts' ), # KEYS[2]
+ $this->getQueueKey( 'h-data' ), # KEYS[3]
+ $item['uuid'] # ARGV[1]
+ ),
+ 3 # number of first argument(s) that are keys
+ );
+
+ if ( !$res ) {
wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
return false;
}
*/
public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
try {
- $fields = $conn->get( $this->prefixWithQueueKey( 'data', $uid ) );
- if ( !is_array( $fields ) ) { // wtf?
- $conn->delete( $this->prefixWithQueueKey( 'data', $uid ) );
- throw new MWException( "Could not find job with UID '$uid'." );
+ $item = unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) );
+ if ( !is_array( $item ) ) { // this shouldn't happen
+ throw new MWException( "Could not find job with ID '$uid'." );
}
- $title = Title::makeTitle( $fields['namespace'], $fields['title'] );
- $job = Job::factory( $fields['type'], $title, $fields['params'] );
- $job->metadata['sourceFields'] = $fields;
+ $title = Title::makeTitle( $item['namespace'], $item['title'] );
+ $job = Job::factory( $item['type'], $title, $item['params'] );
+ $job->metadata['sourceFields'] = $item;
return $job;
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );
$count = 0;
// For each job item that can be retried, we need to add it back to the
// main queue and remove it from the list of currenty claimed job items.
+ // For those that cannot, they are marked as dead and kept around for
+ // investigation and manual job restoration but are eventually deleted.
$conn = $this->getConnection();
try {
- // Avoid duplicate insertions of items to be re-enqueued
- $conn->multi( Redis::MULTI );
- $conn->setnx( $this->getQueueKey( 'lock' ), 1 );
- $conn->expire( $this->getQueueKey( 'lock' ), 3600 );
- if ( $conn->exec() !== array( true, true ) ) { // lock
- return $count; // already in progress
- }
-
$now = time();
- $claimCutoff = $now - $this->claimTTL;
- $pruneCutoff = $now - self::MAX_AGE_PRUNE;
-
- // Get the list of all claimed jobs
- $claimedUids = $conn->lRange( $this->getQueueKey( 'l-claimed' ), 0, -1 );
- // Get a map of (uid => claim metadata) for all claimed jobs
- $metadata = $conn->mGet( $this->prefixValuesWithQueueKey( 'h-meta', $claimedUids ) );
-
- $uidsPush = array(); // items IDs to move to the "unclaimed" queue
- $uidsRemove = array(); // item IDs to remove from "claimed" queue
- foreach ( $claimedUids as $i => $uid ) { // all claimed items
- $info = $metadata[$i] ? $metadata[$i] : array();
- if ( isset( $info['ctime'] ) || isset( $info['rctime'] ) ) {
- // Prefer "ctime" (set by pop()) over "rctime" (set by this function)
- $ctime = isset( $info['ctime'] ) ? $info['ctime'] : $info['rctime'];
- // Claimed job claimed for too long?
- if ( $ctime < $claimCutoff ) {
- // Get the number of failed attempts
- $attempts = isset( $info['attempts'] ) ? $info['attempts'] : 0;
- if ( $attempts < $this->maxTries ) {
- $uidsPush[] = $uid; // retry it
- } elseif ( $ctime < $pruneCutoff ) {
- $uidsRemove[] = $uid; // just remove it
- }
- }
- } else {
- // If pop() failed to set the claim timestamp, set it to the current time.
- // Since that function sets this non-atomically *after* moving the job to
- // the "claimed" queue, it may be the case that it just didn't set it yet.
- $conn->hSet( $this->prefixWithQueueKey( 'h-meta', $uid ), 'rctime', $now );
- }
- }
-
- $conn->multi( Redis::MULTI ); // begin (atomic trx)
- if ( count( $uidsPush ) ) { // move from "l-claimed" to "l-unclaimed"
- call_user_func_array(
- array( $conn, 'lPush' ),
- array_merge( array( $this->getQueueKey( 'l-unclaimed' ) ), $uidsPush )
- );
- foreach ( $uidsPush as $uid ) {
- $conn->lRem( $this->getQueueKey( 'l-claimed' ), $uid, -1 );
- $conn->hDel( $this->prefixWithQueueKey( 'h-meta', $uid ), 'ctime', 'rctime' );
- $conn->hIncrBy( $this->prefixWithQueueKey( 'h-meta', $uid ), 'attempts', 1 );
- }
- }
- foreach ( $uidsRemove as $uid ) { // remove from "l-claimed"
- $conn->lRem( $this->getQueueKey( 'l-claimed' ), $uid, -1 );
- $conn->delete( // delete job data and metadata
- $this->prefixWithQueueKey( 'h-meta', $uid ),
- $this->prefixWithQueueKey( 'data', $uid ) );
- }
- $res = $conn->exec(); // commit (atomic trx)
-
- if ( in_array( false, $res, true ) ) {
- wfDebugLog( 'JobQueueRedis', "Could not recycle {$this->type} job(s)." );
- } else {
- $count += ( count( $uidsPush ) + count( $uidsRemove ) );
- wfIncrStats( 'job-recycle', count( $uidsPush ) );
- }
-
- $conn->delete( $this->getQueueKey( 'lock' ) ); // unlock
- } catch ( RedisException $e ) {
- $this->throwRedisException( $this->server, $conn, $e );
- }
-
- return $count;
- }
-
- /**
- * Destroy any jobs that have been claimed
- *
- * @return integer Number of jobs deleted
- * @throws MWException
- */
- protected function cleanupClaimedJobs() {
- $count = 0;
- // Make sure the message for claimed jobs was deleted
- // and remove the claimed job IDs from the "claimed" list.
- $conn = $this->getConnection();
- try {
- // Avoid races and duplicate effort
- $conn->multi( Redis::MULTI );
- $conn->setnx( $this->getQueueKey( 'lock' ), 1 );
- $conn->expire( $this->getQueueKey( 'lock' ), 3600 );
- if ( $conn->exec() !== array( true, true ) ) { // lock
- return $count; // already in progress
- }
- // Get the list of all claimed jobs
- $uids = $conn->lRange( $this->getQueueKey( 'l-claimed' ), 0, -1 );
- if ( count( $uids ) ) {
- // Delete the message keys and delist the corresponding ids.
- // Since the only other changes to "l-claimed" are left pushes, we can just strip
- // off the elements read here using a right trim based on the number of ids read.
- $conn->multi( Redis::MULTI ); // begin (atomic trx)
- $conn->lTrim( $this->getQueueKey( 'l-claimed' ), 0, -count( $uids ) - 1 );
- $conn->delete( array_merge(
- $this->prefixValuesWithQueueKey( 'h-meta', $uids ),
- $this->prefixValuesWithQueueKey( 'data', $uids )
- ) );
- $res = $conn->exec(); // commit (atomic trx)
-
- if ( in_array( false, $res, true ) ) {
- wfDebugLog( 'JobQueueRedis', "Could not purge {$this->type} job(s)." );
- } else {
- $count += count( $uids );
- }
+ static $script =
+<<<LUA
+ local released,abandoned,pruned = 0,0,0
+ -- Get all non-dead jobs that have an expired claim on them.
+ -- The score for each item is the last claim timestamp (UNIX).
+ local staleClaims = redis.call('zRangeByScore',KEYS[1],0,ARGV[1],'WITHSCORES')
+ for id,timestamp in ipairs(staleClaims) do
+ local attempts = redis.call('hGet',KEYS[2],id)
+ if attempts < ARGV[3] then
+ -- Claim expired and retries left: re-enqueue the job
+ redis.call('lPush',KEYS[3],id)
+ redis.call('hIncrBy',KEYS[2],id,1)
+ released = released + 1
+ else
+ -- Claim expired and no retries left: mark the job as dead
+ redis.call('zAdd',KEYS[5],timestamp,id)
+ abandoned = abandoned + 1
+ end
+ redis.call('zRem',KEYS[1],id)
+ end
+ -- Get all of the dead jobs that have been marked as dead for too long.
+ -- The score for each item is the last claim timestamp (UNIX).
+ local deadClaims = redis.call('zRangeByScore',KEYS[5],0,ARGV[2],'WITHSCORES')
+ for id,timestamp in ipairs(deadClaims) do
+ -- Stale and out of retries: remove any traces of the job
+ redis.call('zRem',KEYS[5],id)
+ redis.call('hDel',KEYS[2],id)
+ redis.call('hDel',KEYS[4],id)
+ pruned = pruned + 1
+ end
+ return {released,abandoned,pruned}
+LUA;
+ $res = $this->redisEval( $conn, $script,
+ array(
+ $this->getQueueKey( 'z-claimed' ), # KEYS[1]
+ $this->getQueueKey( 'h-attempts' ), # KEYS[2]
+ $this->getQueueKey( 'l-unclaimed' ), # KEYS[3]
+ $this->getQueueKey( 'h-data' ), # KEYS[4]
+ $this->getQueueKey( 'z-abandoned' ), # KEYS[5]
+ $now - $this->claimTTL, # ARGV[1]
+ $now - self::MAX_AGE_PRUNE, # ARGV[2]
+ $this->maxTries # ARGV[3]
+ ),
+ 5 # number of first argument(s) that are keys
+ );
+ if ( $res ) {
+ list( $released, $abandoned, $pruned ) = $res;
+ $count += $released + $pruned;
+ wfIncrStats( 'job-recycle', count( $released ) );
}
- $conn->delete( $this->getQueueKey( 'lock' ) ); // unlock
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );
}
}
}
+ /**
+ * @param RedisConnRef $conn
+ * @param string $script
+ * @param array $params
+ * @param integer $numKeys
+ * @return mixed
+ */
+ protected function redisEval( RedisConnRef $conn, $script, array $params, $numKeys ) {
+ $res = $conn->evalSha( sha1( $script ), $params, $numKeys );
+ if ( $res === false && $conn->getLastError() != '' ) { // not in script cache?
+ wfDebugLog( 'JobQueueRedis', "Lua script error: " . $conn->getLastError() );
+ $res = $conn->eval( $script, $params, $numKeys );
+ }
+ return $res;
+ }
+
/**
* @param $job Job
* @return array
'namespace' => $job->getTitle()->getNamespace(),
'title' => $job->getTitle()->getDBkey(),
'params' => $job->getParams(),
- // Additional metadata
- 'uid' => $job->ignoreDuplicates()
+ // Additional job metadata
+ 'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
+ 'sha1' => $job->ignoreDuplicates()
? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
- : wfRandomString( 32 ),
+ : '',
'timestamp' => time() // UNIX timestamp
);
}
return false;
}
- /**
- * @param string $uid Job UID
- * @return bool Whether $uid is a SHA-1 hash based identifier for de-duplication
- */
- protected function isHashUid( $uid ) {
- return strlen( $uid ) == 31;
- }
-
/**
* Get a connection to the server that handles all sub-queues for this queue
*
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
}
- /**
- * @param $prop string
- * @param $string string
- * @return string
- */
- private function prefixWithQueueKey( $prop, $string ) {
- return $this->getQueueKey( $prop ) . ':' . $string;
- }
-
- /**
- * @param $prop string
- * @param $items array
- * @return Array
- */
- private function prefixValuesWithQueueKey( $prop, array $items ) {
- $res = array();
- foreach ( $items as $item ) {
- $res[] = $this->prefixWithQueueKey( $prop, $item );
- }
- return $res;
- }
-
- /**
- * @param $prop string
- * @param $items array
- * @return Array
- */
- private function prefixKeysWithQueueKey( $prop, array $items ) {
- $res = array();
- foreach ( $items as $key => $item ) {
- $res[$this->prefixWithQueueKey( $prop, $key )] = $item;
- }
- return $res;
- }
-
/**
* @param $key string
* @return void
}
$baseConfig['type'] = 'null';
$baseConfig['wiki'] = wfWikiID();
- $this->queueRand = JobQueue::factory(
- array( 'order' => 'random', 'claimTTL' => 0 ) + $baseConfig );
- $this->queueRandTTL = JobQueue::factory(
- array( 'order' => 'random', 'claimTTL' => 10 ) + $baseConfig );
- $this->queueFifo = JobQueue::factory(
- array( 'order' => 'fifo', 'claimTTL' => 0 ) + $baseConfig );
- $this->queueFifoTTL = JobQueue::factory(
- array( 'order' => 'fifo', 'claimTTL' => 10 ) + $baseConfig );
- if ( $baseConfig['class'] !== 'JobQueueDB' ) { // DB namespace with prefix or temp tables
- foreach ( array( 'queueRand', 'queueRandTTL', 'queueFifo', 'queueFifoTTL' ) as $q ) {
- $this->$q->setTestingPrefix( 'unittests-' . wfRandomString( 32 ) );
- }
+ $variants = array(
+ 'queueRand' => array( 'order' => 'random', 'claimTTL' => 0 ),
+ 'queueRandTTL' => array( 'order' => 'random', 'claimTTL' => 10 ),
+ 'queueTimestamp' => array( 'order' => 'timestamp', 'claimTTL' => 0 ),
+ 'queueTimestampTTL' => array( 'order' => 'timestamp', 'claimTTL' => 10 ),
+ 'queueFifo' => array( 'order' => 'fifo', 'claimTTL' => 0 ),
+ 'queueFifoTTL' => array( 'order' => 'fifo', 'claimTTL' => 10 ),
+ );
+ foreach ( $variants as $q => $settings ) {
+ try {
+ $this->$q = JobQueue::factory( $settings + $baseConfig );
+ if ( ! ( $this->$q instanceof JobQueueDB ) ) {
+ $this->$q->setTestingPrefix( 'unittests-' . wfRandomString( 32 ) );
+ }
+ } catch ( MWException $e ) {}; // unsupported? (@TODO: what if it was another error?)
}
}
protected function tearDown() {
global $wgMemc;
parent::tearDown();
- foreach ( array( 'queueRand', 'queueRandTTL', 'queueFifo', 'queueFifoTTL' ) as $q ) {
- do {
- $job = $this->$q->pop();
- if ( $job ) {
- $this->$q->ack( $job );
- }
- } while ( $job );
+ foreach ( array(
+ 'queueRand', 'queueRandTTL', 'queueTimestamp', 'queueTimestampTTL',
+ 'queueFifo', 'queueFifoTTL'
+ ) as $q ) {
+ if ( $this->$q ) {
+ do {
+ $job = $this->$q->pop();
+ if ( $job ) {
+ $this->$q->ack( $job );
+ }
+ } while ( $job );
+ }
+ $this->$q = null;
}
- $this->queueRand = null;
- $this->queueRandTTL = null;
- $this->queueFifo = null;
- $this->queueFifoTTL = null;
$wgMemc = $this->old['wgMemc'];
}
/**
* @dataProvider provider_queueLists
*/
- function testProperties( $queue, $order, $recycles, $desc ) {
+ function testProperties( $queue, $recycles, $desc ) {
$queue = $this->$queue;
+ if ( !$queue ) {
+ $this->markTestSkipped( $desc );
+ }
$this->assertEquals( wfWikiID(), $queue->getWiki(), "Proper wiki ID ($desc)" );
$this->assertEquals( 'null', $queue->getType(), "Proper job type ($desc)" );
/**
* @dataProvider provider_queueLists
*/
- function testBasicOperations( $queue, $order, $recycles, $desc ) {
+ function testBasicOperations( $queue, $recycles, $desc ) {
$queue = $this->$queue;
+ if ( !$queue ) {
+ $this->markTestSkipped( $desc );
+ }
+
$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
$queue->flushCaches();
/**
* @dataProvider provider_queueLists
*/
- function testBasicDeduplication( $queue, $order, $recycles, $desc ) {
+ function testBasicDeduplication( $queue, $recycles, $desc ) {
$queue = $this->$queue;
+ if ( !$queue ) {
+ $this->markTestSkipped( $desc );
+ }
+
$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
/**
* @dataProvider provider_queueLists
*/
- function testRootDeduplication( $queue, $order, $recycles, $desc ) {
+ function testRootDeduplication( $queue, $recycles, $desc ) {
$queue = $this->$queue;
+ if ( !$queue ) {
+ $this->markTestSkipped( $desc );
+ }
+
$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
*/
function testJobOrder( $queue, $recycles, $desc ) {
$queue = $this->$queue;
+ if ( !$queue ) {
+ $this->markTestSkipped( $desc );
+ }
+
$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
function provider_queueLists() {
return array(
- array( 'queueRand', 'rand', false, 'Random queue without ack()' ),
- array( 'queueRandTTL', 'rand', true, 'Random queue with ack()' ),
- array( 'queueFifo', 'fifo', false, 'Ordered queue without ack()' ),
- array( 'queueFifoTTL', 'fifo', true, 'Ordered queue with ack()' )
+ array( 'queueRand', false, 'Random queue without ack()' ),
+ array( 'queueRandTTL', true, 'Random queue with ack()' ),
+ array( 'queueTimestamp', false, 'Time ordered queue without ack()' ),
+ array( 'queueTimestampTTL', true, 'Time ordered queue with ack()' ),
+ array( 'queueFifo', false, 'FIFO ordered queue without ack()' ),
+ array( 'queueFifoTTL', true, 'FIFO ordered queue with ack()' )
);
}