[JobQueue] Improved refreshLinks/htmlCacheUpdate job de-duplication.
author    Aaron Schulz <aschulz@wikimedia.org>
          Thu, 8 Nov 2012 22:01:40 +0000 (14:01 -0800)
committer Gerrit Code Review <gerrit@wikimedia.org>
          Wed, 28 Nov 2012 09:29:41 +0000 (09:29 +0000)
* Added JobQueue::deduplicateRootJob(), which uses cached records of
  the last time a "root" job was initiated for a task in order to
  invalidate prior jobs for that task (see the first sketch below).
  For refreshLinks, the "task" is basically "enqueueing the refresh
  jobs for title X".
* (bug 27914) Also added a new Job::getDeduplicationInfo() function
  and overrode it for refreshLinks to exclude volatile fields like
  'masterPos' from duplicate job comparisons (sketched below).
* (bug 27914) Always resolve refreshLinks2 jobs down to refreshLinks jobs.
  For each affected page, one of its jobs will get popped first, which
  will remove the duplicates for that page (unless one copy is still
  buried in a refreshLinks2 job while the other is a refreshLinks job).
* (bug 37731) Made LinksUpdate/HTMLCacheUpdate defer the large
  backlinks query by doing it in an outer job.
* (bug 42065) HTMLCacheUpdate will no longer purge pages that were
  already purged since the job was enqueued (see the last sketch below).
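
For illustration, the intended enqueue pattern (mirroring the
LinksUpdate::queueRecursiveJobs() change below; $title is a Title object):

    // Enqueue the task's job with "root job" parameters attached, then
    // register the root job so older jobs for the same task become no-ops.
    $job = new RefreshLinksJob2(
        $title,
        array( 'table' => 'templatelinks' ) + Job::newRootJobParams(
            "refreshlinks:templatelinks:{$title->getPrefixedText()}" // task key
        )
    );
    JobQueueGroup::singleton()->push( $job );
    JobQueueGroup::singleton()->deduplicateRootJob( $job );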
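
A rough sketch of why the de-duplication info matters for job_sha1; $posA
and $posB stand in for two different (hypothetical) DB master positions:

    // Two jobs that differ only in 'masterPos' now serialize to identical
    // de-duplication info, hence identical job_sha1 values in the queue.
    $a = new RefreshLinksJob( $title, array( 'masterPos' => $posA ) );
    $b = new RefreshLinksJob( $title, array( 'masterPos' => $posB ) );
    $same = ( sha1( serialize( $a->getDeduplicationInfo() ) )
        === sha1( serialize( $b->getDeduplicationInfo() ) ) ); // true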
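
Condensed sketch of how stale jobs become no-ops at pop time, per the
JobQueueDB changes below ($params are the popped job's parameters):

    // The newest rootJobTimestamp for a task is cached under a key derived
    // from rootJobSignature; popped jobs older than it are superseded.
    $timestamp = $wgMemc->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
    if ( $timestamp && $timestamp > $params['rootJobTimestamp'] ) {
        $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
    }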
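
Sketch of the conditional purge behind bug 42065, condensed from the new
invalidateTitles(); $batch (an array of page IDs) and $rootJobTimestamp
are illustrative placeholders:

    // Only bump page_touched for pages not already purged since the
    // root job was enqueued.
    $dbw = wfGetDB( DB_MASTER );
    $dbw->update( 'page',
        array( 'page_touched' => $dbw->timestamp() ),
        array(
            'page_id' => $batch,
            'page_touched < ' . $dbw->addQuotes( $dbw->timestamp( $rootJobTimestamp ) )
        ),
        __METHOD__
    );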

Change-Id: I71b743e0a38e60a874ca856e80cb761bea06b689

12 files changed:
includes/AutoLoader.php
includes/LinksUpdate.php
includes/cache/BacklinkCache.php
includes/cache/HTMLCacheUpdate.php
includes/job/Job.php
includes/job/JobQueue.php
includes/job/JobQueueDB.php
includes/job/JobQueueGroup.php
includes/job/jobs/DuplicateJob.php [new file with mode: 0644]
includes/job/jobs/HTMLCacheUpdateJob.php
includes/job/jobs/NullJob.php
includes/job/jobs/RefreshLinksJob.php

diff --git a/includes/AutoLoader.php b/includes/AutoLoader.php
index 31c1571..3d0c27a 100644
@@ -651,6 +651,7 @@ $wgAutoloadLocalClasses = array(
 
        # includes/job/jobs
        'DoubleRedirectJob' => 'includes/job/jobs/DoubleRedirectJob.php',
+       'DuplicateJob' => 'includes/job/jobs/DuplicateJob.php',
        'EmaillingJob' => 'includes/job/jobs/EmaillingJob.php',
        'EnotifNotifyJob' => 'includes/job/jobs/EnotifNotifyJob.php',
        'HTMLCacheUpdateJob' => 'includes/job/jobs/HTMLCacheUpdateJob.php',
diff --git a/includes/LinksUpdate.php b/includes/LinksUpdate.php
index fd1fefb..a7a903e 100644
@@ -238,26 +238,20 @@ class LinksUpdate extends SqlDataUpdate {
        }
 
        function queueRecursiveJobs() {
-               global $wgUpdateRowsPerJob;
                wfProfileIn( __METHOD__ );
 
-               $cache = $this->mTitle->getBacklinkCache();
-               $batches = $cache->partition( 'templatelinks', $wgUpdateRowsPerJob );
-               if ( !$batches ) {
-                       wfProfileOut( __METHOD__ );
-                       return;
-               }
-               $jobs = array();
-               foreach ( $batches as $batch ) {
-                       list( $start, $end ) = $batch;
-                       $params = array(
-                               'table' => 'templatelinks',
-                               'start' => $start,
-                               'end' => $end,
+               if ( $this->mTitle->getBacklinkCache()->hasLinks( 'templatelinks' ) ) {
+                       $job = new RefreshLinksJob2(
+                               $this->mTitle,
+                               array(
+                                       'table' => 'templatelinks',
+                               ) + Job::newRootJobParams( // "overall" refresh links job info
+                                       "refreshlinks:templatelinks:{$this->mTitle->getPrefixedText()}"
+                               )
                        );
-                       $jobs[] = new RefreshLinksJob2( $this->mTitle, $params );
+                       JobQueueGroup::singleton()->push( $job );
+                       JobQueueGroup::singleton()->deduplicateRootJob( $job );
                }
-               Job::batchInsert( $jobs );
 
                wfProfileOut( __METHOD__ );
        }
diff --git a/includes/cache/BacklinkCache.php b/includes/cache/BacklinkCache.php
index c4f8762..0710caa 100644
@@ -297,23 +297,33 @@ class BacklinkCache {
                return $conds;
        }
 
+       /**
+        * Check if there are any backlinks
+        * @param $table String
+        * @return bool
+        */
+       public function hasLinks( $table ) {
+               return ( $this->getNumLinks( $table, 1 ) > 0 );
+       }
+
        /**
         * Get the approximate number of backlinks
         * @param $table String
+        * @param $max integer Only count up to this many backlinks
         * @return integer
         */
-       public function getNumLinks( $table ) {
+       public function getNumLinks( $table, $max = INF ) {
                global $wgMemc;
 
                // 1) try partition cache ...
                if ( isset( $this->partitionCache[$table] ) ) {
                        $entry = reset( $this->partitionCache[$table] );
-                       return $entry['numRows'];
+                       return min( $max, $entry['numRows'] );
                }
 
                // 2) ... then try full result cache ...
                if ( isset( $this->fullResultCache[$table] ) ) {
-                       return $this->fullResultCache[$table]->numRows();
+                       return min( $max, $this->fullResultCache[$table]->numRows() );
                }
 
                $memcKey = wfMemcKey( 'numbacklinks', md5( $this->title->getPrefixedDBkey() ), $table );
@@ -321,12 +331,22 @@ class BacklinkCache {
                // 3) ... fallback to memcached ...
                $count = $wgMemc->get( $memcKey );
                if ( $count ) {
-                       return $count;
+                       return min( $max, $count );
                }
 
                // 4) fetch from the database ...
-               $count = $this->getLinks( $table )->count();
-               $wgMemc->set( $memcKey, $count, self::CACHE_EXPIRY );
+               if ( is_infinite( $max ) ) { // full count
+                       $count = $this->getLinks( $table )->count();
+                       $wgMemc->set( $memcKey, $count, self::CACHE_EXPIRY );
+               } else { // with limit
+                       $count = $this->getDB()->select(
+                               array( $table, 'page' ),
+                               '1',
+                               $this->getConditions( $table ),
+                               __METHOD__,
+                               array( 'LIMIT' => $max )
+                       )->numRows();
+               }
 
                return $count;
        }
diff --git a/includes/cache/HTMLCacheUpdate.php b/includes/cache/HTMLCacheUpdate.php
index 51a28ca..791ae3e 100644
 
 /**
  * Class to invalidate the HTML cache of all the pages linking to a given title.
- * Small numbers of links will be done immediately, large numbers are pushed onto
- * the job queue.
- *
- * This class is designed to work efficiently with small numbers of links, and
- * to work reasonably well with up to ~10^5 links. Above ~10^6 links, the memory
- * and time requirements of loading all backlinked IDs in doUpdate() might become
- * prohibitive. The requirements measured at Wikimedia are approximately:
- *
- *   memory: 48 bytes per row
- *   time: 16us per row for the query plus processing
- *
- * The reason this query is done is to support partitioning of the job
- * by backlinked ID. The memory issue could be allieviated by doing this query in
- * batches, but of course LIMIT with an offset is inefficient on the DB side.
- *
- * The class is nevertheless a vast improvement on the previous method of using
- * File::getLinksTo() and Title::touchArray(), which uses about 2KB of memory per
- * link.
  *
  * @ingroup Cache
  */
@@ -50,8 +32,7 @@ class HTMLCacheUpdate implements DeferrableUpdate {
         */
        public $mTitle;
 
-       public $mTable, $mPrefix, $mStart, $mEnd;
-       public $mRowsPerJob, $mRowsPerQuery;
+       public $mTable;
 
        /**
         * @param $titleTo
@@ -59,172 +40,31 @@ class HTMLCacheUpdate implements DeferrableUpdate {
         * @param $start bool
         * @param $end bool
         */
-       function __construct( $titleTo, $table, $start = false, $end = false ) {
-               global $wgUpdateRowsPerJob, $wgUpdateRowsPerQuery;
-
+       function __construct( Title $titleTo, $table ) {
                $this->mTitle = $titleTo;
                $this->mTable = $table;
-               $this->mStart = $start;
-               $this->mEnd = $end;
-               $this->mRowsPerJob = $wgUpdateRowsPerJob;
-               $this->mRowsPerQuery = $wgUpdateRowsPerQuery;
-               $this->mCache = $this->mTitle->getBacklinkCache();
        }
 
        public function doUpdate() {
-               if ( $this->mStart || $this->mEnd ) {
-                       $this->doPartialUpdate();
-                       return;
-               }
-
-               # Get an estimate of the number of rows from the BacklinkCache
-               $numRows = $this->mCache->getNumLinks( $this->mTable );
-               if ( $numRows > $this->mRowsPerJob * 2 ) {
-                       # Do fast cached partition
-                       $this->insertJobs();
-               } else {
-                       # Get the links from the DB
-                       $titleArray = $this->mCache->getLinks( $this->mTable );
-                       # Check if the row count estimate was correct
-                       if ( $titleArray->count() > $this->mRowsPerJob * 2 ) {
-                               # Not correct, do accurate partition
-                               wfDebug( __METHOD__.": row count estimate was incorrect, repartitioning\n" );
-                               $this->insertJobsFromTitles( $titleArray );
-                       } else {
-                               $this->invalidateTitles( $titleArray );
-                       }
-               }
-       }
-
-       /**
-        * Update some of the backlinks, defined by a page ID range
-        */
-       protected function doPartialUpdate() {
-               $titleArray = $this->mCache->getLinks( $this->mTable, $this->mStart, $this->mEnd );
-               if ( $titleArray->count() <= $this->mRowsPerJob * 2 ) {
-                       # This partition is small enough, do the update
-                       $this->invalidateTitles( $titleArray );
-               } else {
-                       # Partitioning was excessively inaccurate. Divide the job further.
-                       # This can occur when a large number of links are added in a short
-                       # period of time, say by updating a heavily-used template.
-                       $this->insertJobsFromTitles( $titleArray );
-               }
-       }
-
-       /**
-        * Partition the current range given by $this->mStart and $this->mEnd,
-        * using a pre-calculated title array which gives the links in that range.
-        * Queue the resulting jobs.
-        *
-        * @param $titleArray array
-        */
-       protected function insertJobsFromTitles( $titleArray ) {
-               # We make subpartitions in the sense that the start of the first job
-               # will be the start of the parent partition, and the end of the last
-               # job will be the end of the parent partition.
-               $jobs = array();
-               $start = $this->mStart; # start of the current job
-               $numTitles = 0;
-               foreach ( $titleArray as $title ) {
-                       $id = $title->getArticleID();
-                       # $numTitles is now the number of titles in the current job not
-                       # including the current ID
-                       if ( $numTitles >= $this->mRowsPerJob ) {
-                               # Add a job up to but not including the current ID
-                               $params = array(
-                                       'table' => $this->mTable,
-                                       'start' => $start,
-                                       'end' => $id - 1
-                               );
-                               $jobs[] = new HTMLCacheUpdateJob( $this->mTitle, $params );
-                               $start = $id;
-                               $numTitles = 0;
-                       }
-                       $numTitles++;
-               }
-               # Last job
-               $params = array(
-                       'table' => $this->mTable,
-                       'start' => $start,
-                       'end' => $this->mEnd
-               );
-               $jobs[] = new HTMLCacheUpdateJob( $this->mTitle, $params );
-               wfDebug( __METHOD__.": repartitioning into " . count( $jobs ) . " jobs\n" );
-
-               if ( count( $jobs ) < 2 ) {
-                       # I don't think this is possible at present, but handling this case
-                       # makes the code a bit more robust against future code updates and
-                       # avoids a potential infinite loop of repartitioning
-                       wfDebug( __METHOD__.": repartitioning failed!\n" );
-                       $this->invalidateTitles( $titleArray );
-                       return;
-               }
-
-               Job::batchInsert( $jobs );
-       }
+               wfProfileIn( __METHOD__ );
 
-       /**
-        * @return mixed
-        */
-       protected function insertJobs() {
-               $batches = $this->mCache->partition( $this->mTable, $this->mRowsPerJob );
-               if ( !$batches ) {
-                       return;
-               }
-               $jobs = array();
-               foreach ( $batches as $batch ) {
-                       $params = array(
+               $job = new HTMLCacheUpdateJob(
+                       $this->mTitle,
+                       array(
                                'table' => $this->mTable,
-                               'start' => $batch[0],
-                               'end' => $batch[1],
-                       );
-                       $jobs[] = new HTMLCacheUpdateJob( $this->mTitle, $params );
-               }
-               Job::batchInsert( $jobs );
-       }
-
-       /**
-        * Invalidate an array (or iterator) of Title objects, right now
-        * @param $titleArray array
-        */
-       protected function invalidateTitles( $titleArray ) {
-               global $wgUseFileCache, $wgUseSquid;
-
-               $dbw = wfGetDB( DB_MASTER );
-               $timestamp = $dbw->timestamp();
-
-               # Get all IDs in this query into an array
-               $ids = array();
-               foreach ( $titleArray as $title ) {
-                       $ids[] = $title->getArticleID();
-               }
-
-               if ( !$ids ) {
-                       return;
-               }
-
-               # Update page_touched
-               $batches = array_chunk( $ids, $this->mRowsPerQuery );
-               foreach ( $batches as $batch ) {
-                       $dbw->update( 'page',
-                               array( 'page_touched' => $timestamp ),
-                               array( 'page_id' => $batch ),
-                               __METHOD__
-                       );
-               }
+                       ) + Job::newRootJobParams( // "overall" cache purge job info
+                               "htmlCacheUpdate:{$this->mTable}:{$this->mTitle->getPrefixedText()}"
+                       )
+               );
 
-               # Update squid
-               if ( $wgUseSquid ) {
-                       $u = SquidUpdate::newFromTitles( $titleArray );
-                       $u->doUpdate();
+               $count = $this->mTitle->getBacklinkCache()->getNumLinks( $this->mTable, 200 );
+               if ( $count >= 200 ) { // many backlinks
+                       JobQueueGroup::singleton()->push( $job );
+                       JobQueueGroup::singleton()->deduplicateRootJob( $job );
+               } else { // few backlinks ($count might be off even if 0)
+                       $job->run(); // just do the purge query now
                }
 
-               # Update file cache
-               if  ( $wgUseFileCache ) {
-                       foreach ( $titleArray as $title ) {
-                               HTMLFileCache::clearFileCache( $title );
-                       }
-               }
+               wfProfileOut( __METHOD__ );
        }
 }
diff --git a/includes/job/Job.php b/includes/job/Job.php
index 0d2803e..927ca4e 100644
@@ -178,6 +178,51 @@ abstract class Job {
                return $this->removeDuplicates;
        }
 
+       /**
+        * Subclasses may need to override this to make duplication detection work
+        *
+        * @return Array Map of key/values
+        */
+       public function getDeduplicationInfo() {
+               $info = array(
+                       'type'      => $this->getType(),
+                       'namespace' => $this->getTitle()->getNamespace(),
+                       'title'     => $this->getTitle()->getDBkey(),
+                       'params'    => $this->getParams()
+               );
+               // Identical jobs with different "root" jobs should count as duplicates
+               if ( is_array( $info['params'] ) ) {
+                       unset( $info['params']['rootJobSignature'] );
+                       unset( $info['params']['rootJobTimestamp'] );
+               }
+               return $info;
+       }
+
+       /**
+        * @param $key string A key that identifies the task
+        * @return Array
+        */
+       public static function newRootJobParams( $key ) {
+               return array(
+                       'rootJobSignature' => sha1( $key ),
+                       'rootJobTimestamp' => wfTimestampNow()
+               );
+       }
+
+       /**
+        * @return Array
+        */
+       public function getRootJobParams() {
+               return array(
+                       'rootJobSignature' => isset( $this->params['rootJobSignature'] )
+                               ? $this->params['rootJobSignature']
+                               : null,
+                       'rootJobTimestamp' => isset( $this->params['rootJobTimestamp'] )
+                               ? $this->params['rootJobTimestamp']
+                               : null
+               );
+       }
+
        /**
         * Insert a single job into the queue.
         * @return bool true on success
diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php
index aa3d6e2..e88441d 100644
@@ -183,6 +183,53 @@ abstract class JobQueue {
         */
        abstract protected function doAck( Job $job );
 
+       /**
+        * Register the "root job" of a given job into the queue for de-duplication.
+        * This should only be called right *after* all the new jobs have been inserted.
+        * This is used to turn older, duplicate job entries into no-ops. The root job
+        * information will remain in the registry until it simply falls out of cache.
+        *
+        * This requires that $job has two special fields in the "params" array:
+        *   - rootJobSignature : hash (e.g. SHA1) that identifies the task
+        *   - rootJobTimestamp : TS_MW timestamp of this instance of the task
+        *
+        * A "root job" is a conceptual job that consists of potentially many smaller jobs
+        * that are actually inserted into the queue. For example, "refreshLinks" jobs are
+        * spawned when a template is edited. One can think of the task as "update links
+        * of pages that use template X" and an instance of that task as a "root job".
+        * However, what actually goes into the queue are potentially many refreshLinks2 jobs.
+        * Since these jobs include things like page ID ranges and DB master positions, and
+        * recursively morph into smaller refreshLinks2 jobs, simple duplicate detection of
+        * identical individual jobs (e.g. job_sha1) is not useful.
+        *
+        * In the case of "refreshLinks", if these jobs are still in the queue when the template
+        * is edited again, we want all of these old refreshLinks jobs for that template to become
+        * no-ops. This can greatly reduce server load, since refreshLinks jobs involve parsing.
+        * Essentially, the new batch of jobs belongs to a new "root job" and the older ones to a
+        * previous "root job" for the same task of "update links of pages that use template X".
+        *
+        * @param $job Job
+        * @return bool
+        */
+       final public function deduplicateRootJob( Job $job ) {
+               if ( $job->getType() !== $this->type ) {
+                       throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+               }
+               wfProfileIn( __METHOD__ );
+               $ok = $this->doDeduplicateRootJob( $job );
+               wfProfileOut( __METHOD__ );
+               return $ok;
+       }
+
+       /**
+        * @see JobQueue::deduplicateRootJob()
+        * @param $job Job
+        * @return bool
+        */
+       protected function doDeduplicateRootJob( Job $job ) {
+               return true;
+       }
+
        /**
         * Wait for any slaves or backup servers to catch up
         *
diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php
index cbb2391..0c01db7 100644
@@ -165,6 +165,10 @@ class JobQueueDB extends JobQueue {
                                        );
                                        wfIncrStats( 'job-pop', $dbw->affectedRows() );
                                }
+                               // Flag this job as an old duplicate based on its "root" job...
+                               if ( $this->isRootJobOldDuplicate( $job ) ) {
+                                       $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
+                               }
                                break; // done
                        } while( true );
                } catch ( DBError $e ) {
@@ -353,6 +357,62 @@ class JobQueueDB extends JobQueue {
                return true;
        }
 
+       /**
+        * @see JobQueue::doDeduplicateRootJob()
+        * @return bool
+        */
+       protected function doDeduplicateRootJob( Job $job ) {
+               $params = $job->getParams();
+               if ( !isset( $params['rootJobSignature'] ) ) {
+                       throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
+               } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
+                       throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
+               }
+               $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
+               // Callers should call batchInsert() and then this function so that if the insert
+               // fails, the de-duplication registration will be aborted. Since the insert is
+               // deferred till "transaction idle", do the same here, so that the ordering is
+               // maintained. Having only the de-duplication registration succeed would cause
+               // jobs to become no-ops without any actual jobs that made them redundant.
+               $this->getMasterDB()->onTransactionIdle( function() use ( $params, $key ) {
+                       global $wgMemc;
+
+                       $timestamp = $wgMemc->get( $key ); // current last timestamp of this job
+                       if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
+                               return true; // a newer version of this root job was enqueued
+                       }
+
+                       // Update the timestamp of the last root job started at the location...
+                       return $wgMemc->set( $key, $params['rootJobTimestamp'], 14*86400 ); // 2 weeks
+               } );
+
+               return true;
+       }
+
+       /**
+        * Check if the "root" job of a given job has been superseded by a newer one
+        *
+        * @param $job Job
+        * @return bool
+        */
+       protected function isRootJobOldDuplicate( Job $job ) {
+               global $wgMemc;
+
+               $params = $job->getParams();
+               if ( !isset( $params['rootJobSignature'] ) ) {
+                       return false; // job has no de-duplication info
+               } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
+                       trigger_error( "Cannot check root job; missing 'rootJobTimestamp'." );
+                       return false;
+               }
+
+               // Get the last time this root job was enqueued
+               $timestamp = $wgMemc->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
+
+               // Check if a new root job was started at the location after this one's...
+               return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
+       }
+
        /**
         * @see JobQueue::doWaitForBackups()
         * @return void
@@ -380,22 +440,22 @@ class JobQueueDB extends JobQueue {
         * @return array
         */
        protected function insertFields( Job $job ) {
-               // Rows that describe the nature of the job
-               $descFields = array(
+               $dbw = $this->getMasterDB();
+               return array(
+                       // Fields that describe the nature of the job
                        'job_cmd'       => $job->getType(),
                        'job_namespace' => $job->getTitle()->getNamespace(),
                        'job_title'     => $job->getTitle()->getDBkey(),
                        'job_params'    => self::makeBlob( $job->getParams() ),
-               );
-               // Additional job metadata
-               $dbw = $this->getMasterDB();
-               $metaFields = array(
+                       // Additional job metadata
                        'job_id'        => $dbw->nextSequenceValue( 'job_job_id_seq' ),
                        'job_timestamp' => $dbw->timestamp(),
-                       'job_sha1'      => wfBaseConvert( sha1( serialize( $descFields ) ), 16, 36, 32 ),
+                       'job_sha1'      => wfBaseConvert(
+                               sha1( serialize( $job->getDeduplicationInfo() ) ),
+                               16, 36, 31
+                       ),
                        'job_random'    => mt_rand( 0, self::MAX_JOB_RANDOM )
                );
-               return ( $descFields + $metaFields );
        }
 
        /**
@@ -406,6 +466,15 @@ class JobQueueDB extends JobQueue {
                return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'isempty' );
        }
 
+       /**
+        * @param string $signature Hash identifier of the root job
+        * @return string
+        */
+       private function getRootJobCacheKey( $signature ) {
+               list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+               return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
+       }
+
        /**
         * @param $params
         * @return string
diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php
index 48f2746..97e0598 100644
@@ -132,6 +132,17 @@ class JobQueueGroup {
                return $this->get( $job->getType() )->ack( $job );
        }
 
+       /**
+        * Register the "root job" of a given job into the queue for de-duplication.
+        * This should only be called right *after* all the new jobs have been inserted.
+        *
+        * @param $job Job
+        * @return bool
+        */
+       public function deduplicateRootJob( Job $job ) {
+               return $this->get( $job->getType() )->deduplicateRootJob( $job );
+       }
+
        /**
         * Get the list of queue types
         *
diff --git a/includes/job/jobs/DuplicateJob.php b/includes/job/jobs/DuplicateJob.php
new file mode 100644
index 0000000..6e056de
--- /dev/null
+++ b/includes/job/jobs/DuplicateJob.php
@@ -0,0 +1,58 @@
+<?php
+/**
+ * No-op job that does nothing.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * No-op job that does nothing. Used to represent duplicates.
+ *
+ * @ingroup JobQueue
+ */
+final class DuplicateJob extends Job {
+       /**
+        * Callers should use DuplicateJob::newFromJob() instead
+        *
+        * @param $title Title
+        * @param $params Array: job parameters
+        * @param $id Integer: job id
+        */
+       function __construct( $title, $params, $id = 0 ) {
+               parent::__construct( 'duplicate', $title, $params, $id );
+       }
+
+       /**
+        * Get a duplicate no-op version of a job
+        *
+        * @param Job $job
+        * @return Job
+        */
+       public static function newFromJob( Job $job ) {
+               $djob = new self( $job->getTitle(), $job->getParams(), $job->getId() );
+               $djob->command = $job->getType(); // preserve the original job's type
+               $djob->params  = is_array( $djob->params ) ? $djob->params : array();
+               $djob->params  = array( 'isDuplicate' => true ) + $djob->params;
+               return $djob;
+       }
+
+       public function run() {
+               return true;
+       }
+}
diff --git a/includes/job/jobs/HTMLCacheUpdateJob.php b/includes/job/jobs/HTMLCacheUpdateJob.php
index 4e6fd6c..20245b3 100644
  * Job wrapper for HTMLCacheUpdate. Gets run whenever a related
  * job gets called from the queue.
  *
+ * This class is designed to work efficiently with small numbers of links, and
+ * to work reasonably well with up to ~10^5 links. Above ~10^6 links, the memory
+ * and time requirements of loading all backlinked IDs in doUpdate() might become
+ * prohibitive. The requirements measured at Wikimedia are approximately:
+ *
+ *   memory: 48 bytes per row
+ *   time: 16us per row for the query plus processing
+ *
+ * The reason this query is done is to support partitioning of the job
+ * by backlinked ID. The memory issue could be alleviated by doing this query in
+ * batches, but of course LIMIT with an offset is inefficient on the DB side.
+ *
+ * The class is nevertheless a vast improvement on the previous method of using
+ * File::getLinksTo() and Title::touchArray(), which uses about 2KB of memory per
+ * link.
+ *
  * @ingroup JobQueue
  */
 class HTMLCacheUpdateJob extends Job {
-       var $table, $start, $end;
+       /** @var BacklinkCache */
+       protected $blCache;
+
+       protected $rowsPerJob, $rowsPerQuery;
 
        /**
         * Construct a job
@@ -37,15 +56,199 @@ class HTMLCacheUpdateJob extends Job {
         * @param $id Integer: job id
         */
        function __construct( $title, $params, $id = 0 ) {
+               global $wgUpdateRowsPerJob, $wgUpdateRowsPerQuery;
+
                parent::__construct( 'htmlCacheUpdate', $title, $params, $id );
-               $this->table = $params['table'];
-               $this->start = $params['start'];
-               $this->end = $params['end'];
+
+               $this->rowsPerJob   = $wgUpdateRowsPerJob;
+               $this->rowsPerQuery = $wgUpdateRowsPerQuery;
+               $this->blCache      = $title->getBacklinkCache();
        }
 
        public function run() {
-               $update = new HTMLCacheUpdate( $this->title, $this->table, $this->start, $this->end );
-               $update->doUpdate();
+               if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) {
+                       # This is hit when a job is actually performed
+                       return $this->doPartialUpdate();
+               } else {
+                       # This is hit when the jobs have to be inserted
+                       return $this->doFullUpdate();
+               }
+       }
+
+       /**
+        * Update all of the backlinks
+        */
+       protected function doFullUpdate() {
+               # Get an estimate of the number of rows from the BacklinkCache
+               $numRows = $this->blCache->getNumLinks( $this->params['table'] );
+               if ( $numRows > $this->rowsPerJob * 2 ) {
+                       # Do fast cached partition
+                       $this->insertPartitionJobs();
+               } else {
+                       # Get the links from the DB
+                       $titleArray = $this->blCache->getLinks( $this->params['table'] );
+                       # Check if the row count estimate was correct
+                       if ( $titleArray->count() > $this->rowsPerJob * 2 ) {
+                               # Not correct, do accurate partition
+                               wfDebug( __METHOD__.": row count estimate was incorrect, repartitioning\n" );
+                               $this->insertJobsFromTitles( $titleArray );
+                       } else {
+                               $this->invalidateTitles( $titleArray ); // just do the query
+                       }
+               }
+               return true;
+       }
+
+       /**
+        * Update some of the backlinks, defined by a page ID range
+        */
+       protected function doPartialUpdate() {
+               $titleArray = $this->blCache->getLinks(
+                       $this->params['table'], $this->params['start'], $this->params['end'] );
+               if ( $titleArray->count() <= $this->rowsPerJob * 2 ) {
+                       # This partition is small enough, do the update
+                       $this->invalidateTitles( $titleArray );
+               } else {
+                       # Partitioning was excessively inaccurate. Divide the job further.
+                       # This can occur when a large number of links are added in a short
+                       # period of time, say by updating a heavily-used template.
+                       $this->insertJobsFromTitles( $titleArray );
+               }
                return true;
        }
+
+       /**
+        * Partition the current range given by $this->params['start'] and $this->params['end'],
+        * using a pre-calculated title array which gives the links in that range.
+        * Queue the resulting jobs.
+        *
+        * @param $titleArray array
+        * @return void
+        */
+       protected function insertJobsFromTitles( $titleArray ) {
+               // Carry over any "root job" information
+               $rootJobParams = $this->getRootJobParams();
+               # We make subpartitions in the sense that the start of the first job
+               # will be the start of the parent partition, and the end of the last
+               # job will be the end of the parent partition.
+               $jobs = array();
+               $start = $this->params['start']; # start of the current job
+               $numTitles = 0;
+               foreach ( $titleArray as $title ) {
+                       $id = $title->getArticleID();
+                       # $numTitles is now the number of titles in the current job not
+                       # including the current ID
+                       if ( $numTitles >= $this->rowsPerJob ) {
+                               # Add a job up to but not including the current ID
+                               $jobs[] = new HTMLCacheUpdateJob( $this->title,
+                                       array(
+                                               'table' => $this->params['table'],
+                                               'start' => $start,
+                                               'end'   => $id - 1
+                                       ) + $rootJobParams // carry over information for de-duplication
+                               );
+                               $start = $id;
+                               $numTitles = 0;
+                       }
+                       $numTitles++;
+               }
+               # Last job
+               $jobs[] = new HTMLCacheUpdateJob( $this->title,
+                       array(
+                               'table' => $this->params['table'],
+                               'start' => $start,
+                               'end'   => $this->params['end']
+                       ) + $rootJobParams // carry over information for de-duplication
+               );
+               wfDebug( __METHOD__.": repartitioning into " . count( $jobs ) . " jobs\n" );
+
+               if ( count( $jobs ) < 2 ) {
+                       # I don't think this is possible at present, but handling this case
+                       # makes the code a bit more robust against future code updates and
+                       # avoids a potential infinite loop of repartitioning
+                       wfDebug( __METHOD__.": repartitioning failed!\n" );
+                       $this->invalidateTitles( $titleArray );
+               } else {
+                       JobQueueGroup::singleton()->push( $jobs );
+               }
+       }
+
+       /**
+        * @return void
+        */
+       protected function insertPartitionJobs() {
+               // Carry over any "root job" information
+               $rootJobParams = $this->getRootJobParams();
+
+               $batches = $this->blCache->partition( $this->params['table'], $this->rowsPerJob );
+               if ( !count( $batches ) ) {
+                       return; // no jobs to insert
+               }
+
+               $jobs = array();
+               foreach ( $batches as $batch ) {
+                       list( $start, $end ) = $batch;
+                       $jobs[] = new HTMLCacheUpdateJob( $this->title,
+                               array(
+                                       'table' => $this->params['table'],
+                                       'start' => $start,
+                                       'end'   => $end,
+                               ) + $rootJobParams // carry over information for de-duplication
+                       );
+               }
+
+               JobQueueGroup::singleton()->push( $jobs );
+       }
+
+       /**
+        * Invalidate an array (or iterator) of Title objects, right now
+        * @param $titleArray array
+        */
+       protected function invalidateTitles( $titleArray ) {
+               global $wgUseFileCache, $wgUseSquid;
+
+               $dbw = wfGetDB( DB_MASTER );
+               $timestamp = $dbw->timestamp();
+
+               # Get all IDs in this query into an array
+               $ids = array();
+               foreach ( $titleArray as $title ) {
+                       $ids[] = $title->getArticleID();
+               }
+
+               if ( !$ids ) {
+                       return;
+               }
+
+               # Don't invalidate pages that were already invalidated
+               $touchedCond = isset( $this->params['rootJobTimestamp'] )
+                       ? array( "page_touched < " .
+                               $dbw->addQuotes( $dbw->timestamp( $this->params['rootJobTimestamp'] ) ) )
+                       : array();
+
+               # Update page_touched
+               $batches = array_chunk( $ids, $this->rowsPerQuery );
+               foreach ( $batches as $batch ) {
+                       $dbw->update( 'page',
+                               array( 'page_touched' => $timestamp ),
+                               array( 'page_id' => $batch ) + $touchedCond,
+                               __METHOD__
+                       );
+               }
+
+               # Update squid
+               if ( $wgUseSquid ) {
+                       $u = SquidUpdate::newFromTitles( $titleArray );
+                       $u->doUpdate();
+               }
+
+               # Update file cache
+               if ( $wgUseFileCache ) {
+                       foreach ( $titleArray as $title ) {
+                               HTMLFileCache::clearFileCache( $title );
+                       }
+               }
+       }
 }
diff --git a/includes/job/jobs/NullJob.php b/includes/job/jobs/NullJob.php
index eef3bf7..99a8429 100644
@@ -1,6 +1,6 @@
 <?php
 /**
- * Degenerate job that just replaces itself in the queue.
+ * Degenerate job that does nothing.
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
@@ -22,8 +22,9 @@
  */
 
 /**
- * Degenerate job that just replace itself in the queue.
- * Useful for lock contention and performance testing.
+ * Degenerate job that does nothing, but can optionally replace itself
+ * in the queue and/or sleep for a brief time period. These can be used
+ * to represent "no-op" jobs or test lock contention and performance.
  *
  * @ingroup JobQueue
  */
diff --git a/includes/job/jobs/RefreshLinksJob.php b/includes/job/jobs/RefreshLinksJob.php
index 86c808b..20e4f16 100644
@@ -69,6 +69,24 @@ class RefreshLinksJob extends Job {
                return true;
        }
 
+       /**
+        * @return Array
+        */
+       public function getDeduplicationInfo() {
+               $info = parent::getDeduplicationInfo();
+               // Don't let highly unique "masterPos" values ruin duplicate detection
+               if ( is_array( $info['params'] ) ) {
+                       unset( $info['params']['masterPos'] );
+               }
+               return $info;
+       }
+
+       /**
+        * @param $title Title
+        * @param $revision Revision
+        * @param $fname string
+        * @return void
+        */
        public static function runForTitleInternal( Title $title, Revision $revision, $fname ) {
                wfProfileIn( $fname );
                $content = $revision->getContent( Revision::RAW );
@@ -94,8 +112,6 @@ class RefreshLinksJob extends Job {
  * @ingroup JobQueue
  */
 class RefreshLinksJob2 extends Job {
-       const MAX_TITLES_RUN = 10;
-
        function __construct( $title, $params, $id = 0 ) {
                parent::__construct( 'refreshLinks2', $title, $params, $id );
        }
@@ -105,6 +121,8 @@ class RefreshLinksJob2 extends Job {
         * @return boolean success
         */
        function run() {
+               global $wgUpdateRowsPerJob;
+
                wfProfileIn( __METHOD__ );
 
                $linkCache = LinkCache::singleton();
@@ -114,16 +132,16 @@ class RefreshLinksJob2 extends Job {
                        $this->error = "refreshLinks2: Invalid title";
                        wfProfileOut( __METHOD__ );
                        return false;
-               } elseif ( !isset( $this->params['start'] ) || !isset( $this->params['end'] ) ) {
-                       $this->error = "refreshLinks2: Invalid params";
-                       wfProfileOut( __METHOD__ );
-                       return false;
                }
 
                // Back compat for pre-r94435 jobs
                $table = isset( $this->params['table'] ) ? $this->params['table'] : 'templatelinks';
 
-               // Avoid slave lag when fetching templates
+               // Avoid slave lag when fetching templates.
+               // When the outermost job is run, we know that the caller that enqueued it must have
+               // committed the relevant changes to the DB by now. At that point, record the master
+               // position and pass it along as the job recursively breaks into smaller range jobs.
+               // Hopefully, when leaf jobs are popped, the slaves will have reached that position.
                if ( isset( $this->params['masterPos'] ) ) {
                        $masterPos = $this->params['masterPos'];
                } elseif ( wfGetLB()->getServerCount() > 1  ) {
@@ -132,73 +150,77 @@ class RefreshLinksJob2 extends Job {
                        $masterPos = false;
                }
 
-               $titles = $this->title->getBacklinkCache()->getLinks(
-                       $table, $this->params['start'], $this->params['end'] );
-
-               if ( $titles->count() > self::MAX_TITLES_RUN ) {
-                       # We don't want to parse too many pages per job as it can starve other jobs.
-                       # If there are too many pages to parse, break this up into smaller jobs. By passing
-                       # in the master position here we can cut down on the time spent waiting for slaves to
-                       # catch up by the runners handling these jobs since time will have passed between now
-                       # and when they pop these jobs off the queue.
-                       $start = 0; // batch start
-                       $end   = 0; // batch end
-                       $bsize = 0; // batch size
-                       $first = true; // first of batch
-                       $jobs  = array();
-                       foreach ( $titles as $title ) {
-                               $start = $first ? $title->getArticleId() : $start;
-                               $end   = $title->getArticleId();
-                               $first = false;
-                               if ( ++$bsize >= self::MAX_TITLES_RUN ) {
-                                       $jobs[] = new RefreshLinksJob2( $this->title, array(
-                                               'table'     => $table,
-                                               'start'     => $start,
-                                               'end'       => $end,
-                                               'masterPos' => $masterPos
-                                       ) );
-                                       $first = true;
-                                       $start = $end = $bsize = 0;
-                               }
-                       }
-                       if ( $bsize > 0 ) { // group remaining pages into a job
-                               $jobs[] = new RefreshLinksJob2( $this->title, array(
-                                       'table'     => $table,
-                                       'start'     => $start,
-                                       'end'       => $end,
-                                       'masterPos' => $masterPos
-                               ) );
-                       }
-                       Job::batchInsert( $jobs );
-               } elseif ( php_sapi_name() != 'cli' ) {
-                       # Not suitable for page load triggered job running!
-                       # Gracefully switch to refreshLinks jobs if this happens.
-                       $jobs = array();
-                       foreach ( $titles as $title ) {
-                               $jobs[] = new RefreshLinksJob( $title, array( 'masterPos' => $masterPos ) );
-                       }
-                       Job::batchInsert( $jobs );
+               $tbc = $this->title->getBacklinkCache();
+
+               $jobs = array(); // jobs to insert
+               if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) {
+                       # This is a partition job to trigger the insertion of leaf jobs...
+                       $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) );
                } else {
-                       # Wait for the DB of the current/next slave DB handle to catch up to the master.
-                       # This way, we get the correct page_latest for templates or files that just changed
-                       # milliseconds ago, having triggered this job to begin with.
-                       if ( $masterPos ) {
-                               wfGetLB()->waitFor( $masterPos );
-                       }
-                       # Re-parse each page that transcludes this page and update their tracking links...
-                       foreach ( $titles as $title ) {
-                               $revision = Revision::newFromTitle( $title, false, Revision::READ_NORMAL );
-                               if ( !$revision ) {
-                                       $this->error = 'refreshLinks: Article not found "' .
-                                               $title->getPrefixedDBkey() . '"';
-                                       continue; // skip this page
+                       # This is a base job to trigger the insertion of partitioned jobs...
+                       if ( $tbc->getNumLinks( $table ) <= $wgUpdateRowsPerJob ) {
+                               # Just directly insert the single per-title jobs
+                               $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) );
+                       } else {
+                               # Insert the partition jobs to make per-title jobs
+                               foreach ( $tbc->partition( $table, $wgUpdateRowsPerJob ) as $batch ) {
+                                       list( $start, $end ) = $batch;
+                                       $jobs[] = new RefreshLinksJob2( $this->title,
+                                               array(
+                                                       'table'            => $table,
+                                                       'start'            => $start,
+                                                       'end'              => $end,
+                                                       'masterPos'        => $masterPos,
+                                               ) + $this->getRootJobParams() // carry over information for de-duplication
+                                       );
                                }
-                               RefreshLinksJob::runForTitleInternal( $title, $revision, __METHOD__ );
-                               wfWaitForSlaves();
                        }
                }
 
+               if ( count( $jobs ) ) {
+                       JobQueueGroup::singleton()->push( $jobs );
+               }
+
                wfProfileOut( __METHOD__ );
                return true;
        }
+
+       /**
+        * @param $table string
+        * @param $masterPos mixed
+        * @return Array
+        */
+       protected function getSingleTitleJobs( $table, $masterPos ) {
+               # The "start"/"end" fields are not set for the base jobs
+               $start  = isset( $this->params['start'] ) ? $this->params['start'] : false;
+               $end    = isset( $this->params['end'] ) ? $this->params['end'] : false;
+               $titles = $this->title->getBacklinkCache()->getLinks( $table, $start, $end );
+               # Convert into single page refresh links jobs.
+               # This behaves well when run in web (SAPI) mode and is useful in any
+               # case for job de-duplication. If many pages use template A, and that
+               # template itself uses template B, then an edit to both will create
+               # many duplicate jobs. Roughly speaking, for each page, one of the
+               # "RefreshLinksJob" jobs will get run first, and when it does, it will
+               # remove the duplicates. Of course, one page could have its job popped
+               # while the other page's job is still buried within the logic of a
+               # refreshLinks2 job.
+               $jobs = array();
+               foreach ( $titles as $title ) {
+                       $jobs[] = new RefreshLinksJob( $title,
+                               array( 'masterPos' => $masterPos ) + $this->getRootJobParams()
+                       ); // carry over information for de-duplication
+               }
+               return $jobs;
+       }
+
+       /**
+        * @return Array
+        */
+       public function getDeduplicationInfo() {
+               $info = parent::getDeduplicationInfo();
+               // Don't let highly unique "masterPos" values ruin duplicate detection
+               if ( is_array( $info['params'] ) ) {
+                       unset( $info['params']['masterPos'] );
+               }
+               return $info;
+       }
 }