use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerInterface;
+use Psr\Log\NullLogger;
use Wikimedia\ScopedCallback;
use Wikimedia\Timestamp\ConvertibleTimestamp;
use Wikimedia;
const SLOW_WRITE_SEC = 0.500;
const SMALL_WRITE_ROWS = 100;
+ /** @var string Whether lock granularity is on the level of the entire database */
+ const ATTR_DB_LEVEL_LOCKING = 'db-level-locking';
+
/** @var string SQL query */
protected $lastQuery = '';
/** @var float|bool UNIX timestamp of last write query */
protected $password;
/** @var string */
protected $dbName;
- /** @var array[] $aliases Map of (table => (dbname, schema, prefix) map) */
+ /** @var array[] Map of (table => (dbname, schema, prefix) map) */
protected $tableAliases = [];
+ /** @var string[] Map of (index alias => index) */
+ protected $indexAliases = [];
/** @var bool Whether this PHP instance is for a CLI script */
protected $cliMode;
/** @var string Agent name for query profiling */
/** @var TransactionProfiler */
protected $trxProfiler;
+ /** @var int */
+ protected $nonNativeInsertSelectBatchSize = 10000;
+
/**
* Constructor and database handle and attempt to connect to the DB server
*
$this->queryLogger = $params['queryLogger'];
$this->errorLogger = $params['errorLogger'];
+ if ( isset( $params['nonNativeInsertSelectBatchSize'] ) ) {
+ $this->nonNativeInsertSelectBatchSize = $params['nonNativeInsertSelectBatchSize'];
+ }
+
// Set initial dummy domain until open() sets the final DB/prefix
$this->currentDomain = DatabaseDomain::newUnspecified();
*
* This also connects to the database immediately upon object construction
*
- * @param string $dbType A possible DB type (sqlite, mysql, postgres)
+ * @param string $dbType A possible DB type (sqlite, mysql, postgres,...)
* @param array $p Parameter map with keys:
* - host : The hostname of the DB server
* - user : The name of the database user the client operates under
* - cliMode: Whether to consider the execution context that of a CLI script.
* - agent: Optional name used to identify the end-user in query profiling/logging.
* - srvCache: Optional BagOStuff instance to an APC-style cache.
+ * - nonNativeInsertSelectBatchSize: Optional batch size for non-native INSERT SELECT emulation.
* @return Database|null If the database driver or extension cannot be found
* @throws InvalidArgumentException If the database driver or extension cannot be found
* @since 1.18
*/
final public static function factory( $dbType, $p = [] ) {
+ $class = self::getClass( $dbType, isset( $p['driver'] ) ? $p['driver'] : null );
+
+ if ( class_exists( $class ) && is_subclass_of( $class, IDatabase::class ) ) {
+ // Resolve some defaults for b/c
+ $p['host'] = isset( $p['host'] ) ? $p['host'] : false;
+ $p['user'] = isset( $p['user'] ) ? $p['user'] : false;
+ $p['password'] = isset( $p['password'] ) ? $p['password'] : false;
+ $p['dbname'] = isset( $p['dbname'] ) ? $p['dbname'] : false;
+ $p['flags'] = isset( $p['flags'] ) ? $p['flags'] : 0;
+ $p['variables'] = isset( $p['variables'] ) ? $p['variables'] : [];
+ $p['tablePrefix'] = isset( $p['tablePrefix'] ) ? $p['tablePrefix'] : '';
+ $p['schema'] = isset( $p['schema'] ) ? $p['schema'] : '';
+ $p['cliMode'] = isset( $p['cliMode'] )
+ ? $p['cliMode']
+ : ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
+ $p['agent'] = isset( $p['agent'] ) ? $p['agent'] : '';
+ if ( !isset( $p['connLogger'] ) ) {
+ $p['connLogger'] = new NullLogger();
+ }
+ if ( !isset( $p['queryLogger'] ) ) {
+ $p['queryLogger'] = new NullLogger();
+ }
+ $p['profiler'] = isset( $p['profiler'] ) ? $p['profiler'] : null;
+ if ( !isset( $p['trxProfiler'] ) ) {
+ $p['trxProfiler'] = new TransactionProfiler();
+ }
+ if ( !isset( $p['errorLogger'] ) ) {
+ $p['errorLogger'] = function ( Exception $e ) {
+ trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
+ };
+ }
+
+ $conn = new $class( $p );
+ } else {
+ $conn = null;
+ }
+
+ return $conn;
+ }
+
+ /**
+ * @param string $dbType A possible DB type (sqlite, mysql, postgres,...)
+ * @param string|null $driver Optional name of a specific DB client driver
+ * @return array Map of (Database::ATTRIBUTE_* constant => value) for all such constants
+ * @throws InvalidArgumentException
+ * @since 1.31
+ */
+ final public static function attributesFromType( $dbType, $driver = null ) {
+ static $defaults = [ self::ATTR_DB_LEVEL_LOCKING => false ];
+
+ $class = self::getClass( $dbType, $driver );
+
+ return call_user_func( [ $class, 'getAttributes' ] ) + $defaults;
+ }
+
+ /**
+ * @param string $dbType A possible DB type (sqlite, mysql, postgres,...)
+ * @param string|null $driver Optional name of a specific DB client driver
+ * @return string Database subclass name to use
+ * @throws InvalidArgumentException
+ */
+ private static function getClass( $dbType, $driver = null ) {
// For database types with built-in support, the below maps type to IDatabase
// implementations. For types with multipe driver implementations (PHP extensions),
// an array can be used, keyed by extension name. In case of an array, the
$dbType = strtolower( $dbType );
$class = false;
+
if ( isset( $builtinTypes[$dbType] ) ) {
$possibleDrivers = $builtinTypes[$dbType];
if ( is_string( $possibleDrivers ) ) {
$class = $possibleDrivers;
} else {
- if ( !empty( $p['driver'] ) ) {
- if ( !isset( $possibleDrivers[$p['driver']] ) ) {
+ if ( (string)$driver !== '' ) {
+ if ( !isset( $possibleDrivers[$driver] ) ) {
throw new InvalidArgumentException( __METHOD__ .
- " type '$dbType' does not support driver '{$p['driver']}'" );
+ " type '$dbType' does not support driver '{$driver}'" );
} else {
- $class = $possibleDrivers[$p['driver']];
+ $class = $possibleDrivers[$driver];
}
} else {
foreach ( $possibleDrivers as $posDriver => $possibleClass ) {
" no viable database extension found for type '$dbType'" );
}
- if ( class_exists( $class ) && is_subclass_of( $class, IDatabase::class ) ) {
- // Resolve some defaults for b/c
- $p['host'] = isset( $p['host'] ) ? $p['host'] : false;
- $p['user'] = isset( $p['user'] ) ? $p['user'] : false;
- $p['password'] = isset( $p['password'] ) ? $p['password'] : false;
- $p['dbname'] = isset( $p['dbname'] ) ? $p['dbname'] : false;
- $p['flags'] = isset( $p['flags'] ) ? $p['flags'] : 0;
- $p['variables'] = isset( $p['variables'] ) ? $p['variables'] : [];
- $p['tablePrefix'] = isset( $p['tablePrefix'] ) ? $p['tablePrefix'] : '';
- $p['schema'] = isset( $p['schema'] ) ? $p['schema'] : '';
- $p['cliMode'] = isset( $p['cliMode'] )
- ? $p['cliMode']
- : ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
- $p['agent'] = isset( $p['agent'] ) ? $p['agent'] : '';
- if ( !isset( $p['connLogger'] ) ) {
- $p['connLogger'] = new \Psr\Log\NullLogger();
- }
- if ( !isset( $p['queryLogger'] ) ) {
- $p['queryLogger'] = new \Psr\Log\NullLogger();
- }
- $p['profiler'] = isset( $p['profiler'] ) ? $p['profiler'] : null;
- if ( !isset( $p['trxProfiler'] ) ) {
- $p['trxProfiler'] = new TransactionProfiler();
- }
- if ( !isset( $p['errorLogger'] ) ) {
- $p['errorLogger'] = function ( Exception $e ) {
- trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
- };
- }
-
- $conn = new $class( $p );
- } else {
- $conn = null;
- }
+ return $class;
+ }
- return $conn;
+ /**
+ * @return array Map of (Database::ATTRIBUTE_* constant => value
+ * @since 1.31
+ */
+ protected static function getAttributes() {
+ return [];
}
/**
$this->queryLogger->warning( $msg, $params +
[ 'trace' => ( new RuntimeException() )->getTraceAsString() ] );
- if ( !$recoverable ) {
- # Callers may catch the exception and continue to use the DB
- $this->reportQueryError( $lastError, $lastErrno, $sql, $fname );
- } else {
+ if ( $recoverable ) {
# Should be safe to silently retry the query
$ret = $this->doProfiledQuery( $sql, $commentedSql, $isNonTempWrite, $fname );
+ } else {
+ # Callers may catch the exception and continue to use the DB
+ $this->reportQueryError( $lastError, $lastErrno, $sql, $fname );
}
} else {
$msg = __METHOD__ . ': lost connection to {dbserver} permanently';
*/
private function handleSessionLoss() {
$this->trxLevel = 0;
- $this->trxIdleCallbacks = []; // T67263
- $this->trxPreCommitCallbacks = []; // T67263
+ $this->trxIdleCallbacks = []; // T67263; transaction already lost
+ $this->trxPreCommitCallbacks = []; // T67263; transaction already lost
$this->sessionTempTables = [];
$this->namedLocksHeld = [];
+
+ // Note: if callback suppression is set then some *Callbacks arrays are not cleared here
+ $e = null;
try {
// Handle callbacks in trxEndCallbacks
$this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK );
+ } catch ( Exception $ex ) {
+ // Already logged; move on...
+ $e = $e ?: $ex;
+ }
+ try {
+ // Handle callbacks in trxRecurringCallbacks
$this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK );
- return null;
- } catch ( Exception $e ) {
+ } catch ( Exception $ex ) {
// Already logged; move on...
- return $e;
+ $e = $e ?: $ex;
}
+
+ return $e;
}
/**
list( $startOpts, $useIndex, $preLimitTail, $postLimitTail, $ignoreIndex ) =
$this->makeSelectOptions( $options );
- if ( !empty( $conds ) ) {
- if ( is_array( $conds ) ) {
- $conds = $this->makeList( $conds, self::LIST_AND );
- }
+ if ( is_array( $conds ) ) {
+ $conds = $this->makeList( $conds, self::LIST_AND );
+ }
+
+ if ( $conds === null || $conds === false ) {
+ $this->queryLogger->warning(
+ __METHOD__
+ . ' called from '
+ . $fname
+ . ' with incorrect parameters: $conds must be a string or an array'
+ );
+ $conds = '';
+ }
+
+ if ( $conds === '' ) {
+ $sql = "SELECT $startOpts $vars $from $useIndex $ignoreIndex $preLimitTail";
+ } elseif ( is_string( $conds ) ) {
$sql = "SELECT $startOpts $vars $from $useIndex $ignoreIndex " .
"WHERE $conds $preLimitTail";
} else {
- $sql = "SELECT $startOpts $vars $from $useIndex $ignoreIndex $preLimitTail";
+ throw new DBUnexpectedError( $this, __METHOD__ . ' called with incorrect parameters' );
}
if ( isset( $options['LIMIT'] ) ) {
return '(' . $this->selectSQLText( $table, $fld, $conds, null, [], $join_conds ) . ')';
}
+ public function buildSubstring( $input, $startPosition, $length = null ) {
+ $this->assertBuildSubstringParams( $startPosition, $length );
+ $functionBody = "$input FROM $startPosition";
+ if ( $length !== null ) {
+ $functionBody .= " FOR $length";
+ }
+ return 'SUBSTRING(' . $functionBody . ')';
+ }
+
+ /**
+ * Check type and bounds for parameters to self::buildSubstring()
+ *
+ * All supported databases have substring functions that behave the same for
+ * positive $startPosition and non-negative $length, but behaviors differ when
+ * given 0 or negative $startPosition or negative $length. The simplest
+ * solution to that is to just forbid those values.
+ *
+ * @param int $startPosition
+ * @param int|null $length
+ * @since 1.31
+ */
+ protected function assertBuildSubstringParams( $startPosition, $length ) {
+ if ( !is_int( $startPosition ) || $startPosition <= 0 ) {
+ throw new InvalidArgumentException(
+ '$startPosition must be a positive integer'
+ );
+ }
+ if ( !( is_int( $length ) && $length >= 0 || $length === null ) ) {
+ throw new InvalidArgumentException(
+ '$length must be null or an integer greater than or equal to 0'
+ );
+ }
+ }
+
public function buildStringCast( $field ) {
return $field;
}
+ public function buildIntegerCast( $field ) {
+ return 'CAST( ' . $field . ' AS INTEGER )';
+ }
+
public function databasesAreIndependent() {
return false;
}
}
// We can't separate explicit JOIN clauses with ',', use ' ' for those
- $implicitJoins = !empty( $ret ) ? implode( ',', $ret ) : "";
- $explicitJoins = !empty( $retJOIN ) ? implode( ' ', $retJOIN ) : "";
+ $implicitJoins = $ret ? implode( ',', $ret ) : "";
+ $explicitJoins = $retJOIN ? implode( ' ', $retJOIN ) : "";
// Compile our final table clause
return implode( ' ', [ $implicitJoins, $explicitJoins ] );
* @return string
*/
protected function indexName( $index ) {
- return $index;
+ return isset( $this->indexAliases[$index] )
+ ? $this->indexAliases[$index]
+ : $index;
}
public function addQuotes( $s ) {
$rows = [ $rows ];
}
- $useTrx = !$this->trxLevel;
- if ( $useTrx ) {
- $this->begin( $fname, self::TRANSACTION_INTERNAL );
- }
try {
+ $this->startAtomic( $fname );
$affectedRowCount = 0;
foreach ( $rows as $row ) {
// Delete rows which collide with this one
$this->insert( $table, $row, $fname );
$affectedRowCount += $this->affectedRows();
}
+ $this->endAtomic( $fname );
+ $this->affectedRowCount = $affectedRowCount;
} catch ( Exception $e ) {
- if ( $useTrx ) {
- $this->rollback( $fname, self::FLUSHING_INTERNAL );
- }
+ $this->rollback( $fname, self::FLUSHING_INTERNAL );
throw $e;
}
- if ( $useTrx ) {
- $this->commit( $fname, self::FLUSHING_INTERNAL );
- }
-
- $this->affectedRowCount = $affectedRowCount;
}
/**
}
$affectedRowCount = 0;
- $useTrx = !$this->trxLevel;
- if ( $useTrx ) {
- $this->begin( $fname, self::TRANSACTION_INTERNAL );
- }
try {
+ $this->startAtomic( $fname );
# Update any existing conflicting row(s)
if ( $where !== false ) {
$ok = $this->update( $table, $set, $where, $fname );
# Now insert any non-conflicting row(s)
$ok = $this->insert( $table, $rows, $fname, [ 'IGNORE' ] ) && $ok;
$affectedRowCount += $this->affectedRows();
+ $this->endAtomic( $fname );
+ $this->affectedRowCount = $affectedRowCount;
} catch ( Exception $e ) {
- if ( $useTrx ) {
- $this->rollback( $fname, self::FLUSHING_INTERNAL );
- }
+ $this->rollback( $fname, self::FLUSHING_INTERNAL );
throw $e;
}
- if ( $useTrx ) {
- $this->commit( $fname, self::FLUSHING_INTERNAL );
- }
- $this->affectedRowCount = $affectedRowCount;
return $ok;
}
return $this->query( $sql, $fname );
}
- public function insertSelect(
+ final public function insertSelect(
$destTable, $srcTable, $varMap, $conds,
$fname = __METHOD__, $insertOptions = [], $selectOptions = [], $selectJoinConds = []
) {
- if ( $this->cliMode ) {
+ static $hints = [ 'NO_AUTO_COLUMNS' ];
+
+ $insertOptions = (array)$insertOptions;
+ $selectOptions = (array)$selectOptions;
+
+ if ( $this->cliMode && $this->isInsertSelectSafe( $insertOptions, $selectOptions ) ) {
// For massive migrations with downtime, we don't want to select everything
// into memory and OOM, so do all this native on the server side if possible.
return $this->nativeInsertSelect(
$varMap,
$conds,
$fname,
- $insertOptions,
+ array_diff( $insertOptions, $hints ),
$selectOptions,
$selectJoinConds
);
$varMap,
$conds,
$fname,
- $insertOptions,
+ array_diff( $insertOptions, $hints ),
$selectOptions,
$selectJoinConds
);
}
+ /**
+ * @param array $insertOptions INSERT options
+ * @param array $selectOptions SELECT options
+ * @return bool Whether an INSERT SELECT with these options will be replication safe
+ * @since 1.31
+ */
+ protected function isInsertSelectSafe( array $insertOptions, array $selectOptions ) {
+ return true;
+ }
+
/**
* Implementation of insertSelect() based on select() and insert()
*
return false;
}
- $rows = [];
- foreach ( $res as $row ) {
- $rows[] = (array)$row;
+ try {
+ $affectedRowCount = 0;
+ $this->startAtomic( $fname );
+ $rows = [];
+ $ok = true;
+ foreach ( $res as $row ) {
+ $rows[] = (array)$row;
+
+ // Avoid inserts that are too huge
+ if ( count( $rows ) >= $this->nonNativeInsertSelectBatchSize ) {
+ $ok = $this->insert( $destTable, $rows, $fname, $insertOptions );
+ if ( !$ok ) {
+ break;
+ }
+ $affectedRowCount += $this->affectedRows();
+ $rows = [];
+ }
+ }
+ if ( $rows && $ok ) {
+ $ok = $this->insert( $destTable, $rows, $fname, $insertOptions );
+ if ( $ok ) {
+ $affectedRowCount += $this->affectedRows();
+ }
+ }
+ if ( $ok ) {
+ $this->endAtomic( $fname );
+ $this->affectedRowCount = $affectedRowCount;
+ } else {
+ $this->rollback( $fname, self::FLUSHING_INTERNAL );
+ }
+ return $ok;
+ } catch ( Exception $e ) {
+ $this->rollback( $fname, self::FLUSHING_INTERNAL );
+ throw $e;
}
-
- return $this->insert( $destTable, $rows, $fname, $insertOptions );
}
/**
} catch ( Exception $e ) {
// already logged; let LoadBalancer move on during mass-rollback
}
+
+ $this->affectedRowCount = 0; // for the sake of consistency
}
/**
$this->tableAliases = $aliases;
}
+ public function setIndexAliases( array $aliases ) {
+ $this->indexAliases = $aliases;
+ }
+
/**
* @return bool Whether a DB user is required to access the DB
* @since 1.28