X-Git-Url: https://git.cyclocoop.org/%242?a=blobdiff_plain;f=includes%2Flibs%2Frdbms%2Floadbalancer%2FLoadBalancer.php;h=757e52d910bf161f82caec795fddc38d415d4649;hb=3cbb232c53971a66cb808df2767a2030e42b9e75;hp=1fd21d08c67cb127301ab0552272b059c09a65b2;hpb=58d70885d875a3928d96a2ca0a74dbc0ec5bb8bb;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/libs/rdbms/loadbalancer/LoadBalancer.php b/includes/libs/rdbms/loadbalancer/LoadBalancer.php index 1fd21d08c6..757e52d910 100644 --- a/includes/libs/rdbms/loadbalancer/LoadBalancer.php +++ b/includes/libs/rdbms/loadbalancer/LoadBalancer.php @@ -64,7 +64,7 @@ class LoadBalancer implements ILoadBalancer { /** @var callable Deprecation logger */ private $deprecationLogger; - /** @var DatabaseDomain Local Domain ID and default for selectDB() calls */ + /** @var DatabaseDomain Local DB domain ID and default for selectDB() calls */ private $localDomain; /** @var Database[][][] Map of (connection category => server index => IDatabase[]) */ @@ -73,7 +73,7 @@ class LoadBalancer implements ILoadBalancer { /** @var array[] Map of (server index => server config array) */ private $servers; /** @var float[] Map of (server index => weight) */ - private $loads; + private $genericLoads; /** @var array[] Map of (group => server index => weight) */ private $groupLoads; /** @var bool Whether to disregard replica DB lag as a factor in replica DB selection */ @@ -82,7 +82,7 @@ class LoadBalancer implements ILoadBalancer { private $waitTimeout; /** @var array The LoadMonitor configuration */ private $loadMonitorConfig; - /** @var string Alternate ID string for the domain instead of DatabaseDomain::getId() */ + /** @var string Alternate local DB domain instead of DatabaseDomain::getId() */ private $localDomainIdAlias; /** @var int */ private $maxLag; @@ -103,9 +103,11 @@ class LoadBalancer implements ILoadBalancer { /** @var Database DB connection object that caused a problem */ private $errorConnection; - /** @var int The generic (not query grouped) replica DB index (of $mServers) */ - private $readIndex; - /** @var bool|DBMasterPos False if not set */ + /** @var int The generic (not query grouped) replica DB index */ + private $genericReadIndex = -1; + /** @var int[] The group replica DB indexes keyed by group */ + private $readIndexByGroup = []; + /** @var bool|DBMasterPos Replication sync position or false if not set */ private $waitForPos; /** @var bool Whether the generic reader fell back to a lagged replica DB */ private $laggedReplicaMode = false; @@ -122,6 +124,8 @@ class LoadBalancer implements ILoadBalancer { /** @var bool Whether any connection has been attempted yet */ private $connectionAttempted = false; + /** @var int|null An integer ID of the managing LBFactory instance or null */ + private $ownerId; /** @var string|bool String if a requested DBO_TRX transaction round is active */ private $trxRoundId = false; /** @var string Stage of the current transaction round in the transaction round life-cycle */ @@ -163,7 +167,7 @@ class LoadBalancer implements ILoadBalancer { public function __construct( array $params ) { if ( !isset( $params['servers'] ) ) { - throw new InvalidArgumentException( __CLASS__ . ': missing servers parameter' ); + throw new InvalidArgumentException( __CLASS__ . ': missing "servers" parameter' ); } $this->servers = $params['servers']; foreach ( $this->servers as $i => $server ) { @@ -181,7 +185,6 @@ class LoadBalancer implements ILoadBalancer { $this->waitTimeout = $params['waitTimeout'] ?? self::MAX_WAIT_DEFAULT; - $this->readIndex = -1; $this->conns = [ // Connection were transaction rounds may be applied self::KEY_LOCAL => [], @@ -192,7 +195,7 @@ class LoadBalancer implements ILoadBalancer { self::KEY_FOREIGN_INUSE_NOROUND => [], self::KEY_FOREIGN_FREE_NOROUND => [] ]; - $this->loads = []; + $this->genericLoads = []; $this->waitForPos = false; $this->allowLagged = false; @@ -206,7 +209,7 @@ class LoadBalancer implements ILoadBalancer { $this->loadMonitorConfig += [ 'lagWarnThreshold' => $this->maxLag ]; foreach ( $params['servers'] as $i => $server ) { - $this->loads[$i] = $server['load']; + $this->genericLoads[$i] = $server['load']; if ( isset( $server['groupLoads'] ) ) { foreach ( $server['groupLoads'] as $group => $ratio ) { if ( !isset( $this->groupLoads[$group] ) ) { @@ -250,6 +253,7 @@ class LoadBalancer implements ILoadBalancer { } $this->defaultGroup = $params['defaultGroup'] ?? null; + $this->ownerId = $params['ownerId'] ?? null; } public function getLocalDomainID() { @@ -257,10 +261,58 @@ class LoadBalancer implements ILoadBalancer { } public function resolveDomainID( $domain ) { - return ( $domain !== false ) ? (string)$domain : $this->getLocalDomainID(); + if ( $domain === $this->localDomainIdAlias || $domain === false ) { + // Local connection requested via some backwards-compatibility domain alias + return $this->getLocalDomainID(); + } + + return (string)$domain; + } + + /** + * @param int $flags + * @return bool + */ + private function sanitizeConnectionFlags( $flags ) { + if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) === self::CONN_TRX_AUTOCOMMIT ) { + // Assuming all servers are of the same type (or similar), which is overwhelmingly + // the case, use the master server information to get the attributes. The information + // for $i cannot be used since it might be DB_REPLICA, which might require connection + // attempts in order to be resolved into a real server index. + $attributes = $this->getServerAttributes( $this->getWriterIndex() ); + if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) { + // Callers sometimes want to (a) escape REPEATABLE-READ stateness without locking + // rows (e.g. FOR UPDATE) or (b) make small commits during a larger transactions + // to reduce lock contention. None of these apply for sqlite and using separate + // connections just causes self-deadlocks. + $flags &= ~self::CONN_TRX_AUTOCOMMIT; + $this->connLogger->info( __METHOD__ . + ': ignoring CONN_TRX_AUTOCOMMIT to avoid deadlocks.' ); + } + } + + return $flags; } /** + * @param IDatabase $conn + * @param int $flags + * @throws DBUnexpectedError + */ + private function enforceConnectionFlags( IDatabase $conn, $flags ) { + if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT ) { + if ( $conn->trxLevel() ) { // sanity + throw new DBUnexpectedError( + $conn, + 'Handle requested with CONN_TRX_AUTOCOMMIT yet it has a transaction' + ); + } + + $conn->clearFlag( $conn::DBO_TRX ); // auto-commit mode + } + } + + /** * Get a LoadMonitor instance * * @return ILoadMonitor @@ -341,13 +393,62 @@ class LoadBalancer implements ILoadBalancer { return ArrayUtils::pickRandom( $loads ); } + /** + * @param int $i + * @param array $groups + * @param string|bool $domain + * @return int The index of a specific server (replica DBs are checked for connectivity) + */ + private function getConnectionIndex( $i, $groups, $domain ) { + // Check one "group" per default: the generic pool + $defaultGroups = $this->defaultGroup ? [ $this->defaultGroup ] : [ false ]; + + $groups = ( $groups === false || $groups === [] ) + ? $defaultGroups + : (array)$groups; + + if ( $i === self::DB_MASTER ) { + $i = $this->getWriterIndex(); + } elseif ( $i === self::DB_REPLICA ) { + # Try to find an available server in any the query groups (in order) + foreach ( $groups as $group ) { + $groupIndex = $this->getReaderIndex( $group, $domain ); + if ( $groupIndex !== false ) { + $i = $groupIndex; + break; + } + } + } + + # Operation-based index + if ( $i === self::DB_REPLICA ) { + $this->lastError = 'Unknown error'; // reset error string + # Try the general server pool if $groups are unavailable. + $i = ( $groups === [ false ] ) + ? false // don't bother with this if that is what was tried above + : $this->getReaderIndex( false, $domain ); + # Couldn't find a working server in getReaderIndex()? + if ( $i === false ) { + $this->lastError = 'No working replica DB server: ' . $this->lastError; + // Throw an exception + $this->reportConnectionError(); + return null; // unreachable due to exception + } + } + + return $i; + } + public function getReaderIndex( $group = false, $domain = false ) { if ( count( $this->servers ) == 1 ) { // Skip the load balancing if there's only one server return $this->getWriterIndex(); - } elseif ( $group === false && $this->readIndex >= 0 ) { - // Shortcut if the generic reader index was already cached - return $this->readIndex; + } + + $index = $this->getExistingReaderIndex( $group ); + if ( $index >= 0 ) { + // A reader index was already selected and "waitForPos" was handled + return $index; } if ( $group !== false ) { @@ -362,36 +463,32 @@ class LoadBalancer implements ILoadBalancer { } } else { // Use the generic load group - $loads = $this->loads; + $loads = $this->genericLoads; } // Scale the configured load ratios according to each server's load and state $this->getLoadMonitor()->scaleLoads( $loads, $domain ); // Pick a server to use, accounting for weights, load, lag, and "waitForPos" + $this->lazyLoadReplicationPositions(); // optimizes server candidate selection list( $i, $laggedReplicaMode ) = $this->pickReaderIndex( $loads, $domain ); if ( $i === false ) { - // Replica DB connection unsuccessful + // DB connection unsuccessful return false; } - if ( $this->waitForPos && $i != $this->getWriterIndex() ) { - // Before any data queries are run, wait for the server to catch up to the - // specified position. This is used to improve session consistency. Note that - // when LoadBalancer::waitFor() sets "waitForPos", the waiting triggers here, - // so update laggedReplicaMode as needed for consistency. - if ( !$this->doWait( $i ) ) { - $laggedReplicaMode = true; - } + // If data seen by queries is expected to reflect the transactions committed as of + // or after a given replication position then wait for the DB to apply those changes + if ( $this->waitForPos && $i != $this->getWriterIndex() && !$this->doWait( $i ) ) { + // Data will be outdated compared to what was expected + $laggedReplicaMode = true; } - if ( $this->readIndex <= 0 && $this->loads[$i] > 0 && $group === false ) { - // Cache the generic reader index for future ungrouped DB_REPLICA handles - $this->readIndex = $i; - // Record if the generic reader index is in "lagged replica DB" mode - if ( $laggedReplicaMode ) { - $this->laggedReplicaMode = true; - } + // Cache the reader index for future DB_REPLICA handles + $this->setExistingReaderIndex( $group, $i ); + // Record whether the generic reader index is in "lagged replica DB" mode + if ( $group === false && $laggedReplicaMode ) { + $this->laggedReplicaMode = true; } $serverName = $this->getServerName( $i ); @@ -400,6 +497,40 @@ class LoadBalancer implements ILoadBalancer { return $i; } + /** + * Get the server index chosen by the load balancer for use with the given query group + * + * @param string|bool $group Query group; use false for the generic group + * @return int Server index or -1 if none was chosen + */ + protected function getExistingReaderIndex( $group ) { + if ( $group === false ) { + $index = $this->genericReadIndex; + } else { + $index = $this->readIndexByGroup[$group] ?? -1; + } + + return $index; + } + + /** + * Set the server index chosen by the load balancer for use with the given query group + * + * @param string|bool $group Query group; use false for the generic group + * @param int $index The index of a specific server + */ + private function setExistingReaderIndex( $group, $index ) { + if ( $index < 0 ) { + throw new UnexpectedValueException( "Cannot set a negative read server index" ); + } + + if ( $group === false ) { + $this->genericReadIndex = $index; + } else { + $this->readIndexByGroup[$group] = $index; + } + } + /** * @param array $loads List of server weights * @param string|bool $domain @@ -407,7 +538,7 @@ class LoadBalancer implements ILoadBalancer { */ private function pickReaderIndex( array $loads, $domain = false ) { if ( $loads === [] ) { - throw new InvalidArgumentException( "Empty server array given to LoadBalancer" ); + throw new InvalidArgumentException( "Server configuration array is empty" ); } /** @var int|bool $i Index of selected server */ @@ -423,6 +554,7 @@ class LoadBalancer implements ILoadBalancer { } else { $i = false; if ( $this->waitForPos && $this->waitForPos->asOfTime() ) { + $this->replLogger->debug( __METHOD__ . ": replication positions detected" ); // "chronologyCallback" sets "waitForPos" for session consistency. // This triggers doWait() after connect, so it's especially good to // avoid lagged servers so as to avoid excessive delay in that method. @@ -455,7 +587,7 @@ class LoadBalancer implements ILoadBalancer { $serverName = $this->getServerName( $i ); $this->connLogger->debug( __METHOD__ . ": Using reader #$i: $serverName..." ); - $conn = $this->openConnection( $i, $domain ); + $conn = $this->getConnection( $i, [], $domain, self::CONN_SILENCE_ERRORS ); if ( !$conn ) { $this->connLogger->warning( __METHOD__ . ": Failed connecting to $i/$domain" ); unset( $currentLoads[$i] ); // avoid this server next iteration @@ -486,12 +618,10 @@ class LoadBalancer implements ILoadBalancer { try { $this->waitForPos = $pos; // If a generic reader connection was already established, then wait now - $i = $this->readIndex; - if ( $i > 0 ) { - if ( !$this->doWait( $i ) ) { - $this->laggedReplicaMode = true; - } + if ( $this->genericReadIndex > 0 && !$this->doWait( $this->genericReadIndex ) ) { + $this->laggedReplicaMode = true; } + // Otherwise, wait until a connection is established in getReaderIndex() } finally { // Restore the older position if it was higher since this is used for lag-protection $this->setWaitForPositionIfHigher( $oldPos ); @@ -503,10 +633,10 @@ class LoadBalancer implements ILoadBalancer { try { $this->waitForPos = $pos; - $i = $this->readIndex; + $i = $this->genericReadIndex; if ( $i <= 0 ) { // Pick a generic replica DB if there isn't one yet - $readLoads = $this->loads; + $readLoads = $this->genericLoads; unset( $readLoads[$this->getWriterIndex()] ); // replica DBs only $readLoads = array_filter( $readLoads ); // with non-zero load $i = ArrayUtils::pickRandom( $readLoads ); @@ -535,7 +665,7 @@ class LoadBalancer implements ILoadBalancer { $ok = true; for ( $i = 1; $i < $serverCount; $i++ ) { - if ( $this->loads[$i] > 0 ) { + if ( $this->genericLoads[$i] > 0 ) { $start = microtime( true ); $ok = $this->doWait( $i, true, $timeout ) && $ok; $timeout -= intval( microtime( true ) - $start ); @@ -629,20 +759,20 @@ class LoadBalancer implements ILoadBalancer { ); return false; - } else { - $conn = $this->openConnection( $index, self::DOMAIN_ANY ); - if ( !$conn ) { - $this->replLogger->warning( - __METHOD__ . ': failed to connect to {dbserver}', - [ 'dbserver' => $server ] - ); + } + // Open a temporary new connection in order to wait for replication + $conn = $this->getConnection( $index, [], self::DOMAIN_ANY, self::CONN_SILENCE_ERRORS ); + if ( !$conn ) { + $this->replLogger->warning( + __METHOD__ . ': failed to connect to {dbserver}', + [ 'dbserver' => $server ] + ); - return false; - } - // Avoid connection spam in waitForAll() when connections - // are made just for the sake of doing this lag check. - $close = true; + return false; } + // Avoid connection spam in waitForAll() when connections + // are made just for the sake of doing this lag check. + $close = true; } $this->replLogger->info( @@ -688,88 +818,54 @@ class LoadBalancer implements ILoadBalancer { } public function getConnection( $i, $groups = [], $domain = false, $flags = 0 ) { - if ( $i === null || $i === false ) { - throw new InvalidArgumentException( 'Attempt to call ' . __METHOD__ . - ' with invalid server index' ); + if ( !is_int( $i ) ) { + throw new InvalidArgumentException( "Cannot connect without a server index" ); + } elseif ( $groups && $i > 0 ) { + throw new InvalidArgumentException( "Got query groups with server index #$i" ); } - if ( $this->localDomain->equals( $domain ) || $domain === $this->localDomainIdAlias ) { - $domain = false; // local connection requested - } - - if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) === self::CONN_TRX_AUTOCOMMIT ) { - // Assuming all servers are of the same type (or similar), which is overwhelmingly - // the case, use the master server information to get the attributes. The information - // for $i cannot be used since it might be DB_REPLICA, which might require connection - // attempts in order to be resolved into a real server index. - $attributes = $this->getServerAttributes( $this->getWriterIndex() ); - if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) { - // Callers sometimes want to (a) escape REPEATABLE-READ stateness without locking - // rows (e.g. FOR UPDATE) or (b) make small commits during a larger transactions - // to reduce lock contention. None of these apply for sqlite and using separate - // connections just causes self-deadlocks. - $flags &= ~self::CONN_TRX_AUTOCOMMIT; - $this->connLogger->info( __METHOD__ . - ': ignoring CONN_TRX_AUTOCOMMIT to avoid deadlocks.' ); - } - } - - // Check one "group" per default: the generic pool - $defaultGroups = $this->defaultGroup ? [ $this->defaultGroup ] : [ false ]; - - $groups = ( $groups === false || $groups === [] ) - ? $defaultGroups - : (array)$groups; - + $domain = $this->resolveDomainID( $domain ); + $flags = $this->sanitizeConnectionFlags( $flags ); $masterOnly = ( $i == self::DB_MASTER || $i == $this->getWriterIndex() ); - $oldConnsOpened = $this->connsOpened; // connections open now - - if ( $i == self::DB_MASTER ) { - $i = $this->getWriterIndex(); - } elseif ( $i == self::DB_REPLICA ) { - # Try to find an available server in any the query groups (in order) - foreach ( $groups as $group ) { - $groupIndex = $this->getReaderIndex( $group, $domain ); - if ( $groupIndex !== false ) { - $i = $groupIndex; - break; - } - } - } - # Operation-based index - if ( $i == self::DB_REPLICA ) { - $this->lastError = 'Unknown error'; // reset error string - # Try the general server pool if $groups are unavailable. - $i = ( $groups === [ false ] ) - ? false // don't bother with this if that is what was tried above - : $this->getReaderIndex( false, $domain ); - # Couldn't find a working server in getReaderIndex()? - if ( $i === false ) { - $this->lastError = 'No working replica DB server: ' . $this->lastError; - // Throw an exception + // Number of connections made before getting the server index and handle + $priorConnectionsMade = $this->connsOpened; + + // Choose a server if $i is DB_MASTER/DB_REPLICA (might trigger new connections) + $serverIndex = $this->getConnectionIndex( $i, $groups, $domain ); + // Get an open connection to that server (might trigger a new connection) + $conn = $this->localDomain->equals( $domain ) + ? $this->getLocalConnection( $serverIndex, $flags ) + : $this->getForeignConnection( $serverIndex, $domain, $flags ); + // Throw an error or bail out if the connection attempt failed + if ( !( $conn instanceof IDatabase ) ) { + if ( ( $flags & self::CONN_SILENCE_ERRORS ) != self::CONN_SILENCE_ERRORS ) { $this->reportConnectionError(); - return null; // not reached } - } - # Now we have an explicit index into the servers array - $conn = $this->openConnection( $i, $domain, $flags ); - if ( !$conn ) { - // Throw an exception - $this->reportConnectionError(); - return null; // not reached + return false; } - # Profile any new connections that happen - if ( $this->connsOpened > $oldConnsOpened ) { + // Profile any new connections caused by this method + if ( $this->connsOpened > $priorConnectionsMade ) { $host = $conn->getServer(); $dbname = $conn->getDBname(); $this->trxProfiler->recordConnection( $host, $dbname, $masterOnly ); } - if ( $masterOnly ) { - # Make master-requested DB handles inherit any read-only mode setting + if ( !$conn->isOpen() ) { + // Connection was made but later unrecoverably lost for some reason. + // Do not return a handle that will just throw exceptions on use, + // but let the calling code (e.g. getReaderIndex) try another server. + $this->errorConnection = $conn; + return false; + } + + $this->enforceConnectionFlags( $conn, $flags ); + if ( $serverIndex == $this->getWriterIndex() ) { + // If the load balancer is read-only, perhaps due to replication lag, then master + // DB handles will reflect that. Note that Database::assertIsWritableMaster() takes + // care of replica DB handles whereas getReadOnlyReason() would cause infinite loops. $conn->setLBInfo( 'readOnlyReason', $this->getReadOnlyReason( $domain, $conn ) ); } @@ -815,11 +911,11 @@ class LoadBalancer implements ILoadBalancer { $domain = $conn->getDomainID(); if ( !isset( $this->conns[$connInUseKey][$serverIndex][$domain] ) ) { - throw new InvalidArgumentException( __METHOD__ . - ": connection $serverIndex/$domain not found; it may have already been freed." ); + throw new InvalidArgumentException( + "Connection $serverIndex/$domain not found; it may have already been freed" ); } elseif ( $this->conns[$connInUseKey][$serverIndex][$domain] !== $conn ) { - throw new InvalidArgumentException( __METHOD__ . - ": connection $serverIndex/$domain mismatched; it may have already been freed." ); + throw new InvalidArgumentException( + "Connection $serverIndex/$domain mismatched; it may have already been freed" ); } $conn->setLBInfo( 'foreignPoolRefCount', --$refCount ); @@ -836,72 +932,47 @@ class LoadBalancer implements ILoadBalancer { } } - public function getConnectionRef( $db, $groups = [], $domain = false, $flags = 0 ) { + public function getConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) { $domain = $this->resolveDomainID( $domain ); + $role = $this->getRoleFromIndex( $i ); - return new DBConnRef( $this, $this->getConnection( $db, $groups, $domain, $flags ) ); + return new DBConnRef( $this, $this->getConnection( $i, $groups, $domain, $flags ), $role ); } - public function getLazyConnectionRef( $db, $groups = [], $domain = false, $flags = 0 ) { + public function getLazyConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) { $domain = $this->resolveDomainID( $domain ); + $role = $this->getRoleFromIndex( $i ); - return new DBConnRef( $this, [ $db, $groups, $domain, $flags ] ); + return new DBConnRef( $this, [ $i, $groups, $domain, $flags ], $role ); } - public function getMaintenanceConnectionRef( $db, $groups = [], $domain = false, $flags = 0 ) { + public function getMaintenanceConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) { $domain = $this->resolveDomainID( $domain ); + $role = $this->getRoleFromIndex( $i ); return new MaintainableDBConnRef( - $this, $this->getConnection( $db, $groups, $domain, $flags ) ); + $this, $this->getConnection( $i, $groups, $domain, $flags ), $role ); } - public function openConnection( $i, $domain = false, $flags = 0 ) { - if ( $this->localDomain->equals( $domain ) || $domain === $this->localDomainIdAlias ) { - $domain = false; // local connection requested - } - - if ( !$this->connectionAttempted && $this->chronologyCallback ) { - $this->connLogger->debug( __METHOD__ . ': calling initLB() before first connection.' ); - // Load any "waitFor" positions before connecting so that doWait() is triggered - $this->connectionAttempted = true; - ( $this->chronologyCallback )( $this ); - } - - // Check if an auto-commit connection is being requested. If so, it will not reuse the - // main set of DB connections but rather its own pool since: - // a) those are usually set to implicitly use transaction rounds via DBO_TRX - // b) those must support the use of explicit transaction rounds via beginMasterChanges() - $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT ); - - if ( $domain !== false ) { - // Connection is to a foreign domain - $conn = $this->openForeignConnection( $i, $domain, $flags ); - } else { - // Connection is to the local domain - $conn = $this->openLocalConnection( $i, $flags ); - } - - if ( $conn instanceof IDatabase && !$conn->isOpen() ) { - // Connection was made but later unrecoverably lost for some reason. - // Do not return a handle that will just throw exceptions on use, - // but let the calling code (e.g. getReaderIndex) try another server. - // See DatabaseMyslBase::ping() for how this can happen. - $this->errorConnection = $conn; - $conn = false; - } - - if ( $autoCommit && $conn instanceof IDatabase ) { - if ( $conn->trxLevel() ) { // sanity - throw new DBUnexpectedError( - $conn, - __METHOD__ . ': CONN_TRX_AUTOCOMMIT handle has a transaction.' - ); - } - - $conn->clearFlag( $conn::DBO_TRX ); // auto-commit mode - } + /** + * @param int $i Server index or DB_MASTER/DB_REPLICA + * @return int One of DB_MASTER/DB_REPLICA + */ + private function getRoleFromIndex( $i ) { + return ( $i === self::DB_MASTER || $i === $this->getWriterIndex() ) + ? self::DB_MASTER + : self::DB_REPLICA; + } - return $conn; + /** + * @param int $i + * @param bool $domain + * @param int $flags + * @return Database|bool Live database handle or false on failure + * @deprecated Since 1.34 Use getConnection() instead + */ + public function openConnection( $i, $domain = false, $flags = 0 ) { + return $this->getConnection( $i, [], $domain, $flags | self::CONN_SILENCE_ERRORS ); } /** @@ -916,7 +987,9 @@ class LoadBalancer implements ILoadBalancer { * @param int $flags Class CONN_* constant bitfield * @return Database */ - private function openLocalConnection( $i, $flags = 0 ) { + private function getLocalConnection( $i, $flags = 0 ) { + // Connection handles required to be in auto-commit mode use a separate connection + // pool since the main pool is effected by implicit and explicit transaction rounds $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT ); $connKey = $autoCommit ? self::KEY_LOCAL_NOROUND : self::KEY_LOCAL; @@ -924,7 +997,7 @@ class LoadBalancer implements ILoadBalancer { $conn = $this->conns[$connKey][$i][0]; } else { if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) { - throw new InvalidArgumentException( "No server with index '$i'." ); + throw new InvalidArgumentException( "No server with index '$i'" ); } // Open a new connection $server = $this->servers[$i]; @@ -951,7 +1024,7 @@ class LoadBalancer implements ILoadBalancer { ) { throw new UnexpectedValueException( "Got connection to '{$conn->getDomainID()}', " . - "but expected local domain ('{$this->localDomain}')." ); + "but expected local domain ('{$this->localDomain}')" ); } return $conn; @@ -979,8 +1052,10 @@ class LoadBalancer implements ILoadBalancer { * @return Database|bool Returns false on connection error * @throws DBError When database selection fails */ - private function openForeignConnection( $i, $domain, $flags = 0 ) { + private function getForeignConnection( $i, $domain, $flags = 0 ) { $domainInstance = DatabaseDomain::newFromId( $domain ); + // Connection handles required to be in auto-commit mode use a separate connection + // pool since the main pool is effected by implicit and explicit transaction rounds $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT ); if ( $autoCommit ) { @@ -1036,7 +1111,7 @@ class LoadBalancer implements ILoadBalancer { if ( !$conn ) { if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) { - throw new InvalidArgumentException( "No server with index '$i'." ); + throw new InvalidArgumentException( "No server with index '$i'" ); } // Open a new connection $server = $this->servers[$i]; @@ -1060,7 +1135,7 @@ class LoadBalancer implements ILoadBalancer { // Final sanity check to make sure the right domain is selected if ( !$domainInstance->isCompatible( $conn->getDomainID() ) ) { throw new UnexpectedValueException( - "Got connection to '{$conn->getDomainID()}', but expected '$domain'." ); + "Got connection to '{$conn->getDomainID()}', but expected '$domain'" ); } // Increment reference count $refCount = $conn->getLBInfo( 'foreignPoolRefCount' ); @@ -1179,9 +1254,22 @@ class LoadBalancer implements ILoadBalancer { } } + $this->lazyLoadReplicationPositions(); // session consistency + return $db; } + /** + * Make sure that any "waitForPos" positions are loaded and available to doWait() + */ + private function lazyLoadReplicationPositions() { + if ( !$this->connectionAttempted && $this->chronologyCallback ) { + $this->connectionAttempted = true; + ( $this->chronologyCallback )( $this ); // generally calls waitFor() + $this->connLogger->debug( __METHOD__ . ': executed chronology callback.' ); + } + } + /** * @throws DBConnectionError */ @@ -1222,7 +1310,7 @@ class LoadBalancer implements ILoadBalancer { } public function isNonZeroLoad( $i ) { - return array_key_exists( $i, $this->servers ) && $this->loads[$i] != 0; + return array_key_exists( $i, $this->servers ) && $this->genericLoads[$i] != 0; } public function getServerCount() { @@ -1290,7 +1378,7 @@ class LoadBalancer implements ILoadBalancer { public function closeConnection( IDatabase $conn ) { if ( $conn instanceof DBConnRef ) { // Avoid calling close() but still leaving the handle in the pool - throw new RuntimeException( __METHOD__ . ': got DBConnRef instance.' ); + throw new RuntimeException( 'Cannot close DBConnRef instance; it must be shareable' ); } $serverIndex = $conn->getLBInfo( 'serverIndex' ); @@ -1314,13 +1402,14 @@ class LoadBalancer implements ILoadBalancer { $conn->close(); } - public function commitAll( $fname = __METHOD__ ) { - $this->commitMasterChanges( $fname ); + public function commitAll( $fname = __METHOD__, $owner = null ) { + $this->commitMasterChanges( $fname, $owner ); $this->flushMasterSnapshots( $fname ); $this->flushReplicaSnapshots( $fname ); } - public function finalizeMasterChanges() { + public function finalizeMasterChanges( $fname = __METHOD__, $owner = null ) { + $this->assertOwnership( $fname, $owner ); $this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] ); $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise @@ -1344,7 +1433,8 @@ class LoadBalancer implements ILoadBalancer { return $total; } - public function approveMasterChanges( array $options ) { + public function approveMasterChanges( array $options, $fname = __METHOD__, $owner = null ) { + $this->assertOwnership( $fname, $owner ); $this->assertTransactionRoundStage( self::ROUND_FINALIZED ); $limit = $options['maxWriteDuration'] ?? 0; @@ -1361,7 +1451,7 @@ class LoadBalancer implements ILoadBalancer { if ( $limit > 0 && $time > $limit ) { throw new DBTransactionSizeError( $conn, - "Transaction spent $time second(s) in writes, exceeding the limit of $limit.", + "Transaction spent $time second(s) in writes, exceeding the limit of $limit", [ $time, $limit ] ); } @@ -1370,18 +1460,19 @@ class LoadBalancer implements ILoadBalancer { if ( $conn->writesOrCallbacksPending() && !$conn->ping() ) { throw new DBTransactionError( $conn, - "A connection to the {$conn->getDBname()} database was lost before commit." + "A connection to the {$conn->getDBname()} database was lost before commit" ); } } ); $this->trxRoundStage = self::ROUND_APPROVED; } - public function beginMasterChanges( $fname = __METHOD__ ) { + public function beginMasterChanges( $fname = __METHOD__, $owner = null ) { + $this->assertOwnership( $fname, $owner ); if ( $this->trxRoundId !== false ) { throw new DBTransactionError( null, - "$fname: Transaction round '{$this->trxRoundId}' already started." + "$fname: Transaction round '{$this->trxRoundId}' already started" ); } $this->assertTransactionRoundStage( self::ROUND_CURSORY ); @@ -1401,7 +1492,8 @@ class LoadBalancer implements ILoadBalancer { $this->trxRoundStage = self::ROUND_CURSORY; } - public function commitMasterChanges( $fname = __METHOD__ ) { + public function commitMasterChanges( $fname = __METHOD__, $owner = null ) { + $this->assertOwnership( $fname, $owner ); $this->assertTransactionRoundStage( self::ROUND_APPROVED ); $failures = []; @@ -1439,7 +1531,8 @@ class LoadBalancer implements ILoadBalancer { $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS; } - public function runMasterTransactionIdleCallbacks() { + public function runMasterTransactionIdleCallbacks( $fname = __METHOD__, $owner = null ) { + $this->assertOwnership( $fname, $owner ); if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) { $type = IDatabase::TRIGGER_COMMIT; } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) { @@ -1508,7 +1601,8 @@ class LoadBalancer implements ILoadBalancer { return $e; } - public function runMasterTransactionListenerCallbacks() { + public function runMasterTransactionListenerCallbacks( $fname = __METHOD__, $owner = null ) { + $this->assertOwnership( $fname, $owner ); if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) { $type = IDatabase::TRIGGER_COMMIT; } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) { @@ -1535,7 +1629,9 @@ class LoadBalancer implements ILoadBalancer { return $e; } - public function rollbackMasterChanges( $fname = __METHOD__ ) { + public function rollbackMasterChanges( $fname = __METHOD__, $owner = null ) { + $this->assertOwnership( $fname, $owner ); + $restore = ( $this->trxRoundId !== false ); $this->trxRoundId = false; $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise @@ -1553,6 +1649,7 @@ class LoadBalancer implements ILoadBalancer { /** * @param string|string[] $stage + * @throws DBTransactionError */ private function assertTransactionRoundStage( $stage ) { $stages = (array)$stage; @@ -1571,6 +1668,20 @@ class LoadBalancer implements ILoadBalancer { } } + /** + * @param string $fname + * @param int|null $owner Owner ID of the caller + * @throws DBTransactionError + */ + private function assertOwnership( $fname, $owner ) { + if ( $this->ownerId !== null && $owner !== $this->ownerId ) { + throw new DBTransactionError( + null, + "$fname: LoadBalancer is owned by LBFactory #{$this->ownerId} (got '$owner')." + ); + } + } + /** * Make all DB servers with DBO_DEFAULT/DBO_TRX set join the transaction round * @@ -1672,12 +1783,17 @@ class LoadBalancer implements ILoadBalancer { } public function getLaggedReplicaMode( $domain = false ) { - // No-op if there is only one DB (also avoids recursion) - if ( !$this->laggedReplicaMode && $this->getServerCount() > 1 ) { + if ( + // Avoid recursion if there is only one DB + $this->getServerCount() > 1 && + // Avoid recursion if the (non-zero load) master DB was picked for generic reads + $this->genericReadIndex !== $this->getWriterIndex() && + // Stay in lagged replica mode during the load balancer instance lifetime + !$this->laggedReplicaMode + ) { try { - // See if laggedReplicaMode gets set - $conn = $this->getConnection( self::DB_REPLICA, false, $domain ); - $this->reuseConnection( $conn ); + // Calling this method will set "laggedReplicaMode" as needed + $this->getReaderIndex( false, $domain ); } catch ( DBConnectionError $e ) { // Avoid expensive re-connect attempts and failures $this->allReplicasDownMode = true; @@ -1815,7 +1931,7 @@ class LoadBalancer implements ILoadBalancer { $lagTimes = $this->getLagTimes( $domain ); foreach ( $lagTimes as $i => $lag ) { - if ( $this->loads[$i] > 0 && $lag > $maxLag ) { + if ( $this->genericLoads[$i] > 0 && $lag > $maxLag ) { $maxLag = $lag; $host = $this->servers[$i]['host']; $maxIndex = $i; @@ -1860,11 +1976,12 @@ class LoadBalancer implements ILoadBalancer { if ( !$pos ) { // Get the current master position, opening a connection if needed - $masterConn = $this->getAnyOpenConnection( $this->getWriterIndex() ); + $index = $this->getWriterIndex(); + $masterConn = $this->getAnyOpenConnection( $index ); if ( $masterConn ) { $pos = $masterConn->getMasterPos(); } else { - $masterConn = $this->openConnection( $this->getWriterIndex(), self::DOMAIN_ANY ); + $masterConn = $this->getConnection( $index, [], self::DOMAIN_ANY, self::CONN_SILENCE_ERRORS ); if ( !$masterConn ) { throw new DBReplicationWaitError( null, @@ -1948,7 +2065,7 @@ class LoadBalancer implements ILoadBalancer { if ( $domainsInUse ) { $domains = implode( ', ', $domainsInUse ); throw new DBUnexpectedError( null, - "Foreign domain connections are still in use ($domains)." ); + "Foreign domain connections are still in use ($domains)" ); } $this->setLocalDomain( new DatabaseDomain(