Rename DB_SLAVE constant to DB_REPLICA
[lhc/web/wiklou.git] / includes / objectcache / SqlBagOStuff.php
index 5556dd8..4870d5e 100644 (file)
@@ -21,6 +21,8 @@
  * @ingroup Cache
  */
 
+use \MediaWiki\MediaWikiServices;
+
 /**
  * Class to store objects in the database
  *
@@ -46,6 +48,8 @@ class SqlBagOStuff extends BagOStuff {
        /** @var int */
        protected $syncTimeout = 3;
 
+       /** @var LoadBalancer|null */
+       protected $separateMainLB;
        /** @var array */
        protected $conns;
        /** @var array UNIX timestamps */
@@ -81,11 +85,11 @@ class SqlBagOStuff extends BagOStuff {
         *                  required to hold the largest shard index. Data will be
         *                  distributed across all tables by key hash. This is for
         *                  MySQL bugs 61735 and 61736.
-        *   - slaveOnly:   Whether to only use slave DBs and avoid triggering
+        *   - slaveOnly:   Whether to only use replica DBs and avoid triggering
         *                  garbage collection logic of expired items. This only
         *                  makes sense if the primary DB is used and only if get()
         *                  calls will be used. This is used by ReplicatedBagOStuff.
-        *   - syncTimeout: Max seconds to wait for slaves to catch up for WRITE_SYNC.
+        *   - syncTimeout: Max seconds to wait for replica DBs to catch up for WRITE_SYNC.
         *
         * @param array $params
         */
@@ -112,6 +116,7 @@ class SqlBagOStuff extends BagOStuff {
                        $this->serverInfos = [ $params['server'] ];
                        $this->numServers = count( $this->serverInfos );
                } else {
+                       // Default to using the main wiki's database servers
                        $this->serverInfos = false;
                        $this->numServers = 1;
                }
@@ -130,6 +135,23 @@ class SqlBagOStuff extends BagOStuff {
                $this->slaveOnly = !empty( $params['slaveOnly'] );
        }
 
+       protected function getSeparateMainLB() {
+               global $wgDBtype;
+
+               if ( $wgDBtype === 'mysql' && $this->usesMainDB() ) {
+                       if ( !$this->separateMainLB ) {
+                               // We must keep a separate connection to MySQL in order to avoid deadlocks
+                               $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+                               $this->separateMainLB = $lbFactory->newMainLB();
+                       }
+                       return $this->separateMainLB;
+               } else {
+                       // However, SQLite has an opposite behavior. And PostgreSQL needs to know
+                       // if we are in transaction or not (@TODO: find some PostgreSQL work-around).
+                       return null;
+               }
+       }
+
        /**
         * Get a connection to the specified database
         *
@@ -161,16 +183,13 @@ class SqlBagOStuff extends BagOStuff {
                                $db = DatabaseBase::factory( $type, $info );
                                $db->clearFlag( DBO_TRX );
                        } else {
-                               // We must keep a separate connection to MySQL in order to avoid deadlocks
-                               // However, SQLite has an opposite behavior. And PostgreSQL needs to know
-                               // if we are in transaction or not (@TODO: find some work-around).
-                               $index = $this->slaveOnly ? DB_SLAVE : DB_MASTER;
-                               if ( wfGetDB( $index )->getType() == 'mysql' ) {
-                                       $lb = wfGetLBFactory()->newMainLB();
-                                       $db = $lb->getConnection( $index );
+                               $index = $this->slaveOnly ? DB_REPLICA : DB_MASTER;
+                               if ( $this->getSeparateMainLB() ) {
+                                       $db = $this->getSeparateMainLB()->getConnection( $index );
                                        $db->clearFlag( DBO_TRX ); // auto-commit mode
                                } else {
                                        $db = wfGetDB( $index );
+                                       // Can't mess with transaction rounds (DBO_TRX) :(
                                }
                        }
                        $this->logger->debug( sprintf( "Connection %s will be used for SqlBagOStuff", $db ) );
@@ -276,6 +295,7 @@ class SqlBagOStuff extends BagOStuff {
                        if ( isset( $dataRows[$key] ) ) { // HIT?
                                $row = $dataRows[$key];
                                $this->debug( "get: retrieved data; expiry time is " . $row->exptime );
+                               $db = null;
                                try {
                                        $db = $this->getDB( $row->serverIndex );
                                        if ( $this->isExpired( $db, $row->exptime ) ) { // MISS
@@ -284,7 +304,7 @@ class SqlBagOStuff extends BagOStuff {
                                                $values[$key] = $this->unserialize( $db->decodeBlob( $row->value ) );
                                        }
                                } catch ( DBQueryError $e ) {
-                                       $this->handleWriteError( $e, $row->serverIndex );
+                                       $this->handleWriteError( $e, $db, $row->serverIndex );
                                }
                        } else { // MISS
                                $this->debug( 'get: no matching rows' );
@@ -306,10 +326,11 @@ class SqlBagOStuff extends BagOStuff {
                $result = true;
                $exptime = (int)$expiry;
                foreach ( $keysByTable as $serverIndex => $serverKeys ) {
+                       $db = null;
                        try {
                                $db = $this->getDB( $serverIndex );
                        } catch ( DBError $e ) {
-                               $this->handleWriteError( $e, $serverIndex );
+                               $this->handleWriteError( $e, $db, $serverIndex );
                                $result = false;
                                continue;
                        }
@@ -342,7 +363,7 @@ class SqlBagOStuff extends BagOStuff {
                                                __METHOD__
                                        );
                                } catch ( DBError $e ) {
-                                       $this->handleWriteError( $e, $serverIndex );
+                                       $this->handleWriteError( $e, $db, $serverIndex );
                                        $result = false;
                                }
 
@@ -356,7 +377,7 @@ class SqlBagOStuff extends BagOStuff {
        public function set( $key, $value, $exptime = 0, $flags = 0 ) {
                $ok = $this->setMulti( [ $key => $value ], $exptime );
                if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) {
-                       $ok = $ok && $this->waitForSlaves();
+                       $ok = $this->waitForReplication() && $ok;
                }
 
                return $ok;
@@ -364,6 +385,7 @@ class SqlBagOStuff extends BagOStuff {
 
        protected function cas( $casToken, $key, $value, $exptime = 0 ) {
                list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
+               $db = null;
                try {
                        $db = $this->getDB( $serverIndex );
                        $exptime = intval( $exptime );
@@ -394,7 +416,7 @@ class SqlBagOStuff extends BagOStuff {
                                __METHOD__
                        );
                } catch ( DBQueryError $e ) {
-                       $this->handleWriteError( $e, $serverIndex );
+                       $this->handleWriteError( $e, $db, $serverIndex );
 
                        return false;
                }
@@ -404,6 +426,7 @@ class SqlBagOStuff extends BagOStuff {
 
        public function delete( $key ) {
                list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
+               $db = null;
                try {
                        $db = $this->getDB( $serverIndex );
                        $db->delete(
@@ -411,7 +434,7 @@ class SqlBagOStuff extends BagOStuff {
                                [ 'keyname' => $key ],
                                __METHOD__ );
                } catch ( DBError $e ) {
-                       $this->handleWriteError( $e, $serverIndex );
+                       $this->handleWriteError( $e, $db, $serverIndex );
                        return false;
                }
 
@@ -420,6 +443,7 @@ class SqlBagOStuff extends BagOStuff {
 
        public function incr( $key, $step = 1 ) {
                list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
+               $db = null;
                try {
                        $db = $this->getDB( $serverIndex );
                        $step = intval( $step );
@@ -455,7 +479,7 @@ class SqlBagOStuff extends BagOStuff {
                                $newValue = null;
                        }
                } catch ( DBError $e ) {
-                       $this->handleWriteError( $e, $serverIndex );
+                       $this->handleWriteError( $e, $db, $serverIndex );
                        return null;
                }
 
@@ -465,7 +489,7 @@ class SqlBagOStuff extends BagOStuff {
        public function merge( $key, callable $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
                $ok = $this->mergeViaCas( $key, $callback, $exptime, $attempts );
                if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) {
-                       $ok = $ok && $this->waitForSlaves();
+                       $ok = $this->waitForReplication() && $ok;
                }
 
                return $ok;
@@ -473,6 +497,7 @@ class SqlBagOStuff extends BagOStuff {
 
        public function changeTTL( $key, $expiry = 0 ) {
                list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
+               $db = null;
                try {
                        $db = $this->getDB( $serverIndex );
                        $db->update(
@@ -485,7 +510,7 @@ class SqlBagOStuff extends BagOStuff {
                                return false;
                        }
                } catch ( DBError $e ) {
-                       $this->handleWriteError( $e, $serverIndex );
+                       $this->handleWriteError( $e, $db, $serverIndex );
                        return false;
                }
 
@@ -542,6 +567,7 @@ class SqlBagOStuff extends BagOStuff {
         */
        public function deleteObjectsExpiringBefore( $timestamp, $progressCallback = false ) {
                for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) {
+                       $db = null;
                        try {
                                $db = $this->getDB( $serverIndex );
                                $dbTimestamp = $db->timestamp( $timestamp );
@@ -604,7 +630,7 @@ class SqlBagOStuff extends BagOStuff {
                                        }
                                }
                        } catch ( DBError $e ) {
-                               $this->handleWriteError( $e, $serverIndex );
+                               $this->handleWriteError( $e, $db, $serverIndex );
                                return false;
                        }
                }
