[JobQueue] Abstracted nextJobDB.php to not assume JobQueueDB.
authorAaron Schulz <aschulz@wikimedia.org>
Sat, 3 Nov 2012 00:31:25 +0000 (17:31 -0700)
committerGerrit Code Review <gerrit@wikimedia.org>
Thu, 8 Nov 2012 23:59:18 +0000 (23:59 +0000)
* Changed the use of caching to avoid cache slams by having runners
  fall back to the old "queues with jobs" cache value when another
  process is updating the cache. Also bumped the cache key version.

Change-Id: I6d5f32ab846c14a25afe8c5957443ed95a1d7bd2

includes/job/JobQueue.php
includes/job/JobQueueDB.php
includes/job/JobQueueGroup.php
maintenance/nextJobDB.php

index dc5bdbd..21ef6d3 100644 (file)
@@ -88,7 +88,10 @@ abstract class JobQueue {
        }
 
        /**
-        * @return bool Quickly check if the queue is empty
+        * Quickly check if the queue is empty.
+        * Queue classes should use caching if they are any slower without memcached.
+        *
+        * @return bool
         */
        final public function isEmpty() {
                wfProfileIn( __METHOD__ );
index a8a2e89..223ef41 100644 (file)
@@ -28,7 +28,7 @@
  * @since 1.21
  */
 class JobQueueDB extends JobQueue {
-       const CACHE_TTL      = 30; // integer; seconds
+       const CACHE_TTL      = 300; // integer; seconds
        const MAX_JOB_RANDOM = 2147483647; // 2^31 - 1; used for job_random
 
        /**
@@ -97,7 +97,7 @@ class JobQueueDB extends JobQueue {
                                        $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore automatic begin()
                                }
 
-                               $wgMemc->set( $key, 'false', $ttl );
+                               $wgMemc->set( $key, 'false', $ttl ); // queue is not empty
                        } );
                }
 
index 44db516..48f2746 100644 (file)
@@ -153,4 +153,17 @@ class JobQueueGroup {
 
                return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
        }
+
+       /**
+        * @return Array List of job types that have non-empty queues
+        */
+       public function getQueuesWithJobs() {
+               $types = array();
+               foreach ( $this->getQueueTypes() as $type ) {
+                       if ( !$this->get( $type )->isEmpty() ) {
+                               $types[] = $type;
+                       }
+               }
+               return $types;
+       }
 }
index 75018de..7a5ddac 100644 (file)
@@ -38,21 +38,31 @@ class nextJobDB extends Maintenance {
 
        public function execute() {
                global $wgMemc;
+
                $type = $this->getOption( 'type', false );
 
-               $memcKey = 'jobqueue:dbs:v2';
-               $pendingDBs = $wgMemc->get( $memcKey );
+               $memcKey = 'jobqueue:dbs:v3';
+               $pendingDbInfo = $wgMemc->get( $memcKey );
 
                // If the cache entry wasn't present, or in 1% of cases otherwise,
-               // regenerate the cache.
-               if ( !$pendingDBs || mt_rand( 0, 100 ) == 0 ) {
-                       $pendingDBs = $this->getPendingDbs();
-                       $wgMemc->set( $memcKey, $pendingDBs, 300 );
+               // regenerate the cache. Use any available stale cache if another
+               // process is currently regenerating the pending DB information.
+               if ( !$pendingDbInfo || mt_rand( 0, 100 ) == 0 ) {
+                       $lock = $wgMemc->add( 'jobqueue:dbs:v3:lock', 1 ); // lock
+                       if ( $lock ) {
+                               $pendingDbInfo = array(
+                                       'pendingDBs' => $this->getPendingDbs(),
+                                       'timestamp'  => time()
+                               );
+                               $wgMemc->set( $memcKey, $pendingDbInfo );
+                               $wgMemc->delete( 'jobqueue:dbs:v3:lock' ); // unlock
+                       }
                }
 
-               if ( !$pendingDBs ) {
-                       return;
+               if ( !$pendingDbInfo || !$pendingDbInfo['pendingDBs'] ) {
+                       return; // no DBs with jobs or cache is both empty and locked
                }
+               $pendingDBs = $pendingDbInfo['pendingDBs'];
 
                do {
                        $again = false;
@@ -97,24 +107,17 @@ class nextJobDB extends Maintenance {
         * @return bool
         */
        function checkJob( $type, $dbName ) {
-               global $wgJobTypesExcludedFromDefaultQueue;
-
+               $group = JobQueueGroup::singleton( $dbName );
                if ( $type === false ) {
-                       $lb = wfGetLB( $dbName );
-                       $db = $lb->getConnection( DB_MASTER, array(), $dbName );
-                       $conds = array();
-                       if ( count( $wgJobTypesExcludedFromDefaultQueue ) > 0 ) {
-                               foreach ( $wgJobTypesExcludedFromDefaultQueue as $cmdType ) {
-                                       $conds[] = "job_cmd != " . $db->addQuotes( $cmdType );
+                       foreach ( $group->getDefaultQueueTypes() as $type ) {
+                               if ( !$group->get( $type )->isEmpty() ) {
+                                       return true;
                                }
                        }
-                       $exists = (bool)$db->selectField( 'job', '1', $conds, __METHOD__ );
-                       $lb->reuseConnection( $db );
+                       return false;
                } else {
-                       $exists = !JobQueueGroup::singleton( $dbName )->get( $type )->isEmpty();
+                       return !$group->get( $type )->isEmpty();
                }
-
-               return $exists;
        }
 
        /**
@@ -123,42 +126,15 @@ class nextJobDB extends Maintenance {
         */
        private function getPendingDbs() {
                global $wgLocalDatabases;
-               $pendingDBs = array();
-               # Cross-reference DBs by master DB server
-               $dbsByMaster = array();
-               foreach ( $wgLocalDatabases as $db ) {
-                       $lb = wfGetLB( $db );
-                       $dbsByMaster[$lb->getServerName( 0 )][] = $db;
-               }
-
-               foreach ( $dbsByMaster as $dbs ) {
-                       $dbConn = wfGetDB( DB_MASTER, array(), $dbs[0] );
 
-                       # Padding row for MySQL bug
-                       $pad = str_repeat( '-', 40 );
-                       $sql = "(SELECT '$pad' as db, '$pad' as job_cmd)";
-                       foreach ( $dbs as $wikiId ) {
-                               if ( $sql != '' ) {
-                                       $sql .= ' UNION ';
-                               }
-
-                               list( $dbName, $tablePrefix ) = wfSplitWikiID( $wikiId );
-                               $dbConn->tablePrefix( $tablePrefix );
-                               $jobTable = $dbConn->tableName( 'job' );
-
-                               $sql .= "(SELECT DISTINCT '$wikiId' as db, job_cmd FROM $dbName.$jobTable GROUP BY job_cmd)";
-                       }
-                       $res = $dbConn->query( $sql, __METHOD__ );
-                       $first = true;
-                       foreach ( $res as $row ) {
-                               if ( $first ) {
-                                       // discard padding row
-                                       $first = false;
-                                       continue;
-                               }
-                               $pendingDBs[$row->job_cmd][] = $row->db;
+               $pendingDBs = array(); // (job type => (db list))
+               foreach ( $wgLocalDatabases as $db ) {
+                       $types = JobQueueGroup::singleton( $db )->getQueuesWithJobs();
+                       foreach ( $types as $type ) {
+                               $pendingDBs[$type][] = $db;
                        }
                }
+
                return $pendingDBs;
        }
 }