Merge "Improve beginMasterChanges and make methods for DeferredUpdates"
authorjenkins-bot <jenkins-bot@gerrit.wikimedia.org>
Wed, 31 Aug 2016 23:44:01 +0000 (23:44 +0000)
committerGerrit Code Review <gerrit@wikimedia.org>
Wed, 31 Aug 2016 23:44:01 +0000 (23:44 +0000)
1  2 
includes/db/DBConnRef.php
includes/db/Database.php
includes/db/IDatabase.php
includes/db/loadbalancer/LBFactory.php
includes/db/loadbalancer/LBFactoryMulti.php
includes/db/loadbalancer/LBFactorySimple.php
includes/db/loadbalancer/LoadBalancer.php

@@@ -103,7 -103,7 +103,7 @@@ class DBConnRef implements IDatabase 
                return $this->__call( __FUNCTION__, func_get_args() );
        }
  
 -      public function pendingWriteQueryDuration() {
 +      public function pendingWriteQueryDuration( $type = self::ESTIMATE_TOTAL ) {
                return $this->__call( __FUNCTION__, func_get_args() );
        }
  
                return $this->__call( __FUNCTION__, func_get_args() );
        }
  
+       public function setTransactionListener( $name, callable $callback = null ) {
+               return $this->__call( __FUNCTION__, func_get_args() );
+       }
        public function startAtomic( $fname = __METHOD__ ) {
                return $this->__call( __FUNCTION__, func_get_args() );
        }
                return $this->__call( __FUNCTION__, func_get_args() );
        }
  
 -      public function ping() {
 -              return $this->__call( __FUNCTION__, func_get_args() );
 +      public function ping( &$rtt = null ) {
 +              return func_num_args()
 +                      ? $this->__call( __FUNCTION__, [ &$rtt ] )
 +                      : $this->__call( __FUNCTION__, [] ); // method cares about null vs missing
        }
  
        public function getLag() {
diff --combined includes/db/Database.php
@@@ -39,11 -39,6 +39,11 @@@ abstract class DatabaseBase implements 
  
        /** How long before it is worth doing a dummy query to test the connection */
        const PING_TTL = 1.0;
 +      const PING_QUERY = 'SELECT 1 AS ping';
 +
 +      const TINY_WRITE_SEC = .010;
 +      const SLOW_WRITE_SEC = .500;
 +      const SMALL_WRITE_ROWS = 100;
  
        /** @var string SQL query */
        protected $mLastQuery = '';
        protected $mTrxPreCommitCallbacks = [];
        /** @var array[] List of (callable, method name) */
        protected $mTrxEndCallbacks = [];
-       /** @var bool Whether to suppress triggering of post-commit callbacks */
-       protected $suppressPostCommitCallbacks = false;
+       /** @var array[] Map of (name => (callable, method name)) */
+       protected $mTrxRecurringCallbacks = [];
+       /** @var bool Whether to suppress triggering of transaction end callbacks */
+       protected $mTrxEndCallbacksSuppressed = false;
  
        /** @var string */
        protected $mTablePrefix;
         * @var int
         */
        protected $mTrxLevel = 0;
 -
        /**
         * Either a short hexidecimal string if a transaction is active or ""
         *
         * @see DatabaseBase::mTrxLevel
         */
        protected $mTrxShortId = '';
 -
        /**
         * The UNIX time that the transaction started. Callers can assume that if
         * snapshot isolation is used, then the data is *at least* up to date to that
         * @see DatabaseBase::mTrxLevel
         */
        private $mTrxTimestamp = null;
 -
        /** @var float Lag estimate at the time of BEGIN */
        private $mTrxSlaveLag = null;
 -
        /**
         * Remembers the function name given for starting the most recent transaction via begin().
         * Used to provide additional context for error reporting.
         * @see DatabaseBase::mTrxLevel
         */
        private $mTrxFname = null;
 -
        /**
         * Record if possible write queries were done in the last transaction started
         *
         * @see DatabaseBase::mTrxLevel
         */
        private $mTrxDoneWrites = false;
 -
        /**
         * Record if the current transaction was started implicitly due to DBO_TRX being set.
         *
         * @see DatabaseBase::mTrxLevel
         */
        private $mTrxAutomatic = false;
 -
        /**
         * Array of levels of atomicity within transactions
         *
         * @var array
         */
        private $mTrxAtomicLevels = [];
 -
        /**
         * Record if the current transaction was started implicitly by DatabaseBase::startAtomic
         *
         * @var bool
         */
        private $mTrxAutomaticAtomic = false;
 -
        /**
         * Track the write query callers of the current transaction
         *
         * @var string[]
         */
        private $mTrxWriteCallers = [];
 -
        /**
 -       * Track the seconds spent in write queries for the current transaction
 -       *
 -       * @var float
 +       * @var float Seconds spent in write queries for the current transaction
         */
        private $mTrxWriteDuration = 0.0;
 +      /**
 +       * @var integer Number of write queries for the current transaction
 +       */
 +      private $mTrxWriteQueryCount = 0;
 +      /**
 +       * @var float Like mTrxWriteQueryCount but excludes lock-bound, easy to replicate, queries
 +       */
 +      private $mTrxWriteAdjDuration = 0.0;
 +      /**
 +       * @var integer Number of write queries counted in mTrxWriteAdjDuration
 +       */
 +      private $mTrxWriteAdjQueryCount = 0;
 +      /**
 +       * @var float RTT time estimate
 +       */
 +      private $mRTTEstimate = 0.0;
  
        /** @var array Map of (name => 1) for locks obtained via lock() */
        private $mNamedLocksHeld = [];
        /** @var int[] Prior mFlags values */
        private $priorFlags = [];
  
 +      /** @var Profiler */
 +      protected $profiler;
        /** @var TransactionProfiler */
        protected $trxProfiler;
  
         * @return TransactionProfiler
         */
        protected function getTransactionProfiler() {
 -              if ( !$this->trxProfiler ) {
 -                      $this->trxProfiler = new TransactionProfiler();
 -              }
 -
                return $this->trxProfiler;
        }
  
                );
        }
  
 -      public function pendingWriteQueryDuration() {
 -              return $this->mTrxLevel ? $this->mTrxWriteDuration : false;
 +      public function pendingWriteQueryDuration( $type = self::ESTIMATE_TOTAL ) {
 +              if ( !$this->mTrxLevel ) {
 +                      return false;
 +              } elseif ( !$this->mTrxDoneWrites ) {
 +                      return 0.0;
 +              }
 +
 +              switch ( $type ) {
 +                      case self::ESTIMATE_DB_APPLY:
 +                              $this->ping( $rtt );
 +                              $rttAdjTotal = $this->mTrxWriteAdjQueryCount * $rtt;
 +                              $applyTime = max( $this->mTrxWriteAdjDuration - $rttAdjTotal, 0 );
 +                              // For omitted queries, make them count as something at least
 +                              $omitted = $this->mTrxWriteQueryCount - $this->mTrxWriteAdjQueryCount;
 +                              $applyTime += self::TINY_WRITE_SEC * $omitted;
 +
 +                              return $applyTime;
 +                      default: // everything
 +                              return $this->mTrxWriteDuration;
 +              }
        }
  
        public function pendingWriteCallers() {
  
                $this->mForeign = $foreign;
  
 -              if ( isset( $params['trxProfiler'] ) ) {
 -                      $this->trxProfiler = $params['trxProfiler']; // override
 -              }
 +              $this->profiler = isset( $params['profiler'] )
 +                      ? $params['profiler']
 +                      : Profiler::instance(); // @TODO: remove global state
 +              $this->trxProfiler = isset( $params['trxProfiler'] )
 +                      ? $params['trxProfiler']
 +                      : new TransactionProfiler();
  
                if ( $user ) {
                        $this->open( $server, $user, $password, $dbName );
                }
 +
        }
  
        /**
         * @return bool
         */
        protected function isWriteQuery( $sql ) {
 -              return !preg_match( '/^(?:SELECT|BEGIN|ROLLBACK|COMMIT|SET|SHOW|EXPLAIN|\(SELECT)\b/i', $sql );
 +              return !preg_match(
 +                      '/^(?:SELECT|BEGIN|ROLLBACK|COMMIT|SET|SHOW|EXPLAIN|\(SELECT)\b/i', $sql );
 +      }
 +
 +      /**
 +       * @param $sql
 +       * @return string|null
 +       */
 +      protected function getQueryVerb( $sql ) {
 +              return preg_match( '/^\s*([a-z]+)/i', $sql, $m ) ? strtoupper( $m[1] ) : null;
        }
  
        /**
         * @return bool
         */
        protected function isTransactableQuery( $sql ) {
 -              $verb = substr( $sql, 0, strcspn( $sql, " \t\r\n" ) );
 -              return !in_array( $verb, [ 'BEGIN', 'COMMIT', 'ROLLBACK', 'SHOW', 'SET' ] );
 +              $verb = $this->getQueryVerb( $sql );
 +              return !in_array( $verb, [ 'BEGIN', 'COMMIT', 'ROLLBACK', 'SHOW', 'SET' ], true );
        }
  
        public function query( $sql, $fname = __METHOD__, $tempIgnore = false ) {
                # Include query transaction state
                $queryProf .= $this->mTrxShortId ? " [TRX#{$this->mTrxShortId}]" : "";
  
 -              $profiler = Profiler::instance();
 -              if ( !( $profiler instanceof ProfilerStub ) ) {
 -                      $queryProfSection = $profiler->scopedProfileIn( $queryProf );
 -              }
 -
                $startTime = microtime( true );
 +              $this->profiler->profileIn( $queryProf );
                $ret = $this->doQuery( $commentedSql );
 -              $queryRuntime = microtime( true ) - $startTime;
 +              $this->profiler->profileOut( $queryProf );
 +              $queryRuntime = max( microtime( true ) - $startTime, 0.0 );
  
                unset( $queryProfSection ); // profile out (if set)
  
                if ( $ret !== false ) {
                        $this->lastPing = $startTime;
                        if ( $isWrite && $this->mTrxLevel ) {
 -                              $this->mTrxWriteDuration += $queryRuntime;
 +                              $this->updateTrxWriteQueryTime( $sql, $queryRuntime );
                                $this->mTrxWriteCallers[] = $fname;
                        }
                }
  
 +              if ( $sql === self::PING_QUERY ) {
 +                      $this->mRTTEstimate = $queryRuntime;
 +              }
 +
                $this->getTransactionProfiler()->recordQueryCompletion(
                        $queryProf, $startTime, $isWrite, $this->affectedRows()
                );
                return $ret;
        }
  
 +      /**
 +       * Update the estimated run-time of a query, not counting large row lock times
 +       *
 +       * LoadBalancer can be set to rollback transactions that will create huge replication
 +       * lag. It bases this estimate off of pendingWriteQueryDuration(). Certain simple
 +       * queries, like inserting a row can take a long time due to row locking. This method
 +       * uses some simple heuristics to discount those cases.
 +       *
 +       * @param string $sql
 +       * @param float $runtime Total runtime, including RTT
 +       */
 +      private function updateTrxWriteQueryTime( $sql, $runtime ) {
 +              $indicativeOfSlaveRuntime = true;
 +              if ( $runtime > self::SLOW_WRITE_SEC ) {
 +                      $verb = $this->getQueryVerb( $sql );
 +                      // insert(), upsert(), replace() are fast unless bulky in size or blocked on locks
 +                      if ( $verb === 'INSERT' ) {
 +                              $indicativeOfSlaveRuntime = $this->affectedRows() > self::SMALL_WRITE_ROWS;
 +                      } elseif ( $verb === 'REPLACE' ) {
 +                              $indicativeOfSlaveRuntime = $this->affectedRows() > self::SMALL_WRITE_ROWS / 2;
 +                      }
 +              }
 +
 +              $this->mTrxWriteDuration += $runtime;
 +              $this->mTrxWriteQueryCount += 1;
 +              if ( $indicativeOfSlaveRuntime ) {
 +                      $this->mTrxWriteAdjDuration += $runtime;
 +                      $this->mTrxWriteAdjQueryCount += 1;
 +              }
 +      }
 +
        private function canRecoverFromDisconnect( $sql, $priorWritesPending ) {
                # Transaction dropped; this can mean lost writes, or REPEATABLE-READ snapshots.
                # Dropped connections also mean that named locks are automatically released.
                try {
                        // Handle callbacks in mTrxEndCallbacks
                        $this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK );
+                       $this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK );
                        return null;
                } catch ( Exception $e ) {
                        // Already logged; move on...
                }
        }
  
