[JobQueue] Optimized redis queue to use Lua scripts.
author     Aaron Schulz <aschulz@wikimedia.org>
           Wed, 13 Mar 2013 21:26:28 +0000 (14:26 -0700)
committer  Gerrit Code Review <gerrit@wikimedia.org>
           Wed, 20 Mar 2013 01:04:53 +0000 (01:04 +0000)
* Cleaned up some data structures into hashes, which get better
  compression and play well with the KEYS parameter in Lua scripts.
  The claim list is now a sorted set with O(log N) removal in ack()
  and O(log N + M) searching in recycleAndDeleteStaleJobs() (see the
  sketch after this list).
* Made the class itself control object serialization, so that the Lua
  scripts can work with the values directly. Only the job data itself
  needs to be serialized, whereas serializing other values just bloats them.
* Used Lua scripts to get push(), pop() and ack() down to 1 RTT.
* Likewise rewrote recycleAndDeleteStaleJobs() to use a script.
* Fixed a bug where claimed duplicate jobs removed the job data on ack(),
  which meant that claimed duplicate jobs could no-op newer ones.
  De-duplication should only apply to unclaimed jobs, as in the
  JobQueueDB class, so that unfinished jobs don't no-op new ones.
* Removed locking in recycleAndDeleteStaleJobs(), which did not buy much
  since the exclusive SETNX request would serialize on the Lua script
  anyway. Thanks to the sorted sets, the Lua script finishes quickly when
  it runs more than once in a row.
  Also made recycleAndDeleteStaleJobs() run only occasionally (at random)
  to reduce the chance of a single caller tying up the server.
* Removed useless hDel() call in getJobFromUidInternal().
* Changed unit tests to handle the different supported orders better.
  Added tests for the 'timestamp' ordering.
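
Illustrative sketch (not part of this change) of the claim bookkeeping that
the sorted set enables, using raw phpredis calls; the key names, $jobId, and
$claimTTL below are placeholders rather than the actual prefixed keys used
by JobQueueRedis:

    $conn = new Redis();
    $conn->connect( 'redis.example.org', 6379 ); // hypothetical server

    // pop(): record the claim, using the claim time as the score
    $conn->zAdd( 'z-claimed', time(), $jobId );
    // ack(): O(log N) removal of the claimed entry
    $conn->zRem( 'z-claimed', $jobId );
    // recycleAndDeleteStaleJobs(): O(log N + M) range scan for stale claims
    $staleIds = $conn->zRangeByScore( 'z-claimed', 0, time() - $claimTTL );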

Change-Id: Ib2d7aff18753195248ab856afd4a46e18b301db9

includes/job/JobQueueRedis.php
tests/phpunit/includes/jobqueue/JobQueueTest.php

index 3db8260..41855a7 100644
 /**
  * Class to handle job queues stored in Redis
  *
+ * This is a faster and less resource-intensive job queue than JobQueueDB.
+ * All data for a queue using this class is placed into one Redis server.
+ *
+ * There are seven main Redis keys used to track jobs:
+ *   - l-unclaimed  : A list of job IDs used for push/pop
+ *   - z-claimed    : A sorted set of (job ID, UNIX timestamp as score) used for job retries
+ *   - z-abandoned  : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs
+ *   - h-idBySha1   : A hash of (SHA1 => job ID) for unclaimed jobs used for de-duplication
+ *   - h-sha1ById   : A hash of (job ID => SHA1) for unclaimed jobs used for de-duplication
+ *   - h-attempts   : A hash of (job ID => attempt count) used for job claiming/retries
+ *   - h-data       : A hash of (job ID => serialized blobs) for job storage
+ * Any given job ID can be in only one of l-unclaimed, z-claimed, and z-abandoned.
+ * If an ID appears in any of those lists, it should have an h-data entry for its ID.
+ * If a job has a non-empty SHA1 de-duplication value and its ID is in l-unclaimed,
+ * then no other unclaimed job has that SHA1. Every h-idBySha1 entry has a matching
+ * h-sha1ById entry, and every h-sha1ById entry must refer to an ID that is in
+ * l-unclaimed. If a job has its ID in z-claimed or z-abandoned, then it must also
+ * have an h-attempts entry for its ID.
+ *
+ * Additionally, "rootjob:*" keys are used to track "root jobs" for additional de-duplication.
+ * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run.
+ * All the keys are prefixed with the relevant wiki ID information.
+ *
+ * This class requires Redis 2.6 as it makes use of Lua scripts for fast atomic operations.
+ * Additionally, note that Redis offers different persistence modes, such as RDB snapshots,
+ * AOF journaling, and no persistence at all. The servers should be configured appropriately
+ * based on which queues use them and how much data loss those queues can tolerate.
+ *
  * @ingroup JobQueue
  * @since 1.21
  */
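
The invariants above can be illustrated with a hypothetical consistency check
(not part of this patch); $conn is assumed to be a connected phpredis client,
and the key names are the un-prefixed placeholders from the list above:

    $unclaimed = $conn->lRange( 'l-unclaimed', 0, -1 );
    foreach ( $unclaimed as $id ) {
        // every unclaimed ID must have a data blob...
        assert( $conn->hExists( 'h-data', $id ) );
        // ...and must not also be claimed or abandoned
        assert( $conn->zScore( 'z-claimed', $id ) === false );
        assert( $conn->zScore( 'z-abandoned', $id ) === false );
    }
    // every de-duplication mapping must point back at an unclaimed job
    foreach ( $conn->hGetAll( 'h-idBySha1' ) as $sha1 => $id ) {
        assert( $conn->hGet( 'h-sha1ById', $id ) === (string)$sha1 );
        assert( in_array( $id, $unclaimed, true ) );
    }
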
@@ -41,6 +68,7 @@ class JobQueueRedis extends JobQueue {
        /**
         * @params include:
         *   - redisConfig : An array of parameters to RedisConnectionPool::__construct().
+        *                   Note that the serializer option is ignored; "none" is always used.
         *   - redisServer : A hostname/port combination or the absolute path of a UNIX socket.
         *                   If a hostname is specified but no port, the standard port number
         *                   6379 will be used. Required.
@@ -48,6 +76,7 @@ class JobQueueRedis extends JobQueue {
         */
        public function __construct( array $params ) {
                parent::__construct( $params );
+               $params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua
                $this->server = $params['redisServer'];
                $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
        }
@@ -66,12 +95,7 @@ class JobQueueRedis extends JobQueue {
         * @throws MWException
         */
        protected function doIsEmpty() {
-               $conn = $this->getConnection();
-               try {
-                       return ( $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ) == 0 );
-               } catch ( RedisException $e ) {
-                       $this->throwRedisException( $this->server, $conn, $e );
-               }
+               return $this->doGetSize() == 0;
        }
 
        /**
@@ -99,7 +123,10 @@ class JobQueueRedis extends JobQueue {
                }
                $conn = $this->getConnection();
                try {
-                       return $conn->lSize( $this->getQueueKey( 'l-claimed' ) );
+                       $conn->multi( Redis::PIPELINE );
+                       $conn->zSize( $this->getQueueKey( 'z-claimed' ) );
+                       $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
+                       return array_sum( $conn->exec() );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
                }
@@ -113,65 +140,50 @@ class JobQueueRedis extends JobQueue {
         * @throws MWException
         */
        protected function doBatchPush( array $jobs, $flags ) {
-               if ( !count( $jobs ) ) {
-                       return true;
-               }
-
-               // Convert the jobs into a list of field maps
-               $items = array(); // (uid => job fields map)
+               // Convert the jobs into field maps (de-duplicated against each other)
+               $items = array(); // (job ID => job fields map)
                foreach ( $jobs as $job ) {
                        $item = $this->getNewJobFields( $job );
-                       $items[$item['uid']] = $item;
+                       if ( strlen( $item['sha1'] ) ) { // hash identifier => de-duplicate
+                               $items[$item['sha1']] = $item;
+                       } else {
+                               $items[$item['uuid']] = $item;
+                       }
                }
-
-               $dedupUids = array(); // list of uids to check for duplicates
+               // Convert the field maps into serialized blobs
+               $tuples = array();
                foreach ( $items as $item ) {
-                       if ( $this->isHashUid( $item['uid'] ) ) { // hash identifier => de-duplicate
-                               $dedupUids[] = $item['uid'];
-                       }
+                       $tuples[] = array( $item['uuid'], $item['sha1'], serialize( $item ) );
+               }
+
+               if ( !count( $tuples ) ) {
+                       return true; // nothing to do
                }
 
                $conn = $this->getConnection();
                try {
-                       // Find which of these jobs are duplicates of unclaimed jobs in the queue...
-                       if ( count( $dedupUids ) ) {
-                               $conn->multi( Redis::PIPELINE );
-                               foreach ( $dedupUids as $uid ) { // check if job data exists
-                                       $conn->exists( $this->prefixWithQueueKey( 'data', $uid ) );
-                               }
-                               if ( $this->claimTTL > 0 ) { // check which jobs were claimed
-                                       foreach ( $dedupUids as $uid ) {
-                                               $conn->hExists( $this->prefixWithQueueKey( 'h-meta', $uid ), 'ctime' );
-                                       }
-                                       list( $exists, $claimed ) = array_chunk( $conn->exec(), count( $dedupUids ) );
+                       // Actually push the non-duplicate jobs into the queue...
+                       if ( $flags & self::QoS_Atomic ) {
+                               $batches = array( $tuples ); // all or nothing
+                       } else {
+                               $batches = array_chunk( $tuples, 500 ); // avoid tying up the server
+                       }
+                       $failed = 0;
+                       $pushed = 0;
+                       foreach ( $batches as $tupleBatch ) {
+                               $added = $this->pushBlobs( $conn, $tupleBatch );
+                               if ( is_int( $added ) ) {
+                                       $pushed += $added;
                                } else {
-                                       $exists = $conn->exec();
-                                       $claimed = array(); // no claim system
-                               }
-                               // Remove the duplicate jobs to cut down on pushing duplicate uids...
-                               foreach ( $dedupUids as $k => $uid ) {
-                                       if ( $exists[$k] && empty( $claimed[$k] ) ) {
-                                               unset( $items[$uid] );
-                                       }
+                                       $failed += count( $tupleBatch );
                                }
                        }
-                       // Actually push the non-duplicate jobs into the queue...
-                       if ( count( $items ) ) {
-                               $uids = array_keys( $items );
-                               $conn->multi( Redis::MULTI ); // begin (atomic trx)
-                               $conn->mSet( $this->prefixKeysWithQueueKey( 'data', $items ) );
-                               call_user_func_array(
-                                       array( $conn, 'lPush' ),
-                                       array_merge( array( $this->getQueueKey( 'l-unclaimed' ) ), $uids )
-                               );
-                               $res = $conn->exec(); // commit (atomic trx)
-                               if ( in_array( false, $res, true ) ) {
-                                       wfDebugLog( 'JobQueueRedis', "Could not insert {$this->type} job(s)." );
-                                       return false;
-                               }
+                       if ( $failed > 0 ) {
+                               wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." );
+                               return false;
                        }
-                       wfIncrStats( 'job-insert', count( $items ) );
-                       wfIncrStats( 'job-insert-duplicate', count( $jobs ) - count( $items ) );
+                       wfIncrStats( 'job-insert', count( $tuples ) );
+                       wfIncrStats( 'job-insert-duplicate', count( $tuples ) - $failed - $pushed );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
                }
@@ -179,6 +191,51 @@ class JobQueueRedis extends JobQueue {
                return true;
        }
 
+       /**
+        * @param RedisConnRef $conn
+        * @param array $tuples List of tuples of (job ID, job SHA1 or '', serialized blob)
+        * @return integer Number of jobs inserted (duplicates are ignored)
+        * @throws RedisException
+        */
+       protected function pushBlobs( RedisConnRef $conn, array $tuples ) {
+               $args = array(); // ([id, sha1, blob [, id, sha1, blob ... ] ] )
+               foreach ( $tuples as $tuple ) {
+                       $args[] = $tuple[0]; // id
+                       $args[] = $tuple[1]; // sha1
+                       $args[] = $tuple[2]; // blob
+               }
+               static $script =
+<<<LUA
+               if #ARGV % 3 ~= 0 then return redis.error_reply('Unmatched arguments') end
+               local pushed = 0
+               for i = 1,#ARGV,3 do
+                       local id,sha1,blob = ARGV[i],ARGV[i+1],ARGV[i+2]
+                       if sha1 == '' or redis.call('hExists',KEYS[3],sha1) == 0 then
+                               redis.call('lPush',KEYS[1],id)
+                               if sha1 ~= '' then
+                                       redis.call('hSet',KEYS[2],id,sha1)
+                                       redis.call('hSet',KEYS[3],sha1,id)
+                               end
+                               redis.call('hSet',KEYS[4],id,blob)
+                               pushed = pushed + 1
+                       end
+               end
+               return pushed
+LUA;
+               return $this->redisEval( $conn, $script,
+                       array_merge(
+                               array(
+                                       $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
+                                       $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
+                                       $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
+                                       $this->getQueueKey( 'h-data' ), # KEYS[4]
+                               ),
+                               $args
+                       ),
+                       4 # number of first argument(s) that are keys
+               );
+       }
+
        /**
         * @see JobQueue::doPop()
         * @return Job|bool
@@ -187,42 +244,27 @@ class JobQueueRedis extends JobQueue {
        protected function doPop() {
                $job = false;
 
-               if ( $this->claimTTL <= 0 && mt_rand( 0, 99 ) == 0 ) {
-                       $this->cleanupClaimedJobs(); // prune jobs and IDs from the "garbage" list
-               }
-
                $conn = $this->getConnection();
                try {
                        do {
-                               // Atomically pop an item off the queue and onto the "claimed" list
-                               $uid = $conn->rpoplpush(
-                                       $this->getQueueKey( 'l-unclaimed' ),
-                                       $this->getQueueKey( 'l-claimed' )
-                               );
-                               if ( $uid === false ) {
+                               if ( $this->claimTTL > 0 ) {
+                                       // Keep the claimed job list down for high-traffic queues
+                                       if ( mt_rand( 0, 99 ) == 0 ) {
+                                               $this->recycleAndDeleteStaleJobs();
+                                       }
+                                       $blob = $this->popAndAcquireBlob( $conn );
+                               } else {
+                                       $blob = $this->popAndDeleteBlob( $conn );
+                               }
+                               if ( $blob === false ) {
                                        break; // no jobs; nothing to do
                                }
 
                                wfIncrStats( 'job-pop' );
-                               $conn->multi( Redis::PIPELINE );
-                               $conn->get( $this->prefixWithQueueKey( 'data', $uid ) );
-                               if ( $this->claimTTL > 0 ) {
-                                       // Set the claim timestamp metadata. If this step fails, then
-                                       // the timestamp will be assumed to be the current timestamp by
-                                       // recycleAndDeleteStaleJobs() as of the next time that it runs.
-                                       // If two runners claim duplicate jobs, one will abort here.
-                                       $conn->hSetNx( $this->prefixWithQueueKey( 'h-meta', $uid ), 'ctime', time() );
-                               } else {
-                                       // If this fails, the message key will be deleted in cleanupClaimedJobs().
-                                       // If two runners claim duplicate jobs, one of them will abort here.
-                                       $conn->delete(
-                                               $this->prefixWithQueueKey( 'h-meta', $uid ),
-                                               $this->prefixWithQueueKey( 'data', $uid ) );
-                               }
-                               list( $item, $ok ) = $conn->exec();
-                               if ( $item === false || ( $this->claimTTL && !$ok ) ) {
-                                       wfDebug( "Could not find or delete job $uid; probably was a duplicate." );
-                                       continue; // job was probably a duplicate
+                               $item = unserialize( $blob );
+                               if ( $item === false ) {
+                                       wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
+                                       continue;
                                }
 
                                // If $item is invalid, recycleAndDeleteStaleJobs() will cleanup as needed
@@ -243,6 +285,72 @@ class JobQueueRedis extends JobQueue {
                return $job;
        }
 
+       /**
+        * @param RedisConnRef $conn
+        * @return string|boolean Serialized job blob or false if the queue is empty
+        * @throws RedisException
+        */
+       protected function popAndDeleteBlob( RedisConnRef $conn ) {
+               static $script =
+<<<LUA
+               -- Pop an item off the queue
+               local id = redis.call('rpop',KEYS[1])
+               if not id then return false end
+               -- Get the job data and remove it
+               local item = redis.call('hGet',KEYS[4],id)
+               redis.call('hDel',KEYS[4],id)
+               -- Allow new duplicates of this job
+               local sha1 = redis.call('hGet',KEYS[2],id)
+               if sha1 then redis.call('hDel',KEYS[3],sha1) end
+               redis.call('hDel',KEYS[2],id)
+               -- Return the job data
+               return item
+LUA;
+               return $this->redisEval( $conn, $script,
+                       array(
+                               $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
+                               $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
+                               $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
+                               $this->getQueueKey( 'h-data' ), # KEYS[4]
+                       ),
+                       4 # number of first argument(s) that are keys
+               );
+       }
+
+       /**
+        * @param RedisConnRef $conn
+        * @return string|boolean Serialized job blob or false if the queue is empty
+        * @throws RedisException
+        */
+       protected function popAndAcquireBlob( RedisConnRef $conn ) {
+               static $script =
+<<<LUA
+               -- Pop an item off the queue
+               local id = redis.call('rPop',KEYS[1])
+               if not id then return false end
+               -- Allow new duplicates of this job
+               local sha1 = redis.call('hGet',KEYS[2],id)
+               if sha1 then redis.call('hDel',KEYS[3],sha1) end
+               redis.call('hDel',KEYS[2],id)
+               -- Mark the job as claimed and return its data
+               redis.call('zAdd',KEYS[4],ARGV[1],id)
+               redis.call('hIncrBy',KEYS[5],id,1)
+               return redis.call('hGet',KEYS[6],id)
+LUA;
+               return $this->redisEval( $conn, $script,
+                       array(
+                               $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
+                               $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
+                               $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
+                               $this->getQueueKey( 'z-claimed' ), # KEYS[4]
+                               $this->getQueueKey( 'h-attempts' ), # KEYS[5]
+                               $this->getQueueKey( 'h-data' ), # KEYS[6]
+                               time(), # ARGV[1] (injected to be replication-safe)
+                       ),
+                       6 # number of first argument(s) that are keys
+               );
+       }
+
        /**
         * @see JobQueue::doAck()
         * @param Job $job
@@ -257,20 +365,25 @@ class JobQueueRedis extends JobQueue {
                                // the job was transformed into a DuplicateJob or anything of the sort.
                                $item = $job->metadata['sourceFields'];
 
-                               $conn->multi( Redis::MULTI ); // begin (atomic trx)
-                               // Remove the first instance of this job scanning right-to-left.
-                               // This is O(N) in the worst case, but is likely to be much faster since
-                               // jobs are pushed to the left and we are starting from the right, where
-                               // the longest running jobs are likely to be. These should be the first
-                               // jobs to be acknowledged assuming that job run times are roughly equal.
-                               $conn->lRem( $this->getQueueKey( 'l-claimed' ), $item['uid'], -1 );
-                               // Delete the job data and its claim metadata
-                               $conn->delete(
-                                       $this->prefixWithQueueKey( 'h-meta', $item['uid'] ),
-                                       $this->prefixWithQueueKey( 'data', $item['uid'] ) );
-                               $res = $conn->exec(); // commit (atomic trx)
-
-                               if ( in_array( false, $res, true ) ) {
+                               static $script =
+<<<LUA
+                               -- Unmark the job as claimed
+                               redis.call('zRem',KEYS[1],ARGV[1])
+                               redis.call('hDel',KEYS[2],ARGV[1])
+                               -- Delete the job data itself
+                               return redis.call('hDel',KEYS[3],ARGV[1])
+LUA;
+                               $res = $this->redisEval( $conn, $script,
+                                       array(
+                                               $this->getQueueKey( 'z-claimed' ), # KEYS[1]
+                                               $this->getQueueKey( 'h-attempts' ), # KEYS[2]
+                                               $this->getQueueKey( 'h-data' ), # KEYS[3]
+                                               $item['uuid'] # ARGV[1]
+                                       ),
+                                       3 # number of first argument(s) that are keys
+                               );
+
+                               if ( !$res ) {
                                        wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
                                        return false;
                                }
@@ -369,14 +482,13 @@ class JobQueueRedis extends JobQueue {
         */
        public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
                try {
-                       $fields = $conn->get( $this->prefixWithQueueKey( 'data', $uid ) );
-                       if ( !is_array( $fields ) ) { // wtf?
-                               $conn->delete( $this->prefixWithQueueKey( 'data', $uid ) );
-                               throw new MWException( "Could not find job with UID '$uid'." );
+                       $item = unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) );
+                       if ( !is_array( $item ) ) { // this shouldn't happen
+                               throw new MWException( "Could not find job with ID '$uid'." );
                        }
-                       $title = Title::makeTitle( $fields['namespace'], $fields['title'] );
-                       $job = Job::factory( $fields['type'], $title, $fields['params'] );
-                       $job->metadata['sourceFields'] = $fields;
+                       $title = Title::makeTitle( $item['namespace'], $item['title'] );
+                       $job = Job::factory( $item['type'], $title, $item['params'] );
+                       $job->metadata['sourceFields'] = $item;
                        return $job;
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
@@ -396,125 +508,61 @@ class JobQueueRedis extends JobQueue {
                $count = 0;
                // For each job item that can be retried, we need to add it back to the
                // main queue and remove it from the list of currently claimed job items.
+               // Jobs that cannot be retried are marked as dead and kept around for
+               // investigation and manual job restoration, but are eventually deleted.
                $conn = $this->getConnection();
                try {
-                       // Avoid duplicate insertions of items to be re-enqueued
-                       $conn->multi( Redis::MULTI );
-                       $conn->setnx( $this->getQueueKey( 'lock' ), 1 );
-                       $conn->expire( $this->getQueueKey( 'lock' ), 3600 );
-                       if ( $conn->exec() !== array( true, true ) ) { // lock
-                               return $count; // already in progress
-                       }
-
                        $now = time();
-                       $claimCutoff = $now - $this->claimTTL;
-                       $pruneCutoff = $now - self::MAX_AGE_PRUNE;
-
-                       // Get the list of all claimed jobs
-                       $claimedUids = $conn->lRange( $this->getQueueKey( 'l-claimed' ), 0, -1 );
-                       // Get a map of (uid => claim metadata) for all claimed jobs
-                       $metadata = $conn->mGet( $this->prefixValuesWithQueueKey( 'h-meta', $claimedUids ) );
-
-                       $uidsPush = array(); // items IDs to move to the "unclaimed" queue
-                       $uidsRemove = array(); // item IDs to remove from "claimed" queue
-                       foreach ( $claimedUids as $i => $uid ) { // all claimed items
-                               $info = $metadata[$i] ? $metadata[$i] : array();
-                               if ( isset( $info['ctime'] ) || isset( $info['rctime'] ) ) {
-                                       // Prefer "ctime" (set by pop()) over "rctime" (set by this function)
-                                       $ctime = isset( $info['ctime'] ) ? $info['ctime'] : $info['rctime'];
-                                       // Claimed job claimed for too long?
-                                       if ( $ctime < $claimCutoff ) {
-                                               // Get the number of failed attempts
-                                               $attempts = isset( $info['attempts'] ) ? $info['attempts'] : 0;
-                                               if ( $attempts < $this->maxTries ) {
-                                                       $uidsPush[] = $uid; // retry it
-                                               } elseif ( $ctime < $pruneCutoff ) {
-                                                       $uidsRemove[] = $uid; // just remove it
-                                               }
-                                       }
-                               } else {
-                                       // If pop() failed to set the claim timestamp, set it to the current time.
-                                       // Since that function sets this non-atomically *after* moving the job to
-                                       // the "claimed" queue, it may be the case that it just didn't set it yet.
-                                       $conn->hSet( $this->prefixWithQueueKey( 'h-meta', $uid ), 'rctime', $now );
-                               }
-                       }
-
-                       $conn->multi( Redis::MULTI ); // begin (atomic trx)
-                       if ( count( $uidsPush ) ) { // move from "l-claimed" to "l-unclaimed"
-                               call_user_func_array(
-                                       array( $conn, 'lPush' ),
-                                       array_merge( array( $this->getQueueKey( 'l-unclaimed' ) ), $uidsPush )
-                               );
-                               foreach ( $uidsPush as $uid ) {
-                                       $conn->lRem( $this->getQueueKey( 'l-claimed' ), $uid, -1 );
-                                       $conn->hDel( $this->prefixWithQueueKey( 'h-meta', $uid ), 'ctime', 'rctime' );
-                                       $conn->hIncrBy( $this->prefixWithQueueKey( 'h-meta', $uid ), 'attempts', 1 );
-                               }
-                       }
-                       foreach ( $uidsRemove as $uid ) { // remove from "l-claimed"
-                               $conn->lRem( $this->getQueueKey( 'l-claimed' ), $uid, -1 );
-                               $conn->delete( // delete job data and metadata
-                                       $this->prefixWithQueueKey( 'h-meta', $uid ),
-                                       $this->prefixWithQueueKey( 'data', $uid ) );
-                       }
-                       $res = $conn->exec(); // commit (atomic trx)
-
-                       if ( in_array( false, $res, true ) ) {
-                               wfDebugLog( 'JobQueueRedis', "Could not recycle {$this->type} job(s)." );
-                       } else {
-                               $count += ( count( $uidsPush ) + count( $uidsRemove ) );
-                               wfIncrStats( 'job-recycle', count( $uidsPush ) );
-                       }
-
-                       $conn->delete( $this->getQueueKey( 'lock' ) ); // unlock
-               } catch ( RedisException $e ) {
-                       $this->throwRedisException( $this->server, $conn, $e );
-               }
-
-               return $count;
-       }
-
-       /**
-        * Destroy any jobs that have been claimed
-        *
-        * @return integer Number of jobs deleted
-        * @throws MWException
-        */
-       protected function cleanupClaimedJobs() {
-               $count = 0;
-               // Make sure the message for claimed jobs was deleted
-               // and remove the claimed job IDs from the "claimed" list.
-               $conn = $this->getConnection();
-               try {
-                       // Avoid races and duplicate effort
-                       $conn->multi( Redis::MULTI );
-                       $conn->setnx( $this->getQueueKey( 'lock' ), 1 );
-                       $conn->expire( $this->getQueueKey( 'lock' ), 3600 );
-                       if ( $conn->exec() !== array( true, true ) ) { // lock
-                               return $count; // already in progress
-                       }
-                       // Get the list of all claimed jobs
-                       $uids = $conn->lRange( $this->getQueueKey( 'l-claimed' ), 0, -1 );
-                       if ( count( $uids ) ) {
-                               // Delete the message keys and delist the corresponding ids.
-                               // Since the only other changes to "l-claimed" are left pushes, we can just strip
-                               // off the elements read here using a right trim based on the number of ids read.
-                               $conn->multi( Redis::MULTI ); // begin (atomic trx)
-                               $conn->lTrim( $this->getQueueKey( 'l-claimed' ), 0, -count( $uids ) - 1 );
-                               $conn->delete( array_merge(
-                                       $this->prefixValuesWithQueueKey( 'h-meta', $uids ),
-                                       $this->prefixValuesWithQueueKey( 'data', $uids )
-                               ) );
-                               $res = $conn->exec(); // commit (atomic trx)
-
-                               if ( in_array( false, $res, true ) ) {
-                                       wfDebugLog( 'JobQueueRedis', "Could not purge {$this->type} job(s)." );
-                               } else {
-                                       $count += count( $uids );
-                               }
+                       static $script =
+<<<LUA
+                       local released,abandoned,pruned = 0,0,0
+                       -- Get all non-dead jobs that have an expired claim on them.
+                       -- The score for each item is the last claim timestamp (UNIX).
+                       local staleClaims = redis.call('zRangeByScore',KEYS[1],0,ARGV[1],'WITHSCORES')
+                       -- WITHSCORES returns a flat {id1,score1,id2,score2,...} table, so step by two
+                       for i = 1,#staleClaims,2 do
+                               local id,timestamp = staleClaims[i],staleClaims[i+1]
+                               local attempts = tonumber(redis.call('hGet',KEYS[2],id)) or 0
+                               if attempts < tonumber(ARGV[3]) then
+                                       -- Claim expired and retries left: re-enqueue the job
+                                       redis.call('lPush',KEYS[3],id)
+                                       redis.call('hIncrBy',KEYS[2],id,1)
+                                       released = released + 1
+                               else
+                                       -- Claim expired and no retries left: mark the job as dead
+                                       redis.call('zAdd',KEYS[5],timestamp,id)
+                                       abandoned = abandoned + 1
+                               end
+                               redis.call('zRem',KEYS[1],id)
+                       end
+                       -- Get all of the dead jobs that have been marked as dead for too long.
+                       -- The score for each item is the last claim timestamp (UNIX).
+                       local deadClaims = redis.call('zRangeByScore',KEYS[5],0,ARGV[2],'WITHSCORES')
+                       for i = 1,#deadClaims,2 do
+                               local id = deadClaims[i]
+                               -- Stale and out of retries: remove any traces of the job
+                               redis.call('zRem',KEYS[5],id)
+                               redis.call('hDel',KEYS[2],id)
+                               redis.call('hDel',KEYS[4],id)
+                               pruned = pruned + 1
+                       end
+                       return {released,abandoned,pruned}
+LUA;
+                       $res = $this->redisEval( $conn, $script,
+                               array(
+                                       $this->getQueueKey( 'z-claimed' ), # KEYS[1]
+                                       $this->getQueueKey( 'h-attempts' ), # KEYS[2]
+                                       $this->getQueueKey( 'l-unclaimed' ), # KEYS[3]
+                                       $this->getQueueKey( 'h-data' ), # KEYS[4]
+                                       $this->getQueueKey( 'z-abandoned' ), # KEYS[5]
+                                       $now - $this->claimTTL, # ARGV[1]
+                                       $now - self::MAX_AGE_PRUNE, # ARGV[2]
+                                       $this->maxTries # ARGV[3]
+                               ),
+                               5 # number of first argument(s) that are keys
+                       );
+                       if ( $res ) {
+                               list( $released, $abandoned, $pruned ) = $res;
+                               $count += $released + $pruned;
+                               wfIncrStats( 'job-recycle', $released );
                        }
-                       $conn->delete( $this->getQueueKey( 'lock' ) ); // unlock
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
                }
@@ -538,6 +586,22 @@ class JobQueueRedis extends JobQueue {
                }
        }
 
+       /**
+        * @param RedisConnRef $conn
+        * @param string $script
+        * @param array $params
+        * @param integer $numKeys
+        * @return mixed
+        */
+       protected function redisEval( RedisConnRef $conn, $script, array $params, $numKeys ) {
+               $res = $conn->evalSha( sha1( $script ), $params, $numKeys );
+               if ( $res === false && $conn->getLastError() != '' ) { // not in script cache?
+                       wfDebugLog( 'JobQueueRedis', "Lua script error: " . $conn->getLastError() );
+                       $res = $conn->eval( $script, $params, $numKeys );
+               }
+               return $res;
+       }
+
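
The helper above relies on Redis script caching: the server stores each script
under the SHA-1 of its source, EVALSHA fails with a NOSCRIPT error when that
cache entry is missing (for example after a server restart), and phpredis then
returns false with getLastError() set, so the code falls back to a full EVAL.
An alternative, sketched below with $conn, $script, $params and $numKeys
assumed, is to pre-load the script explicitly:

    // SCRIPT LOAD returns the same SHA-1 that evalSha() expects, so later
    // calls can take the cheap EVALSHA round trip.
    $sha1 = $conn->script( 'load', $script );
    $res = $conn->evalSha( $sha1, $params, $numKeys );
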
        /**
         * @param $job Job
         * @return array
@@ -549,10 +613,11 @@ class JobQueueRedis extends JobQueue {
                        'namespace' => $job->getTitle()->getNamespace(),
                        'title'     => $job->getTitle()->getDBkey(),
                        'params'    => $job->getParams(),
-                       // Additional metadata
-                       'uid'       => $job->ignoreDuplicates()
+                       // Additional job metadata
+                       'uuid'      => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
+                       'sha1'      => $job->ignoreDuplicates()
                                ? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
-                               : wfRandomString( 32 ),
+                               : '',
                        'timestamp' => time() // UNIX timestamp
                );
        }
@@ -571,14 +636,6 @@ class JobQueueRedis extends JobQueue {
                return false;
        }
 
-       /**
-        * @param string $uid Job UID
-        * @return bool Whether $uid is a SHA-1 hash based identifier for de-duplication
-        */
-       protected function isHashUid( $uid ) {
-               return strlen( $uid ) == 31;
-       }
-
        /**
         * Get a connection to the server that handles all sub-queues for this queue
         *
@@ -626,41 +683,6 @@ class JobQueueRedis extends JobQueue {
                return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
        }
 
-       /**
-        * @param $prop string
-        * @param $string string
-        * @return string
-        */
-       private function prefixWithQueueKey( $prop, $string ) {
-               return $this->getQueueKey( $prop ) . ':' . $string;
-       }
-
-       /**
-        * @param $prop string
-        * @param $items array
-        * @return Array
-        */
-       private function prefixValuesWithQueueKey( $prop, array $items ) {
-               $res = array();
-               foreach ( $items as $item ) {
-                       $res[] = $this->prefixWithQueueKey( $prop, $item );
-               }
-               return $res;
-       }
-
-       /**
-        * @param $prop string
-        * @param $items array
-        * @return Array
-        */
-       private function prefixKeysWithQueueKey( $prop, array $items ) {
-               $res = array();
-               foreach ( $items as $key => $item ) {
-                       $res[$this->prefixWithQueueKey( $prop, $key )] = $item;
-               }
-               return $res;
-       }
-
        /**
         * @param $key string
         * @return void
index ad4c8df..f3f9c11 100644
@@ -32,44 +32,52 @@ class JobQueueTest extends MediaWikiTestCase {
                }
                $baseConfig['type'] = 'null';
                $baseConfig['wiki'] = wfWikiID();
-               $this->queueRand = JobQueue::factory(
-                       array( 'order' => 'random', 'claimTTL' => 0 ) + $baseConfig );
-               $this->queueRandTTL = JobQueue::factory(
-                       array( 'order' => 'random', 'claimTTL' => 10 ) + $baseConfig );
-               $this->queueFifo = JobQueue::factory(
-                       array( 'order' => 'fifo', 'claimTTL' => 0 ) + $baseConfig );
-               $this->queueFifoTTL = JobQueue::factory(
-                       array( 'order' => 'fifo', 'claimTTL' => 10 ) + $baseConfig );
-               if ( $baseConfig['class'] !== 'JobQueueDB' ) { // DB namespace with prefix or temp tables
-                       foreach ( array( 'queueRand', 'queueRandTTL', 'queueFifo', 'queueFifoTTL' ) as $q ) {
-                               $this->$q->setTestingPrefix( 'unittests-' . wfRandomString( 32 ) );
-                       }
+               $variants = array(
+                       'queueRand' => array( 'order' => 'random', 'claimTTL' => 0 ),
+                       'queueRandTTL' => array( 'order' => 'random', 'claimTTL' => 10 ),
+                       'queueTimestamp' => array( 'order' => 'timestamp', 'claimTTL' => 0 ),
+                       'queueTimestampTTL' => array( 'order' => 'timestamp', 'claimTTL' => 10 ),
+                       'queueFifo' => array( 'order' => 'fifo', 'claimTTL' => 0 ),
+                       'queueFifoTTL' => array( 'order' => 'fifo', 'claimTTL' => 10 ),
+               );
+               foreach ( $variants as $q => $settings ) {
+                       try {
+                               $this->$q = JobQueue::factory( $settings + $baseConfig );
+                               if ( ! ( $this->$q instanceof JobQueueDB ) ) {
+                                       $this->$q->setTestingPrefix( 'unittests-' . wfRandomString( 32 ) );
+                               }
+                       } catch ( MWException $e ) {}; // unsupported? (@TODO: what if it was another error?)
                }
        }
 
        protected function tearDown() {
                global $wgMemc;
                parent::tearDown();
-               foreach ( array( 'queueRand', 'queueRandTTL', 'queueFifo', 'queueFifoTTL' ) as $q ) {
-                       do {
-                               $job = $this->$q->pop();
-                               if ( $job ) {
-                                       $this->$q->ack( $job );
-                               }
-                       } while ( $job );
+               foreach ( array(
+                       'queueRand', 'queueRandTTL', 'queueTimestamp', 'queueTimestampTTL',
+                       'queueFifo', 'queueFifoTTL'
+               ) as $q ) {
+                       if ( $this->$q ) {
+                               do {
+                                       $job = $this->$q->pop();
+                                       if ( $job ) {
+                                               $this->$q->ack( $job );
+                                       }
+                               } while ( $job );
+                       }
+                       $this->$q = null;
                }
-               $this->queueRand = null;
-               $this->queueRandTTL = null;
-               $this->queueFifo = null;
-               $this->queueFifoTTL = null;
                $wgMemc = $this->old['wgMemc'];
        }
 
        /**
         * @dataProvider provider_queueLists
         */
-       function testProperties( $queue, $order, $recycles, $desc ) {
+       function testProperties( $queue, $recycles, $desc ) {
                $queue = $this->$queue;
+               if ( !$queue ) {
+                       $this->markTestSkipped( $desc );
+               }
 
                $this->assertEquals( wfWikiID(), $queue->getWiki(), "Proper wiki ID ($desc)" );
                $this->assertEquals( 'null', $queue->getType(), "Proper job type ($desc)" );
@@ -78,8 +86,12 @@ class JobQueueTest extends MediaWikiTestCase {
        /**
         * @dataProvider provider_queueLists
         */
-       function testBasicOperations( $queue, $order, $recycles, $desc ) {
+       function testBasicOperations( $queue, $recycles, $desc ) {
                $queue = $this->$queue;
+               if ( !$queue ) {
+                       $this->markTestSkipped( $desc );
+               }
+
                $this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
 
                $queue->flushCaches();
@@ -137,8 +149,12 @@ class JobQueueTest extends MediaWikiTestCase {
        /**
         * @dataProvider provider_queueLists
         */
-       function testBasicDeduplication( $queue, $order, $recycles, $desc ) {
+       function testBasicDeduplication( $queue, $recycles, $desc ) {
                $queue = $this->$queue;
+               if ( !$queue ) {
+                       $this->markTestSkipped( $desc );
+               }
+
 
                $this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
 
@@ -186,8 +202,12 @@ class JobQueueTest extends MediaWikiTestCase {
        /**
         * @dataProvider provider_queueLists
         */
-       function testRootDeduplication( $queue, $order, $recycles, $desc ) {
+       function testRootDeduplication( $queue, $recycles, $desc ) {
                $queue = $this->$queue;
+               if ( !$queue ) {
+                       $this->markTestSkipped( $desc );
+               }
+
 
                $this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
 
@@ -238,6 +258,10 @@ class JobQueueTest extends MediaWikiTestCase {
         */
        function testJobOrder( $queue, $recycles, $desc ) {
                $queue = $this->$queue;
+               if ( !$queue ) {
+                       $this->markTestSkipped( $desc );
+               }
+
 
                $this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
 
@@ -266,10 +290,12 @@ class JobQueueTest extends MediaWikiTestCase {
 
        function provider_queueLists() {
                return array(
-                       array( 'queueRand', 'rand', false, 'Random queue without ack()' ),
-                       array( 'queueRandTTL', 'rand', true, 'Random queue with ack()' ),
-                       array( 'queueFifo', 'fifo', false, 'Ordered queue without ack()' ),
-                       array( 'queueFifoTTL', 'fifo', true, 'Ordered queue with ack()' )
+                       array( 'queueRand', false, 'Random queue without ack()' ),
+                       array( 'queueRandTTL', true, 'Random queue with ack()' ),
+                       array( 'queueTimestamp', false, 'Time ordered queue without ack()' ),
+                       array( 'queueTimestampTTL', true, 'Time ordered queue with ack()' ),
+                       array( 'queueFifo', false, 'FIFO ordered queue without ack()' ),
+                       array( 'queueFifoTTL', true, 'FIFO ordered queue with ack()' )
                );
        }