protected $mTrxPreCommitCallbacks = [];
/** @var array[] List of (callable, method name) */
protected $mTrxEndCallbacks = [];
+ /** @var bool Whether to suppress triggering of post-commit callbacks */
+ protected $suppressPostCommitCallbacks = false;
protected $mTablePrefix;
protected $mSchema;
}
/**
- * Actually run and consume any "on transaction idle" callbacks.
+ * Whether to disable running of post-commit callbacks
+ *
+ * This method should not be used outside of Database/LoadBalancer
+ *
+ * @param bool $suppress
+ * @since 1.28
+ */
+ final public function setPostCommitCallbackSupression( $suppress ) {
+ $this->suppressPostCommitCallbacks = $suppress;
+ }
+
+ /**
+ * Actually run and consume any "on transaction idle/resolution" callbacks.
+ *
+ * This method should not be used outside of Database/LoadBalancer
*
* @param integer $trigger IDatabase::TRIGGER_* constant
* @since 1.20
*/
- protected function runOnTransactionIdleCallbacks( $trigger ) {
+ public function runOnTransactionIdleCallbacks( $trigger ) {
+ if ( $this->suppressPostCommitCallbacks ) {
+ return;
+ }
+
$autoTrx = $this->getFlag( DBO_TRX ); // automatic begin() enabled?
$e = $ePrior = null; // last exception
$this->mTrxIdleCallbacks,
$this->mTrxEndCallbacks // include "transaction resolution" callbacks
);
- $this->mTrxIdleCallbacks = []; // recursion guard
- $this->mTrxEndCallbacks = []; // recursion guard
+ $this->mTrxIdleCallbacks = []; // consumed (and recursion guard)
+ $this->mTrxEndCallbacks = []; // consumed (recursion guard)
foreach ( $callbacks as $callback ) {
try {
list( $phpCallback ) = $callback;
$e = $ePrior = null; // last exception
do { // callbacks may add callbacks :)
$callbacks = $this->mTrxPreCommitCallbacks;
- $this->mTrxPreCommitCallbacks = []; // recursion guard
+ $this->mTrxPreCommitCallbacks = []; // consumed (and recursion guard)
foreach ( $callbacks as $callback ) {
try {
list( $phpCallback ) = $callback;
* 1. To commit changes to the masters.
* 2. To release the snapshot on all connections, master and slave.
* @param string $fname Caller name
+ * @param array $options Options map:
+ * - maxWriteDuration: abort if more than this much time was spent in write queries
*/
- public function commitAll( $fname = __METHOD__ ) {
- $this->logMultiDbTransaction();
-
- $start = microtime( true );
+ public function commitAll( $fname = __METHOD__, array $options = [] ) {
+ $this->commitMasterChanges( $fname, $options );
$this->forEachLBCallMethod( 'commitAll', [ $fname ] );
- $timeMs = 1000 * ( microtime( true ) - $start );
-
- RequestContext::getMain()->getStats()->timing( "db.commit-all", $timeMs );
}
/**
* - maxWriteDuration: abort if more than this much time was spent in write queries
*/
public function commitMasterChanges( $fname = __METHOD__, array $options = [] ) {
- $limit = isset( $options['maxWriteDuration'] ) ? $options['maxWriteDuration'] : 0;
-
- // Run pre-commit callbacks to keep them out of the COMMIT step. If one errors out here
- // then all DB transactions can be rolled back before anything was committed yet.
- $this->forEachLBCallMethod( 'runPreCommitCallbacks' );
-
- $this->logMultiDbTransaction();
- $this->forEachLB( function ( LoadBalancer $lb ) use ( $limit ) {
- $lb->forEachOpenConnection( function ( IDatabase $db ) use ( $limit ) {
- $time = $db->pendingWriteQueryDuration();
- if ( $limit > 0 && $time > $limit ) {
- throw new DBTransactionError(
- $db,
- wfMessage( 'transaction-duration-limit-exceeded', $time, $limit )->text()
- );
- }
- } );
- } );
-
+ // Perform all pre-commit callbacks, aborting on failure
+ $this->forEachLBCallMethod( 'runMasterPreCommitCallbacks' );
+ // Perform all pre-commit checks, aborting on failure
+ $this->forEachLBCallMethod( 'approveMasterChanges', [ $options ] );
+ // Log the DBs and methods involved in multi-DB transactions
+ $this->logIfMultiDbTransaction();
+ // Actually perform the commit on all master DB connections
+ $this->forEachLBCallMethod( 'commitMasterChanges', [ $fname ] );
+ // Run all post-commit callbacks
+ $this->forEachLBCallMethod( 'runMasterPostCommitCallbacks' );
+ // Commit any dangling DBO_TRX transactions from callbacks on one DB to another DB
$this->forEachLBCallMethod( 'commitMasterChanges', [ $fname ] );
}
/**
* Log query info if multi DB transactions are going to be committed now
*/
- private function logMultiDbTransaction() {
+ private function logIfMultiDbTransaction() {
$callersByDB = [];
$this->forEachLB( function ( LoadBalancer $lb ) use ( &$callersByDB ) {
$masterName = $lb->getServerName( $lb->getWriterIndex() );
* @param string $fname Caller name
*/
public function commitAll( $fname = __METHOD__ ) {
- foreach ( $this->mConns as $conns2 ) {
- foreach ( $conns2 as $conns3 ) {
- /** @var DatabaseBase[] $conns3 */
- foreach ( $conns3 as $conn ) {
- if ( $conn->trxLevel() ) {
- $conn->commit( $fname, 'flush' );
- }
- }
+ $this->forEachOpenConnection( function ( DatabaseBase $conn ) use ( $fname ) {
+ $conn->commit( $fname, 'flush' );
+ } );
+ }
+
+ /**
+ * Perform all pre-commit callbacks that remain part of the atomic transactions
+ * and disable any post-commit callbacks until runMasterPostCommitCallbacks()
+ * @since 1.28
+ */
+ public function runMasterPreCommitCallbacks() {
+ $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) {
+ // Any error will cause all DB transactions to be rolled back together.
+ $conn->runOnTransactionPreCommitCallbacks();
+ // Defer post-commit callbacks until COMMIT finishes for all DBs.
+ $conn->setPostCommitCallbackSupression( true );
+ } );
+ }
+
+ /**
+ * Perform all pre-commit checks for things like replication safety
+ * @param array $options Includes:
+ * - maxWriteDuration : max write query duration time in seconds
+ * @throws DBTransactionError
+ * @since 1.28
+ */
+ public function approveMasterChanges( array $options ) {
+ $limit = isset( $options['maxWriteDuration'] ) ? $options['maxWriteDuration'] : 0;
+ $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $limit ) {
+ // Assert that the time to replicate the transaction will be sane.
+ // If this fails, then all DB transactions will be rollback back together.
+ $time = $conn->pendingWriteQueryDuration();
+ if ( $limit > 0 && $time > $limit ) {
+ throw new DBTransactionError(
+ $conn,
+ wfMessage( 'transaction-duration-limit-exceeded', $time, $limit )->text()
+ );
}
- }
+ } );
}
/**
- * Issue COMMIT only on master, only if queries were done on connection
+ * Issue COMMIT on all master connections where writes where done
* @param string $fname Caller name
*/
public function commitMasterChanges( $fname = __METHOD__ ) {
- $masterIndex = $this->getWriterIndex();
- foreach ( $this->mConns as $conns2 ) {
- if ( empty( $conns2[$masterIndex] ) ) {
- continue;
- }
- /** @var DatabaseBase $conn */
- foreach ( $conns2[$masterIndex] as $conn ) {
- if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
- $conn->commit( $fname, 'flush' );
- }
+ $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $fname ) {
+ if ( $conn->writesOrCallbacksPending() ) {
+ $conn->commit( $fname, 'flush' );
}
- }
+ } );
+ }
+
+ /**
+ * Issue all pending post-commit callbacks
+ * @since 1.28
+ */
+ public function runMasterPostCommitCallbacks() {
+ $this->forEachOpenMasterConnection( function ( DatabaseBase $db ) {
+ $db->setPostCommitCallbackSupression( false );
+ $db->runOnTransactionIdleCallbacks( IDatabase::TRIGGER_COMMIT );
+ } );
}
/**
}
}
- /**
- * Call runOnTransactionPreCommitCallbacks() on all DB handles
- *
- * This method should not be used outside of LBFactory/LoadBalancer
- *
- * @since 1.28
- */
- public function runPreCommitCallbacks() {
- $masterIndex = $this->getWriterIndex();
- foreach ( $this->mConns as $conns2 ) {
- if ( empty( $conns2[$masterIndex] ) ) {
- continue;
- }
- /** @var DatabaseBase $conn */
- foreach ( $conns2[$masterIndex] as $conn ) {
- if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
- $conn->runOnTransactionPreCommitCallbacks();
- }
- }
- }
- }
-
/**
* @return bool Whether a master connection is already open
* @since 1.24
}
}
+ /**
+ * Call a function with each open connection object to a master
+ * @param callable $callback
+ * @param array $params
+ * @since 1.28
+ */
+ public function forEachOpenMasterConnection( $callback, array $params = [] ) {
+ $masterIndex = $this->getWriterIndex();
+ foreach ( $this->mConns as $connsByServer ) {
+ if ( isset( $connsByServer[$masterIndex] ) ) {
+ /** @var DatabaseBase $conn */
+ foreach ( $connsByServer[$masterIndex] as $conn ) {
+ $mergedParams = array_merge( [ $conn ], $params );
+ call_user_func_array( $callback, $mergedParams );
+ }
+ }
+ }
+ }
+
/**
* Get the hostname and lag time of the most-lagged slave
*