From 08372236c99aead63f9a1565a1e4d9597201a729 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Mon, 7 Oct 2013 09:18:16 -0700 Subject: [PATCH] Made JobQueueDB use getConnectionRef() * Also moved insertFields() function up a bit about DB connection methods Change-Id: I7a9d9c9bc71038fe280542b13e0c0e94bfc4de1e --- includes/job/JobQueueDB.php | 99 +++++++++++++++++-------------------- 1 file changed, 46 insertions(+), 53 deletions(-) diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php index 7840f14354..c39083df16 100644 --- a/includes/job/JobQueueDB.php +++ b/includes/job/JobQueueDB.php @@ -79,7 +79,7 @@ class JobQueueDB extends JobQueue { return false; } - list( $dbr, $scope ) = $this->getSlaveDB(); + $dbr = $this->getSlaveDB(); try { $found = $dbr->selectField( // unclaimed job 'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__ @@ -105,7 +105,7 @@ class JobQueueDB extends JobQueue { } try { - list( $dbr, $scope ) = $this->getSlaveDB(); + $dbr = $this->getSlaveDB(); $size = (int)$dbr->selectField( 'job', 'COUNT(*)', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__ @@ -134,7 +134,7 @@ class JobQueueDB extends JobQueue { return $count; } - list( $dbr, $scope ) = $this->getSlaveDB(); + $dbr = $this->getSlaveDB(); try { $count = (int)$dbr->selectField( 'job', 'COUNT(*)', array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ), @@ -167,7 +167,7 @@ class JobQueueDB extends JobQueue { return $count; } - list( $dbr, $scope ) = $this->getSlaveDB(); + $dbr = $this->getSlaveDB(); try { $count = (int)$dbr->selectField( 'job', 'COUNT(*)', array( @@ -193,12 +193,12 @@ class JobQueueDB extends JobQueue { * @return bool */ protected function doBatchPush( array $jobs, $flags ) { - list( $dbw, $scope ) = $this->getMasterDB(); + $dbw = $this->getMasterDB(); $that = $this; $method = __METHOD__; $dbw->onTransactionIdle( - function() use ( $dbw, $that, $jobs, $flags, $method, $scope ) { + function() use ( $dbw, $that, $jobs, $flags, $method ) { $that->doBatchPushInternal( $dbw, $jobs, $flags, $method ); } ); @@ -216,7 +216,7 @@ class JobQueueDB extends JobQueue { * @return boolean * @throws type */ - public function doBatchPushInternal( DatabaseBase $dbw, array $jobs, $flags, $method ) { + public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) { if ( !count( $jobs ) ) { return true; } @@ -284,7 +284,7 @@ class JobQueueDB extends JobQueue { return false; // queue is empty } - list( $dbw, $scope ) = $this->getMasterDB(); + $dbw = $this->getMasterDB(); try { $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting @@ -339,7 +339,7 @@ class JobQueueDB extends JobQueue { * @return Row|false */ protected function claimRandom( $uuid, $rand, $gte ) { - list( $dbw, $scope ) = $this->getMasterDB(); + $dbw = $this->getMasterDB(); // Check cache to see if the queue has <= OFFSET items $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) ); @@ -415,7 +415,7 @@ class JobQueueDB extends JobQueue { * @return Row|false */ protected function claimOldest( $uuid ) { - list( $dbw, $scope ) = $this->getMasterDB(); + $dbw = $this->getMasterDB(); $row = false; // the row acquired do { @@ -480,7 +480,7 @@ class JobQueueDB extends JobQueue { throw new MWException( "Job of type '{$job->getType()}' has no ID." ); } - list( $dbw, $scope ) = $this->getMasterDB(); + $dbw = $this->getMasterDB(); try { $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting @@ -518,9 +518,9 @@ class JobQueueDB extends JobQueue { // deferred till "transaction idle", do the same here, so that the ordering is // maintained. Having only the de-duplication registration succeed would cause // jobs to become no-ops without any actual jobs that made them redundant. - list( $dbw, $scope ) = $this->getMasterDB(); + $dbw = $this->getMasterDB(); $cache = $this->dupCache; - $dbw->onTransactionIdle( function() use ( $cache, $params, $key, $scope ) { + $dbw->onTransactionIdle( function() use ( $cache, $params, $key, $dbw ) { $timestamp = $cache->get( $key ); // current last timestamp of this job if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { return true; // a newer version of this root job was enqueued @@ -538,8 +538,7 @@ class JobQueueDB extends JobQueue { * @return bool */ protected function doDelete() { - list( $dbw, $scope ) = $this->getMasterDB(); - + $dbw = $this->getMasterDB(); try { $dbw->delete( 'job', array( 'job_cmd' => $this->type ) ); } catch ( DBError $e ) { @@ -582,12 +581,12 @@ class JobQueueDB extends JobQueue { * @return Iterator */ public function getAllQueuedJobs() { - list( $dbr, $scope ) = $this->getSlaveDB(); + $dbr = $this->getSlaveDB(); try { return new MappedIterator( $dbr->select( 'job', '*', array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ), - function( $row ) use ( $scope ) { + function( $row ) use ( $dbr ) { $job = Job::factory( $row->job_cmd, Title::makeTitle( $row->job_namespace, $row->job_title ), @@ -611,7 +610,7 @@ class JobQueueDB extends JobQueue { } protected function doGetSiblingQueuesWithJobs( array $types ) { - list( $dbr, $scope ) = $this->getSlaveDB(); + $dbr = $this->getSlaveDB(); $res = $dbr->select( 'job', 'DISTINCT job_cmd', array( 'job_cmd' => $types ), __METHOD__ ); @@ -623,7 +622,7 @@ class JobQueueDB extends JobQueue { } protected function doGetSiblingQueueSizes( array $types ) { - list( $dbr, $scope ) = $this->getSlaveDB(); + $dbr = $this->getSlaveDB(); $res = $dbr->select( 'job', array( 'job_cmd', 'COUNT(*) AS count' ), array( 'job_cmd' => $types ), __METHOD__, array( 'GROUP BY' => 'job_cmd' ) ); @@ -642,7 +641,7 @@ class JobQueueDB extends JobQueue { public function recycleAndDeleteStaleJobs() { $now = time(); $count = 0; // affected rows - list( $dbw, $scope ) = $this->getMasterDB(); + $dbw = $this->getMasterDB(); try { if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) { @@ -719,7 +718,30 @@ class JobQueueDB extends JobQueue { } /** - * @return Array (DatabaseBase, ScopedCallback) + * @param $job Job + * @return array + */ + protected function insertFields( Job $job ) { + $dbw = $this->getMasterDB(); + return array( + // Fields that describe the nature of the job + 'job_cmd' => $job->getType(), + 'job_namespace' => $job->getTitle()->getNamespace(), + 'job_title' => $job->getTitle()->getDBkey(), + 'job_params' => self::makeBlob( $job->getParams() ), + // Additional job metadata + 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ), + 'job_timestamp' => $dbw->timestamp(), + 'job_sha1' => wfBaseConvert( + sha1( serialize( $job->getDeduplicationInfo() ) ), + 16, 36, 31 + ), + 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM ) + ); + } + + /** + * @return DBConnRef */ protected function getSlaveDB() { try { @@ -730,7 +752,7 @@ class JobQueueDB extends JobQueue { } /** - * @return Array (DatabaseBase, ScopedCallback) + * @return DBConnRef */ protected function getMasterDB() { try { @@ -742,42 +764,13 @@ class JobQueueDB extends JobQueue { /** * @param $index integer (DB_SLAVE/DB_MASTER) - * @return Array (DatabaseBase, ScopedCallback) + * @return DBConnRef */ protected function getDB( $index ) { $lb = ( $this->cluster !== false ) ? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki ) : wfGetLB( $this->wiki ); - $conn = $lb->getConnection( $index, array(), $this->wiki ); - return array( - $conn, - new ScopedCallback( function() use ( $lb, $conn ) { - $lb->reuseConnection( $conn ); - } ) - ); - } - - /** - * @param $job Job - * @return array - */ - protected function insertFields( Job $job ) { - list( $dbw, $scope ) = $this->getMasterDB(); - return array( - // Fields that describe the nature of the job - 'job_cmd' => $job->getType(), - 'job_namespace' => $job->getTitle()->getNamespace(), - 'job_title' => $job->getTitle()->getDBkey(), - 'job_params' => self::makeBlob( $job->getParams() ), - // Additional job metadata - 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ), - 'job_timestamp' => $dbw->timestamp(), - 'job_sha1' => wfBaseConvert( - sha1( serialize( $job->getDeduplicationInfo() ) ), - 16, 36, 31 - ), - 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM ) - ); + return $lb->getConnectionRef( $index, array(), $this->wiki ); } /** -- 2.20.1