Merge "Fixed action=query list=blocks for IPv6 addresses"
[lhc/web/wiklou.git] / maintenance / nextJobDB.php
index e66e981..d8480f3 100644 (file)
@@ -38,21 +38,31 @@ class nextJobDB extends Maintenance {
 
        public function execute() {
                global $wgMemc;
+
                $type = $this->getOption( 'type', false );
 
-               $memcKey = 'jobqueue:dbs:v2';
-               $pendingDBs = $wgMemc->get( $memcKey );
+               $memcKey = 'jobqueue:dbs:v3';
+               $pendingDbInfo = $wgMemc->get( $memcKey );
 
                // If the cache entry wasn't present, or in 1% of cases otherwise,
-               // regenerate the cache.
-               if ( !$pendingDBs || mt_rand( 0, 100 ) == 0 ) {
-                       $pendingDBs = $this->getPendingDbs();
-                       $wgMemc->set( $memcKey, $pendingDBs, 300 );
+               // regenerate the cache. Use any available stale cache if another
+               // process is currently regenerating the pending DB information.
+               if ( !$pendingDbInfo || mt_rand( 0, 100 ) == 0 ) {
+                       $lock = $wgMemc->add( 'jobqueue:dbs:v3:lock', 1, 1800 ); // lock
+                       if ( $lock ) {
+                               $pendingDbInfo = array(
+                                       'pendingDBs' => $this->getPendingDbs(),
+                                       'timestamp'  => time()
+                               );
+                               $wgMemc->set( $memcKey, $pendingDbInfo );
+                               $wgMemc->delete( 'jobqueue:dbs:v3:lock' ); // unlock
+                       }
                }
 
-               if ( !$pendingDBs ) {
-                       return;
+               if ( !$pendingDbInfo || !$pendingDbInfo['pendingDBs'] ) {
+                       return; // no DBs with jobs or cache is both empty and locked
                }
+               $pendingDBs = $pendingDbInfo['pendingDBs'];
 
                do {
                        $again = false;
@@ -71,17 +81,19 @@ class nextJobDB extends Maintenance {
                        $candidates = array_values( $candidates );
                        $db = $candidates[ mt_rand( 0, count( $candidates ) - 1 ) ];
                        if ( !$this->checkJob( $type, $db ) ) {
-                               // This job is not available in the current database. Remove it from
-                               // the cache.
                                if ( $type === false ) {
+                                       // There are no jobs available in the current database
                                        foreach ( $pendingDBs as $type2 => $dbs ) {
                                                $pendingDBs[$type2] = array_diff( $pendingDBs[$type2], array( $db ) );
                                        }
                                } else {
+                                       // There are no jobs of this type available in the current database
                                        $pendingDBs[$type] = array_diff( $pendingDBs[$type], array( $db ) );
                                }
-
-                               $wgMemc->set( $memcKey, $pendingDBs, 300 );
+                               // Update the cache to remove the outdated information
+                               $pendingDbInfo['pendingDBs'] = $pendingDBs;
+                               // @TODO: fix race condition with these updates
+                               $wgMemc->set( $memcKey, $pendingDbInfo );
                                $again = true;
                        }
                } while ( $again );
@@ -97,18 +109,17 @@ class nextJobDB extends Maintenance {
         * @return bool
         */
        function checkJob( $type, $dbName ) {
-               $lb = wfGetLB( $dbName );
-               $db = $lb->getConnection( DB_MASTER, array(), $dbName );
+               $group = JobQueueGroup::singleton( $dbName );
                if ( $type === false ) {
-                       $conds = Job::defaultQueueConditions( );
+                       foreach ( $group->getDefaultQueueTypes() as $type ) {
+                               if ( !$group->get( $type )->isEmpty() ) {
+                                       return true;
+                               }
+                       }
+                       return false;
                } else {
-                       $conds = array( 'job_cmd' => $type );
+                       return !$group->get( $type )->isEmpty();
                }
-
-
-               $exists = (bool) $db->selectField( 'job', '1', $conds, __METHOD__ );
-               $lb->reuseConnection( $db );
-               return $exists;
        }
 
        /**
@@ -117,42 +128,15 @@ class nextJobDB extends Maintenance {
         */
        private function getPendingDbs() {
                global $wgLocalDatabases;
-               $pendingDBs = array();
-               # Cross-reference DBs by master DB server
-               $dbsByMaster = array();
-               foreach ( $wgLocalDatabases as $db ) {
-                       $lb = wfGetLB( $db );
-                       $dbsByMaster[$lb->getServerName( 0 )][] = $db;
-               }
 
-               foreach ( $dbsByMaster as $dbs ) {
-                       $dbConn = wfGetDB( DB_MASTER, array(), $dbs[0] );
-
-                       # Padding row for MySQL bug
-                       $pad = str_repeat( '-', 40 );
-                       $sql = "(SELECT '$pad' as db, '$pad' as job_cmd)";
-                       foreach ( $dbs as $wikiId ) {
-                               if ( $sql != '' ) {
-                                       $sql .= ' UNION ';
-                               }
-
-                               list( $dbName, $tablePrefix ) = wfSplitWikiID( $wikiId );
-                               $dbConn->tablePrefix( $tablePrefix );
-                               $jobTable = $dbConn->tableName( 'job' );
-
-                               $sql .= "(SELECT DISTINCT '$wikiId' as db, job_cmd FROM $dbName.$jobTable GROUP BY job_cmd)";
-                       }
-                       $res = $dbConn->query( $sql, __METHOD__ );
-                       $first = true;
-                       foreach ( $res as $row ) {
-                               if ( $first ) {
-                                       // discard padding row
-                                       $first = false;
-                                       continue;
-                               }
-                               $pendingDBs[$row->job_cmd][] = $row->db;
+               $pendingDBs = array(); // (job type => (db list))
+               foreach ( $wgLocalDatabases as $db ) {
+                       $types = JobQueueGroup::singleton( $db )->getQueuesWithJobs();
+                       foreach ( $types as $type ) {
+                               $pendingDBs[$type][] = $db;
                        }
                }
+
                return $pendingDBs;
        }
 }