private $mLoadMonitor;
/** @var BagOStuff */
private $srvCache;
+ /** @var WANObjectCache */
+ private $wanCache;
/** @var bool|DatabaseBase Database connection that caused a problem */
private $mErrorConnection;
const MAX_LAG = 10;
/** @var integer Max time to wait for a slave to catch up (e.g. ChronologyProtector) */
const POS_WAIT_TIMEOUT = 10;
+ /** @var integer Seconds to cache master server read-only status */
+ const TTL_CACHE_READONLY = 5;
/**
* @var boolean
}
$this->srvCache = ObjectCache::getLocalServerInstance();
+ $this->wanCache = ObjectCache::getMainWANInstance();
if ( isset( $params['trxProfiler'] ) ) {
$this->trxProfiler = $params['trxProfiler'];
if ( $masterOnly ) {
# Make master-requested DB handles inherit any read-only mode setting
- $conn->setLBInfo( 'readOnlyReason', $this->getReadOnlyReason( $wiki ) );
+ $conn->setLBInfo( 'readOnlyReason', $this->getReadOnlyReason( $wiki, $conn ) );
}
return $conn;
* Close all open connections
*/
public function closeAll() {
- foreach ( $this->mConns as $conns2 ) {
- foreach ( $conns2 as $conns3 ) {
- /** @var DatabaseBase $conn */
- foreach ( $conns3 as $conn ) {
- $conn->close();
- }
- }
- }
+ $this->forEachOpenConnection( function ( DatabaseBase $conn ) {
+ $conn->close();
+ } );
+
$this->mConns = [
'local' => [],
'foreignFree' => [],
* @param string $fname Caller name
*/
public function commitAll( $fname = __METHOD__ ) {
- foreach ( $this->mConns as $conns2 ) {
- foreach ( $conns2 as $conns3 ) {
- /** @var DatabaseBase[] $conns3 */
- foreach ( $conns3 as $conn ) {
- if ( $conn->trxLevel() ) {
- $conn->commit( $fname, 'flush' );
- }
- }
+ $this->forEachOpenConnection( function ( DatabaseBase $conn ) use ( $fname ) {
+ $conn->commit( $fname, 'flush' );
+ } );
+ }
+
+ /**
+ * Perform all pre-commit callbacks that remain part of the atomic transactions
+ * and disable any post-commit callbacks until runMasterPostCommitCallbacks()
+ * @since 1.28
+ */
+ public function runMasterPreCommitCallbacks() {
+ $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) {
+ // Any error will cause all DB transactions to be rolled back together.
+ $conn->runOnTransactionPreCommitCallbacks();
+ // Defer post-commit callbacks until COMMIT finishes for all DBs.
+ $conn->setPostCommitCallbackSupression( true );
+ } );
+ }
+
+ /**
+ * Perform all pre-commit checks for things like replication safety
+ * @param array $options Includes:
+ * - maxWriteDuration : max write query duration time in seconds
+ * @throws DBTransactionError
+ * @since 1.28
+ */
+ public function approveMasterChanges( array $options ) {
+ $limit = isset( $options['maxWriteDuration'] ) ? $options['maxWriteDuration'] : 0;
+ $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $limit ) {
+ // Assert that the time to replicate the transaction will be sane.
+ // If this fails, then all DB transactions will be rollback back together.
+ $time = $conn->pendingWriteQueryDuration();
+ if ( $limit > 0 && $time > $limit ) {
+ throw new DBTransactionError(
+ $conn,
+ wfMessage( 'transaction-duration-limit-exceeded', $time, $limit )->text()
+ );
}
- }
+ } );
}
/**
- * Issue COMMIT only on master, only if queries were done on connection
+ * Issue COMMIT on all master connections where writes where done
* @param string $fname Caller name
*/
public function commitMasterChanges( $fname = __METHOD__ ) {
- $masterIndex = $this->getWriterIndex();
- foreach ( $this->mConns as $conns2 ) {
- if ( empty( $conns2[$masterIndex] ) ) {
- continue;
+ $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $fname ) {
+ if ( $conn->writesOrCallbacksPending() ) {
+ $conn->commit( $fname, 'flush' );
}
- /** @var DatabaseBase $conn */
- foreach ( $conns2[$masterIndex] as $conn ) {
- if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
- $conn->commit( $fname, 'flush' );
- }
- }
- }
+ } );
+ }
+
+ /**
+ * Issue all pending post-commit callbacks
+ * @since 1.28
+ */
+ public function runMasterPostCommitCallbacks() {
+ $this->forEachOpenMasterConnection( function ( DatabaseBase $db ) {
+ $db->setPostCommitCallbackSupression( false );
+ $db->runOnTransactionIdleCallbacks( IDatabase::TRIGGER_COMMIT );
+ } );
}
/**
/**
* @note This method may trigger a DB connection if not yet done
* @param string|bool $wiki Wiki ID, or false for the current wiki
+ * @param DatabaseBase|null DB master connection; used to avoid loops [optional]
* @return string|bool Reason the master is read-only or false if it is not
* @since 1.27
*/
- public function getReadOnlyReason( $wiki = false ) {
+ public function getReadOnlyReason( $wiki = false, DatabaseBase $conn = null ) {
if ( $this->readOnlyReason !== false ) {
return $this->readOnlyReason;
} elseif ( $this->getLaggedSlaveMode( $wiki ) ) {
return 'The database has been automatically locked ' .
'while the slave database servers catch up to the master.';
}
+ } elseif ( $this->masterRunningReadOnly( $wiki, $conn ) ) {
+ return 'The database master is running in read-only mode.';
}
return false;
}
+ /**
+ * @param string $wiki Wiki ID, or false for the current wiki
+ * @param DatabaseBase|null DB master connectionl used to avoid loops [optional]
+ * @return bool
+ */
+ private function masterRunningReadOnly( $wiki, DatabaseBase $conn = null ) {
+ $cache = $this->wanCache;
+ $masterServer = $this->getServerName( $this->getWriterIndex() );
+
+ return (bool)$cache->getWithSetCallback(
+ $cache->makeGlobalKey( __CLASS__, 'server-read-only', $masterServer ),
+ self::TTL_CACHE_READONLY,
+ function () use ( $wiki, $conn ) {
+ $this->trxProfiler->setSilenced( true );
+ try {
+ $dbw = $conn ?: $this->getConnection( DB_MASTER, [], $wiki );
+ $readOnly = (int)$dbw->serverIsReadOnly();
+ } catch ( DBError $e ) {
+ $readOnly = 0;
+ }
+ $this->trxProfiler->setSilenced( false );
+ return $readOnly;
+ },
+ [ 'pcTTL' => $cache::TTL_PROC_LONG, 'busyValue' => 0 ]
+ );
+ }
+
/**
* Disables/enables lag checks
* @param null|bool $mode
*/
public function pingAll() {
$success = true;
- foreach ( $this->mConns as $conns2 ) {
- foreach ( $conns2 as $conns3 ) {
- /** @var DatabaseBase[] $conns3 */
- foreach ( $conns3 as $conn ) {
- if ( !$conn->ping() ) {
- $success = false;
- }
- }
+ $this->forEachOpenConnection( function ( DatabaseBase $conn ) use ( &$success ) {
+ if ( !$conn->ping() ) {
+ $success = false;
}
- }
+ } );
return $success;
}
* @param array $params
*/
public function forEachOpenConnection( $callback, array $params = [] ) {
- foreach ( $this->mConns as $conns2 ) {
- foreach ( $conns2 as $conns3 ) {
- foreach ( $conns3 as $conn ) {
+ foreach ( $this->mConns as $connsByServer ) {
+ foreach ( $connsByServer as $serverConns ) {
+ foreach ( $serverConns as $conn ) {
+ $mergedParams = array_merge( [ $conn ], $params );
+ call_user_func_array( $callback, $mergedParams );
+ }
+ }
+ }
+ }
+
+ /**
+ * Call a function with each open connection object to a master
+ * @param callable $callback
+ * @param array $params
+ * @since 1.28
+ */
+ public function forEachOpenMasterConnection( $callback, array $params = [] ) {
+ $masterIndex = $this->getWriterIndex();
+ foreach ( $this->mConns as $connsByServer ) {
+ if ( isset( $connsByServer[$masterIndex] ) ) {
+ /** @var DatabaseBase $conn */
+ foreach ( $connsByServer[$masterIndex] as $conn ) {
$mergedParams = array_merge( [ $conn ], $params );
call_user_func_array( $callback, $mergedParams );
}
$lagTimes = $this->getLagTimes( $wiki );
foreach ( $lagTimes as $i => $lag ) {
- if ( $lag > $maxLag ) {
+ if ( $this->mLoads[$i] > 0 && $lag > $maxLag ) {
$maxLag = $lag;
$host = $this->mServers[$i]['host'];
$maxIndex = $i;
}
$pos = $pos ?: $this->getConnection( DB_MASTER )->getMasterPos();
- if ( !$pos ) {
+ if ( !( $pos instanceof DBMasterPos ) ) {
return false; // something is misconfigured
}