[JobQueue] Added support for delayed jobs with JobQueueRedis.
authorAaron Schulz <aschulz@wikimedia.org>
Tue, 12 Mar 2013 03:40:01 +0000 (20:40 -0700)
committerGerrit Code Review <gerrit@wikimedia.org>
Thu, 21 Mar 2013 07:24:40 +0000 (07:24 +0000)
* The queue can handle delaying jobs until a given timestamp is reached.
* Added Job::getReleaseTimestamp() to let jobs specifiy delay amounts.
* Added a "checkDelay" option and a supportsDelayedJobs() function to JobQueue.
  There are also getDelayedCount() and getAllDelayedJobs() functions.
* Simplified a bit of code in doBatchPush() and pushBlobs().
* Improved the logic in redisEval().

Change-Id: I40b3e3438e659f6844bdbdd5e9d3ccc6c4dc82b2

includes/job/Job.php
includes/job/JobQueue.php
includes/job/JobQueueRedis.php

index bcf582e..d8f55c3 100644 (file)
@@ -176,6 +176,16 @@ abstract class Job {
                return $this->params;
        }
 
+       /**
+        * @return integer|null UNIX timestamp to delay running this job until, otherwise null
+        * @since 1.22
+        */
+       public function getReleaseTimestamp() {
+               return isset( $this->params['jobReleaseTimestamp'] )
+                       ? wfTimestampOrNull( TS_UNIX, $this->params['jobReleaseTimestamp'] )
+                       : null;
+       }
+
        /**
         * @return bool Whether only one of each identical set of jobs should be run
         */
