* @ingroup Database
*/
abstract class LBFactory implements DestructibleService {
-
/** @var ChronologyProtector */
protected $chronProt;
-
/** @var TransactionProfiler */
protected $trxProfiler;
-
/** @var LoggerInterface */
- protected $logger;
-
+ protected $trxLogger;
+ /** @var BagOStuff */
+ protected $srvCache;
+ /** @var WANObjectCache */
+ protected $wanCache;
+
+ /** @var mixed */
+ protected $ticket;
/** @var string|bool Reason all LBs are read-only or false if not */
protected $readOnlyReason = false;
/**
* Construct a factory based on a configuration array (typically from $wgLBFactoryConf)
* @param array $conf
+ * @TODO: inject objects via dependency framework
*/
public function __construct( array $conf ) {
if ( isset( $conf['readOnlyReason'] ) && is_string( $conf['readOnlyReason'] ) ) {
$this->readOnlyReason = $conf['readOnlyReason'];
}
-
$this->chronProt = $this->newChronologyProtector();
$this->trxProfiler = Profiler::instance()->getTransactionProfiler();
- $this->logger = LoggerFactory::getInstance( 'DBTransaction' );
+ // Use APC/memcached style caching, but avoids loops with CACHE_DB (T141804)
+ $cache = ObjectCache::getLocalServerInstance();
+ if ( $cache->getQoS( $cache::ATTR_EMULATION ) > $cache::QOS_EMULATION_SQL ) {
+ $this->srvCache = $cache;
+ } else {
+ $this->srvCache = new EmptyBagOStuff();
+ }
+ $wCache = ObjectCache::getMainWANInstance();
+ if ( $wCache->getQoS( $wCache::ATTR_EMULATION ) > $wCache::QOS_EMULATION_SQL ) {
+ $this->wanCache = $wCache;
+ } else {
+ $this->wanCache = WANObjectCache::newEmpty();
+ }
+ $this->trxLogger = LoggerFactory::getInstance( 'DBTransaction' );
+ $this->ticket = mt_rand();
}
/**
* 1. To commit changes to the masters.
* 2. To release the snapshot on all connections, master and slave.
* @param string $fname Caller name
+ * @param array $options Options map:
+ * - maxWriteDuration: abort if more than this much time was spent in write queries
*/
- public function commitAll( $fname = __METHOD__ ) {
- $this->logMultiDbTransaction();
-
- $start = microtime( true );
+ public function commitAll( $fname = __METHOD__, array $options = [] ) {
+ $this->commitMasterChanges( $fname, $options );
$this->forEachLBCallMethod( 'commitAll', [ $fname ] );
- $timeMs = 1000 * ( microtime( true ) - $start );
-
- RequestContext::getMain()->getStats()->timing( "db.commit-all", $timeMs );
}
/**
* - maxWriteDuration: abort if more than this much time was spent in write queries
*/
public function commitMasterChanges( $fname = __METHOD__, array $options = [] ) {
- $limit = isset( $options['maxWriteDuration'] ) ? $options['maxWriteDuration'] : 0;
-
- $this->logMultiDbTransaction();
- $this->forEachLB( function ( LoadBalancer $lb ) use ( $limit ) {
- $lb->forEachOpenConnection( function ( IDatabase $db ) use ( $limit ) {
- $time = $db->pendingWriteQueryDuration();
- if ( $limit > 0 && $time > $limit ) {
- throw new DBTransactionError(
- $db,
- wfMessage( 'transaction-duration-limit-exceeded', $time, $limit )->text()
- );
- }
- } );
- } );
-
+ // Perform all pre-commit callbacks, aborting on failure
+ $this->forEachLBCallMethod( 'runMasterPreCommitCallbacks' );
+ // Perform all pre-commit checks, aborting on failure
+ $this->forEachLBCallMethod( 'approveMasterChanges', [ $options ] );
+ // Log the DBs and methods involved in multi-DB transactions
+ $this->logIfMultiDbTransaction();
+ // Actually perform the commit on all master DB connections
+ $this->forEachLBCallMethod( 'commitMasterChanges', [ $fname ] );
+ // Run all post-commit callbacks
+ $this->forEachLBCallMethod( 'runMasterPostCommitCallbacks' );
+ // Commit any dangling DBO_TRX transactions from callbacks on one DB to another DB
$this->forEachLBCallMethod( 'commitMasterChanges', [ $fname ] );
}
/**
* Log query info if multi DB transactions are going to be committed now
*/
- private function logMultiDbTransaction() {
+ private function logIfMultiDbTransaction() {
$callersByDB = [];
$this->forEachLB( function ( LoadBalancer $lb ) use ( &$callersByDB ) {
$masterName = $lb->getServerName( $lb->getWriterIndex() );
foreach ( $callersByDB as $db => $callers ) {
$msg .= "$db: " . implode( '; ', $callers ) . "\n";
}
- $this->logger->info( $msg );
+ $this->trxLogger->info( $msg );
}
}
}
}
+ /**
+ * Get a token asserting that no transaction writes are active
+ *
+ * @param string $fname Caller name (e.g. __METHOD__)
+ * @return mixed A value to pass to commitAndWaitForReplication()
+ * @since 1.28
+ */
+ public function getEmptyTransactionTicket( $fname ) {
+ if ( $this->hasMasterChanges() ) {
+ $this->trxLogger->error( __METHOD__ . ": $fname does not have outer scope." );
+ return null;
+ }
+
+ return $this->ticket;
+ }
+
+ /**
+ * Convenience method for safely running commitMasterChanges()/waitForReplication()
+ *
+ * This will commit and wait unless $ticket indicates it is unsafe to do so
+ *
+ * @param string $fname Caller name (e.g. __METHOD__)
+ * @param mixed $ticket Result of getOuterTransactionScopeTicket()
+ * @param array $opts Options to waitForReplication()
+ * @throws DBReplicationWaitError
+ * @since 1.28
+ */
+ public function commitAndWaitForReplication( $fname, $ticket, array $opts = [] ) {
+ if ( $ticket !== $this->ticket ) {
+ $logger = LoggerFactory::getInstance( 'DBPerformance' );
+ $logger->error( __METHOD__ . ": cannot commit; $fname does not have outer scope." );
+ return;
+ }
+
+ $this->commitMasterChanges( $fname );
+ $this->waitForReplication( $opts );
+ }
+
/**
* Disable the ChronologyProtector for all load balancers
*
}
} );
}
+
+ /**
+ * Close all open database connections on all open load balancers.
+ * @since 1.28
+ */
+ public function closeAll() {
+ $this->forEachLBCallMethod( 'closeAll', [] );
+ }
+
}
/**