X-Git-Url: https://git.cyclocoop.org/?a=blobdiff_plain;f=includes%2Fjob%2FJobQueueDB.php;h=79ff4e8662eac26a104a066ce66549d45fe00921;hb=21d7ac51875f1eb980eebcccf75eead05b64cd4d;hp=c39083df168d72ec3cba4d683531952b39be6aac;hpb=7c47d3f66a63e3699a2125890785976b10f5481e;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php index c39083df16..79ff4e8662 100644 --- a/includes/job/JobQueueDB.php +++ b/includes/job/JobQueueDB.php @@ -37,7 +37,8 @@ class JobQueueDB extends JobQueue { /** @var BagOStuff */ protected $cache; - protected $cluster = false; // string; name of an external DB cluster + /** @var bool|string Name of an external DB cluster. False if not set */ + protected $cluster = false; /** * Additional parameters include: @@ -45,7 +46,7 @@ class JobQueueDB extends JobQueue { * If not specified, the primary DB cluster for the wiki will be used. * This can be overridden with a custom cluster so that DB handles will * be retrieved via LBFactory::getExternalLB() and getConnection(). - * @param $params array + * @param array $params */ protected function __construct( array $params ) { global $wgMemc; @@ -94,7 +95,7 @@ class JobQueueDB extends JobQueue { /** * @see JobQueue::doGetSize() - * @return integer + * @return int */ protected function doGetSize() { $key = $this->getCacheKey( 'size' ); @@ -120,7 +121,7 @@ class JobQueueDB extends JobQueue { /** * @see JobQueue::doGetAcquiredCount() - * @return integer + * @return int */ protected function doGetAcquiredCount() { if ( $this->claimTTL <= 0 ) { @@ -150,7 +151,7 @@ class JobQueueDB extends JobQueue { /** * @see JobQueue::doGetAbandonedCount() - * @return integer + * @return int * @throws MWException */ protected function doGetAbandonedCount() { @@ -198,7 +199,7 @@ class JobQueueDB extends JobQueue { $that = $this; $method = __METHOD__; $dbw->onTransactionIdle( - function() use ( $dbw, $that, $jobs, $flags, $method ) { + function () use ( $dbw, $that, $jobs, $flags, $method ) { $that->doBatchPushInternal( $dbw, $jobs, $flags, $method ); } ); @@ -209,12 +210,12 @@ class JobQueueDB extends JobQueue { /** * This function should *not* be called outside of JobQueueDB * - * @param DatabaseBase $dbw + * @param IDatabase $dbw * @param array $jobs * @param int $flags * @param string $method - * @return boolean - * @throws type + * @throws DBError + * @return bool */ public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) { if ( !count( $jobs ) ) { @@ -258,8 +259,11 @@ class JobQueueDB extends JobQueue { $dbw->insert( 'job', $rowBatch, $method ); } JobQueue::incrStats( 'job-insert', $this->type, count( $rows ) ); - JobQueue::incrStats( 'job-insert-duplicate', $this->type, - count( $rowSet ) + count( $rowList ) - count( $rows ) ); + JobQueue::incrStats( + 'job-insert-duplicate', + $this->type, + count( $rowSet ) + count( $rowList ) - count( $rows ) + ); } catch ( DBError $e ) { if ( $flags & self::QOS_ATOMIC ) { $dbw->rollback( $method ); @@ -289,7 +293,7 @@ class JobQueueDB extends JobQueue { $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting $dbw->clearFlag( DBO_TRX ); // make each query its own transaction - $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) { + $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) { $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting } ); @@ -320,7 +324,6 @@ class JobQueueDB extends JobQueue { $job = Job::factory( $row->job_cmd, $title, self::extractBlob( $row->job_params ), $row->job_id ); $job->metadata['id'] = $row->job_id; - $job->id = $row->job_id; // XXX: work around broken subclasses break; // done } while ( true ); } catch ( DBError $e ) { @@ -336,7 +339,7 @@ class JobQueueDB extends JobQueue { * @param string $uuid 32 char hex string * @param $rand integer Random unsigned integer (31 bits) * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random) - * @return Row|false + * @return stdClass|bool Row|false */ protected function claimRandom( $uuid, $rand, $gte ) { $dbw = $this->getMasterDB(); @@ -386,6 +389,7 @@ class JobQueueDB extends JobQueue { continue; // use job_random } } + if ( $row ) { // claim the job $dbw->update( 'job', // update by PK array( @@ -412,7 +416,7 @@ class JobQueueDB extends JobQueue { * Reserve a row with a single UPDATE without holding row locks over RTTs... * * @param string $uuid 32 char hex string - * @return Row|false + * @return stdClass|bool Row|false */ protected function claimOldest( $uuid ) { $dbw = $this->getMasterDB(); @@ -485,7 +489,7 @@ class JobQueueDB extends JobQueue { $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting $dbw->clearFlag( DBO_TRX ); // make each query its own transaction - $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) { + $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) { $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting } ); @@ -520,7 +524,7 @@ class JobQueueDB extends JobQueue { // jobs to become no-ops without any actual jobs that made them redundant. $dbw = $this->getMasterDB(); $cache = $this->dupCache; - $dbw->onTransactionIdle( function() use ( $cache, $params, $key, $dbw ) { + $dbw->onTransactionIdle( function () use ( $cache, $params, $key, $dbw ) { $timestamp = $cache->get( $key ); // current last timestamp of this job if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { return true; // a newer version of this root job was enqueued @@ -544,6 +548,7 @@ class JobQueueDB extends JobQueue { } catch ( DBError $e ) { $this->throwDBException( $e ); } + return true; } @@ -556,7 +561,7 @@ class JobQueueDB extends JobQueue { } /** - * @return Array + * @return array */ protected function doGetPeriodicTasks() { return array( @@ -586,15 +591,13 @@ class JobQueueDB extends JobQueue { return new MappedIterator( $dbr->select( 'job', '*', array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ), - function( $row ) use ( $dbr ) { + function ( $row ) use ( $dbr ) { $job = Job::factory( $row->job_cmd, Title::makeTitle( $row->job_namespace, $row->job_title ), - strlen( $row->job_params ) ? unserialize( $row->job_params ) : false, - $row->job_id + strlen( $row->job_params ) ? unserialize( $row->job_params ) : false ); $job->metadata['id'] = $row->job_id; - $job->id = $row->job_id; // XXX: work around broken subclasses return $job; } ); @@ -618,6 +621,7 @@ class JobQueueDB extends JobQueue { foreach ( $res as $row ) { $types[] = $row->job_cmd; } + return $types; } @@ -630,13 +634,14 @@ class JobQueueDB extends JobQueue { foreach ( $res as $row ) { $sizes[$row->job_cmd] = (int)$row->count; } + return $sizes; } /** * Recycle or destroy any jobs that have been claimed for too long * - * @return integer Number of jobs recycled/deleted + * @return int Number of jobs recycled/deleted */ public function recycleAndDeleteStaleJobs() { $now = time(); @@ -663,7 +668,7 @@ class JobQueueDB extends JobQueue { __METHOD__ ); $ids = array_map( - function( $o ) { + function ( $o ) { return $o->job_id; }, iterator_to_array( $res ) ); @@ -699,7 +704,7 @@ class JobQueueDB extends JobQueue { // the IDs first means that the UPDATE can be done by primary key (less deadlocks). $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ ); $ids = array_map( - function( $o ) { + function ( $o ) { return $o->job_id; }, iterator_to_array( $res ) ); @@ -718,29 +723,31 @@ class JobQueueDB extends JobQueue { } /** - * @param $job Job + * @param Job $job * @return array */ protected function insertFields( Job $job ) { $dbw = $this->getMasterDB(); + return array( // Fields that describe the nature of the job - 'job_cmd' => $job->getType(), + 'job_cmd' => $job->getType(), 'job_namespace' => $job->getTitle()->getNamespace(), - 'job_title' => $job->getTitle()->getDBkey(), - 'job_params' => self::makeBlob( $job->getParams() ), + 'job_title' => $job->getTitle()->getDBkey(), + 'job_params' => self::makeBlob( $job->getParams() ), // Additional job metadata - 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ), + 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ), 'job_timestamp' => $dbw->timestamp(), - 'job_sha1' => wfBaseConvert( + 'job_sha1' => wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 ), - 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM ) + 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM ) ); } /** + * @throws JobQueueConnectionError * @return DBConnRef */ protected function getSlaveDB() { @@ -752,6 +759,7 @@ class JobQueueDB extends JobQueue { } /** + * @throws JobQueueConnectionError * @return DBConnRef */ protected function getMasterDB() { @@ -770,15 +778,18 @@ class JobQueueDB extends JobQueue { $lb = ( $this->cluster !== false ) ? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki ) : wfGetLB( $this->wiki ); + return $lb->getConnectionRef( $index, array(), $this->wiki ); } /** + * @param $property * @return string */ private function getCacheKey( $property ) { list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); $cluster = is_string( $this->cluster ) ? $this->cluster : 'main'; + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property ); }