* @ingroup Maintenance
*/
-require_once( dirname( __FILE__ ) . '/Maintenance.php' );
+require_once( __DIR__ . '/Maintenance.php' );
/**
* Maintenance script that picks a database that has pending jobs.
public function execute() {
global $wgMemc;
- $type = $this->getOption( 'type', false );
- $memcKey = 'jobqueue:dbs:v2';
- $pendingDBs = $wgMemc->get( $memcKey );
+ $type = $this->getOption( 'type', false );
- // If the cache entry wasn't present, or in 1% of cases otherwise,
- // regenerate the cache.
- if ( !$pendingDBs || mt_rand( 0, 100 ) == 0 ) {
- $pendingDBs = $this->getPendingDbs();
- $wgMemc->set( $memcKey, $pendingDBs, 300 );
+ $memcKey = 'jobqueue:dbs:v3';
+ $pendingDbInfo = $wgMemc->get( $memcKey );
+
+ // If the cache entry wasn't present, is stale, or in .1% of cases otherwise,
+ // regenerate the cache. Use any available stale cache if another process is
+ // currently regenerating the pending DB information.
+ if ( !is_array( $pendingDbInfo )
+ || ( time() - $pendingDbInfo['timestamp'] ) > 300 // 5 minutes
+ || mt_rand( 0, 999 ) == 0
+ ) {
+ if ( $wgMemc->add( "$memcKey:rebuild", 1, 1800 ) ) { // lock
+ $pendingDbInfo = array(
+ 'pendingDBs' => $this->getPendingDbs(),
+ 'timestamp' => time()
+ );
+ for ( $attempts=1; $attempts <= 25; ++$attempts ) {
+ if ( $wgMemc->add( "$memcKey:lock", 1, 60 ) ) { // lock
+ $wgMemc->set( $memcKey, $pendingDbInfo );
+ $wgMemc->delete( "$memcKey:lock" ); // unlock
+ break;
+ }
+ }
+ $wgMemc->delete( "$memcKey:rebuild" ); // unlock
+ }
}
- if ( !$pendingDBs ) {
- return;
+ if ( !is_array( $pendingDbInfo ) || !$pendingDbInfo['pendingDBs'] ) {
+ return; // no DBs with jobs or cache is both empty and locked
}
+ $pendingDBs = $pendingDbInfo['pendingDBs']; // convenience
do {
$again = false;
$candidates = array_values( $candidates );
$db = $candidates[ mt_rand( 0, count( $candidates ) - 1 ) ];
if ( !$this->checkJob( $type, $db ) ) {
- // This job is not available in the current database. Remove it from
- // the cache.
if ( $type === false ) {
+ // There are no jobs available in the current database
foreach ( $pendingDBs as $type2 => $dbs ) {
$pendingDBs[$type2] = array_diff( $pendingDBs[$type2], array( $db ) );
}
} else {
+ // There are no jobs of this type available in the current database
$pendingDBs[$type] = array_diff( $pendingDBs[$type], array( $db ) );
}
-
- $wgMemc->set( $memcKey, $pendingDBs, 300 );
+ // Update the cache to remove the outdated information.
+ // Make sure that this does not race (especially with full rebuilds).
+ $pendingDbInfo['pendingDBs'] = $pendingDBs;
+ if ( $wgMemc->add( "$memcKey:lock", 1, 60 ) ) { // lock
+ $curInfo = $wgMemc->get( $memcKey );
+ if ( $curInfo && $curInfo['timestamp'] === $pendingDbInfo['timestamp'] ) {
+ $wgMemc->set( $memcKey, $pendingDbInfo );
+ }
+ $wgMemc->delete( "$memcKey:lock" ); // unlock
+ }
$again = true;
}
} while ( $again );
* @return bool
*/
function checkJob( $type, $dbName ) {
- $lb = wfGetLB( $dbName );
- $db = $lb->getConnection( DB_MASTER, array(), $dbName );
+ $group = JobQueueGroup::singleton( $dbName );
if ( $type === false ) {
- $conds = Job::defaultQueueConditions( );
+ foreach ( $group->getDefaultQueueTypes() as $type ) {
+ if ( !$group->get( $type )->isEmpty() ) {
+ return true;
+ }
+ }
+ return false;
} else {
- $conds = array( 'job_cmd' => $type );
+ return !$group->get( $type )->isEmpty();
}
-
-
- $exists = (bool) $db->selectField( 'job', '1', $conds, __METHOD__ );
- $lb->reuseConnection( $db );
- return $exists;
}
/**
*/
private function getPendingDbs() {
global $wgLocalDatabases;
- $pendingDBs = array();
- # Cross-reference DBs by master DB server
- $dbsByMaster = array();
- foreach ( $wgLocalDatabases as $db ) {
- $lb = wfGetLB( $db );
- $dbsByMaster[$lb->getServerName( 0 )][] = $db;
- }
-
- foreach ( $dbsByMaster as $dbs ) {
- $dbConn = wfGetDB( DB_MASTER, array(), $dbs[0] );
-
- # Padding row for MySQL bug
- $pad = str_repeat( '-', 40 );
- $sql = "(SELECT '$pad' as db, '$pad' as job_cmd)";
- foreach ( $dbs as $wikiId ) {
- if ( $sql != '' ) {
- $sql .= ' UNION ';
- }
- list( $dbName, $tablePrefix ) = wfSplitWikiID( $wikiId );
- $dbConn->tablePrefix( $tablePrefix );
- $jobTable = $dbConn->tableName( 'job' );
-
- $sql .= "(SELECT DISTINCT '$wikiId' as db, job_cmd FROM $dbName.$jobTable GROUP BY job_cmd)";
- }
- $res = $dbConn->query( $sql, __METHOD__ );
- $first = true;
- foreach ( $res as $row ) {
- if ( $first ) {
- // discard padding row
- $first = false;
- continue;
- }
- $pendingDBs[$row->job_cmd][] = $row->db;
+ $pendingDBs = array(); // (job type => (db list))
+ foreach ( $wgLocalDatabases as $db ) {
+ $types = JobQueueGroup::singleton( $db )->getQueuesWithJobs();
+ foreach ( $types as $type ) {
+ $pendingDBs[$type][] = $db;
}
}
+
return $pendingDBs;
}
}