private $errorConnection;
/** @var int The generic (not query grouped) replica DB index */
private $genericReadIndex = -1;
+ /** @var int[] The group replica DB indexes keyed by group */
+ private $readIndexByGroup = [];
/** @var bool|DBMasterPos False if not set */
private $waitForPos;
/** @var bool Whether the generic reader fell back to a lagged replica DB */
/** @var bool Whether any connection has been attempted yet */
private $connectionAttempted = false;
+ /** @var int|null An integer ID of the managing LBFactory instance or null */
+ private $ownerId;
/** @var string|bool String if a requested DBO_TRX transaction round is active */
private $trxRoundId = false;
/** @var string Stage of the current transaction round in the transaction round life-cycle */
}
$this->defaultGroup = $params['defaultGroup'] ?? null;
+ $this->ownerId = $params['ownerId'] ?? null;
}
public function getLocalDomainID() {
if ( count( $this->servers ) == 1 ) {
// Skip the load balancing if there's only one server
return $this->getWriterIndex();
- } elseif ( $group === false && $this->genericReadIndex >= 0 ) {
- // A generic reader index was already selected and "waitForPos" was handled
- return $this->genericReadIndex;
+ }
+
+ $index = $this->getExistingReaderIndex( $group );
+ if ( $index >= 0 ) {
+ // A reader index was already selected and "waitForPos" was handled
+ return $index;
}
if ( $group !== false ) {
$laggedReplicaMode = true;
}
- if ( $this->genericReadIndex < 0 && $this->genericLoads[$i] > 0 && $group === false ) {
- // Cache the generic (ungrouped) reader index for future DB_REPLICA handles
- $this->genericReadIndex = $i;
- // Record if the generic reader index is in "lagged replica DB" mode
- $this->laggedReplicaMode = ( $laggedReplicaMode || $this->laggedReplicaMode );
+ // Cache the reader index for future DB_REPLICA handles
+ $this->setExistingReaderIndex( $group, $i );
+ // Record whether the generic reader index is in "lagged replica DB" mode
+ if ( $group === false && $laggedReplicaMode ) {
+ $this->laggedReplicaMode = true;
}
$serverName = $this->getServerName( $i );
return $i;
}
+ /**
+ * Get the server index chosen by the load balancer for use with the given query group
+ *
+ * @param string|bool $group Query group; use false for the generic group
+ * @return int Server index or -1 if none was chosen
+ */
+ protected function getExistingReaderIndex( $group ) {
+ if ( $group === false ) {
+ $index = $this->genericReadIndex;
+ } else {
+ $index = $this->readIndexByGroup[$group] ?? -1;
+ }
+
+ return $index;
+ }
+
+ /**
+ * Set the server index chosen by the load balancer for use with the given query group
+ *
+ * @param string|bool $group Query group; use false for the generic group
+ * @param int $index The index of a specific server
+ */
+ private function setExistingReaderIndex( $group, $index ) {
+ if ( $index < 0 ) {
+ throw new UnexpectedValueException( "Cannot set a negative read server index" );
+ }
+
+ if ( $group === false ) {
+ $this->genericReadIndex = $index;
+ } else {
+ $this->readIndexByGroup[$group] = $index;
+ }
+ }
+
/**
* @param array $loads List of server weights
* @param string|bool $domain
$conn->close();
}
- public function commitAll( $fname = __METHOD__ ) {
- $this->commitMasterChanges( $fname );
+ public function commitAll( $fname = __METHOD__, $owner = null ) {
+ $this->commitMasterChanges( $fname, $owner );
$this->flushMasterSnapshots( $fname );
$this->flushReplicaSnapshots( $fname );
}
- public function finalizeMasterChanges() {
+ public function finalizeMasterChanges( $fname = __METHOD__, $owner = null ) {
+ $this->assertOwnership( $fname, $owner );
$this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] );
$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
return $total;
}
- public function approveMasterChanges( array $options ) {
+ public function approveMasterChanges( array $options, $fname = __METHOD__, $owner = null ) {
+ $this->assertOwnership( $fname, $owner );
$this->assertTransactionRoundStage( self::ROUND_FINALIZED );
$limit = $options['maxWriteDuration'] ?? 0;
$this->trxRoundStage = self::ROUND_APPROVED;
}
- public function beginMasterChanges( $fname = __METHOD__ ) {
+ public function beginMasterChanges( $fname = __METHOD__, $owner = null ) {
+ $this->assertOwnership( $fname, $owner );
if ( $this->trxRoundId !== false ) {
throw new DBTransactionError(
null,
$this->trxRoundStage = self::ROUND_CURSORY;
}
- public function commitMasterChanges( $fname = __METHOD__ ) {
+ public function commitMasterChanges( $fname = __METHOD__, $owner = null ) {
+ $this->assertOwnership( $fname, $owner );
$this->assertTransactionRoundStage( self::ROUND_APPROVED );
$failures = [];
$this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
}
- public function runMasterTransactionIdleCallbacks() {
+ public function runMasterTransactionIdleCallbacks( $fname = __METHOD__, $owner = null ) {
+ $this->assertOwnership( $fname, $owner );
if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
$type = IDatabase::TRIGGER_COMMIT;
} elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
return $e;
}
- public function runMasterTransactionListenerCallbacks() {
+ public function runMasterTransactionListenerCallbacks( $fname = __METHOD__, $owner = null ) {
+ $this->assertOwnership( $fname, $owner );
if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
$type = IDatabase::TRIGGER_COMMIT;
} elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
return $e;
}
- public function rollbackMasterChanges( $fname = __METHOD__ ) {
+ public function rollbackMasterChanges( $fname = __METHOD__, $owner = null ) {
+ $this->assertOwnership( $fname, $owner );
+
$restore = ( $this->trxRoundId !== false );
$this->trxRoundId = false;
$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
/**
* @param string|string[] $stage
+ * @throws DBTransactionError
*/
private function assertTransactionRoundStage( $stage ) {
$stages = (array)$stage;
}
}
+ /**
+ * @param string $fname
+ * @param int|null $owner Owner ID of the caller
+ * @throws DBTransactionError
+ */
+ private function assertOwnership( $fname, $owner ) {
+ if ( $this->ownerId !== null && $owner !== $this->ownerId ) {
+ throw new DBTransactionError(
+ null,
+ "$fname: LoadBalancer is owned by LBFactory #{$this->ownerId} (got '$owner')."
+ );
+ }
+ }
+
/**
* Make all DB servers with DBO_DEFAULT/DBO_TRX set join the transaction round
*