+       final public function setTransactionListener( $name, callable $callback = null ) {
+               if ( $callback ) {
+                       $this->mTrxRecurringCallbacks[$name] = [ $callback, wfGetCaller() ];
+               } else {
+                       unset( $this->mTrxRecurringCallbacks[$name] );
+               }
+       }
        /**
-        * Whether to disable running of post-commit callbacks
+        * Whether to disable running of post-COMMIT/ROLLBACK callbacks
         *
         * This method should not be used outside of Database/LoadBalancer
         *
         * @param bool $suppress
         * @since 1.28
         */
-       final public function setPostCommitCallbackSupression( $suppress ) {
-               $this->suppressPostCommitCallbacks = $suppress;
+       final public function setTrxEndCallbackSuppression( $suppress ) {
+               $this->mTrxEndCallbacksSuppressed = $suppress;
        }
  
        /**
         * @throws Exception
         */
        public function runOnTransactionIdleCallbacks( $trigger ) {
-               if ( $this->suppressPostCommitCallbacks ) {
+               if ( $this->mTrxEndCallbacksSuppressed ) {
                        return;
                }
  
                }
        }
  
+       /**
+        * Actually run any "transaction listener" callbacks.
+        *
+        * This method should not be used outside of Database/LoadBalancer
+        *
+        * @param integer $trigger IDatabase::TRIGGER_* constant
+        * @throws Exception
+        * @since 1.20
+        */
+       public function runTransactionListenerCallbacks( $trigger ) {
+               if ( $this->mTrxEndCallbacksSuppressed ) {
+                       return;
+               }
+               /** @var Exception $e */
+               $e = null; // first exception
+               foreach ( $this->mTrxRecurringCallbacks as $callback ) {
+                       try {
+                               list( $phpCallback ) = $callback;
+                               $phpCallback( $trigger, $this );
+                       } catch ( Exception $ex ) {
+                               MWExceptionHandler::logException( $ex );
+                               $e = $e ?: $ex;
+                       }
+               }
+               if ( $e instanceof Exception ) {
+                       throw $e; // re-throw any first exception
+               }
+       }
        final public function startAtomic( $fname = __METHOD__ ) {
                if ( !$this->mTrxLevel ) {
                        $this->begin( $fname, self::TRANSACTION_INTERNAL );
                $this->mTrxAtomicLevels = [];
                $this->mTrxShortId = wfRandomString( 12 );
                $this->mTrxWriteDuration = 0.0;
 +              $this->mTrxWriteQueryCount = 0;
 +              $this->mTrxWriteAdjDuration = 0.0;
 +              $this->mTrxWriteAdjQueryCount = 0;
                $this->mTrxWriteCallers = [];
                // First SELECT after BEGIN will establish the snapshot in REPEATABLE-READ.
                // Get an estimate of the slave lag before then, treating estimate staleness
                }
  
                $this->runOnTransactionIdleCallbacks( self::TRIGGER_COMMIT );
+               $this->runTransactionListenerCallbacks( self::TRIGGER_COMMIT );
        }
  
        /**
                $this->mTrxIdleCallbacks = []; // clear
                $this->mTrxPreCommitCallbacks = []; // clear
                $this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK );
+               $this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK );
        }
  
        /**
                }
        }
  
+       public function clearSnapshot( $fname = __METHOD__ ) {
+               if ( $this->writesOrCallbacksPending() || $this->explicitTrxActive() ) {
+                       // This only flushes transactions to clear snapshots, not to write data
+                       throw new DBUnexpectedError(
+                               $this,
+                               "$fname: Cannot COMMIT to clear snapshot because writes are pending."
+                       );
+               }
+               $this->commit( $fname, self::FLUSHING_INTERNAL );
+       }
        public function explicitTrxActive() {
                return $this->mTrxLevel && ( $this->mTrxAtomicLevels || !$this->mTrxAutomatic );
        }
                }
        }
  
 -      public function ping() {
 +      public function ping( &$rtt = null ) {
 +              // Avoid hitting the server if it was hit recently
                if ( $this->isOpen() && ( microtime( true ) - $this->lastPing ) < self::PING_TTL ) {
 -                      return true;
 +                      if ( !func_num_args() || $this->mRTTEstimate > 0 ) {
 +                              $rtt = $this->mRTTEstimate;
 +                              return true; // don't care about $rtt
 +                      }
                }
  
 -              $ignoreErrors = true;
 -              $this->clearFlag( DBO_TRX, self::REMEMBER_PRIOR );
                // This will reconnect if possible or return false if not
 -              $ok = (bool)$this->query( "SELECT 1 AS ping", __METHOD__, $ignoreErrors );
 +              $this->clearFlag( DBO_TRX, self::REMEMBER_PRIOR );
 +              $ok = ( $this->query( self::PING_QUERY, __METHOD__, true ) !== false );
                $this->restoreFlags( self::RESTORE_PRIOR );
  
 +              if ( $ok ) {
 +                      $rtt = $this->mRTTEstimate;
 +              }
 +
                return $ok;
        }
  
  interface IDatabase {
        /** @var int Callback triggered immediately due to no active transaction */
        const TRIGGER_IDLE = 1;
-       /** @var int Callback triggered by commit */
+       /** @var int Callback triggered by COMMIT */
        const TRIGGER_COMMIT = 2;
-       /** @var int Callback triggered by rollback */
+       /** @var int Callback triggered by ROLLBACK */
        const TRIGGER_ROLLBACK = 3;
  
        /** @var string Transaction is requested by regular caller outside of the DB layer */
        const TRANSACTION_EXPLICIT = '';
 -      /** @var string Transaction is requested interally via DBO_TRX/startAtomic() */
 +      /** @var string Transaction is requested internally via DBO_TRX/startAtomic() */
        const TRANSACTION_INTERNAL = 'implicit';
  
        /** @var string Transaction operation comes from service managing all DBs */
@@@ -50,7 -50,7 +50,7 @@@
        /** @var string Transaction operation comes from the database class internally */
        const FLUSHING_INTERNAL = 'flush';
  
 -      /** @var string No not remember the prior flags */
 +      /** @var string Do not remember the prior flags */
        const REMEMBER_NOTHING = '';
        /** @var string Remember the prior flags */
        const REMEMBER_PRIOR = 'remember';
        /** @var string Restore to the initial flag state */
        const RESTORE_INITIAL = 'initial';
  
 +      /** @var string Estimate total time (RTT, scanning, waiting on locks, applying) */
 +      const ESTIMATE_TOTAL = 'total';
 +      /** @var string Estimate time to apply (scanning, applying) */
 +      const ESTIMATE_DB_APPLY = 'apply';
 +
        /**
         * A string describing the current software version, and possibly
         * other details in a user-friendly way. Will be listed on Special:Version, etc.
        /**
         * Returns true if there is a transaction open with possible write
         * queries or transaction pre-commit/idle callbacks waiting on it to finish.
+        * This does *not* count recurring callbacks, e.g. from setTransactionListener().
         *
         * @return bool
         */
         *
         * High times could be due to scanning, updates, locking, and such
         *
 +       * @param string $type IDatabase::ESTIMATE_* constant [default: ESTIMATE_ALL]
         * @return float|bool Returns false if not transaction is active
         * @since 1.26
         */
 -      public function pendingWriteQueryDuration();
 +      public function pendingWriteQueryDuration( $type = self::ESTIMATE_TOTAL );
  
        /**
         * Get the list of method names that did write queries for this transaction
         * This is useful for combining cooperative locks and DB transactions.
         *
         * The callback takes one argument:
-        * How the transaction ended (IDatabase::TRIGGER_COMMIT or IDatabase::TRIGGER_ROLLBACK)
+        *   - How the transaction ended (IDatabase::TRIGGER_COMMIT or IDatabase::TRIGGER_ROLLBACK)
         *
         * @param callable $callback
         * @return mixed
         * Updates will execute in the order they were enqueued.
         *
         * The callback takes one argument:
-        * How the transaction ended (IDatabase::TRIGGER_COMMIT or IDatabase::TRIGGER_IDLE)
+        *   - How the transaction ended (IDatabase::TRIGGER_COMMIT or IDatabase::TRIGGER_IDLE)
         *
         * @param callable $callback
         * @since 1.20
         */
        public function onTransactionPreCommitOrIdle( callable $callback );
  
+       /**
+        * Run a callback each time any transaction commits or rolls back
+        *
+        * The callback takes two arguments:
+        *   - IDatabase::TRIGGER_COMMIT or IDatabase::TRIGGER_ROLLBACK
+        *   - This IDatabase object
+        * Callbacks must commit any transactions that they begin.
+        *
+        * Registering a callback here will not affect writesOrCallbacks() pending
+        *
+        * @param string $name Callback name
+        * @param callable|null $callback Use null to unset a listener
+        * @return mixed
+        * @since 1.28
+        */
+       public function setTransactionListener( $name, callable $callback = null );
        /**
         * Begin an atomic section of statements
         *
        /**
         * Ping the server and try to reconnect if it there is no connection
         *
 +       * @param float|null &$rtt Value to store the estimated RTT [optional]
         * @return bool Success or failure
         */
 -      public function ping();
 +      public function ping( &$rtt = null );
  
        /**
         * Get slave lag. Currently supported only by MySQL.
@@@ -44,8 -44,12 +44,12 @@@ abstract class LBFactory implements Des
  
        /** @var mixed */
        protected $ticket;
+       /** @var string|bool String if a requested DBO_TRX transaction round is active */
+       protected $trxRoundId = false;
        /** @var string|bool Reason all LBs are read-only or false if not */
        protected $readOnlyReason = false;
+       /** @var callable[] */
+       protected $replicationWaitCallbacks = [];
  
        const SHUTDOWN_NO_CHRONPROT = 1; // don't save ChronologyProtector positions (for async code)
  
        /**
         * Prepare all tracked load balancers for shutdown
         * @param integer $flags Supports SHUTDOWN_* flags
 -       * STUB
         */
        public function shutdown( $flags = 0 ) {
 +              if ( !( $flags & self::SHUTDOWN_NO_CHRONPROT ) ) {
 +                      $this->shutdownChronologyProtector( $this->chronProt );
 +              }
 +              $this->commitMasterChanges( __METHOD__ ); // sanity
        }
  
        /**
         * This allows for custom transaction rounds from any outer transaction scope.
         *
         * @param string $fname
+        * @throws DBTransactionError
         * @since 1.28
         */
        public function beginMasterChanges( $fname = __METHOD__ ) {
+               if ( $this->trxRoundId !== false ) {
+                       throw new DBTransactionError(
+                               null,
+                               "Transaction round '{$this->trxRoundId}' already started."
+                       );
+               }
+               $this->trxRoundId = $fname;
+               // Set DBO_TRX flags on all appropriate DBs
                $this->forEachLBCallMethod( 'beginMasterChanges', [ $fname ] );
        }
  
         * @throws Exception
         */
        public function commitMasterChanges( $fname = __METHOD__, array $options = [] ) {
-               // Perform all pre-commit callbacks, aborting on failure
-               $this->forEachLBCallMethod( 'runMasterPreCommitCallbacks' );
-               // Perform all pre-commit checks, aborting on failure
+               // Run pre-commit callbacks and suppress post-commit callbacks, aborting on failure
+               $this->forEachLBCallMethod( 'finalizeMasterChanges' );
+               $this->trxRoundId = false;
+               // Perform pre-commit checks, aborting on failure
                $this->forEachLBCallMethod( 'approveMasterChanges', [ $options ] );
                // Log the DBs and methods involved in multi-DB transactions
                $this->logIfMultiDbTransaction();
-               // Actually perform the commit on all master DB connections
+               // Actually perform the commit on all master DB connections and revert DBO_TRX
                $this->forEachLBCallMethod( 'commitMasterChanges', [ $fname ] );
                // Run all post-commit callbacks
                /** @var Exception $e */
                $e = null; // first callback exception
                $this->forEachLB( function ( LoadBalancer $lb ) use ( &$e ) {
-                       $ex = $lb->runMasterPostCommitCallbacks();
+                       $ex = $lb->runMasterPostTrxCallbacks( IDatabase::TRIGGER_COMMIT );
                        $e = $e ?: $ex;
                } );
                // Commit any dangling DBO_TRX transactions from callbacks on one DB to another DB
         * @since 1.23
         */
        public function rollbackMasterChanges( $fname = __METHOD__ ) {
+               $this->trxRoundId = false;
+               $this->forEachLBCallMethod( 'suppressTransactionEndCallbacks' );
                $this->forEachLBCallMethod( 'rollbackMasterChanges', [ $fname ] );
+               // Run all post-rollback callbacks
+               $this->forEachLB( function ( LoadBalancer $lb ) {
+                       $lb->runMasterPostTrxCallbacks( IDatabase::TRIGGER_ROLLBACK );
+               } );
        }
  
        /**
                        'ifWritesSince' => null
                ];
  
+               foreach ( $this->replicationWaitCallbacks as $callback ) {
+                       $callback();
+               }
                // Figure out which clusters need to be checked
                /** @var LoadBalancer[] $lbs */
                $lbs = [];
                }
        }
  
