* @ingroup Cache
*/
class SqlBagOStuff extends BagOStuff {
- /** @var array */
+ /** @var array[] (server index => server config) */
protected $serverInfos;
- /** @var array */
- protected $serverNames;
+ /** @var string[] (server index => tag/host name) */
+ protected $serverTags;
/** @var int */
protected $numServers;
/** @var int */
/** @var array */
protected $conns;
/** @var array UNIX timestamps */
- protected $connFailureTimes = array();
+ protected $connFailureTimes = [];
/** @var array Exceptions */
- protected $connFailureErrors = array();
+ protected $connFailureErrors = [];
/**
* Constructor. Parameters are:
* - server: A server info structure in the format required by each
* element in $wgDBServers.
*
- * - servers: An array of server info structures describing a set of
- * database servers to distribute keys to. If this is
- * specified, the "server" option will be ignored.
+ * - servers: An array of server info structures describing a set of database servers
+ * to distribute keys to. If this is specified, the "server" option will be
+ * ignored. If string keys are used, then they will be used for consistent
+ * hashing *instead* of the host name (from the server config). This is useful
+ * when a cluster is replicated to another site (with different host names)
+ * but each server has a corresponding replica in the other cluster.
*
* - purgePeriod: The average number of object cache requests in between
* garbage collection operations, where expired entries
*/
public function __construct( $params ) {
parent::__construct( $params );
+
+ $this->attrMap[self::ATTR_EMULATION] = self::QOS_EMULATION_SQL;
+
if ( isset( $params['servers'] ) ) {
- $this->serverInfos = $params['servers'];
- $this->numServers = count( $this->serverInfos );
- $this->serverNames = array();
- foreach ( $this->serverInfos as $i => $info ) {
- $this->serverNames[$i] = isset( $info['host'] ) ? $info['host'] : "#$i";
+ $this->serverInfos = [];
+ $this->serverTags = [];
+ $this->numServers = count( $params['servers'] );
+ $index = 0;
+ foreach ( $params['servers'] as $tag => $info ) {
+ $this->serverInfos[$index] = $info;
+ if ( is_string( $tag ) ) {
+ $this->serverTags[$index] = $tag;
+ } else {
+ $this->serverTags[$index] = isset( $info['host'] ) ? $info['host'] : "#$index";
+ }
+ ++$index;
}
} elseif ( isset( $params['server'] ) ) {
- $this->serverInfos = array( $params['server'] );
+ $this->serverInfos = [ $params['server'] ];
$this->numServers = count( $this->serverInfos );
} else {
$this->serverInfos = false;
$tableIndex = 0;
}
if ( $this->numServers > 1 ) {
- $sortedServers = $this->serverNames;
+ $sortedServers = $this->serverTags;
ArrayUtils::consistentHashSort( $sortedServers, $key );
reset( $sortedServers );
$serverIndex = key( $sortedServers );
} else {
$serverIndex = 0;
}
- return array( $serverIndex, $this->getTableNameByShard( $tableIndex ) );
+ return [ $serverIndex, $this->getTableNameByShard( $tableIndex ) ];
}
/**
}
protected function getWithToken( $key, &$casToken, $flags = 0 ) {
- $values = $this->getMulti( array( $key ) );
+ $values = $this->getMulti( [ $key ] );
if ( array_key_exists( $key, $values ) ) {
$casToken = $values[$key];
return $values[$key];
}
public function getMulti( array $keys, $flags = 0 ) {
- $values = array(); // array of (key => value)
+ $values = []; // array of (key => value)
- $keysByTable = array();
+ $keysByTable = [];
foreach ( $keys as $key ) {
list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
$keysByTable[$serverIndex][$tableName][] = $key;
$this->garbageCollect(); // expire old entries if any
- $dataRows = array();
+ $dataRows = [];
foreach ( $keysByTable as $serverIndex => $serverKeys ) {
try {
$db = $this->getDB( $serverIndex );
foreach ( $serverKeys as $tableName => $tableKeys ) {
$res = $db->select( $tableName,
- array( 'keyname', 'value', 'exptime' ),
- array( 'keyname' => $tableKeys ),
+ [ 'keyname', 'value', 'exptime' ],
+ [ 'keyname' => $tableKeys ],
__METHOD__,
// Approximate write-on-the-fly BagOStuff API via blocking.
// This approximation fails if a ROLLBACK happens (which is rare).
// We do not want to flush the TRX as that can break callers.
- $db->trxLevel() ? array( 'LOCK IN SHARE MODE' ) : array()
+ $db->trxLevel() ? [ 'LOCK IN SHARE MODE' ] : []
);
if ( $res === false ) {
continue;
}
public function setMulti( array $data, $expiry = 0 ) {
- $keysByTable = array();
+ $keysByTable = [];
foreach ( $data as $key => $value ) {
list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
$keysByTable[$serverIndex][$tableName][] = $key;
$encExpiry = $db->timestamp( $exptime );
}
foreach ( $serverKeys as $tableName => $tableKeys ) {
- $rows = array();
+ $rows = [];
foreach ( $tableKeys as $key ) {
- $rows[] = array(
+ $rows[] = [
'keyname' => $key,
'value' => $db->encodeBlob( $this->serialize( $data[$key] ) ),
'exptime' => $encExpiry,
- );
+ ];
}
try {
$db->replace(
$tableName,
- array( 'keyname' ),
+ [ 'keyname' ],
$rows,
__METHOD__
);
}
public function set( $key, $value, $exptime = 0, $flags = 0 ) {
- $ok = $this->setMulti( array( $key => $value ), $exptime );
+ $ok = $this->setMulti( [ $key => $value ], $exptime );
if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) {
$ok = $ok && $this->waitForSlaves();
}
// delete/insert to avoid clashes with conflicting keynames
$db->update(
$tableName,
- array(
+ [
'keyname' => $key,
'value' => $db->encodeBlob( $this->serialize( $value ) ),
'exptime' => $encExpiry
- ),
- array(
+ ],
+ [
'keyname' => $key,
'value' => $db->encodeBlob( $this->serialize( $casToken ) )
- ),
+ ],
__METHOD__
);
} catch ( DBQueryError $e ) {
$db = $this->getDB( $serverIndex );
$db->delete(
$tableName,
- array( 'keyname' => $key ),
+ [ 'keyname' => $key ],
__METHOD__ );
} catch ( DBError $e ) {
$this->handleWriteError( $e, $serverIndex );
$step = intval( $step );
$row = $db->selectRow(
$tableName,
- array( 'value', 'exptime' ),
- array( 'keyname' => $key ),
+ [ 'value', 'exptime' ],
+ [ 'keyname' => $key ],
__METHOD__,
- array( 'FOR UPDATE' ) );
+ [ 'FOR UPDATE' ] );
if ( $row === false ) {
// Missing
return null;
}
- $db->delete( $tableName, array( 'keyname' => $key ), __METHOD__ );
+ $db->delete( $tableName, [ 'keyname' => $key ], __METHOD__ );
if ( $this->isExpired( $db, $row->exptime ) ) {
// Expired, do not reinsert
$oldValue = intval( $this->unserialize( $db->decodeBlob( $row->value ) ) );
$newValue = $oldValue + $step;
$db->insert( $tableName,
- array(
+ [
'keyname' => $key,
'value' => $db->encodeBlob( $this->serialize( $newValue ) ),
'exptime' => $row->exptime
- ), __METHOD__, 'IGNORE' );
+ ], __METHOD__, 'IGNORE' );
if ( $db->affectedRows() == 0 ) {
// Race condition. See bug 28611
return $newValue;
}
- public function merge( $key, $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
- if ( !is_callable( $callback ) ) {
- throw new Exception( "Got invalid callback." );
- }
-
+ public function merge( $key, callable $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
$ok = $this->mergeViaCas( $key, $callback, $exptime, $attempts );
if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) {
$ok = $ok && $this->waitForSlaves();
$db = $this->getDB( $serverIndex );
$dbTimestamp = $db->timestamp( $timestamp );
$totalSeconds = false;
- $baseConds = array( 'exptime < ' . $db->addQuotes( $dbTimestamp ) );
+ $baseConds = [ 'exptime < ' . $db->addQuotes( $dbTimestamp ) ];
for ( $i = 0; $i < $this->shards; $i++ ) {
$maxExpTime = false;
while ( true ) {
}
$rows = $db->select(
$this->getTableNameByShard( $i ),
- array( 'keyname', 'exptime' ),
+ [ 'keyname', 'exptime' ],
$conds,
__METHOD__,
- array( 'LIMIT' => 100, 'ORDER BY' => 'exptime' ) );
+ [ 'LIMIT' => 100, 'ORDER BY' => 'exptime' ] );
if ( $rows === false || !$rows->numRows() ) {
break;
}
- $keys = array();
+ $keys = [];
$row = $rows->current();
$minExpTime = $row->exptime;
if ( $totalSeconds === false ) {
$db->delete(
$this->getTableNameByShard( $i ),
- array(
+ [
'exptime >= ' . $db->addQuotes( $minExpTime ),
'exptime < ' . $db->addQuotes( $dbTimestamp ),
'keyname' => $keys
- ),
+ ],
__METHOD__ );
if ( $progressCallback ) {
if ( !$this->serverInfos ) {
// Main LB is used; wait for any slaves to catch up
try {
- wfGetLBFactory()->waitForReplication( array( 'wiki' => wfWikiID() ) );
+ wfGetLBFactory()->waitForReplication( [ 'wiki' => wfWikiID() ] );
return true;
} catch ( DBReplicationWaitError $e ) {
return false;