@@ -618,13 +644,14 @@ class SqlBagOStuff extends BagOStuff {
         */
        public function deleteAll() {
                for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) {
+                       $db = null;
                        try {
                                $db = $this->getDB( $serverIndex );
                                for ( $i = 0; $i < $this->shards; $i++ ) {
                                        $db->delete( $this->getTableNameByShard( $i ), '*', __METHOD__ );
                                }
                        } catch ( DBError $e ) {
-                               $this->handleWriteError( $e, $serverIndex );
+                               $this->handleWriteError( $e, $db, $serverIndex );
                                return false;
                        }
                }
@@ -694,18 +721,19 @@ class SqlBagOStuff extends BagOStuff {
         * Handle a DBQueryError which occurred during a write operation.
         *
         * @param DBError $exception
+        * @param IDatabase|null $db DB handle or null if connection failed
         * @param int $serverIndex
+        * @throws Exception
         */
-       protected function handleWriteError( DBError $exception, $serverIndex ) {
-               if ( $exception instanceof DBConnectionError ) {
+       protected function handleWriteError( DBError $exception, IDatabase $db = null, $serverIndex ) {
+               if ( !$db ) {
                        $this->markServerDown( $exception, $serverIndex );
-               }
-               if ( $exception->db && $exception->db->wasReadOnlyError() ) {
-                       if ( $exception->db->trxLevel() ) {
-                               try {
-                                       $exception->db->rollback( __METHOD__ );
-                               } catch ( DBError $e ) {
-                               }
+               } elseif ( $db->wasReadOnlyError() ) {
+                       if ( $db->trxLevel() && $this->usesMainDB() ) {
+                               // Errors like deadlocks and connection drops already cause rollback.
+                               // For consistency, we have no choice but to throw an error and trigger
+                               // complete rollback if the main DB is also being used as the cache DB.
+                               throw $exception;
                        }
                }
 
@@ -725,7 +753,7 @@ class SqlBagOStuff extends BagOStuff {
         * @param DBError $exception
         * @param int $serverIndex
         */
-       protected function markServerDown( $exception, $serverIndex ) {
+       protected function markServerDown( DBError $exception, $serverIndex ) {
                unset( $this->conns[$serverIndex] ); // bug T103435
 
                if ( isset( $this->connFailureTimes[$serverIndex] ) ) {
@@ -762,18 +790,37 @@ class SqlBagOStuff extends BagOStuff {
                }
        }
 
-       protected function waitForSlaves() {
-               if ( !$this->serverInfos ) {
-                       // Main LB is used; wait for any slaves to catch up
-                       try {
-                               wfGetLBFactory()->waitForReplication( [ 'wiki' => wfWikiID() ] );
-                               return true;
-                       } catch ( DBReplicationWaitError $e ) {
-                               return false;
-                       }
-               } else {
+       /**
+        * @return bool Whether the main DB is used, e.g. wfGetDB( DB_MASTER )
+        */
+       protected function usesMainDB() {
+               return !$this->serverInfos;
+       }
+
+       protected function waitForReplication() {
+               if ( !$this->usesMainDB() ) {
                        // Custom DB server list; probably doesn't use replication
                        return true;
                }
+
+               $lb = $this->getSeparateMainLB()
+                       ?: MediaWikiServices::getInstance()->getDBLoadBalancer();
+
+               if ( $lb->getServerCount() <= 1 ) {
+                       return true; // no replica DBs
+               }
+
+               // Main LB is used; wait for any replica DBs to catch up
+               $masterPos = $lb->getMasterPos();
+
+               $loop = new WaitConditionLoop(
+                       function () use ( $lb, $masterPos ) {
+                               return $lb->waitForAll( $masterPos, 1 );
+                       },
+                       $this->syncTimeout,
+                       $this->busyCallbacks
+               );
+
+               return ( $loop->invoke() === $loop::CONDITION_REACHED );
        }
 }