Make onTransactionIdle() safer for multi-DB commits
authorAaron Schulz <aschulz@wikimedia.org>
Sat, 23 Jul 2016 05:37:13 +0000 (22:37 -0700)
committerKrinkle <krinklemail@gmail.com>
Tue, 26 Jul 2016 21:58:19 +0000 (21:58 +0000)
* For DBs managed by LBFactory, they will trigger after the COMMIT
  step finishes for all DBs.
* Make commitAll() respect the same logic as commitMasterChanges().
* Add LoadBalancer::forEachOpenMasterConnection() convenience method.
* Rename logMultiDbTransaction() to less confusing logIfMultiDbTransaction().
* Various other small code cleanups.

Change-Id: I6c9b40ba8b9e7600cce774f26b9c401e60fa8803

includes/db/Database.php
includes/db/loadbalancer/LBFactory.php
includes/db/loadbalancer/LoadBalancer.php

index ad9a7e1..7d04cac 100644 (file)
@@ -58,6 +58,8 @@ abstract class DatabaseBase implements IDatabase {
        protected $mTrxPreCommitCallbacks = [];
        /** @var array[] List of (callable, method name) */
        protected $mTrxEndCallbacks = [];
+       /** @var bool Whether to suppress triggering of post-commit callbacks */
+       protected $suppressPostCommitCallbacks = false;
 
        protected $mTablePrefix;
        protected $mSchema;
@@ -2480,12 +2482,30 @@ abstract class DatabaseBase implements IDatabase {
        }
 
        /**
-        * Actually run and consume any "on transaction idle" callbacks.
+        * Whether to disable running of post-commit callbacks
+        *
+        * This method should not be used outside of Database/LoadBalancer
+        *
+        * @param bool $suppress
+        * @since 1.28
+        */
+       final public function setPostCommitCallbackSupression( $suppress ) {
+               $this->suppressPostCommitCallbacks = $suppress;
+       }
+
+       /**
+        * Actually run and consume any "on transaction idle/resolution" callbacks.
+        *
+        * This method should not be used outside of Database/LoadBalancer
         *
         * @param integer $trigger IDatabase::TRIGGER_* constant
         * @since 1.20
         */
-       protected function runOnTransactionIdleCallbacks( $trigger ) {
+       public function runOnTransactionIdleCallbacks( $trigger ) {
+               if ( $this->suppressPostCommitCallbacks ) {
+                       return;
+               }
+
                $autoTrx = $this->getFlag( DBO_TRX ); // automatic begin() enabled?
 
                $e = $ePrior = null; // last exception
@@ -2494,8 +2514,8 @@ abstract class DatabaseBase implements IDatabase {
                                $this->mTrxIdleCallbacks,
                                $this->mTrxEndCallbacks // include "transaction resolution" callbacks
                        );
-                       $this->mTrxIdleCallbacks = []; // recursion guard
-                       $this->mTrxEndCallbacks = []; // recursion guard
+                       $this->mTrxIdleCallbacks = []; // consumed (and recursion guard)
+                       $this->mTrxEndCallbacks = []; // consumed (recursion guard)
                        foreach ( $callbacks as $callback ) {
                                try {
                                        list( $phpCallback ) = $callback;
@@ -2536,7 +2556,7 @@ abstract class DatabaseBase implements IDatabase {
                $e = $ePrior = null; // last exception
                do { // callbacks may add callbacks :)
                        $callbacks = $this->mTrxPreCommitCallbacks;
-                       $this->mTrxPreCommitCallbacks = []; // recursion guard
+                       $this->mTrxPreCommitCallbacks = []; // consumed (and recursion guard)
                        foreach ( $callbacks as $callback ) {
                                try {
                                        list( $phpCallback ) = $callback;
index 9a1d679..053f9f8 100644 (file)
@@ -204,15 +204,12 @@ abstract class LBFactory implements DestructibleService {
         * 1. To commit changes to the masters.
         * 2. To release the snapshot on all connections, master and slave.
         * @param string $fname Caller name
+        * @param array $options Options map:
+        *   - maxWriteDuration: abort if more than this much time was spent in write queries
         */
-       public function commitAll( $fname = __METHOD__ ) {
-               $this->logMultiDbTransaction();
-
-               $start = microtime( true );
+       public function commitAll( $fname = __METHOD__, array $options = [] ) {
+               $this->commitMasterChanges( $fname, $options );
                $this->forEachLBCallMethod( 'commitAll', [ $fname ] );
-               $timeMs = 1000 * ( microtime( true ) - $start );
-
-               RequestContext::getMain()->getStats()->timing( "db.commit-all", $timeMs );
        }
 
        /**
@@ -222,25 +219,17 @@ abstract class LBFactory implements DestructibleService {
         *   - maxWriteDuration: abort if more than this much time was spent in write queries
         */
        public function commitMasterChanges( $fname = __METHOD__, array $options = [] ) {
-               $limit = isset( $options['maxWriteDuration'] ) ? $options['maxWriteDuration'] : 0;
-
-               // Run pre-commit callbacks to keep them out of the COMMIT step. If one errors out here
-               // then all DB transactions can be rolled back before anything was committed yet.
-               $this->forEachLBCallMethod( 'runPreCommitCallbacks' );
-
-               $this->logMultiDbTransaction();
-               $this->forEachLB( function ( LoadBalancer $lb ) use ( $limit ) {
-                       $lb->forEachOpenConnection( function ( IDatabase $db ) use ( $limit ) {
-                               $time = $db->pendingWriteQueryDuration();
-                               if ( $limit > 0 && $time > $limit ) {
-                                       throw new DBTransactionError(
-                                               $db,
-                                               wfMessage( 'transaction-duration-limit-exceeded', $time, $limit )->text()
-                                       );
-                               }
-                       } );
-               } );
-
+               // Perform all pre-commit callbacks, aborting on failure
+               $this->forEachLBCallMethod( 'runMasterPreCommitCallbacks' );
+               // Perform all pre-commit checks, aborting on failure
+               $this->forEachLBCallMethod( 'approveMasterChanges', [ $options ] );
+               // Log the DBs and methods involved in multi-DB transactions
+               $this->logIfMultiDbTransaction();
+               // Actually perform the commit on all master DB connections
+               $this->forEachLBCallMethod( 'commitMasterChanges', [ $fname ] );
+               // Run all post-commit callbacks
+               $this->forEachLBCallMethod( 'runMasterPostCommitCallbacks' );
+               // Commit any dangling DBO_TRX transactions from callbacks on one DB to another DB
                $this->forEachLBCallMethod( 'commitMasterChanges', [ $fname ] );
        }
 
@@ -256,7 +245,7 @@ abstract class LBFactory implements DestructibleService {
        /**
         * Log query info if multi DB transactions are going to be committed now
         */
-       private function logMultiDbTransaction() {
+       private function logIfMultiDbTransaction() {
                $callersByDB = [];
                $this->forEachLB( function ( LoadBalancer $lb ) use ( &$callersByDB ) {
                        $masterName = $lb->getServerName( $lb->getWriterIndex() );
index e64100f..a67eac1 100644 (file)
@@ -1047,35 +1047,68 @@ class LoadBalancer {
         * @param string $fname Caller name
         */
        public function commitAll( $fname = __METHOD__ ) {
-               foreach ( $this->mConns as $conns2 ) {
-                       foreach ( $conns2 as $conns3 ) {
-                               /** @var DatabaseBase[] $conns3 */
-                               foreach ( $conns3 as $conn ) {
-                                       if ( $conn->trxLevel() ) {
-                                               $conn->commit( $fname, 'flush' );
-                                       }
-                               }
+               $this->forEachOpenConnection( function ( DatabaseBase $conn ) use ( $fname ) {
+                       $conn->commit( $fname, 'flush' );
+               } );
+       }
+
+       /**
+        * Perform all pre-commit callbacks that remain part of the atomic transactions
+        * and disable any post-commit callbacks until runMasterPostCommitCallbacks()
+        * @since 1.28
+        */
+       public function runMasterPreCommitCallbacks() {
+               $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) {
+                       // Any error will cause all DB transactions to be rolled back together.
+                       $conn->runOnTransactionPreCommitCallbacks();
+                       // Defer post-commit callbacks until COMMIT finishes for all DBs.
+                       $conn->setPostCommitCallbackSupression( true );
+               } );
+       }
+
+       /**
+        * Perform all pre-commit checks for things like replication safety
+        * @param array $options Includes:
+        *   - maxWriteDuration : max write query duration time in seconds
+        * @throws DBTransactionError
+        * @since 1.28
+        */
+       public function approveMasterChanges( array $options ) {
+               $limit = isset( $options['maxWriteDuration'] ) ? $options['maxWriteDuration'] : 0;
+               $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $limit ) {
+                       // Assert that the time to replicate the transaction will be sane.
+                       // If this fails, then all DB transactions will be rollback back together.
+                       $time = $conn->pendingWriteQueryDuration();
+                       if ( $limit > 0 && $time > $limit ) {
+                               throw new DBTransactionError(
+                                       $conn,
+                                       wfMessage( 'transaction-duration-limit-exceeded', $time, $limit )->text()
+                               );
                        }
-               }
+               } );
        }
 
        /**
-        * Issue COMMIT only on master, only if queries were done on connection
+        * Issue COMMIT on all master connections where writes where done
         * @param string $fname Caller name
         */
        public function commitMasterChanges( $fname = __METHOD__ ) {
-               $masterIndex = $this->getWriterIndex();
-               foreach ( $this->mConns as $conns2 ) {
-                       if ( empty( $conns2[$masterIndex] ) ) {
-                               continue;
-                       }
-                       /** @var DatabaseBase $conn */
-                       foreach ( $conns2[$masterIndex] as $conn ) {
-                               if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
-                                       $conn->commit( $fname, 'flush' );
-                               }
+               $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $fname ) {
+                       if ( $conn->writesOrCallbacksPending() ) {
+                               $conn->commit( $fname, 'flush' );
                        }
-               }
+               } );
+       }
+
+       /**
+        * Issue all pending post-commit callbacks
+        * @since 1.28
+        */
+       public function runMasterPostCommitCallbacks() {
+               $this->forEachOpenMasterConnection( function ( DatabaseBase $db ) {
+                       $db->setPostCommitCallbackSupression( false );
+                       $db->runOnTransactionIdleCallbacks( IDatabase::TRIGGER_COMMIT );
+               } );
        }
 
        /**
@@ -1111,28 +1144,6 @@ class LoadBalancer {
                }
        }
 
-       /**
-        * Call runOnTransactionPreCommitCallbacks() on all DB handles
-        *
-        * This method should not be used outside of LBFactory/LoadBalancer
-        *
-        * @since 1.28
-        */
-       public function runPreCommitCallbacks() {
-               $masterIndex = $this->getWriterIndex();
-               foreach ( $this->mConns as $conns2 ) {
-                       if ( empty( $conns2[$masterIndex] ) ) {
-                               continue;
-                       }
-                       /** @var DatabaseBase $conn */
-                       foreach ( $conns2[$masterIndex] as $conn ) {
-                               if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
-                                       $conn->runOnTransactionPreCommitCallbacks();
-                               }
-                       }
-               }
-       }
-
        /**
         * @return bool Whether a master connection is already open
         * @since 1.24
@@ -1326,6 +1337,25 @@ class LoadBalancer {
                }
        }
 
+       /**
+        * Call a function with each open connection object to a master
+        * @param callable $callback
+        * @param array $params
+        * @since 1.28
+        */
+       public function forEachOpenMasterConnection( $callback, array $params = [] ) {
+               $masterIndex = $this->getWriterIndex();
+               foreach ( $this->mConns as $connsByServer ) {
+                       if ( isset( $connsByServer[$masterIndex] ) ) {
+                               /** @var DatabaseBase $conn */
+                               foreach ( $connsByServer[$masterIndex] as $conn ) {
+                                       $mergedParams = array_merge( [ $conn ], $params );
+                                       call_user_func_array( $callback, $mergedParams );
+                               }
+                       }
+               }
+       }
+
        /**
         * Get the hostname and lag time of the most-lagged slave
         *