use UnexpectedValueException;
use Exception;
use RuntimeException;
+use Throwable;
/**
* Relational database abstraction object
* @since 1.28
*/
abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAwareInterface {
- /** Number of times to re-try an operation in case of deadlock */
- const DEADLOCK_TRIES = 4;
- /** Minimum time to wait before retry, in microseconds */
- const DEADLOCK_DELAY_MIN = 500000;
- /** Maximum time to wait before retry */
- const DEADLOCK_DELAY_MAX = 1500000;
-
- /** How long before it is worth doing a dummy query to test the connection */
- const PING_TTL = 1.0;
- const PING_QUERY = 'SELECT 1 AS ping';
-
- const TINY_WRITE_SEC = 0.010;
- const SLOW_WRITE_SEC = 0.500;
- const SMALL_WRITE_ROWS = 100;
-
- /** @var string Lock granularity is on the level of the entire database */
- const ATTR_DB_LEVEL_LOCKING = 'db-level-locking';
- /** @var string The SCHEMA keyword refers to a grouping of tables in a database */
- const ATTR_SCHEMAS_AS_TABLE_GROUPS = 'supports-schemas';
-
- /** @var int New Database instance will not be connected yet when returned */
- const NEW_UNCONNECTED = 0;
- /** @var int New Database instance will already be connected when returned */
- const NEW_CONNECTED = 1;
-
- /** @var string The last SQL query attempted */
- private $lastQuery = '';
- /** @var float|bool UNIX timestamp of last write query */
- private $lastWriteTime = false;
- /** @var string|bool */
- private $lastPhpError = false;
-
/** @var string Server that this instance is currently connected to */
protected $server;
/** @var string User that this instance is currently connected under the name of */
protected $cliMode;
/** @var string Agent name for query profiling */
protected $agent;
+ /** @var int Bitfield of class DBO_* constants */
+ protected $flags;
+ /** @var array LoadBalancer tracking information */
+ protected $lbInfo = [];
+ /** @var array|bool Variables use for schema element placeholders */
+ protected $schemaVars = false;
/** @var array Parameters used by initConnection() to establish a connection */
protected $connectionParams = [];
+ /** @var array SQL variables values to use for all new connections */
+ protected $connectionVariables = [];
+ /** @var string Current SQL query delimiter */
+ protected $delimiter = ';';
+ /** @var string|bool|null Stashed value of html_errors INI setting */
+ protected $htmlErrors;
+ /** @var int Row batch size to use for emulated INSERT SELECT queries */
+ protected $nonNativeInsertSelectBatchSize = 10000;
+
/** @var BagOStuff APC cache */
protected $srvCache;
/** @var LoggerInterface */
protected $errorLogger;
/** @var callable Deprecation logging callback */
protected $deprecationLogger;
-
+ /** @var callable|null */
+ protected $profiler;
+ /** @var TransactionProfiler */
+ protected $trxProfiler;
+ /** @var DatabaseDomain */
+ protected $currentDomain;
/** @var object|resource|null Database connection */
- protected $conn = null;
- /** @var bool */
- protected $opened = false;
+ protected $conn;
- /** @var array[] List of (callable, method name, atomic section id) */
- protected $trxIdleCallbacks = [];
- /** @var array[] List of (callable, method name, atomic section id) */
- protected $trxPreCommitCallbacks = [];
- /** @var array[] List of (callable, method name, atomic section id) */
- protected $trxEndCallbacks = [];
- /** @var callable[] Map of (name => callable) */
- protected $trxRecurringCallbacks = [];
- /** @var bool Whether to suppress triggering of transaction end callbacks */
- protected $trxEndCallbacksSuppressed = false;
+ /** @var IDatabase|null Lazy handle to the master DB this server replicates from */
+ private $lazyMasterHandle;
- /** @var int */
- protected $flags;
- /** @var array */
- protected $lbInfo = [];
- /** @var array|bool */
- protected $schemaVars = false;
- /** @var array */
- protected $sessionVars = [];
- /** @var array|null */
- protected $preparedArgs;
- /** @var string|bool|null Stashed value of html_errors INI setting */
- protected $htmlErrors;
- /** @var string */
- protected $delimiter = ';';
- /** @var DatabaseDomain */
- protected $currentDomain;
- /** @var integer|null Rows affected by the last query to query() or its CRUD wrappers */
- protected $affectedRowCount;
+ /** @var array Map of (name => 1) for locks obtained via lock() */
+ protected $sessionNamedLocks = [];
+ /** @var array Map of (table name => 1) for TEMPORARY tables */
+ protected $sessionTempTables = [];
- /**
- * @var int Transaction status
- */
+ /** @var string ID of the active transaction or the empty string otherwise */
+ protected $trxShortId = '';
+ /** @var int Transaction status */
protected $trxStatus = self::STATUS_TRX_NONE;
- /**
- * @var Exception|null The last error that caused the status to become STATUS_TRX_ERROR
- */
+ /** @var Exception|null The last error that caused the status to become STATUS_TRX_ERROR */
protected $trxStatusCause;
- /**
- * @var array|null If wasKnownStatementRollbackError() prevented trxStatus from being set,
- * the relevant details are stored here.
- */
- protected $trxStatusIgnoredCause;
- /**
- * Either 1 if a transaction is active or 0 otherwise.
- * The other Trx fields may not be meaningfull if this is 0.
- *
- * @var int
- */
- protected $trxLevel = 0;
- /**
- * Either a short hexidecimal string if a transaction is active or ""
- *
- * @var string
- * @see Database::trxLevel
- */
- protected $trxShortId = '';
- /**
- * The UNIX time that the transaction started. Callers can assume that if
- * snapshot isolation is used, then the data is *at least* up to date to that
- * point (possibly more up-to-date since the first SELECT defines the snapshot).
- *
- * @var float|null
- * @see Database::trxLevel
- */
+ /** @var array|null Error details of the last statement-only rollback */
+ private $trxStatusIgnoredCause;
+ /** @var float|null UNIX timestamp at the time of BEGIN for the last transaction */
private $trxTimestamp = null;
- /** @var float Lag estimate at the time of BEGIN */
+ /** @var float Replication lag estimate at the time of BEGIN for the last transaction */
private $trxReplicaLag = null;
- /**
- * Remembers the function name given for starting the most recent transaction via begin().
- * Used to provide additional context for error reporting.
- *
- * @var string
- * @see Database::trxLevel
- */
+ /** @var string Name of the function that start the last transaction */
private $trxFname = null;
- /**
- * Record if possible write queries were done in the last transaction started
- *
- * @var bool
- * @see Database::trxLevel
- */
+ /** @var bool Whether possible write queries were done in the last transaction started */
private $trxDoneWrites = false;
- /**
- * Record if the current transaction was started implicitly due to DBO_TRX being set.
- *
- * @var bool
- * @see Database::trxLevel
- */
+ /** @var bool Whether the current transaction was started implicitly due to DBO_TRX */
private $trxAutomatic = false;
- /**
- * Counter for atomic savepoint identifiers. Reset when a new transaction begins.
- *
- * @var int
- */
+ /** @var int Counter for atomic savepoint identifiers (reset with each transaction) */
private $trxAtomicCounter = 0;
- /**
- * Array of levels of atomicity within transactions
- *
- * @var array List of (name, unique ID, savepoint ID)
- */
+ /** @var array List of (name, unique ID, savepoint ID) for each active atomic section level */
private $trxAtomicLevels = [];
- /**
- * Record if the current transaction was started implicitly by Database::startAtomic
- *
- * @var bool
- */
+ /** @var bool Whether the current transaction was started implicitly by startAtomic() */
private $trxAutomaticAtomic = false;
- /**
- * Track the write query callers of the current transaction
- *
- * @var string[]
- */
+ /** @var string[] Write query callers of the current transaction */
private $trxWriteCallers = [];
- /**
- * @var float Seconds spent in write queries for the current transaction
- */
+ /** @var float Seconds spent in write queries for the current transaction */
private $trxWriteDuration = 0.0;
- /**
- * @var int Number of write queries for the current transaction
- */
+ /** @var int Number of write queries for the current transaction */
private $trxWriteQueryCount = 0;
- /**
- * @var int Number of rows affected by write queries for the current transaction
- */
+ /** @var int Number of rows affected by write queries for the current transaction */
private $trxWriteAffectedRows = 0;
- /**
- * @var float Like trxWriteQueryCount but excludes lock-bound, easy to replicate, queries
- */
+ /** @var float Like trxWriteQueryCount but excludes lock-bound, easy to replicate, queries */
private $trxWriteAdjDuration = 0.0;
- /**
- * @var int Number of write queries counted in trxWriteAdjDuration
- */
+ /** @var int Number of write queries counted in trxWriteAdjDuration */
private $trxWriteAdjQueryCount = 0;
- /**
- * @var float RTT time estimate
- */
- private $rttEstimate = 0.0;
-
- /** @var array Map of (name => 1) for locks obtained via lock() */
- private $namedLocksHeld = [];
- /** @var array Map of (table name => 1) for TEMPORARY tables */
- protected $sessionTempTables = [];
-
- /** @var IDatabase|null Lazy handle to the master DB this server replicates from */
- private $lazyMasterHandle;
-
- /** @var float UNIX timestamp */
- protected $lastPing = 0.0;
+ /** @var array[] List of (callable, method name, atomic section id) */
+ private $trxIdleCallbacks = [];
+ /** @var array[] List of (callable, method name, atomic section id) */
+ private $trxPreCommitCallbacks = [];
+ /** @var array[] List of (callable, method name, atomic section id) */
+ private $trxEndCallbacks = [];
+ /** @var array[] List of (callable, method name, atomic section id) */
+ private $trxSectionCancelCallbacks = [];
+ /** @var callable[] Map of (name => callable) */
+ private $trxRecurringCallbacks = [];
+ /** @var bool Whether to suppress triggering of transaction end callbacks */
+ private $trxEndCallbacksSuppressed = false;
/** @var int[] Prior flags member variable values */
private $priorFlags = [];
- /** @var callable|null */
- protected $profiler;
- /** @var TransactionProfiler */
- protected $trxProfiler;
+ /** @var integer|null Rows affected by the last query to query() or its CRUD wrappers */
+ protected $affectedRowCount;
- /** @var int */
- protected $nonNativeInsertSelectBatchSize = 10000;
+ /** @var float UNIX timestamp */
+ private $lastPing = 0.0;
+ /** @var string The last SQL query attempted */
+ private $lastQuery = '';
+ /** @var float|bool UNIX timestamp of last write query */
+ private $lastWriteTime = false;
+ /** @var string|bool */
+ private $lastPhpError = false;
+ /** @var float Query rount trip time estimate */
+ private $lastRoundTripEstimate = 0.0;
- /** @var string Idiom used when a cancelable atomic section started the transaction */
- private static $NOT_APPLICABLE = 'n/a';
- /** @var string Prefix to the atomic section counter used to make savepoint IDs */
- private static $SAVEPOINT_PREFIX = 'wikimedia_rdbms_atomic';
+ /** @var string Lock granularity is on the level of the entire database */
+ const ATTR_DB_LEVEL_LOCKING = 'db-level-locking';
+ /** @var string The SCHEMA keyword refers to a grouping of tables in a database */
+ const ATTR_SCHEMAS_AS_TABLE_GROUPS = 'supports-schemas';
+
+ /** @var int New Database instance will not be connected yet when returned */
+ const NEW_UNCONNECTED = 0;
+ /** @var int New Database instance will already be connected when returned */
+ const NEW_CONNECTED = 1;
/** @var int Transaction is in a error state requiring a full or savepoint rollback */
const STATUS_TRX_ERROR = 1;
/** @var int No transaction is active */
const STATUS_TRX_NONE = 3;
+ /** @var string Idiom used when a cancelable atomic section started the transaction */
+ private static $NOT_APPLICABLE = 'n/a';
+ /** @var string Prefix to the atomic section counter used to make savepoint IDs */
+ private static $SAVEPOINT_PREFIX = 'wikimedia_rdbms_atomic';
+
/** @var int Writes to this temporary table do not affect lastDoneWrites() */
- const TEMP_NORMAL = 1;
+ private static $TEMP_NORMAL = 1;
/** @var int Writes to this temporary table effect lastDoneWrites() */
- const TEMP_PSEUDO_PERMANENT = 2;
+ private static $TEMP_PSEUDO_PERMANENT = 2;
+
+ /** Number of times to re-try an operation in case of deadlock */
+ private static $DEADLOCK_TRIES = 4;
+ /** Minimum time to wait before retry, in microseconds */
+ private static $DEADLOCK_DELAY_MIN = 500000;
+ /** Maximum time to wait before retry */
+ private static $DEADLOCK_DELAY_MAX = 1500000;
+
+ /** How long before it is worth doing a dummy query to test the connection */
+ private static $PING_TTL = 1.0;
+ private static $PING_QUERY = 'SELECT 1 AS ping';
+
+ private static $TINY_WRITE_SEC = 0.010;
+ private static $SLOW_WRITE_SEC = 0.500;
+ private static $SMALL_WRITE_ROWS = 100;
/**
* @note exceptions for missing libraries/drivers should be thrown in initConnection()
// Disregard deprecated DBO_IGNORE flag (T189999)
$this->flags &= ~self::DBO_IGNORE;
- $this->sessionVars = $params['variables'];
+ $this->connectionVariables = $params['variables'];
$this->srvCache = $params['srvCache'] ?? new HashBagOStuff();
* @param string $dbName Database name
* @param string|null $schema Database schema name
* @param string $tablePrefix Table prefix
- * @return bool
* @throws DBConnectionError
*/
abstract protected function open( $server, $user, $password, $dbName, $schema, $tablePrefix );
return $res;
}
- public function trxLevel() {
- return $this->trxLevel;
+ final public function trxLevel() {
+ return ( $this->trxShortId != '' ) ? 1 : 0;
}
public function trxTimestamp() {
- return $this->trxLevel ? $this->trxTimestamp : null;
+ return $this->trxLevel() ? $this->trxTimestamp : null;
}
/**
}
public function writesPending() {
- return $this->trxLevel && $this->trxDoneWrites;
+ return $this->trxLevel() && $this->trxDoneWrites;
}
public function writesOrCallbacksPending() {
- return $this->trxLevel && (
+ return $this->trxLevel() && (
$this->trxDoneWrites ||
$this->trxIdleCallbacks ||
$this->trxPreCommitCallbacks ||
- $this->trxEndCallbacks
+ $this->trxEndCallbacks ||
+ $this->trxSectionCancelCallbacks
);
}
public function preCommitCallbacksPending() {
- return $this->trxLevel && $this->trxPreCommitCallbacks;
+ return $this->trxLevel() && $this->trxPreCommitCallbacks;
}
/**
}
public function pendingWriteQueryDuration( $type = self::ESTIMATE_TOTAL ) {
- if ( !$this->trxLevel ) {
+ if ( !$this->trxLevel() ) {
return false;
} elseif ( !$this->trxDoneWrites ) {
return 0.0;
$applyTime = max( $this->trxWriteAdjDuration - $rttAdjTotal, 0 );
// For omitted queries, make them count as something at least
$omitted = $this->trxWriteQueryCount - $this->trxWriteAdjQueryCount;
- $applyTime += self::TINY_WRITE_SEC * $omitted;
+ $applyTime += self::$TINY_WRITE_SEC * $omitted;
return $applyTime;
}
public function pendingWriteCallers() {
- return $this->trxLevel ? $this->trxWriteCallers : [];
+ return $this->trxLevel() ? $this->trxWriteCallers : [];
}
public function pendingWriteRowsAffected() {
foreach ( [
$this->trxIdleCallbacks,
$this->trxPreCommitCallbacks,
- $this->trxEndCallbacks
+ $this->trxEndCallbacks,
+ $this->trxSectionCancelCallbacks
] as $callbacks ) {
foreach ( $callbacks as $callback ) {
$fnames[] = $callback[1];
}
public function isOpen() {
- return $this->opened;
+ return (bool)$this->conn;
}
public function setFlag( $flag, $remember = self::REMEMBER_NOTHING ) {
final public function close() {
$exception = null; // error to throw after disconnecting
- $wasOpen = $this->opened;
+ $wasOpen = (bool)$this->conn;
// This should mostly do nothing if the connection is already closed
if ( $this->conn ) {
// Roll back any dangling transaction first
- if ( $this->trxLevel ) {
+ if ( $this->trxLevel() ) {
if ( $this->trxAtomicLevels ) {
// Cannot let incomplete atomic sections be committed
$levels = $this->flatAtomicSectionList();
}
$this->conn = false;
- $this->opened = false;
// Throw any unexpected errors after having disconnected
if ( $exception instanceof Exception ) {
*
* @throws DBUnexpectedError
*/
- protected function assertHasConnectionHandle() {
+ final protected function assertHasConnectionHandle() {
if ( !$this->isOpen() ) {
throw new DBUnexpectedError( $this, "DB connection was already closed." );
}
/**
* Make sure that this server is not marked as a replica nor read-only as a sanity check
*
- * @throws DBUnexpectedError
+ * @throws DBReadOnlyRoleError
+ * @throws DBReadOnlyError
*/
protected function assertIsWritableMaster() {
if ( $this->getLBInfo( 'replica' ) === true ) {
*/
abstract protected function closeConnection();
- /**
- * @deprecated since 1.32
- * @param string $error Fallback message, if none is given by DB
- * @throws DBConnectionError
- */
- public function reportConnectionError( $error = 'Unknown error' ) {
- call_user_func( $this->deprecationLogger, 'Use of ' . __METHOD__ . ' is deprecated.' );
- throw new DBConnectionError( $this, $this->lastError() ?: $error );
- }
-
/**
* Run a query and return a DBMS-dependent wrapper or boolean
*
+ * This is meant to handle the basic command of actually sending a query to the
+ * server via the driver. No implicit transaction, reconnection, nor retry logic
+ * should happen here. The higher level query() method is designed to handle those
+ * sorts of concerns. This method should not trigger such higher level methods.
+ *
+ * The lastError() and lastErrno() methods should meaningfully reflect what error,
+ * if any, occured during the last call to this method. Methods like executeQuery(),
+ * query(), select(), insert(), update(), delete(), and upsert() implement their calls
+ * to doQuery() such that an immediately subsequent call to lastError()/lastErrno()
+ * meaningfully reflects any error that occured during that public query method call.
+ *
* For SELECT queries, this returns either:
* - a) A driver-specific value/resource, only on success. This can be iterated
* over by calling fetchObject()/fetchRow() until there are no more rows.
- * Alternatively, the result can be passed to resultObject() to obtain a
- * ResultWrapper instance which can then be iterated over via "foreach".
+ * Alternatively, the result can be passed to resultObject() to obtain an
+ * IResultWrapper instance which can then be iterated over via "foreach".
* - b) False, on any query failure
*
* For non-SELECT queries, this returns either:
// for all queries within a request. Use cases:
// - Treating these as writes would trigger ChronologyProtector (see method doc).
// - We use this method to reject writes to replicas, but we need to allow
- // use of transactions on replicas for read snapshots. This fine given
+ // use of transactions on replicas for read snapshots. This is fine given
// that transactions by themselves don't make changes, only actual writes
// within the transaction matter, which we still detect.
return !preg_match(
- '/^(?:SELECT|BEGIN|ROLLBACK|COMMIT|SAVEPOINT|RELEASE|SET|SHOW|EXPLAIN|\(SELECT)\b/i',
+ '/^(?:SELECT|BEGIN|ROLLBACK|COMMIT|SAVEPOINT|RELEASE|SET|SHOW|EXPLAIN|USE|\(SELECT)\b/i',
$sql
);
}
protected function isTransactableQuery( $sql ) {
return !in_array(
$this->getQueryVerb( $sql ),
- [ 'BEGIN', 'ROLLBACK', 'COMMIT', 'SET', 'SHOW', 'CREATE', 'ALTER' ],
+ [ 'BEGIN', 'ROLLBACK', 'COMMIT', 'SET', 'SHOW', 'CREATE', 'ALTER', 'USE', 'SHOW' ],
true
);
}
/**
* @param string $sql A SQL query
* @param bool $pseudoPermanent Treat any table from CREATE TEMPORARY as pseudo-permanent
- * @return int|null A self::TEMP_* constant for temp table operations or null otherwise
+ * @return array A n-tuple of:
+ * - int|null: A self::TEMP_* constant for temp table operations or null otherwise
+ * - string|null: The name of the new temporary table $sql creates, or null
+ * - string|null: The name of the temporary table that $sql drops, or null
*/
- protected function registerTempTableWrite( $sql, $pseudoPermanent ) {
+ protected function getTempWrites( $sql, $pseudoPermanent ) {
static $qt = '[`"\']?(\w+)[`"\']?'; // quoted table
if ( preg_match(
$sql,
$matches
) ) {
- $type = $pseudoPermanent ? self::TEMP_PSEUDO_PERMANENT : self::TEMP_NORMAL;
- $this->sessionTempTables[$matches[1]] = $type;
+ $type = $pseudoPermanent ? self::$TEMP_PSEUDO_PERMANENT : self::$TEMP_NORMAL;
- return $type;
+ return [ $type, $matches[1], null ];
} elseif ( preg_match(
'/^DROP\s+(?:TEMPORARY\s+)?TABLE\s+(?:IF\s+EXISTS\s+)?' . $qt . '/i',
$sql,
$matches
) ) {
- $type = $this->sessionTempTables[$matches[1]] ?? null;
- unset( $this->sessionTempTables[$matches[1]] );
-
- return $type;
+ return [ $this->sessionTempTables[$matches[1]] ?? null, null, $matches[1] ];
} elseif ( preg_match(
'/^TRUNCATE\s+(?:TEMPORARY\s+)?TABLE\s+(?:IF\s+EXISTS\s+)?' . $qt . '/i',
$sql,
$matches
) ) {
- return $this->sessionTempTables[$matches[1]] ?? null;
+ return [ $this->sessionTempTables[$matches[1]] ?? null, null, null ];
} elseif ( preg_match(
'/^(?:(?:INSERT|REPLACE)\s+(?:\w+\s+)?INTO|UPDATE|DELETE\s+FROM)\s+' . $qt . '/i',
$sql,
$matches
) ) {
- return $this->sessionTempTables[$matches[1]] ?? null;
+ return [ $this->sessionTempTables[$matches[1]] ?? null, null, null ];
}
- return null;
+ return [ null, null, null ];
}
- public function query( $sql, $fname = __METHOD__, $flags = 0 ) {
- $this->assertTransactionStatus( $sql, $fname );
- $this->assertHasConnectionHandle();
+ /**
+ * @param IResultWrapper|bool $ret
+ * @param int|null $tmpType TEMP_NORMAL or TEMP_PSEUDO_PERMANENT
+ * @param string|null $tmpNew Name of created temp table
+ * @param string|null $tmpDel Name of dropped temp table
+ */
+ protected function registerTempWrites( $ret, $tmpType, $tmpNew, $tmpDel ) {
+ if ( $ret !== false ) {
+ if ( $tmpNew !== null ) {
+ $this->sessionTempTables[$tmpNew] = $tmpType;
+ }
+ if ( $tmpDel !== null ) {
+ unset( $this->sessionTempTables[$tmpDel] );
+ }
+ }
+ }
+ public function query( $sql, $fname = __METHOD__, $flags = 0 ) {
$flags = (int)$flags; // b/c; this field used to be a bool
- $ignoreErrors = $this->hasFlags( $flags, self::QUERY_SILENCE_ERRORS );
+ // Sanity check that the SQL query is appropriate in the current context and is
+ // allowed for an outside caller (e.g. does not break transaction/session tracking).
+ $this->assertQueryIsCurrentlyAllowed( $sql, $fname );
- $priorTransaction = $this->trxLevel;
- $priorWritesPending = $this->writesOrCallbacksPending();
+ // Send the query to the server and fetch any corresponding errors
+ list( $ret, $err, $errno, $unignorable ) = $this->executeQuery( $sql, $fname, $flags );
+ if ( $ret === false ) {
+ $ignoreErrors = $this->hasFlags( $flags, self::QUERY_SILENCE_ERRORS );
+ // Throw an error unless both the ignore flag was set and a rollback is not needed
+ $this->reportQueryError( $err, $errno, $sql, $fname, $ignoreErrors && !$unignorable );
+ }
+
+ return $this->resultObject( $ret );
+ }
+
+ /**
+ * Execute a query, retrying it if there is a recoverable connection loss
+ *
+ * This is similar to query() except:
+ * - It does not prevent all non-ROLLBACK queries if there is a corrupted transaction
+ * - It does not disallow raw queries that are supposed to use dedicated IDatabase methods
+ * - It does not throw exceptions for common error cases
+ *
+ * This is meant for internal use with Database subclasses.
+ *
+ * @param string $sql Original SQL query
+ * @param string $fname Name of the calling function
+ * @param int $flags Bitfield of class QUERY_* constants
+ * @return array An n-tuple of:
+ * - mixed|bool: An object, resource, or true on success; false on failure
+ * - string: The result of calling lastError()
+ * - int: The result of calling lastErrno()
+ * - bool: Whether a rollback is needed to allow future non-rollback queries
+ * @throws DBUnexpectedError
+ */
+ final protected function executeQuery( $sql, $fname, $flags ) {
+ $this->assertHasConnectionHandle();
+
+ $priorTransaction = $this->trxLevel();
if ( $this->isWriteQuery( $sql ) ) {
- # In theory, non-persistent writes are allowed in read-only mode, but due to things
- # like https://bugs.mysql.com/bug.php?id=33669 that might not work anyway...
+ // In theory, non-persistent writes are allowed in read-only mode, but due to things
+ // like https://bugs.mysql.com/bug.php?id=33669 that might not work anyway...
$this->assertIsWritableMaster();
- # Do not treat temporary table writes as "meaningful writes" that need committing.
- # Profile them as reads. Integration tests can override this behavior via $flags.
+ // Do not treat temporary table writes as "meaningful writes" since they are only
+ // visible to one session and are not permanent. Profile them as reads. Integration
+ // tests can override this behavior via $flags.
$pseudoPermanent = $this->hasFlags( $flags, self::QUERY_PSEUDO_PERMANENT );
- $tableType = $this->registerTempTableWrite( $sql, $pseudoPermanent );
- $isEffectiveWrite = ( $tableType !== self::TEMP_NORMAL );
- # DBConnRef uses QUERY_REPLICA_ROLE to enforce the replica role for raw SQL queries
- if ( $isEffectiveWrite && $this->hasFlags( $flags, self::QUERY_REPLICA_ROLE ) ) {
+ list( $tmpType, $tmpNew, $tmpDel ) = $this->getTempWrites( $sql, $pseudoPermanent );
+ $isPermWrite = ( $tmpType !== self::$TEMP_NORMAL );
+ // DBConnRef uses QUERY_REPLICA_ROLE to enforce the replica role for raw SQL queries
+ if ( $isPermWrite && $this->hasFlags( $flags, self::QUERY_REPLICA_ROLE ) ) {
throw new DBReadOnlyRoleError( $this, "Cannot write; target role is DB_REPLICA" );
}
} else {
- $isEffectiveWrite = false;
+ // No permanent writes in this query
+ $isPermWrite = false;
+ // No temporary tables written to either
+ list( $tmpType, $tmpNew, $tmpDel ) = [ null, null, null ];
}
- # Add trace comment to the begin of the sql string, right after the operator.
- # Or, for one-word queries (like "BEGIN" or COMMIT") add it to the end (T44598)
+ // Add trace comment to the begin of the sql string, right after the operator.
+ // Or, for one-word queries (like "BEGIN" or COMMIT") add it to the end (T44598)
$commentedSql = preg_replace( '/\s|$/', " /* $fname {$this->agent} */ ", $sql, 1 );
- # Send the query to the server and fetch any corresponding errors
- $ret = $this->attemptQuery( $sql, $commentedSql, $isEffectiveWrite, $fname );
- $lastError = $this->lastError();
- $lastErrno = $this->lastErrno();
-
- $recoverableSR = false; // recoverable statement rollback?
- $recoverableCL = false; // recoverable connection loss?
+ // Send the query to the server and fetch any corresponding errors.
+ // This also doubles as a "ping" to see if the connection was dropped.
+ list( $ret, $err, $errno, $recoverableSR, $recoverableCL, $reconnected ) =
+ $this->executeQueryAttempt( $sql, $commentedSql, $isPermWrite, $fname, $flags );
- if ( $ret === false && $this->wasConnectionLoss() ) {
- # Check if no meaningful session state was lost
- $recoverableCL = $this->canRecoverFromDisconnect( $sql, $priorWritesPending );
- # Update session state tracking and try to restore the connection
- $reconnected = $this->replaceLostConnection( __METHOD__ );
- # Silently resend the query to the server if it is safe and possible
- if ( $recoverableCL && $reconnected ) {
- $ret = $this->attemptQuery( $sql, $commentedSql, $isEffectiveWrite, $fname );
- $lastError = $this->lastError();
- $lastErrno = $this->lastErrno();
-
- if ( $ret === false && $this->wasConnectionLoss() ) {
- # Query probably causes disconnects; reconnect and do not re-run it
- $this->replaceLostConnection( __METHOD__ );
- } else {
- $recoverableCL = false; // connection does not need recovering
- $recoverableSR = $this->wasKnownStatementRollbackError();
- }
- }
- } else {
- $recoverableSR = $this->wasKnownStatementRollbackError();
+ // Check if the query failed due to a recoverable connection loss
+ $allowRetry = !$this->hasFlags( $flags, self::QUERY_NO_RETRY );
+ if ( $ret === false && $recoverableCL && $reconnected && $allowRetry ) {
+ // Silently resend the query to the server since it is safe and possible
+ list( $ret, $err, $errno, $recoverableSR, $recoverableCL ) =
+ $this->executeQueryAttempt( $sql, $commentedSql, $isPermWrite, $fname, $flags );
}
+ // Register creation and dropping of temporary tables
+ $this->registerTempWrites( $ret, $tmpType, $tmpNew, $tmpDel );
+
+ $corruptedTrx = false;
+
if ( $ret === false ) {
if ( $priorTransaction ) {
if ( $recoverableSR ) {
# We're ignoring an error that caused just the current query to be aborted.
# But log the cause so we can log a deprecation notice if a caller actually
# does ignore it.
- $this->trxStatusIgnoredCause = [ $lastError, $lastErrno, $fname ];
+ $this->trxStatusIgnoredCause = [ $err, $errno, $fname ];
} elseif ( !$recoverableCL ) {
# Either the query was aborted or all queries after BEGIN where aborted.
# In the first case, the only options going forward are (a) ROLLBACK, or
# (b) ROLLBACK TO SAVEPOINT (if one was set). If the later case, the only
# option is ROLLBACK, since the snapshots would have been released.
+ $corruptedTrx = true; // cannot recover
$this->trxStatus = self::STATUS_TRX_ERROR;
$this->trxStatusCause =
- $this->getQueryExceptionAndLog( $lastError, $lastErrno, $sql, $fname );
- $ignoreErrors = false; // cannot recover
+ $this->getQueryExceptionAndLog( $err, $errno, $sql, $fname );
$this->trxStatusIgnoredCause = null;
}
}
-
- $this->reportQueryError( $lastError, $lastErrno, $sql, $fname, $ignoreErrors );
}
- return $this->resultObject( $ret );
+ return [ $ret, $err, $errno, $corruptedTrx ];
}
/**
- * Wrapper for query() that also handles profiling, logging, and affected row count updates
+ * Wrapper for doQuery() that handles DBO_TRX, profiling, logging, affected row count
+ * tracking, and reconnects (without retry) on query failure due to connection loss
*
* @param string $sql Original SQL query
* @param string $commentedSql SQL query with debugging/trace comment
- * @param bool $isEffectiveWrite Whether the query is a (non-temporary table) write
+ * @param bool $isPermWrite Whether the query is a (non-temporary table) write
* @param string $fname Name of the calling function
- * @return bool|IResultWrapper True for a successful write query, ResultWrapper
- * object for a successful read query, or false on failure
+ * @param int $flags Bitfield of class QUERY_* constants
+ * @return array An n-tuple of:
+ * - mixed|bool: An object, resource, or true on success; false on failure
+ * - string: The result of calling lastError()
+ * - int: The result of calling lastErrno()
+ * - bool: Whether a statement rollback error occured
+ * - bool: Whether a disconnect *both* happened *and* was recoverable
+ * - bool: Whether a reconnection attempt was *both* made *and* succeeded
+ * @throws DBUnexpectedError
*/
- private function attemptQuery( $sql, $commentedSql, $isEffectiveWrite, $fname ) {
- $this->beginIfImplied( $sql, $fname );
+ private function executeQueryAttempt( $sql, $commentedSql, $isPermWrite, $fname, $flags ) {
+ $priorWritesPending = $this->writesOrCallbacksPending();
+
+ if ( ( $flags & self::QUERY_IGNORE_DBO_TRX ) == 0 ) {
+ $this->beginIfImplied( $sql, $fname );
+ }
// Keep track of whether the transaction has write queries pending
- if ( $isEffectiveWrite ) {
+ if ( $isPermWrite ) {
$this->lastWriteTime = microtime( true );
- if ( $this->trxLevel && !$this->trxDoneWrites ) {
+ if ( $this->trxLevel() && !$this->trxDoneWrites ) {
$this->trxDoneWrites = true;
$this->trxProfiler->transactionWritingIn(
$this->server, $this->getDomainID(), $this->trxShortId );
$this->affectedRowCount = null;
$this->lastQuery = $sql;
$ret = $this->doQuery( $commentedSql );
+ $lastError = $this->lastError();
+ $lastErrno = $this->lastErrno();
+
$this->affectedRowCount = $this->affectedRows();
unset( $ps ); // profile out (if set)
$queryRuntime = max( microtime( true ) - $startTime, 0.0 );
+ $recoverableSR = false; // recoverable statement rollback?
+ $recoverableCL = false; // recoverable connection loss?
+ $reconnected = false; // reconnection both attempted and succeeded?
+
if ( $ret !== false ) {
$this->lastPing = $startTime;
- if ( $isEffectiveWrite && $this->trxLevel ) {
+ if ( $isPermWrite && $this->trxLevel() ) {
$this->updateTrxWriteQueryTime( $sql, $queryRuntime, $this->affectedRows() );
$this->trxWriteCallers[] = $fname;
}
+ } elseif ( $this->wasConnectionError( $lastErrno ) ) {
+ # Check if no meaningful session state was lost
+ $recoverableCL = $this->canRecoverFromDisconnect( $sql, $priorWritesPending );
+ # Update session state tracking and try to restore the connection
+ $reconnected = $this->replaceLostConnection( __METHOD__ );
+ } else {
+ # Check if only the last query was rolled back
+ $recoverableSR = $this->wasKnownStatementRollbackError();
}
- if ( $sql === self::PING_QUERY ) {
- $this->rttEstimate = $queryRuntime;
+ if ( $sql === self::$PING_QUERY ) {
+ $this->lastRoundTripEstimate = $queryRuntime;
}
$this->trxProfiler->recordQueryCompletion(
$generalizedSql,
$startTime,
- $isEffectiveWrite,
- $isEffectiveWrite ? $this->affectedRows() : $this->numRows( $ret )
+ $isPermWrite,
+ $isPermWrite ? $this->affectedRows() : $this->numRows( $ret )
);
// Avoid the overhead of logging calls unless debug mode is enabled
);
}
- return $ret;
+ return [ $ret, $lastError, $lastErrno, $recoverableSR, $recoverableCL, $reconnected ];
}
/**
*/
private function beginIfImplied( $sql, $fname ) {
if (
- !$this->trxLevel &&
+ !$this->trxLevel() &&
$this->getFlag( self::DBO_TRX ) &&
$this->isTransactableQuery( $sql )
) {
private function updateTrxWriteQueryTime( $sql, $runtime, $affected ) {
// Whether this is indicative of replica DB runtime (except for RBR or ws_repl)
$indicativeOfReplicaRuntime = true;
- if ( $runtime > self::SLOW_WRITE_SEC ) {
+ if ( $runtime > self::$SLOW_WRITE_SEC ) {
$verb = $this->getQueryVerb( $sql );
// insert(), upsert(), replace() are fast unless bulky in size or blocked on locks
if ( $verb === 'INSERT' ) {
- $indicativeOfReplicaRuntime = $this->affectedRows() > self::SMALL_WRITE_ROWS;
+ $indicativeOfReplicaRuntime = $this->affectedRows() > self::$SMALL_WRITE_ROWS;
} elseif ( $verb === 'REPLACE' ) {
- $indicativeOfReplicaRuntime = $this->affectedRows() > self::SMALL_WRITE_ROWS / 2;
+ $indicativeOfReplicaRuntime = $this->affectedRows() > self::$SMALL_WRITE_ROWS / 2;
}
}
* @param string $fname
* @throws DBTransactionStateError
*/
- private function assertTransactionStatus( $sql, $fname ) {
+ private function assertQueryIsCurrentlyAllowed( $sql, $fname ) {
$verb = $this->getQueryVerb( $sql );
if ( $verb === 'USE' ) {
throw new DBUnexpectedError( $this, "Got USE query; use selectDomain() instead." );
# Dropped connections also mean that named locks are automatically released.
# Only allow error suppression in autocommit mode or when the lost transaction
# didn't matter anyway (aside from DBO_TRX snapshot loss).
- if ( $this->namedLocksHeld ) {
+ if ( $this->sessionNamedLocks ) {
return false; // possible critical section violation
} elseif ( $this->sessionTempTables ) {
return false; // tables might be queried latter
$this->sessionTempTables = [];
// https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_get-lock
// https://www.postgresql.org/docs/9.4/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
- $this->namedLocksHeld = [];
+ $this->sessionNamedLocks = [];
// Session loss implies transaction loss
- $this->trxLevel = 0;
+ $oldTrxShortId = $this->consumeTrxShortId();
$this->trxAtomicCounter = 0;
$this->trxIdleCallbacks = []; // T67263; transaction already lost
$this->trxPreCommitCallbacks = []; // T67263; transaction already lost
$this->trxProfiler->transactionWritingOut(
$this->server,
$this->getDomainID(),
- $this->trxShortId,
+ $oldTrxShortId,
$this->pendingWriteQueryDuration( self::ESTIMATE_TOTAL ),
$this->trxWriteAffectedRows
);
}
}
+ /**
+ * Reset the transaction ID and return the old one
+ *
+ * @return string The old transaction ID or the empty string if there wasn't one
+ */
+ private function consumeTrxShortId() {
+ $old = $this->trxShortId;
+ $this->trxShortId = '';
+
+ return $old;
+ }
+
/**
* Checks whether the cause of the error is detected to be a timeout.
*
public function lockForUpdate(
$table, $conds = '', $fname = __METHOD__, $options = [], $join_conds = []
) {
- if ( !$this->trxLevel && !$this->getFlag( self::DBO_TRX ) ) {
+ if ( !$this->trxLevel() && !$this->getFlag( self::DBO_TRX ) ) {
throw new DBUnexpectedError(
$this,
__METHOD__ . ': no transaction is active nor is DBO_TRX set'
$s );
}
- public function buildLike() {
- $params = func_get_args();
-
- if ( count( $params ) > 0 && is_array( $params[0] ) ) {
- $params = $params[0];
+ public function buildLike( $param, ...$params ) {
+ if ( is_array( $param ) ) {
+ $params = $param;
+ } else {
+ $params = func_get_args();
}
$s = '';
public function textFieldSize( $table, $field ) {
$table = $this->tableName( $table );
- $sql = "SHOW COLUMNS FROM $table LIKE \"$field\";";
+ $sql = "SHOW COLUMNS FROM $table LIKE \"$field\"";
$res = $this->query( $sql, __METHOD__ );
$row = $this->fetchObject( $res );
public function deadlockLoop() {
$args = func_get_args();
$function = array_shift( $args );
- $tries = self::DEADLOCK_TRIES;
+ $tries = self::$DEADLOCK_TRIES;
$this->begin( __METHOD__ );
} catch ( DBQueryError $e ) {
if ( $this->wasDeadlock() ) {
// Retry after a randomized delay
- usleep( mt_rand( self::DEADLOCK_DELAY_MIN, self::DEADLOCK_DELAY_MAX ) );
+ usleep( mt_rand( self::$DEADLOCK_DELAY_MIN, self::$DEADLOCK_DELAY_MAX ) );
} else {
// Throw the error back up
throw $e;
}
final public function onTransactionResolution( callable $callback, $fname = __METHOD__ ) {
- if ( !$this->trxLevel ) {
+ if ( !$this->trxLevel() ) {
throw new DBUnexpectedError( $this, "No transaction is active." );
}
$this->trxEndCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
}
final public function onTransactionCommitOrIdle( callable $callback, $fname = __METHOD__ ) {
- if ( !$this->trxLevel && $this->getTransactionRoundId() ) {
+ if ( !$this->trxLevel() && $this->getTransactionRoundId() ) {
// Start an implicit transaction similar to how query() does
$this->begin( __METHOD__, self::TRANSACTION_INTERNAL );
$this->trxAutomatic = true;
}
$this->trxIdleCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
- if ( !$this->trxLevel ) {
+ if ( !$this->trxLevel() ) {
$this->runOnTransactionIdleCallbacks( self::TRIGGER_IDLE );
}
}
}
final public function onTransactionPreCommitOrIdle( callable $callback, $fname = __METHOD__ ) {
- if ( !$this->trxLevel && $this->getTransactionRoundId() ) {
+ if ( !$this->trxLevel() && $this->getTransactionRoundId() ) {
// Start an implicit transaction similar to how query() does
$this->begin( __METHOD__, self::TRANSACTION_INTERNAL );
$this->trxAutomatic = true;
}
- if ( $this->trxLevel ) {
+ if ( $this->trxLevel() ) {
$this->trxPreCommitCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
} else {
// No transaction is active nor will start implicitly, so make one for this callback
}
}
+ final public function onAtomicSectionCancel( callable $callback, $fname = __METHOD__ ) {
+ if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
+ throw new DBUnexpectedError( $this, "No atomic section is open (got $fname)." );
+ }
+ $this->trxSectionCancelCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
+ }
+
/**
* @return AtomicSectionIdentifier|null ID of the topmost atomic section level
*/
private function currentAtomicSectionId() {
- if ( $this->trxLevel && $this->trxAtomicLevels ) {
+ if ( $this->trxLevel() && $this->trxAtomicLevels ) {
$levelInfo = end( $this->trxAtomicLevels );
return $levelInfo[1];
}
/**
+ * Hoist callback ownership for callbacks in a section to a parent section.
+ * All callbacks should have an owner that is present in trxAtomicLevels.
* @param AtomicSectionIdentifier $old
* @param AtomicSectionIdentifier $new
*/
$this->trxEndCallbacks[$key][2] = $new;
}
}
+ foreach ( $this->trxSectionCancelCallbacks as $key => $info ) {
+ if ( $info[2] === $old ) {
+ $this->trxSectionCancelCallbacks[$key][2] = $new;
+ }
+ }
}
/**
+ * Update callbacks that were owned by cancelled atomic sections.
+ *
+ * Callbacks for "on commit" should never be run if they're owned by a
+ * section that won't be committed.
+ *
+ * Callbacks for "on resolution" need to reflect that the section was
+ * rolled back, even if the transaction as a whole commits successfully.
+ *
+ * Callbacks for "on section cancel" should already have been consumed,
+ * but errors during the cancellation itself can prevent that while still
+ * destroying the section. Hoist any such callbacks to the new top section,
+ * which we assume will itself have to be cancelled or rolled back to
+ * resolve the error.
+ *
* @param AtomicSectionIdentifier[] $sectionIds ID of an actual savepoint
+ * @param AtomicSectionIdentifier|null $newSectionId New top section ID.
* @throws UnexpectedValueException
*/
- private function modifyCallbacksForCancel( array $sectionIds ) {
+ private function modifyCallbacksForCancel(
+ array $sectionIds, AtomicSectionIdentifier $newSectionId = null
+ ) {
// Cancel the "on commit" callbacks owned by this savepoint
$this->trxIdleCallbacks = array_filter(
$this->trxIdleCallbacks,
if ( in_array( $entry[2], $sectionIds, true ) ) {
$callback = $entry[0];
$this->trxEndCallbacks[$key][0] = function () use ( $callback ) {
+ // @phan-suppress-next-line PhanInfiniteRecursion No recursion at all here, phan is confused
return $callback( self::TRIGGER_ROLLBACK, $this );
};
+ // This "on resolution" callback no longer belongs to a section.
+ $this->trxEndCallbacks[$key][2] = null;
+ }
+ }
+ // Hoist callback ownership for section cancel callbacks to the new top section
+ foreach ( $this->trxSectionCancelCallbacks as $key => $entry ) {
+ if ( in_array( $entry[2], $sectionIds, true ) ) {
+ $this->trxSectionCancelCallbacks[$key][2] = $newSectionId;
}
}
}
* @throws Exception
*/
public function runOnTransactionIdleCallbacks( $trigger ) {
- if ( $this->trxLevel ) { // sanity
+ if ( $this->trxLevel() ) { // sanity
throw new DBUnexpectedError( $this, __METHOD__ . ': a transaction is still open.' );
}
);
$this->trxIdleCallbacks = []; // consumed (and recursion guard)
$this->trxEndCallbacks = []; // consumed (recursion guard)
+
+ // Only run trxSectionCancelCallbacks on rollback, not commit.
+ // But always consume them.
+ if ( $trigger === self::TRIGGER_ROLLBACK ) {
+ $callbacks = array_merge( $callbacks, $this->trxSectionCancelCallbacks );
+ }
+ $this->trxSectionCancelCallbacks = []; // consumed (recursion guard)
+
foreach ( $callbacks as $callback ) {
++$count;
list( $phpCallback ) = $callback;
return $count;
}
+ /**
+ * Actually run any "atomic section cancel" callbacks.
+ *
+ * @param int $trigger IDatabase::TRIGGER_* constant
+ * @param AtomicSectionIdentifier[]|null $sectionIds Section IDs to cancel,
+ * null on transaction rollback
+ */
+ private function runOnAtomicSectionCancelCallbacks(
+ $trigger, array $sectionIds = null
+ ) {
+ /** @var Exception|Throwable $e */
+ $e = null; // first exception
+
+ $notCancelled = [];
+ do {
+ $callbacks = $this->trxSectionCancelCallbacks;
+ $this->trxSectionCancelCallbacks = []; // consumed (recursion guard)
+ foreach ( $callbacks as $entry ) {
+ if ( $sectionIds === null || in_array( $entry[2], $sectionIds, true ) ) {
+ try {
+ $entry[0]( $trigger, $this );
+ } catch ( Exception $ex ) {
+ ( $this->errorLogger )( $ex );
+ $e = $e ?: $ex;
+ } catch ( Throwable $ex ) {
+ // @todo: Log?
+ $e = $e ?: $ex;
+ }
+ } else {
+ $notCancelled[] = $entry;
+ }
+ }
+ } while ( count( $this->trxSectionCancelCallbacks ) );
+ $this->trxSectionCancelCallbacks = $notCancelled;
+
+ if ( $e !== null ) {
+ throw $e; // re-throw any first Exception/Throwable
+ }
+ }
+
/**
* Actually run any "transaction listener" callbacks.
*
) {
$savepointId = $cancelable === self::ATOMIC_CANCELABLE ? self::$NOT_APPLICABLE : null;
- if ( !$this->trxLevel ) {
+ if ( !$this->trxLevel() ) {
$this->begin( $fname, self::TRANSACTION_INTERNAL ); // sets trxAutomatic
// If DBO_TRX is set, a series of startAtomic/endAtomic pairs will result
// in all changes being in one transaction to keep requests transactional.
}
final public function endAtomic( $fname = __METHOD__ ) {
- if ( !$this->trxLevel || !$this->trxAtomicLevels ) {
+ if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
throw new DBUnexpectedError( $this, "No atomic section is open (got $fname)." );
}
final public function cancelAtomic(
$fname = __METHOD__, AtomicSectionIdentifier $sectionId = null
) {
- if ( !$this->trxLevel || !$this->trxAtomicLevels ) {
+ if ( !$this->trxLevel() || !$this->trxAtomicLevels ) {
throw new DBUnexpectedError( $this, "No atomic section is open (got $fname)." );
}
- $excisedFnames = [];
- if ( $sectionId !== null ) {
- // Find the (last) section with the given $sectionId
- $pos = -1;
- foreach ( $this->trxAtomicLevels as $i => list( $asFname, $asId, $spId ) ) {
- if ( $asId === $sectionId ) {
- $pos = $i;
+ $excisedIds = [];
+ $newTopSection = $this->currentAtomicSectionId();
+ try {
+ $excisedFnames = [];
+ if ( $sectionId !== null ) {
+ // Find the (last) section with the given $sectionId
+ $pos = -1;
+ foreach ( $this->trxAtomicLevels as $i => list( $asFname, $asId, $spId ) ) {
+ if ( $asId === $sectionId ) {
+ $pos = $i;
+ }
}
+ if ( $pos < 0 ) {
+ throw new DBUnexpectedError( $this, "Atomic section not found (for $fname)" );
+ }
+ // Remove all descendant sections and re-index the array
+ $len = count( $this->trxAtomicLevels );
+ for ( $i = $pos + 1; $i < $len; ++$i ) {
+ $excisedFnames[] = $this->trxAtomicLevels[$i][0];
+ $excisedIds[] = $this->trxAtomicLevels[$i][1];
+ }
+ $this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos + 1 );
+ $newTopSection = $this->currentAtomicSectionId();
}
- if ( $pos < 0 ) {
- throw new DBUnexpectedError( $this, "Atomic section not found (for $fname)" );
- }
- // Remove all descendant sections and re-index the array
- $excisedIds = [];
- $len = count( $this->trxAtomicLevels );
- for ( $i = $pos + 1; $i < $len; ++$i ) {
- $excisedFnames[] = $this->trxAtomicLevels[$i][0];
- $excisedIds[] = $this->trxAtomicLevels[$i][1];
- }
- $this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos + 1 );
- $this->modifyCallbacksForCancel( $excisedIds );
- }
- // Check if the current section matches $fname
- $pos = count( $this->trxAtomicLevels ) - 1;
- list( $savedFname, $savedSectionId, $savepointId ) = $this->trxAtomicLevels[$pos];
+ // Check if the current section matches $fname
+ $pos = count( $this->trxAtomicLevels ) - 1;
+ list( $savedFname, $savedSectionId, $savepointId ) = $this->trxAtomicLevels[$pos];
- if ( $excisedFnames ) {
- $this->queryLogger->debug( "cancelAtomic: canceling level $pos ($savedFname) " .
- "and descendants " . implode( ', ', $excisedFnames ) );
- } else {
- $this->queryLogger->debug( "cancelAtomic: canceling level $pos ($savedFname)" );
- }
+ if ( $excisedFnames ) {
+ $this->queryLogger->debug( "cancelAtomic: canceling level $pos ($savedFname) " .
+ "and descendants " . implode( ', ', $excisedFnames ) );
+ } else {
+ $this->queryLogger->debug( "cancelAtomic: canceling level $pos ($savedFname)" );
+ }
- if ( $savedFname !== $fname ) {
- throw new DBUnexpectedError(
- $this,
- "Invalid atomic section ended (got $fname but expected $savedFname)."
- );
- }
+ if ( $savedFname !== $fname ) {
+ throw new DBUnexpectedError(
+ $this,
+ "Invalid atomic section ended (got $fname but expected $savedFname)."
+ );
+ }
- // Remove the last section (no need to re-index the array)
- array_pop( $this->trxAtomicLevels );
- $this->modifyCallbacksForCancel( [ $savedSectionId ] );
+ // Remove the last section (no need to re-index the array)
+ array_pop( $this->trxAtomicLevels );
+ $excisedIds[] = $savedSectionId;
+ $newTopSection = $this->currentAtomicSectionId();
- if ( $savepointId !== null ) {
- // Rollback the transaction to the state just before this atomic section
- if ( $savepointId === self::$NOT_APPLICABLE ) {
- $this->rollback( $fname, self::FLUSHING_INTERNAL );
- } else {
- $this->doRollbackToSavepoint( $savepointId, $fname );
- $this->trxStatus = self::STATUS_TRX_OK; // no exception; recovered
- $this->trxStatusIgnoredCause = null;
+ if ( $savepointId !== null ) {
+ // Rollback the transaction to the state just before this atomic section
+ if ( $savepointId === self::$NOT_APPLICABLE ) {
+ $this->rollback( $fname, self::FLUSHING_INTERNAL );
+ // Note: rollback() will run trxSectionCancelCallbacks
+ } else {
+ $this->doRollbackToSavepoint( $savepointId, $fname );
+ $this->trxStatus = self::STATUS_TRX_OK; // no exception; recovered
+ $this->trxStatusIgnoredCause = null;
+
+ // Run trxSectionCancelCallbacks now.
+ $this->runOnAtomicSectionCancelCallbacks( self::TRIGGER_CANCEL, $excisedIds );
+ }
+ } elseif ( $this->trxStatus > self::STATUS_TRX_ERROR ) {
+ // Put the transaction into an error state if it's not already in one
+ $this->trxStatus = self::STATUS_TRX_ERROR;
+ $this->trxStatusCause = new DBUnexpectedError(
+ $this,
+ "Uncancelable atomic section canceled (got $fname)."
+ );
}
- } elseif ( $this->trxStatus > self::STATUS_TRX_ERROR ) {
- // Put the transaction into an error state if it's not already in one
- $this->trxStatus = self::STATUS_TRX_ERROR;
- $this->trxStatusCause = new DBUnexpectedError(
- $this,
- "Uncancelable atomic section canceled (got $fname)."
- );
+ } finally {
+ // Fix up callbacks owned by the sections that were just cancelled.
+ // All callbacks should have an owner that is present in trxAtomicLevels.
+ $this->modifyCallbacksForCancel( $excisedIds, $newTopSection );
}
$this->affectedRowCount = 0; // for the sake of consistency
}
// Protect against mismatched atomic section, transaction nesting, and snapshot loss
- if ( $this->trxLevel ) {
+ if ( $this->trxLevel() ) {
if ( $this->trxAtomicLevels ) {
$levels = $this->flatAtomicSectionList();
$msg = "$fname: Got explicit BEGIN while atomic section(s) $levels are open.";
$this->assertHasConnectionHandle();
$this->doBegin( $fname );
+ $this->trxShortId = sprintf( '%06x', mt_rand( 0, 0xffffff ) );
$this->trxStatus = self::STATUS_TRX_OK;
$this->trxStatusIgnoredCause = null;
$this->trxAtomicCounter = 0;
$this->trxDoneWrites = false;
$this->trxAutomaticAtomic = false;
$this->trxAtomicLevels = [];
- $this->trxShortId = sprintf( '%06x', mt_rand( 0, 0xffffff ) );
$this->trxWriteDuration = 0.0;
$this->trxWriteQueryCount = 0;
$this->trxWriteAffectedRows = 0;
*
* @see Database::begin()
* @param string $fname
+ * @throws DBError
*/
protected function doBegin( $fname ) {
$this->query( 'BEGIN', $fname );
- $this->trxLevel = 1;
}
final public function commit( $fname = __METHOD__, $flush = self::FLUSHING_ONE ) {
throw new DBUnexpectedError( $this, "$fname: invalid flush parameter '$flush'." );
}
- if ( $this->trxLevel && $this->trxAtomicLevels ) {
+ if ( $this->trxLevel() && $this->trxAtomicLevels ) {
// There are still atomic sections open; this cannot be ignored
$levels = $this->flatAtomicSectionList();
throw new DBUnexpectedError(
}
if ( $flush === self::FLUSHING_INTERNAL || $flush === self::FLUSHING_ALL_PEERS ) {
- if ( !$this->trxLevel ) {
+ if ( !$this->trxLevel() ) {
return; // nothing to do
} elseif ( !$this->trxAutomatic ) {
throw new DBUnexpectedError(
"$fname: Flushing an explicit transaction, getting out of sync."
);
}
- } elseif ( !$this->trxLevel ) {
+ } elseif ( !$this->trxLevel() ) {
$this->queryLogger->error(
"$fname: No transaction to commit, something got out of sync." );
return; // nothing to do
$writeTime = $this->pendingWriteQueryDuration( self::ESTIMATE_DB_APPLY );
$this->doCommit( $fname );
+ $oldTrxShortId = $this->consumeTrxShortId();
$this->trxStatus = self::STATUS_TRX_NONE;
if ( $this->trxDoneWrites ) {
$this->trxProfiler->transactionWritingOut(
$this->server,
$this->getDomainID(),
- $this->trxShortId,
+ $oldTrxShortId,
$writeTime,
$this->trxWriteAffectedRows
);
*
* @see Database::commit()
* @param string $fname
+ * @throws DBError
*/
protected function doCommit( $fname ) {
- if ( $this->trxLevel ) {
+ if ( $this->trxLevel() ) {
$this->query( 'COMMIT', $fname );
- $this->trxLevel = 0;
}
}
- final public function rollback( $fname = __METHOD__, $flush = '' ) {
- $trxActive = $this->trxLevel;
+ final public function rollback( $fname = __METHOD__, $flush = self::FLUSHING_ONE ) {
+ $trxActive = $this->trxLevel();
if ( $flush !== self::FLUSHING_INTERNAL
&& $flush !== self::FLUSHING_ALL_PEERS
$this->assertHasConnectionHandle();
$this->doRollback( $fname );
+ $oldTrxShortId = $this->consumeTrxShortId();
$this->trxStatus = self::STATUS_TRX_NONE;
$this->trxAtomicLevels = [];
// Estimate the RTT via a query now that trxStatus is OK
$this->trxProfiler->transactionWritingOut(
$this->server,
$this->getDomainID(),
- $this->trxShortId,
+ $oldTrxShortId,
$writeTime,
$this->trxWriteAffectedRows
);
*
* @see Database::rollback()
* @param string $fname
+ * @throws DBError
*/
protected function doRollback( $fname ) {
- if ( $this->trxLevel ) {
+ if ( $this->trxLevel() ) {
# Disconnects cause rollback anyway, so ignore those errors
$ignoreErrors = true;
$this->query( 'ROLLBACK', $fname, $ignoreErrors );
- $this->trxLevel = 0;
}
}
}
public function explicitTrxActive() {
- return $this->trxLevel && ( $this->trxAtomicLevels || !$this->trxAutomatic );
+ return $this->trxLevel() && ( $this->trxAtomicLevels || !$this->trxAutomatic );
}
public function duplicateTableStructure(
abstract protected function fetchAffectedRowCount();
/**
- * Take the result from a query, and wrap it in a ResultWrapper if
- * necessary. Boolean values are passed through as is, to indicate success
- * of write queries or failure.
+ * Take a query result and wrap it in an iterable result wrapper if necessary.
+ * Booleans are passed through as-is to indicate success/failure of write queries.
*
* Once upon a time, Database::query() returned a bare MySQL result
* resource, and it was necessary to call this function to convert it to
*/
protected function resultObject( $result ) {
if ( !$result ) {
- return false;
- } elseif ( $result instanceof ResultWrapper ) {
+ return false; // failed query
+ } elseif ( $result instanceof IResultWrapper ) {
return $result;
} elseif ( $result === true ) {
- // Successful write query
- return $result;
+ return $result; // succesful write query
} else {
return new ResultWrapper( $this, $result );
}
public function ping( &$rtt = null ) {
// Avoid hitting the server if it was hit recently
- if ( $this->isOpen() && ( microtime( true ) - $this->lastPing ) < self::PING_TTL ) {
- if ( !func_num_args() || $this->rttEstimate > 0 ) {
- $rtt = $this->rttEstimate;
+ if ( $this->isOpen() && ( microtime( true ) - $this->lastPing ) < self::$PING_TTL ) {
+ if ( !func_num_args() || $this->lastRoundTripEstimate > 0 ) {
+ $rtt = $this->lastRoundTripEstimate;
return true; // don't care about $rtt
}
}
// This will reconnect if possible or return false if not
$this->clearFlag( self::DBO_TRX, self::REMEMBER_PRIOR );
- $ok = ( $this->query( self::PING_QUERY, __METHOD__, true ) !== false );
+ $ok = ( $this->query( self::$PING_QUERY, __METHOD__, true ) !== false );
$this->restoreFlags( self::RESTORE_PRIOR );
if ( $ok ) {
- $rtt = $this->rttEstimate;
+ $rtt = $this->lastRoundTripEstimate;
}
return $ok;
*/
protected function replaceLostConnection( $fname ) {
$this->closeConnection();
- $this->opened = false;
$this->conn = false;
$this->handleSessionLossPreconnect();
* @since 1.27
*/
final protected function getRecordedTransactionLagStatus() {
- return ( $this->trxLevel && $this->trxReplicaLag !== null )
+ return ( $this->trxLevel() && $this->trxReplicaLag !== null )
? [ 'lag' => $this->trxReplicaLag, 'since' => $this->trxTimestamp() ]
: null;
}
}
public function getLag() {
+ if ( $this->getLBInfo( 'master' ) ) {
+ return 0; // this is the master
+ } elseif ( $this->getLBInfo( 'is static' ) ) {
+ return 0; // static dataset
+ }
+
+ return $this->doGetLag();
+ }
+
+ protected function doGetLag() {
return 0;
}
// RDBMs methods for checking named locks may or may not count this thread itself.
// In MySQL, IS_FREE_LOCK() returns 0 if the thread already has the lock. This is
// the behavior choosen by the interface for this method.
- return !isset( $this->namedLocksHeld[$lockName] );
+ return !isset( $this->sessionNamedLocks[$lockName] );
}
public function lock( $lockName, $method, $timeout = 5 ) {
- $this->namedLocksHeld[$lockName] = 1;
+ $this->sessionNamedLocks[$lockName] = 1;
return true;
}
public function unlock( $lockName, $method ) {
- unset( $this->namedLocksHeld[$lockName] );
+ unset( $this->sessionNamedLocks[$lockName] );
return true;
}
return $this->conn;
}
- /**
- * @since 1.19
- * @return string
- */
public function __toString() {
- return (string)$this->conn;
+ // spl_object_id is PHP >= 7.2
+ $id = function_exists( 'spl_object_id' )
+ ? spl_object_id( $this )
+ : spl_object_hash( $this );
+
+ $description = $this->getType() . ' object #' . $id;
+ if ( is_resource( $this->conn ) ) {
+ $description .= ' (' . (string)$this->conn . ')'; // "resource id #<ID>"
+ } elseif ( is_object( $this->conn ) ) {
+ // spl_object_id is PHP >= 7.2
+ $handleId = function_exists( 'spl_object_id' )
+ ? spl_object_id( $this->conn )
+ : spl_object_hash( $this->conn );
+ $description .= " (handle id #$handleId)";
+ }
+
+ return $description;
}
/**
if ( $this->isOpen() ) {
// Open a new connection resource without messing with the old one
- $this->opened = false;
$this->conn = false;
$this->trxEndCallbacks = []; // don't copy
+ $this->trxSectionCancelCallbacks = []; // don't copy
$this->handleSessionLossPreconnect(); // no trx or locks anymore
$this->open(
$this->server,
* Run a few simple sanity checks and close dangling connections
*/
public function __destruct() {
- if ( $this->trxLevel && $this->trxDoneWrites ) {
+ if ( $this->trxLevel() && $this->trxDoneWrites ) {
trigger_error( "Uncommitted DB writes (transaction from {$this->trxFname})." );
}
$this->closeConnection();
Wikimedia\restoreWarnings();
$this->conn = false;
- $this->opened = false;
}
}
}