return false;
}
- list( $dbr, $scope ) = $this->getSlaveDB();
+ $dbr = $this->getSlaveDB();
try {
$found = $dbr->selectField( // unclaimed job
'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__
}
try {
- list( $dbr, $scope ) = $this->getSlaveDB();
+ $dbr = $this->getSlaveDB();
$size = (int)$dbr->selectField( 'job', 'COUNT(*)',
array( 'job_cmd' => $this->type, 'job_token' => '' ),
__METHOD__
return $count;
}
- list( $dbr, $scope ) = $this->getSlaveDB();
+ $dbr = $this->getSlaveDB();
try {
$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ),
return $count;
}
- list( $dbr, $scope ) = $this->getSlaveDB();
+ $dbr = $this->getSlaveDB();
try {
$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
array(
* @return bool
*/
protected function doBatchPush( array $jobs, $flags ) {
- list( $dbw, $scope ) = $this->getMasterDB();
+ $dbw = $this->getMasterDB();
$that = $this;
$method = __METHOD__;
$dbw->onTransactionIdle(
- function() use ( $dbw, $that, $jobs, $flags, $method, $scope ) {
+ function() use ( $dbw, $that, $jobs, $flags, $method ) {
$that->doBatchPushInternal( $dbw, $jobs, $flags, $method );
}
);
* @return boolean
* @throws type
*/
- public function doBatchPushInternal( DatabaseBase $dbw, array $jobs, $flags, $method ) {
+ public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
if ( !count( $jobs ) ) {
return true;
}
return false; // queue is empty
}
- list( $dbw, $scope ) = $this->getMasterDB();
+ $dbw = $this->getMasterDB();
try {
$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
* @return Row|false
*/
protected function claimRandom( $uuid, $rand, $gte ) {
- list( $dbw, $scope ) = $this->getMasterDB();
+ $dbw = $this->getMasterDB();
// Check cache to see if the queue has <= OFFSET items
$tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
* @return Row|false
*/
protected function claimOldest( $uuid ) {
- list( $dbw, $scope ) = $this->getMasterDB();
+ $dbw = $this->getMasterDB();
$row = false; // the row acquired
do {
throw new MWException( "Job of type '{$job->getType()}' has no ID." );
}
- list( $dbw, $scope ) = $this->getMasterDB();
+ $dbw = $this->getMasterDB();
try {
$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
// deferred till "transaction idle", do the same here, so that the ordering is
// maintained. Having only the de-duplication registration succeed would cause
// jobs to become no-ops without any actual jobs that made them redundant.
- list( $dbw, $scope ) = $this->getMasterDB();
+ $dbw = $this->getMasterDB();
$cache = $this->dupCache;
- $dbw->onTransactionIdle( function() use ( $cache, $params, $key, $scope ) {
+ $dbw->onTransactionIdle( function() use ( $cache, $params, $key, $dbw ) {
$timestamp = $cache->get( $key ); // current last timestamp of this job
if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
return true; // a newer version of this root job was enqueued
* @return bool
*/
protected function doDelete() {
- list( $dbw, $scope ) = $this->getMasterDB();
-
+ $dbw = $this->getMasterDB();
try {
$dbw->delete( 'job', array( 'job_cmd' => $this->type ) );
} catch ( DBError $e ) {
* @return Iterator
*/
public function getAllQueuedJobs() {
- list( $dbr, $scope ) = $this->getSlaveDB();
+ $dbr = $this->getSlaveDB();
try {
return new MappedIterator(
$dbr->select( 'job', '*',
array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
- function( $row ) use ( $scope ) {
+ function( $row ) use ( $dbr ) {
$job = Job::factory(
$row->job_cmd,
Title::makeTitle( $row->job_namespace, $row->job_title ),
}
protected function doGetSiblingQueuesWithJobs( array $types ) {
- list( $dbr, $scope ) = $this->getSlaveDB();
+ $dbr = $this->getSlaveDB();
$res = $dbr->select( 'job', 'DISTINCT job_cmd',
array( 'job_cmd' => $types ), __METHOD__ );
}
protected function doGetSiblingQueueSizes( array $types ) {
- list( $dbr, $scope ) = $this->getSlaveDB();
+ $dbr = $this->getSlaveDB();
$res = $dbr->select( 'job', array( 'job_cmd', 'COUNT(*) AS count' ),
array( 'job_cmd' => $types ), __METHOD__, array( 'GROUP BY' => 'job_cmd' ) );
public function recycleAndDeleteStaleJobs() {
$now = time();
$count = 0; // affected rows
- list( $dbw, $scope ) = $this->getMasterDB();
+ $dbw = $this->getMasterDB();
try {
if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
}
/**
- * @return Array (DatabaseBase, ScopedCallback)
+ * @param $job Job
+ * @return array
+ */
+ protected function insertFields( Job $job ) {
+ $dbw = $this->getMasterDB();
+ return array(
+ // Fields that describe the nature of the job
+ 'job_cmd' => $job->getType(),
+ 'job_namespace' => $job->getTitle()->getNamespace(),
+ 'job_title' => $job->getTitle()->getDBkey(),
+ 'job_params' => self::makeBlob( $job->getParams() ),
+ // Additional job metadata
+ 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
+ 'job_timestamp' => $dbw->timestamp(),
+ 'job_sha1' => wfBaseConvert(
+ sha1( serialize( $job->getDeduplicationInfo() ) ),
+ 16, 36, 31
+ ),
+ 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
+ );
+ }
+
+ /**
+ * @return DBConnRef
*/
protected function getSlaveDB() {
try {
}
/**
- * @return Array (DatabaseBase, ScopedCallback)
+ * @return DBConnRef
*/
protected function getMasterDB() {
try {
/**
* @param $index integer (DB_SLAVE/DB_MASTER)
- * @return Array (DatabaseBase, ScopedCallback)
+ * @return DBConnRef
*/
protected function getDB( $index ) {
$lb = ( $this->cluster !== false )
? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki )
: wfGetLB( $this->wiki );
- $conn = $lb->getConnection( $index, array(), $this->wiki );
- return array(
- $conn,
- new ScopedCallback( function() use ( $lb, $conn ) {
- $lb->reuseConnection( $conn );
- } )
- );
- }
-
- /**
- * @param $job Job
- * @return array
- */
- protected function insertFields( Job $job ) {
- list( $dbw, $scope ) = $this->getMasterDB();
- return array(
- // Fields that describe the nature of the job
- 'job_cmd' => $job->getType(),
- 'job_namespace' => $job->getTitle()->getNamespace(),
- 'job_title' => $job->getTitle()->getDBkey(),
- 'job_params' => self::makeBlob( $job->getParams() ),
- // Additional job metadata
- 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
- 'job_timestamp' => $dbw->timestamp(),
- 'job_sha1' => wfBaseConvert(
- sha1( serialize( $job->getDeduplicationInfo() ) ),
- 16, 36, 31
- ),
- 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
- );
+ return $lb->getConnectionRef( $index, array(), $this->wiki );
}
/**