# includes/job/jobs
'DoubleRedirectJob' => 'includes/job/jobs/DoubleRedirectJob.php',
+ 'DuplicateJob' => 'includes/job/jobs/DuplicateJob.php',
'EmaillingJob' => 'includes/job/jobs/EmaillingJob.php',
'EnotifNotifyJob' => 'includes/job/jobs/EnotifNotifyJob.php',
'HTMLCacheUpdateJob' => 'includes/job/jobs/HTMLCacheUpdateJob.php',
}
function queueRecursiveJobs() {
- global $wgUpdateRowsPerJob;
wfProfileIn( __METHOD__ );
- $cache = $this->mTitle->getBacklinkCache();
- $batches = $cache->partition( 'templatelinks', $wgUpdateRowsPerJob );
- if ( !$batches ) {
- wfProfileOut( __METHOD__ );
- return;
- }
- $jobs = array();
- foreach ( $batches as $batch ) {
- list( $start, $end ) = $batch;
- $params = array(
- 'table' => 'templatelinks',
- 'start' => $start,
- 'end' => $end,
+ // Only queue work when the title has at least one 'templatelinks' backlink
+ if ( $this->mTitle->getBacklinkCache()->hasLinks( 'templatelinks' ) ) {
+ // A single job without a 'start'/'end' range is created here
+ $job = new RefreshLinksJob2(
+ $this->mTitle,
+ array(
+ 'table' => 'templatelinks',
+ ) + Job::newRootJobParams( // "overall" refresh links job info
+ "refreshlinks:templatelinks:{$this->mTitle->getPrefixedText()}"
+ )
);
- $jobs[] = new RefreshLinksJob2( $this->mTitle, $params );
+ JobQueueGroup::singleton()->push( $job );
+ // The root job is registered only after the push has been issued
+ JobQueueGroup::singleton()->deduplicateRootJob( $job );
}
- Job::batchInsert( $jobs );
wfProfileOut( __METHOD__ );
}
return $conds;
}
+ /**
+  * Tell whether at least one backlink exists in the given table
+  * @param $table String
+  * @return bool
+  */
+ public function hasLinks( $table ) {
+ 	// Counting is capped at one row, since only existence matters here
+ 	$found = $this->getNumLinks( $table, 1 );
+ 	return $found > 0;
+ }
+
/**
* Get the approximate number of backlinks
* @param $table String
+ * @param $max integer Only count up to this many backlinks
+ * (defaults to INF, meaning no limit; the returned value never exceeds $max)
* @return integer
*/
- public function getNumLinks( $table ) {
+ public function getNumLinks( $table, $max = INF ) {
global $wgMemc;
// 1) try partition cache ...
if ( isset( $this->partitionCache[$table] ) ) {
$entry = reset( $this->partitionCache[$table] );
- return $entry['numRows'];
+ return min( $max, $entry['numRows'] );
}
// 2) ... then try full result cache ...
if ( isset( $this->fullResultCache[$table] ) ) {
- return $this->fullResultCache[$table]->numRows();
+ return min( $max, $this->fullResultCache[$table]->numRows() );
}
$memcKey = wfMemcKey( 'numbacklinks', md5( $this->title->getPrefixedDBkey() ), $table );
// 3) ... fallback to memcached ...
$count = $wgMemc->get( $memcKey );
+ // A falsy cached value (including a count of 0) falls through to the database
if ( $count ) {
- return $count;
+ return min( $max, $count );
}
// 4) fetch from the database ...
- $count = $this->getLinks( $table )->count();
- $wgMemc->set( $memcKey, $count, self::CACHE_EXPIRY );
+ if ( is_infinite( $max ) ) { // full count
+ $count = $this->getLinks( $table )->count();
+ $wgMemc->set( $memcKey, $count, self::CACHE_EXPIRY );
+ } else { // with limit
+ // The capped count is returned as-is and is not written to memcached
+ $count = $this->getDB()->select(
+ array( $table, 'page' ),
+ '1',
+ $this->getConditions( $table ),
+ __METHOD__,
+ array( 'LIMIT' => $max )
+ )->numRows();
+ }
return $count;
}
/**
* Class to invalidate the HTML cache of all the pages linking to a given title.
- * Small numbers of links will be done immediately, large numbers are pushed onto
- * the job queue.
- *
- * This class is designed to work efficiently with small numbers of links, and
- * to work reasonably well with up to ~10^5 links. Above ~10^6 links, the memory
- * and time requirements of loading all backlinked IDs in doUpdate() might become
- * prohibitive. The requirements measured at Wikimedia are approximately:
- *
- * memory: 48 bytes per row
- * time: 16us per row for the query plus processing
- *
- * The reason this query is done is to support partitioning of the job
- * by backlinked ID. The memory issue could be allieviated by doing this query in
- * batches, but of course LIMIT with an offset is inefficient on the DB side.
- *
- * The class is nevertheless a vast improvement on the previous method of using
- * File::getLinksTo() and Title::touchArray(), which uses about 2KB of memory per
- * link.
*
* @ingroup Cache
*/
*/
public $mTitle;
- public $mTable, $mPrefix, $mStart, $mEnd;
- public $mRowsPerJob, $mRowsPerQuery;
+ public $mTable;
/**
* @param $titleTo
- * @param $start bool
- * @param $end bool
+ * @param $table String Name of the links table whose backlinks are invalidated
*/
- function __construct( $titleTo, $table, $start = false, $end = false ) {
- global $wgUpdateRowsPerJob, $wgUpdateRowsPerQuery;
-
+ function __construct( Title $titleTo, $table ) {
$this->mTitle = $titleTo;
$this->mTable = $table;
- $this->mStart = $start;
- $this->mEnd = $end;
- $this->mRowsPerJob = $wgUpdateRowsPerJob;
- $this->mRowsPerQuery = $wgUpdateRowsPerQuery;
- $this->mCache = $this->mTitle->getBacklinkCache();
}
+ /**
+ * Invalidate the HTML cache of the pages linking to the title.
+ * When fewer than 200 backlinks are counted the job is run immediately in-process;
+ * otherwise it is pushed onto the job queue and its root job is registered.
+ */
public function doUpdate() {
- if ( $this->mStart || $this->mEnd ) {
- $this->doPartialUpdate();
- return;
- }
-
- # Get an estimate of the number of rows from the BacklinkCache
- $numRows = $this->mCache->getNumLinks( $this->mTable );
- if ( $numRows > $this->mRowsPerJob * 2 ) {
- # Do fast cached partition
- $this->insertJobs();
- } else {
- # Get the links from the DB
- $titleArray = $this->mCache->getLinks( $this->mTable );
- # Check if the row count estimate was correct
- if ( $titleArray->count() > $this->mRowsPerJob * 2 ) {
- # Not correct, do accurate partition
- wfDebug( __METHOD__.": row count estimate was incorrect, repartitioning\n" );
- $this->insertJobsFromTitles( $titleArray );
- } else {
- $this->invalidateTitles( $titleArray );
- }
- }
- }
-
- /**
- * Update some of the backlinks, defined by a page ID range
- */
- protected function doPartialUpdate() {
- $titleArray = $this->mCache->getLinks( $this->mTable, $this->mStart, $this->mEnd );
- if ( $titleArray->count() <= $this->mRowsPerJob * 2 ) {
- # This partition is small enough, do the update
- $this->invalidateTitles( $titleArray );
- } else {
- # Partitioning was excessively inaccurate. Divide the job further.
- # This can occur when a large number of links are added in a short
- # period of time, say by updating a heavily-used template.
- $this->insertJobsFromTitles( $titleArray );
- }
- }
-
- /**
- * Partition the current range given by $this->mStart and $this->mEnd,
- * using a pre-calculated title array which gives the links in that range.
- * Queue the resulting jobs.
- *
- * @param $titleArray array
- */
- protected function insertJobsFromTitles( $titleArray ) {
- # We make subpartitions in the sense that the start of the first job
- # will be the start of the parent partition, and the end of the last
- # job will be the end of the parent partition.
- $jobs = array();
- $start = $this->mStart; # start of the current job
- $numTitles = 0;
- foreach ( $titleArray as $title ) {
- $id = $title->getArticleID();
- # $numTitles is now the number of titles in the current job not
- # including the current ID
- if ( $numTitles >= $this->mRowsPerJob ) {
- # Add a job up to but not including the current ID
- $params = array(
- 'table' => $this->mTable,
- 'start' => $start,
- 'end' => $id - 1
- );
- $jobs[] = new HTMLCacheUpdateJob( $this->mTitle, $params );
- $start = $id;
- $numTitles = 0;
- }
- $numTitles++;
- }
- # Last job
- $params = array(
- 'table' => $this->mTable,
- 'start' => $start,
- 'end' => $this->mEnd
- );
- $jobs[] = new HTMLCacheUpdateJob( $this->mTitle, $params );
- wfDebug( __METHOD__.": repartitioning into " . count( $jobs ) . " jobs\n" );
-
- if ( count( $jobs ) < 2 ) {
- # I don't think this is possible at present, but handling this case
- # makes the code a bit more robust against future code updates and
- # avoids a potential infinite loop of repartitioning
- wfDebug( __METHOD__.": repartitioning failed!\n" );
- $this->invalidateTitles( $titleArray );
- return;
- }
-
- Job::batchInsert( $jobs );
- }
+ wfProfileIn( __METHOD__ );
- /**
- * @return mixed
- */
- protected function insertJobs() {
- $batches = $this->mCache->partition( $this->mTable, $this->mRowsPerJob );
- if ( !$batches ) {
- return;
- }
- $jobs = array();
- foreach ( $batches as $batch ) {
- $params = array(
+ $job = new HTMLCacheUpdateJob(
+ $this->mTitle,
+ array(
'table' => $this->mTable,
- 'start' => $batch[0],
- 'end' => $batch[1],
- );
- $jobs[] = new HTMLCacheUpdateJob( $this->mTitle, $params );
- }
- Job::batchInsert( $jobs );
- }
-
- /**
- * Invalidate an array (or iterator) of Title objects, right now
- * @param $titleArray array
- */
- protected function invalidateTitles( $titleArray ) {
- global $wgUseFileCache, $wgUseSquid;
-
- $dbw = wfGetDB( DB_MASTER );
- $timestamp = $dbw->timestamp();
-
- # Get all IDs in this query into an array
- $ids = array();
- foreach ( $titleArray as $title ) {
- $ids[] = $title->getArticleID();
- }
-
- if ( !$ids ) {
- return;
- }
-
- # Update page_touched
- $batches = array_chunk( $ids, $this->mRowsPerQuery );
- foreach ( $batches as $batch ) {
- $dbw->update( 'page',
- array( 'page_touched' => $timestamp ),
- array( 'page_id' => $batch ),
- __METHOD__
- );
- }
+ ) + Job::newRootJobParams( // "overall" HTML cache update job info
+ "htmlCacheUpdate:{$this->mTable}:{$this->mTitle->getPrefixedText()}"
+ )
+ );
- # Update squid
- if ( $wgUseSquid ) {
- $u = SquidUpdate::newFromTitles( $titleArray );
- $u->doUpdate();
+ // Count at most 200 backlinks; reaching the cap is what defers the work to the queue
+ $count = $this->mTitle->getBacklinkCache()->getNumLinks( $this->mTable, 200 );
+ if ( $count >= 200 ) { // many backlinks
+ JobQueueGroup::singleton()->push( $job );
+ JobQueueGroup::singleton()->deduplicateRootJob( $job );
+ } else { // few backlinks ($count might be off even if 0)
+ $job->run(); // just do the purge query now
}
- # Update file cache
- if ( $wgUseFileCache ) {
- foreach ( $titleArray as $title ) {
- HTMLFileCache::clearFileCache( $title );
- }
- }
+ wfProfileOut( __METHOD__ );
}
}
return $this->removeDuplicates;
}
+ /**
+  * Build the key/value map used to decide whether two jobs are duplicates.
+  * Subclasses may need to override this to make duplication detection work.
+  *
+  * @return Array Map of key/values
+  */
+ public function getDeduplicationInfo() {
+ 	$params = $this->getParams();
+ 	// Identical jobs with different "root" jobs should count as duplicates,
+ 	// so the root job fields are left out of the comparison
+ 	if ( is_array( $params ) ) {
+ 		unset( $params['rootJobSignature'], $params['rootJobTimestamp'] );
+ 	}
+ 	$title = $this->getTitle();
+ 	return array(
+ 		'type' => $this->getType(),
+ 		'namespace' => $title->getNamespace(),
+ 		'title' => $title->getDBkey(),
+ 		'params' => $params
+ 	);
+ }
+
+ /**
+  * Create the "root job" parameters for a new instance of a task
+  *
+  * @param $key string A key that identifies the task
+  * @return Array Map with 'rootJobSignature' (SHA-1 of $key) and 'rootJobTimestamp'
+  */
+ public static function newRootJobParams( $key ) {
+ 	$signature = sha1( $key );
+ 	$timestamp = wfTimestampNow();
+ 	return array(
+ 		'rootJobSignature' => $signature,
+ 		'rootJobTimestamp' => $timestamp
+ 	);
+ }
+
+ /**
+  * Get the "root job" fields of this job; a field that is not set is reported as null
+  *
+  * @return Array Map with the keys 'rootJobSignature' and 'rootJobTimestamp'
+  */
+ public function getRootJobParams() {
+ 	$rootParams = array();
+ 	foreach ( array( 'rootJobSignature', 'rootJobTimestamp' ) as $field ) {
+ 		if ( isset( $this->params[$field] ) ) {
+ 			$rootParams[$field] = $this->params[$field];
+ 		} else {
+ 			$rootParams[$field] = null;
+ 		}
+ 	}
+ 	return $rootParams;
+ }
+
/**
* Insert a single job into the queue.
* @return bool true on success
*/
abstract protected function doAck( Job $job );
+ /**
+  * Register the "root job" of a given job into the queue for de-duplication.
+  * Call this only right *after* all the new jobs have been inserted. Registration
+  * turns older, duplicate, job entries into no-ops. The root job information stays
+  * in the registry until it simply falls out of cache.
+  *
+  * $job must carry two special fields in its "params" array:
+  *   - rootJobSignature : hash (e.g. SHA1) that identifies the task
+  *   - rootJobTimestamp : TS_MW timestamp of this instance of the task
+  *
+  * A "root job" is a conceptual job made up of potentially many smaller jobs that
+  * are what actually gets inserted into the queue. For example, "refreshLinks" jobs
+  * are spawned when a template is edited. The task is "update links of pages that
+  * use template X", and one instance of that task is a "root job". What goes into
+  * the queue, however, are potentially many refreshLinks2 jobs. Since those jobs
+  * include things like page ID ranges and DB master positions, and recursively morph
+  * into smaller refreshLinks2 jobs, simple duplicate detection (like job_sha1) based
+  * on individual jobs being identical is not useful.
+  *
+  * If such jobs are still in the queue when the template is edited again, all of the
+  * old refreshLinks jobs for that template should become no-ops. This can greatly
+  * reduce server load, since refreshLinks jobs involve parsing. Essentially, the new
+  * batch of jobs belongs to a new "root job" and the older ones to a previous "root
+  * job" for the same task of "update links of pages that use template X".
+  *
+  * @param $job Job
+  * @return bool
+  * @throws MWException If the type of $job does not match the type of this queue
+  */
+ final public function deduplicateRootJob( Job $job ) {
+ 	if ( $this->type !== $job->getType() ) {
+ 		throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+ 	}
+ 	wfProfileIn( __METHOD__ );
+ 	$registered = $this->doDeduplicateRootJob( $job );
+ 	wfProfileOut( __METHOD__ );
+ 	return $registered;
+ }
+
+ /**
+ * Default implementation: performs no registration and reports success.
+ * Queue types that support root job de-duplication override this method.
+ *
+ * @see JobQueue::deduplicateRootJob()
+ * @param $job Job
+ * @return bool
+ */
+ protected function doDeduplicateRootJob( Job $job ) {
+ return true;
+ }
+
/**
* Wait for any slaves or backup servers to catch up
*
);
wfIncrStats( 'job-pop', $dbw->affectedRows() );
}
+ // Flag this job as an old duplicate based on its "root" job...
+ if ( $this->isRootJobOldDuplicate( $job ) ) {
+ $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
+ }
break; // done
} while( true );
} catch ( DBError $e ) {
return true;
}
+ /**
+  * Record the timestamp of the job's root job in the cache, unless an equal or
+  * newer timestamp is already recorded for the same signature.
+  *
+  * @see JobQueue::doDeduplicateRootJob()
+  * @param $job Job
+  * @return bool
+  * @throws MWException If 'rootJobSignature' or 'rootJobTimestamp' is missing
+  */
+ protected function doDeduplicateRootJob( Job $job ) {
+ 	$params = $job->getParams();
+ 	if ( !isset( $params['rootJobSignature'] ) ) {
+ 		throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
+ 	}
+ 	if ( !isset( $params['rootJobTimestamp'] ) ) {
+ 		throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
+ 	}
+ 	$cacheKey = $this->getRootJobCacheKey( $params['rootJobSignature'] );
+
+ 	$register = function() use ( $params, $cacheKey ) {
+ 		global $wgMemc;
+
+ 		$newTimestamp = $params['rootJobTimestamp'];
+ 		$lastTimestamp = $wgMemc->get( $cacheKey ); // current last timestamp of this job
+ 		if ( !$lastTimestamp || $lastTimestamp < $newTimestamp ) {
+ 			// Update the timestamp of the last root job started at the location...
+ 			return $wgMemc->set( $cacheKey, $newTimestamp, 14*86400 ); // 2 weeks
+ 		}
+
+ 		return true; // a newer version of this root job was enqueued
+ 	};
+
+ 	// Callers should call batchInsert() and then this function so that if the insert
+ 	// fails, the de-duplication registration will be aborted. Since the insert is
+ 	// deferred till "transaction idle", the registration is deferred the same way so
+ 	// that the ordering is maintained. Having only the registration succeed would
+ 	// cause jobs to become no-ops without any actual jobs that made them redundant.
+ 	$this->getMasterDB()->onTransactionIdle( $register );
+
+ 	return true;
+ }
+
+ /**
+  * Check if the "root" job of a given job has been superseded by a newer one
+  *
+  * @param $job Job
+  * @return bool
+  */
+ protected function isRootJobOldDuplicate( Job $job ) {
+ 	global $wgMemc;
+
+ 	$params = $job->getParams();
+ 	if ( !isset( $params['rootJobSignature'] ) ) {
+ 		return false; // job has no de-duplication info
+ 	}
+ 	if ( !isset( $params['rootJobTimestamp'] ) ) {
+ 		trigger_error( "Cannot check root job; missing 'rootJobTimestamp'." );
+ 		return false;
+ 	}
+
+ 	// Get the last time this root job was enqueued
+ 	$cacheKey = $this->getRootJobCacheKey( $params['rootJobSignature'] );
+ 	$lastTimestamp = $wgMemc->get( $cacheKey );
+
+ 	// Check if a new root job was started at the location after this one's...
+ 	return ( $lastTimestamp && $lastTimestamp > $params['rootJobTimestamp'] );
+ }
+
/**
* @see JobQueue::doWaitForBackups()
* @return void
* @return array
*/
protected function insertFields( Job $job ) {
- // Rows that describe the nature of the job
- $descFields = array(
+ $dbw = $this->getMasterDB();
+ return array(
+ // Fields that describe the nature of the job
'job_cmd' => $job->getType(),
'job_namespace' => $job->getTitle()->getNamespace(),
'job_title' => $job->getTitle()->getDBkey(),
'job_params' => self::makeBlob( $job->getParams() ),
- );
- // Additional job metadata
- $dbw = $this->getMasterDB();
- $metaFields = array(
+ // Additional job metadata
'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
'job_timestamp' => $dbw->timestamp(),
- 'job_sha1' => wfBaseConvert( sha1( serialize( $descFields ) ), 16, 36, 32 ),
+ // The hash covers the job's de-duplication info instead of the raw row fields.
+ // A SHA-1 value is 160 bits, which needs at most 31 base-36 digits
+ // (the last argument is presumably the pad width - confirm against wfBaseConvert)
+ 'job_sha1' => wfBaseConvert(
+ sha1( serialize( $job->getDeduplicationInfo() ) ),
+ 16, 36, 31
+ ),
'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
);
- return ( $descFields + $metaFields );
}
/**
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'isempty' );
}
+ /**
+  * Build the cache key under which the last timestamp of a root job is stored
+  *
+  * @param string $signature Hash identifier of the root job
+  * @return string
+  */
+ private function getRootJobCacheKey( $signature ) {
+ 	$wikiParts = wfSplitWikiID( $this->wiki );
+ 	list( $dbName, $tablePrefix ) = $wikiParts;
+ 	return wfForeignMemcKey(
+ 		$dbName, $tablePrefix, 'jobqueue', $this->type, 'rootjob', $signature
+ 	);
+ }
+
/**
* @param $params
* @return string
return $this->get( $job->getType() )->ack( $job );
}
+ /**
+  * Register the "root job" of a given job into the queue for de-duplication.
+  * Call this only right *after* all the new jobs have been inserted.
+  * The call is forwarded to the queue that handles the type of $job.
+  *
+  * @param $job Job
+  * @return bool
+  */
+ public function deduplicateRootJob( Job $job ) {
+ 	$queue = $this->get( $job->getType() );
+ 	return $queue->deduplicateRootJob( $job );
+ }
+
/**
* Get the list of queue types
*
--- /dev/null
+<?php
+/**
+ * No-op job that does nothing.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * No-op job that does nothing. Used to represent duplicates.
+ *
+ * @ingroup JobQueue
+ */
+final class DuplicateJob extends Job {
+	/**
+	 * Callers should use DuplicateJob::newFromJob() instead
+	 *
+	 * @param $title Title
+	 * @param $params Array: job parameters
+	 * @param $id Integer: job id
+	 */
+	function __construct( $title, $params, $id = 0 ) {
+		parent::__construct( 'duplicate', $title, $params, $id );
+	}
+
+	/**
+	 * Get a duplicate no-op version of a job.
+	 * The returned job keeps the title, parameters, id and type of the given job
+	 * and is flagged with an 'isDuplicate' parameter.
+	 *
+	 * @param Job $job The job to be replaced by a no-op
+	 * @return Job
+	 */
+	public static function newFromJob( Job $job ) {
+		// Keep the new object in its own variable: reusing $job here would make the
+		// getType() call below read the type of the new DuplicateJob rather than
+		// the type of the original job
+		$djob = new self( $job->getTitle(), $job->getParams(), $job->getId() );
+		$djob->command = $job->getType();
+		$djob->params = is_array( $djob->params ) ? $djob->params : array();
+		$djob->params = array( 'isDuplicate' => true ) + $djob->params;
+		return $djob;
+	}
+
+	/**
+	 * Do nothing and report success
+	 *
+	 * @return bool Always true
+	 */
+	public function run() {
+		return true;
+	}
+}
* Job wrapper for HTMLCacheUpdate. Gets run whenever a related
* job gets called from the queue.
*
+ * This class is designed to work efficiently with small numbers of links, and
+ * to work reasonably well with up to ~10^5 links. Above ~10^6 links, the memory
+ * and time requirements of loading all backlinked IDs in doFullUpdate() might become
+ * prohibitive. The requirements measured at Wikimedia are approximately:
+ *
+ * memory: 48 bytes per row
+ * time: 16us per row for the query plus processing
+ *
+ * The reason this query is done is to support partitioning of the job
+ * by backlinked ID. The memory issue could be alleviated by doing this query in
+ * batches, but of course LIMIT with an offset is inefficient on the DB side.
+ *
+ * The class is nevertheless a vast improvement on the previous method of using
+ * File::getLinksTo() and Title::touchArray(), which uses about 2KB of memory per
+ * link.
+ *
* @ingroup JobQueue
*/
class HTMLCacheUpdateJob extends Job {
- var $table, $start, $end;
+ /** @var BacklinkCache */
+ protected $blCache;
+
+ protected $rowsPerJob, $rowsPerQuery;
/**
* Construct a job
* @param $id Integer: job id
*/
function __construct( $title, $params, $id = 0 ) {
+ global $wgUpdateRowsPerJob, $wgUpdateRowsPerQuery;
+
parent::__construct( 'htmlCacheUpdate', $title, $params, $id );
- $this->table = $params['table'];
- $this->start = $params['start'];
- $this->end = $params['end'];
+
+ // Keep the configured batch sizes and the title's backlink cache for the update methods
+ $this->rowsPerJob = $wgUpdateRowsPerJob;
+ $this->rowsPerQuery = $wgUpdateRowsPerQuery;
+ $this->blCache = $title->getBacklinkCache();
}
public function run() {
- $update = new HTMLCacheUpdate( $this->title, $this->table, $this->start, $this->end );
- $update->doUpdate();
+ // isset() is false for null values, so both range bounds must be present
+ // and non-null for the job to be treated as a partial (range) update
+ if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) {
+ # This is hit when a job is actually performed
+ return $this->doPartialUpdate();
+ } else {
+ # This is hit when the jobs have to be inserted
+ return $this->doFullUpdate();
+ }
+ }
+
+ /**
+  * Update all of the backlinks, either directly or by queueing range jobs
+  *
+  * @return bool Always true
+  */
+ protected function doFullUpdate() {
+ 	$table = $this->params['table'];
+ 	$limit = $this->rowsPerJob * 2;
+
+ 	# Start from the BacklinkCache estimate of the number of rows
+ 	if ( $this->blCache->getNumLinks( $table ) > $limit ) {
+ 		# Do fast cached partition
+ 		$this->insertPartitionJobs();
+ 		return true;
+ 	}
+
+ 	# The estimate is small: get the links from the DB and re-check the real count
+ 	$titleArray = $this->blCache->getLinks( $table );
+ 	if ( $titleArray->count() > $limit ) {
+ 		# The estimate was not correct, do accurate partition
+ 		wfDebug( __METHOD__.": row count estimate was incorrect, repartitioning\n" );
+ 		$this->insertJobsFromTitles( $titleArray );
+ 	} else {
+ 		$this->invalidateTitles( $titleArray ); // just do the query
+ 	}
+ 	return true;
+ }
+
+ /**
+ * Update some of the backlinks, defined by a page ID range
+ * (the range is read from the 'start' and 'end' job parameters)
+ *
+ * @return bool Always true
+ */
+ protected function doPartialUpdate() {
+ $titleArray = $this->blCache->getLinks(
+ $this->params['table'], $this->params['start'], $this->params['end'] );
+ if ( $titleArray->count() <= $this->rowsPerJob * 2 ) {
+ # This partition is small enough, do the update
+ $this->invalidateTitles( $titleArray );
+ } else {
+ # Partitioning was excessively inaccurate. Divide the job further.
+ # This can occur when a large number of links are added in a short
+ # period of time, say by updating a heavily-used template.
+ $this->insertJobsFromTitles( $titleArray );
+ }
return true;
}
+
+ /**
+  * Partition the current range given by $this->params['start'] and $this->params['end'],
+  * using a pre-calculated title array which gives the links in that range.
+  * Queue the resulting jobs. A bound that is not set on this job is treated as
+  * false in the generated sub-jobs.
+  *
+  * @param $titleArray array
+  * @param $rootJobParams array Ignored: the root job information is always taken
+  *   from $this->getRootJobParams()
+  * @return void
+  */
+ protected function insertJobsFromTitles( $titleArray, $rootJobParams = array() ) {
+ 	// Carry over any "root job" information
+ 	$rootJobParams = $this->getRootJobParams();
+ 	# This method is also reached from doFullUpdate(), where the job has no
+ 	# 'start'/'end' params. Use false for a missing bound instead of reading an
+ 	# undefined index: a null bound would fail the isset() checks in run(), so the
+ 	# first and last sub-jobs would redo the full update instead of their own range.
+ 	$rangeStart = isset( $this->params['start'] ) ? $this->params['start'] : false;
+ 	$rangeEnd = isset( $this->params['end'] ) ? $this->params['end'] : false;
+ 	# We make subpartitions in the sense that the start of the first job
+ 	# will be the start of the parent partition, and the end of the last
+ 	# job will be the end of the parent partition.
+ 	$jobs = array();
+ 	$start = $rangeStart; # start of the current job
+ 	$numTitles = 0;
+ 	foreach ( $titleArray as $title ) {
+ 		$id = $title->getArticleID();
+ 		# $numTitles is now the number of titles in the current job not
+ 		# including the current ID
+ 		if ( $numTitles >= $this->rowsPerJob ) {
+ 			# Add a job up to but not including the current ID
+ 			$jobs[] = new HTMLCacheUpdateJob( $this->title,
+ 				array(
+ 					'table' => $this->params['table'],
+ 					'start' => $start,
+ 					'end' => $id - 1
+ 				) + $rootJobParams // carry over information for de-duplication
+ 			);
+ 			$start = $id;
+ 			$numTitles = 0;
+ 		}
+ 		$numTitles++;
+ 	}
+ 	# Last job
+ 	$jobs[] = new HTMLCacheUpdateJob( $this->title,
+ 		array(
+ 			'table' => $this->params['table'],
+ 			'start' => $start,
+ 			'end' => $rangeEnd
+ 		) + $rootJobParams // carry over information for de-duplication
+ 	);
+ 	wfDebug( __METHOD__.": repartitioning into " . count( $jobs ) . " jobs\n" );
+
+ 	if ( count( $jobs ) < 2 ) {
+ 		# I don't think this is possible at present, but handling this case
+ 		# makes the code a bit more robust against future code updates and
+ 		# avoids a potential infinite loop of repartitioning
+ 		wfDebug( __METHOD__.": repartitioning failed!\n" );
+ 		$this->invalidateTitles( $titleArray );
+ 	} else {
+ 		JobQueueGroup::singleton()->push( $jobs );
+ 	}
+ }
+
+ /**
+  * Queue one range job per partition reported by the backlink cache
+  *
+  * @param $rootJobParams array Overwritten with $this->getRootJobParams()
+  * @return void
+  */
+ protected function insertPartitionJobs( $rootJobParams = array() ) {
+ 	// Carry over any "root job" information
+ 	$rootJobParams = $this->getRootJobParams();
+
+ 	$table = $this->params['table'];
+ 	$partitions = $this->blCache->partition( $table, $this->rowsPerJob );
+ 	if ( !count( $partitions ) ) {
+ 		return; // no jobs to insert
+ 	}
+
+ 	$rangeJobs = array();
+ 	foreach ( $partitions as $partition ) {
+ 		list( $first, $last ) = $partition;
+ 		$jobParams = array(
+ 			'table' => $this->params['table'],
+ 			'start' => $first,
+ 			'end' => $last,
+ 		) + $rootJobParams; // carry over information for de-duplication
+ 		$rangeJobs[] = new HTMLCacheUpdateJob( $this->title, $jobParams );
+ 	}
+
+ 	JobQueueGroup::singleton()->push( $rangeJobs );
+ }
+
+ /**
+  * Invalidate an array (or iterator) of Title objects, right now.
+  * Updates page_touched, then purges squid and the file cache when those are enabled.
+  *
+  * @param $titleArray array
+  */
+ protected function invalidateTitles( $titleArray ) {
+ 	global $wgUseFileCache, $wgUseSquid;
+
+ 	$dbw = wfGetDB( DB_MASTER );
+ 	$now = $dbw->timestamp();
+
+ 	# Collect the page ID of every title
+ 	$pageIds = array();
+ 	foreach ( $titleArray as $title ) {
+ 		$pageIds[] = $title->getArticleID();
+ 	}
+ 	if ( !$pageIds ) {
+ 		return;
+ 	}
+
+ 	# Don't invalidate pages that were already invalidated
+ 	$touchedCond = array();
+ 	if ( isset( $this->params['rootJobTimestamp'] ) ) {
+ 		$rootTimestamp = $dbw->timestamp( $this->params['rootJobTimestamp'] );
+ 		$touchedCond = array( "page_touched < " . $dbw->addQuotes( $rootTimestamp ) );
+ 	}
+
+ 	# Update page_touched, a limited number of rows per query
+ 	foreach ( array_chunk( $pageIds, $this->rowsPerQuery ) as $idChunk ) {
+ 		$conds = array( 'page_id' => $idChunk ) + $touchedCond;
+ 		$dbw->update( 'page', array( 'page_touched' => $now ), $conds, __METHOD__ );
+ 	}
+
+ 	# Update squid
+ 	if ( $wgUseSquid ) {
+ 		$squidUpdate = SquidUpdate::newFromTitles( $titleArray );
+ 		$squidUpdate->doUpdate();
+ 	}
+
+ 	# Update file cache
+ 	if ( $wgUseFileCache ) {
+ 		foreach ( $titleArray as $title ) {
+ 			HTMLFileCache::clearFileCache( $title );
+ 		}
+ 	}
+ }
}
<?php
/**
- * Degenerate job that just replaces itself in the queue.
+ * Degenerate job that does nothing.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
*/
/**
- * Degenerate job that just replace itself in the queue.
- * Useful for lock contention and performance testing.
+ * Degenerate job that does nothing, but can optionally replace itself
+ * in the queue and/or sleep for a brief time period. These can be used
+ * to represent "no-op" jobs or test lock contention and performance.
*
* @ingroup JobQueue
*/
return true;
}
+ /**
+  * Get the de-duplication info of the parent class, without the "masterPos" parameter
+  *
+  * @return Array
+  */
+ public function getDeduplicationInfo() {
+ 	$info = parent::getDeduplicationInfo();
+ 	if ( !is_array( $info['params'] ) ) {
+ 		return $info;
+ 	}
+ 	// Don't let highly unique "masterPos" values ruin duplicate detection
+ 	unset( $info['params']['masterPos'] );
+ 	return $info;
+ }
+
+ /**
+ * @param $title Title
+ * @param $revision Revision
+ * @param $fname string
+ * @return void
+ */
public static function runForTitleInternal( Title $title, Revision $revision, $fname ) {
wfProfileIn( $fname );
$content = $revision->getContent( Revision::RAW );
* @ingroup JobQueue
*/
class RefreshLinksJob2 extends Job {
- const MAX_TITLES_RUN = 10;
-
+ /**
+ * Construct a job of type 'refreshLinks2'; all arguments are passed on to the parent
+ */
function __construct( $title, $params, $id = 0 ) {
parent::__construct( 'refreshLinks2', $title, $params, $id );
}
* @return boolean success
*/
function run() {
+ global $wgUpdateRowsPerJob;
+
wfProfileIn( __METHOD__ );
$linkCache = LinkCache::singleton();
$this->error = "refreshLinks2: Invalid title";
wfProfileOut( __METHOD__ );
return false;
- } elseif ( !isset( $this->params['start'] ) || !isset( $this->params['end'] ) ) {
- $this->error = "refreshLinks2: Invalid params";
- wfProfileOut( __METHOD__ );
- return false;
}
// Back compat for pre-r94435 jobs
$table = isset( $this->params['table'] ) ? $this->params['table'] : 'templatelinks';
- // Avoid slave lag when fetching templates
+ // Avoid slave lag when fetching templates.
+ // When the outermost job is run, we know that the caller that enqueued it must have
+ // committed the relevant changes to the DB by now. At that point, record the master
+ // position and pass it along as the job recursively breaks into smaller range jobs.
+ // Hopefully, when leaf jobs are popped, the slaves will have reached that position.
if ( isset( $this->params['masterPos'] ) ) {
$masterPos = $this->params['masterPos'];
} elseif ( wfGetLB()->getServerCount() > 1 ) {
$masterPos = false;
}
- $titles = $this->title->getBacklinkCache()->getLinks(
- $table, $this->params['start'], $this->params['end'] );
-
- if ( $titles->count() > self::MAX_TITLES_RUN ) {
- # We don't want to parse too many pages per job as it can starve other jobs.
- # If there are too many pages to parse, break this up into smaller jobs. By passing
- # in the master position here we can cut down on the time spent waiting for slaves to
- # catch up by the runners handling these jobs since time will have passed between now
- # and when they pop these jobs off the queue.
- $start = 0; // batch start
- $end = 0; // batch end
- $bsize = 0; // batch size
- $first = true; // first of batch
- $jobs = array();
- foreach ( $titles as $title ) {
- $start = $first ? $title->getArticleId() : $start;
- $end = $title->getArticleId();
- $first = false;
- if ( ++$bsize >= self::MAX_TITLES_RUN ) {
- $jobs[] = new RefreshLinksJob2( $this->title, array(
- 'table' => $table,
- 'start' => $start,
- 'end' => $end,
- 'masterPos' => $masterPos
- ) );
- $first = true;
- $start = $end = $bsize = 0;
- }
- }
- if ( $bsize > 0 ) { // group remaining pages into a job
- $jobs[] = new RefreshLinksJob2( $this->title, array(
- 'table' => $table,
- 'start' => $start,
- 'end' => $end,
- 'masterPos' => $masterPos
- ) );
- }
- Job::batchInsert( $jobs );
- } elseif ( php_sapi_name() != 'cli' ) {
- # Not suitable for page load triggered job running!
- # Gracefully switch to refreshLinks jobs if this happens.
- $jobs = array();
- foreach ( $titles as $title ) {
- $jobs[] = new RefreshLinksJob( $title, array( 'masterPos' => $masterPos ) );
- }
- Job::batchInsert( $jobs );
+ $tbc = $this->title->getBacklinkCache();
+
+ $jobs = array(); // jobs to insert
+ if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) {
+ # This is a partition job to trigger the insertion of leaf jobs...
+ $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) );
} else {
- # Wait for the DB of the current/next slave DB handle to catch up to the master.
- # This way, we get the correct page_latest for templates or files that just changed
- # milliseconds ago, having triggered this job to begin with.
- if ( $masterPos ) {
- wfGetLB()->waitFor( $masterPos );
- }
- # Re-parse each page that transcludes this page and update their tracking links...
- foreach ( $titles as $title ) {
- $revision = Revision::newFromTitle( $title, false, Revision::READ_NORMAL );
- if ( !$revision ) {
- $this->error = 'refreshLinks: Article not found "' .
- $title->getPrefixedDBkey() . '"';
- continue; // skip this page
+ # This is a base job to trigger the insertion of partitioned jobs...
+ if ( $tbc->getNumLinks( $table ) <= $wgUpdateRowsPerJob ) {
+ # Just directly insert the single per-title jobs
+ $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) );
+ } else {
+ # Insert the partition jobs to make per-title jobs
+ foreach ( $tbc->partition( $table, $wgUpdateRowsPerJob ) as $batch ) {
+ list( $start, $end ) = $batch;
+ $jobs[] = new RefreshLinksJob2( $this->title,
+ array(
+ 'table' => $table,
+ 'start' => $start,
+ 'end' => $end,
+ 'masterPos' => $masterPos,
+ ) + $this->getRootJobParams() // carry over information for de-duplication
+ );
}
- RefreshLinksJob::runForTitleInternal( $title, $revision, __METHOD__ );
- wfWaitForSlaves();
}
}
+ if ( count( $jobs ) ) {
+ JobQueueGroup::singleton()->push( $jobs );
+ }
+
wfProfileOut( __METHOD__ );
return true;
}
+
+ /**
+  * Build one RefreshLinksJob per backlink found in the range of this job
+  *
+  * @param $table string
+  * @param $masterPos mixed
+  * @return Array
+  */
+ protected function getSingleTitleJobs( $table, $masterPos ) {
+ 	# The "start"/"end" fields are not set for the base jobs; use false for those
+ 	$rangeStart = false;
+ 	if ( isset( $this->params['start'] ) ) {
+ 		$rangeStart = $this->params['start'];
+ 	}
+ 	$rangeEnd = false;
+ 	if ( isset( $this->params['end'] ) ) {
+ 		$rangeEnd = $this->params['end'];
+ 	}
+ 	$backlinks = $this->title->getBacklinkCache()->getLinks( $table, $rangeStart, $rangeEnd );
+ 	# Convert into single page refresh links jobs.
+ 	# This copes well with sapi mode and is useful in any case for job
+ 	# de-duplication. If many pages use template A, and that template itself
+ 	# uses template B, then an edit to both will create many duplicate jobs.
+ 	# Roughly speaking, for each page, one of the "RefreshLinksJob" jobs will
+ 	# get run first, and when it does, it will remove the duplicates. Of course,
+ 	# one page could have its job popped when the other page's job is still
+ 	# buried within the logic of a refreshLinks2 job.
+ 	$leafParams = array( 'masterPos' => $masterPos )
+ 		+ $this->getRootJobParams(); // carry over information for de-duplication
+ 	$leafJobs = array();
+ 	foreach ( $backlinks as $backlink ) {
+ 		$leafJobs[] = new RefreshLinksJob( $backlink, $leafParams );
+ 	}
+ 	return $leafJobs;
+ }
+
+ /**
+  * Get the de-duplication info of the parent class, without the "masterPos" parameter
+  *
+  * @return Array
+  */
+ public function getDeduplicationInfo() {
+ 	$info = parent::getDeduplicationInfo();
+ 	if ( !is_array( $info['params'] ) ) {
+ 		return $info;
+ 	}
+ 	// Don't let highly unique "masterPos" values ruin duplicate detection
+ 	unset( $info['params']['masterPos'] );
+ 	return $info;
+ }
}