}
/**
- * Get the list of method names that have pending write queries or callbacks
- * for this transaction
+ * Get the list of method names that have pending write queries or that
+ * have transaction callbacks that have yet to run
*
* @return array
*/
protected function pendingWriteAndCallbackCallers() {
- if ( !$this->trxLevel ) {
- return [];
- }
-
- $fnames = $this->trxWriteCallers;
+ $fnames = $this->pendingWriteCallers();
foreach ( [
$this->trxIdleCallbacks,
$this->trxPreCommitCallbacks,
}
// Sanity check that no callbacks are dangling
- if (
- $this->trxIdleCallbacks || $this->trxPreCommitCallbacks || $this->trxEndCallbacks
- ) {
+ $fnames = $this->pendingWriteAndCallbackCallers();
+ if ( $fnames ) {
throw new RuntimeException(
- "Transaction callbacks are still pending:\n" .
- implode( ', ', $this->pendingWriteAndCallbackCallers() )
+ "Transaction callbacks are still pending:\n" . implode( ', ', $fnames )
);
}
}
/**
- * Actually run and consume any "on transaction idle/resolution" callbacks.
+ * Actually consume and run any "on transaction idle/resolution" callbacks.
*
* This method should not be used outside of Database/LoadBalancer
*
* @param int $trigger IDatabase::TRIGGER_* constant
+ * @return int Number of callbacks attempted
* @since 1.20
* @throws Exception
*/
public function runOnTransactionIdleCallbacks( $trigger ) {
+ if ( $this->trxLevel ) { // sanity
+ throw new DBUnexpectedError( $this, __METHOD__ . ': a transaction is still open.' );
+ }
+
if ( $this->trxEndCallbacksSuppressed ) {
- return;
+ return 0;
}
+ $count = 0;
$autoTrx = $this->getFlag( self::DBO_TRX ); // automatic begin() enabled?
/** @var Exception $e */
$e = null; // first exception
$this->trxEndCallbacks = []; // consumed (recursion guard)
foreach ( $callbacks as $callback ) {
try {
+ ++$count;
list( $phpCallback ) = $callback;
$this->clearFlag( self::DBO_TRX ); // make each query its own transaction
call_user_func( $phpCallback, $trigger, $this );
if ( $e instanceof Exception ) {
throw $e; // re-throw any first exception
}
+
+ return $count;
}
/**
- * Actually run and consume any "on transaction pre-commit" callbacks.
+ * Actually consume and run any "on transaction pre-commit" callbacks.
*
* This method should not be used outside of Database/LoadBalancer
*
* @since 1.22
+ * @return int Number of callbacks attempted
* @throws Exception
*/
public function runOnTransactionPreCommitCallbacks() {
+ $count = 0;
+
$e = null; // first exception
do { // callbacks may add callbacks :)
$callbacks = $this->trxPreCommitCallbacks;
$this->trxPreCommitCallbacks = []; // consumed (and recursion guard)
foreach ( $callbacks as $callback ) {
try {
+ ++$count;
list( $phpCallback ) = $callback;
call_user_func( $phpCallback, $this );
} catch ( Exception $ex ) {
if ( $e instanceof Exception ) {
throw $e; // re-throw any first exception
}
+
+ return $count;
}
/**
$savepointId = $cancelable === self::ATOMIC_CANCELABLE ? self::$NOT_APPLICABLE : null;
if ( !$this->trxLevel ) {
- $this->begin( $fname, self::TRANSACTION_INTERNAL );
+ $this->begin( $fname, self::TRANSACTION_INTERNAL ); // sets trxAutomatic
// If DBO_TRX is set, a series of startAtomic/endAtomic pairs will result
// in all changes being in one transaction to keep requests transactional.
if ( $this->getFlag( self::DBO_TRX ) ) {
);
}
- $this->runOnTransactionIdleCallbacks( self::TRIGGER_COMMIT );
- $this->runTransactionListenerCallbacks( self::TRIGGER_COMMIT );
+ // With FLUSHING_ALL_PEERS, callbacks will be explicitly run later
+ if ( $flush !== self::FLUSHING_ALL_PEERS ) {
+ $this->runOnTransactionIdleCallbacks( self::TRIGGER_COMMIT );
+ $this->runTransactionListenerCallbacks( self::TRIGGER_COMMIT );
+ }
}
/**
$this->trxIdleCallbacks = [];
$this->trxPreCommitCallbacks = [];
- if ( $trxActive ) {
+ // With FLUSHING_ALL_PEERS, callbacks will be explicitly run later
+ if ( $trxActive && $flush !== self::FLUSHING_ALL_PEERS ) {
try {
$this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK );
} catch ( Exception $e ) {
/** @var string Commit/rollback is from the connection manager for the IDatabase handle */
const FLUSHING_ALL_PEERS = 'flush';
/** @var string Commit/rollback is from the IDatabase handle internally */
- const FLUSHING_INTERNAL = 'flush';
+ const FLUSHING_INTERNAL = 'flush-internal';
/** @var string Do not remember the prior flags */
const REMEMBER_NOTHING = '';
* It can also be used for updates that easily suffer from lock timeouts and deadlocks,
* but where atomicity is not essential.
*
+ * Avoid using IDatabase instances aside from this one in the callback, unless such instances
+ * never have IDatabase::DBO_TRX set. This keeps callbacks from interfering with one another.
+ *
* Updates will execute in the order they were enqueued.
*
* @note: do not assume that *other* IDatabase instances will be AUTOCOMMIT mode
* - This IDatabase object
* Callbacks must commit any transactions that they begin.
*
- * Registering a callback here will not affect writesOrCallbacks() pending
+ * Registering a callback here will not affect writesOrCallbacks() pending.
+ *
+ * Since callbacks from this method or onTransactionIdle() can start and end transactions,
+ * a single call to IDatabase::commit might trigger multiple runs of the listener callbacks.
*
* @param string $name Callback name
* @param callable|null $callback Use null to unset a listener
/** @var string Agent name for query profiling */
protected $agent;
+ /** @var string One of the ROUND_* class constants */
+ private $trxRoundStage = self::ROUND_CURSORY;
+
+ const ROUND_CURSORY = 'cursory';
+ const ROUND_BEGINNING = 'within-begin';
+ const ROUND_COMMITTING = 'within-commit';
+ const ROUND_ROLLING_BACK = 'within-rollback';
+
private static $loggerFields =
[ 'replLogger', 'connLogger', 'queryLogger', 'perfLogger' ];
$this->forEachLBCallMethod( 'flushReplicaSnapshots', [ $fname ] );
}
- public function commitAll( $fname = __METHOD__, array $options = [] ) {
+ final public function commitAll( $fname = __METHOD__, array $options = [] ) {
$this->commitMasterChanges( $fname, $options );
$this->forEachLBCallMethod( 'commitAll', [ $fname ] );
}
- public function beginMasterChanges( $fname = __METHOD__ ) {
+ final public function beginMasterChanges( $fname = __METHOD__ ) {
+ $this->assertTransactionRoundStage( self::ROUND_CURSORY );
+ $this->trxRoundStage = self::ROUND_BEGINNING;
if ( $this->trxRoundId !== false ) {
throw new DBTransactionError(
null,
$this->trxRoundId = $fname;
// Set DBO_TRX flags on all appropriate DBs
$this->forEachLBCallMethod( 'beginMasterChanges', [ $fname ] );
+ $this->trxRoundStage = self::ROUND_CURSORY;
}
- public function commitMasterChanges( $fname = __METHOD__, array $options = [] ) {
+ final public function commitMasterChanges( $fname = __METHOD__, array $options = [] ) {
+ $this->assertTransactionRoundStage( self::ROUND_CURSORY );
+ $this->trxRoundStage = self::ROUND_COMMITTING;
if ( $this->trxRoundId !== false && $this->trxRoundId !== $fname ) {
throw new DBTransactionError(
null,
$this->logIfMultiDbTransaction();
// Actually perform the commit on all master DB connections and revert DBO_TRX
$this->forEachLBCallMethod( 'commitMasterChanges', [ $fname ] );
- // Run all post-commit callbacks
- /** @var Exception $e */
+ // Run all post-commit callbacks until new ones stop getting added
$e = null; // first callback exception
+ do {
+ $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$e ) {
+ $ex = $lb->runMasterTransactionIdleCallbacks();
+ $e = $e ?: $ex;
+ } );
+ } while ( $this->hasMasterChanges() );
+ // Run all listener callbacks once
$this->forEachLB( function ( ILoadBalancer $lb ) use ( &$e ) {
- $ex = $lb->runMasterPostTrxCallbacks( IDatabase::TRIGGER_COMMIT );
+ $ex = $lb->runMasterTransactionListenerCallbacks();
$e = $e ?: $ex;
} );
- // Commit any dangling DBO_TRX transactions from callbacks on one DB to another DB
- $this->forEachLBCallMethod( 'commitMasterChanges', [ $fname ] );
+ $this->trxRoundStage = self::ROUND_CURSORY;
// Throw any last post-commit callback error
if ( $e instanceof Exception ) {
throw $e;
}
}
- public function rollbackMasterChanges( $fname = __METHOD__ ) {
+ final public function rollbackMasterChanges( $fname = __METHOD__ ) {
+ $this->trxRoundStage = self::ROUND_ROLLING_BACK;
$this->trxRoundId = false;
- $this->forEachLBCallMethod( 'suppressTransactionEndCallbacks' );
$this->forEachLBCallMethod( 'rollbackMasterChanges', [ $fname ] );
- // Run all post-rollback callbacks
- $this->forEachLB( function ( ILoadBalancer $lb ) {
- $lb->runMasterPostTrxCallbacks( IDatabase::TRIGGER_ROLLBACK );
- } );
+ $this->forEachLBCallMethod( 'runMasterTransactionIdleCallbacks' );
+ $this->forEachLBCallMethod( 'runMasterTransactionListenerCallbacks' );
+ $this->trxRoundStage = self::ROUND_CURSORY;
}
public function hasTransactionRound() {
return $this->ticket;
}
- public function commitAndWaitForReplication( $fname, $ticket, array $opts = [] ) {
+ final public function commitAndWaitForReplication( $fname, $ticket, array $opts = [] ) {
if ( $ticket !== $this->ticket ) {
$this->perfLogger->error( __METHOD__ . ": $fname does not have outer scope.\n" .
( new RuntimeException() )->getTraceAsString() );
$this->requestInfo = $info + $this->requestInfo;
}
+ /**
+ * @param string $stage
+ */
+ private function assertTransactionRoundStage( $stage ) {
+ if ( $this->trxRoundStage !== $stage ) {
+ throw new DBTransactionError(
+ null,
+ "Transaction round stage must be '$stage' (not '{$this->trxRoundStage}')"
+ );
+ }
+ }
+
/**
* Make PHP ignore user aborts/disconnects until the returned
* value leaves scope. This returns null and does nothing in CLI mode.
public function commitAll( $fname = __METHOD__ );
/**
- * Perform all pre-commit callbacks that remain part of the atomic transactions
- * and disable any post-commit callbacks until runMasterPostTrxCallbacks()
+ * Run pre-commit callbacks and defer execution of post-commit callbacks
*
* Use this only for mutli-database commits
*/
public function commitMasterChanges( $fname = __METHOD__ );
/**
- * Issue all pending post-COMMIT/ROLLBACK callbacks
+ * Consume and run all pending post-COMMIT/ROLLBACK callbacks
*
- * Use this only for mutli-database commits
+ * @return Exception|null The first exception or null if there were none
+ */
+ public function runMasterTransactionIdleCallbacks();
+
+ /**
+ * Run all recurring post-COMMIT/ROLLBACK listener callbacks
*
- * @param int $type IDatabase::TRIGGER_* constant
* @return Exception|null The first exception or null if there were none
*/
- public function runMasterPostTrxCallbacks( $type );
+ public function runMasterTransactionListenerCallbacks();
/**
* Issue ROLLBACK only on master, only if queries were done on connection
*/
public function rollbackMasterChanges( $fname = __METHOD__ );
- /**
- * Suppress all pending post-COMMIT/ROLLBACK callbacks
- *
- * Use this only for mutli-database commits
- *
- * @return Exception|null The first exception or null if there were none
- */
- public function suppressTransactionEndCallbacks();
-
/**
* Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshot
*
private $connectionAttempted = false;
/** @var int */
private $maxLag = self::MAX_LAG_DEFAULT;
+ /** @var string Stage of the current transaction round in the transaction round life-cycle */
+ private $trxRoundStage = self::ROUND_CURSORY;
/** @var int Warn when this many connection are held */
const CONN_HELD_WARN_THRESHOLD = 10;
const KEY_FOREIGN_FREE_NOROUND = 'foreignFreeAutoCommit';
const KEY_FOREIGN_INUSE_NOROUND = 'foreignInUseAutoCommit';
+ /** @var string Transaction round, explicit or implicit, has not finished writing */
+ const ROUND_CURSORY = 'cursory';
+ /** @var string Transaction round writes are complete and ready for pre-commit checks */
+ const ROUND_FINALIZED = 'finalized';
+ /** @var string Transaction round passed final pre-commit checks */
+ const ROUND_APPROVED = 'approved';
+ /** @var string Transaction round was committed and post-commit callbacks must be run */
+ const ROUND_COMMIT_CALLBACKS = 'commit-callbacks';
+ /** @var string Transaction round was rolled back and post-rollback callbacks must be run */
+ const ROUND_ROLLBACK_CALLBACKS = 'rollback-callbacks';
+ /** @var string Transaction round encountered an error */
+ const ROUND_ERROR = 'error';
+
public function __construct( array $params ) {
if ( !isset( $params['servers'] ) ) {
throw new InvalidArgumentException( __CLASS__ . ': missing servers parameter' );
}
public function commitAll( $fname = __METHOD__ ) {
- $failures = [];
-
- $restore = ( $this->trxRoundId !== false );
- $this->trxRoundId = false;
- $this->forEachOpenConnection(
- function ( IDatabase $conn ) use ( $fname, $restore, &$failures ) {
- try {
- $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
- } catch ( DBError $e ) {
- call_user_func( $this->errorLogger, $e );
- $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
- }
- if ( $restore && $conn->getLBInfo( 'master' ) ) {
- $this->undoTransactionRoundFlags( $conn );
- }
- }
- );
-
- if ( $failures ) {
- throw new DBExpectedError(
- null,
- "Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
- );
- }
+ $this->commitMasterChanges( $fname );
+ $this->flushMasterSnapshots( $fname );
+ $this->flushReplicaSnapshots( $fname );
}
public function finalizeMasterChanges() {
+ $this->assertTransactionRoundStage( self::ROUND_CURSORY );
+
+ $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+ // Loop until callbacks stop adding callbacks on other connections
+ do {
+ $count = 0; // callbacks execution attempts
+ $this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$count ) {
+ // Run any pre-commit callbacks while leaving the post-commit ones suppressed.
+ // Any error should cause all (peer) transactions to be rolled back together.
+ $count += $conn->runOnTransactionPreCommitCallbacks();
+ } );
+ } while ( $count > 0 );
+ // Defer post-commit callbacks until after COMMIT/ROLLBACK happens on all handles
$this->forEachOpenMasterConnection( function ( Database $conn ) {
- // Any error should cause all DB transactions to be rolled back together
- $conn->setTrxEndCallbackSuppression( false );
- $conn->runOnTransactionPreCommitCallbacks();
- // Defer post-commit callbacks until COMMIT finishes for all DBs
$conn->setTrxEndCallbackSuppression( true );
} );
+ $this->trxRoundStage = self::ROUND_FINALIZED;
}
public function approveMasterChanges( array $options ) {
+ $this->assertTransactionRoundStage( self::ROUND_FINALIZED );
+
$limit = isset( $options['maxWriteDuration'] ) ? $options['maxWriteDuration'] : 0;
+
+ $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
$this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $limit ) {
// If atomic sections or explicit transactions are still open, some caller must have
// caught an exception but failed to properly rollback any changes. Detect that and
);
}
} );
+ $this->trxRoundStage = self::ROUND_APPROVED;
}
public function beginMasterChanges( $fname = __METHOD__ ) {
"$fname: Transaction round '{$this->trxRoundId}' already started."
);
}
- $this->trxRoundId = $fname;
+ $this->assertTransactionRoundStage( self::ROUND_CURSORY );
- $failures = [];
- $this->forEachOpenMasterConnection(
- function ( Database $conn ) use ( $fname, &$failures ) {
- $conn->setTrxEndCallbackSuppression( true );
- try {
- $conn->flushSnapshot( $fname );
- } catch ( DBError $e ) {
- call_user_func( $this->errorLogger, $e );
- $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
- }
- $conn->setTrxEndCallbackSuppression( false );
- $this->applyTransactionRoundFlags( $conn );
- }
- );
+ // Clear any empty transactions (no writes/callbacks) from the implicit round
+ $this->flushMasterSnapshots( $fname );
- if ( $failures ) {
- throw new DBExpectedError(
- null,
- "$fname: Flush failed on server(s) " . implode( "\n", array_unique( $failures ) )
- );
- }
+ $this->trxRoundId = $fname;
+ $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+ // Mark applicable handles as participating in this explicit transaction round.
+ // For each of these handles, any writes and callbacks will be tied to a single
+ // transaction. The (peer) handles will reject begin()/commit() calls unless they
+ // are part of an en masse commit or an en masse rollback.
+ $this->forEachOpenMasterConnection( function ( Database $conn ) {
+ $this->applyTransactionRoundFlags( $conn );
+ } );
+ $this->trxRoundStage = self::ROUND_CURSORY;
}
public function commitMasterChanges( $fname = __METHOD__ ) {
+ $this->assertTransactionRoundStage( self::ROUND_APPROVED );
+
$failures = [];
/** @noinspection PhpUnusedLocalVariableInspection */
$restore = ( $this->trxRoundId !== false );
$this->trxRoundId = false;
+ $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+ // Commit any writes and clear any snapshots as well (callbacks require AUTOCOMMIT).
+ // Note that callbacks should already be suppressed due to finalizeMasterChanges().
$this->forEachOpenMasterConnection(
- function ( IDatabase $conn ) use ( $fname, $restore, &$failures ) {
+ function ( IDatabase $conn ) use ( $fname, &$failures ) {
try {
- if ( $conn->writesOrCallbacksPending() ) {
- $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
- } elseif ( $restore ) {
- $conn->flushSnapshot( $fname );
- }
+ $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
} catch ( DBError $e ) {
call_user_func( $this->errorLogger, $e );
$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
}
- if ( $restore ) {
- $this->undoTransactionRoundFlags( $conn );
- }
}
);
-
if ( $failures ) {
- throw new DBExpectedError(
+ throw new DBTransactionError(
null,
"$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
);
}
+ if ( $restore ) {
+ // Unmark handles as participating in this explicit transaction round
+ $this->forEachOpenMasterConnection( function ( Database $conn ) {
+ $this->undoTransactionRoundFlags( $conn );
+ } );
+ }
+ $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
}
- public function runMasterPostTrxCallbacks( $type ) {
+ public function runMasterTransactionIdleCallbacks() {
+ if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
+ $type = IDatabase::TRIGGER_COMMIT;
+ } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
+ $type = IDatabase::TRIGGER_ROLLBACK;
+ } else {
+ throw new DBTransactionError(
+ null,
+ "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
+ );
+ }
+
+ $oldStage = $this->trxRoundStage;
+ $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+
+ // Now that the COMMIT/ROLLBACK step is over, enable post-commit callback runs
+ $this->forEachOpenMasterConnection( function ( Database $conn ) {
+ $conn->setTrxEndCallbackSuppression( false );
+ } );
+
$e = null; // first exception
+ // Loop until callbacks stop adding callbacks on other connections
+ do {
+ // Run any pending callbacks for each connection...
+ $count = 0; // callback execution attempts
+ $this->forEachOpenMasterConnection(
+ function ( Database $conn ) use ( $type, &$e, &$count ) {
+ if ( $conn->trxLevel() ) {
+ return; // retry in the next iteration, after commit() is called
+ }
+ try {
+ $count += $conn->runOnTransactionIdleCallbacks( $type );
+ } catch ( Exception $ex ) {
+ $e = $e ?: $ex;
+ }
+ }
+ );
+ // Clear out any active transactions left over from callbacks...
+ $this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$e ) {
+ if ( $conn->writesPending() ) {
+ // A callback from another handle wrote to this one and DBO_TRX is set
+ $this->queryLogger->warning( __METHOD__ . ": found writes pending." );
+ } elseif ( $conn->trxLevel() ) {
+ // A callback from another handle read from this one and DBO_TRX is set,
+ // which can easily happen if there is only one DB (no replicas)
+ $this->queryLogger->debug( __METHOD__ . ": found empty transaction." );
+ }
+ try {
+ $conn->commit( __METHOD__, $conn::FLUSHING_ALL_PEERS );
+ } catch ( Exception $ex ) {
+ $e = $e ?: $ex;
+ }
+ } );
+ } while ( $count > 0 );
+
+ $this->trxRoundStage = $oldStage;
+
+ return $e;
+ }
+
+ public function runMasterTransactionListenerCallbacks() {
+ if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
+ $type = IDatabase::TRIGGER_COMMIT;
+ } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
+ $type = IDatabase::TRIGGER_ROLLBACK;
+ } else {
+ throw new DBTransactionError(
+ null,
+ "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
+ );
+ }
+
+ $e = null;
+
+ $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
$this->forEachOpenMasterConnection( function ( Database $conn ) use ( $type, &$e ) {
- $conn->setTrxEndCallbackSuppression( false );
- // Callbacks run in AUTO-COMMIT mode, so make sure no transactions are pending...
- if ( $conn->writesPending() ) {
- // This happens if onTransactionIdle() callbacks write to *other* handles
- // (which already finished their callbacks). Let any callbacks run in the final
- // commitMasterChanges() in LBFactory::shutdown(), when the transaction is gone.
- $this->queryLogger->warning( __METHOD__ . ": found writes pending." );
- return;
- } elseif ( $conn->trxLevel() ) {
- // This happens for single-DB setups where DB_REPLICA uses the master DB,
- // thus leaving an implicit read-only transaction open at this point. It
- // also happens if onTransactionIdle() callbacks leave implicit transactions
- // open on *other* DBs (which is slightly improper). Let these COMMIT on the
- // next call to commitMasterChanges(), possibly in LBFactory::shutdown().
- return;
- }
- try {
- $conn->runOnTransactionIdleCallbacks( $type );
- } catch ( Exception $ex ) {
- $e = $e ?: $ex;
- }
try {
$conn->runTransactionListenerCallbacks( $type );
} catch ( Exception $ex ) {
$e = $e ?: $ex;
}
} );
+ $this->trxRoundStage = self::ROUND_CURSORY;
return $e;
}
public function rollbackMasterChanges( $fname = __METHOD__ ) {
$restore = ( $this->trxRoundId !== false );
$this->trxRoundId = false;
- $this->forEachOpenMasterConnection(
- function ( IDatabase $conn ) use ( $fname, $restore ) {
- $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
- if ( $restore ) {
- $this->undoTransactionRoundFlags( $conn );
- }
- }
- );
+ $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+ $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $fname ) {
+ $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
+ } );
+ if ( $restore ) {
+ // Unmark handles as participating in this explicit transaction round
+ $this->forEachOpenMasterConnection( function ( Database $conn ) {
+ $this->undoTransactionRoundFlags( $conn );
+ } );
+ }
+ $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
}
- public function suppressTransactionEndCallbacks() {
- $this->forEachOpenMasterConnection( function ( Database $conn ) {
- $conn->setTrxEndCallbackSuppression( true );
- } );
+ /**
+ * @param string $stage
+ */
+ private function assertTransactionRoundStage( $stage ) {
+ if ( $this->trxRoundStage !== $stage ) {
+ throw new DBTransactionError(
+ null,
+ "Transaction round stage must be '$stage' (not '{$this->trxRoundStage}')"
+ );
+ }
}
/**
* transaction rounds and remain in auto-commit mode. Such behavior might be desired
* when a DB server is used for something like simple key/value storage.
*
- * @param IDatabase $conn
+ * @param Database $conn
*/
- private function applyTransactionRoundFlags( IDatabase $conn ) {
+ private function applyTransactionRoundFlags( Database $conn ) {
if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
return; // transaction rounds do not apply to these connections
}
}
/**
- * @param IDatabase $conn
+ * @param Database $conn
*/
- private function undoTransactionRoundFlags( IDatabase $conn ) {
+ private function undoTransactionRoundFlags( Database $conn ) {
if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
return; // transaction rounds do not apply to these connections
}
}
public function flushReplicaSnapshots( $fname = __METHOD__ ) {
- $this->forEachOpenReplicaConnection( function ( IDatabase $conn ) {
- $conn->flushSnapshot( __METHOD__ );
+ $this->forEachOpenReplicaConnection( function ( IDatabase $conn ) use ( $fname ) {
+ $conn->flushSnapshot( $fname );
} );
}
+ private function flushMasterSnapshots( $fname = __METHOD__ ) {
+ $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $fname ) {
+ $conn->flushSnapshot( $fname );
+ } );
+ }
+
+ /**
+ * @return string
+ * @since 1.32
+ */
+ public function getTransactionRoundStage() {
+ return $this->trxRoundStage;
+ }
+
public function hasMasterConnection() {
return $this->isOpen( $this->getWriterIndex() );
}
$lb->closeAll();
}
+
+ public function testTransactionCallbackChains() {
+ global $wgDBserver, $wgDBname, $wgDBuser, $wgDBpassword, $wgDBtype, $wgSQLiteDataDir;
+
+ $servers = [
+ [
+ 'host' => $wgDBserver,
+ 'dbname' => $wgDBname,
+ 'tablePrefix' => $this->dbPrefix(),
+ 'user' => $wgDBuser,
+ 'password' => $wgDBpassword,
+ 'type' => $wgDBtype,
+ 'dbDirectory' => $wgSQLiteDataDir,
+ 'load' => 0,
+ 'flags' => DBO_TRX // REPEATABLE-READ for consistency
+ ],
+ ];
+
+ $lb = new LoadBalancer( [
+ 'servers' => $servers,
+ 'localDomain' => new DatabaseDomain( $wgDBname, null, $this->dbPrefix() )
+ ] );
+
+ $conn1 = $lb->openConnection( $lb->getWriterIndex(), false );
+ $conn2 = $lb->openConnection( $lb->getWriterIndex(), '' );
+
+ $count = 0;
+ $lb->forEachOpenMasterConnection( function () use ( &$count ) {
+ ++$count;
+ } );
+ $this->assertEquals( 2, $count, 'Connection handle count' );
+
+ $tlCalls = 0;
+ $lb->setTransactionListener( 'test-listener', function () use ( &$tlCalls ) {
+ ++$tlCalls;
+ } );
+
+ $lb->beginMasterChanges( __METHOD__ );
+ $bc = array_fill_keys( [ 'a', 'b', 'c', 'd' ], 0 );
+ $conn1->onTransactionPreCommitOrIdle( function () use ( &$bc, $conn1, $conn2 ) {
+ $bc['a'] = 1;
+ $conn2->onTransactionPreCommitOrIdle( function () use ( &$bc, $conn1, $conn2 ) {
+ $bc['b'] = 1;
+ $conn1->onTransactionPreCommitOrIdle( function () use ( &$bc, $conn1, $conn2 ) {
+ $bc['c'] = 1;
+ $conn1->onTransactionPreCommitOrIdle( function () use ( &$bc, $conn1, $conn2 ) {
+ $bc['d'] = 1;
+ } );
+ } );
+ } );
+ } );
+ $lb->finalizeMasterChanges();
+ $lb->approveMasterChanges( [] );
+ $lb->commitMasterChanges( __METHOD__ );
+ $lb->runMasterTransactionIdleCallbacks();
+ $lb->runMasterTransactionListenerCallbacks();
+
+ $this->assertEquals( array_fill_keys( [ 'a', 'b', 'c', 'd' ], 1 ), $bc );
+ $this->assertEquals( 2, $tlCalls );
+
+ $tlCalls = 0;
+ $lb->beginMasterChanges( __METHOD__ );
+ $ac = array_fill_keys( [ 'a', 'b', 'c', 'd' ], 0 );
+ $conn1->onTransactionIdle( function () use ( &$ac, $conn1, $conn2 ) {
+ $ac['a'] = 1;
+ $conn2->onTransactionIdle( function () use ( &$ac, $conn1, $conn2 ) {
+ $ac['b'] = 1;
+ $conn1->onTransactionIdle( function () use ( &$ac, $conn1, $conn2 ) {
+ $ac['c'] = 1;
+ $conn1->onTransactionIdle( function () use ( &$ac, $conn1, $conn2 ) {
+ $ac['d'] = 1;
+ } );
+ } );
+ } );
+ } );
+ $lb->finalizeMasterChanges();
+ $lb->approveMasterChanges( [] );
+ $lb->commitMasterChanges( __METHOD__ );
+ $lb->runMasterTransactionIdleCallbacks();
+ $lb->runMasterTransactionListenerCallbacks();
+
+ $this->assertEquals( array_fill_keys( [ 'a', 'b', 'c', 'd' ], 1 ), $ac );
+ $this->assertEquals( 2, $tlCalls );
+
+ $conn1->close();
+ $conn2->close();
+ }
}
use Wikimedia\Rdbms\DatabaseSqlite;
use Wikimedia\Rdbms\DatabasePostgres;
use Wikimedia\Rdbms\DatabaseMssql;
+use Wikimedia\Rdbms\DBUnexpectedError;
class DatabaseTest extends PHPUnit\Framework\TestCase {
$called = true;
$db->setFlag( DBO_TRX );
} );
- $db->rollback( __METHOD__, IDatabase::FLUSHING_ALL_PEERS );
+ $db->rollback( __METHOD__ );
$this->assertFalse( $db->getFlag( DBO_TRX ), 'DBO_TRX restored to default' );
$this->assertTrue( $called, 'Callback reached' );
}
$this->assertEquals( true, $db->lockIsFree( 'x', __METHOD__ ) );
$db->clearFlag( DBO_TRX );
+ // Pending writes with DBO_TRX
$this->assertEquals( 0, $db->trxLevel() );
-
+ $this->assertTrue( $db->lockIsFree( 'meow', __METHOD__ ) );
$db->setFlag( DBO_TRX );
+ $db->query( "DELETE FROM test WHERE t = 1" ); // trigger DBO_TRX transaction before lock
try {
- $this->badLockingMethodImplicit( $db );
- } catch ( RunTimeException $e ) {
- $this->assertTrue( $db->trxLevel() > 0, "Transaction not committed." );
+ $lock = $db->getScopedLockAndFlush( 'meow', __METHOD__, 1 );
+ $this->fail( "Exception not reached" );
+ } catch ( DBUnexpectedError $e ) {
+ $this->assertEquals( 1, $db->trxLevel(), "Transaction not committed." );
+ $this->assertTrue( $db->lockIsFree( 'meow', __METHOD__ ), 'Lock not acquired' );
}
- $db->clearFlag( DBO_TRX );
$db->rollback( __METHOD__, IDatabase::FLUSHING_ALL_PEERS );
- $this->assertTrue( $db->lockIsFree( 'meow', __METHOD__ ) );
-
+ // Pending writes without DBO_TRX
+ $db->clearFlag( DBO_TRX );
+ $this->assertEquals( 0, $db->trxLevel() );
+ $this->assertTrue( $db->lockIsFree( 'meow2', __METHOD__ ) );
+ $db->begin( __METHOD__ );
+ $db->query( "DELETE FROM test WHERE t = 1" ); // trigger DBO_TRX transaction before lock
try {
- $this->badLockingMethodExplicit( $db );
- } catch ( RunTimeException $e ) {
- $this->assertTrue( $db->trxLevel() > 0, "Transaction not committed." );
+ $lock = $db->getScopedLockAndFlush( 'meow2', __METHOD__, 1 );
+ $this->fail( "Exception not reached" );
+ } catch ( DBUnexpectedError $e ) {
+ $this->assertEquals( 1, $db->trxLevel(), "Transaction not committed." );
+ $this->assertTrue( $db->lockIsFree( 'meow2', __METHOD__ ), 'Lock not acquired' );
}
+ $db->rollback( __METHOD__ );
+ // No pending writes, with DBO_TRX
+ $db->setFlag( DBO_TRX );
+ $this->assertEquals( 0, $db->trxLevel() );
+ $this->assertTrue( $db->lockIsFree( 'wuff', __METHOD__ ) );
+ $db->query( "SELECT 1", __METHOD__ );
+ $this->assertEquals( 1, $db->trxLevel() );
+ $lock = $db->getScopedLockAndFlush( 'wuff', __METHOD__, 1 );
+ $this->assertEquals( 0, $db->trxLevel() );
+ $this->assertFalse( $db->lockIsFree( 'wuff', __METHOD__ ), 'Lock already acquired' );
$db->rollback( __METHOD__, IDatabase::FLUSHING_ALL_PEERS );
- $this->assertTrue( $db->lockIsFree( 'meow', __METHOD__ ) );
- }
-
- private function badLockingMethodImplicit( IDatabase $db ) {
- $lock = $db->getScopedLockAndFlush( 'meow', __METHOD__, 1 );
- $db->query( "SELECT 1" ); // trigger DBO_TRX
- throw new RunTimeException( "Uh oh!" );
- }
-
- private function badLockingMethodExplicit( IDatabase $db ) {
- $lock = $db->getScopedLockAndFlush( 'meow', __METHOD__, 1 );
+ // No pending writes, without DBO_TRX
+ $db->clearFlag( DBO_TRX );
+ $this->assertEquals( 0, $db->trxLevel() );
+ $this->assertTrue( $db->lockIsFree( 'wuff2', __METHOD__ ) );
$db->begin( __METHOD__ );
- throw new RunTimeException( "Uh oh!" );
+ try {
+ $lock = $db->getScopedLockAndFlush( 'wuff2', __METHOD__, 1 );
+ $this->fail( "Exception not reached" );
+ } catch ( DBUnexpectedError $e ) {
+ $this->assertEquals( 1, $db->trxLevel(), "Transaction not committed." );
+ $this->assertFalse( $db->lockIsFree( 'wuff2', __METHOD__ ), 'Lock not acquired' );
+ }
+ $db->rollback( __METHOD__ );
}
/**