wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." );
return false;
}
- wfIncrStats( 'job-insert', count( $items ) );
- wfIncrStats( 'job-insert-duplicate', count( $items ) - $failed - $pushed );
+ JobQueue::incrStats( 'job-insert', $this->type, count( $items ) );
+ JobQueue::incrStats( 'job-insert-duplicate', $this->type,
+ count( $items ) - $failed - $pushed );
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );
}
* @throws RedisException
*/
protected function pushBlobs( RedisConnRef $conn, array $items ) {
- $args = array(); // ([id, sha1, blob [, id, sha1, blob ... ] ] )
+ $args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
foreach ( $items as $item ) {
$args[] = (string)$item['uuid'];
$args[] = (string)$item['sha1'];
end
return pushed
LUA;
- return $this->redisEval( $conn, $script,
+ return $conn->luaEval( $script,
array_merge(
array(
$this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
break; // no jobs; nothing to do
}
- wfIncrStats( 'job-pop' );
+ JobQueue::incrStats( 'job-pop', $this->type );
$item = unserialize( $blob );
if ( $item === false ) {
wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
-- Return the job data
return item
LUA;
- return $this->redisEval( $conn, $script,
+ return $conn->luaEval( $script,
array(
$this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
$this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
redis.call('hIncrBy',KEYS[5],id,1)
return redis.call('hGet',KEYS[6],id)
LUA;
- return $this->redisEval( $conn, $script,
+ return $conn->luaEval( $script,
array(
$this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
$this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
* @throws MWException
*/
protected function doAck( Job $job ) {
+ if ( !isset( $job->metadata['uuid'] ) ) {
+ throw new MWException( "Job of type '{$job->getType()}' has no UUID." );
+ }
if ( $this->claimTTL > 0 ) {
$conn = $this->getConnection();
try {
- // Get the exact field map this Job came from, regardless of whether
- // the job was transformed into a DuplicateJob or anything of the sort.
- $item = $job->metadata['sourceFields'];
-
static $script =
<<<LUA
-- Unmark the job as claimed
-- Delete the job data itself
return redis.call('hDel',KEYS[3],ARGV[1])
LUA;
- $res = $this->redisEval( $conn, $script,
+ $res = $conn->luaEval( $script,
array(
$this->getQueueKey( 'z-claimed' ), # KEYS[1]
$this->getQueueKey( 'h-attempts' ), # KEYS[2]
$this->getQueueKey( 'h-data' ), # KEYS[3]
- $item['uuid'] # ARGV[1]
+ $job->metadata['uuid'] # ARGV[1]
),
3 # number of first argument(s) that are keys
);
* @return bool
*/
protected function doIsRootJobOldDuplicate( Job $job ) {
- $params = $job->getParams();
- if ( !isset( $params['rootJobSignature'] ) ) {
+ if ( !$job->hasRootJobParams() ) {
return false; // job has no de-deplication info
- } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
- wfDebugLog( 'JobQueueRedis', "Cannot check root job; missing 'rootJobTimestamp'." );
- return false;
}
+ $params = $job->getRootJobParams();
$conn = $this->getConnection();
try {
*/
public function getAllQueuedJobs() {
$conn = $this->getConnection();
- if ( !$conn ) {
- throw new MWException( "Unable to connect to redis server." );
- }
try {
$that = $this;
return new MappedIterator(
*/
public function getAllDelayedJobs() {
$conn = $this->getConnection();
- if ( !$conn ) {
- throw new MWException( "Unable to connect to redis server." );
- }
try {
$that = $this;
return new MappedIterator( // delayed jobs
}
$title = Title::makeTitle( $item['namespace'], $item['title'] );
$job = Job::factory( $item['type'], $title, $item['params'] );
- $job->metadata['sourceFields'] = $item;
+ $job->metadata['uuid'] = $item['uuid'];
return $job;
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );
end
return #ids
LUA;
- $count += (int)$this->redisEval( $conn, $script,
+ $count += (int)$conn->luaEval( $script,
array(
$this->getQueueKey( 'z-delayed' ), // KEYS[1]
$this->getQueueKey( 'l-unclaimed' ), // KEYS[2]
end
return {released,abandoned,pruned}
LUA;
- $res = $this->redisEval( $conn, $script,
+ $res = $conn->luaEval( $script,
array(
$this->getQueueKey( 'z-claimed' ), # KEYS[1]
$this->getQueueKey( 'h-attempts' ), # KEYS[2]
if ( $res ) {
list( $released, $abandoned, $pruned ) = $res;
$count += $released + $pruned;
- wfIncrStats( 'job-recycle', count( $released ) );
+ JobQueue::incrStats( 'job-recycle', $this->type, $released );
+ JobQueue::incrStats( 'job-abandon', $this->type, $abandoned );
}
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );
return $tasks;
}
- /**
- * @param RedisConnRef $conn
- * @param string $script
- * @param array $params
- * @param integer $numKeys
- * @return mixed
- */
- protected function redisEval( RedisConnRef $conn, $script, array $params, $numKeys ) {
- $sha1 = sha1( $script ); // 40 char hex
-
- // Try to run the server-side cached copy of the script
- $conn->clearLastError();
- $res = $conn->evalSha( $sha1, $params, $numKeys );
- // If the script is not in cache, use eval() to retry and cache it
- if ( $conn->getLastError() && $conn->script( 'exists', $sha1 ) === array( 0 ) ) {
- $conn->clearLastError();
- $res = $conn->eval( $script, $params, $numKeys );
- wfDebugLog( 'JobQueueRedis', "Used eval() for Lua script $sha1." );
- }
-
- if ( $conn->getLastError() ) { // script bug?
- wfDebugLog( 'JobQueueRedis', "Lua script error: " . $conn->getLastError() );
- }
-
- return $res;
- }
-
/**
* @param $job Job
* @return array
$title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] );
if ( $title ) {
$job = Job::factory( $fields['type'], $title, $fields['params'] );
- $job->metadata['sourceFields'] = $fields;
+ $job->metadata['uuid'] = $fields['uuid'];
return $job;
}
return false;