Rewrite the caching code in nextJobDB.php so that jobs-loop.sh has a chance of being...
authorTim Starling <tstarling@users.mediawiki.org>
Thu, 10 Mar 2011 04:10:27 +0000 (04:10 +0000)
committerTim Starling <tstarling@users.mediawiki.org>
Thu, 10 Mar 2011 04:10:27 +0000 (04:10 +0000)
The current syndrome is that when the cache expires, a small number of high-priority jobs are added. These jobs are quickly cleared, but the job runners do not continue on to other things, because the cache keeps returning the same small list of wikis for 5 minutes. After 5 minutes, there are a few more jobs in the high-priority list, so the cycle continues.

maintenance/nextJobDB.php

index 8d8229e..5d01650 100644 (file)
@@ -33,20 +33,71 @@ class nextJobDB extends Maintenance {
        public function execute() {
                global $wgMemc;
                $type = $this->getOption( 'type', false );
-               $mckey = $type === false
-                                       ? "jobqueue:dbs"
-                                       : "jobqueue:dbs:$type";
-               $pendingDBs = $wgMemc->get( $mckey );
 
-               # If we didn't get it from the cache
+               $memcKey = 'jobqueue:dbs:v2';
+               $pendingDBs = $wgMemc->get( $memcKey );
+
+               // If the cache entry wasn't present, or in 1% of cases otherwise, 
+               // regenerate the cache.
+               if ( !$pendingDBs || mt_rand( 0, 100 ) == 0 ) {
+                       $pendingDBs = $this->getPendingDbs();
+                       $wgMemc->set( $memcKey, $pendingDBs, 300 );
+               }
+
                if ( !$pendingDBs ) {
-                       $pendingDBs = $this->getPendingDbs( $type );
-                       $wgMemc->set( $mckey, $pendingDBs, 300 );
+                       return;
                }
-               # If we've got a pending job in a db, display it.
-               if ( $pendingDBs ) {
-                       $this->output( $pendingDBs[mt_rand( 0, count( $pendingDBs ) - 1 )] );
+
+               do {
+                       $again = false;
+
+                       if ( $type === false ) {
+                               $candidates = call_user_func_array( 'array_merge', $pendingDBs );
+                       } elseif ( isset( $pendingDBs[$type] ) ) {
+                               $candidates = $pendingDBs[$type];
+                       } else {
+                               $candidates = array();
+                       }
+                       if ( !$candidates ) {
+                               return;
+                       }
+
+                       $db = $candidates[ mt_rand( 0, count( $candidates ) - 1 ) ];
+                       if ( !$this->checkJob( $type, $db ) ) {
+                               // This job is not available in the current database. Remove it from 
+                               // the cache.
+                               if ( $type === false ) {
+                                       foreach ( $pendingDBs as $type2 => $dbs ) {
+                                               $pendingDBs[$type2] = array_diff( $pendingDBs[$type2], array( $db ) );
+                                       }
+                               } else {
+                                       $pendingDBs[$type] = array_diff( $pendingDBs[$type], array( $db ) );
+                               }
+
+                               $wgMemc->set( $memcKey, $pendingDBs, 300 );
+                               $again = true;
+                       }
+               } while ( $again );
+
+               $this->output( $db . "\n" );
+       }
+
+       /**
+        * Check if the specified database has a job of the specified type in it.
+        * The type may be false to indicate "all". 
+        */
+       function checkJob( $type, $db ) {
+               $lb = wfGetLB( $db );
+               $db = $lb->getConnection( DB_MASTER );
+               if ( $type === false ) {
+                       $conds = array();
+               } else {
+                       $conds = array( 'job_cmd' => $type );
                }
+
+               $exists = (bool) $db->selectField( 'job', '1', $conds, __METHOD__ );
+               $lb->reuseConnection( $db );
+               return $exists;
        }
 
        /**
@@ -54,7 +105,7 @@ class nextJobDB extends Maintenance {
         * @param $type String Job type
         * @return array
         */
-       private function getPendingDbs( $type ) {
+       private function getPendingDbs() {
                global $wgLocalDatabases;
                $pendingDBs = array();
                # Cross-reference DBs by master DB server
@@ -66,10 +117,10 @@ class nextJobDB extends Maintenance {
 
                foreach ( $dbsByMaster as $dbs ) {
                        $dbConn = wfGetDB( DB_MASTER, array(), $dbs[0] );
-                       $stype = $dbConn->addQuotes( $type );
 
                        # Padding row for MySQL bug
-                       $sql = "(SELECT '-------------------------------------------' as db)";
+                       $pad = str_repeat( '-', 40 );
+                       $sql = "(SELECT '$pad' as db, '$pad' as job_cmd)";
                        foreach ( $dbs as $wikiId ) {
                                if ( $sql != '' ) {
                                        $sql .= ' UNION ';
@@ -79,10 +130,7 @@ class nextJobDB extends Maintenance {
                                $dbConn->tablePrefix( $tablePrefix );
                                $jobTable = $dbConn->tableName( 'job' );
 
-                               if ( $type === false )
-                                       $sql .= "(SELECT '$wikiId' as db FROM $dbName.$jobTable LIMIT 1)";
-                               else
-                                       $sql .= "(SELECT '$wikiId' as db FROM $dbName.$jobTable WHERE job_cmd=$stype LIMIT 1)";
+                               $sql .= "(SELECT DISTINCT '$wikiId' as db, job_cmd FROM $dbName.$jobTable GROUP BY job_cmd)";
                        }
                        $res = $dbConn->query( $sql, __METHOD__ );
                        $first = true;
@@ -92,7 +140,7 @@ class nextJobDB extends Maintenance {
                                        $first = false;
                                        continue;
                                }
-                               $pendingDBs[] = $row->db;
+                               $pendingDBs[$row->job_cmd][] = $row->db;
                        }
                }
                return $pendingDBs;