+       /**
+        * Add a callback to be run in every call to waitForReplication() before waiting
+        *
+        * Callbacks must clear any transactions that they start
+        *
+        * @param string $name Callback name
+        * @param callable|null $callback Use null to unset a callback
+        * @since 1.28
+        */
+       public function setWaitForReplicationListener( $name, callable $callback = null ) {
+               if ( $callback ) {
+                       $this->replicationWaitCallbacks[$name] = $callback;
+               } else {
+                       unset( $this->replicationWaitCallbacks[$name] );
+               }
+       }
        /**
         * Get a token asserting that no transaction writes are active
         *
                } );
        }
  
+       /**
+        * @param LoadBalancer $lb
+        */
+       protected function initLoadBalancer( LoadBalancer $lb ) {
+               if ( $this->trxRoundId !== false ) {
+                       $lb->beginMasterChanges( $this->trxRoundId ); // set DBO_TRX
+               }
+       }
        /**
         * Close all open database connections on all open load balancers.
         * @since 1.28
        }
  
  }
 -
 -/**
 - * Exception class for attempted DB access
 - */
 -class DBAccessError extends MWException {
 -      public function __construct() {
 -              parent::__construct( "Mediawiki tried to access the database via wfGetDB(). " .
 -                      "This is not allowed, because database access has been disabled." );
 -      }
 -}
 -
 -/**
 - * Exception class for replica DB wait timeouts
 - */
 -class DBReplicationWaitError extends Exception {
 -}
