Merge "Various dependency injection cleanups to LoadBalancer"
authorjenkins-bot <jenkins-bot@gerrit.wikimedia.org>
Wed, 14 Sep 2016 01:05:58 +0000 (01:05 +0000)
committerGerrit Code Review <gerrit@wikimedia.org>
Wed, 14 Sep 2016 01:05:58 +0000 (01:05 +0000)
1  2 
includes/db/loadbalancer/LBFactory.php
includes/db/loadbalancer/LoadBalancer.php

@@@ -51,9 -51,7 +51,9 @@@ abstract class LBFactory implements Des
        /** @var callable[] */
        protected $replicationWaitCallbacks = [];
  
 -      const SHUTDOWN_NO_CHRONPROT = 1; // don't save ChronologyProtector positions (for async code)
 +      const SHUTDOWN_NO_CHRONPROT = 0; // don't save DB positions at all
 +      const SHUTDOWN_CHRONPROT_ASYNC = 1; // save DB positions, but don't wait on remote DCs
 +      const SHUTDOWN_CHRONPROT_SYNC = 2; // save DB positions, waiting on all DCs
  
        /**
         * Construct a factory based on a configuration array (typically from $wgLBFactoryConf)
@@@ -89,7 -87,7 +89,7 @@@
         * @see LoadBalancer::disable()
         */
        public function destroy() {
 -              $this->shutdown();
 +              $this->shutdown( self::SHUTDOWN_NO_CHRONPROT );
                $this->forEachLBCallMethod( 'disable' );
        }
  
         * @deprecated since 1.27, use LBFactory::destroy()
         */
        public static function destroyInstance() {
-               self::singleton()->destroy();
+               MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->destroy();
        }
  
        /**
  
        /**
         * Prepare all tracked load balancers for shutdown
 -       * @param integer $flags Supports SHUTDOWN_* flags
 -       */
 -      public function shutdown( $flags = 0 ) {
 -              if ( !( $flags & self::SHUTDOWN_NO_CHRONPROT ) ) {
 -                      $this->shutdownChronologyProtector( $this->chronProt );
 +       * @param integer $mode One of the class SHUTDOWN_* constants
 +       * @param callable|null $workCallback Work to mask ChronologyProtector writes
 +       */
 +      public function shutdown(
 +              $mode = self::SHUTDOWN_CHRONPROT_SYNC, callable $workCallback = null
 +      ) {
 +              if ( $mode === self::SHUTDOWN_CHRONPROT_SYNC ) {
 +                      $this->shutdownChronologyProtector( $this->chronProt, $workCallback, 'sync' );
 +              } elseif ( $mode === self::SHUTDOWN_CHRONPROT_ASYNC ) {
 +                      $this->shutdownChronologyProtector( $this->chronProt, null, 'async' );
                }
 +
                $this->commitMasterChanges( __METHOD__ ); // sanity
        }
  
  
        /**
         * Determine if any master connection has pending/written changes from this request
 +       * @param float $age How many seconds ago is "recent" [defaults to LB lag wait timeout]
         * @return bool
         * @since 1.27
         */
 -      public function hasOrMadeRecentMasterChanges() {
 +      public function hasOrMadeRecentMasterChanges( $age = null ) {
                $ret = false;
 -              $this->forEachLB( function ( LoadBalancer $lb ) use ( &$ret ) {
 -                      $ret = $ret || $lb->hasOrMadeRecentMasterChanges();
 +              $this->forEachLB( function ( LoadBalancer $lb ) use ( $age, &$ret ) {
 +                      $ret = $ret || $lb->hasOrMadeRecentMasterChanges( $age );
                } );
                return $ret;
        }
                        'ifWritesSince' => null
                ];
  
 -              foreach ( $this->replicationWaitCallbacks as $callback ) {
 -                      $callback();
 -              }
 -
                // Figure out which clusters need to be checked
                /** @var LoadBalancer[] $lbs */
                $lbs = [];
                        $masterPositions[$i] = $lb->getMasterPos();
                }
  
 +              // Run any listener callbacks *after* getting the DB positions. The more
 +              // time spent in the callbacks, the less time is spent in waitForAll().
 +              foreach ( $this->replicationWaitCallbacks as $callback ) {
 +                      $callback();
 +              }
 +
                $failed = [];
                foreach ( $lbs as $i => $lb ) {
                        if ( $masterPositions[$i] ) {
                        ObjectCache::getMainStashInstance(),
                        [
                                'ip' => $request->getIP(),
 -                              'agent' => $request->getHeader( 'User-Agent' )
 -                      ]
 +                              'agent' => $request->getHeader( 'User-Agent' ),
 +                      ],
 +                      $request->getFloat( 'cpPosTime', null )
                );
                if ( PHP_SAPI === 'cli' ) {
                        $chronProt->setEnabled( false );
        }
  
        /**
 +       * Get and record all of the staged DB positions into persistent memory storage
 +       *
         * @param ChronologyProtector $cp
 +       * @param callable|null $workCallback Work to do instead of waiting on syncing positions
 +       * @param string $mode One of (sync, async); whether to wait on remote datacenters
         */
 -      protected function shutdownChronologyProtector( ChronologyProtector $cp ) {
 -              // Get all the master positions needed
 +      protected function shutdownChronologyProtector(
 +              ChronologyProtector $cp, $workCallback, $mode
 +      ) {
 +              // Record all the master positions needed
                $this->forEachLB( function ( LoadBalancer $lb ) use ( $cp ) {
                        $cp->shutdownLB( $lb );
                } );
 -              // Write them to the stash
 -              $unsavedPositions = $cp->shutdown();
 +              // Write them to the persistent stash. Try to do something useful by running $work
 +              // while ChronologyProtector waits for the stash write to replicate to all DCs.
 +              $unsavedPositions = $cp->shutdown( $workCallback, $mode );
 +              if ( $unsavedPositions && $workCallback ) {
 +                      // Invoke callback in case it did not cache the result yet
 +                      $workCallback(); // work now to block for less time in waitForAll()
 +              }
                // If the positions failed to write to the stash, at least wait on local datacenter
                // replica DBs to catch up before responding. Even if there are several DCs, this increases
                // the chance that the user will see their own changes immediately afterwards. As long
                } );
        }
  
+       /**
+        * Base parameters to LoadBalancer::__construct()
+        */
+       final protected function baseLoadBalancerParams() {
+               return [
+                       'readOnlyReason' => $this->readOnlyReason,
+                       'trxProfiler' => $this->trxProfiler,
+                       'srvCache' => $this->srvCache,
+                       'wanCache' => $this->wanCache,
+                       'localDomain' => wfWikiID(),
+                       'errorLogger' => [ MWExceptionHandler::class, 'logException' ]
+               ];
+       }
        /**
         * @param LoadBalancer $lb
         */
                }
        }
  
 +      /**
 +       * Append ?cpPosTime parameter to a URL for ChronologyProtector purposes if needed
 +       *
 +       * Note that unlike cookies, this works accross domains
 +       *
 +       * @param string $url
 +       * @param float $time UNIX timestamp just before shutdown() was called
 +       * @return string
 +       * @since 1.28
 +       */
 +      public function appendPreShutdownTimeAsQuery( $url, $time ) {
 +              $usedCluster = 0;
 +              $this->forEachLB( function ( LoadBalancer $lb ) use ( &$usedCluster ) {
 +                      $usedCluster |= ( $lb->getServerCount() > 1 );
 +              } );
 +
 +              if ( !$usedCluster ) {
 +                      return $url; // no master/replica clusters touched
 +              }
 +
 +              return wfAppendQuery( $url, [ 'cpPosTime' => $time ] );
 +      }
 +
        /**
         * Close all open database connections on all open load balancers.
         * @since 1.28
        public function closeAll() {
                $this->forEachLBCallMethod( 'closeAll', [] );
        }
 -
  }
@@@ -74,6 -74,10 +74,10 @@@ class LoadBalancer 
        private $trxRoundId = false;
        /** @var array[] Map of (name => callable) */
        private $trxRecurringCallbacks = [];
