/** @var BagOStuff */
protected $cache;
- protected $cluster = false; // string; name of an external DB cluster
+ /** @var bool|string Name of an external DB cluster. False if not set */
+ protected $cluster = false;
/**
* Additional parameters include:
* If not specified, the primary DB cluster for the wiki will be used.
* This can be overridden with a custom cluster so that DB handles will
* be retrieved via LBFactory::getExternalLB() and getConnection().
- * @param $params array
+ * @param array $params
*/
protected function __construct( array $params ) {
global $wgMemc;
return false;
}
- list( $dbr, $scope ) = $this->getSlaveDB();
+ $dbr = $this->getSlaveDB();
try {
$found = $dbr->selectField( // unclaimed job
'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__
/**
* @see JobQueue::doGetSize()
- * @return integer
+ * @return int
*/
protected function doGetSize() {
$key = $this->getCacheKey( 'size' );
}
try {
- list( $dbr, $scope ) = $this->getSlaveDB();
+ $dbr = $this->getSlaveDB();
$size = (int)$dbr->selectField( 'job', 'COUNT(*)',
array( 'job_cmd' => $this->type, 'job_token' => '' ),
__METHOD__
/**
* @see JobQueue::doGetAcquiredCount()
- * @return integer
+ * @return int
*/
protected function doGetAcquiredCount() {
if ( $this->claimTTL <= 0 ) {
return $count;
}
- list( $dbr, $scope ) = $this->getSlaveDB();
+ $dbr = $this->getSlaveDB();
try {
$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ),
/**
* @see JobQueue::doGetAbandonedCount()
- * @return integer
+ * @return int
* @throws MWException
*/
protected function doGetAbandonedCount() {
return $count;
}
- list( $dbr, $scope ) = $this->getSlaveDB();
+ $dbr = $this->getSlaveDB();
try {
$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
array(
* @return bool
*/
protected function doBatchPush( array $jobs, $flags ) {
- list( $dbw, $scope ) = $this->getMasterDB();
+ $dbw = $this->getMasterDB();
$that = $this;
$method = __METHOD__;
$dbw->onTransactionIdle(
- function() use ( $dbw, $that, $jobs, $flags, $method, $scope ) {
+ function () use ( $dbw, $that, $jobs, $flags, $method ) {
$that->doBatchPushInternal( $dbw, $jobs, $flags, $method );
}
);
/**
* This function should *not* be called outside of JobQueueDB
*
- * @param DatabaseBase $dbw
+ * @param IDatabase $dbw
* @param array $jobs
* @param int $flags
* @param string $method
- * @return boolean
- * @throws type
+ * @throws DBError
+ * @return bool
*/
- public function doBatchPushInternal( DatabaseBase $dbw, array $jobs, $flags, $method ) {
+ public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
if ( !count( $jobs ) ) {
return true;
}
$dbw->insert( 'job', $rowBatch, $method );
}
JobQueue::incrStats( 'job-insert', $this->type, count( $rows ) );
- JobQueue::incrStats( 'job-insert-duplicate', $this->type,
- count( $rowSet ) + count( $rowList ) - count( $rows ) );
+ JobQueue::incrStats(
+ 'job-insert-duplicate',
+ $this->type,
+ count( $rowSet ) + count( $rowList ) - count( $rows )
+ );
} catch ( DBError $e ) {
if ( $flags & self::QOS_ATOMIC ) {
$dbw->rollback( $method );
return false; // queue is empty
}
- list( $dbw, $scope ) = $this->getMasterDB();
+ $dbw = $this->getMasterDB();
try {
$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
- $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) {
+ $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
} );
$job = Job::factory( $row->job_cmd, $title,
self::extractBlob( $row->job_params ), $row->job_id );
$job->metadata['id'] = $row->job_id;
- $job->id = $row->job_id; // XXX: work around broken subclasses
break; // done
} while ( true );
} catch ( DBError $e ) {
* @param string $uuid 32 char hex string
* @param $rand integer Random unsigned integer (31 bits)
* @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
- * @return Row|false
+ * @return stdClass|bool Row|false
*/
protected function claimRandom( $uuid, $rand, $gte ) {
- list( $dbw, $scope ) = $this->getMasterDB();
+ $dbw = $this->getMasterDB();
// Check cache to see if the queue has <= OFFSET items
$tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
continue; // use job_random
}
}
+
if ( $row ) { // claim the job
$dbw->update( 'job', // update by PK
array(
* Reserve a row with a single UPDATE without holding row locks over RTTs...
*
* @param string $uuid 32 char hex string
- * @return Row|false
+ * @return stdClass|bool Row|false
*/
protected function claimOldest( $uuid ) {
- list( $dbw, $scope ) = $this->getMasterDB();
+ $dbw = $this->getMasterDB();
$row = false; // the row acquired
do {
throw new MWException( "Job of type '{$job->getType()}' has no ID." );
}
- list( $dbw, $scope ) = $this->getMasterDB();
+ $dbw = $this->getMasterDB();
try {
$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
- $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) {
+ $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
} );
// deferred till "transaction idle", do the same here, so that the ordering is
// maintained. Having only the de-duplication registration succeed would cause
// jobs to become no-ops without any actual jobs that made them redundant.
- list( $dbw, $scope ) = $this->getMasterDB();
- $cache = $this->cache;
- $dbw->onTransactionIdle( function() use ( $cache, $params, $key, $scope ) {
+ $dbw = $this->getMasterDB();
+ $cache = $this->dupCache;
+ $dbw->onTransactionIdle( function () use ( $cache, $params, $key, $dbw ) {
$timestamp = $cache->get( $key ); // current last timestamp of this job
if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
return true; // a newer version of this root job was enqueued
* @return bool
*/
protected function doDelete() {
- list( $dbw, $scope ) = $this->getMasterDB();
-
+ $dbw = $this->getMasterDB();
try {
$dbw->delete( 'job', array( 'job_cmd' => $this->type ) );
} catch ( DBError $e ) {
$this->throwDBException( $e );
}
+
return true;
}
}
/**
- * @return Array
+ * @return array
*/
protected function doGetPeriodicTasks() {
return array(
* @return Iterator
*/
public function getAllQueuedJobs() {
- list( $dbr, $scope ) = $this->getSlaveDB();
+ $dbr = $this->getSlaveDB();
try {
return new MappedIterator(
$dbr->select( 'job', '*',
array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
- function( $row ) use ( $scope ) {
+ function ( $row ) use ( $dbr ) {
$job = Job::factory(
$row->job_cmd,
Title::makeTitle( $row->job_namespace, $row->job_title ),
- strlen( $row->job_params ) ? unserialize( $row->job_params ) : false,
- $row->job_id
+ strlen( $row->job_params ) ? unserialize( $row->job_params ) : false
);
$job->metadata['id'] = $row->job_id;
- $job->id = $row->job_id; // XXX: work around broken subclasses
return $job;
}
);
}
}
+ public function getCoalesceLocationInternal() {
+ return $this->cluster
+ ? "DBCluster:{$this->cluster}:{$this->wiki}"
+ : "LBFactory:{$this->wiki}";
+ }
+
+ protected function doGetSiblingQueuesWithJobs( array $types ) {
+ $dbr = $this->getSlaveDB();
+ $res = $dbr->select( 'job', 'DISTINCT job_cmd',
+ array( 'job_cmd' => $types ), __METHOD__ );
+
+ $types = array();
+ foreach ( $res as $row ) {
+ $types[] = $row->job_cmd;
+ }
+
+ return $types;
+ }
+
+ protected function doGetSiblingQueueSizes( array $types ) {
+ $dbr = $this->getSlaveDB();
+ $res = $dbr->select( 'job', array( 'job_cmd', 'COUNT(*) AS count' ),
+ array( 'job_cmd' => $types ), __METHOD__, array( 'GROUP BY' => 'job_cmd' ) );
+
+ $sizes = array();
+ foreach ( $res as $row ) {
+ $sizes[$row->job_cmd] = (int)$row->count;
+ }
+
+ return $sizes;
+ }
+
/**
* Recycle or destroy any jobs that have been claimed for too long
*
- * @return integer Number of jobs recycled/deleted
+ * @return int Number of jobs recycled/deleted
*/
public function recycleAndDeleteStaleJobs() {
$now = time();
$count = 0; // affected rows
- list( $dbw, $scope ) = $this->getMasterDB();
+ $dbw = $this->getMasterDB();
try {
if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
__METHOD__
);
$ids = array_map(
- function( $o ) {
+ function ( $o ) {
return $o->job_id;
}, iterator_to_array( $res )
);
// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
$res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
$ids = array_map(
- function( $o ) {
+ function ( $o ) {
return $o->job_id;
}, iterator_to_array( $res )
);
}
/**
- * @return Array (DatabaseBase, ScopedCallback)
+ * @param Job $job
+ * @return array
+ */
+ protected function insertFields( Job $job ) {
+ $dbw = $this->getMasterDB();
+
+ return array(
+ // Fields that describe the nature of the job
+ 'job_cmd' => $job->getType(),
+ 'job_namespace' => $job->getTitle()->getNamespace(),
+ 'job_title' => $job->getTitle()->getDBkey(),
+ 'job_params' => self::makeBlob( $job->getParams() ),
+ // Additional job metadata
+ 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
+ 'job_timestamp' => $dbw->timestamp(),
+ 'job_sha1' => wfBaseConvert(
+ sha1( serialize( $job->getDeduplicationInfo() ) ),
+ 16, 36, 31
+ ),
+ 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
+ );
+ }
+
+ /**
+ * @throws JobQueueConnectionError
+ * @return DBConnRef
*/
protected function getSlaveDB() {
try {
}
/**
- * @return Array (DatabaseBase, ScopedCallback)
+ * @throws JobQueueConnectionError
+ * @return DBConnRef
*/
protected function getMasterDB() {
try {
/**
* @param $index integer (DB_SLAVE/DB_MASTER)
- * @return Array (DatabaseBase, ScopedCallback)
+ * @return DBConnRef
*/
protected function getDB( $index ) {
$lb = ( $this->cluster !== false )
? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki )
: wfGetLB( $this->wiki );
- $conn = $lb->getConnection( $index, array(), $this->wiki );
- return array(
- $conn,
- new ScopedCallback( function() use ( $lb, $conn ) {
- $lb->reuseConnection( $conn );
- } )
- );
- }
- /**
- * @param $job Job
- * @return array
- */
- protected function insertFields( Job $job ) {
- list( $dbw, $scope ) = $this->getMasterDB();
- return array(
- // Fields that describe the nature of the job
- 'job_cmd' => $job->getType(),
- 'job_namespace' => $job->getTitle()->getNamespace(),
- 'job_title' => $job->getTitle()->getDBkey(),
- 'job_params' => self::makeBlob( $job->getParams() ),
- // Additional job metadata
- 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
- 'job_timestamp' => $dbw->timestamp(),
- 'job_sha1' => wfBaseConvert(
- sha1( serialize( $job->getDeduplicationInfo() ) ),
- 16, 36, 31
- ),
- 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
- );
+ return $lb->getConnectionRef( $index, array(), $this->wiki );
}
/**
+ * @param $property
* @return string
*/
private function getCacheKey( $property ) {
list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
$cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
+
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property );
}