From 5bfa77f8d918fc70c7acec52287b548ca043716b Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Wed, 28 Mar 2018 13:01:32 -0700 Subject: [PATCH] rdbms: enforce and improve LBFactory/LoadBalancer callback handling * Handle the case where an onTransaction* callback for one handle adds more onTransaction* callbacks to a different handle. Instead of supporting only a short chain of such callbacks, try to resolve the whole chain by using a loop in LoadBalancer and LBFactory. * Add sanity checks to enforce the proper call order of LoadBalancer transaction methods, such as those that execute callbacks. This is the order that LBFactory already uses. Use ROUND_ERROR for problems that can ruin the instance state. Such problems require rollback. * Correct setTrxEndCallbackSuppression() calls in beginMasterChanges() that were making tests fail. * Make Database handle callback suppression for FLUSHING_ALL_PEERS instead of making LoadBalancer/LBFactory have to manage it. * Simplify finalizeMasterChanges() given that suppression does not actually effect runOnTransactionPreCommitCallbacks(). * Make dangling callback warning in Database::close work properly. * Actually use $fname in flushReplicaSnapshots(). * Use DBTransactionError instead of DBExpectedError in some places where stages fail. * Fix failing testGetScopedLock() unit tests so everything passes. Add more comments to setTransactionListener and onTransactionIdle. Change-Id: I6a25a6e4e5ba666e0da065a24846cbab7e786c7b --- includes/libs/rdbms/database/Database.php | 51 ++-- includes/libs/rdbms/database/IDatabase.php | 10 +- includes/libs/rdbms/lbfactory/LBFactory.php | 59 +++- .../libs/rdbms/loadbalancer/ILoadBalancer.php | 24 +- .../libs/rdbms/loadbalancer/LoadBalancer.php | 281 +++++++++++------- .../phpunit/includes/db/LoadBalancerTest.php | 87 ++++++ .../libs/rdbms/database/DatabaseTest.php | 66 ++-- 7 files changed, 404 insertions(+), 174 deletions(-) diff --git a/includes/libs/rdbms/database/Database.php b/includes/libs/rdbms/database/Database.php index cb51113916..f44b7cb0cc 100644 --- a/includes/libs/rdbms/database/Database.php +++ b/includes/libs/rdbms/database/Database.php @@ -722,17 +722,13 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware } /** - * Get the list of method names that have pending write queries or callbacks - * for this transaction + * Get the list of method names that have pending write queries or that + * have transaction callbacks that have yet to run * * @return array */ protected function pendingWriteAndCallbackCallers() { - if ( !$this->trxLevel ) { - return []; - } - - $fnames = $this->trxWriteCallers; + $fnames = $this->pendingWriteCallers(); foreach ( [ $this->trxIdleCallbacks, $this->trxPreCommitCallbacks, @@ -960,12 +956,10 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware } // Sanity check that no callbacks are dangling - if ( - $this->trxIdleCallbacks || $this->trxPreCommitCallbacks || $this->trxEndCallbacks - ) { + $fnames = $this->pendingWriteAndCallbackCallers(); + if ( $fnames ) { throw new RuntimeException( - "Transaction callbacks are still pending:\n" . - implode( ', ', $this->pendingWriteAndCallbackCallers() ) + "Transaction callbacks are still pending:\n" . implode( ', ', $fnames ) ); } @@ -3414,19 +3408,25 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware } /** - * Actually run and consume any "on transaction idle/resolution" callbacks. + * Actually consume and run any "on transaction idle/resolution" callbacks. * * This method should not be used outside of Database/LoadBalancer * * @param int $trigger IDatabase::TRIGGER_* constant + * @return int Number of callbacks attempted * @since 1.20 * @throws Exception */ public function runOnTransactionIdleCallbacks( $trigger ) { + if ( $this->trxLevel ) { // sanity + throw new DBUnexpectedError( $this, __METHOD__ . ': a transaction is still open.' ); + } + if ( $this->trxEndCallbacksSuppressed ) { - return; + return 0; } + $count = 0; $autoTrx = $this->getFlag( self::DBO_TRX ); // automatic begin() enabled? /** @var Exception $e */ $e = null; // first exception @@ -3439,6 +3439,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware $this->trxEndCallbacks = []; // consumed (recursion guard) foreach ( $callbacks as $callback ) { try { + ++$count; list( $phpCallback ) = $callback; $this->clearFlag( self::DBO_TRX ); // make each query its own transaction call_user_func( $phpCallback, $trigger, $this ); @@ -3462,23 +3463,29 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware if ( $e instanceof Exception ) { throw $e; // re-throw any first exception } + + return $count; } /** - * Actually run and consume any "on transaction pre-commit" callbacks. + * Actually consume and run any "on transaction pre-commit" callbacks. * * This method should not be used outside of Database/LoadBalancer * * @since 1.22 + * @return int Number of callbacks attempted * @throws Exception */ public function runOnTransactionPreCommitCallbacks() { + $count = 0; + $e = null; // first exception do { // callbacks may add callbacks :) $callbacks = $this->trxPreCommitCallbacks; $this->trxPreCommitCallbacks = []; // consumed (and recursion guard) foreach ( $callbacks as $callback ) { try { + ++$count; list( $phpCallback ) = $callback; call_user_func( $phpCallback, $this ); } catch ( Exception $ex ) { @@ -3491,6 +3498,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware if ( $e instanceof Exception ) { throw $e; // re-throw any first exception } + + return $count; } /** @@ -3591,7 +3600,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware $savepointId = $cancelable === self::ATOMIC_CANCELABLE ? self::$NOT_APPLICABLE : null; if ( !$this->trxLevel ) { - $this->begin( $fname, self::TRANSACTION_INTERNAL ); + $this->begin( $fname, self::TRANSACTION_INTERNAL ); // sets trxAutomatic // If DBO_TRX is set, a series of startAtomic/endAtomic pairs will result // in all changes being in one transaction to keep requests transactional. if ( $this->getFlag( self::DBO_TRX ) ) { @@ -3845,8 +3854,11 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware ); } - $this->runOnTransactionIdleCallbacks( self::TRIGGER_COMMIT ); - $this->runTransactionListenerCallbacks( self::TRIGGER_COMMIT ); + // With FLUSHING_ALL_PEERS, callbacks will be explicitly run later + if ( $flush !== self::FLUSHING_ALL_PEERS ) { + $this->runOnTransactionIdleCallbacks( self::TRIGGER_COMMIT ); + $this->runTransactionListenerCallbacks( self::TRIGGER_COMMIT ); + } } /** @@ -3895,7 +3907,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware $this->trxIdleCallbacks = []; $this->trxPreCommitCallbacks = []; - if ( $trxActive ) { + // With FLUSHING_ALL_PEERS, callbacks will be explicitly run later + if ( $trxActive && $flush !== self::FLUSHING_ALL_PEERS ) { try { $this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK ); } catch ( Exception $e ) { diff --git a/includes/libs/rdbms/database/IDatabase.php b/includes/libs/rdbms/database/IDatabase.php index 6b3efa2bf0..bfaa95067e 100644 --- a/includes/libs/rdbms/database/IDatabase.php +++ b/includes/libs/rdbms/database/IDatabase.php @@ -58,7 +58,7 @@ interface IDatabase { /** @var string Commit/rollback is from the connection manager for the IDatabase handle */ const FLUSHING_ALL_PEERS = 'flush'; /** @var string Commit/rollback is from the IDatabase handle internally */ - const FLUSHING_INTERNAL = 'flush'; + const FLUSHING_INTERNAL = 'flush-internal'; /** @var string Do not remember the prior flags */ const REMEMBER_NOTHING = ''; @@ -1508,6 +1508,9 @@ interface IDatabase { * It can also be used for updates that easily suffer from lock timeouts and deadlocks, * but where atomicity is not essential. * + * Avoid using IDatabase instances aside from this one in the callback, unless such instances + * never have IDatabase::DBO_TRX set. This keeps callbacks from interfering with one another. + * * Updates will execute in the order they were enqueued. * * @note: do not assume that *other* IDatabase instances will be AUTOCOMMIT mode @@ -1555,7 +1558,10 @@ interface IDatabase { * - This IDatabase object * Callbacks must commit any transactions that they begin. * - * Registering a callback here will not affect writesOrCallbacks() pending + * Registering a callback here will not affect writesOrCallbacks() pending. + * + * Since callbacks from this method or onTransactionIdle() can start and end transactions, + * a single call to IDatabase::commit might trigger multiple runs of the listener callbacks. * * @param string $name Callback name * @param callable|null $callback Use null to unset a listener diff --git a/includes/libs/rdbms/lbfactory/LBFactory.php b/includes/libs/rdbms/lbfactory/LBFactory.php index c272147639..ca684c329a 100644 --- a/includes/libs/rdbms/lbfactory/LBFactory.php +++ b/includes/libs/rdbms/lbfactory/LBFactory.php @@ -88,6 +88,14 @@ abstract class LBFactory implements ILBFactory { /** @var string Agent name for query profiling */ protected $agent; + /** @var string One of the ROUND_* class constants */ + private $trxRoundStage = self::ROUND_CURSORY; + + const ROUND_CURSORY = 'cursory'; + const ROUND_BEGINNING = 'within-begin'; + const ROUND_COMMITTING = 'within-commit'; + const ROUND_ROLLING_BACK = 'within-rollback'; + private static $loggerFields = [ 'replLogger', 'connLogger', 'queryLogger', 'perfLogger' ]; @@ -206,12 +214,14 @@ abstract class LBFactory implements ILBFactory { $this->forEachLBCallMethod( 'flushReplicaSnapshots', [ $fname ] ); } - public function commitAll( $fname = __METHOD__, array $options = [] ) { + final public function commitAll( $fname = __METHOD__, array $options = [] ) { $this->commitMasterChanges( $fname, $options ); $this->forEachLBCallMethod( 'commitAll', [ $fname ] ); } - public function beginMasterChanges( $fname = __METHOD__ ) { + final public function beginMasterChanges( $fname = __METHOD__ ) { + $this->assertTransactionRoundStage( self::ROUND_CURSORY ); + $this->trxRoundStage = self::ROUND_BEGINNING; if ( $this->trxRoundId !== false ) { throw new DBTransactionError( null, @@ -221,9 +231,12 @@ abstract class LBFactory implements ILBFactory { $this->trxRoundId = $fname; // Set DBO_TRX flags on all appropriate DBs $this->forEachLBCallMethod( 'beginMasterChanges', [ $fname ] ); + $this->trxRoundStage = self::ROUND_CURSORY; } - public function commitMasterChanges( $fname = __METHOD__, array $options = [] ) { + final public function commitMasterChanges( $fname = __METHOD__, array $options = [] ) { + $this->assertTransactionRoundStage( self::ROUND_CURSORY ); + $this->trxRoundStage = self::ROUND_COMMITTING; if ( $this->trxRoundId !== false && $this->trxRoundId !== $fname ) { throw new DBTransactionError( null, @@ -241,29 +254,33 @@ abstract class LBFactory implements ILBFactory { $this->logIfMultiDbTransaction(); // Actually perform the commit on all master DB connections and revert DBO_TRX $this->forEachLBCallMethod( 'commitMasterChanges', [ $fname ] ); - // Run all post-commit callbacks - /** @var Exception $e */ + // Run all post-commit callbacks until new ones stop getting added $e = null; // first callback exception + do { + $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$e ) { + $ex = $lb->runMasterTransactionIdleCallbacks(); + $e = $e ?: $ex; + } ); + } while ( $this->hasMasterChanges() ); + // Run all listener callbacks once $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$e ) { - $ex = $lb->runMasterPostTrxCallbacks( IDatabase::TRIGGER_COMMIT ); + $ex = $lb->runMasterTransactionListenerCallbacks(); $e = $e ?: $ex; } ); - // Commit any dangling DBO_TRX transactions from callbacks on one DB to another DB - $this->forEachLBCallMethod( 'commitMasterChanges', [ $fname ] ); + $this->trxRoundStage = self::ROUND_CURSORY; // Throw any last post-commit callback error if ( $e instanceof Exception ) { throw $e; } } - public function rollbackMasterChanges( $fname = __METHOD__ ) { + final public function rollbackMasterChanges( $fname = __METHOD__ ) { + $this->trxRoundStage = self::ROUND_ROLLING_BACK; $this->trxRoundId = false; - $this->forEachLBCallMethod( 'suppressTransactionEndCallbacks' ); $this->forEachLBCallMethod( 'rollbackMasterChanges', [ $fname ] ); - // Run all post-rollback callbacks - $this->forEachLB( function ( ILoadBalancer $lb ) { - $lb->runMasterPostTrxCallbacks( IDatabase::TRIGGER_ROLLBACK ); - } ); + $this->forEachLBCallMethod( 'runMasterTransactionIdleCallbacks' ); + $this->forEachLBCallMethod( 'runMasterTransactionListenerCallbacks' ); + $this->trxRoundStage = self::ROUND_CURSORY; } public function hasTransactionRound() { @@ -408,7 +425,7 @@ abstract class LBFactory implements ILBFactory { return $this->ticket; } - public function commitAndWaitForReplication( $fname, $ticket, array $opts = [] ) { + final public function commitAndWaitForReplication( $fname, $ticket, array $opts = [] ) { if ( $ticket !== $this->ticket ) { $this->perfLogger->error( __METHOD__ . ": $fname does not have outer scope.\n" . ( new RuntimeException() )->getTraceAsString() ); @@ -597,6 +614,18 @@ abstract class LBFactory implements ILBFactory { $this->requestInfo = $info + $this->requestInfo; } + /** + * @param string $stage + */ + private function assertTransactionRoundStage( $stage ) { + if ( $this->trxRoundStage !== $stage ) { + throw new DBTransactionError( + null, + "Transaction round stage must be '$stage' (not '{$this->trxRoundStage}')" + ); + } + } + /** * Make PHP ignore user aborts/disconnects until the returned * value leaves scope. This returns null and does nothing in CLI mode. diff --git a/includes/libs/rdbms/loadbalancer/ILoadBalancer.php b/includes/libs/rdbms/loadbalancer/ILoadBalancer.php index fec496e455..dd257e5ffa 100644 --- a/includes/libs/rdbms/loadbalancer/ILoadBalancer.php +++ b/includes/libs/rdbms/loadbalancer/ILoadBalancer.php @@ -377,8 +377,7 @@ interface ILoadBalancer { public function commitAll( $fname = __METHOD__ ); /** - * Perform all pre-commit callbacks that remain part of the atomic transactions - * and disable any post-commit callbacks until runMasterPostTrxCallbacks() + * Run pre-commit callbacks and defer execution of post-commit callbacks * * Use this only for mutli-database commits */ @@ -417,14 +416,18 @@ interface ILoadBalancer { public function commitMasterChanges( $fname = __METHOD__ ); /** - * Issue all pending post-COMMIT/ROLLBACK callbacks + * Consume and run all pending post-COMMIT/ROLLBACK callbacks * - * Use this only for mutli-database commits + * @return Exception|null The first exception or null if there were none + */ + public function runMasterTransactionIdleCallbacks(); + + /** + * Run all recurring post-COMMIT/ROLLBACK listener callbacks * - * @param int $type IDatabase::TRIGGER_* constant * @return Exception|null The first exception or null if there were none */ - public function runMasterPostTrxCallbacks( $type ); + public function runMasterTransactionListenerCallbacks(); /** * Issue ROLLBACK only on master, only if queries were done on connection @@ -433,15 +436,6 @@ interface ILoadBalancer { */ public function rollbackMasterChanges( $fname = __METHOD__ ); - /** - * Suppress all pending post-COMMIT/ROLLBACK callbacks - * - * Use this only for mutli-database commits - * - * @return Exception|null The first exception or null if there were none - */ - public function suppressTransactionEndCallbacks(); - /** * Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshot * diff --git a/includes/libs/rdbms/loadbalancer/LoadBalancer.php b/includes/libs/rdbms/loadbalancer/LoadBalancer.php index 8de6064366..e70d49e979 100644 --- a/includes/libs/rdbms/loadbalancer/LoadBalancer.php +++ b/includes/libs/rdbms/loadbalancer/LoadBalancer.php @@ -120,6 +120,8 @@ class LoadBalancer implements ILoadBalancer { private $connectionAttempted = false; /** @var int */ private $maxLag = self::MAX_LAG_DEFAULT; + /** @var string Stage of the current transaction round in the transaction round life-cycle */ + private $trxRoundStage = self::ROUND_CURSORY; /** @var int Warn when this many connection are held */ const CONN_HELD_WARN_THRESHOLD = 10; @@ -139,6 +141,19 @@ class LoadBalancer implements ILoadBalancer { const KEY_FOREIGN_FREE_NOROUND = 'foreignFreeAutoCommit'; const KEY_FOREIGN_INUSE_NOROUND = 'foreignInUseAutoCommit'; + /** @var string Transaction round, explicit or implicit, has not finished writing */ + const ROUND_CURSORY = 'cursory'; + /** @var string Transaction round writes are complete and ready for pre-commit checks */ + const ROUND_FINALIZED = 'finalized'; + /** @var string Transaction round passed final pre-commit checks */ + const ROUND_APPROVED = 'approved'; + /** @var string Transaction round was committed and post-commit callbacks must be run */ + const ROUND_COMMIT_CALLBACKS = 'commit-callbacks'; + /** @var string Transaction round was rolled back and post-rollback callbacks must be run */ + const ROUND_ROLLBACK_CALLBACKS = 'rollback-callbacks'; + /** @var string Transaction round encountered an error */ + const ROUND_ERROR = 'error'; + public function __construct( array $params ) { if ( !isset( $params['servers'] ) ) { throw new InvalidArgumentException( __CLASS__ . ': missing servers parameter' ); @@ -1242,44 +1257,37 @@ class LoadBalancer implements ILoadBalancer { } public function commitAll( $fname = __METHOD__ ) { - $failures = []; - - $restore = ( $this->trxRoundId !== false ); - $this->trxRoundId = false; - $this->forEachOpenConnection( - function ( IDatabase $conn ) use ( $fname, $restore, &$failures ) { - try { - $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS ); - } catch ( DBError $e ) { - call_user_func( $this->errorLogger, $e ); - $failures[] = "{$conn->getServer()}: {$e->getMessage()}"; - } - if ( $restore && $conn->getLBInfo( 'master' ) ) { - $this->undoTransactionRoundFlags( $conn ); - } - } - ); - - if ( $failures ) { - throw new DBExpectedError( - null, - "Commit failed on server(s) " . implode( "\n", array_unique( $failures ) ) - ); - } + $this->commitMasterChanges( $fname ); + $this->flushMasterSnapshots( $fname ); + $this->flushReplicaSnapshots( $fname ); } public function finalizeMasterChanges() { + $this->assertTransactionRoundStage( self::ROUND_CURSORY ); + + $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise + // Loop until callbacks stop adding callbacks on other connections + do { + $count = 0; // callbacks execution attempts + $this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$count ) { + // Run any pre-commit callbacks while leaving the post-commit ones suppressed. + // Any error should cause all (peer) transactions to be rolled back together. + $count += $conn->runOnTransactionPreCommitCallbacks(); + } ); + } while ( $count > 0 ); + // Defer post-commit callbacks until after COMMIT/ROLLBACK happens on all handles $this->forEachOpenMasterConnection( function ( Database $conn ) { - // Any error should cause all DB transactions to be rolled back together - $conn->setTrxEndCallbackSuppression( false ); - $conn->runOnTransactionPreCommitCallbacks(); - // Defer post-commit callbacks until COMMIT finishes for all DBs $conn->setTrxEndCallbackSuppression( true ); } ); + $this->trxRoundStage = self::ROUND_FINALIZED; } public function approveMasterChanges( array $options ) { + $this->assertTransactionRoundStage( self::ROUND_FINALIZED ); + $limit = isset( $options['maxWriteDuration'] ) ? $options['maxWriteDuration'] : 0; + + $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $limit ) { // If atomic sections or explicit transactions are still open, some caller must have // caught an exception but failed to properly rollback any changes. Detect that and @@ -1309,6 +1317,7 @@ class LoadBalancer implements ILoadBalancer { ); } } ); + $this->trxRoundStage = self::ROUND_APPROVED; } public function beginMasterChanges( $fname = __METHOD__ ) { @@ -1318,32 +1327,26 @@ class LoadBalancer implements ILoadBalancer { "$fname: Transaction round '{$this->trxRoundId}' already started." ); } - $this->trxRoundId = $fname; + $this->assertTransactionRoundStage( self::ROUND_CURSORY ); - $failures = []; - $this->forEachOpenMasterConnection( - function ( Database $conn ) use ( $fname, &$failures ) { - $conn->setTrxEndCallbackSuppression( true ); - try { - $conn->flushSnapshot( $fname ); - } catch ( DBError $e ) { - call_user_func( $this->errorLogger, $e ); - $failures[] = "{$conn->getServer()}: {$e->getMessage()}"; - } - $conn->setTrxEndCallbackSuppression( false ); - $this->applyTransactionRoundFlags( $conn ); - } - ); + // Clear any empty transactions (no writes/callbacks) from the implicit round + $this->flushMasterSnapshots( $fname ); - if ( $failures ) { - throw new DBExpectedError( - null, - "$fname: Flush failed on server(s) " . implode( "\n", array_unique( $failures ) ) - ); - } + $this->trxRoundId = $fname; + $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise + // Mark applicable handles as participating in this explicit transaction round. + // For each of these handles, any writes and callbacks will be tied to a single + // transaction. The (peer) handles will reject begin()/commit() calls unless they + // are part of an en masse commit or an en masse rollback. + $this->forEachOpenMasterConnection( function ( Database $conn ) { + $this->applyTransactionRoundFlags( $conn ); + } ); + $this->trxRoundStage = self::ROUND_CURSORY; } public function commitMasterChanges( $fname = __METHOD__ ) { + $this->assertTransactionRoundStage( self::ROUND_APPROVED ); + $failures = []; /** @noinspection PhpUnusedLocalVariableInspection */ @@ -1351,62 +1354,117 @@ class LoadBalancer implements ILoadBalancer { $restore = ( $this->trxRoundId !== false ); $this->trxRoundId = false; + $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise + // Commit any writes and clear any snapshots as well (callbacks require AUTOCOMMIT). + // Note that callbacks should already be suppressed due to finalizeMasterChanges(). $this->forEachOpenMasterConnection( - function ( IDatabase $conn ) use ( $fname, $restore, &$failures ) { + function ( IDatabase $conn ) use ( $fname, &$failures ) { try { - if ( $conn->writesOrCallbacksPending() ) { - $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS ); - } elseif ( $restore ) { - $conn->flushSnapshot( $fname ); - } + $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS ); } catch ( DBError $e ) { call_user_func( $this->errorLogger, $e ); $failures[] = "{$conn->getServer()}: {$e->getMessage()}"; } - if ( $restore ) { - $this->undoTransactionRoundFlags( $conn ); - } } ); - if ( $failures ) { - throw new DBExpectedError( + throw new DBTransactionError( null, "$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) ) ); } + if ( $restore ) { + // Unmark handles as participating in this explicit transaction round + $this->forEachOpenMasterConnection( function ( Database $conn ) { + $this->undoTransactionRoundFlags( $conn ); + } ); + } + $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS; } - public function runMasterPostTrxCallbacks( $type ) { + public function runMasterTransactionIdleCallbacks() { + if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) { + $type = IDatabase::TRIGGER_COMMIT; + } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) { + $type = IDatabase::TRIGGER_ROLLBACK; + } else { + throw new DBTransactionError( + null, + "Transaction should be in the callback stage (not '{$this->trxRoundStage}')" + ); + } + + $oldStage = $this->trxRoundStage; + $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise + + // Now that the COMMIT/ROLLBACK step is over, enable post-commit callback runs + $this->forEachOpenMasterConnection( function ( Database $conn ) { + $conn->setTrxEndCallbackSuppression( false ); + } ); + $e = null; // first exception + // Loop until callbacks stop adding callbacks on other connections + do { + // Run any pending callbacks for each connection... + $count = 0; // callback execution attempts + $this->forEachOpenMasterConnection( + function ( Database $conn ) use ( $type, &$e, &$count ) { + if ( $conn->trxLevel() ) { + return; // retry in the next iteration, after commit() is called + } + try { + $count += $conn->runOnTransactionIdleCallbacks( $type ); + } catch ( Exception $ex ) { + $e = $e ?: $ex; + } + } + ); + // Clear out any active transactions left over from callbacks... + $this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$e ) { + if ( $conn->writesPending() ) { + // A callback from another handle wrote to this one and DBO_TRX is set + $this->queryLogger->warning( __METHOD__ . ": found writes pending." ); + } elseif ( $conn->trxLevel() ) { + // A callback from another handle read from this one and DBO_TRX is set, + // which can easily happen if there is only one DB (no replicas) + $this->queryLogger->debug( __METHOD__ . ": found empty transaction." ); + } + try { + $conn->commit( __METHOD__, $conn::FLUSHING_ALL_PEERS ); + } catch ( Exception $ex ) { + $e = $e ?: $ex; + } + } ); + } while ( $count > 0 ); + + $this->trxRoundStage = $oldStage; + + return $e; + } + + public function runMasterTransactionListenerCallbacks() { + if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) { + $type = IDatabase::TRIGGER_COMMIT; + } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) { + $type = IDatabase::TRIGGER_ROLLBACK; + } else { + throw new DBTransactionError( + null, + "Transaction should be in the callback stage (not '{$this->trxRoundStage}')" + ); + } + + $e = null; + + $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise $this->forEachOpenMasterConnection( function ( Database $conn ) use ( $type, &$e ) { - $conn->setTrxEndCallbackSuppression( false ); - // Callbacks run in AUTO-COMMIT mode, so make sure no transactions are pending... - if ( $conn->writesPending() ) { - // This happens if onTransactionIdle() callbacks write to *other* handles - // (which already finished their callbacks). Let any callbacks run in the final - // commitMasterChanges() in LBFactory::shutdown(), when the transaction is gone. - $this->queryLogger->warning( __METHOD__ . ": found writes pending." ); - return; - } elseif ( $conn->trxLevel() ) { - // This happens for single-DB setups where DB_REPLICA uses the master DB, - // thus leaving an implicit read-only transaction open at this point. It - // also happens if onTransactionIdle() callbacks leave implicit transactions - // open on *other* DBs (which is slightly improper). Let these COMMIT on the - // next call to commitMasterChanges(), possibly in LBFactory::shutdown(). - return; - } - try { - $conn->runOnTransactionIdleCallbacks( $type ); - } catch ( Exception $ex ) { - $e = $e ?: $ex; - } try { $conn->runTransactionListenerCallbacks( $type ); } catch ( Exception $ex ) { $e = $e ?: $ex; } } ); + $this->trxRoundStage = self::ROUND_CURSORY; return $e; } @@ -1414,20 +1472,29 @@ class LoadBalancer implements ILoadBalancer { public function rollbackMasterChanges( $fname = __METHOD__ ) { $restore = ( $this->trxRoundId !== false ); $this->trxRoundId = false; - $this->forEachOpenMasterConnection( - function ( IDatabase $conn ) use ( $fname, $restore ) { - $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS ); - if ( $restore ) { - $this->undoTransactionRoundFlags( $conn ); - } - } - ); + $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise + $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $fname ) { + $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS ); + } ); + if ( $restore ) { + // Unmark handles as participating in this explicit transaction round + $this->forEachOpenMasterConnection( function ( Database $conn ) { + $this->undoTransactionRoundFlags( $conn ); + } ); + } + $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS; } - public function suppressTransactionEndCallbacks() { - $this->forEachOpenMasterConnection( function ( Database $conn ) { - $conn->setTrxEndCallbackSuppression( true ); - } ); + /** + * @param string $stage + */ + private function assertTransactionRoundStage( $stage ) { + if ( $this->trxRoundStage !== $stage ) { + throw new DBTransactionError( + null, + "Transaction round stage must be '$stage' (not '{$this->trxRoundStage}')" + ); + } } /** @@ -1437,9 +1504,9 @@ class LoadBalancer implements ILoadBalancer { * transaction rounds and remain in auto-commit mode. Such behavior might be desired * when a DB server is used for something like simple key/value storage. * - * @param IDatabase $conn + * @param Database $conn */ - private function applyTransactionRoundFlags( IDatabase $conn ) { + private function applyTransactionRoundFlags( Database $conn ) { if ( $conn->getLBInfo( 'autoCommitOnly' ) ) { return; // transaction rounds do not apply to these connections } @@ -1456,9 +1523,9 @@ class LoadBalancer implements ILoadBalancer { } /** - * @param IDatabase $conn + * @param Database $conn */ - private function undoTransactionRoundFlags( IDatabase $conn ) { + private function undoTransactionRoundFlags( Database $conn ) { if ( $conn->getLBInfo( 'autoCommitOnly' ) ) { return; // transaction rounds do not apply to these connections } @@ -1473,11 +1540,25 @@ class LoadBalancer implements ILoadBalancer { } public function flushReplicaSnapshots( $fname = __METHOD__ ) { - $this->forEachOpenReplicaConnection( function ( IDatabase $conn ) { - $conn->flushSnapshot( __METHOD__ ); + $this->forEachOpenReplicaConnection( function ( IDatabase $conn ) use ( $fname ) { + $conn->flushSnapshot( $fname ); } ); } + private function flushMasterSnapshots( $fname = __METHOD__ ) { + $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $fname ) { + $conn->flushSnapshot( $fname ); + } ); + } + + /** + * @return string + * @since 1.32 + */ + public function getTransactionRoundStage() { + return $this->trxRoundStage; + } + public function hasMasterConnection() { return $this->isOpen( $this->getWriterIndex() ); } diff --git a/tests/phpunit/includes/db/LoadBalancerTest.php b/tests/phpunit/includes/db/LoadBalancerTest.php index e054569d03..7462f1d530 100644 --- a/tests/phpunit/includes/db/LoadBalancerTest.php +++ b/tests/phpunit/includes/db/LoadBalancerTest.php @@ -302,4 +302,91 @@ class LoadBalancerTest extends MediaWikiTestCase { $lb->closeAll(); } + + public function testTransactionCallbackChains() { + global $wgDBserver, $wgDBname, $wgDBuser, $wgDBpassword, $wgDBtype, $wgSQLiteDataDir; + + $servers = [ + [ + 'host' => $wgDBserver, + 'dbname' => $wgDBname, + 'tablePrefix' => $this->dbPrefix(), + 'user' => $wgDBuser, + 'password' => $wgDBpassword, + 'type' => $wgDBtype, + 'dbDirectory' => $wgSQLiteDataDir, + 'load' => 0, + 'flags' => DBO_TRX // REPEATABLE-READ for consistency + ], + ]; + + $lb = new LoadBalancer( [ + 'servers' => $servers, + 'localDomain' => new DatabaseDomain( $wgDBname, null, $this->dbPrefix() ) + ] ); + + $conn1 = $lb->openConnection( $lb->getWriterIndex(), false ); + $conn2 = $lb->openConnection( $lb->getWriterIndex(), '' ); + + $count = 0; + $lb->forEachOpenMasterConnection( function () use ( &$count ) { + ++$count; + } ); + $this->assertEquals( 2, $count, 'Connection handle count' ); + + $tlCalls = 0; + $lb->setTransactionListener( 'test-listener', function () use ( &$tlCalls ) { + ++$tlCalls; + } ); + + $lb->beginMasterChanges( __METHOD__ ); + $bc = array_fill_keys( [ 'a', 'b', 'c', 'd' ], 0 ); + $conn1->onTransactionPreCommitOrIdle( function () use ( &$bc, $conn1, $conn2 ) { + $bc['a'] = 1; + $conn2->onTransactionPreCommitOrIdle( function () use ( &$bc, $conn1, $conn2 ) { + $bc['b'] = 1; + $conn1->onTransactionPreCommitOrIdle( function () use ( &$bc, $conn1, $conn2 ) { + $bc['c'] = 1; + $conn1->onTransactionPreCommitOrIdle( function () use ( &$bc, $conn1, $conn2 ) { + $bc['d'] = 1; + } ); + } ); + } ); + } ); + $lb->finalizeMasterChanges(); + $lb->approveMasterChanges( [] ); + $lb->commitMasterChanges( __METHOD__ ); + $lb->runMasterTransactionIdleCallbacks(); + $lb->runMasterTransactionListenerCallbacks(); + + $this->assertEquals( array_fill_keys( [ 'a', 'b', 'c', 'd' ], 1 ), $bc ); + $this->assertEquals( 2, $tlCalls ); + + $tlCalls = 0; + $lb->beginMasterChanges( __METHOD__ ); + $ac = array_fill_keys( [ 'a', 'b', 'c', 'd' ], 0 ); + $conn1->onTransactionIdle( function () use ( &$ac, $conn1, $conn2 ) { + $ac['a'] = 1; + $conn2->onTransactionIdle( function () use ( &$ac, $conn1, $conn2 ) { + $ac['b'] = 1; + $conn1->onTransactionIdle( function () use ( &$ac, $conn1, $conn2 ) { + $ac['c'] = 1; + $conn1->onTransactionIdle( function () use ( &$ac, $conn1, $conn2 ) { + $ac['d'] = 1; + } ); + } ); + } ); + } ); + $lb->finalizeMasterChanges(); + $lb->approveMasterChanges( [] ); + $lb->commitMasterChanges( __METHOD__ ); + $lb->runMasterTransactionIdleCallbacks(); + $lb->runMasterTransactionListenerCallbacks(); + + $this->assertEquals( array_fill_keys( [ 'a', 'b', 'c', 'd' ], 1 ), $ac ); + $this->assertEquals( 2, $tlCalls ); + + $conn1->close(); + $conn2->close(); + } } diff --git a/tests/phpunit/includes/libs/rdbms/database/DatabaseTest.php b/tests/phpunit/includes/libs/rdbms/database/DatabaseTest.php index 3335a2b911..f08b3767dd 100644 --- a/tests/phpunit/includes/libs/rdbms/database/DatabaseTest.php +++ b/tests/phpunit/includes/libs/rdbms/database/DatabaseTest.php @@ -9,6 +9,7 @@ use Wikimedia\TestingAccessWrapper; use Wikimedia\Rdbms\DatabaseSqlite; use Wikimedia\Rdbms\DatabasePostgres; use Wikimedia\Rdbms\DatabaseMssql; +use Wikimedia\Rdbms\DBUnexpectedError; class DatabaseTest extends PHPUnit\Framework\TestCase { @@ -367,7 +368,7 @@ class DatabaseTest extends PHPUnit\Framework\TestCase { $called = true; $db->setFlag( DBO_TRX ); } ); - $db->rollback( __METHOD__, IDatabase::FLUSHING_ALL_PEERS ); + $db->rollback( __METHOD__ ); $this->assertFalse( $db->getFlag( DBO_TRX ), 'DBO_TRX restored to default' ); $this->assertTrue( $called, 'Callback reached' ); } @@ -489,37 +490,56 @@ class DatabaseTest extends PHPUnit\Framework\TestCase { $this->assertEquals( true, $db->lockIsFree( 'x', __METHOD__ ) ); $db->clearFlag( DBO_TRX ); + // Pending writes with DBO_TRX $this->assertEquals( 0, $db->trxLevel() ); - + $this->assertTrue( $db->lockIsFree( 'meow', __METHOD__ ) ); $db->setFlag( DBO_TRX ); + $db->query( "DELETE FROM test WHERE t = 1" ); // trigger DBO_TRX transaction before lock try { - $this->badLockingMethodImplicit( $db ); - } catch ( RunTimeException $e ) { - $this->assertTrue( $db->trxLevel() > 0, "Transaction not committed." ); + $lock = $db->getScopedLockAndFlush( 'meow', __METHOD__, 1 ); + $this->fail( "Exception not reached" ); + } catch ( DBUnexpectedError $e ) { + $this->assertEquals( 1, $db->trxLevel(), "Transaction not committed." ); + $this->assertTrue( $db->lockIsFree( 'meow', __METHOD__ ), 'Lock not acquired' ); } - $db->clearFlag( DBO_TRX ); $db->rollback( __METHOD__, IDatabase::FLUSHING_ALL_PEERS ); - $this->assertTrue( $db->lockIsFree( 'meow', __METHOD__ ) ); - + // Pending writes without DBO_TRX + $db->clearFlag( DBO_TRX ); + $this->assertEquals( 0, $db->trxLevel() ); + $this->assertTrue( $db->lockIsFree( 'meow2', __METHOD__ ) ); + $db->begin( __METHOD__ ); + $db->query( "DELETE FROM test WHERE t = 1" ); // trigger DBO_TRX transaction before lock try { - $this->badLockingMethodExplicit( $db ); - } catch ( RunTimeException $e ) { - $this->assertTrue( $db->trxLevel() > 0, "Transaction not committed." ); + $lock = $db->getScopedLockAndFlush( 'meow2', __METHOD__, 1 ); + $this->fail( "Exception not reached" ); + } catch ( DBUnexpectedError $e ) { + $this->assertEquals( 1, $db->trxLevel(), "Transaction not committed." ); + $this->assertTrue( $db->lockIsFree( 'meow2', __METHOD__ ), 'Lock not acquired' ); } + $db->rollback( __METHOD__ ); + // No pending writes, with DBO_TRX + $db->setFlag( DBO_TRX ); + $this->assertEquals( 0, $db->trxLevel() ); + $this->assertTrue( $db->lockIsFree( 'wuff', __METHOD__ ) ); + $db->query( "SELECT 1", __METHOD__ ); + $this->assertEquals( 1, $db->trxLevel() ); + $lock = $db->getScopedLockAndFlush( 'wuff', __METHOD__, 1 ); + $this->assertEquals( 0, $db->trxLevel() ); + $this->assertFalse( $db->lockIsFree( 'wuff', __METHOD__ ), 'Lock already acquired' ); $db->rollback( __METHOD__, IDatabase::FLUSHING_ALL_PEERS ); - $this->assertTrue( $db->lockIsFree( 'meow', __METHOD__ ) ); - } - - private function badLockingMethodImplicit( IDatabase $db ) { - $lock = $db->getScopedLockAndFlush( 'meow', __METHOD__, 1 ); - $db->query( "SELECT 1" ); // trigger DBO_TRX - throw new RunTimeException( "Uh oh!" ); - } - - private function badLockingMethodExplicit( IDatabase $db ) { - $lock = $db->getScopedLockAndFlush( 'meow', __METHOD__, 1 ); + // No pending writes, without DBO_TRX + $db->clearFlag( DBO_TRX ); + $this->assertEquals( 0, $db->trxLevel() ); + $this->assertTrue( $db->lockIsFree( 'wuff2', __METHOD__ ) ); $db->begin( __METHOD__ ); - throw new RunTimeException( "Uh oh!" ); + try { + $lock = $db->getScopedLockAndFlush( 'wuff2', __METHOD__, 1 ); + $this->fail( "Exception not reached" ); + } catch ( DBUnexpectedError $e ) { + $this->assertEquals( 1, $db->trxLevel(), "Transaction not committed." ); + $this->assertFalse( $db->lockIsFree( 'wuff2', __METHOD__ ), 'Lock not acquired' ); + } + $db->rollback( __METHOD__ ); } /** -- 2.20.1