From ec277aed726f57b3d0ae613a2cf8e3a9509ac053 Mon Sep 17 00:00:00 2001 From: Tim Starling Date: Thu, 10 Mar 2011 04:10:27 +0000 Subject: [PATCH] Rewrite the caching code in nextJobDB.php so that jobs-loop.sh has a chance of being able to exit from the "high-priority" loop. The current syndrome is that when the cache expires, a small number of high-priority jobs are added. These jobs are quickly cleared, but the job runners do not continue on to other things, because the cache keeps returning the same small list of wikis for 5 minutes. After 5 minutes, there are a few more jobs in the high-priority list, so the cycle continues. --- maintenance/nextJobDB.php | 84 ++++++++++++++++++++++++++++++--------- 1 file changed, 66 insertions(+), 18 deletions(-) diff --git a/maintenance/nextJobDB.php b/maintenance/nextJobDB.php index 8d8229e257..5d01650022 100644 --- a/maintenance/nextJobDB.php +++ b/maintenance/nextJobDB.php @@ -33,20 +33,71 @@ class nextJobDB extends Maintenance { public function execute() { global $wgMemc; $type = $this->getOption( 'type', false ); - $mckey = $type === false - ? "jobqueue:dbs" - : "jobqueue:dbs:$type"; - $pendingDBs = $wgMemc->get( $mckey ); - # If we didn't get it from the cache + $memcKey = 'jobqueue:dbs:v2'; + $pendingDBs = $wgMemc->get( $memcKey ); + + // If the cache entry wasn't present, or in 1% of cases otherwise, + // regenerate the cache. + if ( !$pendingDBs || mt_rand( 0, 100 ) == 0 ) { + $pendingDBs = $this->getPendingDbs(); + $wgMemc->set( $memcKey, $pendingDBs, 300 ); + } + if ( !$pendingDBs ) { - $pendingDBs = $this->getPendingDbs( $type ); - $wgMemc->set( $mckey, $pendingDBs, 300 ); + return; } - # If we've got a pending job in a db, display it. - if ( $pendingDBs ) { - $this->output( $pendingDBs[mt_rand( 0, count( $pendingDBs ) - 1 )] ); + + do { + $again = false; + + if ( $type === false ) { + $candidates = call_user_func_array( 'array_merge', $pendingDBs ); + } elseif ( isset( $pendingDBs[$type] ) ) { + $candidates = $pendingDBs[$type]; + } else { + $candidates = array(); + } + if ( !$candidates ) { + return; + } + + $db = $candidates[ mt_rand( 0, count( $candidates ) - 1 ) ]; + if ( !$this->checkJob( $type, $db ) ) { + // This job is not available in the current database. Remove it from + // the cache. + if ( $type === false ) { + foreach ( $pendingDBs as $type2 => $dbs ) { + $pendingDBs[$type2] = array_diff( $pendingDBs[$type2], array( $db ) ); + } + } else { + $pendingDBs[$type] = array_diff( $pendingDBs[$type], array( $db ) ); + } + + $wgMemc->set( $memcKey, $pendingDBs, 300 ); + $again = true; + } + } while ( $again ); + + $this->output( $db . "\n" ); + } + + /** + * Check if the specified database has a job of the specified type in it. + * The type may be false to indicate "all". + */ + function checkJob( $type, $db ) { + $lb = wfGetLB( $db ); + $db = $lb->getConnection( DB_MASTER ); + if ( $type === false ) { + $conds = array(); + } else { + $conds = array( 'job_cmd' => $type ); } + + $exists = (bool) $db->selectField( 'job', '1', $conds, __METHOD__ ); + $lb->reuseConnection( $db ); + return $exists; } /** @@ -54,7 +105,7 @@ class nextJobDB extends Maintenance { * @param $type String Job type * @return array */ - private function getPendingDbs( $type ) { + private function getPendingDbs() { global $wgLocalDatabases; $pendingDBs = array(); # Cross-reference DBs by master DB server @@ -66,10 +117,10 @@ class nextJobDB extends Maintenance { foreach ( $dbsByMaster as $dbs ) { $dbConn = wfGetDB( DB_MASTER, array(), $dbs[0] ); - $stype = $dbConn->addQuotes( $type ); # Padding row for MySQL bug - $sql = "(SELECT '-------------------------------------------' as db)"; + $pad = str_repeat( '-', 40 ); + $sql = "(SELECT '$pad' as db, '$pad' as job_cmd)"; foreach ( $dbs as $wikiId ) { if ( $sql != '' ) { $sql .= ' UNION '; @@ -79,10 +130,7 @@ class nextJobDB extends Maintenance { $dbConn->tablePrefix( $tablePrefix ); $jobTable = $dbConn->tableName( 'job' ); - if ( $type === false ) - $sql .= "(SELECT '$wikiId' as db FROM $dbName.$jobTable LIMIT 1)"; - else - $sql .= "(SELECT '$wikiId' as db FROM $dbName.$jobTable WHERE job_cmd=$stype LIMIT 1)"; + $sql .= "(SELECT DISTINCT '$wikiId' as db, job_cmd FROM $dbName.$jobTable GROUP BY job_cmd)"; } $res = $dbConn->query( $sql, __METHOD__ ); $first = true; @@ -92,7 +140,7 @@ class nextJobDB extends Maintenance { $first = false; continue; } - $pendingDBs[] = $row->db; + $pendingDBs[$row->job_cmd][] = $row->db; } } return $pendingDBs; -- 2.20.1