From b567f3602e8bac048c9dce0f359ef0131d947c28 Mon Sep 17 00:00:00 2001 From: ASchulz Date: Tue, 28 Aug 2012 17:01:31 -0700 Subject: [PATCH] [JobQueue] Job queue refactoring and generalizing. * Added support for different queue types and methods for storing queues. * Treat each job type as being on its own queue, at least logically. * Added $wgJobTypeConf to configure queue types for each job type. * Improved the job DB table so that duplicate job checks actually work and are faster. Also improved the method for popping rows of the table. * Disabled duplicate job removal for everything except refreshLinks. The DELETE statements just add DB overhead and are not useful for cheap jobs, especially ones with start/end params (which are unlikely to have exact duplicates). Change-Id: I49824c7fa855fea4ddcac5c9901ece8c2c0101d0 --- includes/AutoLoader.php | 3 + includes/DefaultSettings.php | 8 + includes/Wiki.php | 49 ++- includes/installer/MysqlUpdater.php | 1 + includes/installer/SqliteUpdater.php | 1 + includes/job/Job.php | 338 +++--------------- includes/job/JobQueue.php | 185 ++++++++++ includes/job/JobQueueDB.php | 320 +++++++++++++++++ includes/job/JobQueueGroup.php | 156 ++++++++ includes/job/RefreshLinksJob.php | 2 +- maintenance/archives/patch-job_token.sql | 9 + maintenance/runJobs.php | 36 +- .../sqlite/archives/patch-job_token.sql | 8 + maintenance/tables.sql | 16 +- 14 files changed, 799 insertions(+), 333 deletions(-) create mode 100644 includes/job/JobQueue.php create mode 100644 includes/job/JobQueueDB.php create mode 100644 includes/job/JobQueueGroup.php create mode 100644 maintenance/archives/patch-job_token.sql create mode 100644 maintenance/sqlite/archives/patch-job_token.sql diff --git a/includes/AutoLoader.php b/includes/AutoLoader.php index 3ae5a687fc..338c4242bf 100644 --- a/includes/AutoLoader.php +++ b/includes/AutoLoader.php @@ -655,6 +655,9 @@ $wgAutoloadLocalClasses = array( 'EmaillingJob' => 'includes/job/EmaillingJob.php', 'EnotifNotifyJob' => 
'includes/job/EnotifNotifyJob.php', 'Job' => 'includes/job/Job.php', + 'JobQueue' => 'includes/job/JobQueue.php', + 'JobQueueDB' => 'includes/job/JobQueueDB.php', + 'JobQueueGroup' => 'includes/job/JobQueueGroup.php', 'RefreshLinksJob' => 'includes/job/RefreshLinksJob.php', 'RefreshLinksJob2' => 'includes/job/RefreshLinksJob.php', 'UploadFromUrlJob' => 'includes/job/UploadFromUrlJob.php', diff --git a/includes/DefaultSettings.php b/includes/DefaultSettings.php index 0f02efc319..6122af0d97 100644 --- a/includes/DefaultSettings.php +++ b/includes/DefaultSettings.php @@ -5403,6 +5403,14 @@ $wgJobClasses = array( */ $wgJobTypesExcludedFromDefaultQueue = array(); +/** + * Map of job types to configuration arrays. + * These settings should be global to all wikis. + */ +$wgJobTypeConf = array( + 'default' => array( 'class' => 'JobQueueDB' ), +); + /** * Additional functions to be performed with updateSpecialPages. * Expensive Querypages are already updated. diff --git a/includes/Wiki.php b/includes/Wiki.php index e6ccbe5a40..ed02a3dab7 100644 --- a/includes/Wiki.php +++ b/includes/Wiki.php @@ -596,28 +596,51 @@ class MediaWiki { if ( $wgJobRunRate <= 0 || wfReadOnly() ) { return; } + if ( $wgJobRunRate < 1 ) { $max = mt_getrandmax(); if ( mt_rand( 0, $max ) > $max * $wgJobRunRate ) { - return; + return; // the higher $wgJobRunRate, the less likely we return here } $n = 1; } else { $n = intval( $wgJobRunRate ); } - while ( $n-- && false != ( $job = Job::pop() ) ) { - $output = $job->toString() . "\n"; - $t = - microtime( true ); - $success = $job->run(); - $t += microtime( true ); - $t = round( $t * 1000 ); - if ( !$success ) { - $output .= "Error: " . $job->getLastError() . ", Time: $t ms\n"; - } else { - $output .= "Success, Time: $t ms\n"; + $group = JobQueueGroup::singleton(); + $types = $group->getDefaultQueueTypes(); + shuffle( $types ); // avoid starvation + + // Scan the queues for a job N times... + do { + $jobFound = false; // found a job in any queue? 
+ // Find a queue with a job on it and run it... + foreach ( $types as $i => $type ) { + $queue = $group->get( $type ); + if ( $queue->isEmpty() ) { + unset( $types[$i] ); // don't keep checking this queue + continue; + } + $job = $queue->pop(); + if ( $job ) { + $jobFound = true; + $output = $job->toString() . "\n"; + $t = - microtime( true ); + $success = $job->run(); + $queue->ack( $job ); // done + $t += microtime( true ); + $t = round( $t * 1000 ); + if ( !$success ) { + $output .= "Error: " . $job->getLastError() . ", Time: $t ms\n"; + } else { + $output .= "Success, Time: $t ms\n"; + } + wfDebugLog( 'jobqueue', $output ); + break; + } else { + unset( $types[$i] ); // don't keep checking this queue + } } - wfDebugLog( 'jobqueue', $output ); - } + } while ( --$n && $jobFound ); } } diff --git a/includes/installer/MysqlUpdater.php b/includes/installer/MysqlUpdater.php index a6cb13f0ad..82de913618 100644 --- a/includes/installer/MysqlUpdater.php +++ b/includes/installer/MysqlUpdater.php @@ -225,6 +225,7 @@ class MysqlUpdater extends DatabaseUpdater { array( 'dropField', 'recentchanges', 'rc_moved_to_title', 'patch-rc_moved.sql' ), array( 'addTable', 'sites', 'patch-sites.sql' ), array( 'addField', 'filearchive', 'fa_sha1', 'patch-fa_sha1.sql' ), + array( 'addField', 'job', 'job_token', 'patch-job_token.sql' ), ); } diff --git a/includes/installer/SqliteUpdater.php b/includes/installer/SqliteUpdater.php index e7f39396c9..c3f7a81674 100644 --- a/includes/installer/SqliteUpdater.php +++ b/includes/installer/SqliteUpdater.php @@ -105,6 +105,7 @@ class SqliteUpdater extends DatabaseUpdater { array( 'dropField', 'recentchanges', 'rc_moved_to_title', 'patch-rc_moved.sql' ), array( 'addTable', 'sites', 'patch-sites.sql' ), array( 'addField', 'filearchive', 'fa_sha1', 'patch-fa_sha1.sql' ), + array( 'addField', 'job', 'job_token', 'patch-job_token.sql' ), ); } diff --git a/includes/job/Job.php b/includes/job/Job.php index 270671e768..d11446e02f 100644 --- 
a/includes/job/Job.php +++ b/includes/job/Job.php @@ -23,11 +23,11 @@ /** * Class to both describe a background job and handle jobs. + * The queue aspects of this class are now deprecated. * * @ingroup JobQueue */ abstract class Job { - /** * @var Title */ @@ -47,172 +47,12 @@ abstract class Job { * Run the job * @return boolean success */ - abstract function run(); + abstract public function run(); /*------------------------------------------------------------------------- * Static functions *------------------------------------------------------------------------*/ - /** - * Pop a job of a certain type. This tries less hard than pop() to - * actually find a job; it may be adversely affected by concurrent job - * runners. - * - * @param $type string - * - * @return Job - */ - static function pop_type( $type ) { - wfProfilein( __METHOD__ ); - - $dbw = wfGetDB( DB_MASTER ); - - $dbw->begin( __METHOD__ ); - - $row = $dbw->selectRow( - 'job', - '*', - array( 'job_cmd' => $type ), - __METHOD__, - array( 'LIMIT' => 1, 'FOR UPDATE' ) - ); - - if ( $row === false ) { - $dbw->commit( __METHOD__ ); - wfProfileOut( __METHOD__ ); - return false; - } - - /* Ensure we "own" this row */ - $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); - $affected = $dbw->affectedRows(); - $dbw->commit( __METHOD__ ); - - if ( $affected == 0 ) { - wfProfileOut( __METHOD__ ); - return false; - } - - wfIncrStats( 'job-pop' ); - $namespace = $row->job_namespace; - $dbkey = $row->job_title; - $title = Title::makeTitleSafe( $namespace, $dbkey ); - $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ), - $row->job_id ); - - $job->removeDuplicates(); - - wfProfileOut( __METHOD__ ); - return $job; - } - - /** - * Pop a job off the front of the queue - * - * @param $offset Integer: Number of jobs to skip - * @return Job or false if there's no jobs - */ - static function pop( $offset = 0 ) { - wfProfileIn( __METHOD__ ); - - $dbr = wfGetDB( DB_SLAVE ); - - 
/* Get a job from the slave, start with an offset, - scan full set afterwards, avoid hitting purged rows - - NB: If random fetch previously was used, offset - will always be ahead of few entries - */ - - $conditions = self::defaultQueueConditions(); - - $offset = intval( $offset ); - $options = array( 'ORDER BY' => 'job_id', 'USE INDEX' => 'PRIMARY' ); - - $row = $dbr->selectRow( 'job', '*', - array_merge( $conditions, array( "job_id >= $offset" ) ), - __METHOD__, - $options - ); - - // Refetching without offset is needed as some of job IDs could have had delayed commits - // and have lower IDs than jobs already executed, blame concurrency :) - // - if ( $row === false ) { - if ( $offset != 0 ) { - $row = $dbr->selectRow( 'job', '*', $conditions, __METHOD__, $options ); - } - - if ( $row === false ) { - wfProfileOut( __METHOD__ ); - return false; - } - } - - // Try to delete it from the master - $dbw = wfGetDB( DB_MASTER ); - $dbw->begin( __METHOD__ ); - $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); - $affected = $dbw->affectedRows(); - $dbw->commit( __METHOD__ ); - - if ( !$affected ) { - $dbw->begin( __METHOD__ ); - - // Failed, someone else beat us to it - // Try getting a random row - $row = $dbw->selectRow( 'job', array( 'minjob' => 'MIN(job_id)', - 'maxjob' => 'MAX(job_id)' ), '1=1', __METHOD__ ); - if ( $row === false || is_null( $row->minjob ) || is_null( $row->maxjob ) ) { - // No jobs to get - $dbw->rollback( __METHOD__ ); - wfProfileOut( __METHOD__ ); - return false; - } - // Get the random row - $row = $dbw->selectRow( 'job', '*', - 'job_id >= ' . 
mt_rand( $row->minjob, $row->maxjob ), __METHOD__ ); - if ( $row === false ) { - // Random job gone before we got the chance to select it - // Give up - $dbw->rollback( __METHOD__ ); - wfProfileOut( __METHOD__ ); - return false; - } - // Delete the random row - $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); - $affected = $dbw->affectedRows(); - $dbw->commit( __METHOD__ ); - - if ( !$affected ) { - // Random job gone before we exclusively deleted it - // Give up - wfProfileOut( __METHOD__ ); - return false; - } - } - - // If execution got to here, there's a row in $row that has been deleted from the database - // by this thread. Hence the concurrent pop was successful. - wfIncrStats( 'job-pop' ); - $namespace = $row->job_namespace; - $dbkey = $row->job_title; - $title = Title::makeTitleSafe( $namespace, $dbkey ); - - if ( is_null( $title ) ) { - wfProfileOut( __METHOD__ ); - return false; - } - - $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ), $row->job_id ); - - // Remove any duplicates it may have later in the queue - $job->removeDuplicates(); - - wfProfileOut( __METHOD__ ); - return $job; - } - /** * Create the appropriate object to handle a specific job * @@ -223,7 +63,7 @@ abstract class Job { * @throws MWException * @return Job */ - static function factory( $command, Title $title, $params = false, $id = 0 ) { + public static function factory( $command, Title $title, $params = false, $id = 0 ) { global $wgJobClasses; if( isset( $wgJobClasses[$command] ) ) { $class = $wgJobClasses[$command]; @@ -232,30 +72,6 @@ abstract class Job { throw new MWException( "Invalid job command `{$command}`" ); } - /** - * @param $params - * @return string - */ - static function makeBlob( $params ) { - if ( $params !== false ) { - return serialize( $params ); - } else { - return ''; - } - } - - /** - * @param $blob - * @return bool|mixed - */ - static function extractBlob( $blob ) { - if ( (string)$blob !== '' ) { - return 
unserialize( $blob ); - } else { - return false; - } - } - /** * Batch-insert a group of jobs into the queue. * This will be wrapped in a transaction with a forced commit. @@ -264,33 +80,10 @@ abstract class Job { * removed later on, when the first one is popped. * * @param $jobs array of Job objects + * @deprecated 1.20 */ - static function batchInsert( $jobs ) { - if ( !count( $jobs ) ) { - return; - } - $dbw = wfGetDB( DB_MASTER ); - $rows = array(); - - /** - * @var $job Job - */ - foreach ( $jobs as $job ) { - $rows[] = $job->insertFields(); - if ( count( $rows ) >= 50 ) { - # Do a small transaction to avoid slave lag - $dbw->begin( __METHOD__ ); - $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' ); - $dbw->commit( __METHOD__ ); - $rows = array(); - } - } - if ( $rows ) { // last chunk - $dbw->begin( __METHOD__ ); - $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' ); - $dbw->commit( __METHOD__ ); - } - wfIncrStats( 'job-insert', count( $jobs ) ); + public static function batchInsert( $jobs ) { + return JobQueueGroup::singleton()->push( $jobs ); } /** @@ -301,45 +94,10 @@ abstract class Job { * large batches of jobs can cause slave lag. * * @param $jobs array of Job objects + * @deprecated 1.20 */ - static function safeBatchInsert( $jobs ) { - if ( !count( $jobs ) ) { - return; - } - $dbw = wfGetDB( DB_MASTER ); - $rows = array(); - foreach ( $jobs as $job ) { - $rows[] = $job->insertFields(); - if ( count( $rows ) >= 500 ) { - $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' ); - $rows = array(); - } - } - if ( $rows ) { // last chunk - $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' ); - } - wfIncrStats( 'job-insert', count( $jobs ) ); - } - - - /** - * SQL conditions to apply on most JobQueue queries - * - * Whenever we exclude jobs types from the default queue, we want to make - * sure that queries to the job queue actually ignore them. 
- * - * @return array SQL conditions suitable for Database:: methods - */ - static function defaultQueueConditions( ) { - global $wgJobTypesExcludedFromDefaultQueue; - $conditions = array(); - if ( count( $wgJobTypesExcludedFromDefaultQueue ) > 0 ) { - $dbr = wfGetDB( DB_SLAVE ); - foreach ( $wgJobTypesExcludedFromDefaultQueue as $cmdType ) { - $conditions[] = "job_cmd != " . $dbr->addQuotes( $cmdType ); - } - } - return $conditions; + public static function safeBatchInsert( $jobs ) { + return JobQueueGroup::singleton()->push( $jobs, JobQueue::QoS_Atomic ); } /*------------------------------------------------------------------------- @@ -352,77 +110,63 @@ abstract class Job { * @param $params array|bool * @param $id int */ - function __construct( $command, $title, $params = false, $id = 0 ) { + public function __construct( $command, $title, $params = false, $id = 0 ) { $this->command = $command; $this->title = $title; $this->params = $params; $this->id = $id; - // A bit of premature generalisation - // Oh well, the whole class is premature generalisation really - $this->removeDuplicates = true; + $this->removeDuplicates = false; // expensive jobs may set this to true } /** - * Insert a single job into the queue. 
- * @return bool true on success + * @return integer May be 0 for jobs stored outside the DB */ - function insert() { - $fields = $this->insertFields(); + public function getId() { + return $this->id; + } - $dbw = wfGetDB( DB_MASTER ); + /** + * @return string + */ + public function getType() { + return $this->command; + } - if ( $this->removeDuplicates ) { - $res = $dbw->select( 'job', array( '1' ), $fields, __METHOD__ ); - if ( $dbw->numRows( $res ) ) { - return true; - } - } - wfIncrStats( 'job-insert' ); - return $dbw->insert( 'job', $fields, __METHOD__ ); + /** + * @return Title + */ + public function getTitle() { + return $this->title; } /** * @return array */ - protected function insertFields() { - $dbw = wfGetDB( DB_MASTER ); - return array( - 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ), - 'job_cmd' => $this->command, - 'job_namespace' => $this->title->getNamespace(), - 'job_title' => $this->title->getDBkey(), - 'job_timestamp' => $dbw->timestamp(), - 'job_params' => Job::makeBlob( $this->params ) - ); + public function getParams() { + return $this->params; } /** - * Remove jobs in the job queue which are duplicates of this job. - * This is deadlock-prone and so starts its own transaction. + * @return bool */ - function removeDuplicates() { - if ( !$this->removeDuplicates ) { - return; - } + public function ignoreDuplicates() { + return $this->removeDuplicates; + } - $fields = $this->insertFields(); - unset( $fields['job_id'] ); - unset( $fields['job_timestamp'] ); - $dbw = wfGetDB( DB_MASTER ); - $dbw->begin( __METHOD__ ); - $dbw->delete( 'job', $fields, __METHOD__ ); - $affected = $dbw->affectedRows(); - $dbw->commit( __METHOD__ ); - if ( $affected ) { - wfIncrStats( 'job-dup-delete', $affected ); - } + /** + * Insert a single job into the queue. 
+ * @return bool true on success + * @deprecated 1.20 + */ + public function insert() { + return JobQueueGroup::singleton()->push( $this ); } /** * @return string */ - function toString() { + public function toString() { $paramString = ''; if ( $this->params ) { foreach ( $this->params as $key => $value ) { @@ -448,7 +192,7 @@ abstract class Job { $this->error = $error; } - function getLastError() { + public function getLastError() { return $this->error; } } diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php new file mode 100644 index 0000000000..6409cffc41 --- /dev/null +++ b/includes/job/JobQueue.php @@ -0,0 +1,185 @@ +wiki = $params['wiki']; + $this->type = $params['type']; + } + + /** + * Get a job queue object of the specified type. + * $params includes: + * class : what job class to use (determines job type) + * wiki : wiki ID of the wiki the jobs are for (defaults to current wiki) + * type : The name of the job types this queue handles + * + * @param $params array + * @return JobQueue + * @throws MWException + */ + final public static function factory( array $params ) { + $class = $params['class']; + if ( !MWInit::classExists( $class ) ) { + throw new MWException( "Invalid job queue class '$class'." ); + } + $obj = new $class( $params ); + if ( !( $obj instanceof self ) ) { + throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." 
); + } + return $obj; + } + + /** + * @return string Wiki ID + */ + final public function getWiki() { + return $this->wiki; + } + + /** + * @return string Job type that this queue handles + */ + final public function getType() { + return $this->type; + } + + /** + * @return bool Quickly check if the queue is empty + */ + final public function isEmpty() { + wfProfileIn( __METHOD__ ); + $res = $this->doIsEmpty(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::isEmpty() + * @return bool + */ + abstract protected function doIsEmpty(); + + /** + * Push a batch of jobs into the queue + * + * @param $jobs array List of Jobs + * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic) + * @return bool + */ + final public function batchPush( array $jobs, $flags = 0 ) { + foreach ( $jobs as $job ) { + if ( $job->getType() !== $this->type ) { + throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + } + } + wfProfileIn( __METHOD__ ); + $ok = $this->doBatchPush( $jobs, $flags ); + if ( $ok ) { + wfIncrStats( 'job-insert', count( $jobs ) ); + } + wfProfileOut( __METHOD__ ); + return $ok; + } + + /** + * @see JobQueue::batchPush() + * @return bool + */ + abstract protected function doBatchPush( array $jobs, $flags ); + + /** + * Pop a job off of the queue + * + * @return Job|bool Returns false on failure + */ + final public function pop() { + wfProfileIn( __METHOD__ ); + $job = $this->doPop(); + if ( $job ) { + wfIncrStats( 'job-pop' ); + } + wfProfileOut( __METHOD__ ); + return $job; + } + + /** + * @see JobQueue::pop() + * @return Job + */ + abstract protected function doPop(); + + /** + * Acknowledge that a job was completed + * + * @param $job Job + * @return bool + */ + final public function ack( Job $job ) { + if ( $job->getType() !== $this->type ) { + throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." 
); + } + wfProfileIn( __METHOD__ ); + $ok = $this->doAck( $job ); + wfProfileOut( __METHOD__ ); + return $ok; + } + + /** + * @see JobQueue::ack() + * @return bool + */ + abstract protected function doAck( Job $job ); + + /** + * Wait for any slaves or backup servers to catch up + * + * @return void + */ + final public function waitForBackups() { + wfProfileIn( __METHOD__ ); + $this->doWaitForBackups(); + wfProfileOut( __METHOD__ ); + } + + /** + * @see JobQueue::waitForBackups() + * @return void + */ + protected function doWaitForBackups() {} +} diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php new file mode 100644 index 0000000000..3d584ef727 --- /dev/null +++ b/includes/job/JobQueueDB.php @@ -0,0 +1,320 @@ +getEmptinessCacheKey(); + + $isEmpty = $wgMemc->get( $key ); + if ( $isEmpty === 'true' ) { + return true; + } elseif ( $isEmpty === 'false' ) { + return false; + } + + $found = $this->getSlaveDB()->selectField( + 'job', '1', array( 'job_cmd' => $this->type ), __METHOD__ + ); + + $wgMemc->add( $key, $found ? 'false' : 'true', self::CACHE_TTL ); + } + + /** + * @see JobQueue::doBatchPush() + * @return bool + */ + protected function doBatchPush( array $jobs, $flags ) { + if ( count( $jobs ) ) { + $dbw = $this->getMasterDB(); + + $rows = array(); + foreach ( $jobs as $job ) { + $rows[] = $this->insertFields( $job ); + } + $atomic = ( $flags & self::QoS_Atomic ); + $key = $this->getEmptinessCacheKey(); + $ttl = self::CACHE_TTL; + + $dbw->onTransactionIdle( function() use ( $dbw, $rows, $atomic, $key, $ttl ) { + global $wgMemc; + + $autoTrx = $dbw->getFlag( DBO_TRX ); // automatic begin() enabled? 
+ if ( $atomic ) { + $dbw->begin(); // wrap all the job additions in one transaction + } else { + $dbw->clearFlag( DBO_TRX ); // make each query its own transaction + } + try { + foreach ( array_chunk( $rows, 50 ) as $rowBatch ) { // avoid slave lag + $dbw->insert( 'job', $rowBatch, __METHOD__ ); + } + } catch ( DBError $e ) { + if ( $atomic ) { + $dbw->rollback(); + } else { + $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore automatic begin() + } + throw $e; + } + if ( $atomic ) { + $dbw->commit(); + } else { + $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore automatic begin() + } + + $wgMemc->set( $key, 'false', $ttl ); + } ); + } + + return true; + } + + /** + * @see JobQueue::doPop() + * @return Job|bool + */ + protected function doPop() { + global $wgMemc; + + $uuid = wfRandomString( 32 ); // pop attempt + + $dbw = $this->getMasterDB(); + if ( $dbw->trxLevel() ) { + wfWarn( "Attempted to pop a job in a transaction; committing first." ); + $dbw->commit(); // push existing transaction + } + + $job = false; // job popped off + $autoTrx = $dbw->getFlag( DBO_TRX ); // automatic begin() enabled? + $dbw->clearFlag( DBO_TRX ); // make each query its own transaction + try { + do { // retry when our row is invalid or deleted as a duplicate + $row = false; // row claimed + $rand = mt_rand( 0, 2147483648 ); // encourage concurrent UPDATEs + $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand + // Try to reserve a DB row... + if ( $this->claim( $uuid, $rand, $gte ) || $this->claim( $uuid, $rand, !$gte ) ) { + // Fetch any row that we just reserved... + $row = $dbw->selectRow( 'job', '*', + array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__ ); + // Check if another process deleted it as a duplicate + if ( !$row ) { + wfDebugLog( 'JobQueueDB', "Row deleted as duplicate by another process." ); + continue; // try again + } + // Get the job object from the row... 
+ $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title ); + if ( !$title ) { + $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); + wfDebugLog( 'JobQueueDB', "Row has invalid title '{$row->job_title}'." ); + continue; // try again + } + $job = Job::factory( $row->job_cmd, $title, + self::extractBlob( $row->job_params ), $row->job_id ); + // Delete any *other* duplicate jobs in the queue... + if ( $job->ignoreDuplicates() && strlen( $row->job_sha1 ) ) { + $dbw->delete( 'job', + array( 'job_sha1' => $row->job_sha1, + "job_id != {$dbw->addQuotes( $row->job_id )}" ), + __METHOD__ + ); + } + } else { + $wgMemc->set( $this->getEmptinessCacheKey(), 'true', self::CACHE_TTL ); + } + break; // done + } while( true ); + } catch ( DBError $e ) { + $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore automatic begin() + throw $e; + } + $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore automatic begin() + + return $job; + } + + /** + * Reserve a row with a single UPDATE without holding row locks over RTTs... + * @param $uuid string 32 char hex string + * @param $rand integer Random unsigned integer (31 bits) + * @param $gte bool Search for job_random >= $rand (otherwise job_random <= $rand) + * @return integer Number of affected rows + */ + protected function claim( $uuid, $rand, $gte ) { + $dbw = $this->getMasterDB(); + $dir = $gte ? 'ASC' : 'DESC'; + $ineq = $gte ? '>=' : '<='; + if ( $dbw->getType() === 'mysql' ) { + // Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the + // same table being changed in an UPDATE query in MySQL (gives Error: 1093). + // Oracle and PostgreSQL have no such limitation. However, MySQL offers an + // alternative here by supporting ORDER BY + LIMIT for UPDATE queries. + // The DB wrapper functions do not support this, so it's done manually. 
+ $dbw->query( "UPDATE {$dbw->tableName( 'job' )} + SET + job_token = {$dbw->addQuotes( $uuid ) }, + job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )} + WHERE ( + job_cmd = {$dbw->addQuotes( $this->type )} + AND job_token = {$dbw->addQuotes( '' )} + AND job_random {$ineq} {$dbw->addQuotes( $rand )} + ) ORDER BY job_random {$dir} LIMIT 1", + __METHOD__ + ); + } else { + // Use a subquery to find the job, within an UPDATE to claim it. + // This uses as much of the DB wrapper functions as possible. + $dbw->update( 'job', + array( 'job_token' => $uuid, 'job_token_timestamp' => $dbw->timestamp() ), + array( 'job_id = (' . + $dbw->selectSQLText( 'job', 'job_id', + array( + 'job_cmd' => $this->type, + 'job_token' => '', + "job_random {$ineq} {$dbw->addQuotes( $rand )}" ), + __METHOD__, + array( 'ORDER BY' => "job_random {$dir}", 'LIMIT' => 1 ) ) . + ')' + ), + __METHOD__ + ); + } + return $dbw->affectedRows(); + } + + /** + * @see JobQueue::doAck() + * @return bool + */ + protected function doAck( Job $job ) { + $dbw = $this->getMasterDB(); + if ( $dbw->trxLevel() ) { + wfWarn( "Attempted to ack a job in a transaction; committing first." ); + $dbw->commit(); // push existing transaction + } + + $autoTrx = $dbw->getFlag( DBO_TRX ); // automatic begin() enabled? + $dbw->clearFlag( DBO_TRX ); // make each query its own transaction + try { + // Delete a row with a single DELETE without holding row locks over RTTs... + $dbw->delete( 'job', array( 'job_cmd' => $this->type, 'job_id' => $job->getId() ) ); + } catch ( Exception $e ) { + $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore automatic begin() + throw $e; + } + $dbw->setFlag( $autoTrx ? 
DBO_TRX : 0 ); // restore automatic begin() + + return true; + } + + /** + * @see JobQueue::doWaitForBackups() + * @return void + */ + protected function doWaitForBackups() { + wfWaitForSlaves(); + } + + /** + * @return DatabaseBase + */ + protected function getSlaveDB() { + return wfGetDB( DB_SLAVE, array(), $this->wiki ); + } + + /** + * @return DatabaseBase + */ + protected function getMasterDB() { + return wfGetDB( DB_MASTER, array(), $this->wiki ); + } + + /** + * @param $job Job + * @return array + */ + protected function insertFields( Job $job ) { + // Rows that describe the nature of the job + $descFields = array( + 'job_cmd' => $job->getType(), + 'job_namespace' => $job->getTitle()->getNamespace(), + 'job_title' => $job->getTitle()->getDBkey(), + 'job_params' => self::makeBlob( $job->getParams() ), + ); + // Additional job metadata + $dbw = $this->getMasterDB(); + $metaFields = array( + 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ), + 'job_timestamp' => $dbw->timestamp(), + 'job_sha1' => wfBaseConvert( sha1( serialize( $descFields ) ), 16, 36, 32 ), + 'job_random' => mt_rand( 0, 2147483647 ) // [0, 2^31 - 1] + ); + return ( $descFields + $metaFields ); + } + + /** + * @return string + */ + private function getEmptinessCacheKey() { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'isempty' ); + } + + /** + * @param $params + * @return string + */ + protected static function makeBlob( $params ) { + if ( $params !== false ) { + return serialize( $params ); + } else { + return ''; + } + } + + /** + * @param $blob + * @return bool|mixed + */ + protected static function extractBlob( $blob ) { + if ( (string)$blob !== '' ) { + return unserialize( $blob ); + } else { + return false; + } + } +} diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php new file mode 100644 index 0000000000..7d01a29c53 --- /dev/null +++ b/includes/job/JobQueueGroup.php @@ -0,0 +1,156 
@@ +wiki = $wiki; + } + + /** + * @param $wiki string Wiki ID + * @return JobQueueGroup + */ + public static function singleton( $wiki = false ) { + $wiki = ( $wiki === false ) ? wfWikiID() : $wiki; + if ( !isset( self::$instances[$wiki] ) ) { + self::$instances[$wiki] = new self( $wiki ); + } + return self::$instances[$wiki]; + } + + /** + * @param $type string + * @return JobQueue Job queue object for a given queue type + */ + public function get( $type ) { + global $wgJobTypeConf; + + $conf = false; + if ( isset( $wgJobTypeConf[$type] ) ) { + $conf = $wgJobTypeConf[$type]; + } else { + $conf = $wgJobTypeConf['default']; + } + + return JobQueue::factory( array( + 'class' => $conf['class'], + 'wiki' => $this->wiki, + 'type' => $type, + ) ); + } + + /** + * Insert jobs into the respective queues to which they belong + * + * @param $jobs Job|array A single Job or a list of Jobs + * @return bool True only if every queue accepted its batch + */ + public function push( $jobs ) { + $jobs = is_array( $jobs ) ? $jobs : array( $jobs ); // an (array) cast on a Job would yield its properties + + $jobsByType = array(); // (job type => list of jobs) + foreach ( $jobs as $job ) { + $jobsByType[$job->getType()][] = $job; + } + + $ok = true; + foreach ( $jobsByType as $type => $jobs ) { + if ( !$this->get( $type )->batchPush( $jobs ) ) { + $ok = false; + } + } + + return $ok; + } + + /** + * Pop a job off one of the job queues + * + * @param $type integer JobQueueGroup::TYPE_* constant + * @return Job|bool Returns false on failure + */ + public function pop( $type = self::TYPE_DEFAULT ) { + $types = ( $type == self::TYPE_DEFAULT ) + ? $this->getDefaultQueueTypes() + : $this->getQueueTypes(); + shuffle( $types ); // avoid starvation + + foreach ( $types as $type ) { // for each queue... 
+ $job = $this->get( $type )->pop(); + if ( $job ) { + return $job; // found + } + } + + return false; // no jobs found + } + + /** + * Acknowledge that a job was completed + * + * @param $job Job + * @return bool Result of ack() on the queue for the job's type + */ + public function ack( Job $job ) { + return $this->get( $job->getType() )->ack( $job ); + } + + /** + * Get the list of queue types + * + * @return array List of job type names (the keys of $wgJobClasses) + */ + public function getQueueTypes() { + global $wgJobClasses; + + return array_keys( $wgJobClasses ); + } + + /** + * Get the list of default queue types + * + * @return array Queue types not listed in $wgJobTypesExcludedFromDefaultQueue + */ + public function getDefaultQueueTypes() { + global $wgJobTypesExcludedFromDefaultQueue; + + return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue ); + } +} diff --git a/includes/job/RefreshLinksJob.php b/includes/job/RefreshLinksJob.php index c4370f4511..a29f29fe64 100644 --- a/includes/job/RefreshLinksJob.php +++ b/includes/job/RefreshLinksJob.php @@ -27,9 +27,9 @@ * @ingroup JobQueue */ class RefreshLinksJob extends Job { - function __construct( $title, $params = '', $id = 0 ) { parent::__construct( 'refreshLinks', $title, $params, $id ); + $this->removeDuplicates = true; // job is expensive } /** diff --git a/maintenance/archives/patch-job_token.sql b/maintenance/archives/patch-job_token.sql new file mode 100644 index 0000000000..080fa97cd5 --- /dev/null +++ b/maintenance/archives/patch-job_token.sql @@ -0,0 +1,9 @@ +ALTER TABLE /*_*/job + ADD COLUMN job_random integer unsigned NOT NULL default 0, + ADD COLUMN job_token varbinary(32) NOT NULL default '', + ADD COLUMN job_token_timestamp varbinary(14) NULL default NULL, + ADD COLUMN job_sha1 varbinary(32) NOT NULL default ''; + +CREATE INDEX /*i*/job_sha1 ON /*_*/job (job_sha1); +CREATE INDEX /*i*/job_cmd_token ON /*_*/job (job_cmd,job_token,job_random); + diff --git a/maintenance/runJobs.php b/maintenance/runJobs.php index e909bc06ae..80a227810e 100644 --- a/maintenance/runJobs.php +++ 
b/maintenance/runJobs.php @@ -52,6 +52,7 @@ class RunJobs extends Maintenance { public function execute() { global $wgTitle; + if ( $this->hasOption( 'procs' ) ) { $procs = intval( $this->getOption( 'procs' ) ); if ( $procs < 1 || $procs > 1000 ) { @@ -70,26 +71,17 @@ class RunJobs extends Maintenance { $dbw = wfGetDB( DB_MASTER ); $n = 0; - if ( $type === false ) { - $conds = Job::defaultQueueConditions( ); - } else { - $conds = array( 'job_cmd' => $type ); - } - - while ( $dbw->selectField( 'job', 'job_id', $conds, 'runJobs.php' ) ) { - $offset = 0; - for ( ; ; ) { - $job = !$type ? Job::pop( $offset ) : Job::pop_type( $type ); - - if ( !$job ) { - break; - } - - wfWaitForSlaves(); + $group = JobQueueGroup::singleton(); + do { + $job = ( $type === false ) + ? $group->pop() // job from any queue + : $group->get( $type )->pop(); // job from a single queue + if ( $job ) { // found a job + // Perform the job (logging success/failure and runtime)... $t = microtime( true ); - $offset = $job->id; $this->runJobsLog( $job->toString() . " STARTING" ); $status = $job->run(); + $group->ack( $job ); // done $t = microtime( true ) - $t; $timeMs = intval( $t * 1000 ); if ( !$status ) { @@ -97,15 +89,17 @@ class RunJobs extends Maintenance { } else { $this->runJobsLog( $job->toString() . " t=$timeMs good" ); } - - if ( $maxJobs && ++$n > $maxJobs ) { + // Break out if we hit the job count or wall time limits... + if ( $maxJobs && ++$n >= $maxJobs ) { - break 2; + break; // only one enclosing loop now } - if ( $maxTime && time() - $startTime > $maxTime ) { + if ( $maxTime && ( time() - $startTime ) > $maxTime ) { - break 2; + break; // only one enclosing loop now } + // Don't let any slaves/backups fall behind... 
+ $group->get( $job->getType() )->waitForBackups(); // $type is false when running all queues } - } + } while ( $job ); // stop when there are no jobs } /** diff --git a/maintenance/sqlite/archives/patch-job_token.sql b/maintenance/sqlite/archives/patch-job_token.sql new file mode 100644 index 0000000000..4e4d28fde8 --- /dev/null +++ b/maintenance/sqlite/archives/patch-job_token.sql @@ -0,0 +1,8 @@ +ALTER TABLE /*_*/job ADD COLUMN job_random integer unsigned NOT NULL default 0; +ALTER TABLE /*_*/job ADD COLUMN job_token varbinary(32) NOT NULL default ''; +ALTER TABLE /*_*/job ADD COLUMN job_sha1 varbinary(32) NOT NULL default ''; +ALTER TABLE /*_*/job ADD COLUMN job_token_timestamp varbinary(14) NULL default NULL; + +CREATE INDEX /*i*/job_sha1 ON /*_*/job (job_sha1); +CREATE INDEX /*i*/job_cmd_token ON /*_*/job (job_cmd,job_token,job_random); + diff --git a/maintenance/tables.sql b/maintenance/tables.sql index a06c21ceb9..197151b349 100644 --- a/maintenance/tables.sql +++ b/maintenance/tables.sql @@ -1289,9 +1289,23 @@ CREATE TABLE /*_*/job ( -- Any other parameters to the command -- Stored as a PHP serialized array, or an empty string if there are no parameters - job_params blob NOT NULL + job_params blob NOT NULL, + + -- Random, non-unique, number used for concurrent job acquisition + job_random integer unsigned NOT NULL default 0, + + -- Field that conveys process locks on rows via process UUIDs + job_token varbinary(32) NOT NULL default '', + + -- Timestamp when the job was locked + job_token_timestamp varbinary(14) NULL default NULL, + + -- Base 36 SHA1 of the job parameters relevant to detecting duplicates + job_sha1 varbinary(32) NOT NULL default '' ) /*$wgDBTableOptions*/; +CREATE INDEX /*i*/job_sha1 ON /*_*/job (job_sha1); +CREATE INDEX /*i*/job_cmd_token ON /*_*/job (job_cmd,job_token,job_random); CREATE INDEX /*i*/job_cmd ON /*_*/job (job_cmd, job_namespace, job_title, job_params(128)); CREATE INDEX /*i*/job_timestamp ON /*_*/job (job_timestamp); -- 2.20.1