@@@ -313,7 -313,7 +313,7 @@@ class LBFactoryMulti extends LBFactory 
         * @return LoadBalancer
         */
        private function newLoadBalancer( $template, $loads, $groupLoads, $readOnlyReason ) {
-               return new LoadBalancer( [
+               $lb = new LoadBalancer( [
                        'servers' => $this->makeServerArray( $template, $loads, $groupLoads ),
                        'loadMonitor' => $this->loadMonitorClass,
                        'readOnlyReason' => $readOnlyReason,
                        'srvCache' => $this->srvCache,
                        'wanCache' => $this->wanCache
                ] );
+               $this->initLoadBalancer( $lb );
+               return $lb;
        }
  
        /**
                        call_user_func_array( $callback, array_merge( [ $lb ], $params ) );
                }
        }
 -
 -      public function shutdown( $flags = 0 ) {
 -              if ( !( $flags & self::SHUTDOWN_NO_CHRONPROT ) ) {
 -                      $this->shutdownChronologyProtector( $this->chronProt );
 -              }
 -              $this->commitMasterChanges( __METHOD__ ); // sanity
 -      }
  }
@@@ -133,7 -133,7 +133,7 @@@ class LBFactorySimple extends LBFactor
        }
  
        private function newLoadBalancer( array $servers ) {
-               return new LoadBalancer( [
+               $lb = new LoadBalancer( [
                        'servers' => $servers,
                        'loadMonitor' => $this->loadMonitorClass,
                        'readOnlyReason' => $this->readOnlyReason,
                        'srvCache' => $this->srvCache,
                        'wanCache' => $this->wanCache
                ] );
+               $this->initLoadBalancer( $lb );
+               return $lb;
        }
  
        /**
                        call_user_func_array( $callback, array_merge( [ $lb ], $params ) );
                }
        }
 -
 -      public function shutdown( $flags = 0 ) {
 -              if ( !( $flags & self::SHUTDOWN_NO_CHRONPROT ) ) {
 -                      $this->shutdownChronologyProtector( $this->chronProt );
 -              }
 -              $this->commitMasterChanges( __METHOD__ ); // sanity
 -      }
  }
@@@ -51,6 -51,8 +51,8 @@@ class LoadBalancer 
        private $srvCache;
        /** @var WANObjectCache */
        private $wanCache;
+       /** @var TransactionProfiler */
+       protected $trxProfiler;
  
        /** @var bool|DatabaseBase Database connection that caused a problem */
        private $mErrorConnection;
@@@ -68,9 -70,8 +70,8 @@@
        private $readOnlyReason = false;
        /** @var integer Total connections opened */
        private $connsOpened = 0;
-       /** @var TransactionProfiler */
-       protected $trxProfiler;
+       /** @var string|bool String if a requested DBO_TRX transaction round is active */
+       private $trxRoundId = false;
  
        /** @var integer Warn when this many connection are held */
        const CONN_HELD_WARN_THRESHOLD = 10;
                        $this->getLazyConnectionRef( DB_MASTER, [], $db->getWikiID() )
                );
                $db->setTransactionProfiler( $this->trxProfiler );
+               if ( $this->trxRoundId !== false ) {
+                       $this->applyTransactionRoundFlags( $db );
+               }
  
                return $db;
        }
        /**
         * Commit transactions on all open connections
         * @param string $fname Caller name
+        * @throws DBExpectedError
         */
        public function commitAll( $fname = __METHOD__ ) {
-               $this->forEachOpenConnection( function ( DatabaseBase $conn ) use ( $fname ) {
-                       $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
-               } );
+               $failures = [];
+               $restore = ( $this->trxRoundId !== false );
+               $this->trxRoundId = false;
+               $this->forEachOpenConnection(
+                       function ( DatabaseBase $conn ) use ( $fname, $restore, &$failures ) {
+                               try {
+                                       $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
+                               } catch ( DBError $e ) {
+                                       MWExceptionHandler::logException( $e );
+                                       $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
+                               }
+                               if ( $restore && $conn->getLBInfo( 'master' ) ) {
+                                       $this->undoTransactionRoundFlags( $conn );
+                               }
+                       }
+               );
+               if ( $failures ) {
+                       throw new DBExpectedError(
+                               null,
+                               "Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
+                       );
+               }
        }
  
        /**
         * Perform all pre-commit callbacks that remain part of the atomic transactions
-        * and disable any post-commit callbacks until runMasterPostCommitCallbacks()
+        * and disable any post-commit callbacks until runMasterPostTrxCallbacks()
         * @since 1.28
         */
