/** @var array Map of (table name => 1) for TEMPORARY tables */
protected $sessionTempTables = [];
- /** @var int Whether there is an active transaction (1 or 0) */
- protected $trxLevel = 0;
- /** @var string Hexidecimal string if a transaction is active or empty string otherwise */
+ /** @var string ID of the active transaction or the empty string otherwise */
protected $trxShortId = '';
/** @var int Transaction status */
protected $trxStatus = self::STATUS_TRX_NONE;
return $res;
}
- public function trxLevel() {
- return $this->trxLevel;
+ final public function trxLevel() {
+ return ( $this->trxShortId != '' ) ? 1 : 0;
}
public function trxTimestamp() {
- return $this->trxLevel ? $this->trxTimestamp : null;
+ return $this->trxLevel() ? $this->trxTimestamp : null;
}
/**
}
public function writesPending() {
- return $this->trxLevel && $this->trxDoneWrites;
+ return $this->trxLevel() && $this->trxDoneWrites;
}
public function writesOrCallbacksPending() {
- return $this->trxLevel && (
+ return $this->trxLevel() && (
$this->trxDoneWrites ||
$this->trxIdleCallbacks ||
$this->trxPreCommitCallbacks ||
}
public function preCommitCallbacksPending() {
- return $this->trxLevel && $this->trxPreCommitCallbacks;
+ return $this->trxLevel() && $this->trxPreCommitCallbacks;
}
/**
}
public function pendingWriteQueryDuration( $type = self::ESTIMATE_TOTAL ) {
- if ( !$this->trxLevel ) {
+ if ( !$this->trxLevel() ) {
return false;
} elseif ( !$this->trxDoneWrites ) {
return 0.0;
}
public function pendingWriteCallers() {
- return $this->trxLevel ? $this->trxWriteCallers : [];
+ return $this->trxLevel() ? $this->trxWriteCallers : [];
}
public function pendingWriteRowsAffected() {
// This should mostly do nothing if the connection is already closed
if ( $this->conn ) {
// Roll back any dangling transaction first
- if ( $this->trxLevel ) {
+ if ( $this->trxLevel() ) {
if ( $this->trxAtomicLevels ) {
// Cannot let incomplete atomic sections be committed
$levels = $this->flatAtomicSectionList();
final protected function executeQuery( $sql, $fname, $flags ) {
$this->assertHasConnectionHandle();
- $priorTransaction = $this->trxLevel;
+ $priorTransaction = $this->trxLevel();
if ( $this->isWriteQuery( $sql ) ) {
# In theory, non-persistent writes are allowed in read-only mode, but due to things
// Keep track of whether the transaction has write queries pending
if ( $isPermWrite ) {
$this->lastWriteTime = microtime( true );
- if ( $this->trxLevel && !$this->trxDoneWrites ) {
+ if ( $this->trxLevel() && !$this->trxDoneWrites ) {
$this->trxDoneWrites = true;
$this->trxProfiler->transactionWritingIn(
$this->server, $this->getDomainID(), $this->trxShortId );
if ( $ret !== false ) {
$this->lastPing = $startTime;
- if ( $isPermWrite && $this->trxLevel ) {
+ if ( $isPermWrite && $this->trxLevel() ) {
$this->updateTrxWriteQueryTime( $sql, $queryRuntime, $this->affectedRows() );
$this->trxWriteCallers[] = $fname;
}
*/
private function beginIfImplied( $sql, $fname ) {
if (
- !$this->trxLevel &&
+ !$this->trxLevel() &&
$this->getFlag( self::DBO_TRX ) &&
$this->isTransactableQuery( $sql )
) {
// https://www.postgresql.org/docs/9.4/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
$this->sessionNamedLocks = [];
// Session loss implies transaction loss
- $this->trxLevel = 0;
+ $oldTrxShortId = $this->consumeTrxShortId();
$this->trxAtomicCounter = 0;
$this->trxIdleCallbacks = []; // T67263; transaction already lost
$this->trxPreCommitCallbacks = []; // T67263; transaction already lost
$this->trxProfiler->transactionWritingOut(
$this->server,
$this->getDomainID(),
- $this->trxShortId,
+ $oldTrxShortId,
$this->pendingWriteQueryDuration( self::ESTIMATE_TOTAL ),
$this->trxWriteAffectedRows
);
}
}
+ /**
+ * Reset the transaction ID and return the old one
+ *
+ * @return string The old transaction ID or the empty string if there wasn't one
+ */
+ private function consumeTrxShortId() {
+ $old = $this->trxShortId;
+ $this->trxShortId = '';
+
+ return $old;
+ }
+
/**
* Checks whether the cause of the error is detected to be a timeout.
*
public function lockForUpdate(
$table, $conds = '', $fname = __METHOD__, $options = [], $join_conds = []
) {
- if ( !$this->trxLevel && !$this->getFlag( self::DBO_TRX ) ) {
+ if ( !$this->trxLevel() && !$this->getFlag( self::DBO_TRX ) ) {
throw new DBUnexpectedError(
$this,
__METHOD__ . ': no transaction is active nor is DBO_TRX set'
}
final public function onTransactionResolution( callable $callback, $fname = __METHOD__ ) {
- if ( !$this->trxLevel ) {
+ if ( !$this->trxLevel() ) {
throw new DBUnexpectedError( $this, "No transaction is active." );
}
$this->trxEndCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
}
final public function onTransactionCommitOrIdle( callable $callback, $fname = __METHOD__ ) {
- if ( !$this->trxLevel && $this->getTransactionRoundId() ) {
+ if ( !$this->trxLevel() && $this->getTransactionRoundId() ) {
// Start an implicit transaction similar to how query() does
$this->begin( __METHOD__, self::TRANSACTION_INTERNAL );
$this->trxAutomatic = true;
}
$this->trxIdleCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
- if ( !$this->trxLevel ) {
+ if ( !$this->trxLevel() ) {
$this->runOnTransactionIdleCallbacks( self::TRIGGER_IDLE );
}
}
}
final public function onTransactionPreCommitOrIdle( callable $callback, $fname = __METHOD__ ) {
- if ( !$this->trxLevel && $this->getTransactionRoundId() ) {
+ if ( !$this->trxLevel() && $this->getTransactionRoundId() ) {
// Start an implicit transaction similar to how query() does
$this->begin( __METHOD__, self::TRANSACTION_INTERNAL );
$this->trxAutomatic = true;
}
- if ( $this->trxLevel ) {
+ if ( $this->trxLevel() ) {
$this->trxPreCommitCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
} else {
// No transaction is active nor will start implicitly, so make one for this callback
}
final public function onAtomicSectionCancel( callable $callback, $fname = __METHOD__ ) {
- if ( !$this->trxLevel || !$this->trxAtomicLevels ) {
+ if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
throw new DBUnexpectedError( $this, "No atomic section is open (got $fname)." );
}
$this->trxSectionCancelCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
* @return AtomicSectionIdentifier|null ID of the topmost atomic section level
*/
private function currentAtomicSectionId() {
- if ( $this->trxLevel && $this->trxAtomicLevels ) {
+ if ( $this->trxLevel() && $this->trxAtomicLevels ) {
$levelInfo = end( $this->trxAtomicLevels );
return $levelInfo[1];
* @throws Exception
*/
public function runOnTransactionIdleCallbacks( $trigger ) {
- if ( $this->trxLevel ) { // sanity
+ if ( $this->trxLevel() ) { // sanity
throw new DBUnexpectedError( $this, __METHOD__ . ': a transaction is still open.' );
}
) {
$savepointId = $cancelable === self::ATOMIC_CANCELABLE ? self::$NOT_APPLICABLE : null;
- if ( !$this->trxLevel ) {
+ if ( !$this->trxLevel() ) {
$this->begin( $fname, self::TRANSACTION_INTERNAL ); // sets trxAutomatic
// If DBO_TRX is set, a series of startAtomic/endAtomic pairs will result
// in all changes being in one transaction to keep requests transactional.
}
final public function endAtomic( $fname = __METHOD__ ) {
- if ( !$this->trxLevel || !$this->trxAtomicLevels ) {
+ if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
throw new DBUnexpectedError( $this, "No atomic section is open (got $fname)." );
}
final public function cancelAtomic(
$fname = __METHOD__, AtomicSectionIdentifier $sectionId = null
) {
- if ( !$this->trxLevel || !$this->trxAtomicLevels ) {
+ if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
throw new DBUnexpectedError( $this, "No atomic section is open (got $fname)." );
}
}
// Protect against mismatched atomic section, transaction nesting, and snapshot loss
- if ( $this->trxLevel ) {
+ if ( $this->trxLevel() ) {
if ( $this->trxAtomicLevels ) {
$levels = $this->flatAtomicSectionList();
$msg = "$fname: Got explicit BEGIN while atomic section(s) $levels are open.";
$this->assertHasConnectionHandle();
$this->doBegin( $fname );
+ $this->trxShortId = sprintf( '%06x', mt_rand( 0, 0xffffff ) );
$this->trxStatus = self::STATUS_TRX_OK;
$this->trxStatusIgnoredCause = null;
$this->trxAtomicCounter = 0;
$this->trxDoneWrites = false;
$this->trxAutomaticAtomic = false;
$this->trxAtomicLevels = [];
- $this->trxShortId = sprintf( '%06x', mt_rand( 0, 0xffffff ) );
$this->trxWriteDuration = 0.0;
$this->trxWriteQueryCount = 0;
$this->trxWriteAffectedRows = 0;
*
* @see Database::begin()
* @param string $fname
+ * @throws DBError
*/
protected function doBegin( $fname ) {
$this->query( 'BEGIN', $fname );
- $this->trxLevel = 1;
}
final public function commit( $fname = __METHOD__, $flush = self::FLUSHING_ONE ) {
throw new DBUnexpectedError( $this, "$fname: invalid flush parameter '$flush'." );
}
- if ( $this->trxLevel && $this->trxAtomicLevels ) {
+ if ( $this->trxLevel() && $this->trxAtomicLevels ) {
// There are still atomic sections open; this cannot be ignored
$levels = $this->flatAtomicSectionList();
throw new DBUnexpectedError(
}
if ( $flush === self::FLUSHING_INTERNAL || $flush === self::FLUSHING_ALL_PEERS ) {
- if ( !$this->trxLevel ) {
+ if ( !$this->trxLevel() ) {
return; // nothing to do
} elseif ( !$this->trxAutomatic ) {
throw new DBUnexpectedError(
"$fname: Flushing an explicit transaction, getting out of sync."
);
}
- } elseif ( !$this->trxLevel ) {
+ } elseif ( !$this->trxLevel() ) {
$this->queryLogger->error(
"$fname: No transaction to commit, something got out of sync." );
return; // nothing to do
$writeTime = $this->pendingWriteQueryDuration( self::ESTIMATE_DB_APPLY );
$this->doCommit( $fname );
+ $oldTrxShortId = $this->consumeTrxShortId();
$this->trxStatus = self::STATUS_TRX_NONE;
if ( $this->trxDoneWrites ) {
$this->trxProfiler->transactionWritingOut(
$this->server,
$this->getDomainID(),
- $this->trxShortId,
+ $oldTrxShortId,
$writeTime,
$this->trxWriteAffectedRows
);
*
* @see Database::commit()
* @param string $fname
+ * @throws DBError
*/
protected function doCommit( $fname ) {
- if ( $this->trxLevel ) {
+ if ( $this->trxLevel() ) {
$this->query( 'COMMIT', $fname );
- $this->trxLevel = 0;
}
}
final public function rollback( $fname = __METHOD__, $flush = self::FLUSHING_ONE ) {
- $trxActive = $this->trxLevel;
+ $trxActive = $this->trxLevel();
if ( $flush !== self::FLUSHING_INTERNAL
&& $flush !== self::FLUSHING_ALL_PEERS
$this->assertHasConnectionHandle();
$this->doRollback( $fname );
+ $oldTrxShortId = $this->consumeTrxShortId();
$this->trxStatus = self::STATUS_TRX_NONE;
$this->trxAtomicLevels = [];
// Estimate the RTT via a query now that trxStatus is OK
$this->trxProfiler->transactionWritingOut(
$this->server,
$this->getDomainID(),
- $this->trxShortId,
+ $oldTrxShortId,
$writeTime,
$this->trxWriteAffectedRows
);
*
* @see Database::rollback()
* @param string $fname
+ * @throws DBError
*/
protected function doRollback( $fname ) {
- if ( $this->trxLevel ) {
+ if ( $this->trxLevel() ) {
# Disconnects cause rollback anyway, so ignore those errors
$ignoreErrors = true;
$this->query( 'ROLLBACK', $fname, $ignoreErrors );
- $this->trxLevel = 0;
}
}
}
public function explicitTrxActive() {
- return $this->trxLevel && ( $this->trxAtomicLevels || !$this->trxAutomatic );
+ return $this->trxLevel() && ( $this->trxAtomicLevels || !$this->trxAutomatic );
}
public function duplicateTableStructure(
* @since 1.27
*/
final protected function getRecordedTransactionLagStatus() {
- return ( $this->trxLevel && $this->trxReplicaLag !== null )
+ return ( $this->trxLevel() && $this->trxReplicaLag !== null )
? [ 'lag' => $this->trxReplicaLag, 'since' => $this->trxTimestamp() ]
: null;
}
* Run a few simple sanity checks and close dangling connections
*/
public function __destruct() {
- if ( $this->trxLevel && $this->trxDoneWrites ) {
+ if ( $this->trxLevel() && $this->trxDoneWrites ) {
trigger_error( "Uncommitted DB writes (transaction from {$this->trxFname})." );
}
class LBFactoryMulti extends LBFactory {
/** @var array A map of database names to section names */
private $sectionsByDB;
-
/**
* @var array A 2-d map. For each section, gives a map of server names to
* load ratios
*/
private $sectionLoads;
-
/**
* @var array[] Server info associative array
* @note The host, hostName and load entries will be overridden
*/
private $serverTemplate;
- // Optional settings
-
/** @var array A 3-d map giving server load ratios for each section and group */
private $groupLoadsBySection = [];
-
/** @var array A 3-d map giving server load ratios by DB name */
private $groupLoadsByDB = [];
-
/** @var array A map of hostname to IP address */
private $hostsByName = [];
-
/** @var array A map of external storage cluster name to server load map */
private $externalLoads = [];
-
/**
* @var array A set of server info keys overriding serverTemplate for
* external storage
*/
private $externalTemplateOverrides;
-
/**
* @var array A 2-d map overriding serverTemplate and
* externalTemplateOverrides on a server-by-server basis. Applies to both
* core and external storage
*/
private $templateOverridesByServer;
-
/** @var array A 2-d map overriding the server info by section */
private $templateOverridesBySection;
-
/** @var array A 2-d map overriding the server info by external storage cluster */
private $templateOverridesByCluster;
-
/** @var array An override array for all master servers */
private $masterTemplateOverrides;
-
/**
* @var array|bool A map of section name to read-only message. Missing or
* false for read/write
/** @var LoadBalancer[] */
private $mainLBs = [];
-
/** @var LoadBalancer[] */
private $extLBs = [];
-
/** @var string */
private $loadMonitorClass = 'LoadMonitor';
-
/** @var string */
private $lastDomain;
-
/** @var string */
private $lastSection;
if ( $this->lastDomain === $domain ) {
return $this->lastSection;
}
- list( $dbName, ) = $this->getDBNameAndPrefix( $domain );
- $section = $this->sectionsByDB[$dbName] ?? 'DEFAULT';
+
+ $database = $this->getDatabaseFromDomain( $domain );
+ $section = $this->sectionsByDB[$database] ?? 'DEFAULT';
$this->lastSection = $section;
$this->lastDomain = $domain;
return $section;
}
- /**
- * @param bool|string $domain
- * @return LoadBalancer
- */
public function newMainLB( $domain = false ) {
- list( $dbName, ) = $this->getDBNameAndPrefix( $domain );
+ $database = $this->getDatabaseFromDomain( $domain );
$section = $this->getSectionForDomain( $domain );
- $groupLoads = $this->groupLoadsByDB[$dbName] ?? [];
+ $groupLoads = $this->groupLoadsByDB[$database] ?? [];
if ( isset( $this->groupLoadsBySection[$section] ) ) {
$groupLoads = array_merge_recursive(
);
}
- /**
- * @param DatabaseDomain|string|bool $domain Domain ID, or false for the current domain
- * @return LoadBalancer
- */
public function getMainLB( $domain = false ) {
$section = $this->getSectionForDomain( $domain );
if ( !isset( $this->mainLBs[$section] ) ) {
/**
* @param DatabaseDomain|string|bool $domain Domain ID, or false for the current domain
- * @return array [database name, table prefix]
+ * @return string
*/
- private function getDBNameAndPrefix( $domain = false ) {
- $domain = ( $domain === false )
- ? $this->localDomain
- : DatabaseDomain::newFromId( $domain );
-
- return [ $domain->getDatabase(), $domain->getTablePrefix() ];
+ private function getDatabaseFromDomain( $domain = false ) {
+ return ( $domain === false )
+ ? $this->localDomain->getDatabase()
+ : DatabaseDomain::newFromId( $domain )->getDatabase();
}
- /**
- * Execute a function for each tracked load balancer
- * The callback is called with the load balancer as the first parameter,
- * and $params passed as the subsequent parameters.
- * @param callable $callback
- * @param array $params
- */
public function forEachLB( $callback, array $params = [] ) {
foreach ( $this->mainLBs as $lb ) {
$callback( $lb, ...$params );