From: Aaron Schulz Date: Sat, 13 Aug 2016 01:56:41 +0000 (-0700) Subject: Database transaction flushing cleanups X-Git-Tag: 1.31.0-rc.0~6042^2 X-Git-Url: http://git.cyclocoop.org/%22.%20generer_url_ecrire%28%22sites_tous%22%2C%22%22%29.%20%22?a=commitdiff_plain;h=4bcfe7d0ad73fc75b7131425479f2b337b0df28d;p=lhc%2Fweb%2Fwiklou.git Database transaction flushing cleanups * Do not commit() inside masterPosWait(). This could happen inside JobRunner::commitMasterChanges, resulting in one DB committing while the others may or may not later commit. * Migrate some commit() callers to commitMasterChanges(). * Removed unsafe upload class commit() which could break outer transactions. * Also cleaned up the "flush" flag to make it harder to misuse. Change-Id: I75193baaed979100c5338abe0c0899abba3444cb --- diff --git a/includes/WatchedItemStore.php b/includes/WatchedItemStore.php index 89ca50c00b..92cdee5f06 100644 --- a/includes/WatchedItemStore.php +++ b/includes/WatchedItemStore.php @@ -741,6 +741,7 @@ class WatchedItemStore implements StatsdAwareInterface { global $wgUpdateRowsPerQuery; $dbw = $this->getConnection( DB_MASTER ); + $factory = wfGetLBFactory(); $watchersChunks = array_chunk( $watchers, $wgUpdateRowsPerQuery ); foreach ( $watchersChunks as $watchersChunk ) { @@ -754,8 +755,8 @@ class WatchedItemStore implements StatsdAwareInterface { ], $fname ); if ( count( $watchersChunks ) > 1 ) { - $dbw->commit( __METHOD__, 'flush' ); - wfGetLBFactory()->waitForReplication( [ 'wiki' => $dbw->getWikiID() ] ); + $factory->commitMasterChanges( __METHOD__ ); + $factory->waitForReplication( [ 'wiki' => $dbw->getWikiID() ] ); } } $this->uncacheLinkTarget( $target ); diff --git a/includes/db/Database.php b/includes/db/Database.php index 4e358d4c78..3ea2cdaa18 100644 --- a/includes/db/Database.php +++ b/includes/db/Database.php @@ -703,7 +703,7 @@ abstract class DatabaseBase implements IDatabase { " performing implicit commit before closing connection!" ); } - $this->commit( __METHOD__, 'flush' ); + $this->commit( __METHOD__, self::FLUSHING_INTERNAL ); } $closed = $this->closeConnection(); @@ -2651,7 +2651,7 @@ abstract class DatabaseBase implements IDatabase { } if ( !$this->mTrxAtomicLevels && $this->mTrxAutomaticAtomic ) { - $this->commit( $fname, 'flush' ); + $this->commit( $fname, self::FLUSHING_INTERNAL ); } } @@ -2746,7 +2746,7 @@ abstract class DatabaseBase implements IDatabase { ); } - if ( $flush === 'flush' ) { + if ( $flush === self::FLUSHING_INTERNAL || $flush === self::FLUSHING_ALL_PEERS ) { if ( !$this->mTrxLevel ) { return; // nothing to do } elseif ( !$this->mTrxAutomatic ) { @@ -2796,7 +2796,7 @@ abstract class DatabaseBase implements IDatabase { } final public function rollback( $fname = __METHOD__, $flush = '' ) { - if ( $flush !== 'flush' ) { + if ( $flush !== self::FLUSHING_INTERNAL && $flush !== self::FLUSHING_ALL_PEERS ) { if ( !$this->mTrxLevel ) { wfWarn( "$fname: No transaction to rollback, something got out of sync!" ); return; // nothing to do @@ -3302,11 +3302,11 @@ abstract class DatabaseBase implements IDatabase { } $unlocker = new ScopedCallback( function () use ( $lockKey, $fname ) { - $this->commit( __METHOD__, 'flush' ); + $this->commit( __METHOD__, self::FLUSHING_INTERNAL ); $this->unlock( $lockKey, $fname ); } ); - $this->commit( __METHOD__, 'flush' ); + $this->commit( __METHOD__, self::FLUSHING_INTERNAL ); return $unlocker; } diff --git a/includes/db/DatabaseMysqlBase.php b/includes/db/DatabaseMysqlBase.php index 6380fe7654..d1ebe62f2e 100644 --- a/includes/db/DatabaseMysqlBase.php +++ b/includes/db/DatabaseMysqlBase.php @@ -767,9 +767,6 @@ abstract class DatabaseMysqlBase extends Database { return 0; // already reached this point for sure } - // Commit any open transactions - $this->commit( __METHOD__, 'flush' ); - // Call doQuery() directly, to avoid opening a transaction if DBO_TRX is set if ( $this->useGTIDs && $pos->gtids ) { // Wait on the GTID set (MariaDB only) diff --git a/includes/db/IDatabase.php b/includes/db/IDatabase.php index 43592e2743..af024b8517 100644 --- a/includes/db/IDatabase.php +++ b/includes/db/IDatabase.php @@ -33,11 +33,18 @@ * @ingroup Database */ interface IDatabase { - /* Constants to onTransactionResolution() callbacks */ + /** @var int Callback triggered immediately due to no active transaction */ const TRIGGER_IDLE = 1; + /** @var int Callback triggered by commit */ const TRIGGER_COMMIT = 2; + /** @var int Callback triggered by rollback */ const TRIGGER_ROLLBACK = 3; + /** @var string Transaction operation comes from service managing all DBs */ + const FLUSHING_ALL_PEERS = 'flush'; + /** @var string Transaction operation comes from the database class internally */ + const FLUSHING_INTERNAL = 'flush'; + /** * A string describing the current software version, and possibly * other details in a user-friendly way. Will be listed on Special:Version, etc. @@ -1366,9 +1373,9 @@ interface IDatabase { * Nesting of transactions is not supported. * * @param string $fname - * @param string $flush Flush flag, set to 'flush' to disable warnings about - * explicitly committing implicit transactions, or calling commit when no - * transaction is in progress. + * @param string $flush Flush flag, set to situationally valid IDatabase::FLUSHING_* + * constant to disable warnings about explicitly committing implicit transactions, + * or calling commit when no transaction is in progress. * * This will trigger an exception if there is an ongoing explicit transaction. * @@ -1386,10 +1393,10 @@ interface IDatabase { * No-op on non-transactional databases. * * @param string $fname - * @param string $flush Flush flag, set to 'flush' to disable warnings about - * calling rollback when no transaction is in progress. This will silently - * break any ongoing explicit transaction. Only set the flush flag if you - * are sure that it is safe to ignore these warnings in your context. + * @param string $flush Flush flag, set to a situationally valid IDatabase::FLUSHING_* + * constant to disable warnings about calling rollback when no transaction is in + * progress. This will silently break any ongoing explicit transaction. Only set the + * flush flag if you are sure that it is safe to ignore these warnings in your context. * @throws DBUnexpectedError * @since 1.23 Added $flush parameter */ diff --git a/includes/db/loadbalancer/LoadBalancer.php b/includes/db/loadbalancer/LoadBalancer.php index a7c486c69c..d08172a5e8 100644 --- a/includes/db/loadbalancer/LoadBalancer.php +++ b/includes/db/loadbalancer/LoadBalancer.php @@ -1062,7 +1062,7 @@ class LoadBalancer { */ public function commitAll( $fname = __METHOD__ ) { $this->forEachOpenConnection( function ( DatabaseBase $conn ) use ( $fname ) { - $conn->commit( $fname, 'flush' ); + $conn->commit( $fname, IDatabase::FLUSHING_ALL_PEERS ); } ); } @@ -1109,7 +1109,7 @@ class LoadBalancer { public function commitMasterChanges( $fname = __METHOD__ ) { $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $fname ) { if ( $conn->writesOrCallbacksPending() ) { - $conn->commit( $fname, 'flush' ); + $conn->commit( $fname, IDatabase::FLUSHING_ALL_PEERS ); } } ); } @@ -1143,7 +1143,7 @@ class LoadBalancer { foreach ( $conns2[$masterIndex] as $conn ) { if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) { try { - $conn->rollback( $fname, 'flush' ); + $conn->rollback( $fname, IDatabase::FLUSHING_ALL_PEERS ); } catch ( DBError $e ) { MWExceptionHandler::logException( $e ); $failedServers[] = $conn->getServer(); diff --git a/includes/deferred/LinksDeletionUpdate.php b/includes/deferred/LinksDeletionUpdate.php index 0009781f56..f96df489b2 100644 --- a/includes/deferred/LinksDeletionUpdate.php +++ b/includes/deferred/LinksDeletionUpdate.php @@ -54,6 +54,7 @@ class LinksDeletionUpdate extends SqlDataUpdate implements EnqueueableDataUpdate public function doUpdate() { $config = RequestContext::getMain()->getConfig(); $batchSize = $config->get( 'UpdateRowsPerQuery' ); + $factory = wfGetLBFactory(); // Page may already be deleted, so don't just getId() $id = $this->pageId; @@ -77,8 +78,8 @@ class LinksDeletionUpdate extends SqlDataUpdate implements EnqueueableDataUpdate foreach ( $catBatches as $catBatch ) { $this->page->updateCategoryCounts( [], $catBatch, $id ); if ( count( $catBatches ) > 1 ) { - $this->mDb->commit( __METHOD__, 'flush' ); - wfGetLBFactory()->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] ); + $factory->commitMasterChanges( __METHOD__ ); + $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] ); } } @@ -173,8 +174,8 @@ class LinksDeletionUpdate extends SqlDataUpdate implements EnqueueableDataUpdate foreach ( $rcIdBatches as $rcIdBatch ) { $this->mDb->delete( 'recentchanges', [ 'rc_id' => $rcIdBatch ], __METHOD__ ); if ( count( $rcIdBatches ) > 1 ) { - $this->mDb->commit( __METHOD__, 'flush' ); - wfGetLBFactory()->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] ); + $factory->commitMasterChanges( __METHOD__ ); + $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] ); } } } @@ -185,6 +186,7 @@ class LinksDeletionUpdate extends SqlDataUpdate implements EnqueueableDataUpdate private function batchDeleteByPK( $table, array $conds, array $pk, $bSize ) { $dbw = $this->mDb; // convenience + $factory = wfGetLBFactory(); $res = $dbw->select( $table, $pk, $conds, __METHOD__ ); $pkDeleteConds = []; @@ -192,8 +194,8 @@ class LinksDeletionUpdate extends SqlDataUpdate implements EnqueueableDataUpdate $pkDeleteConds[] = $this->mDb->makeList( (array)$row, LIST_AND ); if ( count( $pkDeleteConds ) >= $bSize ) { $dbw->delete( $table, $dbw->makeList( $pkDeleteConds, LIST_OR ), __METHOD__ ); - $dbw->commit( __METHOD__, 'flush' ); - wfGetLBFactory()->waitForReplication( [ 'wiki' => $dbw->getWikiID() ] ); + $factory->commitMasterChanges( __METHOD__ ); + $factory->waitForReplication( [ 'wiki' => $dbw->getWikiID() ] ); $pkDeleteConds = []; } } diff --git a/includes/deferred/LinksUpdate.php b/includes/deferred/LinksUpdate.php index 22944eb7a6..aed6b1870f 100644 --- a/includes/deferred/LinksUpdate.php +++ b/includes/deferred/LinksUpdate.php @@ -335,6 +335,7 @@ class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate { */ private function incrTableUpdate( $table, $prefix, $deletions, $insertions ) { $bSize = RequestContext::getMain()->getConfig()->get( 'UpdateRowsPerQuery' ); + $factory = wfGetLBFactory(); if ( $table === 'page_props' ) { $fromField = 'pp_page'; @@ -386,15 +387,15 @@ class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate { foreach ( $deleteWheres as $deleteWhere ) { $this->mDb->delete( $table, $deleteWhere, __METHOD__ ); - $this->mDb->commit( __METHOD__, 'flush' ); - wfGetLBFactory()->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] ); + $factory->commitMasterChanges( __METHOD__ ); + $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] ); } $insertBatches = array_chunk( $insertions, $bSize ); foreach ( $insertBatches as $insertBatch ) { $this->mDb->insert( $table, $insertBatch, __METHOD__, 'IGNORE' ); - $this->mDb->commit( __METHOD__, 'flush' ); - wfGetLBFactory()->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] ); + $factory->commitMasterChanges( __METHOD__ ); + $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] ); } if ( count( $insertions ) ) { diff --git a/includes/jobqueue/jobs/CategoryMembershipChangeJob.php b/includes/jobqueue/jobs/CategoryMembershipChangeJob.php index 57e69b422d..bea33dc1cb 100644 --- a/includes/jobqueue/jobs/CategoryMembershipChangeJob.php +++ b/includes/jobqueue/jobs/CategoryMembershipChangeJob.php @@ -64,7 +64,7 @@ class CategoryMembershipChangeJob extends Job { return false; } // Clear any stale REPEATABLE-READ snapshot - $dbr->commit( __METHOD__, 'flush' ); + wfGetLBFactory()->commitAll( __METHOD__ ); $cutoffUnix = wfTimestamp( TS_UNIX, $this->params['revTimestamp'] ); // Using ENQUEUE_FUDGE_SEC handles jobs inserted out of revision order due to the delay @@ -157,6 +157,7 @@ class CategoryMembershipChangeJob extends Job { } $dbw = wfGetDB( DB_MASTER ); + $factory = wfGetLBFactory(); $catMembChange = new CategoryMembershipChange( $title, $newRev ); $catMembChange->checkTemplateLinks(); @@ -167,8 +168,8 @@ class CategoryMembershipChangeJob extends Job { $categoryTitle = Title::makeTitle( NS_CATEGORY, $categoryName ); $catMembChange->triggerCategoryAddedNotification( $categoryTitle ); if ( $insertCount++ && ( $insertCount % $batchSize ) == 0 ) { - $dbw->commit( __METHOD__, 'flush' ); - wfGetLBFactory()->waitForReplication(); + $factory->commitMasterChanges( __METHOD__ ); + $factory->waitForReplication(); } } @@ -176,8 +177,8 @@ class CategoryMembershipChangeJob extends Job { $categoryTitle = Title::makeTitle( NS_CATEGORY, $categoryName ); $catMembChange->triggerCategoryRemovedNotification( $categoryTitle ); if ( $insertCount++ && ( $insertCount++ % $batchSize ) == 0 ) { - $dbw->commit( __METHOD__, 'flush' ); - wfGetLBFactory()->waitForReplication(); + $factory->commitMasterChanges( __METHOD__ ); + $factory->waitForReplication(); } } } diff --git a/includes/jobqueue/jobs/HTMLCacheUpdateJob.php b/includes/jobqueue/jobs/HTMLCacheUpdateJob.php index a14cdd7bab..3ce4324071 100644 --- a/includes/jobqueue/jobs/HTMLCacheUpdateJob.php +++ b/includes/jobqueue/jobs/HTMLCacheUpdateJob.php @@ -113,11 +113,12 @@ class HTMLCacheUpdateJob extends Job { $touchTimestamp = wfTimestampNow(); $dbw = wfGetDB( DB_MASTER ); + $factory = wfGetLBFactory(); // Update page_touched (skipping pages already touched since the root job). // Check $wgUpdateRowsPerQuery for sanity; batch jobs are sized by that already. foreach ( array_chunk( $pageIds, $wgUpdateRowsPerQuery ) as $batch ) { - $dbw->commit( __METHOD__, 'flush' ); - wfGetLBFactory()->waitForReplication(); + $factory->commitMasterChanges( __METHOD__ ); + $factory->waitForReplication(); $dbw->update( 'page', [ 'page_touched' => $dbw->timestamp( $touchTimestamp ) ], diff --git a/includes/jobqueue/jobs/RecentChangesUpdateJob.php b/includes/jobqueue/jobs/RecentChangesUpdateJob.php index fbc1572512..843076469e 100644 --- a/includes/jobqueue/jobs/RecentChangesUpdateJob.php +++ b/includes/jobqueue/jobs/RecentChangesUpdateJob.php @@ -81,6 +81,7 @@ class RecentChangesUpdateJob extends Job { return; // already in progress } + $factory = wfGetLBFactory(); $cutoff = $dbw->timestamp( time() - $wgRCMaxAge ); do { $rcIds = $dbw->selectFieldValues( 'recentchanges', @@ -93,7 +94,7 @@ class RecentChangesUpdateJob extends Job { $dbw->delete( 'recentchanges', [ 'rc_id' => $rcIds ], __METHOD__ ); } // Commit in chunks to avoid slave lag - $dbw->commit( __METHOD__, 'flush' ); + $factory->commitMasterChanges( __METHOD__ ); if ( count( $rcIds ) === $wgUpdateRowsPerQuery ) { // There might be more, so try waiting for slaves diff --git a/includes/upload/UploadFromChunks.php b/includes/upload/UploadFromChunks.php index cc1d69854a..247f608bd3 100644 --- a/includes/upload/UploadFromChunks.php +++ b/includes/upload/UploadFromChunks.php @@ -235,8 +235,6 @@ class UploadFromChunks extends UploadFromFile { $this->getOffset() . ' inx:' . $this->getChunkIndex() . "\n" ); $dbw = $this->repo->getMasterDB(); - // Use a quick transaction since we will upload the full temp file into shared - // storage, which takes time for large files. We don't want to hold locks then. $dbw->update( 'uploadstash', [ @@ -247,7 +245,6 @@ class UploadFromChunks extends UploadFromFile { [ 'us_key' => $this->mFileKey ], __METHOD__ ); - $dbw->commit( __METHOD__, 'flush' ); } /**