* @ingroup Cache
*/
+use \MediaWiki\MediaWikiServices;
+
/**
* Class to store objects in the database
*
/** @var int */
protected $syncTimeout = 3;
+ /** @var LoadBalancer|null */
+ protected $separateMainLB;
/** @var array */
protected $conns;
/** @var array UNIX timestamps */
$this->serverInfos = [ $params['server'] ];
$this->numServers = count( $this->serverInfos );
} else {
+ // Default to using the main wiki's database servers
$this->serverInfos = false;
$this->numServers = 1;
}
$this->slaveOnly = !empty( $params['slaveOnly'] );
}
+ protected function getSeparateMainLB() {
+ global $wgDBtype;
+
+ if ( $wgDBtype === 'mysql' && $this->usesMainDB() ) {
+ if ( !$this->separateMainLB ) {
+ // We must keep a separate connection to MySQL in order to avoid deadlocks
+ $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+ $this->separateMainLB = $lbFactory->newMainLB();
+ }
+ return $this->separateMainLB;
+ } else {
+ // However, SQLite has an opposite behavior. And PostgreSQL needs to know
+ // if we are in transaction or not (@TODO: find some PostgreSQL work-around).
+ return null;
+ }
+ }
+
/**
* Get a connection to the specified database
*
$db = DatabaseBase::factory( $type, $info );
$db->clearFlag( DBO_TRX );
} else {
- // We must keep a separate connection to MySQL in order to avoid deadlocks
- // However, SQLite has an opposite behavior. And PostgreSQL needs to know
- // if we are in transaction or not (@TODO: find some work-around).
$index = $this->slaveOnly ? DB_SLAVE : DB_MASTER;
- if ( wfGetDB( $index )->getType() == 'mysql' ) {
- $lb = wfGetLBFactory()->newMainLB();
- $db = $lb->getConnection( $index );
+ if ( $this->getSeparateMainLB() ) {
+ $db = $this->getSeparateMainLB()->getConnection( $index );
$db->clearFlag( DBO_TRX ); // auto-commit mode
} else {
$db = wfGetDB( $index );
+ // Can't mess with transaction rounds (DBO_TRX) :(
}
}
$this->logger->debug( sprintf( "Connection %s will be used for SqlBagOStuff", $db ) );
if ( isset( $dataRows[$key] ) ) { // HIT?
$row = $dataRows[$key];
$this->debug( "get: retrieved data; expiry time is " . $row->exptime );
+ $db = null;
try {
$db = $this->getDB( $row->serverIndex );
if ( $this->isExpired( $db, $row->exptime ) ) { // MISS
$values[$key] = $this->unserialize( $db->decodeBlob( $row->value ) );
}
} catch ( DBQueryError $e ) {
- $this->handleWriteError( $e, $row->serverIndex );
+ $this->handleWriteError( $e, $db, $row->serverIndex );
}
} else { // MISS
$this->debug( 'get: no matching rows' );
$result = true;
$exptime = (int)$expiry;
foreach ( $keysByTable as $serverIndex => $serverKeys ) {
+ $db = null;
try {
$db = $this->getDB( $serverIndex );
} catch ( DBError $e ) {
- $this->handleWriteError( $e, $serverIndex );
+ $this->handleWriteError( $e, $db, $serverIndex );
$result = false;
continue;
}
__METHOD__
);
} catch ( DBError $e ) {
- $this->handleWriteError( $e, $serverIndex );
+ $this->handleWriteError( $e, $db, $serverIndex );
$result = false;
}
protected function cas( $casToken, $key, $value, $exptime = 0 ) {
list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
+ $db = null;
try {
$db = $this->getDB( $serverIndex );
$exptime = intval( $exptime );
__METHOD__
);
} catch ( DBQueryError $e ) {
- $this->handleWriteError( $e, $serverIndex );
+ $this->handleWriteError( $e, $db, $serverIndex );
return false;
}
public function delete( $key ) {
list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
+ $db = null;
try {
$db = $this->getDB( $serverIndex );
$db->delete(
[ 'keyname' => $key ],
__METHOD__ );
} catch ( DBError $e ) {
- $this->handleWriteError( $e, $serverIndex );
+ $this->handleWriteError( $e, $db, $serverIndex );
return false;
}
public function incr( $key, $step = 1 ) {
list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
+ $db = null;
try {
$db = $this->getDB( $serverIndex );
$step = intval( $step );
$newValue = null;
}
} catch ( DBError $e ) {
- $this->handleWriteError( $e, $serverIndex );
+ $this->handleWriteError( $e, $db, $serverIndex );
return null;
}
public function changeTTL( $key, $expiry = 0 ) {
list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
+ $db = null;
try {
$db = $this->getDB( $serverIndex );
$db->update(
return false;
}
} catch ( DBError $e ) {
- $this->handleWriteError( $e, $serverIndex );
+ $this->handleWriteError( $e, $db, $serverIndex );
return false;
}
*/
public function deleteObjectsExpiringBefore( $timestamp, $progressCallback = false ) {
for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) {
+ $db = null;
try {
$db = $this->getDB( $serverIndex );
$dbTimestamp = $db->timestamp( $timestamp );
}
}
} catch ( DBError $e ) {
- $this->handleWriteError( $e, $serverIndex );
+ $this->handleWriteError( $e, $db, $serverIndex );
return false;
}
}
*/
public function deleteAll() {
for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) {
+ $db = null;
try {
$db = $this->getDB( $serverIndex );
for ( $i = 0; $i < $this->shards; $i++ ) {
$db->delete( $this->getTableNameByShard( $i ), '*', __METHOD__ );
}
} catch ( DBError $e ) {
- $this->handleWriteError( $e, $serverIndex );
+ $this->handleWriteError( $e, $db, $serverIndex );
return false;
}
}
* Handle a DBQueryError which occurred during a write operation.
*
* @param DBError $exception
+ * @param IDatabase|null $db DB handle or null if connection failed
* @param int $serverIndex
+ * @throws Exception
*/
- protected function handleWriteError( DBError $exception, $serverIndex ) {
- if ( $exception instanceof DBConnectionError ) {
+ protected function handleWriteError( DBError $exception, IDatabase $db = null, $serverIndex ) {
+ if ( !$db ) {
$this->markServerDown( $exception, $serverIndex );
- }
- if ( $exception->db && $exception->db->wasReadOnlyError() ) {
- if ( $exception->db->trxLevel() ) {
- try {
- $exception->db->rollback( __METHOD__ );
- } catch ( DBError $e ) {
- }
+ } elseif ( $db->wasReadOnlyError() ) {
+ if ( $db->trxLevel() && $this->usesMainDB() ) {
+ // Errors like deadlocks and connection drops already cause rollback.
+ // For consistency, we have no choice but to throw an error and trigger
+ // complete rollback if the main DB is also being used as the cache DB.
+ throw $exception;
}
}
* @param DBError $exception
* @param int $serverIndex
*/
- protected function markServerDown( $exception, $serverIndex ) {
+ protected function markServerDown( DBError $exception, $serverIndex ) {
unset( $this->conns[$serverIndex] ); // bug T103435
if ( isset( $this->connFailureTimes[$serverIndex] ) ) {
}
}
+ /**
+ * @return bool Whether the main DB is used, e.g. wfGetDB( DB_MASTER )
+ */
+ protected function usesMainDB() {
+ return !$this->serverInfos;
+ }
+
protected function waitForSlaves() {
- if ( !$this->serverInfos ) {
+ if ( $this->usesMainDB() ) {
+ $lb = $this->getSeparateMainLB()
+ ?: MediaWikiServices::getInstance()->getDBLoadBalancer();
// Main LB is used; wait for any slaves to catch up
try {
- wfGetLBFactory()->waitForReplication( [ 'wiki' => wfWikiID() ] );
- return true;
+ $pos = $lb->getMasterPos();
+ if ( $pos ) {
+ return $lb->waitForAll( $pos, 3 );
+ }
} catch ( DBReplicationWaitError $e ) {
return false;
}
- } else {
- // Custom DB server list; probably doesn't use replication
- return true;
}
+
+ // Custom DB server list; probably doesn't use replication
+ return true;
}
}