From b1aa60af41706dcd131eda7b72765872bb323f3e Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Mon, 11 Mar 2013 20:40:01 -0700 Subject: [PATCH] [JobQueue] Added support for delayed jobs with JobQueueRedis. * The queue can handle delaying jobs until a given timestamp is reached. * Added Job::getReleaseTimestamp() to let jobs specifiy delay amounts. * Added a "checkDelay" option and a supportsDelayedJobs() function to JobQueue. There are also getDelayedCount() and getAllDelayedJobs() functions. * Simplified a bit of code in doBatchPush() and pushBlobs(). * Improved the logic in redisEval(). Change-Id: I40b3e3438e659f6844bdbdd5e9d3ccc6c4dc82b2 --- includes/job/Job.php | 10 ++ includes/job/JobQueue.php | 103 ++++++++++++---- includes/job/JobQueueRedis.php | 207 +++++++++++++++++++++++++-------- 3 files changed, 247 insertions(+), 73 deletions(-) diff --git a/includes/job/Job.php b/includes/job/Job.php index bcf582e793..d8f55c3b81 100644 --- a/includes/job/Job.php +++ b/includes/job/Job.php @@ -176,6 +176,16 @@ abstract class Job { return $this->params; } + /** + * @return integer|null UNIX timestamp to delay running this job until, otherwise null + * @since 1.22 + */ + public function getReleaseTimestamp() { + return isset( $this->params['jobReleaseTimestamp'] ) + ? wfTimestampOrNull( TS_UNIX, $this->params['jobReleaseTimestamp'] ) + : null; + } + /** * @return bool Whether only one of each identical set of jobs should be run */ diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php index 5ef52b5ef1..9c152cdf69 100644 --- a/includes/job/JobQueue.php +++ b/includes/job/JobQueue.php @@ -34,6 +34,7 @@ abstract class JobQueue { protected $order; // string; job priority for pop() protected $claimTTL; // integer; seconds protected $maxTries; // integer; maximum number of times to try a job + protected $checkDelay; // boolean; allow delayed jobs const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions @@ -55,28 +56,36 @@ abstract class JobQueue { if ( !in_array( $this->order, $this->supportedOrders() ) ) { throw new MWException( __CLASS__ . " does not support '{$this->order}' order." ); } + $this->checkDelay = !empty( $params['checkDelay'] ); + if ( $this->checkDelay && !$this->supportsDelayedJobs() ) { + throw new MWException( __CLASS__ . " does not support delayed jobs." ); + } } /** * Get a job queue object of the specified type. * $params includes: - * - class : What job class to use (determines job type) - * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki) - * - type : The name of the job types this queue handles - * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random". - * If "fifo" is used, the queue will effectively be FIFO. Note that - * job completion will not appear to be exactly FIFO if there are multiple - * job runners since jobs can take different times to finish once popped. - * If "timestamp" is used, the queue will at least be loosely ordered - * by timestamp, allowing for some jobs to be popped off out of order. - * If "random" is used, pop() will pick jobs in random order. - * Note that it may only be weakly random (e.g. a lottery of the oldest X). - * If "any" is choosen, the queue will use whatever order is the fastest. - * This might be useful for improving concurrency for job acquisition. - * - claimTTL : If supported, the queue will recycle jobs that have been popped - * but not acknowledged as completed after this many seconds. Recycling - * of jobs simple means re-inserting them into the queue. Jobs can be - * attempted up to three times before being discarded. + * - class : What job class to use (determines job type) + * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki) + * - type : The name of the job types this queue handles + * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random". + * If "fifo" is used, the queue will effectively be FIFO. Note that job + * completion will not appear to be exactly FIFO if there are multiple + * job runners since jobs can take different times to finish once popped. + * If "timestamp" is used, the queue will at least be loosely ordered + * by timestamp, allowing for some jobs to be popped off out of order. + * If "random" is used, pop() will pick jobs in random order. + * Note that it may only be weakly random (e.g. a lottery of the oldest X). + * If "any" is choosen, the queue will use whatever order is the fastest. + * This might be useful for improving concurrency for job acquisition. + * - claimTTL : If supported, the queue will recycle jobs that have been popped + * but not acknowledged as completed after this many seconds. Recycling + * of jobs simple means re-inserting them into the queue. Jobs can be + * attempted up to three times before being discarded. + * - checkDelay : If supported, respect Job::getReleaseTimestamp() in the push functions. + * This lets delayed jobs wait in a staging area until a given timestamp is + * reached, at which point they will enter the queue. If this is not enabled + * or not supported, an exception will be thrown on delayed job insertion. * * Queue classes should throw an exception if they do not support the options given. * @@ -128,7 +137,14 @@ abstract class JobQueue { abstract protected function optimalOrder(); /** - * Quickly check if the queue is empty (has no available jobs). + * @return boolean Whether delayed jobs are supported + */ + protected function supportsDelayedJobs() { + return false; // not implemented + } + + /** + * Quickly check if the queue has no available (unacquired, non-delayed) jobs. * Queue classes should use caching if they are any slower without memcached. * * If caching is used, this might return false when there are actually no jobs. @@ -153,7 +169,7 @@ abstract class JobQueue { abstract protected function doIsEmpty(); /** - * Get the number of available (unacquired) jobs in the queue. + * Get the number of available (unacquired, non-delayed) jobs in the queue. * Queue classes should use caching if they are any slower without memcached. * * If caching is used, this number might be out of date for a minute. @@ -196,6 +212,31 @@ abstract class JobQueue { */ abstract protected function doGetAcquiredCount(); + /** + * Get the number of delayed jobs (these are temporarily out of the queue). + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return integer + * @throws MWException + * @since 1.22 + */ + final public function getDelayedCount() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetDelayedCount(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::getDelayedCount() + * @return integer + */ + protected function doGetDelayedCount() { + return 0; // not implemented + } + /** * Push a single jobs into the queue. * This does not require $wgJobClasses to be set for the given job type. @@ -227,7 +268,11 @@ abstract class JobQueue { foreach ( $jobs as $job ) { if ( $job->getType() !== $this->type ) { - throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + throw new MWException( + "Got '{$job->getType()}' job; expected a '{$this->type}' job." ); + } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) { + throw new MWException( + "Got delayed '{$job->getType()}' job; delays are not supported." ); } } @@ -493,15 +538,27 @@ abstract class JobQueue { protected function doFlushCaches() {} /** - * Get an iterator to traverse over all of the jobs in this queue. - * This does not include jobs that are current acquired. In general, - * this should only be called on a queue that is no longer being popped. + * Get an iterator to traverse over all available jobs in this queue. + * This does not include jobs that are currently acquired or delayed. + * This should only be called on a queue that is no longer being popped. * * @return Iterator|Traversable|Array * @throws MWException */ abstract public function getAllQueuedJobs(); + /** + * Get an iterator to traverse over all delayed jobs in this queue. + * This should only be called on a queue that is no longer being popped. + * + * @return Iterator|Traversable|Array + * @throws MWException + * @since 1.22 + */ + public function getAllDelayedJobs() { + return array(); // not implemented + } + /** * Namespace the queue with a key to isolate it for testing * diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php index 338dc79055..bd23174a8d 100644 --- a/includes/job/JobQueueRedis.php +++ b/includes/job/JobQueueRedis.php @@ -27,22 +27,23 @@ * This is faster, less resource intensive, queue that JobQueueDB. * All data for a queue using this class is placed into one redis server. * - * There are seven main redis keys used to track jobs: - * - l-unclaimed : A list of job IDs used for push/pop + * There are eight main redis keys used to track jobs: + * - l-unclaimed : A list of job IDs used for ready unclaimed jobs * - z-claimed : A sorted set of (job ID, UNIX timestamp as score) used for job retries * - z-abandoned : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs + * - z-delayed : A sorted set of (job ID, UNIX timestamp as score) used for delayed jobs * - h-idBySha1 : A hash of (SHA1 => job ID) for unclaimed jobs used for de-duplication * - h-sha1Byid : A hash of (job ID => SHA1) for unclaimed jobs used for de-duplication * - h-attempts : A hash of (job ID => attempt count) used for job claiming/retries * - h-data : A hash of (job ID => serialized blobs) for job storage - * Any given job ID can be in only one of l-unclaimed, z-claimed, and z-abandoned. + * A job ID can be in only one of z-delayed, l-unclaimed, z-claimed, and z-abandoned. * If an ID appears in any of those lists, it should have a h-data entry for its ID. - * If a job has a non-empty SHA1 de-duplication value and its ID is in l-unclaimed, - * then there should be no other such jobs. Every h-idBySha1 entry has an h-sha1Byid + * If a job has a SHA1 de-duplication value and its ID is in l-unclaimed or z-delayed, then + * there should be no other such jobs with that SHA1. Every h-idBySha1 entry has an h-sha1Byid * entry and every h-sha1Byid must refer to an ID that is l-unclaimed. If a job has its * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID. * - * Additionally, "rootjob:* keys to track "root jobs" used for additional de-duplication. + * Additionally, "rootjob:* keys track "root jobs" used for additional de-duplication. * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run. * All the keys are prefixed with the relevant wiki ID information. * @@ -89,6 +90,10 @@ class JobQueueRedis extends JobQueue { return 'fifo'; } + protected function supportsDelayedJobs() { + return true; + } + /** * @see JobQueue::doIsEmpty() * @return bool @@ -132,6 +137,23 @@ class JobQueueRedis extends JobQueue { } } + /** + * @see JobQueue::doGetDelayedCount() + * @return integer + * @throws MWException + */ + protected function doGetDelayedCount() { + if ( !$this->checkDelay ) { + return 0; // no delayed jobs + } + $conn = $this->getConnection(); + try { + return $conn->zSize( $this->getQueueKey( 'z-delayed' ) ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + /** * @see JobQueue::doBatchPush() * @param array $jobs @@ -150,13 +172,8 @@ class JobQueueRedis extends JobQueue { $items[$item['uuid']] = $item; } } - // Convert the field maps into serialized blobs - $tuples = array(); - foreach ( $items as $item ) { - $tuples[] = array( $item['uuid'], $item['sha1'], serialize( $item ) ); - } - if ( !count( $tuples ) ) { + if ( !count( $items ) ) { return true; // nothing to do } @@ -164,26 +181,26 @@ class JobQueueRedis extends JobQueue { try { // Actually push the non-duplicate jobs into the queue... if ( $flags & self::QoS_Atomic ) { - $batches = array( $tuples ); // all or nothing + $batches = array( $items ); // all or nothing } else { - $batches = array_chunk( $tuples, 500 ); // avoid tying up the server + $batches = array_chunk( $items, 500 ); // avoid tying up the server } $failed = 0; $pushed = 0; - foreach ( $batches as $tupleBatch ) { - $added = $this->pushBlobs( $conn, $tupleBatch ); + foreach ( $batches as $itemBatch ) { + $added = $this->pushBlobs( $conn, $itemBatch ); if ( is_int( $added ) ) { $pushed += $added; } else { - $failed += count( $tupleBatch ); + $failed += count( $itemBatch ); } } if ( $failed > 0 ) { wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." ); return false; } - wfIncrStats( 'job-insert', count( $tuples ) ); - wfIncrStats( 'job-insert-duplicate', count( $tuples ) - $failed - $pushed ); + wfIncrStats( 'job-insert', count( $items ) ); + wfIncrStats( 'job-insert-duplicate', count( $items ) - $failed - $pushed ); } catch ( RedisException $e ) { $this->throwRedisException( $this->server, $conn, $e ); } @@ -193,30 +210,37 @@ class JobQueueRedis extends JobQueue { /** * @param RedisConnRef $conn - * @param array $tuples List of tuples of (job ID, job SHA1 or '', serialized blob) + * @param array $items List of results from JobQueueRedis::getNewJobFields() * @return integer Number of jobs inserted (duplicates are ignored) * @throws RedisException */ - protected function pushBlobs( RedisConnRef $conn, array $tuples ) { + protected function pushBlobs( RedisConnRef $conn, array $items ) { $args = array(); // ([id, sha1, blob [, id, sha1, blob ... ] ] ) - foreach ( $tuples as $tuple ) { - $args[] = $tuple[0]; // id - $args[] = $tuple[1]; // sha1 - $args[] = $tuple[2]; // blob + foreach ( $items as $item ) { + $args[] = (string)$item['uuid']; + $args[] = (string)$item['sha1']; + $args[] = (string)$item['rtimestamp']; + $args[] = (string)serialize( $item ); } static $script = << 0 then + -- Insert into delayed queue (release time as score) + redis.call('zAdd',KEYS[4],rtimestamp,id) + else + -- Insert into unclaimed queue + redis.call('lPush',KEYS[1],id) + end if sha1 ~= '' then redis.call('hSet',KEYS[2],id,sha1) redis.call('hSet',KEYS[3],sha1,id) end - redis.call('hSet',KEYS[4],id,blob) + redis.call('hSet',KEYS[5],id,blob) pushed = pushed + 1 end end @@ -228,11 +252,12 @@ LUA; $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] - $this->getQueueKey( 'h-data' ), # KEYS[4] + $this->getQueueKey( 'z-delayed' ), # KEYS[4] + $this->getQueueKey( 'h-data' ), # KEYS[5] ), $args ), - 4 # number of first argument(s) that are keys + 5 # number of first argument(s) that are keys ); } @@ -244,6 +269,12 @@ LUA; protected function doPop() { $job = false; + // Push ready delayed jobs into the queue every 10 jobs to spread the load. + // This is also done as a periodic task, but we don't want too much done at once. + if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) { + $this->releaseReadyDelayedJobs(); + } + $conn = $this->getConnection(); try { do { @@ -463,7 +494,29 @@ LUA; } /** - * This function should not be called outside RedisJobQueue + * @see JobQueue::getAllQueuedJobs() + * @return Iterator + */ + public function getAllDelayedJobs() { + $conn = $this->getConnection(); + if ( !$conn ) { + throw new MWException( "Unable to connect to redis server." ); + } + try { + $that = $this; + return new MappedIterator( // delayed jobs + $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ), + function( $uid ) use ( $that, $conn ) { + return $that->getJobFromUidInternal( $uid, $conn ); + } + ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * This function should not be called outside JobQueueRedis * * @param $uid string * @param $conn RedisConnRef @@ -485,6 +538,43 @@ LUA; } } + /** + * Release any ready delayed jobs into the queue + * + * @return integer Number of jobs released + * @throws MWException + */ + public function releaseReadyDelayedJobs() { + $count = 0; + + $conn = $this->getConnection(); + try { + static $script = +<<redisEval( $conn, $script, + array( + $this->getQueueKey( 'z-delayed' ), // KEYS[1] + $this->getQueueKey( 'l-unclaimed' ), // KEYS[2] + time() // ARGV[1]; max "delay until" UNIX timestamp + ), + 2 # first two arguments are keys + ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + + return $count; + } + /** * Recycle or destroy any jobs that have been claimed for too long * @@ -564,16 +654,20 @@ LUA; * @return Array */ protected function doGetPeriodicTasks() { + $tasks = array(); if ( $this->claimTTL > 0 ) { - return array( - 'recycleAndDeleteStaleJobs' => array( - 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ), - 'period' => ceil( $this->claimTTL / 2 ) - ) + $tasks['recycleAndDeleteStaleJobs'] = array( + 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ), + 'period' => ceil( $this->claimTTL / 2 ) ); - } else { - return array(); } + if ( $this->checkDelay ) { + $tasks['releaseReadyDelayedJobs'] = array( + 'callback' => array( $this, 'releaseReadyDelayedJobs' ), + 'period' => 300 // 5 minutes + ); + } + return $tasks; } /** @@ -584,11 +678,22 @@ LUA; * @return mixed */ protected function redisEval( RedisConnRef $conn, $script, array $params, $numKeys ) { - $res = $conn->evalSha( sha1( $script ), $params, $numKeys ); - if ( $res === false && $conn->getLastError() != '' ) { // not in script cache? - wfDebugLog( 'JobQueueRedis', "Lua script error: " . $conn->getLastError() ); + $sha1 = sha1( $script ); // 40 char hex + + // Try to run the server-side cached copy of the script + $conn->clearLastError(); + $res = $conn->evalSha( $sha1, $params, $numKeys ); + // If the script is not in cache, use eval() to retry and cache it + if ( $conn->getLastError() && $conn->script( 'exists', $sha1 ) === array( 0 ) ) { + $conn->clearLastError(); $res = $conn->eval( $script, $params, $numKeys ); + wfDebugLog( 'JobQueueRedis', "Used eval() for Lua script $sha1." ); } + + if ( $conn->getLastError() ) { // script bug? + wfDebugLog( 'JobQueueRedis', "Lua script error: " . $conn->getLastError() ); + } + return $res; } @@ -599,16 +704,18 @@ LUA; protected function getNewJobFields( Job $job ) { return array( // Fields that describe the nature of the job - 'type' => $job->getType(), - 'namespace' => $job->getTitle()->getNamespace(), - 'title' => $job->getTitle()->getDBkey(), - 'params' => $job->getParams(), + 'type' => $job->getType(), + 'namespace' => $job->getTitle()->getNamespace(), + 'title' => $job->getTitle()->getDBkey(), + 'params' => $job->getParams(), + // Some jobs cannot run until a "release timestamp" + 'rtimestamp' => $job->getReleaseTimestamp() ?: 0, // Additional job metadata - 'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ), - 'sha1' => $job->ignoreDuplicates() + 'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ), + 'sha1' => $job->ignoreDuplicates() ? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 ) : '', - 'timestamp' => time() // UNIX timestamp + 'timestamp' => time() // UNIX timestamp ); } -- 2.20.1