From 59da375cd9c93ab0f8ee3a9fbe2e02f8e8630ed5 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Wed, 24 Oct 2012 10:14:54 -0700 Subject: [PATCH] [JobQueue] Added support for approximate FIFO job queues. Change-Id: Icd8c084174e26a2a69f370e7bf6f858eba76e520 --- includes/DefaultSettings.php | 2 +- includes/job/JobQueue.php | 12 +++++- includes/job/JobQueueDB.php | 73 +++++++++++++++++++--------------- includes/job/JobQueueGroup.php | 12 ++---- maintenance/tables.sql | 3 +- 5 files changed, 59 insertions(+), 43 deletions(-) diff --git a/includes/DefaultSettings.php b/includes/DefaultSettings.php index cea63e07a3..2bf24a6f35 100644 --- a/includes/DefaultSettings.php +++ b/includes/DefaultSettings.php @@ -5436,7 +5436,7 @@ $wgJobTypesExcludedFromDefaultQueue = array(); * These settings should be global to all wikis. */ $wgJobTypeConf = array( - 'default' => array( 'class' => 'JobQueueDB' ), + 'default' => array( 'class' => 'JobQueueDB', 'order' => 'random' ), ); /** diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php index 6409cffc41..4637bd2b22 100644 --- a/includes/job/JobQueue.php +++ b/includes/job/JobQueue.php @@ -31,6 +31,7 @@ abstract class JobQueue { protected $wiki; // string; wiki ID protected $type; // string; job type + protected $order; // string; job priority for pop() const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions @@ -38,8 +39,9 @@ abstract class JobQueue { * @param $params array */ protected function __construct( array $params ) { - $this->wiki = $params['wiki']; - $this->type = $params['type']; + $this->wiki = $params['wiki']; + $this->type = $params['type']; + $this->order = isset( $params['order'] ) ? $params['order'] : 'random'; } /** @@ -48,6 +50,12 @@ abstract class JobQueue { * class : what job class to use (determines job type) * wiki : wiki ID of the wiki the jobs are for (defaults to current wiki) * type : The name of the job types this queue handles + * order : Order that pop() selects jobs, either "timestamp" or "random". + * If "timestamp" is used, the queue will effectively be FIFO. Note that + * pop() will not be exactly FIFO, and even if it was, job completion would + * not appear to be exactly FIFO since jobs can take different times to finish. + * If "random" is used, pop() will pick jobs in random order. This might be + * useful for improving concurrency depending on the queue storage medium. * * @param $params array * @return JobQueue diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php index bea4a6f69a..f6003b239d 100644 --- a/includes/job/JobQueueDB.php +++ b/includes/job/JobQueueDB.php @@ -121,38 +121,44 @@ class JobQueueDB extends JobQueue { $dbw->clearFlag( DBO_TRX ); // make each query its own transaction try { do { // retry when our row is invalid or deleted as a duplicate - $row = false; // row claimed - $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs - $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand - // Try to reserve a DB row... - if ( $this->claim( $uuid, $rand, $gte ) || $this->claim( $uuid, $rand, !$gte ) ) { - // Fetch any row that we just reserved... - $row = $dbw->selectRow( 'job', '*', - array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__ ); - // Check if another process deleted it as a duplicate - if ( !$row ) { - wfDebugLog( 'JobQueueDB', "Row deleted as duplicate by another process." ); - continue; // try again - } - // Get the job object from the row... - $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title ); - if ( !$title ) { - $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); - wfDebugLog( 'JobQueueDB', "Row has invalid title '{$row->job_title}'." ); - continue; // try again - } - $job = Job::factory( $row->job_cmd, $title, - self::extractBlob( $row->job_params ), $row->job_id ); - // Delete any *other* duplicate jobs in the queue... - if ( $job->ignoreDuplicates() && strlen( $row->job_sha1 ) ) { - $dbw->delete( 'job', - array( 'job_sha1' => $row->job_sha1, - "job_id != {$dbw->addQuotes( $row->job_id )}" ), - __METHOD__ - ); - } - } else { + // Try to reserve a row in the DB... + if ( $this->order === 'timestamp' ) { // oldest first + $found = $this->claim( $uuid, 0, true ); + } else { // random first + $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs + $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand + $found = $this->claim( $uuid, $rand, $gte ) + || $this->claim( $uuid, $rand, !$gte ); // try both directions + } + // Check if we found a row to reserve... + if ( !$found ) { $wgMemc->set( $this->getEmptinessCacheKey(), 'true', self::CACHE_TTL ); + break; // nothing to do + } + // Fetch any row that we just reserved... + $row = $dbw->selectRow( 'job', '*', + array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__ ); + // Check if another process deleted it as a duplicate + if ( !$row ) { + wfDebugLog( 'JobQueueDB', "Row deleted as duplicate by another process." ); + continue; // try again + } + // Get the job object from the row... + $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title ); + if ( !$title ) { + $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); + wfDebugLog( 'JobQueueDB', "Row has invalid title '{$row->job_title}'." ); + continue; // try again + } + $job = Job::factory( $row->job_cmd, $title, + self::extractBlob( $row->job_params ), $row->job_id ); + // Delete any *other* duplicate jobs in the queue... + if ( $job->ignoreDuplicates() && strlen( $row->job_sha1 ) ) { + $dbw->delete( 'job', + array( 'job_sha1' => $row->job_sha1, + "job_id != {$dbw->addQuotes( $row->job_id )}" ), + __METHOD__ + ); } break; // done } while( true ); @@ -274,6 +280,11 @@ class JobQueueDB extends JobQueue { 'job_params' => self::makeBlob( $job->getParams() ), ); // Additional job metadata + if ( $this->order === 'timestamp' ) { // oldest first + $random = time() - 1325376000; // seconds since "January 1, 2012" + } else { // random first + $random = mt_rand( 0, self::MAX_JOB_RANDOM ); + } $dbw = $this->getMasterDB(); $metaFields = array( 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ), diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php index 69bcf011be..4ebd531d15 100644 --- a/includes/job/JobQueueGroup.php +++ b/includes/job/JobQueueGroup.php @@ -62,18 +62,14 @@ class JobQueueGroup { public function get( $type ) { global $wgJobTypeConf; - $conf = false; + $conf = array( 'wiki' => $this->wiki, 'type' => $type ); if ( isset( $wgJobTypeConf[$type] ) ) { - $conf = $wgJobTypeConf[$type]; + $conf = $conf + $wgJobTypeConf[$type]; } else { - $conf = $wgJobTypeConf['default']; + $conf = $conf + $wgJobTypeConf['default']; } - return JobQueue::factory( array( - 'class' => $conf['class'], - 'wiki' => $this->wiki, - 'type' => $type, - ) ); + return JobQueue::factory( $conf ); } /** diff --git a/maintenance/tables.sql b/maintenance/tables.sql index 197151b349..a4cdefd18f 100644 --- a/maintenance/tables.sql +++ b/maintenance/tables.sql @@ -1291,7 +1291,8 @@ CREATE TABLE /*_*/job ( -- Stored as a PHP serialized array, or an empty string if there are no parameters job_params blob NOT NULL, - -- Random, non-unique, number used for concurrent job acquisition + -- Random, non-unique, number used for job acquisition + -- Either a simple timestamp or a totally random number (for lock concurrency) job_random integer unsigned NOT NULL default 0, -- Field that conveys process locks on rows via process UUIDs -- 2.20.1