index 5ef52b5..9c152cd 100644 (file)
@@ -34,6 +34,7 @@ abstract class JobQueue {
        protected $order; // string; job priority for pop()
        protected $claimTTL; // integer; seconds
        protected $maxTries; // integer; maximum number of times to try a job
+       protected $checkDelay; // boolean; allow delayed jobs
 
        const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions
 
@@ -55,28 +56,36 @@ abstract class JobQueue {
                if ( !in_array( $this->order, $this->supportedOrders() ) ) {
                        throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
                }
+               $this->checkDelay = !empty( $params['checkDelay'] );
+               if ( $this->checkDelay && !$this->supportsDelayedJobs() ) {
+                       throw new MWException( __CLASS__ . " does not support delayed jobs." );
+               }
        }
 
        /**
         * Get a job queue object of the specified type.
         * $params includes:
-        *   - class    : What job class to use (determines job type)
-        *   - wiki     : wiki ID of the wiki the jobs are for (defaults to current wiki)
-        *   - type     : The name of the job types this queue handles
-        *   - order    : Order that pop() selects jobs, one of "fifo", "timestamp" or "random".
-        *                If "fifo" is used, the queue will effectively be FIFO. Note that
-        *                job completion will not appear to be exactly FIFO if there are multiple
-        *                job runners since jobs can take different times to finish once popped.
-        *                If "timestamp" is used, the queue will at least be loosely ordered
-        *                by timestamp, allowing for some jobs to be popped off out of order.
-        *                If "random" is used, pop() will pick jobs in random order.
-        *                Note that it may only be weakly random (e.g. a lottery of the oldest X).
-        *                If "any" is choosen, the queue will use whatever order is the fastest.
-        *                This might be useful for improving concurrency for job acquisition.
-        *   - claimTTL : If supported, the queue will recycle jobs that have been popped
-        *                but not acknowledged as completed after this many seconds. Recycling
-        *                of jobs simple means re-inserting them into the queue. Jobs can be
-        *                attempted up to three times before being discarded.
+        *   - class      : What job class to use (determines job type)
+        *   - wiki       : wiki ID of the wiki the jobs are for (defaults to current wiki)
+        *   - type       : The name of the job types this queue handles
+        *   - order      : Order that pop() selects jobs, one of "fifo", "timestamp" or "random".
+        *                  If "fifo" is used, the queue will effectively be FIFO. Note that job
+        *                  completion will not appear to be exactly FIFO if there are multiple
+        *                  job runners since jobs can take different times to finish once popped.
+        *                  If "timestamp" is used, the queue will at least be loosely ordered
+        *                  by timestamp, allowing for some jobs to be popped off out of order.
+        *                  If "random" is used, pop() will pick jobs in random order.
+        *                  Note that it may only be weakly random (e.g. a lottery of the oldest X).
+        *                  If "any" is choosen, the queue will use whatever order is the fastest.
+        *                  This might be useful for improving concurrency for job acquisition.
+        *   - claimTTL   : If supported, the queue will recycle jobs that have been popped
+        *                  but not acknowledged as completed after this many seconds. Recycling
+        *                  of jobs simple means re-inserting them into the queue. Jobs can be
+        *                  attempted up to three times before being discarded.
+        *   - checkDelay : If supported, respect Job::getReleaseTimestamp() in the push functions.
+        *                  This lets delayed jobs wait in a staging area until a given timestamp is
+        *                  reached, at which point they will enter the queue. If this is not enabled
+        *                  or not supported, an exception will be thrown on delayed job insertion.
         *
         * Queue classes should throw an exception if they do not support the options given.
         *
@@ -128,7 +137,14 @@ abstract class JobQueue {
        abstract protected function optimalOrder();
 
        /**
-        * Quickly check if the queue is empty (has no available jobs).
+        * @return boolean Whether delayed jobs are supported
+        */
+       protected function supportsDelayedJobs() {
+               return false; // not implemented
+       }
+
+       /**
+        * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
         * Queue classes should use caching if they are any slower without memcached.
         *
         * If caching is used, this might return false when there are actually no jobs.
@@ -153,7 +169,7 @@ abstract class JobQueue {
        abstract protected function doIsEmpty();
 
        /**
-        * Get the number of available (unacquired) jobs in the queue.
+        * Get the number of available (unacquired, non-delayed) jobs in the queue.
         * Queue classes should use caching if they are any slower without memcached.
         *
         * If caching is used, this number might be out of date for a minute.
@@ -196,6 +212,31 @@ abstract class JobQueue {
         */
        abstract protected function doGetAcquiredCount();
 
+       /**
+        * Get the number of delayed jobs (these are temporarily out of the queue).
+        * Queue classes should use caching if they are any slower without memcached.
+        *
+        * If caching is used, this number might be out of date for a minute.
+        *
+        * @return integer
+        * @throws MWException
+        * @since 1.22
+        */
+       final public function getDelayedCount() {
+               wfProfileIn( __METHOD__ );
+               $res = $this->doGetDelayedCount();
+               wfProfileOut( __METHOD__ );
+               return $res;
+       }
+
+       /**
+        * @see JobQueue::getDelayedCount()
+        * @return integer
+        */
+       protected function doGetDelayedCount() {
+               return 0; // not implemented
+       }
+
        /**
         * Push a single jobs into the queue.
         * This does not require $wgJobClasses to be set for the given job type.
@@ -227,7 +268,11 @@ abstract class JobQueue {
 
                foreach ( $jobs as $job ) {
                        if ( $job->getType() !== $this->type ) {
-                               throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+                               throw new MWException(
+                                       "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
+                       } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) {
+                               throw new MWException(
+                                       "Got delayed '{$job->getType()}' job; delays are not supported." );
                        }
                }
 
@@ -493,15 +538,27 @@ abstract class JobQueue {
        protected function doFlushCaches() {}
 
        /**
-        * Get an iterator to traverse over all of the jobs in this queue.
-        * This does not include jobs that are current acquired. In general,
-        * this should only be called on a queue that is no longer being popped.
+        * Get an iterator to traverse over all available jobs in this queue.
+        * This does not include jobs that are currently acquired or delayed.
+        * This should only be called on a queue that is no longer being popped.
         *
         * @return Iterator|Traversable|Array
         * @throws MWException
         */
        abstract public function getAllQueuedJobs();
 
+       /**
+        * Get an iterator to traverse over all delayed jobs in this queue.
+        * This should only be called on a queue that is no longer being popped.
+        *
+        * @return Iterator|Traversable|Array
+        * @throws MWException
+        * @since 1.22
+        */
+       public function getAllDelayedJobs() {
+               return array(); // not implemented
+       }
+
        /**
         * Namespace the queue with a key to isolate it for testing
         *
index 338dc79..bd23174 100644 (file)
  * This is faster, less resource intensive, queue that JobQueueDB.
  * All data for a queue using this class is placed into one redis server.
  *
- * There are seven main redis keys used to track jobs:
- *   - l-unclaimed  : A list of job IDs used for push/pop
+ * There are eight main redis keys used to track jobs:
+ *   - l-unclaimed  : A list of job IDs used for ready unclaimed jobs
  *   - z-claimed    : A sorted set of (job ID, UNIX timestamp as score) used for job retries
  *   - z-abandoned  : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs
+ *   - z-delayed    : A sorted set of (job ID, UNIX timestamp as score) used for delayed jobs
  *   - h-idBySha1   : A hash of (SHA1 => job ID) for unclaimed jobs used for de-duplication
  *   - h-sha1Byid   : A hash of (job ID => SHA1) for unclaimed jobs used for de-duplication
  *   - h-attempts   : A hash of (job ID => attempt count) used for job claiming/retries
  *   - h-data       : A hash of (job ID => serialized blobs) for job storage
- * Any given job ID can be in only one of l-unclaimed, z-claimed, and z-abandoned.
+ * A job ID can be in only one of z-delayed, l-unclaimed, z-claimed, and z-abandoned.
  * If an ID appears in any of those lists, it should have a h-data entry for its ID.
- * If a job has a non-empty SHA1 de-duplication value and its ID is in l-unclaimed,
- * then there should be no other such jobs. Every h-idBySha1 entry has an h-sha1Byid
+ * If a job has a SHA1 de-duplication value and its ID is in l-unclaimed or z-delayed, then
+ * there should be no other such jobs with that SHA1. Every h-idBySha1 entry has an h-sha1Byid
  * entry and every h-sha1Byid must refer to an ID that is l-unclaimed. If a job has its
  * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID.
  *
- * Additionally, "rootjob:* keys to track "root jobs" used for additional de-duplication.
+ * Additionally, "rootjob:* keys track "root jobs" used for additional de-duplication.
  * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run.
  * All the keys are prefixed with the relevant wiki ID information.
  *
@@ -89,6 +90,10 @@ class JobQueueRedis extends JobQueue {
                return 'fifo';
        }
 
+       protected function supportsDelayedJobs() {
+               return true;
+       }
+
        /**
         * @see JobQueue::doIsEmpty()
         * @return bool
@@ -132,6 +137,23 @@ class JobQueueRedis extends JobQueue {
                }
        }
 
+       /**
+        * @see JobQueue::doGetDelayedCount()
+        * @return integer
+        * @throws MWException
+        */
+       protected function doGetDelayedCount() {
+               if ( !$this->checkDelay ) {
+                       return 0; // no delayed jobs
+               }
+               $conn = $this->getConnection();
+               try {
+                       return $conn->zSize( $this->getQueueKey( 'z-delayed' ) );
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+       }
+
        /**
         * @see JobQueue::doBatchPush()
         * @param array $jobs
@@ -150,13 +172,8 @@ class JobQueueRedis extends JobQueue {
                                $items[$item['uuid']] = $item;
                        }
                }
-               // Convert the field maps into serialized blobs
-               $tuples = array();
-               foreach ( $items as $item ) {
-                       $tuples[] = array( $item['uuid'], $item['sha1'], serialize( $item ) );
-               }
 
-               if ( !count( $tuples ) ) {
+               if ( !count( $items ) ) {
                        return true; // nothing to do
                }
 
@@ -164,26 +181,26 @@ class JobQueueRedis extends JobQueue {
                try {
                        // Actually push the non-duplicate jobs into the queue...
                        if ( $flags & self::QoS_Atomic ) {
-                               $batches = array( $tuples ); // all or nothing
+                               $batches = array( $items ); // all or nothing
                        } else {
-                               $batches = array_chunk( $tuples, 500 ); // avoid tying up the server
+                               $batches = array_chunk( $items, 500 ); // avoid tying up the server
                        }
                        $failed = 0;
                        $pushed = 0;
-                       foreach ( $batches as $tupleBatch ) {
-                               $added = $this->pushBlobs( $conn, $tupleBatch );
+                       foreach ( $batches as $itemBatch ) {
+                               $added = $this->pushBlobs( $conn, $itemBatch );
                                if ( is_int( $added ) ) {
                                        $pushed += $added;
                                } else {
-                                       $failed += count( $tupleBatch );
+                                       $failed += count( $itemBatch );
                                }
                        }
                        if ( $failed > 0 ) {
                                wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." );
                                return false;
                        }
-                       wfIncrStats( 'job-insert', count( $tuples ) );
-                       wfIncrStats( 'job-insert-duplicate', count( $tuples ) - $failed - $pushed );
+                       wfIncrStats( 'job-insert', count( $items ) );
+                       wfIncrStats( 'job-insert-duplicate', count( $items ) - $failed - $pushed );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
                }
@@ -193,30 +210,37 @@ class JobQueueRedis extends JobQueue {
 
        /**
         * @param RedisConnRef $conn
-        * @param array $tuples List of tuples of (job ID, job SHA1 or '', serialized blob)
+        * @param array $items List of results from JobQueueRedis::getNewJobFields()
         * @return integer Number of jobs inserted (duplicates are ignored)
         * @throws RedisException
         */
-       protected function pushBlobs( RedisConnRef $conn, array $tuples ) {
+       protected function pushBlobs( RedisConnRef $conn, array $items ) {
                $args = array(); // ([id, sha1, blob [, id, sha1, blob ... ] ] )
-               foreach ( $tuples as $tuple ) {
-                       $args[] = $tuple[0]; // id
-                       $args[] = $tuple[1]; // sha1
-                       $args[] = $tuple[2]; // blob
+               foreach ( $items as $item ) {
+                       $args[] = (string)$item['uuid'];
+                       $args[] = (string)$item['sha1'];
+                       $args[] = (string)$item['rtimestamp'];
+                       $args[] = (string)serialize( $item );
                }
                static $script =
 <<<LUA
-               if #ARGV % 3 ~= 0 then return redis.error_reply('Unmatched arguments') end
+               if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end
                local pushed = 0
-               for i = 1,#ARGV,3 do
-                       local id,sha1,blob = ARGV[i],ARGV[i+1],ARGV[i+2]
+               for i = 1,#ARGV,4 do
+                       local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
                        if sha1 == '' or redis.call('hExists',KEYS[3],sha1) == 0 then
-                               redis.call('lPush',KEYS[1],id)
+                               if 1*rtimestamp > 0 then
+                                       -- Insert into delayed queue (release time as score)
+                                       redis.call('zAdd',KEYS[4],rtimestamp,id)
+                               else
+                                       -- Insert into unclaimed queue
+                                       redis.call('lPush',KEYS[1],id)
+                               end
                                if sha1 ~= '' then
                                        redis.call('hSet',KEYS[2],id,sha1)
                                        redis.call('hSet',KEYS[3],sha1,id)
                                end
-                               redis.call('hSet',KEYS[4],id,blob)
+                               redis.call('hSet',KEYS[5],id,blob)
                                pushed = pushed + 1
                        end
                end
@@ -228,11 +252,12 @@ LUA;
                                        $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
                                        $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
                                        $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
-                                       $this->getQueueKey( 'h-data' ), # KEYS[4]
+                                       $this->getQueueKey( 'z-delayed' ), # KEYS[4]
+                                       $this->getQueueKey( 'h-data' ), # KEYS[5]
                                ),
                                $args
                        ),
-                       4 # number of first argument(s) that are keys
+                       5 # number of first argument(s) that are keys
                );
        }
 
@@ -244,6 +269,12 @@ LUA;
        protected function doPop() {
                $job = false;
 
+               // Push ready delayed jobs into the queue every 10 jobs to spread the load.
+               // This is also done as a periodic task, but we don't want too much done at once.
+               if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) {
+                       $this->releaseReadyDelayedJobs();
+               }
+
                $conn = $this->getConnection();
                try {
                        do {
@@ -463,7 +494,29 @@ LUA;
        }
 
        /**
-        * This function should not be called outside RedisJobQueue
+        * @see JobQueue::getAllQueuedJobs()
+        * @return Iterator
+        */
+       public function getAllDelayedJobs() {
+               $conn = $this->getConnection();
+               if ( !$conn ) {
+                       throw new MWException( "Unable to connect to redis server." );
+               }
+               try {
+                       $that = $this;
+                       return new MappedIterator( // delayed jobs
+                               $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ),
+                               function( $uid ) use ( $that, $conn ) {
+                                       return $that->getJobFromUidInternal( $uid, $conn );
+                               }
+                       );
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+       }
+
+       /**
+        * This function should not be called outside JobQueueRedis
         *
         * @param $uid string
         * @param $conn RedisConnRef
@@ -485,6 +538,43 @@ LUA;
                }
        }
 
+       /**
+        * Release any ready delayed jobs into the queue
+        *
+        * @return integer Number of jobs released
+        * @throws MWException
+        */
+       public function releaseReadyDelayedJobs() {
+               $count = 0;
+
+               $conn = $this->getConnection();
+               try {
+                       static $script =
+<<<LUA
+                       -- Get the list of ready delayed jobs, sorted by readiness
+                       local ids = redis.call('zRangeByScore',KEYS[1],0,ARGV[1])
+                       -- Migrate the jobs from the "delayed" set to the "unclaimed" list
+                       for k,id in ipairs(ids) do
+                               redis.call('lPush',KEYS[2],id)
+                               redis.call('zRem',KEYS[1],id)
+                       end
+                       return #ids
+LUA;
+                       $count += (int)$this->redisEval( $conn, $script,
+                               array(
+                                       $this->getQueueKey( 'z-delayed' ), // KEYS[1]
+                                       $this->getQueueKey( 'l-unclaimed' ), // KEYS[2]
+                                       time() // ARGV[1]; max "delay until" UNIX timestamp
+                               ),
+                               2 # first two arguments are keys
+                       );
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+
+               return $count;
+       }
+
        /**
         * Recycle or destroy any jobs that have been claimed for too long
         *
@@ -564,16 +654,20 @@ LUA;
         * @return Array
         */
        protected function doGetPeriodicTasks() {
+               $tasks = array();
                if ( $this->claimTTL > 0 ) {
-                       return array(
-                               'recycleAndDeleteStaleJobs' => array(
-                                       'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
-                                       'period'   => ceil( $this->claimTTL / 2 )
-                               )
+                       $tasks['recycleAndDeleteStaleJobs'] = array(
+                               'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
+                               'period'   => ceil( $this->claimTTL / 2 )
                        );
-               } else {
-                       return array();
                }
+               if ( $this->checkDelay ) {
+                       $tasks['releaseReadyDelayedJobs'] = array(
+                               'callback' => array( $this, 'releaseReadyDelayedJobs' ),
+                               'period'   => 300 // 5 minutes
+                       );
+               }
+               return $tasks;
        }
 
        /**
@@ -584,11 +678,22 @@ LUA;
         * @return mixed
         */
        protected function redisEval( RedisConnRef $conn, $script, array $params, $numKeys ) {
-               $res = $conn->evalSha( sha1( $script ), $params, $numKeys );
-               if ( $res === false && $conn->getLastError() != '' ) { // not in script cache?
-                       wfDebugLog( 'JobQueueRedis', "Lua script error: " . $conn->getLastError() );
+               $sha1 = sha1( $script ); // 40 char hex
+
+               // Try to run the server-side cached copy of the script
+               $conn->clearLastError();
+               $res = $conn->evalSha( $sha1, $params, $numKeys );
+               // If the script is not in cache, use eval() to retry and cache it
+               if ( $conn->getLastError() && $conn->script( 'exists', $sha1 ) === array( 0 ) ) {
+                       $conn->clearLastError();
                        $res = $conn->eval( $script, $params, $numKeys );
+                       wfDebugLog( 'JobQueueRedis', "Used eval() for Lua script $sha1." );
                }
+
+               if ( $conn->getLastError() ) { // script bug?
+                       wfDebugLog( 'JobQueueRedis', "Lua script error: " . $conn->getLastError() );
+               }
+
                return $res;
        }
 
@@ -599,16 +704,18 @@ LUA;
        protected function getNewJobFields( Job $job ) {
                return array(
                        // Fields that describe the nature of the job
-                       'type'      => $job->getType(),
-                       'namespace' => $job->getTitle()->getNamespace(),
-                       'title'     => $job->getTitle()->getDBkey(),
-                       'params'    => $job->getParams(),
+                       'type'       => $job->getType(),
+                       'namespace'  => $job->getTitle()->getNamespace(),
+                       'title'      => $job->getTitle()->getDBkey(),
+                       'params'     => $job->getParams(),
+                       // Some jobs cannot run until a "release timestamp"
+                       'rtimestamp' => $job->getReleaseTimestamp() ?: 0,
                        // Additional job metadata
-                       'uuid'      => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
-                       'sha1'      => $job->ignoreDuplicates()
+                       'uuid'       => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
+                       'sha1'       => $job->ignoreDuplicates()
                                ? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
                                : '',
-                       'timestamp' => time() // UNIX timestamp
+                       'timestamp'  => time() // UNIX timestamp
                );
        }