From: Aaron Schulz Date: Fri, 21 Dec 2012 01:29:02 +0000 (-0800) Subject: [JobQueue] Added support for using an external DB. X-Git-Tag: 1.31.0-rc.0~21109 X-Git-Url: https://git.cyclocoop.org/%7B%24www_url%7Dadmin/compta/exercices/journal.php?a=commitdiff_plain;h=6ffc11730729c6c6cd12ad70ca46774139bae7a1;p=lhc%2Fweb%2Fwiklou.git [JobQueue] Added support for using an external DB. * Added a new ScopedCallback class to handle reuseConnection() easily. Change-Id: Icfab280f0173c2cb1ac44cc0bd45e9202600687b --- diff --git a/includes/AutoLoader.php b/includes/AutoLoader.php index fd3459063a..5c23c4df81 100644 --- a/includes/AutoLoader.php +++ b/includes/AutoLoader.php @@ -217,6 +217,7 @@ $wgAutoloadLocalClasses = array( 'Sanitizer' => 'includes/Sanitizer.php', 'DataUpdate' => 'includes/DataUpdate.php', 'SqlDataUpdate' => 'includes/SqlDataUpdate.php', + 'ScopedCallback' => 'includes/ScopedCallback.php', 'ScopedPHPTimeout' => 'includes/ScopedPHPTimeout.php', 'SiteConfiguration' => 'includes/SiteConfiguration.php', 'SiteStats' => 'includes/SiteStats.php', diff --git a/includes/ScopedCallback.php b/includes/ScopedCallback.php new file mode 100644 index 0000000000..1d5b26bfb7 --- /dev/null +++ b/includes/ScopedCallback.php @@ -0,0 +1,40 @@ +callback = $callback; + } + + function __destruct() { + call_user_func( $this->callback ); + } +} diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php index e4972ebd59..7f4300ca33 100644 --- a/includes/job/JobQueueDB.php +++ b/includes/job/JobQueueDB.php @@ -35,6 +35,21 @@ class JobQueueDB extends JobQueue { const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random const MAX_OFFSET = 255; // integer; maximum number of rows to skip + protected $cluster = false; // string; name of an external DB cluster + + /** + * Additional parameters include: + * - cluster : The name of an external cluster registered via LBFactory. + * If not specified, the primary DB cluster for the wiki will be used. + * This can be overridden with a custom cluster so that DB handles will + * be retrieved via LBFactory::getExternalLB() and getConnection(). + * @param $params array + */ + protected function __construct( array $params ) { + parent::__construct( $params ); + $this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false; + } + /** * @see JobQueue::doIsEmpty() * @return bool @@ -51,7 +66,8 @@ class JobQueueDB extends JobQueue { return false; } - $found = $this->getSlaveDB()->selectField( // unclaimed job + list( $dbr, $scope ) = $this->getSlaveDB(); + $found = $dbr->selectField( // unclaimed job 'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__ ); $wgMemc->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG ); @@ -116,7 +132,7 @@ class JobQueueDB extends JobQueue { */ protected function doBatchPush( array $jobs, $flags ) { if ( count( $jobs ) ) { - $dbw = $this->getMasterDB(); + list( $dbw, $scope ) = $this->getMasterDB(); $rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated $rowList = array(); // list of jobs for jobs that are are not de-duplicated @@ -135,7 +151,7 @@ class JobQueueDB extends JobQueue { $ttl = self::CACHE_TTL_LONG; $dbw->onTransactionIdle( - function() use ( $dbw, $rowSet, $rowList, $atomic, $key, $ttl + function() use ( $dbw, $rowSet, $rowList, $atomic, $key, $ttl, $scope ) { global $wgMemc; @@ -193,7 +209,7 @@ class JobQueueDB extends JobQueue { return false; // queue is empty } - $dbw = $this->getMasterDB(); + list( $dbw, $scope ) = $this->getMasterDB(); $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction $uuid = wfRandomString( 32 ); // pop attempt @@ -249,7 +265,7 @@ class JobQueueDB extends JobQueue { protected function claimRandom( $uuid, $rand, $gte ) { global $wgMemc; - $dbw = $this->getMasterDB(); + list( $dbw, $scope ) = $this->getMasterDB(); // Check cache to see if the queue has <= OFFSET items $tinyQueue = $wgMemc->get( $this->getCacheKey( 'small' ) ); @@ -325,7 +341,7 @@ class JobQueueDB extends JobQueue { * @return Row|false */ protected function claimOldest( $uuid ) { - $dbw = $this->getMasterDB(); + list( $dbw, $scope ) = $this->getMasterDB(); $row = false; // the row acquired do { @@ -386,7 +402,7 @@ class JobQueueDB extends JobQueue { */ protected function recycleStaleJobs() { $now = time(); - $dbw = $this->getMasterDB(); + list( $dbw, $scope ) = $this->getMasterDB(); $count = 0; // affected rows if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) { @@ -459,7 +475,7 @@ class JobQueueDB extends JobQueue { throw new MWException( "Job of type '{$job->getType()}' has no ID." ); } - $dbw = $this->getMasterDB(); + list( $dbw, $scope ) = $this->getMasterDB(); $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction // Delete a row with a single DELETE without holding row locks over RTTs... @@ -488,7 +504,8 @@ class JobQueueDB extends JobQueue { // deferred till "transaction idle", do the same here, so that the ordering is // maintained. Having only the de-duplication registration succeed would cause // jobs to become no-ops without any actual jobs that made them redundant. - $this->getMasterDB()->onTransactionIdle( function() use ( $params, $key ) { + list( $dbw, $scope ) = $this->getMasterDB(); + $dbw->onTransactionIdle( function() use ( $params, $key, $scope ) { global $wgMemc; $timestamp = $wgMemc->get( $key ); // current last timestamp of this job @@ -536,17 +553,34 @@ class JobQueueDB extends JobQueue { } /** - * @return DatabaseBase + * @return Array (DatabaseBase, ScopedCallback) */ protected function getSlaveDB() { - return wfGetDB( DB_SLAVE, array(), $this->wiki ); + return $this->getDB( DB_SLAVE ); } /** - * @return DatabaseBase + * @return Array (DatabaseBase, ScopedCallback) */ protected function getMasterDB() { - return wfGetDB( DB_MASTER, array(), $this->wiki ); + return $this->getDB( DB_MASTER ); + } + + /** + * @param $index integer (DB_SLAVE/DB_MASTER) + * @return Array (DatabaseBase, ScopedCallback) + */ + protected function getDB( $index ) { + $lb = ( $this->cluster !== false ) + ? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki ) + : wfGetLB( $this->wiki ); + $conn = $lb->getConnection( $index ); + return array( + $conn, + new ScopedCallback( function() use ( $lb, $conn ) { + $lb->reuseConnection( $conn ); + } ) + ); } /** @@ -554,7 +588,7 @@ class JobQueueDB extends JobQueue { * @return array */ protected function insertFields( Job $job ) { - $dbw = $this->getMasterDB(); + list( $dbw, $scope ) = $this->getMasterDB(); return array( // Fields that describe the nature of the job 'job_cmd' => $job->getType(),