return $this->__call( __FUNCTION__, func_get_args() );
}
- public function pendingWriteQueryDuration() {
+ public function pendingWriteQueryDuration( $type = self::ESTIMATE_TOTAL ) {
return $this->__call( __FUNCTION__, func_get_args() );
}
return $this->__call( __FUNCTION__, func_get_args() );
}
+ public function setTransactionListener( $name, callable $callback = null ) {
+ return $this->__call( __FUNCTION__, func_get_args() );
+ }
+
public function startAtomic( $fname = __METHOD__ ) {
return $this->__call( __FUNCTION__, func_get_args() );
}
return $this->__call( __FUNCTION__, func_get_args() );
}
- public function ping() {
- return $this->__call( __FUNCTION__, func_get_args() );
+ public function ping( &$rtt = null ) {
+ return func_num_args()
+ ? $this->__call( __FUNCTION__, [ &$rtt ] )
+ : $this->__call( __FUNCTION__, [] ); // method cares about null vs missing
}
public function getLag() {
/** How long before it is worth doing a dummy query to test the connection */
const PING_TTL = 1.0;
+ const PING_QUERY = 'SELECT 1 AS ping';
+
+ const TINY_WRITE_SEC = .010;
+ const SLOW_WRITE_SEC = .500;
+ const SMALL_WRITE_ROWS = 100;
/** @var string SQL query */
protected $mLastQuery = '';
protected $mTrxPreCommitCallbacks = [];
/** @var array[] List of (callable, method name) */
protected $mTrxEndCallbacks = [];
- /** @var bool Whether to suppress triggering of post-commit callbacks */
- protected $suppressPostCommitCallbacks = false;
+ /** @var array[] Map of (name => (callable, method name)) */
+ protected $mTrxRecurringCallbacks = [];
+ /** @var bool Whether to suppress triggering of transaction end callbacks */
+ protected $mTrxEndCallbacksSuppressed = false;
/** @var string */
protected $mTablePrefix;
* @var int
*/
protected $mTrxLevel = 0;
-
/**
* Either a short hexidecimal string if a transaction is active or ""
*
* @see DatabaseBase::mTrxLevel
*/
protected $mTrxShortId = '';
-
/**
* The UNIX time that the transaction started. Callers can assume that if
* snapshot isolation is used, then the data is *at least* up to date to that
* @see DatabaseBase::mTrxLevel
*/
private $mTrxTimestamp = null;
-
/** @var float Lag estimate at the time of BEGIN */
private $mTrxSlaveLag = null;
-
/**
* Remembers the function name given for starting the most recent transaction via begin().
* Used to provide additional context for error reporting.
* @see DatabaseBase::mTrxLevel
*/
private $mTrxFname = null;
-
/**
* Record if possible write queries were done in the last transaction started
*
* @see DatabaseBase::mTrxLevel
*/
private $mTrxDoneWrites = false;
-
/**
* Record if the current transaction was started implicitly due to DBO_TRX being set.
*
* @see DatabaseBase::mTrxLevel
*/
private $mTrxAutomatic = false;
-
/**
* Array of levels of atomicity within transactions
*
* @var array
*/
private $mTrxAtomicLevels = [];
-
/**
* Record if the current transaction was started implicitly by DatabaseBase::startAtomic
*
* @var bool
*/
private $mTrxAutomaticAtomic = false;
-
/**
* Track the write query callers of the current transaction
*
* @var string[]
*/
private $mTrxWriteCallers = [];
-
/**
- * Track the seconds spent in write queries for the current transaction
- *
- * @var float
+ * @var float Seconds spent in write queries for the current transaction
*/
private $mTrxWriteDuration = 0.0;
+ /**
+ * @var integer Number of write queries for the current transaction
+ */
+ private $mTrxWriteQueryCount = 0;
+ /**
+ * @var float Like mTrxWriteQueryCount but excludes lock-bound, easy to replicate, queries
+ */
+ private $mTrxWriteAdjDuration = 0.0;
+ /**
+ * @var integer Number of write queries counted in mTrxWriteAdjDuration
+ */
+ private $mTrxWriteAdjQueryCount = 0;
+ /**
+ * @var float RTT time estimate
+ */
+ private $mRTTEstimate = 0.0;
/** @var array Map of (name => 1) for locks obtained via lock() */
private $mNamedLocksHeld = [];
/** @var int[] Prior mFlags values */
private $priorFlags = [];
+ /** @var Profiler */
+ protected $profiler;
/** @var TransactionProfiler */
protected $trxProfiler;
* @return TransactionProfiler
*/
protected function getTransactionProfiler() {
- if ( !$this->trxProfiler ) {
- $this->trxProfiler = new TransactionProfiler();
- }
-
return $this->trxProfiler;
}
);
}
- public function pendingWriteQueryDuration() {
- return $this->mTrxLevel ? $this->mTrxWriteDuration : false;
+ public function pendingWriteQueryDuration( $type = self::ESTIMATE_TOTAL ) {
+ if ( !$this->mTrxLevel ) {
+ return false;
+ } elseif ( !$this->mTrxDoneWrites ) {
+ return 0.0;
+ }
+
+ switch ( $type ) {
+ case self::ESTIMATE_DB_APPLY:
+ $this->ping( $rtt );
+ $rttAdjTotal = $this->mTrxWriteAdjQueryCount * $rtt;
+ $applyTime = max( $this->mTrxWriteAdjDuration - $rttAdjTotal, 0 );
+ // For omitted queries, make them count as something at least
+ $omitted = $this->mTrxWriteQueryCount - $this->mTrxWriteAdjQueryCount;
+ $applyTime += self::TINY_WRITE_SEC * $omitted;
+
+ return $applyTime;
+ default: // everything
+ return $this->mTrxWriteDuration;
+ }
}
public function pendingWriteCallers() {
$this->mForeign = $foreign;
- if ( isset( $params['trxProfiler'] ) ) {
- $this->trxProfiler = $params['trxProfiler']; // override
- }
+ $this->profiler = isset( $params['profiler'] )
+ ? $params['profiler']
+ : Profiler::instance(); // @TODO: remove global state
+ $this->trxProfiler = isset( $params['trxProfiler'] )
+ ? $params['trxProfiler']
+ : new TransactionProfiler();
if ( $user ) {
$this->open( $server, $user, $password, $dbName );
}
+
}
/**
* @return bool
*/
protected function isWriteQuery( $sql ) {
- return !preg_match( '/^(?:SELECT|BEGIN|ROLLBACK|COMMIT|SET|SHOW|EXPLAIN|\(SELECT)\b/i', $sql );
+ return !preg_match(
+ '/^(?:SELECT|BEGIN|ROLLBACK|COMMIT|SET|SHOW|EXPLAIN|\(SELECT)\b/i', $sql );
+ }
+
+ /**
+ * @param $sql
+ * @return string|null
+ */
+ protected function getQueryVerb( $sql ) {
+ return preg_match( '/^\s*([a-z]+)/i', $sql, $m ) ? strtoupper( $m[1] ) : null;
}
/**
* @return bool
*/
protected function isTransactableQuery( $sql ) {
- $verb = substr( $sql, 0, strcspn( $sql, " \t\r\n" ) );
- return !in_array( $verb, [ 'BEGIN', 'COMMIT', 'ROLLBACK', 'SHOW', 'SET' ] );
+ $verb = $this->getQueryVerb( $sql );
+ return !in_array( $verb, [ 'BEGIN', 'COMMIT', 'ROLLBACK', 'SHOW', 'SET' ], true );
}
public function query( $sql, $fname = __METHOD__, $tempIgnore = false ) {
# Include query transaction state
$queryProf .= $this->mTrxShortId ? " [TRX#{$this->mTrxShortId}]" : "";
- $profiler = Profiler::instance();
- if ( !( $profiler instanceof ProfilerStub ) ) {
- $queryProfSection = $profiler->scopedProfileIn( $queryProf );
- }
-
$startTime = microtime( true );
+ $this->profiler->profileIn( $queryProf );
$ret = $this->doQuery( $commentedSql );
- $queryRuntime = microtime( true ) - $startTime;
+ $this->profiler->profileOut( $queryProf );
+ $queryRuntime = max( microtime( true ) - $startTime, 0.0 );
unset( $queryProfSection ); // profile out (if set)
if ( $ret !== false ) {
$this->lastPing = $startTime;
if ( $isWrite && $this->mTrxLevel ) {
- $this->mTrxWriteDuration += $queryRuntime;
+ $this->updateTrxWriteQueryTime( $sql, $queryRuntime );
$this->mTrxWriteCallers[] = $fname;
}
}
+ if ( $sql === self::PING_QUERY ) {
+ $this->mRTTEstimate = $queryRuntime;
+ }
+
$this->getTransactionProfiler()->recordQueryCompletion(
$queryProf, $startTime, $isWrite, $this->affectedRows()
);
return $ret;
}
+ /**
+ * Update the estimated run-time of a query, not counting large row lock times
+ *
+ * LoadBalancer can be set to rollback transactions that will create huge replication
+ * lag. It bases this estimate off of pendingWriteQueryDuration(). Certain simple
+ * queries, like inserting a row can take a long time due to row locking. This method
+ * uses some simple heuristics to discount those cases.
+ *
+ * @param string $sql
+ * @param float $runtime Total runtime, including RTT
+ */
+ private function updateTrxWriteQueryTime( $sql, $runtime ) {
+ $indicativeOfSlaveRuntime = true;
+ if ( $runtime > self::SLOW_WRITE_SEC ) {
+ $verb = $this->getQueryVerb( $sql );
+ // insert(), upsert(), replace() are fast unless bulky in size or blocked on locks
+ if ( $verb === 'INSERT' ) {
+ $indicativeOfSlaveRuntime = $this->affectedRows() > self::SMALL_WRITE_ROWS;
+ } elseif ( $verb === 'REPLACE' ) {
+ $indicativeOfSlaveRuntime = $this->affectedRows() > self::SMALL_WRITE_ROWS / 2;
+ }
+ }
+
+ $this->mTrxWriteDuration += $runtime;
+ $this->mTrxWriteQueryCount += 1;
+ if ( $indicativeOfSlaveRuntime ) {
+ $this->mTrxWriteAdjDuration += $runtime;
+ $this->mTrxWriteAdjQueryCount += 1;
+ }
+ }
+
private function canRecoverFromDisconnect( $sql, $priorWritesPending ) {
# Transaction dropped; this can mean lost writes, or REPEATABLE-READ snapshots.
# Dropped connections also mean that named locks are automatically released.
try {
// Handle callbacks in mTrxEndCallbacks
$this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK );
+ $this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK );
return null;
} catch ( Exception $e ) {
// Already logged; move on...
}
}
+ final public function setTransactionListener( $name, callable $callback = null ) {
+ if ( $callback ) {
+ $this->mTrxRecurringCallbacks[$name] = [ $callback, wfGetCaller() ];
+ } else {
+ unset( $this->mTrxRecurringCallbacks[$name] );
+ }
+ }
+
/**
- * Whether to disable running of post-commit callbacks
+ * Whether to disable running of post-COMMIT/ROLLBACK callbacks
*
* This method should not be used outside of Database/LoadBalancer
*
* @param bool $suppress
* @since 1.28
*/
- final public function setPostCommitCallbackSupression( $suppress ) {
- $this->suppressPostCommitCallbacks = $suppress;
+ final public function setTrxEndCallbackSuppression( $suppress ) {
+ $this->mTrxEndCallbacksSuppressed = $suppress;
}
/**
* @throws Exception
*/
public function runOnTransactionIdleCallbacks( $trigger ) {
- if ( $this->suppressPostCommitCallbacks ) {
+ if ( $this->mTrxEndCallbacksSuppressed ) {
return;
}
}
}
+ /**
+ * Actually run any "transaction listener" callbacks.
+ *
+ * This method should not be used outside of Database/LoadBalancer
+ *
+ * @param integer $trigger IDatabase::TRIGGER_* constant
+ * @throws Exception
+ * @since 1.20
+ */
+ public function runTransactionListenerCallbacks( $trigger ) {
+ if ( $this->mTrxEndCallbacksSuppressed ) {
+ return;
+ }
+
+ /** @var Exception $e */
+ $e = null; // first exception
+
+ foreach ( $this->mTrxRecurringCallbacks as $callback ) {
+ try {
+ list( $phpCallback ) = $callback;
+ $phpCallback( $trigger, $this );
+ } catch ( Exception $ex ) {
+ MWExceptionHandler::logException( $ex );
+ $e = $e ?: $ex;
+ }
+ }
+
+ if ( $e instanceof Exception ) {
+ throw $e; // re-throw any first exception
+ }
+ }
+
final public function startAtomic( $fname = __METHOD__ ) {
if ( !$this->mTrxLevel ) {
$this->begin( $fname, self::TRANSACTION_INTERNAL );
$this->mTrxAtomicLevels = [];
$this->mTrxShortId = wfRandomString( 12 );
$this->mTrxWriteDuration = 0.0;
+ $this->mTrxWriteQueryCount = 0;
+ $this->mTrxWriteAdjDuration = 0.0;
+ $this->mTrxWriteAdjQueryCount = 0;
$this->mTrxWriteCallers = [];
// First SELECT after BEGIN will establish the snapshot in REPEATABLE-READ.
// Get an estimate of the slave lag before then, treating estimate staleness
}
$this->runOnTransactionIdleCallbacks( self::TRIGGER_COMMIT );
+ $this->runTransactionListenerCallbacks( self::TRIGGER_COMMIT );
}
/**
$this->mTrxIdleCallbacks = []; // clear
$this->mTrxPreCommitCallbacks = []; // clear
$this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK );
+ $this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK );
}
/**
}
}
+ public function clearSnapshot( $fname = __METHOD__ ) {
+ if ( $this->writesOrCallbacksPending() || $this->explicitTrxActive() ) {
+ // This only flushes transactions to clear snapshots, not to write data
+ throw new DBUnexpectedError(
+ $this,
+ "$fname: Cannot COMMIT to clear snapshot because writes are pending."
+ );
+ }
+
+ $this->commit( $fname, self::FLUSHING_INTERNAL );
+ }
+
public function explicitTrxActive() {
return $this->mTrxLevel && ( $this->mTrxAtomicLevels || !$this->mTrxAutomatic );
}
}
}
- public function ping() {
+ public function ping( &$rtt = null ) {
+ // Avoid hitting the server if it was hit recently
if ( $this->isOpen() && ( microtime( true ) - $this->lastPing ) < self::PING_TTL ) {
- return true;
+ if ( !func_num_args() || $this->mRTTEstimate > 0 ) {
+ $rtt = $this->mRTTEstimate;
+ return true; // don't care about $rtt
+ }
}
- $ignoreErrors = true;
- $this->clearFlag( DBO_TRX, self::REMEMBER_PRIOR );
// This will reconnect if possible or return false if not
- $ok = (bool)$this->query( "SELECT 1 AS ping", __METHOD__, $ignoreErrors );
+ $this->clearFlag( DBO_TRX, self::REMEMBER_PRIOR );
+ $ok = ( $this->query( self::PING_QUERY, __METHOD__, true ) !== false );
$this->restoreFlags( self::RESTORE_PRIOR );
+ if ( $ok ) {
+ $rtt = $this->mRTTEstimate;
+ }
+
return $ok;
}
interface IDatabase {
/** @var int Callback triggered immediately due to no active transaction */
const TRIGGER_IDLE = 1;
- /** @var int Callback triggered by commit */
+ /** @var int Callback triggered by COMMIT */
const TRIGGER_COMMIT = 2;
- /** @var int Callback triggered by rollback */
+ /** @var int Callback triggered by ROLLBACK */
const TRIGGER_ROLLBACK = 3;
/** @var string Transaction is requested by regular caller outside of the DB layer */
const TRANSACTION_EXPLICIT = '';
- /** @var string Transaction is requested interally via DBO_TRX/startAtomic() */
+ /** @var string Transaction is requested internally via DBO_TRX/startAtomic() */
const TRANSACTION_INTERNAL = 'implicit';
/** @var string Transaction operation comes from service managing all DBs */
/** @var string Transaction operation comes from the database class internally */
const FLUSHING_INTERNAL = 'flush';
- /** @var string No not remember the prior flags */
+ /** @var string Do not remember the prior flags */
const REMEMBER_NOTHING = '';
/** @var string Remember the prior flags */
const REMEMBER_PRIOR = 'remember';
/** @var string Restore to the initial flag state */
const RESTORE_INITIAL = 'initial';
+ /** @var string Estimate total time (RTT, scanning, waiting on locks, applying) */
+ const ESTIMATE_TOTAL = 'total';
+ /** @var string Estimate time to apply (scanning, applying) */
+ const ESTIMATE_DB_APPLY = 'apply';
+
/**
* A string describing the current software version, and possibly
* other details in a user-friendly way. Will be listed on Special:Version, etc.
/**
* Returns true if there is a transaction open with possible write
* queries or transaction pre-commit/idle callbacks waiting on it to finish.
+ * This does *not* count recurring callbacks, e.g. from setTransactionListener().
*
* @return bool
*/
*
* High times could be due to scanning, updates, locking, and such
*
+ * @param string $type IDatabase::ESTIMATE_* constant [default: ESTIMATE_ALL]
* @return float|bool Returns false if not transaction is active
* @since 1.26
*/
- public function pendingWriteQueryDuration();
+ public function pendingWriteQueryDuration( $type = self::ESTIMATE_TOTAL );
/**
* Get the list of method names that did write queries for this transaction
* This is useful for combining cooperative locks and DB transactions.
*
* The callback takes one argument:
- * How the transaction ended (IDatabase::TRIGGER_COMMIT or IDatabase::TRIGGER_ROLLBACK)
+ * - How the transaction ended (IDatabase::TRIGGER_COMMIT or IDatabase::TRIGGER_ROLLBACK)
*
* @param callable $callback
* @return mixed
* Updates will execute in the order they were enqueued.
*
* The callback takes one argument:
- * How the transaction ended (IDatabase::TRIGGER_COMMIT or IDatabase::TRIGGER_IDLE)
+ * - How the transaction ended (IDatabase::TRIGGER_COMMIT or IDatabase::TRIGGER_IDLE)
*
* @param callable $callback
* @since 1.20
*/
public function onTransactionPreCommitOrIdle( callable $callback );
+ /**
+ * Run a callback each time any transaction commits or rolls back
+ *
+ * The callback takes two arguments:
+ * - IDatabase::TRIGGER_COMMIT or IDatabase::TRIGGER_ROLLBACK
+ * - This IDatabase object
+ * Callbacks must commit any transactions that they begin.
+ *
+ * Registering a callback here will not affect writesOrCallbacks() pending
+ *
+ * @param string $name Callback name
+ * @param callable|null $callback Use null to unset a listener
+ * @return mixed
+ * @since 1.28
+ */
+ public function setTransactionListener( $name, callable $callback = null );
+
/**
* Begin an atomic section of statements
*
/**
* Ping the server and try to reconnect if it there is no connection
*
+ * @param float|null &$rtt Value to store the estimated RTT [optional]
* @return bool Success or failure
*/
- public function ping();
+ public function ping( &$rtt = null );
/**
* Get slave lag. Currently supported only by MySQL.
/** @var mixed */
protected $ticket;
+ /** @var string|bool String if a requested DBO_TRX transaction round is active */
+ protected $trxRoundId = false;
/** @var string|bool Reason all LBs are read-only or false if not */
protected $readOnlyReason = false;
+ /** @var callable[] */
+ protected $replicationWaitCallbacks = [];
const SHUTDOWN_NO_CHRONPROT = 1; // don't save ChronologyProtector positions (for async code)
/**
* Prepare all tracked load balancers for shutdown
* @param integer $flags Supports SHUTDOWN_* flags
- * STUB
*/
public function shutdown( $flags = 0 ) {
+ if ( !( $flags & self::SHUTDOWN_NO_CHRONPROT ) ) {
+ $this->shutdownChronologyProtector( $this->chronProt );
+ }
+ $this->commitMasterChanges( __METHOD__ ); // sanity
}
/**
* This allows for custom transaction rounds from any outer transaction scope.
*
* @param string $fname
+ * @throws DBTransactionError
* @since 1.28
*/
public function beginMasterChanges( $fname = __METHOD__ ) {
+ if ( $this->trxRoundId !== false ) {
+ throw new DBTransactionError(
+ null,
+ "Transaction round '{$this->trxRoundId}' already started."
+ );
+ }
+ $this->trxRoundId = $fname;
+ // Set DBO_TRX flags on all appropriate DBs
$this->forEachLBCallMethod( 'beginMasterChanges', [ $fname ] );
}
* @throws Exception
*/
public function commitMasterChanges( $fname = __METHOD__, array $options = [] ) {
- // Perform all pre-commit callbacks, aborting on failure
- $this->forEachLBCallMethod( 'runMasterPreCommitCallbacks' );
- // Perform all pre-commit checks, aborting on failure
+ // Run pre-commit callbacks and suppress post-commit callbacks, aborting on failure
+ $this->forEachLBCallMethod( 'finalizeMasterChanges' );
+ $this->trxRoundId = false;
+ // Perform pre-commit checks, aborting on failure
$this->forEachLBCallMethod( 'approveMasterChanges', [ $options ] );
// Log the DBs and methods involved in multi-DB transactions
$this->logIfMultiDbTransaction();
- // Actually perform the commit on all master DB connections
+ // Actually perform the commit on all master DB connections and revert DBO_TRX
$this->forEachLBCallMethod( 'commitMasterChanges', [ $fname ] );
// Run all post-commit callbacks
/** @var Exception $e */
$e = null; // first callback exception
$this->forEachLB( function ( LoadBalancer $lb ) use ( &$e ) {
- $ex = $lb->runMasterPostCommitCallbacks();
+ $ex = $lb->runMasterPostTrxCallbacks( IDatabase::TRIGGER_COMMIT );
$e = $e ?: $ex;
} );
// Commit any dangling DBO_TRX transactions from callbacks on one DB to another DB
* @since 1.23
*/
public function rollbackMasterChanges( $fname = __METHOD__ ) {
+ $this->trxRoundId = false;
+ $this->forEachLBCallMethod( 'suppressTransactionEndCallbacks' );
$this->forEachLBCallMethod( 'rollbackMasterChanges', [ $fname ] );
+ // Run all post-rollback callbacks
+ $this->forEachLB( function ( LoadBalancer $lb ) {
+ $lb->runMasterPostTrxCallbacks( IDatabase::TRIGGER_ROLLBACK );
+ } );
}
/**
'ifWritesSince' => null
];
+ foreach ( $this->replicationWaitCallbacks as $callback ) {
+ $callback();
+ }
+
// Figure out which clusters need to be checked
/** @var LoadBalancer[] $lbs */
$lbs = [];
}
}
+ /**
+ * Add a callback to be run in every call to waitForReplication() before waiting
+ *
+ * Callbacks must clear any transactions that they start
+ *
+ * @param string $name Callback name
+ * @param callable|null $callback Use null to unset a callback
+ * @since 1.28
+ */
+ public function setWaitForReplicationListener( $name, callable $callback = null ) {
+ if ( $callback ) {
+ $this->replicationWaitCallbacks[$name] = $callback;
+ } else {
+ unset( $this->replicationWaitCallbacks[$name] );
+ }
+ }
+
/**
* Get a token asserting that no transaction writes are active
*
} );
}
+ /**
+ * @param LoadBalancer $lb
+ */
+ protected function initLoadBalancer( LoadBalancer $lb ) {
+ if ( $this->trxRoundId !== false ) {
+ $lb->beginMasterChanges( $this->trxRoundId ); // set DBO_TRX
+ }
+ }
+
/**
* Close all open database connections on all open load balancers.
* @since 1.28
}
}
-
-/**
- * Exception class for attempted DB access
- */
-class DBAccessError extends MWException {
- public function __construct() {
- parent::__construct( "Mediawiki tried to access the database via wfGetDB(). " .
- "This is not allowed, because database access has been disabled." );
- }
-}
-
-/**
- * Exception class for replica DB wait timeouts
- */
-class DBReplicationWaitError extends Exception {
-}
* @return LoadBalancer
*/
private function newLoadBalancer( $template, $loads, $groupLoads, $readOnlyReason ) {
- return new LoadBalancer( [
+ $lb = new LoadBalancer( [
'servers' => $this->makeServerArray( $template, $loads, $groupLoads ),
'loadMonitor' => $this->loadMonitorClass,
'readOnlyReason' => $readOnlyReason,
'srvCache' => $this->srvCache,
'wanCache' => $this->wanCache
] );
+
+ $this->initLoadBalancer( $lb );
+
+ return $lb;
}
/**
call_user_func_array( $callback, array_merge( [ $lb ], $params ) );
}
}
-
- public function shutdown( $flags = 0 ) {
- if ( !( $flags & self::SHUTDOWN_NO_CHRONPROT ) ) {
- $this->shutdownChronologyProtector( $this->chronProt );
- }
- $this->commitMasterChanges( __METHOD__ ); // sanity
- }
}
}
private function newLoadBalancer( array $servers ) {
- return new LoadBalancer( [
+ $lb = new LoadBalancer( [
'servers' => $servers,
'loadMonitor' => $this->loadMonitorClass,
'readOnlyReason' => $this->readOnlyReason,
'srvCache' => $this->srvCache,
'wanCache' => $this->wanCache
] );
+
+ $this->initLoadBalancer( $lb );
+
+ return $lb;
}
/**
call_user_func_array( $callback, array_merge( [ $lb ], $params ) );
}
}
-
- public function shutdown( $flags = 0 ) {
- if ( !( $flags & self::SHUTDOWN_NO_CHRONPROT ) ) {
- $this->shutdownChronologyProtector( $this->chronProt );
- }
- $this->commitMasterChanges( __METHOD__ ); // sanity
- }
}
private $srvCache;
/** @var WANObjectCache */
private $wanCache;
+ /** @var TransactionProfiler */
+ protected $trxProfiler;
/** @var bool|DatabaseBase Database connection that caused a problem */
private $mErrorConnection;
private $readOnlyReason = false;
/** @var integer Total connections opened */
private $connsOpened = 0;
-
- /** @var TransactionProfiler */
- protected $trxProfiler;
+ /** @var string|bool String if a requested DBO_TRX transaction round is active */
+ private $trxRoundId = false;
/** @var integer Warn when this many connection are held */
const CONN_HELD_WARN_THRESHOLD = 10;
$this->getLazyConnectionRef( DB_MASTER, [], $db->getWikiID() )
);
$db->setTransactionProfiler( $this->trxProfiler );
+ if ( $this->trxRoundId !== false ) {
+ $this->applyTransactionRoundFlags( $db );
+ }
return $db;
}
/**
* Commit transactions on all open connections
* @param string $fname Caller name
+ * @throws DBExpectedError
*/
public function commitAll( $fname = __METHOD__ ) {
- $this->forEachOpenConnection( function ( DatabaseBase $conn ) use ( $fname ) {
- $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
- } );
+ $failures = [];
+
+ $restore = ( $this->trxRoundId !== false );
+ $this->trxRoundId = false;
+ $this->forEachOpenConnection(
+ function ( DatabaseBase $conn ) use ( $fname, $restore, &$failures ) {
+ try {
+ $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
+ } catch ( DBError $e ) {
+ MWExceptionHandler::logException( $e );
+ $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
+ }
+ if ( $restore && $conn->getLBInfo( 'master' ) ) {
+ $this->undoTransactionRoundFlags( $conn );
+ }
+ }
+ );
+
+ if ( $failures ) {
+ throw new DBExpectedError(
+ null,
+ "Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
+ );
+ }
}
/**
* Perform all pre-commit callbacks that remain part of the atomic transactions
- * and disable any post-commit callbacks until runMasterPostCommitCallbacks()
+ * and disable any post-commit callbacks until runMasterPostTrxCallbacks()
* @since 1.28
*/
- public function runMasterPreCommitCallbacks() {
+ public function finalizeMasterChanges() {
$this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) {
- // Any error will cause all DB transactions to be rolled back together.
+ // Any error should cause all DB transactions to be rolled back together
+ $conn->setTrxEndCallbackSuppression( false );
$conn->runOnTransactionPreCommitCallbacks();
- // Defer post-commit callbacks until COMMIT finishes for all DBs.
- $conn->setPostCommitCallbackSupression( true );
+ // Defer post-commit callbacks until COMMIT finishes for all DBs
+ $conn->setTrxEndCallbackSuppression( true );
} );
}
}
// Assert that the time to replicate the transaction will be sane.
// If this fails, then all DB transactions will be rollback back together.
- $time = $conn->pendingWriteQueryDuration();
+ $time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
if ( $limit > 0 && $time > $limit ) {
throw new DBTransactionError(
$conn,
* This allows for custom transaction rounds from any outer transaction scope.
*
* @param string $fname
+ * @throws DBExpectedError
* @since 1.28
*/
public function beginMasterChanges( $fname = __METHOD__ ) {
- $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $fname ) {
- if ( $conn->writesOrCallbacksPending() ) {
- throw new DBTransactionError(
- $conn,
- "Transaction with pending writes still active."
- );
- } elseif ( $conn->trxLevel() ) {
- $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
- }
- if ( $conn->getFlag( DBO_DEFAULT ) ) {
- // DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
- // Force DBO_TRX even in CLI mode since a commit round is expected soon.
- $conn->setFlag( DBO_TRX, $conn::REMEMBER_PRIOR );
- $conn->onTransactionResolution( function () use ( $conn ) {
- $conn->restoreFlags( $conn::RESTORE_PRIOR );
- } );
- } else {
- // Config has explicitly requested DBO_TRX be either on or off; respect that.
- // This is useful for things like blob stores which use auto-commit mode.
+ if ( $this->trxRoundId !== false ) {
+ throw new DBTransactionError(
+ null,
+ "$fname: Transaction round '{$this->trxRoundId}' already started."
+ );
+ }
+ $this->trxRoundId = $fname;
+
+ $failures = [];
+ $this->forEachOpenMasterConnection(
+ function ( DatabaseBase $conn ) use ( $fname, &$failures ) {
+ $conn->setTrxEndCallbackSuppression( true );
+ try {
+ $conn->clearSnapshot( $fname );
+ } catch ( DBError $e ) {
+ MWExceptionHandler::logException( $e );
+ $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
+ }
+ $conn->setTrxEndCallbackSuppression( false );
+ $this->applyTransactionRoundFlags( $conn );
}
- } );
+ );
+
+ if ( $failures ) {
+ throw new DBExpectedError(
+ null,
+ "$fname: Flush failed on server(s) " . implode( "\n", array_unique( $failures ) )
+ );
+ }
}
/**
* Issue COMMIT on all master connections where writes where done
* @param string $fname Caller name
+ * @throws DBExpectedError
*/
public function commitMasterChanges( $fname = __METHOD__ ) {
- $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $fname ) {
- if ( $conn->writesOrCallbacksPending() ) {
- $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
+ $failures = [];
+
+ $restore = ( $this->trxRoundId !== false );
+ $this->trxRoundId = false;
+ $this->forEachOpenMasterConnection(
+ function ( DatabaseBase $conn ) use ( $fname, $restore, &$failures ) {
+ try {
+ if ( $conn->writesOrCallbacksPending() ) {
+ $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
+ } elseif ( $restore ) {
+ $conn->clearSnapshot( $fname );
+ }
+ } catch ( DBError $e ) {
+ MWExceptionHandler::logException( $e );
+ $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
+ }
+ if ( $restore ) {
+ $this->undoTransactionRoundFlags( $conn );
+ }
}
- } );
+ );
+
+ if ( $failures ) {
+ throw new DBExpectedError(
+ null,
+ "$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
+ );
+ }
}
/**
- * Issue all pending post-commit callbacks
+ * Issue all pending post-COMMIT/ROLLBACK callbacks
+ * @param integer $type IDatabase::TRIGGER_* constant
* @return Exception|null The first exception or null if there were none
* @since 1.28
*/
- public function runMasterPostCommitCallbacks() {
+ public function runMasterPostTrxCallbacks( $type ) {
$e = null; // first exception
- $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( &$e ) {
- $conn->setPostCommitCallbackSupression( false );
+ $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $type, &$e ) {
+ $conn->clearSnapshot( __METHOD__ ); // clear no-op transactions
+
+ $conn->setTrxEndCallbackSuppression( false );
try {
- $conn->runOnTransactionIdleCallbacks( $conn::TRIGGER_COMMIT );
+ $conn->runOnTransactionIdleCallbacks( $type );
+ } catch ( Exception $ex ) {
+ $e = $e ?: $ex;
+ }
+ try {
+ $conn->runTransactionListenerCallbacks( $type );
} catch ( Exception $ex ) {
$e = $e ?: $ex;
}
* @since 1.23
*/
public function rollbackMasterChanges( $fname = __METHOD__ ) {
- $failedServers = [];
-
- $masterIndex = $this->getWriterIndex();
- foreach ( $this->mConns as $conns2 ) {
- if ( empty( $conns2[$masterIndex] ) ) {
- continue;
- }
- /** @var DatabaseBase $conn */
- foreach ( $conns2[$masterIndex] as $conn ) {
- if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
- try {
- $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
- } catch ( DBError $e ) {
- MWExceptionHandler::logException( $e );
- $failedServers[] = $conn->getServer();
- }
+ $restore = ( $this->trxRoundId !== false );
+ $this->trxRoundId = false;
+ $this->forEachOpenMasterConnection(
+ function ( DatabaseBase $conn ) use ( $fname, $restore ) {
+ if ( $conn->writesOrCallbacksPending() ) {
+ $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
+ }
+ if ( $restore ) {
+ $this->undoTransactionRoundFlags( $conn );
}
}
+ );
+ }
+
+ /**
+ * Suppress all pending post-COMMIT/ROLLBACK callbacks
+ * @return Exception|null The first exception or null if there were none
+ * @since 1.28
+ */
+ public function suppressTransactionEndCallbacks() {
+ $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) {
+ $conn->setTrxEndCallbackSuppression( true );
+ } );
+ }
+
+ /**
+ * @param DatabaseBase $conn
+ */
+ private function applyTransactionRoundFlags( DatabaseBase $conn ) {
+ if ( $conn->getFlag( DBO_DEFAULT ) ) {
+ // DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
+ // Force DBO_TRX even in CLI mode since a commit round is expected soon.
+ $conn->setFlag( DBO_TRX, $conn::REMEMBER_PRIOR );
+ // If config has explicitly requested DBO_TRX be either on or off by not
+ // setting DBO_DEFAULT, then respect that. Forcing no transactions is useful
+ // for things like blob stores (ExternalStore) which want auto-commit mode.
}
+ }
- if ( $failedServers ) {
- throw new DBExpectedError( null, "Rollback failed on server(s) " .
- implode( ', ', array_unique( $failedServers ) ) );
+ /**
+ * @param DatabaseBase $conn
+ */
+ private function undoTransactionRoundFlags( DatabaseBase $conn ) {
+ if ( $conn->getFlag( DBO_DEFAULT ) ) {
+ $conn->restoreFlags( $conn::RESTORE_PRIOR );
}
}