-       public function runMasterPreCommitCallbacks() {
+       public function finalizeMasterChanges() {
                $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) {
-                       // Any error will cause all DB transactions to be rolled back together.
+                       // Any error should cause all DB transactions to be rolled back together
+                       $conn->setTrxEndCallbackSuppression( false );
                        $conn->runOnTransactionPreCommitCallbacks();
-                       // Defer post-commit callbacks until COMMIT finishes for all DBs.
-                       $conn->setPostCommitCallbackSupression( true );
+                       // Defer post-commit callbacks until COMMIT finishes for all DBs
+                       $conn->setTrxEndCallbackSuppression( true );
                } );
        }
  
                        }
                        // Assert that the time to replicate the transaction will be sane.
                        // If this fails, then all DB transactions will be rollback back together.
 -                      $time = $conn->pendingWriteQueryDuration();
 +                      $time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
                        if ( $limit > 0 && $time > $limit ) {
                                throw new DBTransactionError(
                                        $conn,
         * This allows for custom transaction rounds from any outer transaction scope.
         *
         * @param string $fname
+        * @throws DBExpectedError
         * @since 1.28
         */
        public function beginMasterChanges( $fname = __METHOD__ ) {
-               $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $fname ) {
-                       if ( $conn->writesOrCallbacksPending() ) {
-                               throw new DBTransactionError(
-                                       $conn,
-                                       "Transaction with pending writes still active."
-                               );
-                       } elseif ( $conn->trxLevel() ) {
-                               $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
-                       }
-                       if ( $conn->getFlag( DBO_DEFAULT ) ) {
-                               // DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
-                               // Force DBO_TRX even in CLI mode since a commit round is expected soon.
-                               $conn->setFlag( DBO_TRX, $conn::REMEMBER_PRIOR );
-                               $conn->onTransactionResolution( function () use ( $conn ) {
-                                       $conn->restoreFlags( $conn::RESTORE_PRIOR );
-                               } );
-                       } else {
-                               // Config has explicitly requested DBO_TRX be either on or off; respect that.
-                               // This is useful for things like blob stores which use auto-commit mode.
+               if ( $this->trxRoundId !== false ) {
+                       throw new DBTransactionError(
+                               null,
+                               "$fname: Transaction round '{$this->trxRoundId}' already started."
+                       );
+               }
+               $this->trxRoundId = $fname;
+               $failures = [];
+               $this->forEachOpenMasterConnection(
+                       function ( DatabaseBase $conn ) use ( $fname, &$failures ) {
+                               $conn->setTrxEndCallbackSuppression( true );
+                               try {
+                                       $conn->clearSnapshot( $fname );
+                               } catch ( DBError $e ) {
+                                       MWExceptionHandler::logException( $e );
+                                       $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
+                               }
+                               $conn->setTrxEndCallbackSuppression( false );
+                               $this->applyTransactionRoundFlags( $conn );
                        }
-               } );
+               );
+               if ( $failures ) {
+                       throw new DBExpectedError(
+                               null,
+                               "$fname: Flush failed on server(s) " . implode( "\n", array_unique( $failures ) )
+                       );
+               }
        }
  
        /**
         * Issue COMMIT on all master connections where writes where done
         * @param string $fname Caller name
+        * @throws DBExpectedError
         */
        public function commitMasterChanges( $fname = __METHOD__ ) {
-               $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $fname ) {
-                       if ( $conn->writesOrCallbacksPending() ) {
-                               $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
+               $failures = [];
+               $restore = ( $this->trxRoundId !== false );
+               $this->trxRoundId = false;
+               $this->forEachOpenMasterConnection(
+                       function ( DatabaseBase $conn ) use ( $fname, $restore, &$failures ) {
+                               try {
+                                       if ( $conn->writesOrCallbacksPending() ) {
+                                               $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
+                                       } elseif ( $restore ) {
+                                               $conn->clearSnapshot( $fname );
+                                       }
+                               } catch ( DBError $e ) {
+                                       MWExceptionHandler::logException( $e );
+                                       $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
+                               }
+                               if ( $restore ) {
+                                       $this->undoTransactionRoundFlags( $conn );
+                               }
                        }
-               } );
+               );
+               if ( $failures ) {
+                       throw new DBExpectedError(
+                               null,
+                               "$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
+                       );
+               }
        }
  
        /**
-        * Issue all pending post-commit callbacks
+        * Issue all pending post-COMMIT/ROLLBACK callbacks
+        * @param integer $type IDatabase::TRIGGER_* constant
         * @return Exception|null The first exception or null if there were none
         * @since 1.28
         */
