private $connectionAttempted = false;
/** @var int */
private $maxLag = self::MAX_LAG_DEFAULT;
+ /** @var string Stage of the current transaction round in the transaction round life-cycle */
+ private $trxRoundStage = self::ROUND_CURSORY;
/** @var int Warn when this many connection are held */
const CONN_HELD_WARN_THRESHOLD = 10;
const KEY_FOREIGN_FREE_NOROUND = 'foreignFreeAutoCommit';
const KEY_FOREIGN_INUSE_NOROUND = 'foreignInUseAutoCommit';
+ /** @var string Transaction round, explicit or implicit, has not finished writing */
+ const ROUND_CURSORY = 'cursory';
+ /** @var string Transaction round writes are complete and ready for pre-commit checks */
+ const ROUND_FINALIZED = 'finalized';
+ /** @var string Transaction round passed final pre-commit checks */
+ const ROUND_APPROVED = 'approved';
+ /** @var string Transaction round was committed and post-commit callbacks must be run */
+ const ROUND_COMMIT_CALLBACKS = 'commit-callbacks';
+ /** @var string Transaction round was rolled back and post-rollback callbacks must be run */
+ const ROUND_ROLLBACK_CALLBACKS = 'rollback-callbacks';
+ /** @var string Transaction round encountered an error */
+ const ROUND_ERROR = 'error';
+
public function __construct( array $params ) {
if ( !isset( $params['servers'] ) ) {
throw new InvalidArgumentException( __CLASS__ . ': missing servers parameter' );
if ( isset( $params['chronologyCallback'] ) ) {
$this->chronologyCallback = $params['chronologyCallback'];
}
+
+ if ( isset( $params['roundStage'] ) ) {
+ if ( $params['roundStage'] === self::STAGE_POSTCOMMIT_CALLBACKS ) {
+ $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
+ } elseif ( $params['roundStage'] === self::STAGE_POSTROLLBACK_CALLBACKS ) {
+ $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
+ }
+ }
}
/**
return ( $name != '' ) ? $name : 'localhost';
}
+ public function getServerInfo( $i ) {
+ if ( isset( $this->servers[$i] ) ) {
+ return $this->servers[$i];
+ } else {
+ return false;
+ }
+ }
+
public function getServerType( $i ) {
return isset( $this->servers[$i]['type'] ) ? $this->servers[$i]['type'] : 'unknown';
}
}
public function commitAll( $fname = __METHOD__ ) {
- $failures = [];
-
- $restore = ( $this->trxRoundId !== false );
- $this->trxRoundId = false;
- $this->forEachOpenConnection(
- function ( IDatabase $conn ) use ( $fname, $restore, &$failures ) {
- try {
- $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
- } catch ( DBError $e ) {
- call_user_func( $this->errorLogger, $e );
- $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
- }
- if ( $restore && $conn->getLBInfo( 'master' ) ) {
- $this->undoTransactionRoundFlags( $conn );
- }
- }
- );
-
- if ( $failures ) {
- throw new DBExpectedError(
- null,
- "Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
- );
- }
+ $this->commitMasterChanges( $fname );
+ $this->flushMasterSnapshots( $fname );
+ $this->flushReplicaSnapshots( $fname );
}
public function finalizeMasterChanges() {
+ $this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] );
+
+ $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+ // Loop until callbacks stop adding callbacks on other connections
+ $total = 0;
+ do {
+ $count = 0; // callbacks execution attempts
+ $this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$count ) {
+ // Run any pre-commit callbacks while leaving the post-commit ones suppressed.
+ // Any error should cause all (peer) transactions to be rolled back together.
+ $count += $conn->runOnTransactionPreCommitCallbacks();
+ } );
+ $total += $count;
+ } while ( $count > 0 );
+ // Defer post-commit callbacks until after COMMIT/ROLLBACK happens on all handles
$this->forEachOpenMasterConnection( function ( Database $conn ) {
- // Any error should cause all DB transactions to be rolled back together
- $conn->setTrxEndCallbackSuppression( false );
- $conn->runOnTransactionPreCommitCallbacks();
- // Defer post-commit callbacks until COMMIT finishes for all DBs
$conn->setTrxEndCallbackSuppression( true );
} );
+ $this->trxRoundStage = self::ROUND_FINALIZED;
+
+ return $total;
}
public function approveMasterChanges( array $options ) {
+ $this->assertTransactionRoundStage( self::ROUND_FINALIZED );
+
$limit = isset( $options['maxWriteDuration'] ) ? $options['maxWriteDuration'] : 0;
+
+ $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
$this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $limit ) {
// If atomic sections or explicit transactions are still open, some caller must have
// caught an exception but failed to properly rollback any changes. Detect that and
);
}
} );
+ $this->trxRoundStage = self::ROUND_APPROVED;
}
public function beginMasterChanges( $fname = __METHOD__ ) {
"$fname: Transaction round '{$this->trxRoundId}' already started."
);
}
- $this->trxRoundId = $fname;
+ $this->assertTransactionRoundStage( self::ROUND_CURSORY );
- $failures = [];
- $this->forEachOpenMasterConnection(
- function ( Database $conn ) use ( $fname, &$failures ) {
- $conn->setTrxEndCallbackSuppression( true );
- try {
- $conn->flushSnapshot( $fname );
- } catch ( DBError $e ) {
- call_user_func( $this->errorLogger, $e );
- $failures[] = "{$conn->getServer()}: {$e->getMessage()}";
- }
- $conn->setTrxEndCallbackSuppression( false );
- $this->applyTransactionRoundFlags( $conn );
- }
- );
+ // Clear any empty transactions (no writes/callbacks) from the implicit round
+ $this->flushMasterSnapshots( $fname );
- if ( $failures ) {
- throw new DBExpectedError(
- null,
- "$fname: Flush failed on server(s) " . implode( "\n", array_unique( $failures ) )
- );
- }
+ $this->trxRoundId = $fname;
+ $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+ // Mark applicable handles as participating in this explicit transaction round.
+ // For each of these handles, any writes and callbacks will be tied to a single
+ // transaction. The (peer) handles will reject begin()/commit() calls unless they
+ // are part of an en masse commit or an en masse rollback.
+ $this->forEachOpenMasterConnection( function ( Database $conn ) {
+ $this->applyTransactionRoundFlags( $conn );
+ } );
+ $this->trxRoundStage = self::ROUND_CURSORY;
}
public function commitMasterChanges( $fname = __METHOD__ ) {
+ $this->assertTransactionRoundStage( self::ROUND_APPROVED );
+
$failures = [];
/** @noinspection PhpUnusedLocalVariableInspection */
$restore = ( $this->trxRoundId !== false );
$this->trxRoundId = false;
+ $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+ // Commit any writes and clear any snapshots as well (callbacks require AUTOCOMMIT).
+ // Note that callbacks should already be suppressed due to finalizeMasterChanges().
$this->forEachOpenMasterConnection(
- function ( IDatabase $conn ) use ( $fname, $restore, &$failures ) {
+ function ( IDatabase $conn ) use ( $fname, &$failures ) {
try {
- if ( $conn->writesOrCallbacksPending() ) {
- $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
- } elseif ( $restore ) {
- $conn->flushSnapshot( $fname );
- }
+ $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
} catch ( DBError $e ) {
call_user_func( $this->errorLogger, $e );
$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
}
- if ( $restore ) {
- $this->undoTransactionRoundFlags( $conn );
- }
}
);
-
if ( $failures ) {
- throw new DBExpectedError(
+ throw new DBTransactionError(
null,
"$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
);
}
+ if ( $restore ) {
+ // Unmark handles as participating in this explicit transaction round
+ $this->forEachOpenMasterConnection( function ( Database $conn ) {
+ $this->undoTransactionRoundFlags( $conn );
+ } );
+ }
+ $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
}
- public function runMasterPostTrxCallbacks( $type ) {
+ public function runMasterTransactionIdleCallbacks() {
+ if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
+ $type = IDatabase::TRIGGER_COMMIT;
+ } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
+ $type = IDatabase::TRIGGER_ROLLBACK;
+ } else {
+ throw new DBTransactionError(
+ null,
+ "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
+ );
+ }
+
+ $oldStage = $this->trxRoundStage;
+ $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+
+ // Now that the COMMIT/ROLLBACK step is over, enable post-commit callback runs
+ $this->forEachOpenMasterConnection( function ( Database $conn ) {
+ $conn->setTrxEndCallbackSuppression( false );
+ } );
+
$e = null; // first exception
+ // Loop until callbacks stop adding callbacks on other connections
+ do {
+ // Run any pending callbacks for each connection...
+ $count = 0; // callback execution attempts
+ $this->forEachOpenMasterConnection(
+ function ( Database $conn ) use ( $type, &$e, &$count ) {
+ if ( $conn->trxLevel() ) {
+ return; // retry in the next iteration, after commit() is called
+ }
+ try {
+ $count += $conn->runOnTransactionIdleCallbacks( $type );
+ } catch ( Exception $ex ) {
+ $e = $e ?: $ex;
+ }
+ }
+ );
+ // Clear out any active transactions left over from callbacks...
+ $this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$e ) {
+ if ( $conn->writesPending() ) {
+ // A callback from another handle wrote to this one and DBO_TRX is set
+ $this->queryLogger->warning( __METHOD__ . ": found writes pending." );
+ $fnames = implode( ', ', $conn->pendingWriteAndCallbackCallers() );
+ $this->queryLogger->warning(
+ __METHOD__ . ": found writes pending ($fnames).",
+ [
+ 'db_server' => $conn->getServer(),
+ 'db_name' => $conn->getDBname()
+ ]
+ );
+ } elseif ( $conn->trxLevel() ) {
+ // A callback from another handle read from this one and DBO_TRX is set,
+ // which can easily happen if there is only one DB (no replicas)
+ $this->queryLogger->debug( __METHOD__ . ": found empty transaction." );
+ }
+ try {
+ $conn->commit( __METHOD__, $conn::FLUSHING_ALL_PEERS );
+ } catch ( Exception $ex ) {
+ $e = $e ?: $ex;
+ }
+ } );
+ } while ( $count > 0 );
+
+ $this->trxRoundStage = $oldStage;
+
+ return $e;
+ }
+
+ public function runMasterTransactionListenerCallbacks() {
+ if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
+ $type = IDatabase::TRIGGER_COMMIT;
+ } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
+ $type = IDatabase::TRIGGER_ROLLBACK;
+ } else {
+ throw new DBTransactionError(
+ null,
+ "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
+ );
+ }
+
+ $e = null;
+
+ $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
$this->forEachOpenMasterConnection( function ( Database $conn ) use ( $type, &$e ) {
- $conn->setTrxEndCallbackSuppression( false );
- // Callbacks run in AUTO-COMMIT mode, so make sure no transactions are pending...
- if ( $conn->writesPending() ) {
- // This happens if onTransactionIdle() callbacks write to *other* handles
- // (which already finished their callbacks). Let any callbacks run in the final
- // commitMasterChanges() in LBFactory::shutdown(), when the transaction is gone.
- $this->queryLogger->warning( __METHOD__ . ": found writes pending." );
- return;
- } elseif ( $conn->trxLevel() ) {
- // This happens for single-DB setups where DB_REPLICA uses the master DB,
- // thus leaving an implicit read-only transaction open at this point. It
- // also happens if onTransactionIdle() callbacks leave implicit transactions
- // open on *other* DBs (which is slightly improper). Let these COMMIT on the
- // next call to commitMasterChanges(), possibly in LBFactory::shutdown().
- return;
- }
- try {
- $conn->runOnTransactionIdleCallbacks( $type );
- } catch ( Exception $ex ) {
- $e = $e ?: $ex;
- }
try {
$conn->runTransactionListenerCallbacks( $type );
} catch ( Exception $ex ) {
$e = $e ?: $ex;
}
} );
+ $this->trxRoundStage = self::ROUND_CURSORY;
return $e;
}
public function rollbackMasterChanges( $fname = __METHOD__ ) {
$restore = ( $this->trxRoundId !== false );
$this->trxRoundId = false;
- $this->forEachOpenMasterConnection(
- function ( IDatabase $conn ) use ( $fname, $restore ) {
- $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
- if ( $restore ) {
- $this->undoTransactionRoundFlags( $conn );
- }
- }
- );
+ $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
+ $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $fname ) {
+ $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
+ } );
+ if ( $restore ) {
+ // Unmark handles as participating in this explicit transaction round
+ $this->forEachOpenMasterConnection( function ( Database $conn ) {
+ $this->undoTransactionRoundFlags( $conn );
+ } );
+ }
+ $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
}
- public function suppressTransactionEndCallbacks() {
- $this->forEachOpenMasterConnection( function ( Database $conn ) {
- $conn->setTrxEndCallbackSuppression( true );
- } );
+ /**
+ * @param string|string[] $stage
+ */
+ private function assertTransactionRoundStage( $stage ) {
+ $stages = (array)$stage;
+
+ if ( !in_array( $this->trxRoundStage, $stages, true ) ) {
+ $stageList = implode(
+ '/',
+ array_map( function ( $v ) {
+ return "'$v'";
+ }, $stages )
+ );
+ throw new DBTransactionError(
+ null,
+ "Transaction round stage must be $stageList (not '{$this->trxRoundStage}')"
+ );
+ }
}
/**
* transaction rounds and remain in auto-commit mode. Such behavior might be desired
* when a DB server is used for something like simple key/value storage.
*
- * @param IDatabase $conn
+ * @param Database $conn
*/
- private function applyTransactionRoundFlags( IDatabase $conn ) {
+ private function applyTransactionRoundFlags( Database $conn ) {
if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
return; // transaction rounds do not apply to these connections
}
}
/**
- * @param IDatabase $conn
+ * @param Database $conn
*/
- private function undoTransactionRoundFlags( IDatabase $conn ) {
+ private function undoTransactionRoundFlags( Database $conn ) {
if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
return; // transaction rounds do not apply to these connections
}
}
public function flushReplicaSnapshots( $fname = __METHOD__ ) {
- $this->forEachOpenReplicaConnection( function ( IDatabase $conn ) {
- $conn->flushSnapshot( __METHOD__ );
+ $this->forEachOpenReplicaConnection( function ( IDatabase $conn ) use ( $fname ) {
+ $conn->flushSnapshot( $fname );
} );
}
+ public function flushMasterSnapshots( $fname = __METHOD__ ) {
+ $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( $fname ) {
+ $conn->flushSnapshot( $fname );
+ } );
+ }
+
+ /**
+ * @return string
+ * @since 1.32
+ */
+ public function getTransactionRoundStage() {
+ return $this->trxRoundStage;
+ }
+
public function hasMasterConnection() {
return $this->isOpen( $this->getWriterIndex() );
}
}
}
+/**
+ * @deprecated since 1.29
+ */
class_alias( LoadBalancer::class, 'LoadBalancer' );