protected $order; // string; job priority for pop()
protected $claimTTL; // integer; seconds
protected $maxTries; // integer; maximum number of times to try a job
+ protected $checkDelay; // boolean; allow delayed jobs
const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions
if ( !in_array( $this->order, $this->supportedOrders() ) ) {
throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
}
+ $this->checkDelay = !empty( $params['checkDelay'] );
+ if ( $this->checkDelay && !$this->supportsDelayedJobs() ) {
+ throw new MWException( __CLASS__ . " does not support delayed jobs." );
+ }
}
/**
* Get a job queue object of the specified type.
* $params includes:
- * - class : What job class to use (determines job type)
- * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki)
- * - type : The name of the job types this queue handles
- * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random".
- * If "fifo" is used, the queue will effectively be FIFO. Note that
- * job completion will not appear to be exactly FIFO if there are multiple
- * job runners since jobs can take different times to finish once popped.
- * If "timestamp" is used, the queue will at least be loosely ordered
- * by timestamp, allowing for some jobs to be popped off out of order.
- * If "random" is used, pop() will pick jobs in random order.
- * Note that it may only be weakly random (e.g. a lottery of the oldest X).
- * If "any" is choosen, the queue will use whatever order is the fastest.
- * This might be useful for improving concurrency for job acquisition.
- * - claimTTL : If supported, the queue will recycle jobs that have been popped
- * but not acknowledged as completed after this many seconds. Recycling
- * of jobs simple means re-inserting them into the queue. Jobs can be
- * attempted up to three times before being discarded.
+ * - class : What job class to use (determines job type)
+ * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki)
+ * - type : The name of the job types this queue handles
+ * - order : Order that pop() selects jobs, one of "fifo", "timestamp", "random" or "any".
+ * If "fifo" is used, the queue will effectively be FIFO. Note that job
+ * completion will not appear to be exactly FIFO if there are multiple
+ * job runners since jobs can take different times to finish once popped.
+ * If "timestamp" is used, the queue will at least be loosely ordered
+ * by timestamp, allowing for some jobs to be popped off out of order.
+ * If "random" is used, pop() will pick jobs in random order.
+ * Note that it may only be weakly random (e.g. a lottery of the oldest X).
+ * If "any" is chosen, the queue will use whatever order is the fastest.
+ * This might be useful for improving concurrency for job acquisition.
+ * - claimTTL : If supported, the queue will recycle jobs that have been popped
+ * but not acknowledged as completed after this many seconds. Recycling
+ * of jobs simply means re-inserting them into the queue. Jobs can be
+ * attempted up to three times before being discarded.
+ * - checkDelay : If supported, respect Job::getReleaseTimestamp() in the push functions.
+ * This lets delayed jobs wait in a staging area until a given timestamp is
+ * reached, at which point they will enter the queue. If this is not enabled
+ * or not supported, an exception will be thrown on delayed job insertion.
*
* Queue classes should throw an exception if they do not support the options given.
*
abstract protected function optimalOrder();
/**
- * Quickly check if the queue is empty (has no available jobs).
+ * Check whether this queue class is able to hold delayed jobs.
+ * The base implementation reports no support; queue classes that
+ * can stage delayed jobs override this to return true. The constructor
+ * uses this to reject the 'checkDelay' option on unsupported classes.
+ *
+ * @return boolean Whether delayed jobs are supported
+ */
+ protected function supportsDelayedJobs() {
+ return false; // not implemented
+ }
+
+ /**
+ * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
* Queue classes should use caching if they are any slower without memcached.
*
* If caching is used, this might return false when there are actually no jobs.
abstract protected function doIsEmpty();
/**
- * Get the number of available (unacquired) jobs in the queue.
+ * Get the number of available (unacquired, non-delayed) jobs in the queue.
* Queue classes should use caching if they are any slower without memcached.
*
* If caching is used, this number might be out of date for a minute.
*/
abstract protected function doGetAcquiredCount();
+ /**
+ * Get the number of delayed jobs (these are temporarily out of the queue).
+ * Queue classes should use caching if they are any slower without memcached.
+ *
+ * If caching is used, this number might be out of date for a minute.
+ *
+ * This is the final public entry point: it wraps the overridable
+ * doGetDelayedCount() in profiling calls and returns its result unchanged.
+ *
+ * @return integer
+ * @throws MWException
+ * @since 1.22
+ */
+ final public function getDelayedCount() {
+ wfProfileIn( __METHOD__ );
+ $res = $this->doGetDelayedCount(); // backend-specific count
+ wfProfileOut( __METHOD__ );
+ return $res;
+ }
+
+ /**
+ * Default implementation for queue classes without delayed job support;
+ * it always reports zero delayed jobs. Queue classes that stage delayed
+ * jobs override this.
+ *
+ * @see JobQueue::getDelayedCount()
+ * @return integer
+ */
+ protected function doGetDelayedCount() {
+ return 0; // not implemented
+ }
+
/**
* Push a single jobs into the queue.
* This does not require $wgJobClasses to be set for the given job type.
foreach ( $jobs as $job ) {
if ( $job->getType() !== $this->type ) {
- throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+ throw new MWException(
+ "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
+ } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) {
+ throw new MWException(
+ "Got delayed '{$job->getType()}' job; delays are not supported." );
}
}
protected function doFlushCaches() {}
/**
- * Get an iterator to traverse over all of the jobs in this queue.
- * This does not include jobs that are current acquired. In general,
- * this should only be called on a queue that is no longer being popped.
+ * Get an iterator to traverse over all available jobs in this queue.
+ * This does not include jobs that are currently acquired or delayed.
+ * This should only be called on a queue that is no longer being popped.
*
* @return Iterator|Traversable|Array
* @throws MWException
*/
abstract public function getAllQueuedJobs();
+ /**
+ * Get an iterator to traverse over all delayed jobs in this queue.
+ * This should only be called on a queue that is no longer being popped.
+ *
+ * The base implementation returns an empty array; queue classes that
+ * stage delayed jobs override this to expose them.
+ *
+ * @return Iterator|Traversable|Array
+ * @throws MWException
+ * @since 1.22
+ */
+ public function getAllDelayedJobs() {
+ return array(); // not implemented
+ }
+
/**
* Namespace the queue with a key to isolate it for testing
*
* This is faster, less resource intensive, queue that JobQueueDB.
* All data for a queue using this class is placed into one redis server.
*
- * There are seven main redis keys used to track jobs:
- * - l-unclaimed : A list of job IDs used for push/pop
+ * There are eight main redis keys used to track jobs:
+ * - l-unclaimed : A list of job IDs used for ready unclaimed jobs
* - z-claimed : A sorted set of (job ID, UNIX timestamp as score) used for job retries
* - z-abandoned : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs
+ * - z-delayed : A sorted set of (job ID, UNIX timestamp as score) used for delayed jobs
* - h-idBySha1 : A hash of (SHA1 => job ID) for unclaimed jobs used for de-duplication
* - h-sha1Byid : A hash of (job ID => SHA1) for unclaimed jobs used for de-duplication
* - h-attempts : A hash of (job ID => attempt count) used for job claiming/retries
* - h-data : A hash of (job ID => serialized blobs) for job storage
- * Any given job ID can be in only one of l-unclaimed, z-claimed, and z-abandoned.
+ * A job ID can be in only one of z-delayed, l-unclaimed, z-claimed, and z-abandoned.
* If an ID appears in any of those lists, it should have a h-data entry for its ID.
- * If a job has a non-empty SHA1 de-duplication value and its ID is in l-unclaimed,
- * then there should be no other such jobs. Every h-idBySha1 entry has an h-sha1Byid
+ * If a job has a SHA1 de-duplication value and its ID is in l-unclaimed or z-delayed, then
+ * there should be no other such jobs with that SHA1. Every h-idBySha1 entry has an h-sha1Byid
* entry and every h-sha1Byid must refer to an ID that is l-unclaimed. If a job has its
* ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID.
*
- * Additionally, "rootjob:* keys to track "root jobs" used for additional de-duplication.
+ * Additionally, "rootjob:*" keys track "root jobs" used for additional de-duplication.
* Aside from root job keys, all keys have no expiry, and are only removed when jobs are run.
* All the keys are prefixed with the relevant wiki ID information.
*
return 'fifo';
}
+ protected function supportsDelayedJobs() {
+ return true; // delayed jobs are staged in the z-delayed sorted set
+ }
+
/**
* @see JobQueue::doIsEmpty()
* @return bool
}
}
+ /**
+ * Count the job IDs currently held in the z-delayed sorted set.
+ * When the 'checkDelay' option is off this returns 0 before any
+ * connection is requested.
+ *
+ * @see JobQueue::doGetDelayedCount()
+ * @return integer
+ * @throws MWException
+ */
+ protected function doGetDelayedCount() {
+ if ( !$this->checkDelay ) {
+ return 0; // no delayed jobs
+ }
+ $conn = $this->getConnection();
+ try {
+ return $conn->zSize( $this->getQueueKey( 'z-delayed' ) ); // size of the delayed set
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $this->server, $conn, $e ); // presumably rethrows as MWException (per @throws); confirm
+ }
+ }
+
/**
* @see JobQueue::doBatchPush()
* @param array $jobs
$items[$item['uuid']] = $item;
}
}
- // Convert the field maps into serialized blobs
- $tuples = array();
- foreach ( $items as $item ) {
- $tuples[] = array( $item['uuid'], $item['sha1'], serialize( $item ) );
- }
- if ( !count( $tuples ) ) {
+ if ( !count( $items ) ) {
return true; // nothing to do
}
try {
// Actually push the non-duplicate jobs into the queue...
if ( $flags & self::QoS_Atomic ) {
- $batches = array( $tuples ); // all or nothing
+ $batches = array( $items ); // all or nothing
} else {
- $batches = array_chunk( $tuples, 500 ); // avoid tying up the server
+ $batches = array_chunk( $items, 500 ); // avoid tying up the server
}
$failed = 0;
$pushed = 0;
- foreach ( $batches as $tupleBatch ) {
- $added = $this->pushBlobs( $conn, $tupleBatch );
+ foreach ( $batches as $itemBatch ) {
+ $added = $this->pushBlobs( $conn, $itemBatch );
if ( is_int( $added ) ) {
$pushed += $added;
} else {
- $failed += count( $tupleBatch );
+ $failed += count( $itemBatch );
}
}
if ( $failed > 0 ) {
wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." );
return false;
}
- wfIncrStats( 'job-insert', count( $tuples ) );
- wfIncrStats( 'job-insert-duplicate', count( $tuples ) - $failed - $pushed );
+ wfIncrStats( 'job-insert', count( $items ) );
+ wfIncrStats( 'job-insert-duplicate', count( $items ) - $failed - $pushed );
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );
}
/**
* @param RedisConnRef $conn
- * @param array $tuples List of tuples of (job ID, job SHA1 or '', serialized blob)
+ * @param array $items List of results from JobQueueRedis::getNewJobFields()
* @return integer Number of jobs inserted (duplicates are ignored)
* @throws RedisException
*/
- protected function pushBlobs( RedisConnRef $conn, array $tuples ) {
+ protected function pushBlobs( RedisConnRef $conn, array $items ) {
$args = array(); // ([id, sha1, blob [, id, sha1, blob ... ] ] )
- foreach ( $tuples as $tuple ) {
- $args[] = $tuple[0]; // id
- $args[] = $tuple[1]; // sha1
- $args[] = $tuple[2]; // blob
+ foreach ( $items as $item ) {
+ $args[] = (string)$item['uuid'];
+ $args[] = (string)$item['sha1'];
+ $args[] = (string)$item['rtimestamp'];
+ $args[] = (string)serialize( $item );
}
static $script =
<<<LUA
- if #ARGV % 3 ~= 0 then return redis.error_reply('Unmatched arguments') end
+ if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end
local pushed = 0
- for i = 1,#ARGV,3 do
- local id,sha1,blob = ARGV[i],ARGV[i+1],ARGV[i+2]
+ for i = 1,#ARGV,4 do
+ local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
if sha1 == '' or redis.call('hExists',KEYS[3],sha1) == 0 then
- redis.call('lPush',KEYS[1],id)
+ if 1*rtimestamp > 0 then
+ -- Insert into delayed queue (release time as score)
+ redis.call('zAdd',KEYS[4],rtimestamp,id)
+ else
+ -- Insert into unclaimed queue
+ redis.call('lPush',KEYS[1],id)
+ end
if sha1 ~= '' then
redis.call('hSet',KEYS[2],id,sha1)
redis.call('hSet',KEYS[3],sha1,id)
end
- redis.call('hSet',KEYS[4],id,blob)
+ redis.call('hSet',KEYS[5],id,blob)
pushed = pushed + 1
end
end
$this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
$this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
$this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
- $this->getQueueKey( 'h-data' ), # KEYS[4]
+ $this->getQueueKey( 'z-delayed' ), # KEYS[4]
+ $this->getQueueKey( 'h-data' ), # KEYS[5]
),
$args
),
- 4 # number of first argument(s) that are keys
+ 5 # number of first argument(s) that are keys
);
}
protected function doPop() {
$job = false;
+ // Push ready delayed jobs into the queue on roughly one in ten pops to spread the load.
+ // This is also done as a periodic task, but we don't want too much done at once.
+ if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) {
+ $this->releaseReadyDelayedJobs();
+ }
+
$conn = $this->getConnection();
try {
do {
}
/**
- * This function should not be called outside RedisJobQueue
+ * @see JobQueue::getAllDelayedJobs()
+ * @return Iterator
+ */
+ public function getAllDelayedJobs() {
+ $conn = $this->getConnection();
+ if ( !$conn ) {
+ throw new MWException( "Unable to connect to redis server." );
+ }
+ try {
+ $that = $this; // alias handed to the closure below via use()
+ return new MappedIterator( // delayed jobs
+ $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ), // every ID in z-delayed (0..-1 is the full range)
+ function( $uid ) use ( $that, $conn ) {
+ return $that->getJobFromUidInternal( $uid, $conn ); // map each job ID to its job
+ }
+ );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $this->server, $conn, $e );
+ }
+ }
+
+ /**
+ * This function should not be called outside JobQueueRedis
*
* @param $uid string
* @param $conn RedisConnRef
}
}
+ /**
+ * Release any ready delayed jobs into the queue
+ *
+ * Runs a Lua script on the redis server that selects every job ID in
+ * z-delayed whose score (the release timestamp) lies between 0 and the
+ * current time, pushes each one onto l-unclaimed and removes it from
+ * z-delayed. The script returns how many IDs it moved.
+ *
+ * This is called both from doPop() and as a periodic task.
+ *
+ * @return integer Number of jobs released
+ * @throws MWException
+ */
+ public function releaseReadyDelayedJobs() {
+ $count = 0;
+
+ $conn = $this->getConnection();
+ try {
+ static $script =
+<<<LUA
+ -- Get the list of ready delayed jobs, sorted by readiness
+ local ids = redis.call('zRangeByScore',KEYS[1],0,ARGV[1])
+ -- Migrate the jobs from the "delayed" set to the "unclaimed" list
+ for k,id in ipairs(ids) do
+ redis.call('lPush',KEYS[2],id)
+ redis.call('zRem',KEYS[1],id)
+ end
+ return #ids
+LUA;
+ $count += (int)$this->redisEval( $conn, $script, // script result is the number of IDs moved
+ array(
+ $this->getQueueKey( 'z-delayed' ), // KEYS[1]
+ $this->getQueueKey( 'l-unclaimed' ), // KEYS[2]
+ time() // ARGV[1]; max "delay until" UNIX timestamp
+ ),
+ 2 # first two arguments are keys
+ );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $this->server, $conn, $e );
+ }
+
+ return $count;
+ }
+
/**
* Recycle or destroy any jobs that have been claimed for too long
*
* @return Array
*/
protected function doGetPeriodicTasks() {
+ $tasks = array(); // task name => array( 'callback' => ..., 'period' => seconds )
if ( $this->claimTTL > 0 ) {
- return array(
- 'recycleAndDeleteStaleJobs' => array(
- 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
- 'period' => ceil( $this->claimTTL / 2 )
- )
+ $tasks['recycleAndDeleteStaleJobs'] = array(
+ 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
+ 'period' => ceil( $this->claimTTL / 2 ) // half the claim TTL, rounded up
);
- } else {
- return array();
}
+ if ( $this->checkDelay ) { // only scheduled when delayed jobs are enabled
+ $tasks['releaseReadyDelayedJobs'] = array(
+ 'callback' => array( $this, 'releaseReadyDelayedJobs' ),
+ 'period' => 300 // 5 minutes
+ );
+ }
+ return $tasks;
}
/**
* @return mixed
*/
protected function redisEval( RedisConnRef $conn, $script, array $params, $numKeys ) {
- $res = $conn->evalSha( sha1( $script ), $params, $numKeys );
- if ( $res === false && $conn->getLastError() != '' ) { // not in script cache?
- wfDebugLog( 'JobQueueRedis', "Lua script error: " . $conn->getLastError() );
+ $sha1 = sha1( $script ); // 40 char hex; the identifier passed to evalSha() and script()
+
+ // Try to run the server-side cached copy of the script
+ $conn->clearLastError(); // so an older error is not mistaken for a failure of this call
+ $res = $conn->evalSha( $sha1, $params, $numKeys );
+ // If the script is not in cache, use eval() to retry and cache it.
+ // The fallback is taken only when evalSha() left an error AND the server
+ // reports the script hash as unknown; other errors fall through to the log below.
+ if ( $conn->getLastError() && $conn->script( 'exists', $sha1 ) === array( 0 ) ) {
+ $conn->clearLastError(); // forget the evalSha() error before retrying
$res = $conn->eval( $script, $params, $numKeys );
+ wfDebugLog( 'JobQueueRedis', "Used eval() for Lua script $sha1." );
}
+
+ if ( $conn->getLastError() ) { // script bug?
+ wfDebugLog( 'JobQueueRedis', "Lua script error: " . $conn->getLastError() );
+ }
+
return $res;
}
protected function getNewJobFields( Job $job ) {
return array(
// Fields that describe the nature of the job
- 'type' => $job->getType(),
- 'namespace' => $job->getTitle()->getNamespace(),
- 'title' => $job->getTitle()->getDBkey(),
- 'params' => $job->getParams(),
+ 'type' => $job->getType(),
+ 'namespace' => $job->getTitle()->getNamespace(),
+ 'title' => $job->getTitle()->getDBkey(),
+ 'params' => $job->getParams(),
+ // Some jobs cannot run until a "release timestamp" (stored as 0 when the job has none)
+ 'rtimestamp' => $job->getReleaseTimestamp() ?: 0,
// Additional job metadata
- 'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
- 'sha1' => $job->ignoreDuplicates()
+ 'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
+ 'sha1' => $job->ignoreDuplicates() // '' when the job does not ask for de-duplication
? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
: '',
- 'timestamp' => time() // UNIX timestamp
+ 'timestamp' => time() // UNIX timestamp
);
}