-       public function runMasterPostCommitCallbacks() {
+       public function runMasterPostTrxCallbacks( $type ) {
                $e = null; // first exception
-               $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( &$e ) {
-                       $conn->setPostCommitCallbackSupression( false );
+               $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $type, &$e ) {
+                       $conn->clearSnapshot( __METHOD__ ); // clear no-op transactions
+                       $conn->setTrxEndCallbackSuppression( false );
                        try {
-                               $conn->runOnTransactionIdleCallbacks( $conn::TRIGGER_COMMIT );
+                               $conn->runOnTransactionIdleCallbacks( $type );
+                       } catch ( Exception $ex ) {
+                               $e = $e ?: $ex;
+                       }
+                       try {
+                               $conn->runTransactionListenerCallbacks( $type );
                        } catch ( Exception $ex ) {
                                $e = $e ?: $ex;
                        }
         * @since 1.23
         */
        public function rollbackMasterChanges( $fname = __METHOD__ ) {
-               $failedServers = [];
-               $masterIndex = $this->getWriterIndex();
-               foreach ( $this->mConns as $conns2 ) {
-                       if ( empty( $conns2[$masterIndex] ) ) {
-                               continue;
-                       }
-                       /** @var DatabaseBase $conn */
-                       foreach ( $conns2[$masterIndex] as $conn ) {
-                               if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
-                                       try {
-                                               $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
-                                       } catch ( DBError $e ) {
-                                               MWExceptionHandler::logException( $e );
-                                               $failedServers[] = $conn->getServer();
-                                       }
+               $restore = ( $this->trxRoundId !== false );
+               $this->trxRoundId = false;
+               $this->forEachOpenMasterConnection(
+                       function ( DatabaseBase $conn ) use ( $fname, $restore ) {
+                               if ( $conn->writesOrCallbacksPending() ) {
+                                       $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
+                               }
+                               if ( $restore ) {
+                                       $this->undoTransactionRoundFlags( $conn );
                                }
                        }
+               );
+       }
+       /**
+        * Suppress all pending post-COMMIT/ROLLBACK callbacks
+        * @return Exception|null The first exception or null if there were none
+        * @since 1.28
+        */
+       public function suppressTransactionEndCallbacks() {
+               $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) {
+                       $conn->setTrxEndCallbackSuppression( true );
+               } );
+       }
+       /**
+        * @param DatabaseBase $conn
+        */
+       private function applyTransactionRoundFlags( DatabaseBase $conn ) {
+               if ( $conn->getFlag( DBO_DEFAULT ) ) {
+                       // DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
+                       // Force DBO_TRX even in CLI mode since a commit round is expected soon.
+                       $conn->setFlag( DBO_TRX, $conn::REMEMBER_PRIOR );
+                       // If config has explicitly requested DBO_TRX be either on or off by not
+                       // setting DBO_DEFAULT, then respect that. Forcing no transactions is useful
+                       // for things like blob stores (ExternalStore) which want auto-commit mode.
                }
+       }
  
-               if ( $failedServers ) {
-                       throw new DBExpectedError( null, "Rollback failed on server(s) " .
-                               implode( ', ', array_unique( $failedServers ) ) );
+       /**
+        * @param DatabaseBase $conn
+        */
+       private function undoTransactionRoundFlags( DatabaseBase $conn ) {
+               if ( $conn->getFlag( DBO_DEFAULT ) ) {
+                       $conn->restoreFlags( $conn::RESTORE_PRIOR );
                }
        }