+       /** @var string Local Wiki ID and default for selectDB() calls */
+       private $localDomain;
+       /** @var callable Exception logger */
+       private $errorLogger;
  
        /** @var integer Warn when this many connection are held */
        const CONN_HELD_WARN_THRESHOLD = 10;
         *  - waitTimeout : Maximum time to wait for replicas for consistency [optional]
         *  - srvCache : BagOStuff object [optional]
         *  - wanCache : WANObjectCache object [optional]
+        *  - localDomain: The wiki ID of the "local"/"current" wiki [optional]
+        *  - errorLogger: Callback that takes an Exception and logs it [optional]
         * @throws MWException
         */
        public function __construct( array $params ) {
                $this->mWaitTimeout = isset( $params['waitTimeout'] )
                        ? $params['waitTimeout']
                        : self::POS_WAIT_TIMEOUT;
+               $this->localDomain = isset( $params['localDomain'] ) ? $params['localDomain'] : '';
  
                $this->mReadIndex = -1;
                $this->mWriteIndex = -1;
                } else {
                        $this->trxProfiler = new TransactionProfiler();
                }
+               $this->errorLogger = isset( $params['errorLogger'] )
+                       ? $params['errorLogger']
+                       : function ( Exception $e ) {
+                               trigger_error( E_WARNING, $e->getMessage() );
+                       };
        }
  
        /**
         * @return bool|int|string
         */
        public function getReaderIndex( $group = false, $wiki = false ) {
-               global $wgDBtype;
-               # @todo FIXME: For now, only go through all this for mysql databases
-               if ( $wgDBtype != 'mysql' ) {
-                       return $this->getWriterIndex();
-               }
                if ( count( $this->mServers ) == 1 ) {
                        # Skip the load balancing if there's only one server
-                       return 0;
+                       return $this->getWriterIndex();
                } elseif ( $group === false && $this->mReadIndex >= 0 ) {
                        # Shortcut if generic reader exists already
                        return $this->mReadIndex;
                        throw new MWException( "Empty server array given to LoadBalancer" );
                }
  
-               # Scale the configured load ratios according to the dynamic load (if the load monitor supports it)
+               # Scale the configured load ratios according to the dynamic load if supported
                $this->getLoadMonitor()->scaleLoads( $nonErrorLoads, $group, $wiki );
  
                $laggedReplicaMode = false;
                                ' with invalid server index' );
                }
  
