From dc9eb6002982f7961d7b2c9ce3b4d0523de13734 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Fri, 2 Nov 2012 17:31:25 -0700 Subject: [PATCH] [JobQueue] Abstracted nextJobDB.php to not assume JobQueueDB. * Changed the use of caching to avoid cache slams by having runners fall back to the old "queues with jobs" cache value when another process is updating the cache. Also bumped the cache key version. Change-Id: I6d5f32ab846c14a25afe8c5957443ed95a1d7bd2 --- includes/job/JobQueue.php | 5 +- includes/job/JobQueueDB.php | 4 +- includes/job/JobQueueGroup.php | 13 ++++++ maintenance/nextJobDB.php | 84 ++++++++++++---------------------- 4 files changed, 49 insertions(+), 57 deletions(-) diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php index dc5bdbd3a8..21ef6d34e1 100644 --- a/includes/job/JobQueue.php +++ b/includes/job/JobQueue.php @@ -88,7 +88,10 @@ abstract class JobQueue { } /** - * @return bool Quickly check if the queue is empty + * Quickly check if the queue is empty. + * Queue classes should use caching if they are any slower without memcached. + * + * @return bool */ final public function isEmpty() { wfProfileIn( __METHOD__ ); diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php index a8a2e89d79..223ef41fe8 100644 --- a/includes/job/JobQueueDB.php +++ b/includes/job/JobQueueDB.php @@ -28,7 +28,7 @@ * @since 1.21 */ class JobQueueDB extends JobQueue { - const CACHE_TTL = 30; // integer; seconds + const CACHE_TTL = 300; // integer; seconds const MAX_JOB_RANDOM = 2147483647; // 2^31 - 1; used for job_random /** @@ -97,7 +97,7 @@ class JobQueueDB extends JobQueue { $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore automatic begin() } - $wgMemc->set( $key, 'false', $ttl ); + $wgMemc->set( $key, 'false', $ttl ); // queue is not empty } ); } diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php index 44db516351..48f27469e5 100644 --- a/includes/job/JobQueueGroup.php +++ b/includes/job/JobQueueGroup.php @@ -153,4 +153,17 @@ class JobQueueGroup { return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue ); } + + /** + * @return Array List of job types that have non-empty queues + */ + public function getQueuesWithJobs() { + $types = array(); + foreach ( $this->getQueueTypes() as $type ) { + if ( !$this->get( $type )->isEmpty() ) { + $types[] = $type; + } + } + return $types; + } } diff --git a/maintenance/nextJobDB.php b/maintenance/nextJobDB.php index 75018dea82..7a5ddacc09 100644 --- a/maintenance/nextJobDB.php +++ b/maintenance/nextJobDB.php @@ -38,21 +38,31 @@ class nextJobDB extends Maintenance { public function execute() { global $wgMemc; + $type = $this->getOption( 'type', false ); - $memcKey = 'jobqueue:dbs:v2'; - $pendingDBs = $wgMemc->get( $memcKey ); + $memcKey = 'jobqueue:dbs:v3'; + $pendingDbInfo = $wgMemc->get( $memcKey ); // If the cache entry wasn't present, or in 1% of cases otherwise, - // regenerate the cache. - if ( !$pendingDBs || mt_rand( 0, 100 ) == 0 ) { - $pendingDBs = $this->getPendingDbs(); - $wgMemc->set( $memcKey, $pendingDBs, 300 ); + // regenerate the cache. Use any available stale cache if another + // process is currently regenerating the pending DB information. + if ( !$pendingDbInfo || mt_rand( 0, 100 ) == 0 ) { + $lock = $wgMemc->add( 'jobqueue:dbs:v3:lock', 1 ); // lock + if ( $lock ) { + $pendingDbInfo = array( + 'pendingDBs' => $this->getPendingDbs(), + 'timestamp' => time() + ); + $wgMemc->set( $memcKey, $pendingDbInfo ); + $wgMemc->delete( 'jobqueue:dbs:v3:lock' ); // unlock + } } - if ( !$pendingDBs ) { - return; + if ( !$pendingDbInfo || !$pendingDbInfo['pendingDBs'] ) { + return; // no DBs with jobs or cache is both empty and locked } + $pendingDBs = $pendingDbInfo['pendingDBs']; do { $again = false; @@ -97,24 +107,17 @@ class nextJobDB extends Maintenance { * @return bool */ function checkJob( $type, $dbName ) { - global $wgJobTypesExcludedFromDefaultQueue; - + $group = JobQueueGroup::singleton( $dbName ); if ( $type === false ) { - $lb = wfGetLB( $dbName ); - $db = $lb->getConnection( DB_MASTER, array(), $dbName ); - $conds = array(); - if ( count( $wgJobTypesExcludedFromDefaultQueue ) > 0 ) { - foreach ( $wgJobTypesExcludedFromDefaultQueue as $cmdType ) { - $conds[] = "job_cmd != " . $db->addQuotes( $cmdType ); + foreach ( $group->getDefaultQueueTypes() as $type ) { + if ( !$group->get( $type )->isEmpty() ) { + return true; } } - $exists = (bool)$db->selectField( 'job', '1', $conds, __METHOD__ ); - $lb->reuseConnection( $db ); + return false; } else { - $exists = !JobQueueGroup::singleton( $dbName )->get( $type )->isEmpty(); + return !$group->get( $type )->isEmpty(); } - - return $exists; } /** @@ -123,42 +126,15 @@ class nextJobDB extends Maintenance { */ private function getPendingDbs() { global $wgLocalDatabases; - $pendingDBs = array(); - # Cross-reference DBs by master DB server - $dbsByMaster = array(); - foreach ( $wgLocalDatabases as $db ) { - $lb = wfGetLB( $db ); - $dbsByMaster[$lb->getServerName( 0 )][] = $db; - } - - foreach ( $dbsByMaster as $dbs ) { - $dbConn = wfGetDB( DB_MASTER, array(), $dbs[0] ); - # Padding row for MySQL bug - $pad = str_repeat( '-', 40 ); - $sql = "(SELECT '$pad' as db, '$pad' as job_cmd)"; - foreach ( $dbs as $wikiId ) { - if ( $sql != '' ) { - $sql .= ' UNION '; - } - - list( $dbName, $tablePrefix ) = wfSplitWikiID( $wikiId ); - $dbConn->tablePrefix( $tablePrefix ); - $jobTable = $dbConn->tableName( 'job' ); - - $sql .= "(SELECT DISTINCT '$wikiId' as db, job_cmd FROM $dbName.$jobTable GROUP BY job_cmd)"; - } - $res = $dbConn->query( $sql, __METHOD__ ); - $first = true; - foreach ( $res as $row ) { - if ( $first ) { - // discard padding row - $first = false; - continue; - } - $pendingDBs[$row->job_cmd][] = $row->db; + $pendingDBs = array(); // (job type => (db list)) + foreach ( $wgLocalDatabases as $db ) { + $types = JobQueueGroup::singleton( $db )->getQueuesWithJobs(); + foreach ( $types as $type ) { + $pendingDBs[$type][] = $db; } } + return $pendingDBs; } } -- 2.20.1