From 21ddcf15922e967ce2cd2adc0f40ed722e5b5d1f Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Fri, 12 Aug 2016 19:56:21 -0700 Subject: [PATCH] Add convenience commitAndWaitForReplication() method This also does sanity checks to avoid breaking transactions Change-Id: I7453c245eee25a26243e606970ef5f79b21a8141 --- includes/WatchedItemStore.php | 6 ++- includes/db/loadbalancer/LBFactory.php | 41 +++++++++++++++++++ includes/deferred/DataUpdate.php | 11 +++++ includes/deferred/LinksDeletionUpdate.php | 15 ++++--- includes/deferred/LinksUpdate.php | 10 +++-- .../jobs/CategoryMembershipChangeJob.php | 8 ++-- includes/jobqueue/jobs/DeleteLinksJob.php | 5 ++- includes/jobqueue/jobs/HTMLCacheUpdateJob.php | 4 +- .../jobqueue/jobs/RecentChangesUpdateJob.php | 14 +++---- includes/jobqueue/jobs/RefreshLinksJob.php | 3 ++ 10 files changed, 91 insertions(+), 26 deletions(-) diff --git a/includes/WatchedItemStore.php b/includes/WatchedItemStore.php index cf18fab51f..a13609bf5d 100644 --- a/includes/WatchedItemStore.php +++ b/includes/WatchedItemStore.php @@ -742,6 +742,7 @@ class WatchedItemStore implements StatsdAwareInterface { $dbw = $this->getConnection( DB_MASTER ); $factory = wfGetLBFactory(); + $ticket = $factory->getEmptyTransactionTicket( __METHOD__ ); $watchersChunks = array_chunk( $watchers, $wgUpdateRowsPerQuery ); foreach ( $watchersChunks as $watchersChunk ) { @@ -755,8 +756,9 @@ class WatchedItemStore implements StatsdAwareInterface { ], $fname ); if ( count( $watchersChunks ) > 1 ) { - $factory->commitMasterChanges( __METHOD__ ); - $factory->waitForReplication( [ 'wiki' => $dbw->getWikiID() ] ); + $factory->commitAndWaitForReplication( + __METHOD__, $ticket, [ 'wiki' => $dbw->getWikiID() ] + ); } } $this->uncacheLinkTarget( $target ); diff --git a/includes/db/loadbalancer/LBFactory.php b/includes/db/loadbalancer/LBFactory.php index 4078a3926e..efc6148fee 100644 --- a/includes/db/loadbalancer/LBFactory.php +++ b/includes/db/loadbalancer/LBFactory.php @@ -42,6 +42,8 @@ abstract class LBFactory implements DestructibleService { /** @var WANObjectCache */ protected $wanCache; + /** @var mixed */ + protected $ticket; /** @var string|bool Reason all LBs are read-only or false if not */ protected $readOnlyReason = false; @@ -72,6 +74,7 @@ abstract class LBFactory implements DestructibleService { $this->wanCache = WANObjectCache::newEmpty(); } $this->trxLogger = LoggerFactory::getInstance( 'DBTransaction' ); + $this->ticket = mt_rand(); } /** @@ -404,6 +407,44 @@ abstract class LBFactory implements DestructibleService { } } + /** + * Get a token asserting that no transaction writes are active + * + * @param string $fname Caller name (e.g. __METHOD__) + * @return mixed A value to pass to commitAndWaitForReplication() + * @since 1.28 + */ + public function getEmptyTransactionTicket( $fname ) { + if ( $this->hasMasterChanges() ) { + $this->trxLogger->error( __METHOD__ . ": $fname does not have outer scope." ); + return null; + } + + return $this->ticket; + } + + /** + * Convenience method for safely running commitMasterChanges()/waitForReplication() + * + * This will commit and wait unless $ticket indicates it is unsafe to do so + * + * @param string $fname Caller name (e.g. __METHOD__) + * @param mixed $ticket Result of getOuterTransactionScopeTicket() + * @param array $opts Options to waitForReplication() + * @throws DBReplicationWaitError + * @since 1.28 + */ + public function commitAndWaitForReplication( $fname, $ticket, array $opts = [] ) { + if ( $ticket !== $this->ticket ) { + $logger = LoggerFactory::getInstance( 'DBPerformance' ); + $logger->error( __METHOD__ . ": cannot commit; $fname does not have outer scope." ); + return; + } + + $this->commitMasterChanges( $fname ); + $this->waitForReplication( $opts ); + } + /** * Disable the ChronologyProtector for all load balancers * diff --git a/includes/deferred/DataUpdate.php b/includes/deferred/DataUpdate.php index 2865461e5e..5b84ca91f6 100644 --- a/includes/deferred/DataUpdate.php +++ b/includes/deferred/DataUpdate.php @@ -30,10 +30,21 @@ * subclasses can override the beginTransaction() and commitTransaction() methods. */ abstract class DataUpdate implements DeferrableUpdate { + /** @var mixed Result from LBFactory::getEmptyTransactionTicket() */ + protected $ticket; + public function __construct() { // noop } + /** + * @param mixed $ticket Result of getEmptyTransactionTicket() + * @since 1.28 + */ + public function setTransactionTicket( $ticket ) { + $this->ticket = $ticket; + } + /** * Begin an appropriate transaction, if any. * This default implementation does nothing. diff --git a/includes/deferred/LinksDeletionUpdate.php b/includes/deferred/LinksDeletionUpdate.php index f96df489b2..47f2b21ff5 100644 --- a/includes/deferred/LinksDeletionUpdate.php +++ b/includes/deferred/LinksDeletionUpdate.php @@ -78,8 +78,9 @@ class LinksDeletionUpdate extends SqlDataUpdate implements EnqueueableDataUpdate foreach ( $catBatches as $catBatch ) { $this->page->updateCategoryCounts( [], $catBatch, $id ); if ( count( $catBatches ) > 1 ) { - $factory->commitMasterChanges( __METHOD__ ); - $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] ); + $factory->commitAndWaitForReplication( + __METHOD__, $this->ticket, [ 'wiki' => $this->mDb->getWikiID() ] + ); } } @@ -174,8 +175,9 @@ class LinksDeletionUpdate extends SqlDataUpdate implements EnqueueableDataUpdate foreach ( $rcIdBatches as $rcIdBatch ) { $this->mDb->delete( 'recentchanges', [ 'rc_id' => $rcIdBatch ], __METHOD__ ); if ( count( $rcIdBatches ) > 1 ) { - $factory->commitMasterChanges( __METHOD__ ); - $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] ); + $factory->commitAndWaitForReplication( + __METHOD__, $this->ticket, [ 'wiki' => $this->mDb->getWikiID() ] + ); } } } @@ -194,8 +196,9 @@ class LinksDeletionUpdate extends SqlDataUpdate implements EnqueueableDataUpdate $pkDeleteConds[] = $this->mDb->makeList( (array)$row, LIST_AND ); if ( count( $pkDeleteConds ) >= $bSize ) { $dbw->delete( $table, $dbw->makeList( $pkDeleteConds, LIST_OR ), __METHOD__ ); - $factory->commitMasterChanges( __METHOD__ ); - $factory->waitForReplication( [ 'wiki' => $dbw->getWikiID() ] ); + $factory->commitAndWaitForReplication( + __METHOD__, $this->ticket, [ 'wiki' => $this->mDb->getWikiID() ] + ); $pkDeleteConds = []; } } diff --git a/includes/deferred/LinksUpdate.php b/includes/deferred/LinksUpdate.php index aed6b1870f..4f40c3839d 100644 --- a/includes/deferred/LinksUpdate.php +++ b/includes/deferred/LinksUpdate.php @@ -387,15 +387,17 @@ class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate { foreach ( $deleteWheres as $deleteWhere ) { $this->mDb->delete( $table, $deleteWhere, __METHOD__ ); - $factory->commitMasterChanges( __METHOD__ ); - $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] ); + $factory->commitAndWaitForReplication( + __METHOD__, $this->ticket, [ 'wiki' => $this->mDb->getWikiID() ] + ); } $insertBatches = array_chunk( $insertions, $bSize ); foreach ( $insertBatches as $insertBatch ) { $this->mDb->insert( $table, $insertBatch, __METHOD__, 'IGNORE' ); - $factory->commitMasterChanges( __METHOD__ ); - $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] ); + $factory->commitAndWaitForReplication( + __METHOD__, $this->ticket, [ 'wiki' => $this->mDb->getWikiID() ] + ); } if ( count( $insertions ) ) { diff --git a/includes/jobqueue/jobs/CategoryMembershipChangeJob.php b/includes/jobqueue/jobs/CategoryMembershipChangeJob.php index bea33dc1cb..b561021b29 100644 --- a/includes/jobqueue/jobs/CategoryMembershipChangeJob.php +++ b/includes/jobqueue/jobs/CategoryMembershipChangeJob.php @@ -158,6 +158,8 @@ class CategoryMembershipChangeJob extends Job { $dbw = wfGetDB( DB_MASTER ); $factory = wfGetLBFactory(); + $ticket = $factory->getEmptyTransactionTicket( __METHOD__ ); + $catMembChange = new CategoryMembershipChange( $title, $newRev ); $catMembChange->checkTemplateLinks(); @@ -168,8 +170,7 @@ class CategoryMembershipChangeJob extends Job { $categoryTitle = Title::makeTitle( NS_CATEGORY, $categoryName ); $catMembChange->triggerCategoryAddedNotification( $categoryTitle ); if ( $insertCount++ && ( $insertCount % $batchSize ) == 0 ) { - $factory->commitMasterChanges( __METHOD__ ); - $factory->waitForReplication(); + $factory->commitAndWaitForReplication( __METHOD__, $ticket ); } } @@ -177,8 +178,7 @@ class CategoryMembershipChangeJob extends Job { $categoryTitle = Title::makeTitle( NS_CATEGORY, $categoryName ); $catMembChange->triggerCategoryRemovedNotification( $categoryTitle ); if ( $insertCount++ && ( $insertCount++ % $batchSize ) == 0 ) { - $factory->commitMasterChanges( __METHOD__ ); - $factory->waitForReplication(); + $factory->commitAndWaitForReplication( __METHOD__, $ticket ); } } } diff --git a/includes/jobqueue/jobs/DeleteLinksJob.php b/includes/jobqueue/jobs/DeleteLinksJob.php index f39f8fde05..8d565bd1d9 100644 --- a/includes/jobqueue/jobs/DeleteLinksJob.php +++ b/includes/jobqueue/jobs/DeleteLinksJob.php @@ -20,6 +20,7 @@ * @file * @ingroup JobQueue */ +use \MediaWiki\MediaWikiServices; /** * Job to prune link tables for pages that were deleted @@ -52,10 +53,12 @@ class DeleteLinksJob extends Job { return false; } + $factory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); $timestamp = isset( $this->params['timestamp'] ) ? $this->params['timestamp'] : null; - $page = WikiPage::factory( $this->title ); // title when deleted + $update = new LinksDeletionUpdate( $page, $pageId, $timestamp ); + $update->setTransactionTicket( $factory->getEmptyTransactionTicket( __METHOD__ ) ); DataUpdate::runUpdates( [ $update ] ); return true; diff --git a/includes/jobqueue/jobs/HTMLCacheUpdateJob.php b/includes/jobqueue/jobs/HTMLCacheUpdateJob.php index 3ce4324071..f09ba57b53 100644 --- a/includes/jobqueue/jobs/HTMLCacheUpdateJob.php +++ b/includes/jobqueue/jobs/HTMLCacheUpdateJob.php @@ -114,11 +114,11 @@ class HTMLCacheUpdateJob extends Job { $dbw = wfGetDB( DB_MASTER ); $factory = wfGetLBFactory(); + $ticket = $factory->getEmptyTransactionTicket( __METHOD__ ); // Update page_touched (skipping pages already touched since the root job). // Check $wgUpdateRowsPerQuery for sanity; batch jobs are sized by that already. foreach ( array_chunk( $pageIds, $wgUpdateRowsPerQuery ) as $batch ) { - $factory->commitMasterChanges( __METHOD__ ); - $factory->waitForReplication(); + $factory->commitAndWaitForReplication( __METHOD__, $ticket ); $dbw->update( 'page', [ 'page_touched' => $dbw->timestamp( $touchTimestamp ) ], diff --git a/includes/jobqueue/jobs/RecentChangesUpdateJob.php b/includes/jobqueue/jobs/RecentChangesUpdateJob.php index 843076469e..2fd3899f9f 100644 --- a/includes/jobqueue/jobs/RecentChangesUpdateJob.php +++ b/includes/jobqueue/jobs/RecentChangesUpdateJob.php @@ -82,6 +82,7 @@ class RecentChangesUpdateJob extends Job { } $factory = wfGetLBFactory(); + $ticket = $factory->getEmptyTransactionTicket( __METHOD__ ); $cutoff = $dbw->timestamp( time() - $wgRCMaxAge ); do { $rcIds = $dbw->selectFieldValues( 'recentchanges', @@ -92,14 +93,11 @@ class RecentChangesUpdateJob extends Job { ); if ( $rcIds ) { $dbw->delete( 'recentchanges', [ 'rc_id' => $rcIds ], __METHOD__ ); - } - // Commit in chunks to avoid slave lag - $factory->commitMasterChanges( __METHOD__ ); - - if ( count( $rcIds ) === $wgUpdateRowsPerQuery ) { // There might be more, so try waiting for slaves try { - wfGetLBFactory()->waitForReplication( [ 'timeout' => 3 ] ); + $factory->commitAndWaitForReplication( + __METHOD__, $ticket, [ 'timeout' => 3 ] + ); } catch ( DBReplicationWaitError $e ) { // Another job will continue anyway break; @@ -122,6 +120,8 @@ class RecentChangesUpdateJob extends Job { // JobRunner uses DBO_TRX, but doesn't call begin/commit itself; // onTransactionIdle() will run immediately since there is no trx. $dbw->onTransactionIdle( function() use ( $dbw, $days, $window ) { + $factory = wfGetLBFactory(); + $ticket = $factory->getEmptyTransactionTicket( __METHOD__ ); // Avoid disconnect/ping() cycle that makes locks fall off $dbw->setSessionOptions( [ 'connTimeout' => 900 ] ); @@ -205,7 +205,7 @@ class RecentChangesUpdateJob extends Job { } foreach ( array_chunk( $newRows, 500 ) as $rowBatch ) { $dbw->insert( 'querycachetwo', $rowBatch, __METHOD__ ); - wfGetLBFactory()->waitForReplication(); + $factory->commitAndWaitForReplication( __METHOD__, $ticket ); } } diff --git a/includes/jobqueue/jobs/RefreshLinksJob.php b/includes/jobqueue/jobs/RefreshLinksJob.php index 8fba72894b..9cdb1617d1 100644 --- a/includes/jobqueue/jobs/RefreshLinksJob.php +++ b/includes/jobqueue/jobs/RefreshLinksJob.php @@ -241,7 +241,10 @@ class RefreshLinksJob extends Job { $parserOutput ); + $factory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + $ticket = $factory->getEmptyTransactionTicket( __METHOD__ ); foreach ( $updates as $key => $update ) { + $update->setTransactionTicket( $ticket ); // FIXME: This code probably shouldn't be here? // Needed by things like Echo notifications which need // to know which user caused the links update -- 2.20.1