use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerInterface;
+use Psr\Log\NullLogger;
use Wikimedia\ScopedCallback;
use Wikimedia\Timestamp\ConvertibleTimestamp;
use Wikimedia;
use BagOStuff;
use HashBagOStuff;
+use LogicException;
use InvalidArgumentException;
+use UnexpectedValueException;
use Exception;
use RuntimeException;
const SLOW_WRITE_SEC = 0.500;
const SMALL_WRITE_ROWS = 100;
+ /** @var string Whether lock granularity is on the level of the entire database */
+ const ATTR_DB_LEVEL_LOCKING = 'db-level-locking';
+
+ /** @var int New Database instance will not be connected yet when returned */
+ const NEW_UNCONNECTED = 0;
+ /** @var int New Database instance will already be connected when returned */
+ const NEW_CONNECTED = 1;
+
/** @var string SQL query */
protected $lastQuery = '';
/** @var float|bool UNIX timestamp of last write query */
protected $lastWriteTime = false;
/** @var string|bool */
protected $phpError = false;
- /** @var string */
+ /** @var string Server that this instance is currently connected to */
protected $server;
- /** @var string */
+ /** @var string User that this instance is currently connected under the name of */
protected $user;
- /** @var string */
+ /** @var string Password used to establish the current connection */
protected $password;
- /** @var string */
+ /** @var string Database that this instance is currently connected to */
protected $dbName;
- /** @var array[] $aliases Map of (table => (dbname, schema, prefix) map) */
+ /** @var array[] Map of (table => (dbname, schema, prefix) map) */
protected $tableAliases = [];
+ /** @var string[] Map of (index alias => index) */
+ protected $indexAliases = [];
/** @var bool Whether this PHP instance is for a CLI script */
protected $cliMode;
/** @var string Agent name for query profiling */
protected $agent;
-
+ /** @var array Parameters used by initConnection() to establish a connection */
+ protected $connectionParams = [];
/** @var BagOStuff APC cache */
protected $srvCache;
/** @var LoggerInterface */
/** @var integer|null Rows affected by the last query to query() or its CRUD wrappers */
protected $affectedRowCount;
+ /**
+ * @var int Transaction status
+ */
+ protected $trxStatus = self::STATUS_TRX_NONE;
+ /**
+ * @var Exception|null The last error that caused the status to become STATUS_TRX_ERROR
+ */
+ protected $trxStatusCause;
/**
* Either 1 if a transaction is active or 0 otherwise.
* The other Trx fields may not be meaningfull if this is 0.
* @see Database::trxLevel
*/
private $trxAutomatic = false;
+ /**
+ * Counter for atomic savepoint identifiers. Reset when a new transaction begins.
+ *
+ * @var int
+ */
+ private $trxAtomicCounter = 0;
/**
* Array of levels of atomicity within transactions
*
/** @var TransactionProfiler */
protected $trxProfiler;
+ /** @var int */
+ protected $nonNativeInsertSelectBatchSize = 10000;
+
+ /** @var int Transaction is in a error state requiring a full or savepoint rollback */
+ const STATUS_TRX_ERROR = 1;
+ /** @var int Transaction is active and in a normal state */
+ const STATUS_TRX_OK = 2;
+ /** @var int No transaction is active */
+ const STATUS_TRX_NONE = 3;
+
/**
- * Constructor and database handle and attempt to connect to the DB server
- *
- * IDatabase classes should not be constructed directly in external
- * code. Database::factory() should be used instead.
- *
+ * @note: exceptions for missing libraries/drivers should be thrown in initConnection()
* @param array $params Parameters passed from Database::factory()
*/
- function __construct( array $params ) {
- $server = $params['host'];
- $user = $params['user'];
- $password = $params['password'];
- $dbName = $params['dbname'];
+ protected function __construct( array $params ) {
+ foreach ( [ 'host', 'user', 'password', 'dbname' ] as $name ) {
+ $this->connectionParams[$name] = $params[$name];
+ }
$this->schema = $params['schema'];
$this->tablePrefix = $params['tablePrefix'];
$this->flags |= self::DBO_TRX;
}
}
+ // Disregard deprecated DBO_IGNORE flag (T189999)
+ $this->flags &= ~self::DBO_IGNORE;
$this->sessionVars = $params['variables'];
$this->queryLogger = $params['queryLogger'];
$this->errorLogger = $params['errorLogger'];
+ if ( isset( $params['nonNativeInsertSelectBatchSize'] ) ) {
+ $this->nonNativeInsertSelectBatchSize = $params['nonNativeInsertSelectBatchSize'];
+ }
+
// Set initial dummy domain until open() sets the final DB/prefix
$this->currentDomain = DatabaseDomain::newUnspecified();
+ }
- if ( $user ) {
- $this->open( $server, $user, $password, $dbName );
- } elseif ( $this->requiresDatabaseUser() ) {
- throw new InvalidArgumentException( "No database user provided." );
+ /**
+ * Initialize the connection to the database over the wire (or to local files)
+ *
+ * @throws LogicException
+ * @throws InvalidArgumentException
+ * @throws DBConnectionError
+ * @since 1.31
+ */
+ final public function initConnection() {
+ if ( $this->isOpen() ) {
+ throw new LogicException( __METHOD__ . ': already connected.' );
}
-
+ // Establish the connection
+ $this->doInitConnection();
// Set the domain object after open() sets the relevant fields
if ( $this->dbName != '' ) {
// Domains with server scope but a table prefix are not used by IDatabase classes
}
}
+ /**
+ * Actually connect to the database over the wire (or to local files)
+ *
+ * @throws InvalidArgumentException
+ * @throws DBConnectionError
+ * @since 1.31
+ */
+ protected function doInitConnection() {
+ if ( strlen( $this->connectionParams['user'] ) ) {
+ $this->open(
+ $this->connectionParams['host'],
+ $this->connectionParams['user'],
+ $this->connectionParams['password'],
+ $this->connectionParams['dbname']
+ );
+ } else {
+ throw new InvalidArgumentException( "No database user provided." );
+ }
+ }
+
/**
* Construct a Database subclass instance given a database type and parameters
*
* This also connects to the database immediately upon object construction
*
- * @param string $dbType A possible DB type (sqlite, mysql, postgres)
+ * @param string $dbType A possible DB type (sqlite, mysql, postgres,...)
* @param array $p Parameter map with keys:
* - host : The hostname of the DB server
* - user : The name of the database user the client operates under
* - cliMode: Whether to consider the execution context that of a CLI script.
* - agent: Optional name used to identify the end-user in query profiling/logging.
* - srvCache: Optional BagOStuff instance to an APC-style cache.
+ * - nonNativeInsertSelectBatchSize: Optional batch size for non-native INSERT SELECT emulation.
+ * @param int $connect One of the class constants (NEW_CONNECTED, NEW_UNCONNECTED) [optional]
* @return Database|null If the database driver or extension cannot be found
* @throws InvalidArgumentException If the database driver or extension cannot be found
* @since 1.18
*/
- final public static function factory( $dbType, $p = [] ) {
+ final public static function factory( $dbType, $p = [], $connect = self::NEW_CONNECTED ) {
+ $class = self::getClass( $dbType, isset( $p['driver'] ) ? $p['driver'] : null );
+
+ if ( class_exists( $class ) && is_subclass_of( $class, IDatabase::class ) ) {
+ // Resolve some defaults for b/c
+ $p['host'] = isset( $p['host'] ) ? $p['host'] : false;
+ $p['user'] = isset( $p['user'] ) ? $p['user'] : false;
+ $p['password'] = isset( $p['password'] ) ? $p['password'] : false;
+ $p['dbname'] = isset( $p['dbname'] ) ? $p['dbname'] : false;
+ $p['flags'] = isset( $p['flags'] ) ? $p['flags'] : 0;
+ $p['variables'] = isset( $p['variables'] ) ? $p['variables'] : [];
+ $p['tablePrefix'] = isset( $p['tablePrefix'] ) ? $p['tablePrefix'] : '';
+ $p['schema'] = isset( $p['schema'] ) ? $p['schema'] : '';
+ $p['cliMode'] = isset( $p['cliMode'] )
+ ? $p['cliMode']
+ : ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
+ $p['agent'] = isset( $p['agent'] ) ? $p['agent'] : '';
+ if ( !isset( $p['connLogger'] ) ) {
+ $p['connLogger'] = new NullLogger();
+ }
+ if ( !isset( $p['queryLogger'] ) ) {
+ $p['queryLogger'] = new NullLogger();
+ }
+ $p['profiler'] = isset( $p['profiler'] ) ? $p['profiler'] : null;
+ if ( !isset( $p['trxProfiler'] ) ) {
+ $p['trxProfiler'] = new TransactionProfiler();
+ }
+ if ( !isset( $p['errorLogger'] ) ) {
+ $p['errorLogger'] = function ( Exception $e ) {
+ trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
+ };
+ }
+
+ /** @var Database $conn */
+ $conn = new $class( $p );
+ if ( $connect == self::NEW_CONNECTED ) {
+ $conn->initConnection();
+ }
+ } else {
+ $conn = null;
+ }
+
+ return $conn;
+ }
+
+ /**
+ * @param string $dbType A possible DB type (sqlite, mysql, postgres,...)
+ * @param string|null $driver Optional name of a specific DB client driver
+ * @return array Map of (Database::ATTRIBUTE_* constant => value) for all such constants
+ * @throws InvalidArgumentException
+ * @since 1.31
+ */
+ final public static function attributesFromType( $dbType, $driver = null ) {
+ static $defaults = [ self::ATTR_DB_LEVEL_LOCKING => false ];
+
+ $class = self::getClass( $dbType, $driver );
+
+ return call_user_func( [ $class, 'getAttributes' ] ) + $defaults;
+ }
+
+ /**
+ * @param string $dbType A possible DB type (sqlite, mysql, postgres,...)
+ * @param string|null $driver Optional name of a specific DB client driver
+ * @return string Database subclass name to use
+ * @throws InvalidArgumentException
+ */
+ private static function getClass( $dbType, $driver = null ) {
// For database types with built-in support, the below maps type to IDatabase
// implementations. For types with multipe driver implementations (PHP extensions),
// an array can be used, keyed by extension name. In case of an array, the
$dbType = strtolower( $dbType );
$class = false;
+
if ( isset( $builtinTypes[$dbType] ) ) {
$possibleDrivers = $builtinTypes[$dbType];
if ( is_string( $possibleDrivers ) ) {
$class = $possibleDrivers;
} else {
- if ( !empty( $p['driver'] ) ) {
- if ( !isset( $possibleDrivers[$p['driver']] ) ) {
+ if ( (string)$driver !== '' ) {
+ if ( !isset( $possibleDrivers[$driver] ) ) {
throw new InvalidArgumentException( __METHOD__ .
- " type '$dbType' does not support driver '{$p['driver']}'" );
+ " type '$dbType' does not support driver '{$driver}'" );
} else {
- $class = $possibleDrivers[$p['driver']];
+ $class = $possibleDrivers[$driver];
}
} else {
foreach ( $possibleDrivers as $posDriver => $possibleClass ) {
" no viable database extension found for type '$dbType'" );
}
- if ( class_exists( $class ) && is_subclass_of( $class, IDatabase::class ) ) {
- // Resolve some defaults for b/c
- $p['host'] = isset( $p['host'] ) ? $p['host'] : false;
- $p['user'] = isset( $p['user'] ) ? $p['user'] : false;
- $p['password'] = isset( $p['password'] ) ? $p['password'] : false;
- $p['dbname'] = isset( $p['dbname'] ) ? $p['dbname'] : false;
- $p['flags'] = isset( $p['flags'] ) ? $p['flags'] : 0;
- $p['variables'] = isset( $p['variables'] ) ? $p['variables'] : [];
- $p['tablePrefix'] = isset( $p['tablePrefix'] ) ? $p['tablePrefix'] : '';
- $p['schema'] = isset( $p['schema'] ) ? $p['schema'] : '';
- $p['cliMode'] = isset( $p['cliMode'] )
- ? $p['cliMode']
- : ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
- $p['agent'] = isset( $p['agent'] ) ? $p['agent'] : '';
- if ( !isset( $p['connLogger'] ) ) {
- $p['connLogger'] = new \Psr\Log\NullLogger();
- }
- if ( !isset( $p['queryLogger'] ) ) {
- $p['queryLogger'] = new \Psr\Log\NullLogger();
- }
- $p['profiler'] = isset( $p['profiler'] ) ? $p['profiler'] : null;
- if ( !isset( $p['trxProfiler'] ) ) {
- $p['trxProfiler'] = new TransactionProfiler();
- }
- if ( !isset( $p['errorLogger'] ) ) {
- $p['errorLogger'] = function ( Exception $e ) {
- trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
- };
- }
-
- $conn = new $class( $p );
- } else {
- $conn = null;
- }
+ return $class;
+ }
- return $conn;
+ /**
+ * @return array Map of (Database::ATTRIBUTE_* constant => value
+ * @since 1.31
+ */
+ protected static function getAttributes() {
+ return [];
}
/**
return $res;
}
- /**
- * Turns on (false) or off (true) the automatic generation and sending
- * of a "we're sorry, but there has been a database error" page on
- * database errors. Default is on (false). When turned off, the
- * code should use lastErrno() and lastError() to handle the
- * situation as appropriate.
- *
- * Do not use this function outside of the Database classes.
- *
- * @param null|bool $ignoreErrors
- * @return bool The previous value of the flag.
- */
- protected function ignoreErrors( $ignoreErrors = null ) {
- $res = $this->getFlag( self::DBO_IGNORE );
- if ( $ignoreErrors !== null ) {
- // setFlag()/clearFlag() do not allow DBO_IGNORE changes for sanity
- if ( $ignoreErrors ) {
- $this->flags |= self::DBO_IGNORE;
- } else {
- $this->flags &= ~self::DBO_IGNORE;
- }
- }
-
- return $res;
- }
-
public function trxLevel() {
return $this->trxLevel;
}
return $this->trxLevel ? $this->trxTimestamp : null;
}
+ /**
+ * @return int One of the STATUS_TRX_* class constants
+ * @since 1.31
+ */
+ public function trxStatus() {
+ return $this->trxStatus;
+ }
+
public function tablePrefix( $prefix = null ) {
$old = $this->tablePrefix;
if ( $prefix !== null ) {
public function writesOrCallbacksPending() {
return $this->trxLevel && (
- $this->trxDoneWrites || $this->trxIdleCallbacks || $this->trxPreCommitCallbacks
+ $this->trxDoneWrites ||
+ $this->trxIdleCallbacks ||
+ $this->trxPreCommitCallbacks ||
+ $this->trxEndCallbacks
);
}
+ /**
+ * @return string|null
+ */
+ final protected function getTransactionRoundId() {
+ // If transaction round participation is enabled, see if one is active
+ if ( $this->getFlag( self::DBO_TRX ) ) {
+ $id = $this->getLBInfo( 'trxRoundId' );
+
+ return is_string( $id ) ? $id : null;
+ }
+
+ return null;
+ }
+
public function pendingWriteQueryDuration( $type = self::ESTIMATE_TOTAL ) {
if ( !$this->trxLevel ) {
return false;
return $fnames;
}
+ /**
+ * @return string
+ */
+ private function flatAtomicSectionList() {
+ return array_reduce( $this->trxAtomicLevels, function ( $accum, $v ) {
+ return $accum === null ? $v[0] : "$accum, " . $v[0];
+ } );
+ }
+
public function isOpen() {
return $this->opened;
}
public function setFlag( $flag, $remember = self::REMEMBER_NOTHING ) {
if ( ( $flag & self::DBO_IGNORE ) ) {
- throw new \UnexpectedValueException( "Modifying DBO_IGNORE is not allowed." );
+ throw new UnexpectedValueException( "Modifying DBO_IGNORE is not allowed." );
}
if ( $remember === self::REMEMBER_PRIOR ) {
public function clearFlag( $flag, $remember = self::REMEMBER_NOTHING ) {
if ( ( $flag & self::DBO_IGNORE ) ) {
- throw new \UnexpectedValueException( "Modifying DBO_IGNORE is not allowed." );
+ throw new UnexpectedValueException( "Modifying DBO_IGNORE is not allowed." );
}
if ( $remember === self::REMEMBER_PRIOR ) {
);
}
- public function close() {
+ final public function close() {
+ $exception = null; // error to throw after disconnecting
+
if ( $this->conn ) {
- if ( $this->trxLevel() ) {
- $this->commit( __METHOD__, self::FLUSHING_INTERNAL );
+ // Resolve any dangling transaction first
+ if ( $this->trxLevel ) {
+ // Meaningful transactions should ideally have been resolved by now
+ if ( $this->writesOrCallbacksPending() ) {
+ $this->queryLogger->warning(
+ __METHOD__ . ": writes or callbacks still pending.",
+ [ 'trace' => ( new RuntimeException() )->getTraceAsString() ]
+ );
+ // Cannot let incomplete atomic sections be committed
+ if ( $this->trxAtomicLevels ) {
+ $levels = $this->flatAtomicSectionList();
+ $exception = new DBUnexpectedError(
+ $this,
+ __METHOD__ . ": atomic sections $levels are still open."
+ );
+ // Check if it is possible to properly commit and trigger callbacks
+ } elseif ( $this->trxEndCallbacksSuppressed ) {
+ $exception = new DBUnexpectedError(
+ $this,
+ __METHOD__ . ': callbacks are suppressed; cannot properly commit.'
+ );
+ }
+ }
+ // Commit or rollback the changes and run any callbacks as needed
+ if ( $this->trxStatus === self::STATUS_TRX_OK && !$exception ) {
+ $this->commit( __METHOD__, self::TRANSACTION_INTERNAL );
+ } else {
+ $this->rollback( __METHOD__, self::TRANSACTION_INTERNAL );
+ }
}
-
+ // Close the actual connection in the binding handle
$closed = $this->closeConnection();
$this->conn = false;
- } elseif (
- $this->trxIdleCallbacks ||
- $this->trxPreCommitCallbacks ||
- $this->trxEndCallbacks
- ) { // sanity
- throw new RuntimeException( "Transaction callbacks still pending." );
} else {
- $closed = true;
+ $closed = true; // already closed; nothing to do
}
+
$this->opened = false;
+ // Throw any unexpected errors after having disconnected
+ if ( $exception instanceof Exception ) {
+ throw $exception;
+ }
+
+ // Sanity check that no callbacks are dangling
+ if (
+ $this->trxIdleCallbacks || $this->trxPreCommitCallbacks || $this->trxEndCallbacks
+ ) {
+ throw new RuntimeException(
+ "Transaction callbacks are still pending:\n" .
+ implode( ', ', $this->pendingWriteAndCallbackCallers() )
+ );
+ }
+
return $closed;
}
*/
abstract protected function closeConnection();
+ /**
+ * @param string $error Fallback error message, used if none is given by DB
+ * @throws DBConnectionError
+ */
public function reportConnectionError( $error = 'Unknown error' ) {
$myError = $this->lastError();
if ( $myError ) {
}
/**
- * The DBMS-dependent part of query()
+ * Run a query and return a DBMS-dependent wrapper (that has all IResultWrapper methods)
+ *
+ * This might return things, such as mysqli_result, that do not formally implement
+ * IResultWrapper, but nonetheless implement all of its methods correctly
*
* @param string $sql SQL query.
- * @return ResultWrapper|bool Result object to feed to fetchObject,
- * fetchRow, ...; or false on failure
+ * @return IResultWrapper|bool Iterator to feed to fetchObject/fetchRow; false on failure
*/
abstract protected function doQuery( $sql );
}
public function query( $sql, $fname = __METHOD__, $tempIgnore = false ) {
+ $this->assertTransactionStatus( $sql, $fname );
+
$priorWritesPending = $this->writesOrCallbacksPending();
$this->lastQuery = $sql;
# Avoid fatals if close() was called
$this->assertOpen();
- # Send the query to the server
+ # Send the query to the server and fetch any corresponding errors
$ret = $this->doProfiledQuery( $sql, $commentedSql, $isNonTempWrite, $fname );
+ $lastError = $this->lastError();
+ $lastErrno = $this->lastErrno();
# Try reconnecting if the connection was lost
- if ( false === $ret && $this->wasErrorReissuable() ) {
+ if ( $ret === false && $this->wasConnectionLoss() ) {
+ # Check if any meaningful session state was lost
$recoverable = $this->canRecoverFromDisconnect( $sql, $priorWritesPending );
- # Stash the last error values before anything might clear them
- $lastError = $this->lastError();
- $lastErrno = $this->lastErrno();
- # Update state tracking to reflect transaction loss due to disconnection
- $this->handleSessionLoss();
- if ( $this->reconnect() ) {
- $msg = __METHOD__ . ': lost connection to {dbserver}; reconnected';
- $params = [ 'dbserver' => $this->getServer() ];
- $this->connLogger->warning( $msg, $params );
- $this->queryLogger->warning( $msg, $params +
- [ 'trace' => ( new RuntimeException() )->getTraceAsString() ] );
-
- if ( !$recoverable ) {
- # Callers may catch the exception and continue to use the DB
- $this->reportQueryError( $lastError, $lastErrno, $sql, $fname );
- } else {
- # Should be safe to silently retry the query
- $ret = $this->doProfiledQuery( $sql, $commentedSql, $isNonTempWrite, $fname );
+ # Update session state tracking and try to restore the connection
+ $reconnected = $this->replaceLostConnection( __METHOD__ );
+ # Silently resend the query to the server if it is safe and possible
+ if ( $reconnected && $recoverable ) {
+ $ret = $this->doProfiledQuery( $sql, $commentedSql, $isNonTempWrite, $fname );
+ $lastError = $this->lastError();
+ $lastErrno = $this->lastErrno();
+
+ if ( $ret === false && $this->wasConnectionLoss() ) {
+ # Query probably causes disconnects; reconnect and do not re-run it
+ $this->replaceLostConnection( __METHOD__ );
}
- } else {
- $msg = __METHOD__ . ': lost connection to {dbserver} permanently';
- $this->connLogger->error( $msg, [ 'dbserver' => $this->getServer() ] );
}
}
- if ( false === $ret ) {
- # Deadlocks cause the entire transaction to abort, not just the statement.
- # https://dev.mysql.com/doc/refman/5.7/en/innodb-error-handling.html
- # https://www.postgresql.org/docs/9.1/static/explicit-locking.html
- if ( $this->wasDeadlock() ) {
+ if ( $ret === false ) {
+ if ( $this->trxLevel && !$this->wasKnownStatementRollbackError() ) {
+ # Either the query was aborted or all queries after BEGIN where aborted.
if ( $this->explicitTrxActive() || $priorWritesPending ) {
- $tempIgnore = false; // not recoverable
+ # In the first case, the only options going forward are (a) ROLLBACK, or
+ # (b) ROLLBACK TO SAVEPOINT (if one was set). If the later case, the only
+ # option is ROLLBACK, since the snapshots would have been released.
+ $this->trxStatus = self::STATUS_TRX_ERROR;
+ $this->trxStatusCause =
+ $this->makeQueryException( $lastError, $lastErrno, $sql, $fname );
+ $tempIgnore = false; // cannot recover
+ } else {
+ # Nothing prior was there to lose from the transaction
+ $this->trxStatus = self::STATUS_TRX_OK;
}
- # Update state tracking to reflect transaction loss
- $this->handleSessionLoss();
}
- $this->reportQueryError(
- $this->lastError(), $this->lastErrno(), $sql, $fname, $tempIgnore );
+ $this->reportQueryError( $lastError, $lastErrno, $sql, $fname, $tempIgnore );
}
- $res = $this->resultObject( $ret );
-
- return $res;
+ return $this->resultObject( $ret );
}
/**
}
}
+ /**
+ * @param string $sql
+ * @param string $fname
+ * @throws DBTransactionStateError
+ */
+ private function assertTransactionStatus( $sql, $fname ) {
+ if (
+ $this->trxStatus < self::STATUS_TRX_OK &&
+ $this->getQueryVerb( $sql ) !== 'ROLLBACK' // transaction/savepoint
+ ) {
+ throw new DBTransactionStateError(
+ $this,
+ "Cannot execute query from $fname while transaction status is ERROR. ",
+ [],
+ $this->trxStatusCause
+ );
+ }
+ }
+
/**
* Determine whether or not it is safe to retry queries after a database
* connection is lost
# didn't matter anyway (aside from DBO_TRX snapshot loss).
if ( $this->namedLocksHeld ) {
return false; // possible critical section violation
+ } elseif ( $this->sessionTempTables ) {
+ return false; // tables might be queried latter
} elseif ( $sql === 'COMMIT' ) {
return !$priorWritesPending; // nothing written anyway? (T127428)
} elseif ( $sql === 'ROLLBACK' ) {
return true; // transaction lost...which is also what was requested :)
} elseif ( $this->explicitTrxActive() ) {
- return false; // don't drop atomocity
+ return false; // don't drop atomocity and explicit snapshots
} elseif ( $priorWritesPending ) {
return false; // prior writes lost from implicit transaction
}
}
/**
- * Clean things up after transaction loss due to disconnection
- *
- * @return null|Exception
+ * Clean things up after session (and thus transaction) loss
*/
private function handleSessionLoss() {
- $this->trxLevel = 0;
- $this->trxIdleCallbacks = []; // T67263
- $this->trxPreCommitCallbacks = []; // T67263
+ // Clean up tracking of session-level things...
+ // https://dev.mysql.com/doc/refman/5.7/en/implicit-commit.html
+ // https://www.postgresql.org/docs/9.2/static/sql-createtable.html (ignoring ON COMMIT)
$this->sessionTempTables = [];
+ // https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_get-lock
+ // https://www.postgresql.org/docs/9.4/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
$this->namedLocksHeld = [];
+ // Session loss implies transaction loss
+ $this->handleTransactionLoss();
+ }
+
+ /**
+ * Clean things up after transaction loss
+ */
+ private function handleTransactionLoss() {
+ $this->trxLevel = 0;
+ $this->trxAtomicCounter = 0;
+ $this->trxIdleCallbacks = []; // T67263; transaction already lost
+ $this->trxPreCommitCallbacks = []; // T67263; transaction already lost
try {
- // Handle callbacks in trxEndCallbacks
+ // Handle callbacks in trxEndCallbacks, e.g. onTransactionResolution().
+ // If callback suppression is set then the array will remain unhandled.
$this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK );
+ } catch ( Exception $ex ) {
+ // Already logged; move on...
+ }
+ try {
+ // Handle callbacks in trxRecurringCallbacks, e.g. setTransactionListener()
$this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK );
- return null;
- } catch ( Exception $e ) {
+ } catch ( Exception $ex ) {
// Already logged; move on...
- return $e;
}
}
return false;
}
+ /**
+ * Report a query error. Log the error, and if neither the object ignore
+ * flag nor the $tempIgnore flag is set, throw a DBQueryError.
+ *
+ * @param string $error
+ * @param int $errno
+ * @param string $sql
+ * @param string $fname
+ * @param bool $tempIgnore
+ * @throws DBQueryError
+ */
public function reportQueryError( $error, $errno, $sql, $fname, $tempIgnore = false ) {
- if ( $this->ignoreErrors() || $tempIgnore ) {
+ if ( $tempIgnore ) {
$this->queryLogger->debug( "SQL ERROR (ignored): $error\n" );
} else {
- $sql1line = mb_substr( str_replace( "\n", "\\n", $sql ), 0, 5 * 1024 );
- $this->queryLogger->error(
- "{fname}\t{db_server}\t{errno}\t{error}\t{sql1line}",
- $this->getLogContext( [
- 'method' => __METHOD__,
- 'errno' => $errno,
- 'error' => $error,
- 'sql1line' => $sql1line,
- 'fname' => $fname,
- ] )
- );
- $this->queryLogger->debug( "SQL ERROR: " . $error . "\n" );
- $wasQueryTimeout = $this->wasQueryTimeout( $error, $errno );
- if ( $wasQueryTimeout ) {
- throw new DBQueryTimeoutError( $this, $error, $errno, $sql, $fname );
- } else {
- throw new DBQueryError( $this, $error, $errno, $sql, $fname );
- }
+ $exception = $this->makeQueryException( $error, $errno, $sql, $fname );
+
+ throw $exception;
}
}
+ /**
+ * @param string $error
+ * @param string|int $errno
+ * @param string $sql
+ * @param string $fname
+ * @return DBError
+ */
+ private function makeQueryException( $error, $errno, $sql, $fname ) {
+ $sql1line = mb_substr( str_replace( "\n", "\\n", $sql ), 0, 5 * 1024 );
+ $this->queryLogger->error(
+ "{fname}\t{db_server}\t{errno}\t{error}\t{sql1line}",
+ $this->getLogContext( [
+ 'method' => __METHOD__,
+ 'errno' => $errno,
+ 'error' => $error,
+ 'sql1line' => $sql1line,
+ 'fname' => $fname,
+ ] )
+ );
+ $this->queryLogger->debug( "SQL ERROR: " . $error . "\n" );
+ $wasQueryTimeout = $this->wasQueryTimeout( $error, $errno );
+ if ( $wasQueryTimeout ) {
+ $e = new DBQueryTimeoutError( $this, $error, $errno, $sql, $fname );
+ } else {
+ $e = new DBQueryError( $this, $error, $errno, $sql, $fname );
+ }
+
+ return $e;
+ }
+
public function freeResult( $res ) {
}
list( $startOpts, $useIndex, $preLimitTail, $postLimitTail, $ignoreIndex ) =
$this->makeSelectOptions( $options );
- if ( !empty( $conds ) ) {
- if ( is_array( $conds ) ) {
- $conds = $this->makeList( $conds, self::LIST_AND );
- }
+ if ( is_array( $conds ) ) {
+ $conds = $this->makeList( $conds, self::LIST_AND );
+ }
+
+ if ( $conds === null || $conds === false ) {
+ $this->queryLogger->warning(
+ __METHOD__
+ . ' called from '
+ . $fname
+ . ' with incorrect parameters: $conds must be a string or an array'
+ );
+ $conds = '';
+ }
+
+ if ( $conds === '' ) {
+ $sql = "SELECT $startOpts $vars $from $useIndex $ignoreIndex $preLimitTail";
+ } elseif ( is_string( $conds ) ) {
$sql = "SELECT $startOpts $vars $from $useIndex $ignoreIndex " .
"WHERE $conds $preLimitTail";
} else {
- $sql = "SELECT $startOpts $vars $from $useIndex $ignoreIndex $preLimitTail";
+ throw new DBUnexpectedError( $this, __METHOD__ . ' called with incorrect parameters' );
}
if ( isset( $options['LIMIT'] ) ) {
}
public function estimateRowCount(
- $table, $vars = '*', $conds = '', $fname = __METHOD__, $options = []
+ $table, $var = '*', $conds = '', $fname = __METHOD__, $options = [], $join_conds = []
) {
- $rows = 0;
- $res = $this->select( $table, [ 'rowcount' => 'COUNT(*)' ], $conds, $fname, $options );
-
- if ( $res ) {
- $row = $this->fetchRow( $res );
- $rows = ( isset( $row['rowcount'] ) ) ? (int)$row['rowcount'] : 0;
+ $conds = $this->normalizeConditions( $conds, $fname );
+ $column = $this->extractSingleFieldFromList( $var );
+ if ( is_string( $column ) && !in_array( $column, [ '*', '1' ] ) ) {
+ $conds[] = "$column IS NOT NULL";
}
- return $rows;
+ $res = $this->select(
+ $table, [ 'rowcount' => 'COUNT(*)' ], $conds, $fname, $options, $join_conds
+ );
+ $row = $res ? $this->fetchRow( $res ) : [];
+
+ return isset( $row['rowcount'] ) ? (int)$row['rowcount'] : 0;
}
public function selectRowCount(
- $tables, $vars = '*', $conds = '', $fname = __METHOD__, $options = [], $join_conds = []
+ $tables, $var = '*', $conds = '', $fname = __METHOD__, $options = [], $join_conds = []
) {
- $rows = 0;
- $sql = $this->selectSQLText( $tables, '1', $conds, $fname, $options, $join_conds );
- // The identifier quotes is primarily for MSSQL.
- $rowCountCol = $this->addIdentifierQuotes( "rowcount" );
- $tableName = $this->addIdentifierQuotes( "tmp_count" );
- $res = $this->query( "SELECT COUNT(*) AS $rowCountCol FROM ($sql) $tableName", $fname );
+ $conds = $this->normalizeConditions( $conds, $fname );
+ $column = $this->extractSingleFieldFromList( $var );
+ if ( is_string( $column ) && !in_array( $column, [ '*', '1' ] ) ) {
+ $conds[] = "$column IS NOT NULL";
+ }
+
+ $res = $this->select(
+ [
+ 'tmp_count' => $this->buildSelectSubquery(
+ $tables,
+ '1',
+ $conds,
+ $fname,
+ $options,
+ $join_conds
+ )
+ ],
+ [ 'rowcount' => 'COUNT(*)' ],
+ [],
+ $fname
+ );
+ $row = $res ? $this->fetchRow( $res ) : [];
+
+ return isset( $row['rowcount'] ) ? (int)$row['rowcount'] : 0;
+ }
- if ( $res ) {
- $row = $this->fetchRow( $res );
- $rows = ( isset( $row['rowcount'] ) ) ? (int)$row['rowcount'] : 0;
+ /**
+ * @param array|string $conds
+ * @param string $fname
+ * @return array
+ */
+ final protected function normalizeConditions( $conds, $fname ) {
+ if ( $conds === null || $conds === false ) {
+ $this->queryLogger->warning(
+ __METHOD__
+ . ' called from '
+ . $fname
+ . ' with incorrect parameters: $conds must be a string or an array'
+ );
+ $conds = '';
+ }
+
+ if ( !is_array( $conds ) ) {
+ $conds = ( $conds === '' ) ? [] : [ $conds ];
+ }
+
+ return $conds;
+ }
+
+ /**
+ * @param array|string $var Field parameter in the style of select()
+ * @return string|null Column name or null; ignores aliases
+ * @throws DBUnexpectedError Errors out if multiple columns are given
+ */
+ final protected function extractSingleFieldFromList( $var ) {
+ if ( is_array( $var ) ) {
+ if ( !$var ) {
+ $column = null;
+ } elseif ( count( $var ) == 1 ) {
+ $column = isset( $var[0] ) ? $var[0] : reset( $var );
+ } else {
+ throw new DBUnexpectedError( $this, __METHOD__ . ': got multiple columns.' );
+ }
+ } else {
+ $column = $var;
}
- return $rows;
+ return $column;
}
/**
return '(' . $this->selectSQLText( $table, $fld, $conds, null, [], $join_conds ) . ')';
}
+ public function buildSubstring( $input, $startPosition, $length = null ) {
+ $this->assertBuildSubstringParams( $startPosition, $length );
+ $functionBody = "$input FROM $startPosition";
+ if ( $length !== null ) {
+ $functionBody .= " FOR $length";
+ }
+ return 'SUBSTRING(' . $functionBody . ')';
+ }
+
+ /**
+ * Check type and bounds for parameters to self::buildSubstring()
+ *
+ * All supported databases have substring functions that behave the same for
+ * positive $startPosition and non-negative $length, but behaviors differ when
+ * given 0 or negative $startPosition or negative $length. The simplest
+ * solution to that is to just forbid those values.
+ *
+ * @param int $startPosition
+ * @param int|null $length
+ * @since 1.31
+ */
+ protected function assertBuildSubstringParams( $startPosition, $length ) {
+ if ( !is_int( $startPosition ) || $startPosition <= 0 ) {
+ throw new InvalidArgumentException(
+ '$startPosition must be a positive integer'
+ );
+ }
+ if ( !( is_int( $length ) && $length >= 0 || $length === null ) ) {
+ throw new InvalidArgumentException(
+ '$length must be null or an integer greater than or equal to 0'
+ );
+ }
+ }
+
public function buildStringCast( $field ) {
return $field;
}
+ public function buildIntegerCast( $field ) {
+ return 'CAST( ' . $field . ' AS INTEGER )';
+ }
+
+ public function buildSelectSubquery(
+ $table, $vars, $conds = '', $fname = __METHOD__,
+ $options = [], $join_conds = []
+ ) {
+ return new Subquery(
+ $this->selectSQLText( $table, $vars, $conds, $fname, $options, $join_conds )
+ );
+ }
+
public function databasesAreIndependent() {
return false;
}
}
public function tableName( $name, $format = 'quoted' ) {
+ if ( $name instanceof Subquery ) {
+ throw new DBUnexpectedError(
+ $this,
+ __METHOD__ . ': got Subquery instance when expecting a string.'
+ );
+ }
+
# Skip the entire process when we have a string quoted on both ends.
# Note that we check the end so that we will still quote any use of
# use of `database`.table. But won't break things if someone wants
# any remote case where a word like on may be inside of a table name
# surrounded by symbols which may be considered word breaks.
if ( preg_match( '/(^|\s)(DISTINCT|JOIN|ON|AS)(\s|$)/i', $name ) !== 0 ) {
+ $this->queryLogger->warning(
+ __METHOD__ . ": use of subqueries is not supported this way.",
+ [ 'trace' => ( new RuntimeException() )->getTraceAsString() ]
+ );
+
return $name;
}
/**
* Get an aliased table name
- * e.g. tableName AS newTableName
*
- * @param string $name Table name, see tableName()
- * @param string|bool $alias Alias (optional)
+ * This returns strings like "tableName AS newTableName" for aliased tables
+ * and "(SELECT * from tableA) newTablename" for subqueries (e.g. derived tables)
+ *
+ * @see Database::tableName()
+ * @param string|Subquery $table Table name or object with a 'sql' field
+ * @param string|bool $alias Table alias (optional)
* @return string SQL name for aliased table. Will not alias a table to its own name
*/
- protected function tableNameWithAlias( $name, $alias = false ) {
- if ( !$alias || $alias == $name ) {
- return $this->tableName( $name );
+ protected function tableNameWithAlias( $table, $alias = false ) {
+ if ( is_string( $table ) ) {
+ $quotedTable = $this->tableName( $table );
+ } elseif ( $table instanceof Subquery ) {
+ $quotedTable = (string)$table;
+ } else {
+ throw new InvalidArgumentException( "Table must be a string or Subquery." );
+ }
+
+ if ( !strlen( $alias ) || $alias === $table ) {
+ if ( $table instanceof Subquery ) {
+ throw new InvalidArgumentException( "Subquery table missing alias." );
+ }
+
+ return $quotedTable;
} else {
- return $this->tableName( $name ) . ' ' . $this->addIdentifierQuotes( $alias );
+ return $quotedTable . ' ' . $this->addIdentifierQuotes( $alias );
}
}
if ( is_array( $table ) ) {
// A parenthesized group
if ( count( $table ) > 1 ) {
- $joinedTable = '('
- . $this->tableNamesWithIndexClauseOrJOIN( $table, $use_index, $ignore_index, $join_conds )
- . ')';
+ $joinedTable = '(' .
+ $this->tableNamesWithIndexClauseOrJOIN(
+ $table, $use_index, $ignore_index, $join_conds ) . ')';
} else {
// Degenerate case
$innerTable = reset( $table );
}
// We can't separate explicit JOIN clauses with ',', use ' ' for those
- $implicitJoins = !empty( $ret ) ? implode( ',', $ret ) : "";
- $explicitJoins = !empty( $retJOIN ) ? implode( ' ', $retJOIN ) : "";
+ $implicitJoins = $ret ? implode( ',', $ret ) : "";
+ $explicitJoins = $retJOIN ? implode( ' ', $retJOIN ) : "";
// Compile our final table clause
return implode( ' ', [ $implicitJoins, $explicitJoins ] );
* @return string
*/
protected function indexName( $index ) {
- return $index;
+ return isset( $this->indexAliases[$index] )
+ ? $this->indexAliases[$index]
+ : $index;
}
public function addQuotes( $s ) {
}
}
- return ' LIKE ' . $this->addQuotes( $s ) . ' ESCAPE ' . $this->addQuotes( $escapeChar ) . ' ';
+ return ' LIKE ' .
+ $this->addQuotes( $s ) . ' ESCAPE ' . $this->addQuotes( $escapeChar ) . ' ';
}
public function anyChar() {
$rows = [ $rows ];
}
- $affectedRowCount = 0;
- foreach ( $rows as $row ) {
- // Delete rows which collide with this one
- $indexWhereClauses = [];
- foreach ( $uniqueIndexes as $index ) {
- $indexColumns = (array)$index;
- $indexRowValues = array_intersect_key( $row, array_flip( $indexColumns ) );
- if ( count( $indexRowValues ) != count( $indexColumns ) ) {
- throw new DBUnexpectedError(
- $this,
- 'New record does not provide all values for unique key (' .
+ try {
+ $this->startAtomic( $fname, self::ATOMIC_CANCELABLE );
+ $affectedRowCount = 0;
+ foreach ( $rows as $row ) {
+ // Delete rows which collide with this one
+ $indexWhereClauses = [];
+ foreach ( $uniqueIndexes as $index ) {
+ $indexColumns = (array)$index;
+ $indexRowValues = array_intersect_key( $row, array_flip( $indexColumns ) );
+ if ( count( $indexRowValues ) != count( $indexColumns ) ) {
+ throw new DBUnexpectedError(
+ $this,
+ 'New record does not provide all values for unique key (' .
implode( ', ', $indexColumns ) . ')'
- );
- } elseif ( in_array( null, $indexRowValues, true ) ) {
- throw new DBUnexpectedError(
- $this,
- 'New record has a null value for unique key (' .
+ );
+ } elseif ( in_array( null, $indexRowValues, true ) ) {
+ throw new DBUnexpectedError(
+ $this,
+ 'New record has a null value for unique key (' .
implode( ', ', $indexColumns ) . ')'
- );
+ );
+ }
+ $indexWhereClauses[] = $this->makeList( $indexRowValues, LIST_AND );
+ }
+
+ if ( $indexWhereClauses ) {
+ $this->delete( $table, $this->makeList( $indexWhereClauses, LIST_OR ), $fname );
+ $affectedRowCount += $this->affectedRows();
}
- $indexWhereClauses[] = $this->makeList( $indexRowValues, LIST_AND );
- }
- if ( $indexWhereClauses ) {
- $this->delete( $table, $this->makeList( $indexWhereClauses, LIST_OR ), $fname );
+ // Now insert the row
+ $this->insert( $table, $row, $fname );
$affectedRowCount += $this->affectedRows();
}
-
- // Now insert the row
- $this->insert( $table, $row, $fname );
- $affectedRowCount += $this->affectedRows();
+ $this->endAtomic( $fname );
+ $this->affectedRowCount = $affectedRowCount;
+ } catch ( Exception $e ) {
+ $this->cancelAtomic( $fname );
+ throw $e;
}
-
- $this->affectedRowCount = $affectedRowCount;
}
/**
}
$affectedRowCount = 0;
- $useTrx = !$this->trxLevel;
- if ( $useTrx ) {
- $this->begin( $fname, self::TRANSACTION_INTERNAL );
- }
try {
+ $this->startAtomic( $fname, self::ATOMIC_CANCELABLE );
# Update any existing conflicting row(s)
if ( $where !== false ) {
$ok = $this->update( $table, $set, $where, $fname );
# Now insert any non-conflicting row(s)
$ok = $this->insert( $table, $rows, $fname, [ 'IGNORE' ] ) && $ok;
$affectedRowCount += $this->affectedRows();
+ $this->endAtomic( $fname );
+ $this->affectedRowCount = $affectedRowCount;
} catch ( Exception $e ) {
- if ( $useTrx ) {
- $this->rollback( $fname, self::FLUSHING_INTERNAL );
- }
+ $this->cancelAtomic( $fname );
throw $e;
}
- if ( $useTrx ) {
- $this->commit( $fname, self::FLUSHING_INTERNAL );
- }
- $this->affectedRowCount = $affectedRowCount;
return $ok;
}
return $this->query( $sql, $fname );
}
- public function insertSelect(
+ final public function insertSelect(
$destTable, $srcTable, $varMap, $conds,
$fname = __METHOD__, $insertOptions = [], $selectOptions = [], $selectJoinConds = []
) {
- if ( $this->cliMode ) {
+ static $hints = [ 'NO_AUTO_COLUMNS' ];
+
+ $insertOptions = (array)$insertOptions;
+ $selectOptions = (array)$selectOptions;
+
+ if ( $this->cliMode && $this->isInsertSelectSafe( $insertOptions, $selectOptions ) ) {
// For massive migrations with downtime, we don't want to select everything
// into memory and OOM, so do all this native on the server side if possible.
return $this->nativeInsertSelect(
$varMap,
$conds,
$fname,
- $insertOptions,
+ array_diff( $insertOptions, $hints ),
$selectOptions,
$selectJoinConds
);
$varMap,
$conds,
$fname,
- $insertOptions,
+ array_diff( $insertOptions, $hints ),
$selectOptions,
$selectJoinConds
);
}
+ /**
+ * @param array $insertOptions INSERT options
+ * @param array $selectOptions SELECT options
+ * @return bool Whether an INSERT SELECT with these options will be replication safe
+ * @since 1.31
+ */
+ protected function isInsertSelectSafe( array $insertOptions, array $selectOptions ) {
+ return true;
+ }
+
/**
* Implementation of insertSelect() based on select() and insert()
*
return false;
}
- $rows = [];
- foreach ( $res as $row ) {
- $rows[] = (array)$row;
+ try {
+ $affectedRowCount = 0;
+ $this->startAtomic( $fname, self::ATOMIC_CANCELABLE );
+ $rows = [];
+ $ok = true;
+ foreach ( $res as $row ) {
+ $rows[] = (array)$row;
+
+ // Avoid inserts that are too huge
+ if ( count( $rows ) >= $this->nonNativeInsertSelectBatchSize ) {
+ $ok = $this->insert( $destTable, $rows, $fname, $insertOptions );
+ if ( !$ok ) {
+ break;
+ }
+ $affectedRowCount += $this->affectedRows();
+ $rows = [];
+ }
+ }
+ if ( $rows && $ok ) {
+ $ok = $this->insert( $destTable, $rows, $fname, $insertOptions );
+ if ( $ok ) {
+ $affectedRowCount += $this->affectedRows();
+ }
+ }
+ if ( $ok ) {
+ $this->endAtomic( $fname );
+ $this->affectedRowCount = $affectedRowCount;
+ } else {
+ $this->cancelAtomic( $fname );
+ }
+ return $ok;
+ } catch ( Exception $e ) {
+ $this->cancelAtomic( $fname );
+ throw $e;
}
-
- return $this->insert( $destTable, $rows, $fname, $insertOptions );
}
/**
return false;
}
- public function wasErrorReissuable() {
- return false;
+ public function wasConnectionLoss() {
+ return $this->wasConnectionError( $this->lastErrno() );
}
public function wasReadOnlyError() {
return false;
}
+ public function wasErrorReissuable() {
+ return (
+ $this->wasDeadlock() ||
+ $this->wasLockTimeout() ||
+ $this->wasConnectionLoss()
+ );
+ }
+
/**
* Do not use this method outside of Database/DBError classes
*
return false;
}
+ /**
+ * @return bool Whether it is safe to assume the given error only caused statement rollback
+ * @note This is for backwards compatibility for callers catching DBError exceptions in
+ * order to ignore problems like duplicate key errors or foriegn key violations
+ * @since 1.31
+ */
+ protected function wasKnownStatementRollbackError() {
+ return false; // don't know; it could have caused a transaction rollback
+ }
+
public function deadlockLoop() {
$args = func_get_args();
$function = array_shift( $args );
}
final public function onTransactionIdle( callable $callback, $fname = __METHOD__ ) {
+ if ( !$this->trxLevel && $this->getTransactionRoundId() ) {
+ // Start an implicit transaction similar to how query() does
+ $this->begin( __METHOD__, self::TRANSACTION_INTERNAL );
+ $this->trxAutomatic = true;
+ }
+
$this->trxIdleCallbacks[] = [ $callback, $fname ];
if ( !$this->trxLevel ) {
$this->runOnTransactionIdleCallbacks( self::TRIGGER_IDLE );
}
final public function onTransactionPreCommitOrIdle( callable $callback, $fname = __METHOD__ ) {
- if ( $this->trxLevel || $this->getFlag( self::DBO_TRX ) ) {
- // As long as DBO_TRX is set, writes will accumulate until the load balancer issues
- // an implicit commit of all peer databases. This is true even if a transaction has
- // not yet been triggered by writes; make sure $callback runs *after* any such writes.
+ if ( !$this->trxLevel && $this->getTransactionRoundId() ) {
+ // Start an implicit transaction similar to how query() does
+ $this->begin( __METHOD__, self::TRANSACTION_INTERNAL );
+ $this->trxAutomatic = true;
+ }
+
+ if ( $this->trxLevel ) {
$this->trxPreCommitCallbacks[] = [ $callback, $fname ];
} else {
// No transaction is active nor will start implicitly, so make one for this callback
- $this->startAtomic( __METHOD__ );
+ $this->startAtomic( __METHOD__, self::ATOMIC_CANCELABLE );
try {
call_user_func( $callback );
$this->endAtomic( __METHOD__ );
} catch ( Exception $e ) {
- $this->rollback( __METHOD__, self::FLUSHING_INTERNAL );
+ $this->cancelAtomic( __METHOD__ );
throw $e;
}
}
}
}
- final public function startAtomic( $fname = __METHOD__ ) {
+ /**
+ * Create a savepoint
+ *
+ * This is used internally to implement atomic sections. It should not be
+ * used otherwise.
+ *
+ * @since 1.31
+ * @param string $identifier Identifier for the savepoint
+ * @param string $fname Calling function name
+ */
+ protected function doSavepoint( $identifier, $fname ) {
+ $this->query( 'SAVEPOINT ' . $this->addIdentifierQuotes( $identifier ), $fname );
+ }
+
+ /**
+ * Release a savepoint
+ *
+ * This is used internally to implement atomic sections. It should not be
+ * used otherwise.
+ *
+ * @since 1.31
+ * @param string $identifier Identifier for the savepoint
+ * @param string $fname Calling function name
+ */
+ protected function doReleaseSavepoint( $identifier, $fname ) {
+ $this->query( 'RELEASE SAVEPOINT ' . $this->addIdentifierQuotes( $identifier ), $fname );
+ }
+
+ /**
+ * Rollback to a savepoint
+ *
+ * This is used internally to implement atomic sections. It should not be
+ * used otherwise.
+ *
+ * @since 1.31
+ * @param string $identifier Identifier for the savepoint
+ * @param string $fname Calling function name
+ */
+ protected function doRollbackToSavepoint( $identifier, $fname ) {
+ $this->query( 'ROLLBACK TO SAVEPOINT ' . $this->addIdentifierQuotes( $identifier ), $fname );
+ }
+
+ final public function startAtomic(
+ $fname = __METHOD__, $cancelable = self::ATOMIC_NOT_CANCELABLE
+ ) {
+ $savepointId = $cancelable === self::ATOMIC_CANCELABLE ? 'n/a' : null;
if ( !$this->trxLevel ) {
$this->begin( $fname, self::TRANSACTION_INTERNAL );
// If DBO_TRX is set, a series of startAtomic/endAtomic pairs will result
if ( !$this->getFlag( self::DBO_TRX ) ) {
$this->trxAutomaticAtomic = true;
}
+ } elseif ( $cancelable === self::ATOMIC_CANCELABLE ) {
+ $savepointId = 'wikimedia_rdbms_atomic' . ++$this->trxAtomicCounter;
+ if ( strlen( $savepointId ) > 30 ) { // 30 == Oracle's identifier length limit (pre 12c)
+ $this->queryLogger->warning(
+ 'There have been an excessively large number of atomic sections in a transaction'
+ . " started by $this->trxFname, reusing IDs (at $fname)",
+ [ 'trace' => ( new RuntimeException() )->getTraceAsString() ]
+ );
+ $this->trxAtomicCounter = 0;
+ $savepointId = 'wikimedia_rdbms_atomic' . ++$this->trxAtomicCounter;
+ }
+ $this->doSavepoint( $savepointId, $fname );
}
- $this->trxAtomicLevels[] = $fname;
+ $this->trxAtomicLevels[] = [ $fname, $savepointId ];
}
final public function endAtomic( $fname = __METHOD__ ) {
if ( !$this->trxLevel ) {
throw new DBUnexpectedError( $this, "No atomic transaction is open (got $fname)." );
}
- if ( !$this->trxAtomicLevels ||
- array_pop( $this->trxAtomicLevels ) !== $fname
- ) {
+
+ list( $savedFname, $savepointId ) = $this->trxAtomicLevels
+ ? array_pop( $this->trxAtomicLevels ) : [ null, null ];
+ if ( $savedFname !== $fname ) {
throw new DBUnexpectedError( $this, "Invalid atomic section ended (got $fname)." );
}
if ( !$this->trxAtomicLevels && $this->trxAutomaticAtomic ) {
$this->commit( $fname, self::FLUSHING_INTERNAL );
+ } elseif ( $savepointId && $savepointId !== 'n/a' ) {
+ $this->doReleaseSavepoint( $savepointId, $fname );
+ }
+ }
+
+ final public function cancelAtomic( $fname = __METHOD__ ) {
+ if ( !$this->trxLevel ) {
+ throw new DBUnexpectedError( $this, "No atomic transaction is open (got $fname)." );
+ }
+
+ list( $savedFname, $savepointId ) = $this->trxAtomicLevels
+ ? array_pop( $this->trxAtomicLevels ) : [ null, null ];
+ if ( $savedFname !== $fname ) {
+ throw new DBUnexpectedError( $this, "Invalid atomic section ended (got $fname)." );
+ }
+ if ( !$savepointId ) {
+ throw new DBUnexpectedError( $this, "Uncancelable atomic section canceled (got $fname)." );
+ }
+
+ if ( !$this->trxAtomicLevels && $this->trxAutomaticAtomic ) {
+ $this->rollback( $fname, self::FLUSHING_INTERNAL );
+ } elseif ( $savepointId !== 'n/a' ) {
+ $this->doRollbackToSavepoint( $savepointId, $fname );
+ $this->trxStatus = self::STATUS_TRX_OK; // no exception; recovered
}
+
+ $this->affectedRowCount = 0; // for the sake of consistency
}
final public function doAtomicSection( $fname, callable $callback ) {
- $this->startAtomic( $fname );
+ $this->startAtomic( $fname, self::ATOMIC_CANCELABLE );
try {
$res = call_user_func_array( $callback, [ $this, $fname ] );
} catch ( Exception $e ) {
- $this->rollback( $fname, self::FLUSHING_INTERNAL );
+ $this->cancelAtomic( $fname );
throw $e;
}
$this->endAtomic( $fname );
// Protect against mismatched atomic section, transaction nesting, and snapshot loss
if ( $this->trxLevel ) {
if ( $this->trxAtomicLevels ) {
- $levels = implode( ', ', $this->trxAtomicLevels );
+ $levels = $this->flatAtomicSectionList();
$msg = "$fname: Got explicit BEGIN while atomic section(s) $levels are open.";
throw new DBUnexpectedError( $this, $msg );
} elseif ( !$this->trxAutomatic ) {
$msg = "$fname: Explicit transaction already active (from {$this->trxFname}).";
throw new DBUnexpectedError( $this, $msg );
} else {
- // @TODO: make this an exception at some point
$msg = "$fname: Implicit transaction already active (from {$this->trxFname}).";
- $this->queryLogger->error( $msg );
- return; // join the main transaction set
+ throw new DBUnexpectedError( $this, $msg );
}
} elseif ( $this->getFlag( self::DBO_TRX ) && $mode !== self::TRANSACTION_INTERNAL ) {
- // @TODO: make this an exception at some point
$msg = "$fname: Implicit transaction expected (DBO_TRX set).";
- $this->queryLogger->error( $msg );
- return; // let any writes be in the main transaction
+ throw new DBUnexpectedError( $this, $msg );
}
// Avoid fatals if close() was called
$this->assertOpen();
$this->doBegin( $fname );
+ $this->trxStatus = self::STATUS_TRX_OK;
+ $this->trxAtomicCounter = 0;
$this->trxTimestamp = microtime( true );
$this->trxFname = $fname;
$this->trxDoneWrites = false;
$this->trxWriteAdjQueryCount = 0;
$this->trxWriteCallers = [];
// First SELECT after BEGIN will establish the snapshot in REPEATABLE-READ.
- // Get an estimate of the replica DB lag before then, treating estimate staleness
- // as lag itself just to be safe
- $status = $this->getApproximateLagStatus();
- $this->trxReplicaLag = $status['lag'] + ( microtime( true ) - $status['since'] );
+ // Get an estimate of the replication lag before any such queries.
+ $this->trxReplicaLag = null; // clear cached value first
+ $this->trxReplicaLag = $this->getApproximateLagStatus()['lag'];
// T147697: make explicitTrxActive() return true until begin() finishes. This way, no
// caller will think its OK to muck around with the transaction just because startAtomic()
// has not yet completed (e.g. setting trxAtomicLevels).
final public function commit( $fname = __METHOD__, $flush = '' ) {
if ( $this->trxLevel && $this->trxAtomicLevels ) {
// There are still atomic sections open. This cannot be ignored
- $levels = implode( ', ', $this->trxAtomicLevels );
+ $levels = $this->flatAtomicSectionList();
throw new DBUnexpectedError(
$this,
"$fname: Got COMMIT while atomic sections $levels are still open."
"$fname: No transaction to commit, something got out of sync." );
return; // nothing to do
} elseif ( $this->trxAutomatic ) {
- // @TODO: make this an exception at some point
- $msg = "$fname: Explicit commit of implicit transaction.";
- $this->queryLogger->error( $msg );
- return; // wait for the main transaction set commit round
+ throw new DBUnexpectedError(
+ $this,
+ "$fname: Expected mass commit of all peer transactions (DBO_TRX set)."
+ );
}
}
$this->runOnTransactionPreCommitCallbacks();
$writeTime = $this->pendingWriteQueryDuration( self::ESTIMATE_DB_APPLY );
$this->doCommit( $fname );
+ $this->trxStatus = self::STATUS_TRX_NONE;
if ( $this->trxDoneWrites ) {
$this->lastWriteTime = microtime( true );
$this->trxProfiler->transactionWritingOut(
}
final public function rollback( $fname = __METHOD__, $flush = '' ) {
- if ( $flush === self::FLUSHING_INTERNAL || $flush === self::FLUSHING_ALL_PEERS ) {
- if ( !$this->trxLevel ) {
- return; // nothing to do
- }
- } else {
- if ( !$this->trxLevel ) {
- $this->queryLogger->error(
- "$fname: No transaction to rollback, something got out of sync." );
- return; // nothing to do
- } elseif ( $this->getFlag( self::DBO_TRX ) ) {
+ $trxActive = $this->trxLevel;
+
+ if ( $flush !== self::FLUSHING_INTERNAL && $flush !== self::FLUSHING_ALL_PEERS ) {
+ if ( $this->getFlag( self::DBO_TRX ) ) {
throw new DBUnexpectedError(
$this,
- "$fname: Expected mass rollback of all peer databases (DBO_TRX set)."
+ "$fname: Expected mass rollback of all peer transactions (DBO_TRX set)."
);
}
}
- // Avoid fatals if close() was called
- $this->assertOpen();
+ if ( $trxActive ) {
+ // Avoid fatals if close() was called
+ $this->assertOpen();
- $this->doRollback( $fname );
- $this->trxAtomicLevels = [];
- if ( $this->trxDoneWrites ) {
- $this->trxProfiler->transactionWritingOut(
- $this->server,
- $this->dbName,
- $this->trxShortId
- );
+ $this->doRollback( $fname );
+ $this->trxStatus = self::STATUS_TRX_NONE;
+ $this->trxAtomicLevels = [];
+ if ( $this->trxDoneWrites ) {
+ $this->trxProfiler->transactionWritingOut(
+ $this->server,
+ $this->dbName,
+ $this->trxShortId
+ );
+ }
}
- $this->trxIdleCallbacks = []; // clear
- $this->trxPreCommitCallbacks = []; // clear
- try {
- $this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK );
- } catch ( Exception $e ) {
- // already logged; finish and let LoadBalancer move on during mass-rollback
- }
- try {
- $this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK );
- } catch ( Exception $e ) {
- // already logged; let LoadBalancer move on during mass-rollback
+ // Clear any commit-dependant callbacks. They might even be present
+ // only due to transaction rounds, with no SQL transaction being active
+ $this->trxIdleCallbacks = [];
+ $this->trxPreCommitCallbacks = [];
+
+ if ( $trxActive ) {
+ try {
+ $this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK );
+ } catch ( Exception $e ) {
+ // already logged; finish and let LoadBalancer move on during mass-rollback
+ }
+ try {
+ $this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK );
+ } catch ( Exception $e ) {
+ // already logged; let LoadBalancer move on during mass-rollback
+ }
+
+ $this->affectedRowCount = 0; // for the sake of consistency
}
}
}
/**
- * Close existing database connection and open a new connection
+ * Close any existing (dead) database connection and open a new connection
*
+ * @param string $fname
* @return bool True if new connection is opened successfully, false if error
*/
- protected function reconnect() {
+ protected function replaceLostConnection( $fname ) {
$this->closeConnection();
$this->opened = false;
$this->conn = false;
$this->open( $this->server, $this->user, $this->password, $this->dbName );
$this->lastPing = microtime( true );
$ok = true;
+
+ $this->connLogger->warning(
+ $fname . ': lost connection to {dbserver}; reconnected',
+ [
+ 'dbserver' => $this->getServer(),
+ 'trace' => ( new RuntimeException() )->getTraceAsString()
+ ]
+ );
} catch ( DBConnectionError $e ) {
$ok = false;
+
+ $this->connLogger->error(
+ $fname . ': lost connection to {dbserver} permanently',
+ [ 'dbserver' => $this->getServer() ]
+ );
}
+ $this->handleSessionLoss();
+
return $ok;
}
public function getSessionLagStatus() {
- return $this->getTransactionLagStatus() ?: $this->getApproximateLagStatus();
+ return $this->getRecordedTransactionLagStatus() ?: $this->getApproximateLagStatus();
}
/**
* is this lag plus transaction duration. If they don't, it is still
* safe to be pessimistic. This returns null if there is no transaction.
*
+ * This returns null if the lag status for this transaction was not yet recorded.
+ *
* @return array|null ('lag': seconds or false on error, 'since': UNIX timestamp of BEGIN)
* @since 1.27
*/
- protected function getTransactionLagStatus() {
- return $this->trxLevel
+ final protected function getRecordedTransactionLagStatus() {
+ return ( $this->trxLevel && $this->trxReplicaLag !== null )
? [ 'lag' => $this->trxReplicaLag, 'since' => $this->trxTimestamp() ]
: null;
}
}
public function lockIsFree( $lockName, $method ) {
- return true;
+ // RDBMs methods for checking named locks may or may not count this thread itself.
+ // In MySQL, IS_FREE_LOCK() returns 0 if the thread already has the lock. This is
+ // the behavior choosen by the interface for this method.
+ return !isset( $this->namedLocksHeld[$lockName] );
}
public function lock( $lockName, $method, $timeout = 5 ) {
$this->tableAliases = $aliases;
}
- /**
- * @return bool Whether a DB user is required to access the DB
- * @since 1.28
- */
- protected function requiresDatabaseUser() {
- return true;
+ public function setIndexAliases( array $aliases ) {
+ $this->indexAliases = $aliases;
}
/**
* This catches broken callers than catch and ignore disconnection exceptions.
* Unlike checking isOpen(), this is safe to call inside of open().
*
- * @return resource|object
+ * @return mixed
* @throws DBUnexpectedError
* @since 1.26
*/