Merge "Add $wgMaxJobDBWriteDuration setting for avoiding replication lag"
authorjenkins-bot <jenkins-bot@gerrit.wikimedia.org>
Mon, 12 Jun 2017 18:15:57 +0000 (18:15 +0000)
committerGerrit Code Review <gerrit@wikimedia.org>
Mon, 12 Jun 2017 18:15:57 +0000 (18:15 +0000)
1  2 
includes/DefaultSettings.php
includes/jobqueue/JobRunner.php

@@@ -6765,7 -6765,7 +6765,7 @@@ $wgUseRCPatrol = true
  /**
   * Whether to allow users to save their RecentChanges filters
   */
 -$wgStructuredChangeFiltersEnableSaving = false;
 +$wgStructuredChangeFiltersEnableSaving = true;
  
  /**
   * Use new page patrolling to check new pages on Special:Newpages
@@@ -8183,6 -8183,7 +8183,6 @@@ $wgPhpCli = '/usr/bin/php'
   * @note If multiple wikis are being served from the same process (e.g. the
   *  same fastCGI or Apache server), this setting must be the same on all those
   *  wikis.
 - * @see wfInitShellLocale()
   */
  $wgShellLocale = 'C.UTF-8';
  
@@@ -8559,6 -8560,15 +8559,15 @@@ $wgPopularPasswordFile = __DIR__ . '/..
   */
  $wgMaxUserDBWriteDuration = false;
  
+ /**
+  * Max time (in seconds) a job-generated transaction can spend in writes.
+  * If exceeded, the transaction is rolled back with an error instead of being committed.
+  *
+  * @var int|bool Disabled if false
+  * @since 1.30
+  */
+ $wgMaxJobDBWriteDuration = false;
+ 
  /**
   * Mapping of event channels (or channel categories) to EventRelayer configuration.
   *
@@@ -38,6 -38,8 +38,8 @@@ use Wikimedia\Rdbms\DBReplicationWaitEr
   * @since 1.24
   */
  class JobRunner implements LoggerAwareInterface {
+       /** @var Config */
+       protected $config;
        /** @var callable|null Debug output handler */
        protected $debug;
  
@@@ -74,6 -76,7 +76,7 @@@
                        $logger = LoggerFactory::getInstance( 'runJobs' );
                }
                $this->setLogger( $logger );
+               $this->config = MediaWikiServices::getInstance()->getMainConfig();
        }
  
        /**
         * @return array Summary response that can easily be JSON serialized
         */
        public function run( array $options ) {
-               global $wgJobClasses, $wgTrxProfilerLimits;
+               $jobClasses = $this->config->get( 'JobClasses' );
+               $profilerLimits = $this->config->get( 'TrxProfilerLimits' );
  
                $response = [ 'jobs' => [], 'reached' => 'none-ready' ];
  
                $noThrottle = isset( $options['throttle'] ) && !$options['throttle'];
  
                // Bail if job type is invalid
-               if ( $type !== false && !isset( $wgJobClasses[$type] ) ) {
+               if ( $type !== false && !isset( $jobClasses[$type] ) ) {
                        $response['reached'] = 'none-possible';
                        return $response;
                }
                // Catch huge single updates that lead to replica DB lag
                $trxProfiler = Profiler::instance()->getTransactionProfiler();
                $trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) );
-               $trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ );
+               $trxProfiler->setExpectations( $profilerLimits['JobRunner'], __METHOD__ );
  
                // Some jobs types should not run until a certain timestamp
                $backoffs = []; // map of (type => UNIX expiry)
                        $status = $job->run();
                        $error = $job->getLastError();
                        $this->commitMasterChanges( $lbFactory, $job, $fnameTrxOwner );
 +                      // Important: this must be the last deferred update added (T100085, T154425)
 +                      DeferredUpdates::addCallableUpdate( [ JobQueueGroup::class, 'pushLazyJobs' ] );
                        // Run any deferred update tasks; doUpdates() manages transactions itself
                        DeferredUpdates::doUpdates();
                } catch ( Exception $e ) {
         * @see $wgJobBackoffThrottling
         */
        private function getBackoffTimeToWait( Job $job ) {
-               global $wgJobBackoffThrottling;
+               $throttling = $this->config->get( 'JobBackoffThrottling' );
  
-               if ( !isset( $wgJobBackoffThrottling[$job->getType()] ) ||
-                       $job instanceof DuplicateJob // no work was done
-               ) {
+               if ( !isset( $throttling[$job->getType()] ) || $job instanceof DuplicateJob ) {
                        return 0; // not throttled
                }
  
-               $itemsPerSecond = $wgJobBackoffThrottling[$job->getType()];
+               $itemsPerSecond = $throttling[$job->getType()];
                if ( $itemsPerSecond <= 0 ) {
                        return 0; // not throttled
                }
         * @throws DBError
         */
        private function commitMasterChanges( LBFactory $lbFactory, Job $job, $fnameTrxOwner ) {
-               global $wgJobSerialCommitThreshold;
+               $syncThreshold = $this->config->get( 'JobSerialCommitThreshold' );
  
                $time = false;
                $lb = $lbFactory->getMainLB( wfWikiID() );
-               if ( $wgJobSerialCommitThreshold !== false && $lb->getServerCount() > 1 ) {
+               if ( $syncThreshold !== false && $lb->getServerCount() > 1 ) {
                        // Generally, there is one master connection to the local DB
                        $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
                        // We need natively blocking fast locks
                        if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) {
                                $time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY );
-                               if ( $time < $wgJobSerialCommitThreshold ) {
+                               if ( $time < $syncThreshold ) {
                                        $dbwSerial = false;
                                }
                        } else {
                }
  
                if ( !$dbwSerial ) {
-                       $lbFactory->commitMasterChanges( $fnameTrxOwner );
+                       $lbFactory->commitMasterChanges(
+                               $fnameTrxOwner,
+                               // Abort if any transaction was too big
+                               [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ]
+                       );
                        return;
                }
  
                }
  
                // Actually commit the DB master changes
-               $lbFactory->commitMasterChanges( $fnameTrxOwner );
+               $lbFactory->commitMasterChanges(
+                       $fnameTrxOwner,
+                       // Abort if any transaction was too big
+                       [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ]
+               );
                ScopedCallback::consume( $unlocker );
        }
  }