$dbw = $this->getConnection( DB_MASTER );
$factory = wfGetLBFactory();
+ $ticket = $factory->getEmptyTransactionTicket( __METHOD__ );
$watchersChunks = array_chunk( $watchers, $wgUpdateRowsPerQuery );
foreach ( $watchersChunks as $watchersChunk ) {
], $fname
);
if ( count( $watchersChunks ) > 1 ) {
- $factory->commitMasterChanges( __METHOD__ );
- $factory->waitForReplication( [ 'wiki' => $dbw->getWikiID() ] );
+ $factory->commitAndWaitForReplication(
+ __METHOD__, $ticket, [ 'wiki' => $dbw->getWikiID() ]
+ );
}
}
$this->uncacheLinkTarget( $target );
/** @var WANObjectCache */
protected $wanCache;
+ /** @var mixed */
+ protected $ticket;
/** @var string|bool Reason all LBs are read-only or false if not */
protected $readOnlyReason = false;
$this->wanCache = WANObjectCache::newEmpty();
}
$this->trxLogger = LoggerFactory::getInstance( 'DBTransaction' );
+ $this->ticket = mt_rand();
}
/**
}
}
+ /**
+ * Get a token asserting that no transaction writes are active
+ *
+ * @param string $fname Caller name (e.g. __METHOD__)
+ * @return mixed A value to pass to commitAndWaitForReplication()
+ * @since 1.28
+ */
+ public function getEmptyTransactionTicket( $fname ) {
+ if ( $this->hasMasterChanges() ) {
+ $this->trxLogger->error( __METHOD__ . ": $fname does not have outer scope." );
+ return null;
+ }
+
+ return $this->ticket;
+ }
+
+ /**
+ * Convenience method for safely running commitMasterChanges()/waitForReplication()
+ *
+ * This will commit and wait unless $ticket indicates it is unsafe to do so
+ *
+ * @param string $fname Caller name (e.g. __METHOD__)
+ * @param mixed $ticket Result of getOuterTransactionScopeTicket()
+ * @param array $opts Options to waitForReplication()
+ * @throws DBReplicationWaitError
+ * @since 1.28
+ */
+ public function commitAndWaitForReplication( $fname, $ticket, array $opts = [] ) {
+ if ( $ticket !== $this->ticket ) {
+ $logger = LoggerFactory::getInstance( 'DBPerformance' );
+ $logger->error( __METHOD__ . ": cannot commit; $fname does not have outer scope." );
+ return;
+ }
+
+ $this->commitMasterChanges( $fname );
+ $this->waitForReplication( $opts );
+ }
+
/**
* Disable the ChronologyProtector for all load balancers
*
* subclasses can override the beginTransaction() and commitTransaction() methods.
*/
abstract class DataUpdate implements DeferrableUpdate {
+ /** @var mixed Result from LBFactory::getEmptyTransactionTicket() */
+ protected $ticket;
+
public function __construct() {
// noop
}
+ /**
+ * @param mixed $ticket Result of getEmptyTransactionTicket()
+ * @since 1.28
+ */
+ public function setTransactionTicket( $ticket ) {
+ $this->ticket = $ticket;
+ }
+
/**
* Begin an appropriate transaction, if any.
* This default implementation does nothing.
foreach ( $catBatches as $catBatch ) {
$this->page->updateCategoryCounts( [], $catBatch, $id );
if ( count( $catBatches ) > 1 ) {
- $factory->commitMasterChanges( __METHOD__ );
- $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
+ $factory->commitAndWaitForReplication(
+ __METHOD__, $this->ticket, [ 'wiki' => $this->mDb->getWikiID() ]
+ );
}
}
foreach ( $rcIdBatches as $rcIdBatch ) {
$this->mDb->delete( 'recentchanges', [ 'rc_id' => $rcIdBatch ], __METHOD__ );
if ( count( $rcIdBatches ) > 1 ) {
- $factory->commitMasterChanges( __METHOD__ );
- $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
+ $factory->commitAndWaitForReplication(
+ __METHOD__, $this->ticket, [ 'wiki' => $this->mDb->getWikiID() ]
+ );
}
}
}
$pkDeleteConds[] = $this->mDb->makeList( (array)$row, LIST_AND );
if ( count( $pkDeleteConds ) >= $bSize ) {
$dbw->delete( $table, $dbw->makeList( $pkDeleteConds, LIST_OR ), __METHOD__ );
- $factory->commitMasterChanges( __METHOD__ );
- $factory->waitForReplication( [ 'wiki' => $dbw->getWikiID() ] );
+ $factory->commitAndWaitForReplication(
+ __METHOD__, $this->ticket, [ 'wiki' => $this->mDb->getWikiID() ]
+ );
$pkDeleteConds = [];
}
}
foreach ( $deleteWheres as $deleteWhere ) {
$this->mDb->delete( $table, $deleteWhere, __METHOD__ );
- $factory->commitMasterChanges( __METHOD__ );
- $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
+ $factory->commitAndWaitForReplication(
+ __METHOD__, $this->ticket, [ 'wiki' => $this->mDb->getWikiID() ]
+ );
}
$insertBatches = array_chunk( $insertions, $bSize );
foreach ( $insertBatches as $insertBatch ) {
$this->mDb->insert( $table, $insertBatch, __METHOD__, 'IGNORE' );
- $factory->commitMasterChanges( __METHOD__ );
- $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
+ $factory->commitAndWaitForReplication(
+ __METHOD__, $this->ticket, [ 'wiki' => $this->mDb->getWikiID() ]
+ );
}
if ( count( $insertions ) ) {
$dbw = wfGetDB( DB_MASTER );
$factory = wfGetLBFactory();
+ $ticket = $factory->getEmptyTransactionTicket( __METHOD__ );
+
$catMembChange = new CategoryMembershipChange( $title, $newRev );
$catMembChange->checkTemplateLinks();
$categoryTitle = Title::makeTitle( NS_CATEGORY, $categoryName );
$catMembChange->triggerCategoryAddedNotification( $categoryTitle );
if ( $insertCount++ && ( $insertCount % $batchSize ) == 0 ) {
- $factory->commitMasterChanges( __METHOD__ );
- $factory->waitForReplication();
+ $factory->commitAndWaitForReplication( __METHOD__, $ticket );
}
}
$categoryTitle = Title::makeTitle( NS_CATEGORY, $categoryName );
$catMembChange->triggerCategoryRemovedNotification( $categoryTitle );
if ( $insertCount++ && ( $insertCount++ % $batchSize ) == 0 ) {
- $factory->commitMasterChanges( __METHOD__ );
- $factory->waitForReplication();
+ $factory->commitAndWaitForReplication( __METHOD__, $ticket );
}
}
}
* @file
* @ingroup JobQueue
*/
+use \MediaWiki\MediaWikiServices;
/**
* Job to prune link tables for pages that were deleted
return false;
}
+ $factory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
$timestamp = isset( $this->params['timestamp'] ) ? $this->params['timestamp'] : null;
-
$page = WikiPage::factory( $this->title ); // title when deleted
+
$update = new LinksDeletionUpdate( $page, $pageId, $timestamp );
+ $update->setTransactionTicket( $factory->getEmptyTransactionTicket( __METHOD__ ) );
DataUpdate::runUpdates( [ $update ] );
return true;
$dbw = wfGetDB( DB_MASTER );
$factory = wfGetLBFactory();
+ $ticket = $factory->getEmptyTransactionTicket( __METHOD__ );
// Update page_touched (skipping pages already touched since the root job).
// Check $wgUpdateRowsPerQuery for sanity; batch jobs are sized by that already.
foreach ( array_chunk( $pageIds, $wgUpdateRowsPerQuery ) as $batch ) {
- $factory->commitMasterChanges( __METHOD__ );
- $factory->waitForReplication();
+ $factory->commitAndWaitForReplication( __METHOD__, $ticket );
$dbw->update( 'page',
[ 'page_touched' => $dbw->timestamp( $touchTimestamp ) ],
}
$factory = wfGetLBFactory();
+ $ticket = $factory->getEmptyTransactionTicket( __METHOD__ );
$cutoff = $dbw->timestamp( time() - $wgRCMaxAge );
do {
$rcIds = $dbw->selectFieldValues( 'recentchanges',
);
if ( $rcIds ) {
$dbw->delete( 'recentchanges', [ 'rc_id' => $rcIds ], __METHOD__ );
- }
- // Commit in chunks to avoid slave lag
- $factory->commitMasterChanges( __METHOD__ );
-
- if ( count( $rcIds ) === $wgUpdateRowsPerQuery ) {
// There might be more, so try waiting for slaves
try {
- wfGetLBFactory()->waitForReplication( [ 'timeout' => 3 ] );
+ $factory->commitAndWaitForReplication(
+ __METHOD__, $ticket, [ 'timeout' => 3 ]
+ );
} catch ( DBReplicationWaitError $e ) {
// Another job will continue anyway
break;
// JobRunner uses DBO_TRX, but doesn't call begin/commit itself;
// onTransactionIdle() will run immediately since there is no trx.
$dbw->onTransactionIdle( function() use ( $dbw, $days, $window ) {
+ $factory = wfGetLBFactory();
+ $ticket = $factory->getEmptyTransactionTicket( __METHOD__ );
// Avoid disconnect/ping() cycle that makes locks fall off
$dbw->setSessionOptions( [ 'connTimeout' => 900 ] );
}
foreach ( array_chunk( $newRows, 500 ) as $rowBatch ) {
$dbw->insert( 'querycachetwo', $rowBatch, __METHOD__ );
- wfGetLBFactory()->waitForReplication();
+ $factory->commitAndWaitForReplication( __METHOD__, $ticket );
}
}
$parserOutput
);
+ $factory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+ $ticket = $factory->getEmptyTransactionTicket( __METHOD__ );
foreach ( $updates as $key => $update ) {
+ $update->setTransactionTicket( $ticket );
// FIXME: This code probably shouldn't be here?
// Needed by things like Echo notifications which need
// to know which user caused the links update