Merge "Add flushReplicaSnapshots() method for just clearing snapshots"
[lhc/web/wiklou.git] / includes / db / loadbalancer / LoadBalancer.php
index f045acf..8db1085 100644 (file)
@@ -51,6 +51,8 @@ class LoadBalancer {
        private $srvCache;
        /** @var WANObjectCache */
        private $wanCache;
+       /** @var TransactionProfiler */
+       protected $trxProfiler;
 
        /** @var bool|DatabaseBase Database connection that caused a problem */
        private $mErrorConnection;
@@ -68,9 +70,10 @@ class LoadBalancer {
        private $readOnlyReason = false;
        /** @var integer Total connections opened */
        private $connsOpened = 0;
-
-       /** @var TransactionProfiler */
-       protected $trxProfiler;
+       /** @var string|bool String if a requested DBO_TRX transaction round is active */
+       private $trxRoundId = false;
+       /** @var array[] Map of (name => callable) */
+       private $trxRecurringCallbacks = [];
 
        /** @var integer Warn when this many connection are held */
        const CONN_HELD_WARN_THRESHOLD = 10;
@@ -610,7 +613,6 @@ class LoadBalancer {
                $serverIndex = $conn->getLBInfo( 'serverIndex' );
                $refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
                if ( $serverIndex === null || $refCount === null ) {
-                       wfDebug( __METHOD__ . ": this connection was not opened as a foreign connection\n" );
                        /**
                         * This can happen in code like:
                         *   foreach ( $dbs as $db ) {
@@ -864,6 +866,15 @@ class LoadBalancer {
                        $this->getLazyConnectionRef( DB_MASTER, [], $db->getWikiID() )
                );
                $db->setTransactionProfiler( $this->trxProfiler );
+               if ( $this->trxRoundId !== false ) {
+                       $this->applyTransactionRoundFlags( $db );
+               }
+
+               if ( $server['serverIndex'] === $this->getWriterIndex() ) {
+                       foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
+                               $db->setTransactionListener( $name, $callback );
+                       }
+               }
 
                return $db;
        }
@@ -1059,24 +1070,47 @@ class LoadBalancer {
        /**
         * Commit transactions on all open connections
         * @param string $fname Caller name
+        * @throws DBExpectedError
         */
        public function commitAll( $fname = __METHOD__ ) {
-               $this->forEachOpenConnection( function ( DatabaseBase $conn ) use ( $fname ) {
-                       $conn->commit( $fname, IDatabase::FLUSHING_ALL_PEERS );
-               } );
+               $failures = [];
+
+               $restore = ( $this->trxRoundId !== false );
+               $this->trxRoundId = false;
+               $this->forEachOpenConnection(
+                       function ( DatabaseBase $conn ) use ( $fname, $restore, &$failures ) {
+                               try {
+                                       $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
+                               } catch ( DBError $e ) {
+                                       MWExceptionHandler::logException( $e );
+                                       $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
+                               }
+                               if ( $restore && $conn->getLBInfo( 'master' ) ) {
+                                       $this->undoTransactionRoundFlags( $conn );
+                               }
+                       }
+               );
+
+               if ( $failures ) {
+                       throw new DBExpectedError(
+                               null,
+                               "Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
+                       );
+               }
        }
 
        /**
         * Perform all pre-commit callbacks that remain part of the atomic transactions
-        * and disable any post-commit callbacks until runMasterPostCommitCallbacks()
+        * and disable any post-commit callbacks until runMasterPostTrxCallbacks()
         * @since 1.28
         */
-       public function runMasterPreCommitCallbacks() {
+       public function finalizeMasterChanges() {
                $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) {
-                       // Any error will cause all DB transactions to be rolled back together.
+                       // Any error should cause all DB transactions to be rolled back together
+                       $conn->setTrxEndCallbackSuppression( false );
                        $conn->runOnTransactionPreCommitCallbacks();
-                       // Defer post-commit callbacks until COMMIT finishes for all DBs.
-                       $conn->setPostCommitCallbackSupression( true );
+                       // Defer post-commit callbacks until COMMIT finishes for all DBs
+                       $conn->setTrxEndCallbackSuppression( true );
                } );
        }
 
@@ -1090,7 +1124,7 @@ class LoadBalancer {
        public function approveMasterChanges( array $options ) {
                $limit = isset( $options['maxWriteDuration'] ) ? $options['maxWriteDuration'] : 0;
                $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $limit ) {
-                       // If atomic section or explicit transactions are still open, some caller must have
+                       // If atomic sections or explicit transactions are still open, some caller must have
                        // caught an exception but failed to properly rollback any changes. Detect that and
                        // throw and error (causing rollback).
                        if ( $conn->explicitTrxActive() ) {
@@ -1101,39 +1135,124 @@ class LoadBalancer {
                        }
                        // Assert that the time to replicate the transaction will be sane.
                        // If this fails, then all DB transactions will be rollback back together.
-                       $time = $conn->pendingWriteQueryDuration();
+                       $time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
                        if ( $limit > 0 && $time > $limit ) {
                                throw new DBTransactionError(
                                        $conn,
                                        wfMessage( 'transaction-duration-limit-exceeded', $time, $limit )->text()
                                );
                        }
+                       // If a connection sits idle while slow queries execute on another, that connection
+                       // may end up dropped before the commit round is reached. Ping servers to detect this.
+                       if ( $conn->writesOrCallbacksPending() && !$conn->ping() ) {
+                               throw new DBTransactionError(
+                                       $conn,
+                                       "A connection to the {$conn->getDBname()} database was lost before commit."
+                               );
+                       }
                } );
        }
 
+       /**
+        * Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
+        *
+        * The DBO_TRX setting will be reverted to the default in each of these methods:
+        *   - commitMasterChanges()
+        *   - rollbackMasterChanges()
+        *   - commitAll()
+        * This allows for custom transaction rounds from any outer transaction scope.
+        *
+        * @param string $fname
+        * @throws DBExpectedError
+        * @since 1.28
+        */
+       public function beginMasterChanges( $fname = __METHOD__ ) {
+               if ( $this->trxRoundId !== false ) {
+                       throw new DBTransactionError(
+                               null,
+                               "$fname: Transaction round '{$this->trxRoundId}' already started."
+                       );
+               }
+               $this->trxRoundId = $fname;
+
+               $failures = [];
+               $this->forEachOpenMasterConnection(
+                       function ( DatabaseBase $conn ) use ( $fname, &$failures ) {
+                               $conn->setTrxEndCallbackSuppression( true );
+                               try {
+                                       $conn->clearSnapshot( $fname );
+                               } catch ( DBError $e ) {
+                                       MWExceptionHandler::logException( $e );
+                                       $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
+                               }
+                               $conn->setTrxEndCallbackSuppression( false );
+                               $this->applyTransactionRoundFlags( $conn );
+                       }
+               );
+
+               if ( $failures ) {
+                       throw new DBExpectedError(
+                               null,
+                               "$fname: Flush failed on server(s) " . implode( "\n", array_unique( $failures ) )
+                       );
+               }
+       }
+
        /**
         * Issue COMMIT on all master connections where writes where done
         * @param string $fname Caller name
+        * @throws DBExpectedError
         */
        public function commitMasterChanges( $fname = __METHOD__ ) {
-               $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $fname ) {
-                       if ( $conn->writesOrCallbacksPending() ) {
-                               $conn->commit( $fname, IDatabase::FLUSHING_ALL_PEERS );
+               $failures = [];
+
+               $restore = ( $this->trxRoundId !== false );
+               $this->trxRoundId = false;
+               $this->forEachOpenMasterConnection(
+                       function ( DatabaseBase $conn ) use ( $fname, $restore, &$failures ) {
+                               try {
+                                       if ( $conn->writesOrCallbacksPending() ) {
+                                               $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
+                                       } elseif ( $restore ) {
+                                               $conn->clearSnapshot( $fname );
+                                       }
+                               } catch ( DBError $e ) {
+                                       MWExceptionHandler::logException( $e );
+                                       $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
+                               }
+                               if ( $restore ) {
+                                       $this->undoTransactionRoundFlags( $conn );
+                               }
                        }
-               } );
+               );
+
+               if ( $failures ) {
+                       throw new DBExpectedError(
+                               null,
+                               "$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
+                       );
+               }
        }
 
        /**
-        * Issue all pending post-commit callbacks
+        * Issue all pending post-COMMIT/ROLLBACK callbacks
+        * @param integer $type IDatabase::TRIGGER_* constant
         * @return Exception|null The first exception or null if there were none
         * @since 1.28
         */
-       public function runMasterPostCommitCallbacks() {
+       public function runMasterPostTrxCallbacks( $type ) {
                $e = null; // first exception
-               $this->forEachOpenMasterConnection( function ( DatabaseBase $db ) use ( &$e ) {
-                       $db->setPostCommitCallbackSupression( false );
+               $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $type, &$e ) {
+                       $conn->clearSnapshot( __METHOD__ ); // clear no-op transactions
+
+                       $conn->setTrxEndCallbackSuppression( false );
                        try {
-                               $db->runOnTransactionIdleCallbacks( IDatabase::TRIGGER_COMMIT );
+                               $conn->runOnTransactionIdleCallbacks( $type );
+                       } catch ( Exception $ex ) {
+                               $e = $e ?: $ex;
+                       }
+                       try {
+                               $conn->runTransactionListenerCallbacks( $type );
                        } catch ( Exception $ex ) {
                                $e = $e ?: $ex;
                        }
@@ -1149,32 +1268,66 @@ class LoadBalancer {
         * @since 1.23
         */
        public function rollbackMasterChanges( $fname = __METHOD__ ) {
-               $failedServers = [];
-
-               $masterIndex = $this->getWriterIndex();
-               foreach ( $this->mConns as $conns2 ) {
-                       if ( empty( $conns2[$masterIndex] ) ) {
-                               continue;
-                       }
-                       /** @var DatabaseBase $conn */
-                       foreach ( $conns2[$masterIndex] as $conn ) {
-                               if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
-                                       try {
-                                               $conn->rollback( $fname, IDatabase::FLUSHING_ALL_PEERS );
-                                       } catch ( DBError $e ) {
-                                               MWExceptionHandler::logException( $e );
-                                               $failedServers[] = $conn->getServer();
-                                       }
+               $restore = ( $this->trxRoundId !== false );
+               $this->trxRoundId = false;
+               $this->forEachOpenMasterConnection(
+                       function ( DatabaseBase $conn ) use ( $fname, $restore ) {
+                               if ( $conn->writesOrCallbacksPending() ) {
+                                       $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
+                               }
+                               if ( $restore ) {
+                                       $this->undoTransactionRoundFlags( $conn );
                                }
                        }
+               );
+       }
+
+       /**
+        * Suppress all pending post-COMMIT/ROLLBACK callbacks
+        * @return Exception|null The first exception or null if there were none
+        * @since 1.28
+        */
+       public function suppressTransactionEndCallbacks() {
+               $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) {
+                       $conn->setTrxEndCallbackSuppression( true );
+               } );
+       }
+
+       /**
+        * @param DatabaseBase $conn
+        */
+       private function applyTransactionRoundFlags( DatabaseBase $conn ) {
+               if ( $conn->getFlag( DBO_DEFAULT ) ) {
+                       // DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
+                       // Force DBO_TRX even in CLI mode since a commit round is expected soon.
+                       $conn->setFlag( DBO_TRX, $conn::REMEMBER_PRIOR );
+                       // If config has explicitly requested DBO_TRX be either on or off by not
+                       // setting DBO_DEFAULT, then respect that. Forcing no transactions is useful
+                       // for things like blob stores (ExternalStore) which want auto-commit mode.
                }
+       }
 
-               if ( $failedServers ) {
-                       throw new DBExpectedError( null, "Rollback failed on server(s) " .
-                               implode( ', ', array_unique( $failedServers ) ) );
+       /**
+        * @param DatabaseBase $conn
+        */
+       private function undoTransactionRoundFlags( DatabaseBase $conn ) {
+               if ( $conn->getFlag( DBO_DEFAULT ) ) {
+                       $conn->restoreFlags( $conn::RESTORE_PRIOR );
                }
        }
 
+       /**
+        * Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshot
+        *
+        * @param string $fname Caller name
+        * @since 1.28
+        */
+       public function flushReplicaSnapshots( $fname = __METHOD__ ) {
+               $this->forEachOpenReplicaConnection( function ( DatabaseBase $conn ) {
+                       $conn->clearSnapshot( __METHOD__ );
+               } );
+       }
+
        /**
         * @return bool Whether a master connection is already open
         * @since 1.24
@@ -1417,6 +1570,26 @@ class LoadBalancer {
                }
        }
 
+       /**
+        * Call a function with each open replica DB connection object
+        * @param callable $callback
+        * @param array $params
+        * @since 1.28
+        */
+       public function forEachOpenReplicaConnection( $callback, array $params = [] ) {
+               foreach ( $this->mConns as $connsByServer ) {
+                       foreach ( $connsByServer as $i => $serverConns ) {
+                               if ( $i === $this->getWriterIndex() ) {
+                                       continue; // skip master
+                               }
+                               foreach ( $serverConns as $conn ) {
+                                       $mergedParams = array_merge( [ $conn ], $params );
+                                       call_user_func_array( $callback, $mergedParams );
+                               }
+                       }
+               }
+       }
+
        /**
         * Get the hostname and lag time of the most-lagged slave
         *
@@ -1532,4 +1705,25 @@ class LoadBalancer {
        public function clearLagTimeCache() {
                $this->getLoadMonitor()->clearCaches();
        }
+
+       /**
+        * Set a callback via DatabaseBase::setTransactionListener() on
+        * all current and future master connections of this load balancer
+        *
+        * @param string $name Callback name
+        * @param callable|null $callback
+        * @since 1.28
+        */
+       public function setTransactionListener( $name, callable $callback = null ) {
+               if ( $callback ) {
+                       $this->trxRecurringCallbacks[$name] = $callback;
+               } else {
+                       unset( $this->trxRecurringCallbacks[$name] );
+               }
+               $this->forEachOpenMasterConnection(
+                       function ( DatabaseBase $conn ) use ( $name, $callback ) {
+                               $conn->setTransactionListener( $name, $callback );
+                       }
+               );
+       }
 }