/** @var callable[] */
private $replicationWaitCallbacks = [];
- /** @var mixed */
+ /** var int An identifier for this class instance */
+ private $id;
+ /** @var int|null Ticket used to delegate transaction ownership */
private $ticket;
/** @var string|bool String if a requested DBO_TRX transaction round is active */
private $trxRoundId = false;
$this->defaultGroup = $conf['defaultGroup'] ?? null;
$this->secret = $conf['secret'] ?? '';
+ $this->id = mt_rand();
$this->ticket = mt_rand();
}
}
public function flushReplicaSnapshots( $fname = __METHOD__ ) {
+ if ( $this->trxRoundId !== false && $this->trxRoundId !== $fname ) {
+ $this->queryLogger->warning(
+ "$fname: transaction round '{$this->trxRoundId}' still running",
+ [ 'trace' => ( new RuntimeException() )->getTraceAsString() ]
+ );
+ }
$this->forEachLBCallMethod( 'flushReplicaSnapshots', [ $fname ] );
}
if ( $this->trxRoundId !== false ) {
throw new DBTransactionError(
null,
- "$fname: transaction round '{$this->trxRoundId}' already started."
+ "$fname: transaction round '{$this->trxRoundId}' already started"
);
}
$this->trxRoundId = $fname;
// Set DBO_TRX flags on all appropriate DBs
- $this->forEachLBCallMethod( 'beginMasterChanges', [ $fname ] );
+ $this->forEachLBCallMethod( 'beginMasterChanges', [ $fname, $this->id ] );
$this->trxRoundStage = self::ROUND_CURSORY;
}
if ( $this->trxRoundId !== false && $this->trxRoundId !== $fname ) {
throw new DBTransactionError(
null,
- "$fname: transaction round '{$this->trxRoundId}' still running."
+ "$fname: transaction round '{$this->trxRoundId}' still running"
);
}
/** @noinspection PhpUnusedLocalVariableInspection */
// Run pre-commit callbacks and suppress post-commit callbacks, aborting on failure
do {
$count = 0; // number of callbacks executed this iteration
- $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$count ) {
- $count += $lb->finalizeMasterChanges();
+ $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$count, $fname ) {
+ $count += $lb->finalizeMasterChanges( $fname, $this->id );
} );
} while ( $count > 0 );
$this->trxRoundId = false;
// Perform pre-commit checks, aborting on failure
- $this->forEachLBCallMethod( 'approveMasterChanges', [ $options ] );
+ $this->forEachLBCallMethod( 'approveMasterChanges', [ $options, $fname, $this->id ] );
// Log the DBs and methods involved in multi-DB transactions
$this->logIfMultiDbTransaction();
// Actually perform the commit on all master DB connections and revert DBO_TRX
- $this->forEachLBCallMethod( 'commitMasterChanges', [ $fname ] );
+ $this->forEachLBCallMethod( 'commitMasterChanges', [ $fname, $this->id ] );
// Run all post-commit callbacks in a separate step
$this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
$e = $this->executePostTransactionCallbacks();
$this->trxRoundStage = self::ROUND_ROLLING_BACK;
$this->trxRoundId = false;
// Actually perform the rollback on all master DB connections and revert DBO_TRX
- $this->forEachLBCallMethod( 'rollbackMasterChanges', [ $fname ] );
+ $this->forEachLBCallMethod( 'rollbackMasterChanges', [ $fname, $this->id ] );
// Run all post-commit callbacks in a separate step
$this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
$this->executePostTransactionCallbacks();
* @return Exception|null
*/
private function executePostTransactionCallbacks() {
+ $fname = __METHOD__;
// Run all post-commit callbacks until new ones stop getting added
$e = null; // first callback exception
do {
- $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$e ) {
- $ex = $lb->runMasterTransactionIdleCallbacks();
+ $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$e, $fname ) {
+ $ex = $lb->runMasterTransactionIdleCallbacks( $fname, $this->id );
$e = $e ?: $ex;
} );
} while ( $this->hasMasterChanges() );
// Run all listener callbacks once
- $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$e ) {
- $ex = $lb->runMasterTransactionListenerCallbacks();
+ $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$e, $fname ) {
+ $ex = $lb->runMasterTransactionListenerCallbacks( $fname, $this->id );
$e = $e ?: $ex;
} );
// time needed to wait on the next clusters.
$masterPositions = array_fill( 0, count( $lbs ), false );
foreach ( $lbs as $i => $lb ) {
- if ( $lb->getServerCount() <= 1 ) {
- // T29975 - Don't try to wait for replica DBs if there are none
- // Prevents permission error when getting master position
- continue;
- } elseif ( $opts['ifWritesSince']
- && $lb->lastMasterChangeTimestamp() < $opts['ifWritesSince']
+ if ( !$lb->hasStreamingReplicaServers() ) {
+ continue; // T29975: no replication; avoid getMasterPos() permissions errors
+ } elseif (
+ $opts['ifWritesSince'] &&
+ $lb->lastMasterChangeTimestamp() < $opts['ifWritesSince']
) {
continue; // no writes since the last wait
}
public function getEmptyTransactionTicket( $fname ) {
if ( $this->hasMasterChanges() ) {
- $this->queryLogger->error( __METHOD__ . ": $fname does not have outer scope.\n" .
- ( new RuntimeException() )->getTraceAsString() );
+ $this->queryLogger->error(
+ __METHOD__ . ": $fname does not have outer scope",
+ [ 'trace' => ( new RuntimeException() )->getTraceAsString() ]
+ );
return null;
}
final public function commitAndWaitForReplication( $fname, $ticket, array $opts = [] ) {
if ( $ticket !== $this->ticket ) {
- $this->perfLogger->error( __METHOD__ . ": $fname does not have outer scope.\n" .
- ( new RuntimeException() )->getTraceAsString() );
+ $this->perfLogger->error(
+ __METHOD__ . ": $fname does not have outer scope",
+ [ 'trace' => ( new RuntimeException() )->getTraceAsString() ]
+ );
return false;
}
// The transaction owner and any caller with the empty transaction ticket can commit
// so that getEmptyTransactionTicket() callers don't risk seeing DBTransactionError.
if ( $this->trxRoundId !== false && $fname !== $this->trxRoundId ) {
- $this->queryLogger->info( "$fname: committing on behalf of {$this->trxRoundId}." );
+ $this->queryLogger->info( "$fname: committing on behalf of {$this->trxRoundId}" );
$fnameEffective = $this->trxRoundId;
} else {
$fnameEffective = $fname;
} elseif ( $this->memStash instanceof EmptyBagOStuff ) {
// No where to store any DB positions and wait for them to appear
$this->chronProt->setEnabled( false );
- $this->replLogger->info( 'Cannot use ChronologyProtector with EmptyBagOStuff.' );
+ $this->replLogger->info( 'Cannot use ChronologyProtector with EmptyBagOStuff' );
}
- $this->replLogger->debug( __METHOD__ . ': using request info ' .
- json_encode( $this->requestInfo, JSON_PRETTY_PRINT ) );
+ $this->replLogger->debug(
+ __METHOD__ . ': request info ' .
+ json_encode( $this->requestInfo, JSON_PRETTY_PRINT )
+ );
return $this->chronProt;
}
// being called later (but before the first connection attempt) (T192611)
$this->getChronologyProtector()->applySessionReplicationPosition( $lb );
},
- 'roundStage' => $initStage
+ 'roundStage' => $initStage,
+ 'ownerId' => $this->id
];
}
*/
protected function initLoadBalancer( ILoadBalancer $lb ) {
if ( $this->trxRoundId !== false ) {
- $lb->beginMasterChanges( $this->trxRoundId ); // set DBO_TRX
+ $lb->beginMasterChanges( $this->trxRoundId, $this->id ); // set DBO_TRX
}
$lb->setTableAliases( $this->tableAliases );
public function appendShutdownCPIndexAsQuery( $url, $index ) {
$usedCluster = 0;
$this->forEachLB( function ( ILoadBalancer $lb ) use ( &$usedCluster ) {
- $usedCluster |= ( $lb->getServerCount() > 1 );
+ $usedCluster |= $lb->hasStreamingReplicaServers();
} );
if ( !$usedCluster ) {
public function setRequestInfo( array $info ) {
if ( $this->chronProt ) {
- throw new LogicException( 'ChronologyProtector already initialized.' );
+ throw new LogicException( 'ChronologyProtector already initialized' );
}
$this->requestInfo = $info + $this->requestInfo;