-               if ( $wiki === wfWikiID() ) {
+               if ( $wiki === $this->localDomain ) {
                        $wiki = false;
                }
  
                if ( $this->connsOpened > $oldConnsOpened ) {
                        $host = $conn->getServer();
                        $dbname = $conn->getDBname();
-                       $trxProf = Profiler::instance()->getTransactionProfiler();
-                       $trxProf->recordConnection( $host, $dbname, $masterOnly );
+                       $this->trxProfiler->recordConnection( $host, $dbname, $masterOnly );
                }
  
                if ( $masterOnly ) {
         * @return DBConnRef
         */
        public function getLazyConnectionRef( $db, $groups = [], $wiki = false ) {
+               $wiki = ( $wiki !== false ) ? $wiki : $this->localDomain;
                return new DBConnRef( $this, [ $db, $groups, $wiki ] );
        }
  
         * @return DatabaseBase
         */
        private function openForeignConnection( $i, $wiki ) {
-               list( $dbName, $prefix ) = wfSplitWikiID( $wiki );
+               list( $dbName, $prefix ) = explode( '-', $wiki, 2 ) + [ '', '' ];
                if ( isset( $this->mConns['foreignUsed'][$i][$wiki] ) ) {
                        // Reuse an already-used connection
                        $conn = $this->mConns['foreignUsed'][$i][$wiki];
                                try {
                                        $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
                                } catch ( DBError $e ) {
-                                       MWExceptionHandler::logException( $e );
+                                       call_user_func( $this->errorLogger, $e );
                                        $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
                                }
                                if ( $restore && $conn->getLBInfo( 'master' ) ) {
                                try {
                                        $conn->flushSnapshot( $fname );
                                } catch ( DBError $e ) {
-                                       MWExceptionHandler::logException( $e );
+                                       call_user_func( $this->errorLogger, $e );
                                        $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
                                }
                                $conn->setTrxEndCallbackSuppression( false );
                                                $conn->flushSnapshot( $fname );
                                        }
                                } catch ( DBError $e ) {
-                                       MWExceptionHandler::logException( $e );
+                                       call_user_func( $this->errorLogger, $e );
                                        $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
                                }
                                if ( $restore ) {
         *
         * @param IDatabase $conn Replica DB
         * @param DBMasterPos|bool $pos Master position; default: current position
 -       * @param integer $timeout Timeout in seconds
 +       * @param integer|null $timeout Timeout in seconds [optional]
         * @return bool Success
         * @since 1.27
         */
 -      public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = 10 ) {
 +      public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = null ) {
                if ( $this->getServerCount() == 1 || !$conn->getLBInfo( 'replica' ) ) {
                        return true; // server is not a replica DB
                }
                        return false; // something is misconfigured
                }
  
 +              $timeout = $timeout ?: $this->mWaitTimeout;
                $result = $conn->masterPosWait( $pos, $timeout );
                if ( $result == -1 || is_null( $result ) ) {
                        $msg = __METHOD__ . ": Timed out waiting on {$conn->getServer()} pos {$pos}";
                        }
                );
        }
+       /**
+        * Set a new table prefix for the existing local wiki ID for testing
+        *
+        * @param string $prefix
+        * @since 1.28
+        */
+       public function setDomainPrefix( $prefix ) {
+               list( $dbName, ) = explode( '-', $this->localDomain, 2 );
+               $this->localDomain = "{$dbName}-{$prefix}";
+       }
  }