protected $wiki; // string; wiki ID
protected $type; // string; job type
protected $order; // string; job priority for pop()
+ protected $claimTTL; // integer; seconds
const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions
* @param $params array
*/
protected function __construct( array $params ) {
- $this->wiki = $params['wiki'];
- $this->type = $params['type'];
- $this->order = isset( $params['order'] ) ? $params['order'] : 'random';
+ $this->wiki = $params['wiki'];
+ $this->type = $params['type'];
+ $this->order = isset( $params['order'] ) ? $params['order'] : 'random';
+ $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
}
/**
* Get a job queue object of the specified type.
* $params includes:
- * class : What job class to use (determines job type)
- * wiki : wiki ID of the wiki the jobs are for (defaults to current wiki)
- * type : The name of the job types this queue handles
- * order : Order that pop() selects jobs, either "timestamp" or "random".
- * If "timestamp" is used, the queue will effectively be FIFO. Note that
- * pop() will not be exactly FIFO, and even if it was, job completion would
- * not appear to be exactly FIFO since jobs can take different times to finish.
- * If "random" is used, pop() will pick jobs in random order. This might be
- * useful for improving concurrency depending on the queue storage medium.
+ * class : What job class to use (determines job type)
+ * wiki : wiki ID of the wiki the jobs are for (defaults to current wiki)
+ * type : The name of the job types this queue handles
+ * order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random".
+ * If "fifo" is used, the queue will effectively be FIFO. Note that
+ * job completion will not appear to be exactly FIFO if there are multiple
+ * job runners since jobs can take different times to finish once popped.
+ * If "timestamp" is used, the queue will at least be loosely ordered
+ * by timestamp, allowing for some jobs to be popped off out of order.
+ * If "random" is used, pop() will pick jobs in random order. This might be
+ * useful for improving concurrency depending on the queue storage medium.
+ * claimTTL : If supported, the queue will recycle jobs that have been popped
+ * but not acknowledged as completed after this many seconds.
*
* @param $params array
* @return JobQueue
* @since 1.21
*/
class JobQueueDB extends JobQueue {
- const CACHE_TTL = 300; // integer; seconds
- const MAX_JOB_RANDOM = 2147483647; // 2^31 - 1; used for job_random
+ const CACHE_TTL = 300; // integer; seconds to cache queue information
+ const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
+ const MAX_ATTEMPTS = 3; // integer; number of times to try a job
+ const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
/**
* @see JobQueue::doIsEmpty()
$autoTrx = $dbw->getFlag( DBO_TRX ); // automatic begin() enabled?
$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
try {
+ // Occasionally recycle jobs back into the queue that have been claimed too long
+ if ( mt_rand( 0, 99 ) == 0 ) {
+ $this->recycleStaleJobs();
+ }
do { // retry when our row is invalid or deleted as a duplicate
// Try to reserve a row in the DB...
- if ( $this->order === 'timestamp' ) { // oldest first
+ if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) {
$row = $this->claimOldest( $uuid );
} else { // random first
$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
);
if ( $row ) { // claim the job
$dbw->update( 'job', // update by PK
- array( 'job_token' => $uuid, 'job_token_timestamp' => $dbw->timestamp() ),
+ array(
+ 'job_token' => $uuid,
+ 'job_token_timestamp' => $dbw->timestamp(),
+ 'job_attempts = job_attempts+1' ),
array( 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ),
__METHOD__
);
// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
// Oracle and Postgre have no such limitation. However, MySQL offers an
// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
- $dbw->query( "UPDATE {$dbw->tableName( 'job' )}
- SET
- job_token = {$dbw->addQuotes( $uuid ) },
- job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}
- WHERE (
- job_cmd = {$dbw->addQuotes( $this->type )}
- AND job_token = {$dbw->addQuotes( '' )}
- ) ORDER BY job_random ASC LIMIT 1",
+ $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
+ "SET " .
+ "job_token = {$dbw->addQuotes( $uuid ) }, " .
+ "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
+ "job_attempts = job_attempts+1 " .
+ "WHERE ( " .
+ "job_cmd = {$dbw->addQuotes( $this->type )} " .
+ "AND job_token = {$dbw->addQuotes( '' )} " .
+ ") ORDER BY job_id ASC LIMIT 1",
__METHOD__
);
} else {
// Use a subquery to find the job, within an UPDATE to claim it.
// This uses as much of the DB wrapper functions as possible.
$dbw->update( 'job',
- array( 'job_token' => $uuid, 'job_token_timestamp' => $dbw->timestamp() ),
+ array(
+ 'job_token' => $uuid,
+ 'job_token_timestamp' => $dbw->timestamp(),
+ 'job_attempts = job_attempts+1' ),
array( 'job_id = (' .
$dbw->selectSQLText( 'job', 'job_id',
array( 'job_cmd' => $this->type, 'job_token' => '' ),
__METHOD__,
- array( 'ORDER BY' => 'job_random ASC', 'LIMIT' => 1 ) ) .
+ array( 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ) ) .
')'
),
__METHOD__
return $row;
}
+ /**
+ * Recycle or destroy any jobs that have been claimed for too long
+ *
+ * @return integer Number of jobs recycled/deleted
+ */
+ protected function recycleStaleJobs() {
+ $now = time();
+ $dbw = $this->getMasterDB();
+
+ if ( $this->claimTTL > 0 ) { // re-try stale jobs...
+ $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
+ // Reset job_token for these jobs so that other runners will pick them up.
+ // Set the timestamp to the current time, as it is useful to now that the job
+ // was already tried before.
+ $dbw->update( 'job',
+ array(
+ 'job_token' => '',
+ 'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release
+ array(
+ 'job_cmd' => $this->type,
+ "job_token != {$dbw->addQuotes( '' )}", // was acquired
+ "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
+ "job_attempts < {$dbw->addQuotes( self::MAX_ATTEMPTS )}" ),
+ __METHOD__
+ );
+ }
+
+ // Just destroy stale jobs...
+ $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
+ $conds = array(
+ 'job_cmd' => $this->type,
+ "job_token != {$dbw->addQuotes( '' )}", // was acquired
+ "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
+ );
+ if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
+ $conds[] = "job_attempts >= {$dbw->addQuotes( self::MAX_ATTEMPTS )}";
+ }
+
+ return $dbw->affectedRows();
+ }
+
/**
* @see JobQueue::doAck()
* @return Job|bool
'job_params' => self::makeBlob( $job->getParams() ),
);
// Additional job metadata
- if ( $this->order === 'timestamp' ) { // oldest first
- $random = time() - 1325376000; // seconds since "January 1, 2012"
- } else { // random first
- $random = mt_rand( 0, self::MAX_JOB_RANDOM );
- }
$dbw = $this->getMasterDB();
$metaFields = array(
'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
'job_timestamp' => $dbw->timestamp(),
'job_sha1' => wfBaseConvert( sha1( serialize( $descFields ) ), 16, 36, 32 ),
- 'job_random' => $random
+ 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
);
return ( $descFields + $metaFields );
}
-- Stored as a PHP serialized array, or an empty string if there are no parameters
job_params blob NOT NULL,
- -- Random, non-unique, number used for job acquisition
- -- Either a simple timestamp or a totally random number (for lock concurrency)
+ -- Random, non-unique, number used for job acquisition (for lock concurrency)
job_random integer unsigned NOT NULL default 0,
+ -- The number of times this job has been locked
+ job_attempts integer unsigned NOT NULL default 0,
+
-- Field that conveys process locks on rows via process UUIDs
job_token varbinary(32) NOT NULL default '',
CREATE INDEX /*i*/job_sha1 ON /*_*/job (job_sha1);
CREATE INDEX /*i*/job_cmd_token ON /*_*/job (job_cmd,job_token,job_random);
+CREATE INDEX /*i*/job_cmd_token_id ON /*_*/job (job_cmd,job_token,job_id);
CREATE INDEX /*i*/job_cmd ON /*_*/job (job_cmd, job_namespace, job_title, job_params(128));
CREATE INDEX /*i*/job_timestamp ON /*_*/job (job_timestamp);