return $this->__call( __FUNCTION__, func_get_args() );
}
+ public function setTransactionListener( $name, callable $callback = null ) {
+ return $this->__call( __FUNCTION__, func_get_args() );
+ }
+
public function startAtomic( $fname = __METHOD__ ) {
return $this->__call( __FUNCTION__, func_get_args() );
}
protected $mTrxPreCommitCallbacks = [];
/** @var array[] List of (callable, method name) */
protected $mTrxEndCallbacks = [];
- /** @var bool Whether to suppress triggering of post-commit callbacks */
- protected $suppressPostCommitCallbacks = false;
+ /** @var array[] Map of (name => (callable, method name)) */
+ protected $mTrxRecurringCallbacks = [];
+ /** @var bool Whether to suppress triggering of transaction end callbacks */
+ protected $mTrxEndCallbacksSuppressed = false;
/** @var string */
protected $mTablePrefix;
try {
// Handle callbacks in mTrxEndCallbacks
$this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK );
+ $this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK );
return null;
} catch ( Exception $e ) {
// Already logged; move on...
}
}
+ final public function setTransactionListener( $name, callable $callback = null ) {
+ if ( $callback ) {
+ $this->mTrxRecurringCallbacks[$name] = [ $callback, wfGetCaller() ];
+ } else {
+ unset( $this->mTrxRecurringCallbacks[$name] );
+ }
+ }
+
/**
- * Whether to disable running of post-commit callbacks
+ * Whether to disable running of post-COMMIT/ROLLBACK callbacks
*
* This method should not be used outside of Database/LoadBalancer
*
* @param bool $suppress
* @since 1.28
*/
- final public function setPostCommitCallbackSupression( $suppress ) {
- $this->suppressPostCommitCallbacks = $suppress;
+ final public function setTrxEndCallbackSuppression( $suppress ) {
+ $this->mTrxEndCallbacksSuppressed = $suppress;
}
/**
* @throws Exception
*/
public function runOnTransactionIdleCallbacks( $trigger ) {
- if ( $this->suppressPostCommitCallbacks ) {
+ if ( $this->mTrxEndCallbacksSuppressed ) {
return;
}
}
}
+ /**
+ * Actually run any "transaction listener" callbacks.
+ *
+ * This method should not be used outside of Database/LoadBalancer
+ *
+ * @param integer $trigger IDatabase::TRIGGER_* constant
+ * @throws Exception
+ * @since 1.20
+ */
+ public function runTransactionListenerCallbacks( $trigger ) {
+ if ( $this->mTrxEndCallbacksSuppressed ) {
+ return;
+ }
+
+ /** @var Exception $e */
+ $e = null; // first exception
+
+ foreach ( $this->mTrxRecurringCallbacks as $callback ) {
+ try {
+ list( $phpCallback ) = $callback;
+ $phpCallback( $trigger, $this );
+ } catch ( Exception $ex ) {
+ MWExceptionHandler::logException( $ex );
+ $e = $e ?: $ex;
+ }
+ }
+
+ if ( $e instanceof Exception ) {
+ throw $e; // re-throw any first exception
+ }
+ }
+
final public function startAtomic( $fname = __METHOD__ ) {
if ( !$this->mTrxLevel ) {
$this->begin( $fname, self::TRANSACTION_INTERNAL );
}
$this->runOnTransactionIdleCallbacks( self::TRIGGER_COMMIT );
+ $this->runTransactionListenerCallbacks( self::TRIGGER_COMMIT );
}
/**
$this->mTrxIdleCallbacks = []; // clear
$this->mTrxPreCommitCallbacks = []; // clear
$this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK );
+ $this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK );
}
/**
}
}
+ public function clearSnapshot( $fname = __METHOD__ ) {
+ if ( $this->writesOrCallbacksPending() || $this->explicitTrxActive() ) {
+ // This only flushes transactions to clear snapshots, not to write data
+ throw new DBUnexpectedError(
+ $this,
+ "$fname: Cannot COMMIT to clear snapshot because writes are pending."
+ );
+ }
+
+ $this->commit( $fname, self::FLUSHING_INTERNAL );
+ }
+
public function explicitTrxActive() {
return $this->mTrxLevel && ( $this->mTrxAtomicLevels || !$this->mTrxAutomatic );
}
interface IDatabase {
/** @var int Callback triggered immediately due to no active transaction */
const TRIGGER_IDLE = 1;
- /** @var int Callback triggered by commit */
+ /** @var int Callback triggered by COMMIT */
const TRIGGER_COMMIT = 2;
- /** @var int Callback triggered by rollback */
+ /** @var int Callback triggered by ROLLBACK */
const TRIGGER_ROLLBACK = 3;
/** @var string Transaction is requested by regular caller outside of the DB layer */
/**
* Returns true if there is a transaction open with possible write
* queries or transaction pre-commit/idle callbacks waiting on it to finish.
+ * This does *not* count recurring callbacks, e.g. from setTransactionListener().
*
* @return bool
*/
* This is useful for combining cooperative locks and DB transactions.
*
* The callback takes one argument:
- * How the transaction ended (IDatabase::TRIGGER_COMMIT or IDatabase::TRIGGER_ROLLBACK)
+ * - How the transaction ended (IDatabase::TRIGGER_COMMIT or IDatabase::TRIGGER_ROLLBACK)
*
* @param callable $callback
* @return mixed
* Updates will execute in the order they were enqueued.
*
* The callback takes one argument:
- * How the transaction ended (IDatabase::TRIGGER_COMMIT or IDatabase::TRIGGER_IDLE)
+ * - How the transaction ended (IDatabase::TRIGGER_COMMIT or IDatabase::TRIGGER_IDLE)
*
* @param callable $callback
* @since 1.20
*/
public function onTransactionPreCommitOrIdle( callable $callback );
+ /**
+ * Run a callback each time any transaction commits or rolls back
+ *
+ * The callback takes two arguments:
+ * - IDatabase::TRIGGER_COMMIT or IDatabase::TRIGGER_ROLLBACK
+ * - This IDatabase object
+ * Callbacks must commit any transactions that they begin.
+ *
+ * Registering a callback here will not affect writesOrCallbacks() pending
+ *
+ * @param string $name Callback name
+ * @param callable|null $callback Use null to unset a listener
+ * @return mixed
+ * @since 1.28
+ */
+ public function setTransactionListener( $name, callable $callback = null );
+
/**
* Begin an atomic section of statements
*
/** @var mixed */
protected $ticket;
+ /** @var string|bool String if a requested DBO_TRX transaction round is active */
+ protected $trxRoundId = false;
/** @var string|bool Reason all LBs are read-only or false if not */
protected $readOnlyReason = false;
+ /** @var callable[] */
+ protected $replicationWaitCallbacks = [];
const SHUTDOWN_NO_CHRONPROT = 1; // don't save ChronologyProtector positions (for async code)
* This allows for custom transaction rounds from any outer transaction scope.
*
* @param string $fname
+ * @throws DBTransactionError
* @since 1.28
*/
public function beginMasterChanges( $fname = __METHOD__ ) {
+ if ( $this->trxRoundId !== false ) {
+ throw new DBTransactionError(
+ null,
+ "Transaction round '{$this->trxRoundId}' already started."
+ );
+ }
+ $this->trxRoundId = $fname;
+ // Set DBO_TRX flags on all appropriate DBs
$this->forEachLBCallMethod( 'beginMasterChanges', [ $fname ] );
}
* @throws Exception
*/
public function commitMasterChanges( $fname = __METHOD__, array $options = [] ) {
- // Perform all pre-commit callbacks, aborting on failure
- $this->forEachLBCallMethod( 'runMasterPreCommitCallbacks' );
- // Perform all pre-commit checks, aborting on failure
+ // Run pre-commit callbacks and suppress post-commit callbacks, aborting on failure
+ $this->forEachLBCallMethod( 'finalizeMasterChanges' );
+ $this->trxRoundId = false;
+ // Perform pre-commit checks, aborting on failure
$this->forEachLBCallMethod( 'approveMasterChanges', [ $options ] );
// Log the DBs and methods involved in multi-DB transactions
$this->logIfMultiDbTransaction();
- // Actually perform the commit on all master DB connections
+ // Actually perform the commit on all master DB connections and revert DBO_TRX
$this->forEachLBCallMethod( 'commitMasterChanges', [ $fname ] );
// Run all post-commit callbacks
/** @var Exception $e */
$e = null; // first callback exception
$this->forEachLB( function ( LoadBalancer $lb ) use ( &$e ) {
- $ex = $lb->runMasterPostCommitCallbacks();
+ $ex = $lb->runMasterPostTrxCallbacks( IDatabase::TRIGGER_COMMIT );
$e = $e ?: $ex;
} );
// Commit any dangling DBO_TRX transactions from callbacks on one DB to another DB
* @since 1.23
*/
public function rollbackMasterChanges( $fname = __METHOD__ ) {
+ $this->trxRoundId = false;
+ $this->forEachLBCallMethod( 'suppressTransactionEndCallbacks' );
$this->forEachLBCallMethod( 'rollbackMasterChanges', [ $fname ] );
+ // Run all post-rollback callbacks
+ $this->forEachLB( function ( LoadBalancer $lb ) {
+ $lb->runMasterPostTrxCallbacks( IDatabase::TRIGGER_ROLLBACK );
+ } );
}
/**
'ifWritesSince' => null
];
+ foreach ( $this->replicationWaitCallbacks as $callback ) {
+ $callback();
+ }
+
// Figure out which clusters need to be checked
/** @var LoadBalancer[] $lbs */
$lbs = [];
}
}
+ /**
+ * Add a callback to be run in every call to waitForReplication() before waiting
+ *
+ * Callbacks must clear any transactions that they start
+ *
+ * @param string $name Callback name
+ * @param callable|null $callback Use null to unset a callback
+ * @since 1.28
+ */
+ public function setWaitForReplicationListener( $name, callable $callback = null ) {
+ if ( $callback ) {
+ $this->replicationWaitCallbacks[$name] = $callback;
+ } else {
+ unset( $this->replicationWaitCallbacks[$name] );
+ }
+ }
+
/**
* Get a token asserting that no transaction writes are active
*
} );
}
+ /**
+ * @param LoadBalancer $lb
+ */
+ protected function initLoadBalancer( LoadBalancer $lb ) {
+ if ( $this->trxRoundId !== false ) {
+ $lb->beginMasterChanges( $this->trxRoundId ); // set DBO_TRX
+ }
+ }
+
/**
* Close all open database connections on all open load balancers.
* @since 1.28
* @return LoadBalancer
*/
private function newLoadBalancer( $template, $loads, $groupLoads, $readOnlyReason ) {
- return new LoadBalancer( [
+ $lb = new LoadBalancer( [
'servers' => $this->makeServerArray( $template, $loads, $groupLoads ),
'loadMonitor' => $this->loadMonitorClass,
'readOnlyReason' => $readOnlyReason,
'srvCache' => $this->srvCache,
'wanCache' => $this->wanCache
] );
+
+ $this->initLoadBalancer( $lb );
+
+ return $lb;
}
/**
}
private function newLoadBalancer( array $servers ) {
- return new LoadBalancer( [
+ $lb = new LoadBalancer( [
'servers' => $servers,
'loadMonitor' => $this->loadMonitorClass,
'readOnlyReason' => $this->readOnlyReason,
'srvCache' => $this->srvCache,
'wanCache' => $this->wanCache
] );
+
+ $this->initLoadBalancer( $lb );
+
+ return $lb;
}
/**
private $srvCache;
/** @var WANObjectCache */
private $wanCache;
+ /** @var TransactionProfiler */
+ protected $trxProfiler;
/** @var bool|DatabaseBase Database connection that caused a problem */
private $mErrorConnection;
private $readOnlyReason = false;
/** @var integer Total connections opened */
private $connsOpened = 0;
-
- /** @var TransactionProfiler */
- protected $trxProfiler;
+ /** @var string|bool String if a requested DBO_TRX transaction round is active */
+ private $trxRoundId = false;
/** @var integer Warn when this many connection are held */
const CONN_HELD_WARN_THRESHOLD = 10;
$this->getLazyConnectionRef( DB_MASTER, [], $db->getWikiID() )
);
$db->setTransactionProfiler( $this->trxProfiler );
+ if ( $this->trxRoundId !== false ) {
+ $this->applyTransactionRoundFlags( $db );
+ }
return $db;
}
/**
* Commit transactions on all open connections
* @param string $fname Caller name
+ * @throws DBExpectedError
*/
public function commitAll( $fname = __METHOD__ ) {
- $this->forEachOpenConnection( function ( DatabaseBase $conn ) use ( $fname ) {
- $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
- } );
+ $failures = [];
+
+ $restore = ( $this->trxRoundId !== false );
+ $this->trxRoundId = false;
+ $this->forEachOpenConnection(
+ function ( DatabaseBase $conn ) use ( $fname, $restore, &$failures ) {
+ try {
+ $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
+ } catch ( DBError $e ) {
+ MWExceptionHandler::logException( $e );
+ $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
+ }
+ if ( $restore && $conn->getLBInfo( 'master' ) ) {
+ $this->undoTransactionRoundFlags( $conn );
+ }
+ }
+ );
+
+ if ( $failures ) {
+ throw new DBExpectedError(
+ null,
+ "Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
+ );
+ }
}
/**
* Perform all pre-commit callbacks that remain part of the atomic transactions
- * and disable any post-commit callbacks until runMasterPostCommitCallbacks()
+ * and disable any post-commit callbacks until runMasterPostTrxCallbacks()
* @since 1.28
*/
- public function runMasterPreCommitCallbacks() {
+ public function finalizeMasterChanges() {
$this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) {
- // Any error will cause all DB transactions to be rolled back together.
+ // Any error should cause all DB transactions to be rolled back together
+ $conn->setTrxEndCallbackSuppression( false );
$conn->runOnTransactionPreCommitCallbacks();
- // Defer post-commit callbacks until COMMIT finishes for all DBs.
- $conn->setPostCommitCallbackSupression( true );
+ // Defer post-commit callbacks until COMMIT finishes for all DBs
+ $conn->setTrxEndCallbackSuppression( true );
} );
}
* This allows for custom transaction rounds from any outer transaction scope.
*
* @param string $fname
+ * @throws DBExpectedError
* @since 1.28
*/
public function beginMasterChanges( $fname = __METHOD__ ) {
- $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $fname ) {
- if ( $conn->writesOrCallbacksPending() ) {
- throw new DBTransactionError(
- $conn,
- "Transaction with pending writes still active."
- );
- } elseif ( $conn->trxLevel() ) {
- $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
- }
- if ( $conn->getFlag( DBO_DEFAULT ) ) {
- // DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
- // Force DBO_TRX even in CLI mode since a commit round is expected soon.
- $conn->setFlag( DBO_TRX, $conn::REMEMBER_PRIOR );
- $conn->onTransactionResolution( function () use ( $conn ) {
- $conn->restoreFlags( $conn::RESTORE_PRIOR );
- } );
- } else {
- // Config has explicitly requested DBO_TRX be either on or off; respect that.
- // This is useful for things like blob stores which use auto-commit mode.
+ if ( $this->trxRoundId !== false ) {
+ throw new DBTransactionError(
+ null,
+ "$fname: Transaction round '{$this->trxRoundId}' already started."
+ );
+ }
+ $this->trxRoundId = $fname;
+
+ $failures = [];
+ $this->forEachOpenMasterConnection(
+ function ( DatabaseBase $conn ) use ( $fname, &$failures ) {
+ $conn->setTrxEndCallbackSuppression( true );
+ try {
+ $conn->clearSnapshot( $fname );
+ } catch ( DBError $e ) {
+ MWExceptionHandler::logException( $e );
+ $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
+ }
+ $conn->setTrxEndCallbackSuppression( false );
+ $this->applyTransactionRoundFlags( $conn );
}
- } );
+ );
+
+ if ( $failures ) {
+ throw new DBExpectedError(
+ null,
+ "$fname: Flush failed on server(s) " . implode( "\n", array_unique( $failures ) )
+ );
+ }
}
/**
* Issue COMMIT on all master connections where writes where done
* @param string $fname Caller name
+ * @throws DBExpectedError
*/
public function commitMasterChanges( $fname = __METHOD__ ) {
- $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $fname ) {
- if ( $conn->writesOrCallbacksPending() ) {
- $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
+ $failures = [];
+
+ $restore = ( $this->trxRoundId !== false );
+ $this->trxRoundId = false;
+ $this->forEachOpenMasterConnection(
+ function ( DatabaseBase $conn ) use ( $fname, $restore, &$failures ) {
+ try {
+ if ( $conn->writesOrCallbacksPending() ) {
+ $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
+ } elseif ( $restore ) {
+ $conn->clearSnapshot( $fname );
+ }
+ } catch ( DBError $e ) {
+ MWExceptionHandler::logException( $e );
+ $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
+ }
+ if ( $restore ) {
+ $this->undoTransactionRoundFlags( $conn );
+ }
}
- } );
+ );
+
+ if ( $failures ) {
+ throw new DBExpectedError(
+ null,
+ "$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
+ );
+ }
}
/**
- * Issue all pending post-commit callbacks
+ * Issue all pending post-COMMIT/ROLLBACK callbacks
+ * @param integer $type IDatabase::TRIGGER_* constant
* @return Exception|null The first exception or null if there were none
* @since 1.28
*/
- public function runMasterPostCommitCallbacks() {
+ public function runMasterPostTrxCallbacks( $type ) {
$e = null; // first exception
- $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( &$e ) {
- $conn->setPostCommitCallbackSupression( false );
+ $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $type, &$e ) {
+ $conn->clearSnapshot( __METHOD__ ); // clear no-op transactions
+
+ $conn->setTrxEndCallbackSuppression( false );
+ try {
+ $conn->runOnTransactionIdleCallbacks( $type );
+ } catch ( Exception $ex ) {
+ $e = $e ?: $ex;
+ }
try {
- $conn->runOnTransactionIdleCallbacks( $conn::TRIGGER_COMMIT );
+ $conn->runTransactionListenerCallbacks( $type );
} catch ( Exception $ex ) {
$e = $e ?: $ex;
}
* @since 1.23
*/
public function rollbackMasterChanges( $fname = __METHOD__ ) {
- $failedServers = [];
-
- $masterIndex = $this->getWriterIndex();
- foreach ( $this->mConns as $conns2 ) {
- if ( empty( $conns2[$masterIndex] ) ) {
- continue;
- }
- /** @var DatabaseBase $conn */
- foreach ( $conns2[$masterIndex] as $conn ) {
- if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
- try {
- $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
- } catch ( DBError $e ) {
- MWExceptionHandler::logException( $e );
- $failedServers[] = $conn->getServer();
- }
+ $restore = ( $this->trxRoundId !== false );
+ $this->trxRoundId = false;
+ $this->forEachOpenMasterConnection(
+ function ( DatabaseBase $conn ) use ( $fname, $restore ) {
+ if ( $conn->writesOrCallbacksPending() ) {
+ $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
+ }
+ if ( $restore ) {
+ $this->undoTransactionRoundFlags( $conn );
}
}
+ );
+ }
+
+ /**
+ * Suppress all pending post-COMMIT/ROLLBACK callbacks
+ * @return Exception|null The first exception or null if there were none
+ * @since 1.28
+ */
+ public function suppressTransactionEndCallbacks() {
+ $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) {
+ $conn->setTrxEndCallbackSuppression( true );
+ } );
+ }
+
+ /**
+ * @param DatabaseBase $conn
+ */
+ private function applyTransactionRoundFlags( DatabaseBase $conn ) {
+ if ( $conn->getFlag( DBO_DEFAULT ) ) {
+ // DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
+ // Force DBO_TRX even in CLI mode since a commit round is expected soon.
+ $conn->setFlag( DBO_TRX, $conn::REMEMBER_PRIOR );
+ // If config has explicitly requested DBO_TRX be either on or off by not
+ // setting DBO_DEFAULT, then respect that. Forcing no transactions is useful
+ // for things like blob stores (ExternalStore) which want auto-commit mode.
}
+ }
- if ( $failedServers ) {
- throw new DBExpectedError( null, "Rollback failed on server(s) " .
- implode( ', ', array_unique( $failedServers ) ) );
+ /**
+ * @param DatabaseBase $conn
+ */
+ private function undoTransactionRoundFlags( DatabaseBase $conn ) {
+ if ( $conn->getFlag( DBO_DEFAULT ) ) {
+ $conn->restoreFlags( $conn::RESTORE_PRIOR );
}
}
$this->assertTrue( $called, 'Callback reached' );
}
+ /**
+ * @covers DatabaseBase::setTransactionListener()
+ */
+ public function testTransactionListener() {
+ $db = $this->db;
+
+ $db->setTransactionListener( 'ping', function() use ( $db, &$called ) {
+ $called = true;
+ } );
+
+ $called = false;
+ $db->begin( __METHOD__ );
+ $db->commit( __METHOD__ );
+ $this->assertTrue( $called, 'Callback reached' );
+
+ $called = false;
+ $db->begin( __METHOD__ );
+ $db->commit( __METHOD__ );
+ $this->assertTrue( $called, 'Callback still reached' );
+
+ $called = false;
+ $db->begin( __METHOD__ );
+ $db->rollback( __METHOD__ );
+ $this->assertTrue( $called, 'Callback reached' );
+
+ $db->setTransactionListener( 'ping', null );
+ $called = false;
+ $db->begin( __METHOD__ );
+ $db->commit( __METHOD__ );
+ $this->assertFalse( $called, 'Callback not reached' );
+ }
+
+ /**
+ * @covers DatabaseBase::clearSnapshot()
+ */
+ public function testClearSnapshot() {
+ $db = $this->db;
+
+ $db->clearSnapshot( __METHOD__ ); // ok
+ $db->clearSnapshot( __METHOD__ ); // ok
+
+ $db->setFlag( DBO_TRX, $db::REMEMBER_PRIOR );
+ $db->query( 'SELECT 1', __METHOD__ );
+ $this->assertTrue( (bool)$db->trxLevel(), "Transaction started." );
+ $db->clearSnapshot( __METHOD__ ); // ok
+ $db->restoreFlags( $db::RESTORE_PRIOR );
+
+ $this->assertFalse( (bool)$db->trxLevel(), "Transaction cleared." );
+ }
+
public function testGetScopedLock() {
$db = $this->db;