Added job table, for deferred processing of jobs. The immediate application is to...
authorTim Starling <tstarling@users.mediawiki.org>
Fri, 24 Feb 2006 01:56:31 +0000 (01:56 +0000)
committerTim Starling <tstarling@users.mediawiki.org>
Fri, 24 Feb 2006 01:56:31 +0000 (01:56 +0000)
RELEASE-NOTES
includes/DefaultSettings.php
includes/JobQueue.php [new file with mode: 0644]
includes/LinksUpdate.php
includes/Wiki.php
maintenance/archives/patch-job.sql [new file with mode: 0644]
maintenance/mysql5/tables.sql
maintenance/runJobs.php [new file with mode: 0644]
maintenance/tables.sql
maintenance/updaters.inc

index e29c0f2..facbf03 100644 (file)
@@ -65,6 +65,9 @@ Database:
 * Respect database prefix in dumpHTML.inc
 * Removed read-only check from Database::query()
 * Added externallinks table, to track links to arbitrary URLs
+* Added job table, for deferred processing of jobs. The immediate application is 
+  to complete the link table refresh operation when templates are changed.
+
 
 Documentation:
 * (bug 3306) Document $wgLocalTZoffset
index 71c81f6..ddbd9cc 100644 (file)
@@ -1865,4 +1865,18 @@ $wgFilterRobotsWL = false;
  */
 $wgAllowCategorizedRecentChanges = false ;
 
+/**
+ * Number of jobs to perform per request. May be less than one, in which case
+ * jobs are performed probabilistically. If this is zero, jobs will not be done
+ * during ordinary Apache requests. In this case, maintenance/runJobs.php should
+ * be run periodically.
+ */
+$wgJobRunRate = 1;
+
+/**
+ * Log file for job execution
+ */
+$wgJobLogFile = false;
+
+
 ?>
