abstract class JobQueue {
protected $wiki; // string; wiki ID
protected $type; // string; job type
+ protected $order; // string; job priority for pop()
const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions
* @param $params array
*/
protected function __construct( array $params ) {
- $this->wiki = $params['wiki'];
- $this->type = $params['type'];
+ $this->wiki = $params['wiki'];
+ $this->type = $params['type'];
+ $this->order = isset( $params['order'] ) ? $params['order'] : 'random';
}
/**
* class : what job class to use (determines job type)
* wiki : wiki ID of the wiki the jobs are for (defaults to current wiki)
* type : The name of the job types this queue handles
+ * order : Order that pop() selects jobs, either "timestamp" or "random".
+ * If "timestamp" is used, the queue will effectively be FIFO. Note that
+ * pop() will not be exactly FIFO, and even if it was, job completion would
+ * not appear to be exactly FIFO since jobs can take different times to finish.
+ * If "random" is used, pop() will pick jobs in random order. This might be
+ * useful for improving concurrency depending on the queue storage medium.
*
* @param $params array
* @return JobQueue
$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
try {
do { // retry when our row is invalid or deleted as a duplicate
- $row = false; // row claimed
- $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
- $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
- // Try to reserve a DB row...
- if ( $this->claim( $uuid, $rand, $gte ) || $this->claim( $uuid, $rand, !$gte ) ) {
- // Fetch any row that we just reserved...
- $row = $dbw->selectRow( 'job', '*',
- array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__ );
- // Check if another process deleted it as a duplicate
- if ( !$row ) {
- wfDebugLog( 'JobQueueDB', "Row deleted as duplicate by another process." );
- continue; // try again
- }
- // Get the job object from the row...
- $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
- if ( !$title ) {
- $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
- wfDebugLog( 'JobQueueDB', "Row has invalid title '{$row->job_title}'." );
- continue; // try again
- }
- $job = Job::factory( $row->job_cmd, $title,
- self::extractBlob( $row->job_params ), $row->job_id );
- // Delete any *other* duplicate jobs in the queue...
- if ( $job->ignoreDuplicates() && strlen( $row->job_sha1 ) ) {
- $dbw->delete( 'job',
- array( 'job_sha1' => $row->job_sha1,
- "job_id != {$dbw->addQuotes( $row->job_id )}" ),
- __METHOD__
- );
- }
- } else {
+ // Try to reserve a row in the DB...
+ if ( $this->order === 'timestamp' ) { // oldest first
+ $found = $this->claim( $uuid, 0, true );
+ } else { // random first
+ $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
+ $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
+ $found = $this->claim( $uuid, $rand, $gte )
+ || $this->claim( $uuid, $rand, !$gte ); // try both directions
+ }
+ // Check if we found a row to reserve...
+ if ( !$found ) {
$wgMemc->set( $this->getEmptinessCacheKey(), 'true', self::CACHE_TTL );
+ break; // nothing to do
+ }
+ // Fetch any row that we just reserved...
+ $row = $dbw->selectRow( 'job', '*',
+ array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__ );
+ // Check if another process deleted it as a duplicate
+ if ( !$row ) {
+ wfDebugLog( 'JobQueueDB', "Row deleted as duplicate by another process." );
+ continue; // try again
+ }
+ // Get the job object from the row...
+ $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
+ if ( !$title ) {
+ $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
+ wfDebugLog( 'JobQueueDB', "Row has invalid title '{$row->job_title}'." );
+ continue; // try again
+ }
+ $job = Job::factory( $row->job_cmd, $title,
+ self::extractBlob( $row->job_params ), $row->job_id );
+ // Delete any *other* duplicate jobs in the queue...
+ if ( $job->ignoreDuplicates() && strlen( $row->job_sha1 ) ) {
+ $dbw->delete( 'job',
+ array( 'job_sha1' => $row->job_sha1,
+ "job_id != {$dbw->addQuotes( $row->job_id )}" ),
+ __METHOD__
+ );
}
break; // done
} while( true );
'job_params' => self::makeBlob( $job->getParams() ),
);
// Additional job metadata
+ if ( $this->order === 'timestamp' ) { // oldest first
+ $random = time() - 1325376000; // seconds since "January 1, 2012"
+ } else { // random first
+ $random = mt_rand( 0, self::MAX_JOB_RANDOM );
+ }
$dbw = $this->getMasterDB();
$metaFields = array(
'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),