* Added support for different queue types and methods for storing queues.
* Treat each job type as being on its own queue, at least logically.
* Added $wgJobTypeConf to configure queue types for each job type.
* Improved the job DB table so that duplicate job checks actually work
and are faster. Also improved the method for popping rows of the table.
* Disabled duplicate job removal for everything except refreshLinks.
The DELETE statements just add DB overhead and are not useful for cheap
jobs, especially ones with start/end params (which are unlikely to have
exact duplicates).
Change-Id: I49824c7fa855fea4ddcac5c9901ece8c2c0101d0
'EmaillingJob' => 'includes/job/EmaillingJob.php',
'EnotifNotifyJob' => 'includes/job/EnotifNotifyJob.php',
'Job' => 'includes/job/Job.php',
+ 'JobQueue' => 'includes/job/JobQueue.php',
+ 'JobQueueDB' => 'includes/job/JobQueueDB.php',
+ 'JobQueueGroup' => 'includes/job/JobQueueGroup.php',
'RefreshLinksJob' => 'includes/job/RefreshLinksJob.php',
'RefreshLinksJob2' => 'includes/job/RefreshLinksJob.php',
'UploadFromUrlJob' => 'includes/job/UploadFromUrlJob.php',
*/
$wgJobTypesExcludedFromDefaultQueue = array();
+/**
+ * Map of job types to configuration arrays.
+ * These settings should be global to all wikis.
+ */
+$wgJobTypeConf = array(
+ 'default' => array( 'class' => 'JobQueueDB' ),
+);
+
/**
* Additional functions to be performed with updateSpecialPages.
* Expensive Querypages are already updated.
if ( $wgJobRunRate <= 0 || wfReadOnly() ) {
return;
}
+
if ( $wgJobRunRate < 1 ) {
$max = mt_getrandmax();
if ( mt_rand( 0, $max ) > $max * $wgJobRunRate ) {
- return;
+ return; // the higher $wgJobRunRate, the less likely we return here
}
$n = 1;
} else {
$n = intval( $wgJobRunRate );
}
- while ( $n-- && false != ( $job = Job::pop() ) ) {
- $output = $job->toString() . "\n";
- $t = - microtime( true );
- $success = $job->run();
- $t += microtime( true );
- $t = round( $t * 1000 );
- if ( !$success ) {
- $output .= "Error: " . $job->getLastError() . ", Time: $t ms\n";
- } else {
- $output .= "Success, Time: $t ms\n";
+ $group = JobQueueGroup::singleton();
+ $types = $group->getDefaultQueueTypes();
+ shuffle( $types ); // avoid starvation
+
+ // Scan the queues for a job N times...
+ do {
+ $jobFound = false; // found a job in any queue?
+ // Find a queue with a job on it and run it...
+ foreach ( $types as $i => $type ) {
+ $queue = $group->get( $type );
+ if ( $queue->isEmpty() ) {
+ unset( $types[$i] ); // don't keep checking this queue
+ continue;
+ }
+ $job = $queue->pop();
+ if ( $job ) {
+ $jobFound = true;
+ $output = $job->toString() . "\n";
+ $t = - microtime( true );
+ $success = $job->run();
+ $queue->ack( $job ); // done
+ $t += microtime( true );
+ $t = round( $t * 1000 );
+ if ( !$success ) {
+ $output .= "Error: " . $job->getLastError() . ", Time: $t ms\n";
+ } else {
+ $output .= "Success, Time: $t ms\n";
+ }
+ wfDebugLog( 'jobqueue', $output );
+ break;
+ } else {
+ unset( $types[$i] ); // don't keep checking this queue
+ }
}
- wfDebugLog( 'jobqueue', $output );
- }
+ } while ( --$n && $jobFound );
}
}
array( 'dropField', 'recentchanges', 'rc_moved_to_title', 'patch-rc_moved.sql' ),
array( 'addTable', 'sites', 'patch-sites.sql' ),
array( 'addField', 'filearchive', 'fa_sha1', 'patch-fa_sha1.sql' ),
+ array( 'addField', 'job', 'job_token', 'patch-job_token.sql' ),
);
}
array( 'dropField', 'recentchanges', 'rc_moved_to_title', 'patch-rc_moved.sql' ),
array( 'addTable', 'sites', 'patch-sites.sql' ),
array( 'addField', 'filearchive', 'fa_sha1', 'patch-fa_sha1.sql' ),
+ array( 'addField', 'job', 'job_token', 'patch-job_token.sql' ),
);
}
/**
* Class to both describe a background job and handle jobs.
+ * The queue aspects of this class are now deprecated.
*
* @ingroup JobQueue
*/
abstract class Job {
-
/**
* @var Title
*/
* Run the job
* @return boolean success
*/
- abstract function run();
+ abstract public function run();
/*-------------------------------------------------------------------------
* Static functions
*------------------------------------------------------------------------*/
- /**
- * Pop a job of a certain type. This tries less hard than pop() to
- * actually find a job; it may be adversely affected by concurrent job
- * runners.
- *
- * @param $type string
- *
- * @return Job
- */
- static function pop_type( $type ) {
- wfProfilein( __METHOD__ );
-
- $dbw = wfGetDB( DB_MASTER );
-
- $dbw->begin( __METHOD__ );
-
- $row = $dbw->selectRow(
- 'job',
- '*',
- array( 'job_cmd' => $type ),
- __METHOD__,
- array( 'LIMIT' => 1, 'FOR UPDATE' )
- );
-
- if ( $row === false ) {
- $dbw->commit( __METHOD__ );
- wfProfileOut( __METHOD__ );
- return false;
- }
-
- /* Ensure we "own" this row */
- $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
- $affected = $dbw->affectedRows();
- $dbw->commit( __METHOD__ );
-
- if ( $affected == 0 ) {
- wfProfileOut( __METHOD__ );
- return false;
- }
-
- wfIncrStats( 'job-pop' );
- $namespace = $row->job_namespace;
- $dbkey = $row->job_title;
- $title = Title::makeTitleSafe( $namespace, $dbkey );
- $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ),
- $row->job_id );
-
- $job->removeDuplicates();
-
- wfProfileOut( __METHOD__ );
- return $job;
- }
-
- /**
- * Pop a job off the front of the queue
- *
- * @param $offset Integer: Number of jobs to skip
- * @return Job or false if there's no jobs
- */
- static function pop( $offset = 0 ) {
- wfProfileIn( __METHOD__ );
-
- $dbr = wfGetDB( DB_SLAVE );
-
- /* Get a job from the slave, start with an offset,
- scan full set afterwards, avoid hitting purged rows
-
- NB: If random fetch previously was used, offset
- will always be ahead of few entries
- */
-
- $conditions = self::defaultQueueConditions();
-
- $offset = intval( $offset );
- $options = array( 'ORDER BY' => 'job_id', 'USE INDEX' => 'PRIMARY' );
-
- $row = $dbr->selectRow( 'job', '*',
- array_merge( $conditions, array( "job_id >= $offset" ) ),
- __METHOD__,
- $options
- );
-
- // Refetching without offset is needed as some of job IDs could have had delayed commits
- // and have lower IDs than jobs already executed, blame concurrency :)
- //
- if ( $row === false ) {
- if ( $offset != 0 ) {
- $row = $dbr->selectRow( 'job', '*', $conditions, __METHOD__, $options );
- }
-
- if ( $row === false ) {
- wfProfileOut( __METHOD__ );
- return false;
- }
- }
-
- // Try to delete it from the master
- $dbw = wfGetDB( DB_MASTER );
- $dbw->begin( __METHOD__ );
- $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
- $affected = $dbw->affectedRows();
- $dbw->commit( __METHOD__ );
-
- if ( !$affected ) {
- $dbw->begin( __METHOD__ );
-
- // Failed, someone else beat us to it
- // Try getting a random row
- $row = $dbw->selectRow( 'job', array( 'minjob' => 'MIN(job_id)',
- 'maxjob' => 'MAX(job_id)' ), '1=1', __METHOD__ );
- if ( $row === false || is_null( $row->minjob ) || is_null( $row->maxjob ) ) {
- // No jobs to get
- $dbw->rollback( __METHOD__ );
- wfProfileOut( __METHOD__ );
- return false;
- }
- // Get the random row
- $row = $dbw->selectRow( 'job', '*',
- 'job_id >= ' . mt_rand( $row->minjob, $row->maxjob ), __METHOD__ );
- if ( $row === false ) {
- // Random job gone before we got the chance to select it
- // Give up
- $dbw->rollback( __METHOD__ );
- wfProfileOut( __METHOD__ );
- return false;
- }
- // Delete the random row
- $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
- $affected = $dbw->affectedRows();
- $dbw->commit( __METHOD__ );
-
- if ( !$affected ) {
- // Random job gone before we exclusively deleted it
- // Give up
- wfProfileOut( __METHOD__ );
- return false;
- }
- }
-
- // If execution got to here, there's a row in $row that has been deleted from the database
- // by this thread. Hence the concurrent pop was successful.
- wfIncrStats( 'job-pop' );
- $namespace = $row->job_namespace;
- $dbkey = $row->job_title;
- $title = Title::makeTitleSafe( $namespace, $dbkey );
-
- if ( is_null( $title ) ) {
- wfProfileOut( __METHOD__ );
- return false;
- }
-
- $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ), $row->job_id );
-
- // Remove any duplicates it may have later in the queue
- $job->removeDuplicates();
-
- wfProfileOut( __METHOD__ );
- return $job;
- }
-
/**
* Create the appropriate object to handle a specific job
*
* @throws MWException
* @return Job
*/
- static function factory( $command, Title $title, $params = false, $id = 0 ) {
+ public static function factory( $command, Title $title, $params = false, $id = 0 ) {
global $wgJobClasses;
if( isset( $wgJobClasses[$command] ) ) {
$class = $wgJobClasses[$command];
throw new MWException( "Invalid job command `{$command}`" );
}
- /**
- * @param $params
- * @return string
- */
- static function makeBlob( $params ) {
- if ( $params !== false ) {
- return serialize( $params );
- } else {
- return '';
- }
- }
-
- /**
- * @param $blob
- * @return bool|mixed
- */
- static function extractBlob( $blob ) {
- if ( (string)$blob !== '' ) {
- return unserialize( $blob );
- } else {
- return false;
- }
- }
-
/**
* Batch-insert a group of jobs into the queue.
* This will be wrapped in a transaction with a forced commit.
* removed later on, when the first one is popped.
*
* @param $jobs array of Job objects
+ * @deprecated 1.20
*/
- static function batchInsert( $jobs ) {
- if ( !count( $jobs ) ) {
- return;
- }
- $dbw = wfGetDB( DB_MASTER );
- $rows = array();
-
- /**
- * @var $job Job
- */
- foreach ( $jobs as $job ) {
- $rows[] = $job->insertFields();
- if ( count( $rows ) >= 50 ) {
- # Do a small transaction to avoid slave lag
- $dbw->begin( __METHOD__ );
- $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
- $dbw->commit( __METHOD__ );
- $rows = array();
- }
- }
- if ( $rows ) { // last chunk
- $dbw->begin( __METHOD__ );
- $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
- $dbw->commit( __METHOD__ );
- }
- wfIncrStats( 'job-insert', count( $jobs ) );
+ public static function batchInsert( $jobs ) {
+ return JobQueueGroup::singleton()->push( $jobs );
}
/**
* large batches of jobs can cause slave lag.
*
* @param $jobs array of Job objects
+ * @deprecated 1.20
*/
- static function safeBatchInsert( $jobs ) {
- if ( !count( $jobs ) ) {
- return;
- }
- $dbw = wfGetDB( DB_MASTER );
- $rows = array();
- foreach ( $jobs as $job ) {
- $rows[] = $job->insertFields();
- if ( count( $rows ) >= 500 ) {
- $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
- $rows = array();
- }
- }
- if ( $rows ) { // last chunk
- $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
- }
- wfIncrStats( 'job-insert', count( $jobs ) );
- }
-
-
- /**
- * SQL conditions to apply on most JobQueue queries
- *
- * Whenever we exclude jobs types from the default queue, we want to make
- * sure that queries to the job queue actually ignore them.
- *
- * @return array SQL conditions suitable for Database:: methods
- */
- static function defaultQueueConditions( ) {
- global $wgJobTypesExcludedFromDefaultQueue;
- $conditions = array();
- if ( count( $wgJobTypesExcludedFromDefaultQueue ) > 0 ) {
- $dbr = wfGetDB( DB_SLAVE );
- foreach ( $wgJobTypesExcludedFromDefaultQueue as $cmdType ) {
- $conditions[] = "job_cmd != " . $dbr->addQuotes( $cmdType );
- }
- }
- return $conditions;
+ public static function safeBatchInsert( $jobs ) {
+ return JobQueueGroup::singleton()->push( $jobs, JobQueue::QoS_Atomic );
}
/*-------------------------------------------------------------------------
* @param $params array|bool
* @param $id int
*/
- function __construct( $command, $title, $params = false, $id = 0 ) {
+ public function __construct( $command, $title, $params = false, $id = 0 ) {
$this->command = $command;
$this->title = $title;
$this->params = $params;
$this->id = $id;
- // A bit of premature generalisation
- // Oh well, the whole class is premature generalisation really
- $this->removeDuplicates = true;
+ $this->removeDuplicates = false; // expensive jobs may set this to true
}
/**
- * Insert a single job into the queue.
- * @return bool true on success
+ * @return integer May be 0 for jobs stored outside the DB
*/
- function insert() {
- $fields = $this->insertFields();
+ public function getId() {
+ return $this->id;
+ }
- $dbw = wfGetDB( DB_MASTER );
+ /**
+ * @return string
+ */
+ public function getType() {
+ return $this->command;
+ }
- if ( $this->removeDuplicates ) {
- $res = $dbw->select( 'job', array( '1' ), $fields, __METHOD__ );
- if ( $dbw->numRows( $res ) ) {
- return true;
- }
- }
- wfIncrStats( 'job-insert' );
- return $dbw->insert( 'job', $fields, __METHOD__ );
+ /**
+ * @return Title
+ */
+ public function getTitle() {
+ return $this->title;
}
/**
* @return array
*/
- protected function insertFields() {
- $dbw = wfGetDB( DB_MASTER );
- return array(
- 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
- 'job_cmd' => $this->command,
- 'job_namespace' => $this->title->getNamespace(),
- 'job_title' => $this->title->getDBkey(),
- 'job_timestamp' => $dbw->timestamp(),
- 'job_params' => Job::makeBlob( $this->params )
- );
+ public function getParams() {
+ return $this->params;
}
/**
- * Remove jobs in the job queue which are duplicates of this job.
- * This is deadlock-prone and so starts its own transaction.
+ * @return bool
*/
- function removeDuplicates() {
- if ( !$this->removeDuplicates ) {
- return;
- }
+ public function ignoreDuplicates() {
+ return $this->removeDuplicates;
+ }
- $fields = $this->insertFields();
- unset( $fields['job_id'] );
- unset( $fields['job_timestamp'] );
- $dbw = wfGetDB( DB_MASTER );
- $dbw->begin( __METHOD__ );
- $dbw->delete( 'job', $fields, __METHOD__ );
- $affected = $dbw->affectedRows();
- $dbw->commit( __METHOD__ );
- if ( $affected ) {
- wfIncrStats( 'job-dup-delete', $affected );
- }
+ /**
+ * Insert a single job into the queue.
+ * @return bool true on success
+ * @deprecated 1.20
+ */
+ public function insert() {
+ return JobQueueGroup::singleton()->push( $this );
}
/**
* @return string
*/
- function toString() {
+ public function toString() {
$paramString = '';
if ( $this->params ) {
foreach ( $this->params as $key => $value ) {
$this->error = $error;
}
- function getLastError() {
+ public function getLastError() {
return $this->error;
}
}
--- /dev/null
+<?php
+/**
+ * Job queue base code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @defgroup JobQueue JobQueue
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle enqueueing and running of background jobs
+ *
+ * @ingroup JobQueue
+ * @since 1.20
+ */
+abstract class JobQueue {
+ protected $wiki; // string; wiki ID
+ protected $type; // string; job type
+
+ const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions
+
+ /**
+ * @param $params array
+ */
+ protected function __construct( array $params ) {
+ $this->wiki = $params['wiki'];
+ $this->type = $params['type'];
+ }
+
+ /**
+ * Get a job queue object of the specified type.
+ * $params includes:
+ * class : What JobQueue subclass to instantiate (e.g. JobQueueDB)
+ * wiki : wiki ID of the wiki the jobs are for (defaults to current wiki)
+ * type : The name of the job types this queue handles
+ *
+ * @param $params array
+ * @return JobQueue
+ * @throws MWException
+ */
+ final public static function factory( array $params ) {
+ $class = $params['class'];
+ if ( !MWInit::classExists( $class ) ) {
+ throw new MWException( "Invalid job queue class '$class'." );
+ }
+ $obj = new $class( $params );
+ if ( !( $obj instanceof self ) ) {
+ throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
+ }
+ return $obj;
+ }
+
+ /**
+ * @return string Wiki ID
+ */
+ final public function getWiki() {
+ return $this->wiki;
+ }
+
+ /**
+ * @return string Job type that this queue handles
+ */
+ final public function getType() {
+ return $this->type;
+ }
+
+ /**
+ * @return bool Quickly check if the queue is empty
+ */
+ final public function isEmpty() {
+ wfProfileIn( __METHOD__ );
+ $res = $this->doIsEmpty();
+ wfProfileOut( __METHOD__ );
+ return $res;
+ }
+
+ /**
+ * @see JobQueue::isEmpty()
+ * @return bool
+ */
+ abstract protected function doIsEmpty();
+
+ /**
+ * Push a batch of jobs into the queue
+ *
+ * @param $jobs array List of Jobs
+ * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic)
+ * @return bool
+ */
+ final public function batchPush( array $jobs, $flags = 0 ) {
+ foreach ( $jobs as $job ) {
+ if ( $job->getType() !== $this->type ) {
+ throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+ }
+ }
+ wfProfileIn( __METHOD__ );
+ $ok = $this->doBatchPush( $jobs, $flags );
+ if ( $ok ) {
+ wfIncrStats( 'job-insert', count( $jobs ) );
+ }
+ wfProfileOut( __METHOD__ );
+ return $ok;
+ }
+
+ /**
+ * @see JobQueue::batchPush()
+ * @return bool
+ */
+ abstract protected function doBatchPush( array $jobs, $flags );
+
+ /**
+ * Pop a job off of the queue
+ *
+ * @return Job|bool Returns false on failure
+ */
+ final public function pop() {
+ wfProfileIn( __METHOD__ );
+ $job = $this->doPop();
+ if ( $job ) {
+ wfIncrStats( 'job-pop' );
+ }
+ wfProfileOut( __METHOD__ );
+ return $job;
+ }
+
+ /**
+ * @see JobQueue::pop()
+ * @return Job
+ */
+ abstract protected function doPop();
+
+ /**
+ * Acknowledge that a job was completed
+ *
+ * @param $job Job
+ * @return bool
+ */
+ final public function ack( Job $job ) {
+ if ( $job->getType() !== $this->type ) {
+ throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+ }
+ wfProfileIn( __METHOD__ );
+ $ok = $this->doAck( $job );
+ wfProfileOut( __METHOD__ );
+ return $ok;
+ }
+
+ /**
+ * @see JobQueue::ack()
+ * @return bool
+ */
+ abstract protected function doAck( Job $job );
+
+ /**
+ * Wait for any slaves or backup servers to catch up
+ *
+ * @return void
+ */
+ final public function waitForBackups() {
+ wfProfileIn( __METHOD__ );
+ $this->doWaitForBackups();
+ wfProfileOut( __METHOD__ );
+ }
+
+ /**
+ * @see JobQueue::waitForBackups()
+ * @return void
+ */
+ protected function doWaitForBackups() {}
+}
--- /dev/null
+<?php
+/**
+ * Database-backed job queue code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle job queues stored in the DB
+ *
+ * @ingroup JobQueue
+ * @since 1.20
+ */
+class JobQueueDB extends JobQueue {
+ const CACHE_TTL = 30; // integer; seconds
+
+ /**
+ * @see JobQueue::doIsEmpty()
+ * @return bool
+ */
+ protected function doIsEmpty() {
+ 	global $wgMemc;
+
+ 	$key = $this->getEmptinessCacheKey();
+
+ 	// The cached answer is stored as the string 'true' or 'false' so that
+ 	// it can be told apart from a cache miss (any other value).
+ 	$isEmpty = $wgMemc->get( $key );
+ 	if ( $isEmpty === 'true' ) {
+ 		return true;
+ 	} elseif ( $isEmpty === 'false' ) {
+ 		return false;
+ 	}
+
+ 	// Cache miss: check for any row of this job type on a slave DB
+ 	$found = $this->getSlaveDB()->selectField(
+ 		'job', '1', array( 'job_cmd' => $this->type ), __METHOD__
+ 	);
+
+ 	// add() rather than set(): do not clobber a value written concurrently
+ 	// by doBatchPush()/doPop(), which use set()
+ 	$wgMemc->add( $key, $found ? 'false' : 'true', self::CACHE_TTL );
+
+ 	// Previously nothing was returned here, so a cache miss yielded NULL
+ 	return !$found;
+ }
+
+ /**
+ * @see JobQueue::doBatchPush()
+ * @return bool
+ */
+ protected function doBatchPush( array $jobs, $flags ) {
+ if ( count( $jobs ) ) {
+ $dbw = $this->getMasterDB();
+
+ $rows = array();
+ foreach ( $jobs as $job ) {
+ $rows[] = $this->insertFields( $job );
+ }
+ $atomic = ( $flags & self::QoS_Atomic );
+ $key = $this->getEmptinessCacheKey();
+ $ttl = self::CACHE_TTL;
+
+ $dbw->onTransactionIdle( function() use ( $dbw, $rows, $atomic, $key, $ttl ) {
+ global $wgMemc;
+
+ $autoTrx = $dbw->getFlag( DBO_TRX ); // automatic begin() enabled?
+ if ( $atomic ) {
+ $dbw->begin(); // wrap all the job additions in one transaction
+ } else {
+ $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
+ }
+ try {
+ foreach ( array_chunk( $rows, 50 ) as $rowBatch ) { // avoid slave lag
+ $dbw->insert( 'job', $rowBatch, __METHOD__ );
+ }
+ } catch ( DBError $e ) {
+ if ( $atomic ) {
+ $dbw->rollback();
+ } else {
+ $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore automatic begin()
+ }
+ throw $e;
+ }
+ if ( $atomic ) {
+ $dbw->commit();
+ } else {
+ $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore automatic begin()
+ }
+
+ $wgMemc->set( $key, 'false', $ttl );
+ } );
+ }
+
+ return true;
+ }
+
+ /**
+ * @see JobQueue::doPop()
+ * @return Job|bool
+ */
+ protected function doPop() {
+ 	global $wgMemc;
+
+ 	$uuid = wfRandomString( 32 ); // pop attempt
+
+ 	$dbw = $this->getMasterDB();
+ 	if ( $dbw->trxLevel() ) {
+ 		wfWarn( "Attempted to pop a job in a transaction; committing first." );
+ 		$dbw->commit(); // push existing transaction
+ 	}
+
+ 	$job = false; // job popped off
+ 	$autoTrx = $dbw->getFlag( DBO_TRX ); // automatic begin() enabled?
+ 	$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
+ 	try {
+ 		do { // retry when our row is invalid or deleted as a duplicate
+ 			$row = false; // row claimed
+ 			// job_random is written as a value in [0, 2^31 - 1] by insertFields(),
+ 			// so pick the pivot from the same range. 2^31 would also overflow a
+ 			// signed 32-bit integer.
+ 			$rand = mt_rand( 0, 2147483647 ); // encourage concurrent UPDATEs
+ 			$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
+ 			// Try to reserve a DB row...
+ 			if ( $this->claim( $uuid, $rand, $gte ) || $this->claim( $uuid, $rand, !$gte ) ) {
+ 				// Fetch any row that we just reserved...
+ 				$row = $dbw->selectRow( 'job', '*',
+ 					array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__ );
+ 				// Check if another process deleted it as a duplicate
+ 				if ( !$row ) {
+ 					wfDebugLog( 'JobQueueDB', "Row deleted as duplicate by another process." );
+ 					continue; // try again
+ 				}
+ 				// Get the job object from the row...
+ 				$title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
+ 				if ( !$title ) {
+ 					$dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
+ 					wfDebugLog( 'JobQueueDB', "Row has invalid title '{$row->job_title}'." );
+ 					continue; // try again
+ 				}
+ 				$job = Job::factory( $row->job_cmd, $title,
+ 					self::extractBlob( $row->job_params ), $row->job_id );
+ 				// Delete any *other* duplicate jobs in the queue...
+ 				if ( $job->ignoreDuplicates() && strlen( $row->job_sha1 ) ) {
+ 					$dbw->delete( 'job',
+ 						array( 'job_sha1' => $row->job_sha1,
+ 							"job_id != {$dbw->addQuotes( $row->job_id )}" ),
+ 						__METHOD__
+ 					);
+ 				}
+ 			} else {
+ 				// Nothing could be claimed in either direction; cache the emptiness
+ 				$wgMemc->set( $this->getEmptinessCacheKey(), 'true', self::CACHE_TTL );
+ 			}
+ 			break; // done
+ 		} while( true );
+ 	} catch ( Exception $e ) {
+ 		// Catch everything (not just DBError) so that DBO_TRX is restored even if,
+ 		// for example, Job::factory() throws; this matches doAck().
+ 		$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore automatic begin()
+ 		throw $e;
+ 	}
+ 	$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore automatic begin()
+
+ 	return $job;
+ }
+
+ /**
+ * Reserve a row with a single UPDATE without holding row locks over RTTs...
+ * @param $uuid string 32 char hex string
+ * @param $rand integer Random unsigned integer (31 bits)
+ * @param $gte bool Search for job_random >= $rand (otherwise job_random <= $rand)
+ * @return integer Number of affected rows
+ */
+ protected function claim( $uuid, $rand, $gte ) {
+ // Stamps at most one unclaimed row (job_token = '') of this queue's job type
+ // with $uuid and the current timestamp. The caller then finds the row by token.
+ $dbw = $this->getMasterDB();
+ // Sort direction and inequality are paired so that the row whose job_random
+ // is nearest to $rand (on the chosen side) is the one selected.
+ $dir = $gte ? 'ASC' : 'DESC';
+ $ineq = $gte ? '>=' : '<=';
+ if ( $dbw->getType() === 'mysql' ) {
+ // Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
+ // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
+ // Oracle and Postgre have no such limitation. However, MySQL offers an
+ // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
+ // The DB wrapper functions do not support this, so it's done manually.
+ $dbw->query( "UPDATE {$dbw->tableName( 'job' )}
+ SET
+ job_token = {$dbw->addQuotes( $uuid ) },
+ job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}
+ WHERE (
+ job_cmd = {$dbw->addQuotes( $this->type )}
+ AND job_token = {$dbw->addQuotes( '' )}
+ AND job_random {$ineq} {$dbw->addQuotes( $rand )}
+ ) ORDER BY job_random {$dir} LIMIT 1",
+ __METHOD__
+ );
+ } else {
+ // Use a subquery to find the job, within an UPDATE to claim it.
+ // This uses as much of the DB wrapper functions as possible.
+ // The subquery applies the same conditions, ordering and LIMIT 1 as the
+ // MySQL branch above and yields the job_id of the row to update.
+ $dbw->update( 'job',
+ array( 'job_token' => $uuid, 'job_token_timestamp' => $dbw->timestamp() ),
+ array( 'job_id = (' .
+ $dbw->selectSQLText( 'job', 'job_id',
+ array(
+ 'job_cmd' => $this->type,
+ 'job_token' => '',
+ "job_random {$ineq} {$dbw->addQuotes( $rand )}" ),
+ __METHOD__,
+ array( 'ORDER BY' => "job_random {$dir}", 'LIMIT' => 1 ) ) .
+ ')'
+ ),
+ __METHOD__
+ );
+ }
+ // Non-zero means a row now carries our token; doPop() uses this as a boolean
+ return $dbw->affectedRows();
+ }
+
+ /**
+  * Remove a completed job's row from the job table.
+  *
+  * Any open transaction on the master connection is committed first, and the
+  * DELETE runs with automatic transactions (DBO_TRX) temporarily disabled.
+  *
+  * @see JobQueue::doAck()
+  * @param $job Job
+  * @return bool Always true; DB errors are thrown as exceptions
+  */
+ protected function doAck( Job $job ) {
+ 	$dbw = $this->getMasterDB();
+ 	if ( $dbw->trxLevel() ) {
+ 		wfWarn( "Attempted to ack a job in a transaction; committing first." );
+ 		$dbw->commit(); // push existing transaction
+ 	}
+
+ 	$autoTrx = $dbw->getFlag( DBO_TRX ); // automatic begin() enabled?
+ 	$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
+ 	try {
+ 		// Delete a row with a single DELETE without holding row locks over RTTs...
+ 		$dbw->delete( 'job',
+ 			array( 'job_cmd' => $this->type, 'job_id' => $job->getId() ),
+ 			__METHOD__ // attribute the query to this method, like the other queries here
+ 		);
+ 	} catch ( Exception $e ) {
+ 		$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore automatic begin()
+ 		throw $e;
+ 	}
+ 	$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore automatic begin()
+
+ 	return true;
+ }
+
+ /**
+ * @see JobQueue::doWaitForBackups()
+ * @return void
+ */
+ protected function doWaitForBackups() {
+ wfWaitForSlaves();
+ }
+
+ /**
+ * @return DatabaseBase
+ */
+ protected function getSlaveDB() {
+ return wfGetDB( DB_SLAVE, array(), $this->wiki );
+ }
+
+ /**
+ * @return DatabaseBase
+ */
+ protected function getMasterDB() {
+ return wfGetDB( DB_MASTER, array(), $this->wiki );
+ }
+
+ /**
+ * @param $job Job
+ * @return array
+ */
+ protected function insertFields( Job $job ) {
+ // Rows that describe the nature of the job
+ $descFields = array(
+ 'job_cmd' => $job->getType(),
+ 'job_namespace' => $job->getTitle()->getNamespace(),
+ 'job_title' => $job->getTitle()->getDBkey(),
+ 'job_params' => self::makeBlob( $job->getParams() ),
+ );
+ // Additional job metadata
+ $dbw = $this->getMasterDB();
+ $metaFields = array(
+ 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
+ 'job_timestamp' => $dbw->timestamp(),
+ 'job_sha1' => wfBaseConvert( sha1( serialize( $descFields ) ), 16, 36, 32 ),
+ 'job_random' => mt_rand( 0, 2147483647 ) // [0, 2^31 - 1]
+ );
+ return ( $descFields + $metaFields );
+ }
+
+ /**
+ * @return string
+ */
+ private function getEmptinessCacheKey() {
+ list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+ return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'isempty' );
+ }
+
+ /**
+ * @param $params
+ * @return string
+ */
+ protected static function makeBlob( $params ) {
+ if ( $params !== false ) {
+ return serialize( $params );
+ } else {
+ return '';
+ }
+ }
+
+ /**
+ * @param $blob
+ * @return bool|mixed
+ */
+ protected static function extractBlob( $blob ) {
+ if ( (string)$blob !== '' ) {
+ return unserialize( $blob );
+ } else {
+ return false;
+ }
+ }
+}
--- /dev/null
+<?php
+/**
+ * Job queue group code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle enqueueing of background jobs
+ *
+ * @ingroup JobQueue
+ * @since 1.20
+ */
+class JobQueueGroup {
+ /** @var Array */
+ protected static $instances = array();
+
+ protected $wiki; // string; wiki ID
+
+ const TYPE_DEFAULT = 1; // integer; job not in $wgJobTypesExcludedFromDefaultQueue
+ const TYPE_ANY = 2; // integer; any job
+
+ /**
+ * @param $wiki string Wiki ID
+ */
+ protected function __construct( $wiki ) {
+ $this->wiki = $wiki;
+ }
+
+ /**
+ * @param $wiki string Wiki ID
+ * @return JobQueueGroup
+ */
+ public static function singleton( $wiki = false ) {
+ $wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
+ if ( !isset( self::$instances[$wiki] ) ) {
+ self::$instances[$wiki] = new self( $wiki );
+ }
+ return self::$instances[$wiki];
+ }
+
+ /**
+ * @param $type string
+ * @return JobQueue Job queue object for a given queue type
+ */
+ public function get( $type ) {
+ global $wgJobTypeConf;
+
+ $conf = false;
+ if ( isset( $wgJobTypeConf[$type] ) ) {
+ $conf = $wgJobTypeConf[$type];
+ } else {
+ $conf = $wgJobTypeConf['default'];
+ }
+
+ return JobQueue::factory( array(
+ 'class' => $conf['class'],
+ 'wiki' => $this->wiki,
+ 'type' => $type,
+ ) );
+ }
+
+ /**
+  * Insert jobs into the respective queues to which they belong
+  *
+  * @param $jobs Job|array A single Job or a list of Jobs
+  * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic), passed on to
+  *   JobQueue::batchPush() for each queue
+  * @return bool False if any queue reported a failed push
+  */
+ public function push( $jobs, $flags = 0 ) {
+ 	// Do not use an (array) cast here: casting a Job object yields an array
+ 	// of its properties rather than a one-element list holding the job.
+ 	$jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+
+ 	$jobsByType = array(); // (job type => list of jobs)
+ 	foreach ( $jobs as $job ) {
+ 		$jobsByType[$job->getType()][] = $job;
+ 	}
+
+ 	$ok = true;
+ 	foreach ( $jobsByType as $type => $jobsOfType ) {
+ 		if ( !$this->get( $type )->batchPush( $jobsOfType, $flags ) ) {
+ 			$ok = false;
+ 		}
+ 	}
+
+ 	return $ok;
+ }
+
+ /**
+ * Pop a job off one of the job queues
+ *
+ * @param $type integer JobQueueGroup::TYPE_* constant
+ * @return Job|bool Returns false on failure
+ */
+ public function pop( $type = self::TYPE_DEFAULT ) {
+ $types = ( $type == self::TYPE_DEFAULT )
+ ? $this->getDefaultQueueTypes()
+ : $this->getQueueTypes();
+ shuffle( $types ); // avoid starvation
+
+ foreach ( $types as $type ) { // for each queue...
+ $job = $this->get( $type )->pop();
+ if ( $job ) {
+ return $job; // found
+ }
+ }
+
+ return false; // no jobs found
+ }
+
+ /**
+ * Acknowledge that a job was completed
+ *
+ * @param $job Job
+ * @return bool
+ */
+ public function ack( Job $job ) {
+ return $this->get( $job->getType() )->ack( $job );
+ }
+
+ /**
+ * Get the list of queue types
+ *
+ * @return array List of strings
+ */
+ public function getQueueTypes() {
+ global $wgJobClasses;
+
+ return array_keys( $wgJobClasses );
+ }
+
+ /**
+ * Get the list of default queue types
+ *
+ * @return array List of strings
+ */
+ public function getDefaultQueueTypes() {
+ global $wgJobTypesExcludedFromDefaultQueue;
+
+ return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
+ }
+}
* @ingroup JobQueue
*/
class RefreshLinksJob extends Job {
-
function __construct( $title, $params = '', $id = 0 ) {
parent::__construct( 'refreshLinks', $title, $params, $id );
+ $this->removeDuplicates = true; // job is expensive
}
/**
--- /dev/null
+ALTER TABLE /*_*/job
+ ADD COLUMN job_random integer unsigned NOT NULL default 0,
+ ADD COLUMN job_token varbinary(32) NOT NULL default '',
+ ADD COLUMN job_token_timestamp varbinary(14) NULL default NULL,
+ ADD COLUMN job_sha1 varbinary(32) NOT NULL default '';
+
+CREATE INDEX /*i*/job_sha1 ON /*_*/job (job_sha1);
+CREATE INDEX /*i*/job_cmd_token ON /*_*/job (job_cmd,job_token,job_random);
+
public function execute() {
global $wgTitle;
+
if ( $this->hasOption( 'procs' ) ) {
$procs = intval( $this->getOption( 'procs' ) );
if ( $procs < 1 || $procs > 1000 ) {
$dbw = wfGetDB( DB_MASTER );
$n = 0;
- if ( $type === false ) {
- $conds = Job::defaultQueueConditions( );
- } else {
- $conds = array( 'job_cmd' => $type );
- }
-
- while ( $dbw->selectField( 'job', 'job_id', $conds, 'runJobs.php' ) ) {
- $offset = 0;
- for ( ; ; ) {
- $job = !$type ? Job::pop( $offset ) : Job::pop_type( $type );
-
- if ( !$job ) {
- break;
- }
-
- wfWaitForSlaves();
+ $group = JobQueueGroup::singleton();
+ do {
+ $job = ( $type === false )
+ ? $group->pop() // job from any queue
+ : $group->get( $type )->pop(); // job from a single queue
+ if ( $job ) { // found a job
+ // Perform the job (logging success/failure and runtime)...
$t = microtime( true );
- $offset = $job->id;
$this->runJobsLog( $job->toString() . " STARTING" );
$status = $job->run();
+ $group->ack( $job ); // done
$t = microtime( true ) - $t;
$timeMs = intval( $t * 1000 );
if ( !$status ) {
} else {
$this->runJobsLog( $job->toString() . " t=$timeMs good" );
}
-
- if ( $maxJobs && ++$n > $maxJobs ) {
+ // Break out if we hit the job count or wall time limits...
+ if ( $maxJobs && ++$n >= $maxJobs ) {
- break 2;
+ break; // only the do-while loop encloses this code now
}
- if ( $maxTime && time() - $startTime > $maxTime ) {
+ if ( $maxTime && ( time() - $startTime ) > $maxTime ) {
- break 2;
+ break;
}
+ // Don't let any slaves/backups fall behind. Use the type of the job that
+ // was just run, since $type is false when popping from any queue.
+ $group->get( $job->getType() )->waitForBackups();
}
- }
+ } while ( $job ); // stop when there are no jobs
}
/**
--- /dev/null
+ALTER TABLE /*_*/job ADD COLUMN job_random integer unsigned NOT NULL default 0;
+ALTER TABLE /*_*/job ADD COLUMN job_token varbinary(32) NOT NULL default '';
+ALTER TABLE /*_*/job ADD COLUMN job_sha1 varbinary(32) NOT NULL default '';
+ALTER TABLE /*_*/job ADD COLUMN job_token_timestamp varbinary(14) NULL default NULL;
+
+CREATE INDEX /*i*/job_sha1 ON /*_*/job (job_sha1);
+CREATE INDEX /*i*/job_cmd_token ON /*_*/job (job_cmd,job_token,job_random);
+
-- Any other parameters to the command
-- Stored as a PHP serialized array, or an empty string if there are no parameters
- job_params blob NOT NULL
+ job_params blob NOT NULL,
+
+ -- Random, non-unique, number used for concurrent job acquisition
+ job_random integer unsigned NOT NULL default 0,
+
+ -- Field that conveys process locks on rows via process UUIDs
+ job_token varbinary(32) NOT NULL default '',
+
+ -- Timestamp when the job was locked
+ job_token_timestamp varbinary(14) NULL default NULL,
+
+ -- Base 36 SHA1 of the job parameters relevant to detecting duplicates
+ job_sha1 varbinary(32) NOT NULL default ''
) /*$wgDBTableOptions*/;
+CREATE INDEX /*i*/job_sha1 ON /*_*/job (job_sha1);
+CREATE INDEX /*i*/job_cmd_token ON /*_*/job (job_cmd,job_token,job_random);
CREATE INDEX /*i*/job_cmd ON /*_*/job (job_cmd, job_namespace, job_title, job_params(128));
CREATE INDEX /*i*/job_timestamp ON /*_*/job (job_timestamp);