Merge "Include Postgres tables and maintenance scripts for Change I23c47c2c"
[lhc/web/wiklou.git] / maintenance / nextJobDB.php
index 75018de..032d6f9 100644 (file)
@@ -18,7 +18,6 @@
  * http://www.gnu.org/copyleft/gpl.html
  *
  * @file
- * @todo Make this work on PostgreSQL and maybe other database servers
  * @ingroup Maintenance
  */
 
@@ -33,60 +32,108 @@ class nextJobDB extends Maintenance {
        public function __construct() {
                parent::__construct();
                $this->mDescription = "Pick a database that has pending jobs";
-               $this->addOption( 'type', "The type of job to search for", false, true );
+               $this->addOption( 'type', "Search by job type", false, true );
+               $this->addOption( 'types', "Space separated list of job types to search for", false, true );
        }
 
        public function execute() {
                global $wgMemc;
-               $type = $this->getOption( 'type', false );
 
-               $memcKey = 'jobqueue:dbs:v2';
-               $pendingDBs = $wgMemc->get( $memcKey );
+               $type = false; // job type required/picked
+               if ( $this->hasOption( 'types' ) ) {
+                       $types = explode( ' ', $this->getOption( 'types' ) );
+               } elseif ( $this->hasOption( 'type' ) ) {
+                       $types = array( $this->getOption( 'type' ) );
+               } else {
+                       $types = JobQueueGroup::singleton()->getDefaultQueueTypes();
+               }
 
-               // If the cache entry wasn't present, or in 1% of cases otherwise,
-               // regenerate the cache.
-               if ( !$pendingDBs || mt_rand( 0, 100 ) == 0 ) {
-                       $pendingDBs = $this->getPendingDbs();
-                       $wgMemc->set( $memcKey, $pendingDBs, 300 );
+               $memcKey = 'jobqueue:dbs:v3';
+               $pendingDbInfo = $wgMemc->get( $memcKey );
+
+               // If the cache entry wasn't present, is stale, or in .1% of cases otherwise,
+               // regenerate the cache. Use any available stale cache if another process is
+               // currently regenerating the pending DB information.
+               if ( !is_array( $pendingDbInfo )
+                       || ( time() - $pendingDbInfo['timestamp'] ) > 300 // 5 minutes
+                       || mt_rand( 0, 999 ) == 0
+               ) {
+                       if ( $wgMemc->add( "$memcKey:rebuild", 1, 1800 ) ) { // lock
+                               $pendingDbInfo = array(
+                                       'pendingDBs' => $this->getPendingDbs(),
+                                       'timestamp'  => time()
+                               );
+                               for ( $attempts=1; $attempts <= 25; ++$attempts ) {
+                                       if ( $wgMemc->add( "$memcKey:lock", 1, 60 ) ) { // lock
+                                               $wgMemc->set( $memcKey, $pendingDbInfo );
+                                               $wgMemc->delete( "$memcKey:lock" ); // unlock
+                                               break;
+                                       }
+                               }
+                               $wgMemc->delete( "$memcKey:rebuild" ); // unlock
+                       }
                }
 
-               if ( !$pendingDBs ) {
-                       return;
+               if ( !is_array( $pendingDbInfo ) || !$pendingDbInfo['pendingDBs'] ) {
+                       return; // no DBs with jobs or cache is both empty and locked
                }
 
+               $pendingDBs = $pendingDbInfo['pendingDBs']; // convenience
                do {
                        $again = false;
 
-                       if ( $type === false ) {
-                               $candidates = call_user_func_array( 'array_merge', $pendingDBs );
-                       } elseif ( isset( $pendingDBs[$type] ) ) {
-                               $candidates = $pendingDBs[$type];
-                       } else {
-                               $candidates = array();
+                       $candidates = array(); // list of (type, db)
+                       // Flatten the tree of candidates into a flat list so that a random
+                       // item can be selected, weighing each queue (type/db tuple) equally.
+                       foreach ( $pendingDBs as $type => $dbs ) {
+                               if ( in_array( $type, $types ) ) {
+                                       foreach ( $dbs as $db ) {
+                                               $candidates[] = array( $type, $db );
+                                       }
+                               }
                        }
-                       if ( !$candidates ) {
-                               return;
+                       if ( !count( $candidates ) ) {
+                               return; // no jobs for this type
                        }
 
-                       $candidates = array_values( $candidates );
-                       $db = $candidates[ mt_rand( 0, count( $candidates ) - 1 ) ];
-                       if ( !$this->checkJob( $type, $db ) ) {
-                               // This job is not available in the current database. Remove it from
-                               // the cache.
-                               if ( $type === false ) {
-                                       foreach ( $pendingDBs as $type2 => $dbs ) {
-                                               $pendingDBs[$type2] = array_diff( $pendingDBs[$type2], array( $db ) );
+                       list( $type, $db ) = $candidates[ mt_rand( 0, count( $candidates ) - 1 ) ];
+                       if ( !$this->checkJob( $type, $db ) ) { // queue is actually empty?
+                               $pendingDBs = $this->delistDB( $pendingDBs, $db, $type );
+                               // Update the cache to remove the outdated information.
+                               // Make sure that this does not race (especially with full rebuilds).
+                               if ( $wgMemc->add( "$memcKey:lock", 1, 60 ) ) { // lock
+                                       $curInfo = $wgMemc->get( $memcKey );
+                                       if ( is_array( $curInfo ) ) {
+                                               $curInfo['pendingDBs'] =
+                                                       $this->delistDB( $curInfo['pendingDBs'], $db, $type );
+                                               $wgMemc->set( $memcKey, $curInfo );
+                                               // May as well make use of this newer information
+                                               $pendingDBs = $curInfo['pendingDBs'];
                                        }
-                               } else {
-                                       $pendingDBs[$type] = array_diff( $pendingDBs[$type], array( $db ) );
+                                       $wgMemc->delete( "$memcKey:lock" ); // unlock
                                }
-
-                               $wgMemc->set( $memcKey, $pendingDBs, 300 );
                                $again = true;
                        }
                } while ( $again );
 
-               $this->output( $db . "\n" );
+               if ( $this->hasOption( 'types' ) ) {
+                       $this->output( $db . " " . $type . "\n" );
+               } else {
+                       $this->output( $db . "\n" );
+               }
+       }
+
+       /**
+        * Remove a type/DB entry from the list of queues with jobs
+        *
+        * @param $pendingDBs array
+        * @param $db string
+        * @param $type string
+        * @return Array
+        */
+       private function delistDB( array $pendingDBs, $db, $type ) {
+               $pendingDBs[$type] = array_diff( $pendingDBs[$type], array( $db ) );
+               return $pendingDBs;
        }
 
        /**
@@ -96,25 +143,8 @@ class nextJobDB extends Maintenance {
         * @param $dbName string
         * @return bool
         */
-       function checkJob( $type, $dbName ) {
-               global $wgJobTypesExcludedFromDefaultQueue;
-
-               if ( $type === false ) {
-                       $lb = wfGetLB( $dbName );
-                       $db = $lb->getConnection( DB_MASTER, array(), $dbName );
-                       $conds = array();
-                       if ( count( $wgJobTypesExcludedFromDefaultQueue ) > 0 ) {
-                               foreach ( $wgJobTypesExcludedFromDefaultQueue as $cmdType ) {
-                                       $conds[] = "job_cmd != " . $db->addQuotes( $cmdType );
-                               }
-                       }
-                       $exists = (bool)$db->selectField( 'job', '1', $conds, __METHOD__ );
-                       $lb->reuseConnection( $db );
-               } else {
-                       $exists = !JobQueueGroup::singleton( $dbName )->get( $type )->isEmpty();
-               }
-
-               return $exists;
+       private function checkJob( $type, $dbName ) {
+               return !JobQueueGroup::singleton( $dbName )->get( $type )->isEmpty();
        }
 
        /**
@@ -123,42 +153,15 @@ class nextJobDB extends Maintenance {
         */
        private function getPendingDbs() {
                global $wgLocalDatabases;
-               $pendingDBs = array();
-               # Cross-reference DBs by master DB server
-               $dbsByMaster = array();
-               foreach ( $wgLocalDatabases as $db ) {
-                       $lb = wfGetLB( $db );
-                       $dbsByMaster[$lb->getServerName( 0 )][] = $db;
-               }
-
-               foreach ( $dbsByMaster as $dbs ) {
-                       $dbConn = wfGetDB( DB_MASTER, array(), $dbs[0] );
 
-                       # Padding row for MySQL bug
-                       $pad = str_repeat( '-', 40 );
-                       $sql = "(SELECT '$pad' as db, '$pad' as job_cmd)";
-                       foreach ( $dbs as $wikiId ) {
-                               if ( $sql != '' ) {
-                                       $sql .= ' UNION ';
-                               }
-
-                               list( $dbName, $tablePrefix ) = wfSplitWikiID( $wikiId );
-                               $dbConn->tablePrefix( $tablePrefix );
-                               $jobTable = $dbConn->tableName( 'job' );
-
-                               $sql .= "(SELECT DISTINCT '$wikiId' as db, job_cmd FROM $dbName.$jobTable GROUP BY job_cmd)";
-                       }
-                       $res = $dbConn->query( $sql, __METHOD__ );
-                       $first = true;
-                       foreach ( $res as $row ) {
-                               if ( $first ) {
-                                       // discard padding row
-                                       $first = false;
-                                       continue;
-                               }
-                               $pendingDBs[$row->job_cmd][] = $row->db;
+               $pendingDBs = array(); // (job type => (db list))
+               foreach ( $wgLocalDatabases as $db ) {
+                       $types = JobQueueGroup::singleton( $db )->getQueuesWithJobs();
+                       foreach ( $types as $type ) {
+                               $pendingDBs[$type][] = $db;
                        }
                }
+
                return $pendingDBs;
        }
 }