diff --git a/includes/JobQueue.php b/includes/JobQueue.php
new file mode 100644 (file)
index 0000000..b18a819
--- /dev/null
@@ -0,0 +1,182 @@
+<?php
+
+if ( !defined( 'MEDIAWIKI' ) ) {
+       die( "This file is part of MediaWiki, it is not a valid entry point\n" );
+}
+
+class Job {
+       var $command,
+               $title,
+               $params, 
+               $removeDuplicates, 
+               $error;
+
+       /*-------------------------------------------------------------------------
+        * Static functions
+        *------------------------------------------------------------------------*/
+       /**
+        * Queue one refreshLinks job for every title in the given list.
+        * Each job is built and written individually through Job::insert().
+        * @param array $titles Array of title objects. 
+        * @static
+        */
+       function queueLinksJobs( $titles ) {
+               $profileKey = 'Job::queueLinksJobs';
+               wfProfileIn( $profileKey );
+               foreach ( $titles as $target ) {
+                       // One queue row per title
+                       $refreshJob = new Job( 'refreshLinks', $target );
+                       $refreshJob->insert();
+               }
+               wfProfileOut( $profileKey );
+       }
+
+       /**
+        * Pop a job off the front of the queue.
+        *
+        * Looks for any job row on the slave DB first and gives up early if none
+        * is found. Otherwise deletes the row with the lowest job_id on the master
+        * (ORDER BY job_id LIMIT 1), commits immediately, and then reads the
+        * @job_* SQL variables back on the same master handle to build the Job.
+        * @static
+        * @return Job or false if there's no jobs, if the DELETE affected no rows,
+        *         or if the variable read-back returned no row
+        */
+       function pop() {
+               $fname = 'Job::pop';
+               wfProfileIn( $fname );
+
+               // First check to see if there are any jobs in the slave DB
+               $dbr =& wfGetDB( DB_SLAVE );
+               $id = $dbr->selectField( 'job', 'job_id', '', $fname, array( 'LIMIT' => 1 ) );
+               if ( $id === false ) {
+                       wfProfileOut( $fname );
+                       return false;
+               }
+
+               // Pop an item off the front of the queue. NOTE(review): the "col = @var := col" terms look intended to copy the deleted row's columns into SQL variables - confirm
+               // Method due to Domas, may not work on all DBMSes
+               $dbw =& wfGetDB( DB_MASTER );
+               $jobTable = $dbw->tableName( 'job' );
+               $dbw->query( "DELETE FROM $jobTable WHERE " .
+                       '(job_cmd = @job_cmd := job_cmd) AND ' .
+                       '(job_namespace = @job_namespace := job_namespace) AND ' .
+                       '(job_title = @job_title := job_title) AND ' .
+                       '(job_params = @job_params := job_params) ' .
+                       'ORDER BY job_id LIMIT 1', $fname );
+               $affected = $dbw->affectedRows(); // must be read before the commit and the SELECT below
+               // Commit now before 100 other threads pile up behind us
+               $dbw->immediateCommit();
+               if ( !$affected ) {
+                       wfProfileOut( $fname );
+                       return false;
+               }
+
+               $res = $dbw->query( "SELECT @job_cmd, @job_namespace, @job_title, @job_params", $fname ); // read back the values set by the DELETE
+               $row = $dbw->fetchRow( $res );
+               if ( !$row ) {
+                       wfProfileOut( $fname );
+                       return false;
+               }
+
+               $command = $row['@job_cmd'];
+               $namespace = $row['@job_namespace'];
+               $dbkey = $row['@job_title'];
+               $title = Title::makeTitleSafe( $namespace, $dbkey ); // NOTE(review): presumably null for an invalid title (refreshLinks() checks is_null) - confirm
+               $params = $row['@job_params'];
+               $job = new Job( $command, $title, $params );
+               wfProfileOut( $fname );
+               return $job;
+       }
+
+       /*-------------------------------------------------------------------------
+        * Non-static functions
+        *------------------------------------------------------------------------*/
+
+       /**
+        * Build a job description; nothing is written to the database here.
+        * @param string $command Job type name; run() recognises 'refreshLinks'
+        * @param object $title Title the job acts on (insert() calls getNamespace()
+        *        and getDBkey() on it)
+        * @param string $params Extra parameters, stored as given
+        */
+       function Job( $command, $title, $params = '' ) {
+               // Duplicate removal is always switched on; insert() consults this flag
+               $this->removeDuplicates = true;
+
+               $this->params = $params;
+               $this->title = $title;
+               $this->command = $command;
+       }
+
+       /**
+        * Write this job to the job table on the master database.
+        * When removeDuplicates is set, rows with the same command, namespace,
+        * title and params are deleted before the new row is inserted.
+        */
+       function insert() {
+               $method = 'Job::insert';
+
+               $row = array(
+                       'job_cmd' => $this->command,
+                       'job_namespace' => $this->title->getNamespace(),
+                       'job_title' => $this->title->getDBkey(),
+                       'job_params' => $this->params
+               );
+
+               $master =& wfGetDB( DB_MASTER );
+
+               if ( $this->removeDuplicates ) {
+                       // job_id is not in $row yet, so the match is on job content only
+                       $master->delete( 'job', $row, $method );
+               }
+
+               $row['job_id'] = $master->nextSequenceValue( 'job_job_id_seq' );
+               $master->insert( 'job', $row, $method );
+       }
+
+       /**
+        * Execute this job by dispatching on its command name.
+        * An unrecognised command is recorded in $this->error and passed to
+        * wfDebug(), and counts as a failure.
+        * @return boolean success
+        */
+       function run() {
+               $profileKey = 'Job::run';
+               wfProfileIn( $profileKey );
+               // Loose comparison, matching what a switch statement would do
+               if ( $this->command == 'refreshLinks' ) {
+                       $ok = $this->refreshLinks();
+               } else {
+                       $this->error = "Invalid job type {$this->command}, ignoring";
+                       wfDebug( $this->error . "\n" );
+                       $ok = false;
+               }
+               wfProfileOut( $profileKey );
+               return $ok;
+       }
+
+       /**
+        * Run a refreshLinks job: parse the text of the revision returned by
+        * Revision::newFromTitle() for this job's title and hand the parser
+        * output to LinksUpdate. On failure the reason is left in $this->error.
+        * @return boolean success
+        */
+       function refreshLinks() {
+               global $wgParser;
+
+               // The handle is not used below; the call is kept exactly as before
+               $dbw =& wfGetDB( DB_MASTER );
+
+               $cache =& LinkCache::singleton();
+               $cache->clear();
+
+               if ( is_null( $this->title ) ) {
+                       $this->error = "refreshLinks: Invalid title";
+                       return false;
+               }
+
+               $rev = Revision::newFromTitle( $this->title );
+               if ( !$rev ) {
+                       $this->error = 'refreshLinks: Article not found "' . $this->title->getPrefixedDBkey() . '"';
+                       return false;
+               }
+
+               $parserOptions = new ParserOptions;
+               $text = $rev->getText();
+               $revId = $rev->getId();
+               $parsed = $wgParser->parse( $text, $this->title, $parserOptions, true, true, $revId );
+
+               $linksUpdate = new LinksUpdate( $this->title, $parsed );
+               $linksUpdate->doUpdate();
+               return true;
+       }
+
+       /**
+        * Describe the job in one line, e.g. for log output.
+        * @return string "<command> <prefixed DB key>" followed by ", <params>"
+        *         when params is not the empty string, or "<command> <params>"
+        *         when the title is not an object
+        */
+       function toString() {
+               if ( !is_object( $this->title ) ) {
+                       return "{$this->command} {$this->params}";
+               }
+
+               $text = "{$this->command} " . $this->title->getPrefixedDBkey();
+               if ( $this->params !== '' ) {
+                       $text .= ', ' . $this->params;
+               }
+               return $text;
+       }
+
+       function getLastError() { // Accessor for the message stored when run() or refreshLinks() fails
+               return $this->error; // within this class only run() and refreshLinks() assign it
+       }
+}
index f909c35..f1d217b 100644 (file)
@@ -15,7 +15,6 @@ class LinksUpdate {
         */
        var $mId,            # Page ID of the article linked from
                $mTitle,         # Title object of the article linked from
-               $mParserOutput,  # Parser output containing the links to be inserted into the database
                $mLinks,         # Map of title strings to IDs for the links in the document
                $mImages,        # DB keys of the images used, in the array key only
                $mTemplates,     # Map of title strings to IDs for the template references, including broken ones
@@ -47,14 +46,12 @@ class LinksUpdate {
                }
                $this->mTitle = $title;
                $this->mId = $title->getArticleID();
-               $this->mParserOutput = $parserOutput;
 
-               // Shortcut aliases
-               $this->mLinks =& $this->mParserOutput->getLinks();
-               $this->mImages =& $this->mParserOutput->getImages();
-               $this->mTemplates =& $this->mParserOutput->getTemplates();
-               $this->mExternals =& $this->mParserOutput->getExternalLinks();
-               $this->mCategories =& $this->mParserOutput->getCategories();
+               $this->mLinks = $parserOutput->getLinks();
+               $this->mImages = $parserOutput->getImages();
+               $this->mTemplates = $parserOutput->getTemplates();
+               $this->mExternals = $parserOutput->getExternalLinks();
+               $this->mCategories = $parserOutput->getCategories();
 
        }
 
