*/
/**
- * See docs/deferred.txt
+ * Class the manages updates of *_link tables as well as similar extension-managed tables
+ *
+ * @note: LinksUpdate is managed by DeferredUpdates::execute(). Do not run this in a transaction.
*
- * @todo document (e.g. one-sentence top-level class description).
+ * See docs/deferred.txt
*/
class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate {
// @todo make members protected, but make sure extensions don't break
*/
private $user;
+ const BATCH_SIZE = 500; // try to keep typical updates in a single transaction
+
/**
* Constructor
*
* @throws MWException
*/
function __construct( Title $title, ParserOutput $parserOutput, $recursive = true ) {
- parent::__construct( false ); // no implicit transaction
+ // Implicit transactions are disabled as they interfere with batching
+ parent::__construct( false );
$this->mTitle = $title;
$this->mId = $title->getArticleID( Title::GAID_FOR_UPDATE );
/**
* Update link tables with outgoing links from an updated article
+ *
+ * @note: this is managed by DeferredUpdates::execute(). Do not run this in a transaction.
*/
public function doUpdate() {
+ // Make sure all links update threads see the changes of each other.
+ // This handles the case when updates have to batched into several COMMITs.
+ $scopedLock = self::acquirePageLock( $this->mDb, $this->mId );
+
Hooks::run( 'LinksUpdate', [ &$this ] );
$this->doIncrementalUpdate();
- $this->mDb->onTransactionIdle( function() {
+ $this->mDb->onTransactionIdle( function() use ( &$scopedLock ) {
Hooks::run( 'LinksUpdateComplete', [ &$this ] );
+ // Release the lock *after* the final COMMIT for correctness
+ ScopedCallback::consume( $scopedLock );
} );
}
+ /**
+ * Acquire a lock for performing link table updates for a page on a DB
+ *
+ * @param IDatabase $dbw
+ * @param integer $pageId
+ * @return ScopedCallback|null Returns null on failure
+ * @throws RuntimeException
+ * @since 1.27
+ */
+ public static function acquirePageLock( IDatabase $dbw, $pageId ) {
+ $scopedLock = $dbw->getScopedLockAndFlush(
+ "LinksUpdate:pageid:$pageId",
+ __METHOD__,
+ 15
+ );
+ if ( !$scopedLock ) {
+ throw new RuntimeException( "Could not acquire lock on page #$pageId." );
+ }
+
+ return $scopedLock;
+ }
+
protected function doIncrementalUpdate() {
# Page links
$existing = $this->getExistingLinks();
# Image links
$existing = $this->getExistingImages();
-
$imageDeletes = $this->getImageDeletions( $existing );
$this->incrTableUpdate( 'imagelinks', 'il', $imageDeletes,
$this->getImageInsertions( $existing ) );
# Category links
$existing = $this->getExistingCategories();
-
$categoryDeletes = $this->getCategoryDeletions( $existing );
-
$this->incrTableUpdate( 'categorylinks', 'cl', $categoryDeletes,
$this->getCategoryInsertions( $existing ) );
# Page properties
$existing = $this->getExistingProperties();
-
$propertiesDeletes = $this->getPropertyDeletions( $existing );
-
$this->incrTableUpdate( 'page_props', 'pp', $propertiesDeletes,
$this->getPropertyInsertions( $existing ) );
* @param array $deletions
* @param array $insertions Rows to insert
*/
- function incrTableUpdate( $table, $prefix, $deletions, $insertions ) {
- if ( $table == 'page_props' ) {
+ private function incrTableUpdate( $table, $prefix, $deletions, $insertions ) {
+ if ( $table === 'page_props' ) {
$fromField = 'pp_page';
} else {
$fromField = "{$prefix}_from";
}
- $where = [ $fromField => $this->mId ];
- if ( $table == 'pagelinks' || $table == 'templatelinks' || $table == 'iwlinks' ) {
- if ( $table == 'iwlinks' ) {
- $baseKey = 'iwl_prefix';
- } else {
- $baseKey = "{$prefix}_namespace";
+
+ $deleteWheres = []; // list of WHERE clause arrays for each DB delete() call
+ if ( $table === 'pagelinks' || $table === 'templatelinks' || $table === 'iwlinks' ) {
+ $baseKey = ( $table === 'iwlinks' ) ? 'iwl_prefix' : "{$prefix}_namespace";
+
+ $curBatchSize = 0;
+ $curDeletionBatch = [];
+ $deletionBatches = [];
+ foreach ( $deletions as $ns => $dbKeys ) {
+ foreach ( $dbKeys as $dbKey => $unused ) {
+ $curDeletionBatch[$ns][$dbKey] = 1;
+ if ( ++$curBatchSize >= self::BATCH_SIZE ) {
+ $deletionBatches[] = $curDeletionBatch;
+ $curDeletionBatch = [];
+ $curBatchSize = 0;
+ }
+ }
}
- $clause = $this->mDb->makeWhereFrom2d( $deletions, $baseKey, "{$prefix}_title" );
- if ( $clause ) {
- $where[] = $clause;
- } else {
- $where = false;
+ if ( $curDeletionBatch ) {
+ $deletionBatches[] = $curDeletionBatch;
+ }
+
+ foreach ( $deletionBatches as $deletionBatch ) {
+ $deleteWheres[] = [
+ $fromField => $this->mId,
+ $this->mDb->makeWhereFrom2d( $deletionBatch, $baseKey, "{$prefix}_title" )
+ ];
}
} else {
- if ( $table == 'langlinks' ) {
+ if ( $table === 'langlinks' ) {
$toField = 'll_lang';
- } elseif ( $table == 'page_props' ) {
+ } elseif ( $table === 'page_props' ) {
$toField = 'pp_propname';
} else {
$toField = $prefix . '_to';
}
- if ( count( $deletions ) ) {
- $where[$toField] = array_keys( $deletions );
- } else {
- $where = false;
+
+ $deletionBatches = array_chunk( array_keys( $deletions ), self::BATCH_SIZE );
+ foreach ( $deletionBatches as $deletionBatch ) {
+ $deleteWheres[] = [ $fromField => $this->mId, $toField => $deletionBatch ];
}
}
- if ( $where ) {
- $this->mDb->delete( $table, $where, __METHOD__ );
+
+ foreach ( $deleteWheres as $deleteWhere ) {
+ $this->mDb->delete( $table, $deleteWhere, __METHOD__ );
+ $this->mDb->commit( __METHOD__, 'flush' );
+ wfGetLBFactory()->waitForReplication();
+ }
+
+ $insertBatches = array_chunk( $insertions, self::BATCH_SIZE );
+ foreach ( $insertBatches as $insertBatch ) {
+ $this->mDb->insert( $table, $insertBatch, __METHOD__, 'IGNORE' );
+ $this->mDb->commit( __METHOD__, 'flush' );
+ wfGetLBFactory()->waitForReplication();
}
+
if ( count( $insertions ) ) {
- $this->mDb->insert( $table, $insertions, __METHOD__, 'IGNORE' );
Hooks::run( 'LinksUpdateAfterInsert', [ $this, $table, $insertions ] );
}
}