Merge "Add missing subjectspace pages to watchlist on update."
[lhc/web/wiklou.git] / includes / job / JobQueueDB.php
index c39083d..79ff4e8 100644 (file)
@@ -37,7 +37,8 @@ class JobQueueDB extends JobQueue {
        /** @var BagOStuff */
        protected $cache;
 
-       protected $cluster = false; // string; name of an external DB cluster
+       /** @var bool|string Name of an external DB cluster. False if not set */
+       protected $cluster = false;
 
        /**
         * Additional parameters include:
@@ -45,7 +46,7 @@ class JobQueueDB extends JobQueue {
         *               If not specified, the primary DB cluster for the wiki will be used.
         *               This can be overridden with a custom cluster so that DB handles will
         *               be retrieved via LBFactory::getExternalLB() and getConnection().
-        * @param $params array
+        * @param array $params
         */
        protected function __construct( array $params ) {
                global $wgMemc;
@@ -94,7 +95,7 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @see JobQueue::doGetSize()
-        * @return integer
+        * @return int
         */
        protected function doGetSize() {
                $key = $this->getCacheKey( 'size' );
@@ -120,7 +121,7 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @see JobQueue::doGetAcquiredCount()
-        * @return integer
+        * @return int
         */
        protected function doGetAcquiredCount() {
                if ( $this->claimTTL <= 0 ) {
@@ -150,7 +151,7 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @see JobQueue::doGetAbandonedCount()
-        * @return integer
+        * @return int
         * @throws MWException
         */
        protected function doGetAbandonedCount() {
@@ -198,7 +199,7 @@ class JobQueueDB extends JobQueue {
                $that = $this;
                $method = __METHOD__;
                $dbw->onTransactionIdle(
-                       function() use ( $dbw, $that, $jobs, $flags, $method ) {
+                       function () use ( $dbw, $that, $jobs, $flags, $method ) {
                                $that->doBatchPushInternal( $dbw, $jobs, $flags, $method );
                        }
                );
@@ -209,12 +210,12 @@ class JobQueueDB extends JobQueue {
        /**
         * This function should *not* be called outside of JobQueueDB
         *
-        * @param DatabaseBase $dbw
+        * @param IDatabase $dbw
         * @param array $jobs
         * @param int $flags
         * @param string $method
-        * @return boolean
-        * @throws type
+        * @throws DBError
+        * @return bool
         */
        public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
                if ( !count( $jobs ) ) {
@@ -258,8 +259,11 @@ class JobQueueDB extends JobQueue {
                                $dbw->insert( 'job', $rowBatch, $method );
                        }
                        JobQueue::incrStats( 'job-insert', $this->type, count( $rows ) );
-                       JobQueue::incrStats( 'job-insert-duplicate', $this->type,
-                               count( $rowSet ) + count( $rowList ) - count( $rows ) );
+                       JobQueue::incrStats(
+                               'job-insert-duplicate',
+                               $this->type,
+                               count( $rowSet ) + count( $rowList ) - count( $rows )
+                       );
                } catch ( DBError $e ) {
                        if ( $flags & self::QOS_ATOMIC ) {
                                $dbw->rollback( $method );
@@ -289,7 +293,7 @@ class JobQueueDB extends JobQueue {
                        $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
                        $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
                        $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
-                       $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) {
+                       $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
                                $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
                        } );
 
@@ -320,7 +324,6 @@ class JobQueueDB extends JobQueue {
                                $job = Job::factory( $row->job_cmd, $title,
                                        self::extractBlob( $row->job_params ), $row->job_id );
                                $job->metadata['id'] = $row->job_id;
-                               $job->id = $row->job_id; // XXX: work around broken subclasses
                                break; // done
                        } while ( true );
                } catch ( DBError $e ) {
@@ -336,7 +339,7 @@ class JobQueueDB extends JobQueue {
         * @param string $uuid 32 char hex string
         * @param $rand integer Random unsigned integer (31 bits)
         * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
-        * @return Row|false
+        * @return stdClass|bool Row|false
         */
        protected function claimRandom( $uuid, $rand, $gte ) {
                $dbw = $this->getMasterDB();
@@ -386,6 +389,7 @@ class JobQueueDB extends JobQueue {
                                        continue; // use job_random
                                }
                        }
+
                        if ( $row ) { // claim the job
                                $dbw->update( 'job', // update by PK
                                        array(
@@ -412,7 +416,7 @@ class JobQueueDB extends JobQueue {
         * Reserve a row with a single UPDATE without holding row locks over RTTs...
         *
         * @param string $uuid 32 char hex string
-        * @return Row|false
+        * @return stdClass|bool Row|false
         */
        protected function claimOldest( $uuid ) {
                $dbw = $this->getMasterDB();
@@ -485,7 +489,7 @@ class JobQueueDB extends JobQueue {
                        $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
                        $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
                        $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
-                       $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) {
+                       $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
                                $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
                        } );
 
@@ -520,7 +524,7 @@ class JobQueueDB extends JobQueue {
                // jobs to become no-ops without any actual jobs that made them redundant.
                $dbw = $this->getMasterDB();
                $cache = $this->dupCache;
-               $dbw->onTransactionIdle( function() use ( $cache, $params, $key, $dbw ) {
+               $dbw->onTransactionIdle( function () use ( $cache, $params, $key, $dbw ) {
                        $timestamp = $cache->get( $key ); // current last timestamp of this job
                        if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
                                return true; // a newer version of this root job was enqueued
@@ -544,6 +548,7 @@ class JobQueueDB extends JobQueue {
                } catch ( DBError $e ) {
                        $this->throwDBException( $e );
                }
+
                return true;
        }
 
@@ -556,7 +561,7 @@ class JobQueueDB extends JobQueue {
        }
 
        /**
-        * @return Array
+        * @return array
         */
        protected function doGetPeriodicTasks() {
                return array(
@@ -586,15 +591,13 @@ class JobQueueDB extends JobQueue {
                        return new MappedIterator(
                                $dbr->select( 'job', '*',
                                        array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
-                               function( $row ) use ( $dbr ) {
+                               function ( $row ) use ( $dbr ) {
                                        $job = Job::factory(
                                                $row->job_cmd,
                                                Title::makeTitle( $row->job_namespace, $row->job_title ),
-                                               strlen( $row->job_params ) ? unserialize( $row->job_params ) : false,
-                                               $row->job_id
+                                               strlen( $row->job_params ) ? unserialize( $row->job_params ) : false
                                        );
                                        $job->metadata['id'] = $row->job_id;
-                                       $job->id = $row->job_id; // XXX: work around broken subclasses
                                        return $job;
                                }
                        );
@@ -618,6 +621,7 @@ class JobQueueDB extends JobQueue {
                foreach ( $res as $row ) {
                        $types[] = $row->job_cmd;
                }
+
                return $types;
        }
 
@@ -630,13 +634,14 @@ class JobQueueDB extends JobQueue {
                foreach ( $res as $row ) {
                        $sizes[$row->job_cmd] = (int)$row->count;
                }
+
                return $sizes;
        }
 
        /**
         * Recycle or destroy any jobs that have been claimed for too long
         *
-        * @return integer Number of jobs recycled/deleted
+        * @return int Number of jobs recycled/deleted
         */
        public function recycleAndDeleteStaleJobs() {
                $now = time();
@@ -663,7 +668,7 @@ class JobQueueDB extends JobQueue {
                                        __METHOD__
                                );
                                $ids = array_map(
-                                       function( $o ) {
+                                       function ( $o ) {
                                                return $o->job_id;
                                        }, iterator_to_array( $res )
                                );
@@ -699,7 +704,7 @@ class JobQueueDB extends JobQueue {
                        // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
                        $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
                        $ids = array_map(
-                               function( $o ) {
+                               function ( $o ) {
                                        return $o->job_id;
                                }, iterator_to_array( $res )
                        );
@@ -718,29 +723,31 @@ class JobQueueDB extends JobQueue {
        }
 
        /**
-        * @param $job Job
+        * @param Job $job
         * @return array
         */
        protected function insertFields( Job $job ) {
                $dbw = $this->getMasterDB();
+
                return array(
                        // Fields that describe the nature of the job
-                       'job_cmd'       => $job->getType(),
+                       'job_cmd' => $job->getType(),
                        'job_namespace' => $job->getTitle()->getNamespace(),
-                       'job_title'     => $job->getTitle()->getDBkey(),
-                       'job_params'    => self::makeBlob( $job->getParams() ),
+                       'job_title' => $job->getTitle()->getDBkey(),
+                       'job_params' => self::makeBlob( $job->getParams() ),
                        // Additional job metadata
-                       'job_id'        => $dbw->nextSequenceValue( 'job_job_id_seq' ),
+                       'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
                        'job_timestamp' => $dbw->timestamp(),
-                       'job_sha1'      => wfBaseConvert(
+                       'job_sha1' => wfBaseConvert(
                                sha1( serialize( $job->getDeduplicationInfo() ) ),
                                16, 36, 31
                        ),
-                       'job_random'    => mt_rand( 0, self::MAX_JOB_RANDOM )
+                       'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
                );
        }
 
        /**
+        * @throws JobQueueConnectionError
         * @return DBConnRef
         */
        protected function getSlaveDB() {
@@ -752,6 +759,7 @@ class JobQueueDB extends JobQueue {
        }
 
        /**
+        * @throws JobQueueConnectionError
         * @return DBConnRef
         */
        protected function getMasterDB() {
@@ -770,15 +778,18 @@ class JobQueueDB extends JobQueue {
                $lb = ( $this->cluster !== false )
                        ? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki )
                        : wfGetLB( $this->wiki );
+
                return $lb->getConnectionRef( $index, array(), $this->wiki );
        }
 
        /**
+        * @param $property
         * @return string
         */
        private function getCacheKey( $property ) {
                list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
                $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
+
                return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property );
        }