@@ -79,11 +76,6 @@ class LinksUpdate {
                $this->incrTableUpdate( 'pagelinks', 'pl', $this->getLinkDeletions( $existing ),
                        $this->getLinkInsertions( $existing ) );
 
-               # Template links
-               $existing = $this->getExistingTemplates();
-               $this->incrTableUpdate( 'templatelinks', 'tl', $this->getTemplateDeletions( $existing ),
-                       $this->getTemplateInsertions( $existing ) );
-
                # Image links
                $existing = $this->getExistingImages();
                $this->incrTableUpdate( 'imagelinks', 'il', $this->getImageDeletions( $existing ),
@@ -93,7 +85,19 @@ class LinksUpdate {
                $existing = $this->getExistingExternals();
                $this->incrTableUpdate( 'externallinks', 'el', $this->getExternalDeletions( $existing ),
                        $this->getExternalInsertions( $existing ) );
-               
+
+               # Template links
+               $existing = $this->getExistingTemplates();
+               $this->incrTableUpdate( 'templatelinks', 'tl', $this->getTemplateDeletions( $existing ),
+                       $this->getTemplateInsertions( $existing ) );
+
+               # Refresh links of all pages including this page
+               $tlto = $this->mTitle->getTemplateLinksTo();
+               if ( count( $tlto ) ) {
+                       require_once( 'JobQueue.php' );
+                       Job::queueLinksJobs( $tlto );
+               }
+
                # Category links
                $existing = $this->getExistingCategories();
                $this->incrTableUpdate( 'categorylinks', 'cl', $this->getCategoryDeletions( $existing ),
@@ -117,9 +121,17 @@ class LinksUpdate {
                $fname = 'LinksUpdate::doDumbUpdate';
                wfProfileIn( $fname );
 
+               # Refresh category pages
                $existing = $this->getExistingCategories();
                $categoryUpdates = array_diff_assoc( $existing, $this->mCategories ) + array_diff_assoc( $this->mCategories, $existing );
 
+               # Refresh links of all pages including this page
+               $tlto = $this->mTitle->getTemplateLinksTo();
+               if ( count( $tlto ) ) {
+                       require_once( 'JobQueue.php' );
+                       Job::queueLinksJobs( $tlto );
+               }
+
                $this->dumbTableUpdate( 'pagelinks',     $this->getLinkInsertions(),     'pl_from' );
                $this->dumbTableUpdate( 'imagelinks',    $this->getImageInsertions(),    'il_from' );
                $this->dumbTableUpdate( 'categorylinks', $this->getCategoryInsertions(), 'cl_from' );
index 407a3b6..fa7d783 100644 (file)
@@ -249,6 +249,7 @@ class MediaWiki {
        function finalCleanup ( &$deferredUpdates, &$loadBalancer, &$output ) {
                wfProfileIn( 'MediaWiki::finalCleanup' );
                $this->doUpdates( $deferredUpdates );
+               $this->doJobs();
                $loadBalancer->saveMasterPos();
                # Now commit any transactions, so that unreported errors after output() don't roll back the whole thing
                $loadBalancer->commitAll();
@@ -268,6 +269,38 @@ class MediaWiki {
                }
                wfProfileOut( 'MediaWiki::doUpdates' );         
        }
+
+       /**
+        * Run queued jobs at the end of a request, as configured by $wgJobRunRate.
+        *
+        * A rate of zero or less disables job running here. A rate between 0 and 1
+        * is the probability that a single job is run on this request. A rate of
+        * 1 or more runs up to intval( $wgJobRunRate ) jobs. Each job's description,
+        * plus its error message if it failed, is appended to $wgJobLogFile when
+        * that setting is non-empty.
+        */
+       function doJobs() {
+               global $wgJobLogFile, $wgJobRunRate;
+
+               if ( $wgJobRunRate <= 0 ) {
+                       return;
+               }
+               if ( $wgJobRunRate < 1 ) {
+                       // Fractional rate: run one job with probability $wgJobRunRate.
+                       // Skip the request when the random draw lands above the threshold;
+                       // the previous "<" test skipped with probability equal to the rate,
+                       // so a low rate ran a job on nearly every request.
+                       $max = mt_getrandmax();
+                       if ( mt_rand( 0, $max ) > $max * $wgJobRunRate ) {
+                               return;
+                       }
+                       $n = 1;
+               } else {
+                       $n = intval( $wgJobRunRate );
+               }
+
+               require_once( 'JobQueue.php' );
+
+               // Stop as soon as the budget is used up or Job::pop() returns false
+               while ( $n-- && false != ($job = Job::pop())) {
+                       $output = $job->toString() . "\n";
+                       if ( !$job->run() ) {
+                               $output .= "Error: " . $job->getLastError() . "\n";
+                       }
+                       if ( $wgJobLogFile ) {
+                               // Message type 3 appends $output to the named file
+                               error_log( $output, 3, $wgJobLogFile );
+                       }
+               }
+       }
        
        /**
         * Ends this task peacefully
diff --git a/maintenance/archives/patch-job.sql b/maintenance/archives/patch-job.sql
new file mode 100644 (file)
index 0000000..8991845
--- /dev/null
@@ -0,0 +1,20 @@
+
+-- Jobs performed by parallel apache threads or a command-line daemon
+CREATE TABLE /*$wgDBprefix*/job (
+  job_id int(9) unsigned NOT NULL auto_increment,
+  
+  -- Command name, currently only refreshLinks is defined
+  job_cmd varchar(255) NOT NULL default '',
+
+  -- Namespace and title to act on
+  -- Should be 0 and '' if the command does not operate on a title
+  job_namespace int NOT NULL,
+  job_title varchar(255) binary NOT NULL,
+
+  -- Any other parameters to the command
+  -- Presently unused, format undefined
+  job_params blob NOT NULL default '',
+
+  PRIMARY KEY job_id (job_id),
+  KEY (job_cmd, job_namespace, job_title)
+) TYPE=InnoDB;
index 1186250..9fb2f33 100644 (file)
@@ -914,3 +914,23 @@ CREATE TABLE /*$wgDBprefix*/trackbacks (
 
        INDEX (tb_page)
 ) TYPE=InnoDB, DEFAULT CHARSET=utf8;
+
+-- Jobs performed by parallel apache threads or a command-line daemon
+CREATE TABLE /*$wgDBprefix*/job (
+  job_id int(9) unsigned NOT NULL auto_increment,
+  
+  -- Command name, currently only refreshLinks is defined
+  job_cmd varchar(255) NOT NULL default '',
+
+  -- Namespace and title to act on
+  -- Should be 0 and '' if the command does not operate on a title
+  job_namespace int NOT NULL,
+  job_title varchar(255) binary NOT NULL,
+
+  -- Any other parameters to the command
+  -- Presently unused, format undefined
+  job_params blob NOT NULL default '',
+
+  PRIMARY KEY job_id (job_id),
+  KEY (job_cmd, job_namespace, job_title)
+) TYPE=InnoDB, DEFAULT CHARSET=utf8;
diff --git a/maintenance/runJobs.php b/maintenance/runJobs.php
new file mode 100644 (file)
index 0000000..b98d765
--- /dev/null
@@ -0,0 +1,13 @@
+<?php
+/**
+ * Command-line job runner: pops jobs off the queue and runs them until
+ * Job::pop() returns false, printing each job and any error it reports.
+ */
+
+require_once( 'commandLine.inc' );
+require_once( "$IP/includes/JobQueue.php" );
+
+while ( false != ($job = Job::pop()) ) {
+       // NOTE(review): presumably throttles on replication lag before each job - confirm
+       wfWaitForSlaves( 5 );
+       print $job->toString() . "\n";
+       if ( !$job->run() ) {
+               // Use the accessor, as MediaWiki::doJobs() does, instead of the raw property
+               print "Error: " . $job->getLastError() . "\n";
+       }
+}
+
index 0bd3026..b3581d2 100644 (file)
@@ -902,3 +902,24 @@ CREATE TABLE /*$wgDBprefix*/trackbacks (
 
        INDEX (tb_page)
 ) TYPE=InnoDB;
+
+
+-- Jobs performed by parallel apache threads or a command-line daemon
+CREATE TABLE /*$wgDBprefix*/job (
+  job_id int(9) unsigned NOT NULL auto_increment,
+  
+  -- Command name, currently only refreshLinks is defined
+  job_cmd varchar(255) NOT NULL default '',
+
+  -- Namespace and title to act on
+  -- Should be 0 and '' if the command does not operate on a title
+  job_namespace int NOT NULL,
+  job_title varchar(255) binary NOT NULL,
+
+  -- Any other parameters to the command
+  -- Presently unused, format undefined
+  job_params blob NOT NULL default '',
+
+  PRIMARY KEY job_id (job_id),
+  KEY (job_cmd, job_namespace, job_title)
+) TYPE=InnoDB;
index ab08e27..4d532b0 100644 (file)
@@ -27,6 +27,7 @@ $wgNewTables = array(
        array( 'transcache',    'patch-transcache.sql' ),
        array( 'trackbacks',    'patch-trackbacks.sql' ),
        array( 'externallinks', 'patch-externallinks.sql' ),
+       array( 'job',           'patch-job.sql' ),
 );
 
 $wgNewFields = array(