From: Tim Starling Date: Wed, 29 Oct 2008 15:15:29 +0000 (+0000) Subject: * Added recompressTracked.php, the second part of the recompression project. Uses... X-Git-Tag: 1.31.0-rc.0~44519 X-Git-Url: http://git.cyclocoop.org/?a=commitdiff_plain;ds=inline;h=0922f54539f9156cfe3b7f972070bbd430fe8e4b;p=lhc%2Fweb%2Fwiklou.git * Added recompressTracked.php, the second part of the recompression project. Uses the sorted list of blobs compiled by trackBlobs.php to recompress the entire contents of a set of external storage clusters to a new set of clusters, and updates the text table accordingly. * Tweaked blob_tracking indexes now that I know what I'm doing with them * Standardised isHappy() on no arguments. Use an uncompressed size limit instead of the weird things I did originally. --- diff --git a/includes/HistoryBlob.php b/includes/HistoryBlob.php index b051adf143..8cd2945f24 100644 --- a/includes/HistoryBlob.php +++ b/includes/HistoryBlob.php @@ -43,7 +43,9 @@ interface HistoryBlob class ConcatenatedGzipHistoryBlob implements HistoryBlob { public $mVersion = 0, $mCompressed = false, $mItems = array(), $mDefaultHash = ''; - public $mFast = 0, $mSize = 0; + public $mSize = 0; + public $mMaxSize = 10000000; + public $mMaxCount = 100; /** Constructor */ public function ConcatenatedGzipHistoryBlob() { @@ -122,25 +124,9 @@ class ConcatenatedGzipHistoryBlob implements HistoryBlob * Helper function for compression jobs * Returns true until the object is "full" and ready to be committed */ - public function isHappy( $maxFactor, $factorThreshold ) { - if ( count( $this->mItems ) == 0 ) { - return true; - } - if ( !$this->mFast ) { - $this->uncompress(); - $record = serialize( $this->mItems ); - $size = strlen( $record ); - $avgUncompressed = $size / count( $this->mItems ); - $compressed = strlen( gzdeflate( $record ) ); - - if ( $compressed < $factorThreshold * 1024 ) { - return true; - } else { - return $avgUncompressed * $maxFactor < $compressed; - } - } else { - return count( $this->mItems ) <= 10; - } + public function isHappy() { + return $this->mSize < $this->mMaxSize + && count( $this->mItems ) < $this->mMaxCount; } } @@ -313,6 +299,17 @@ class DiffHistoryBlob implements HistoryBlob { var $mFrozen = false; + /** + * The maximum uncompressed size before the object becomes sad + * Should be less than max_allowed_packet + */ + var $mMaxSize = 10000000; + + /** + * The maximum number of text items before the object becomes sad + */ + var $mMaxCount = 100; + function __construct() { if ( !function_exists( 'xdiff_string_bdiff' ) ){ throw new MWException( "Need xdiff 1.5+ support to read or write DiffHistoryBlob\n" ); @@ -328,6 +325,7 @@ class DiffHistoryBlob implements HistoryBlob { } $this->mItems[] = $text; + $this->mSize += strlen( $text ); $i = count( $this->mItems ) - 1; if ( $i > 0 ) { # Need to do a null concatenation with warnings off, due to bugs in the current version of xdiff @@ -401,4 +399,14 @@ class DiffHistoryBlob implements HistoryBlob { $this->mItems[0] = $info['base']; $this->mDiffs = $info['diffs']; } + + /** + * Helper function for compression jobs + * Returns true until the object is "full" and ready to be committed + */ + function isHappy() { + return $this->mSize < $this->mMaxSize + && count( $this->mItems ) < $this->mMaxCount; + } + } diff --git a/maintenance/storage/blob_tracking.sql b/maintenance/storage/blob_tracking.sql index fe7aadfbdd..b136609c9a 100644 --- a/maintenance/storage/blob_tracking.sql +++ b/maintenance/storage/blob_tracking.sql @@ -28,16 +28,19 @@ CREATE TABLE /*$wgDBprefix*/blob_tracking ( -- True if the text table has been updated to point to bt_new_url bt_moved bool not null default 0, - PRIMARY KEY (bt_rev_id, bt_text_id), + -- Primary key + -- Note that text_id is not unique due to null edits (protection, move) + -- moveTextRow(), commit(), trackOrphanText() + PRIMARY KEY (bt_text_id, bt_rev_id), -- Sort by page for easy CGZ recompression - KEY (bt_moved, bt_page, bt_rev_id), - - -- For fast orphan searches - KEY (bt_text_id), + -- doAllPages(), doAllOrphans(), doPage(), finishIncompleteMoves() + KEY (bt_moved, bt_page, bt_text_id), -- Key for determining the revisions using a given blob + -- Not used by any scripts yet KEY (bt_cluster, bt_blob_id, bt_cgz_hash) + ) /*$wgDBTableOptions*/; -- Tracking table for blob rows that aren't tracked by the text table diff --git a/maintenance/storage/compressOld.inc b/maintenance/storage/compressOld.inc index d812a95baf..fb8cc422ef 100644 --- a/maintenance/storage/compressOld.inc +++ b/maintenance/storage/compressOld.inc @@ -66,7 +66,7 @@ define( 'LS_INDIVIDUAL', 0 ); define( 'LS_CHUNKED', 1 ); /** @todo document */ -function compressWithConcat( $startId, $maxChunkSize, $maxChunkFactor, $factorThreshold, $beginDate, +function compressWithConcat( $startId, $maxChunkSize, $beginDate, $endDate, $extdb="", $maxPageId = false ) { $fname = 'compressWithConcat'; @@ -194,7 +194,7 @@ function compressWithConcat( $startId, $maxChunkSize, $maxChunkFactor, $factorTh $primaryOldid = $revs[$i]->rev_text_id; # Get the text of each revision and add it to the object - for ( $j = 0; $j < $thisChunkSize && $chunk->isHappy( $maxChunkFactor, $factorThreshold ); $j++ ) { + for ( $j = 0; $j < $thisChunkSize && $chunk->isHappy(); $j++ ) { $oldid = $revs[$i + $j]->rev_text_id; # Get text diff --git a/maintenance/storage/compressOld.php b/maintenance/storage/compressOld.php index dda765d74f..6f8b48eb53 100644 --- a/maintenance/storage/compressOld.php +++ b/maintenance/storage/compressOld.php @@ -18,8 +18,6 @@ * -b earliest date to check for uncompressed revisions * -e latest revision date to compress * -s the old_id to start from - * -f the maximum ratio of compressed chunk bytes to uncompressed avg. revision bytes - * -h is a minimum number of KB, where cuts in * --extdb store specified revisions in an external cluster (untested) * * @file @@ -40,8 +38,6 @@ $defaults = array( 't' => 'concat', 'c' => 20, 's' => 0, - 'f' => 5, - 'h' => 100, 'b' => '', 'e' => '', 'extdb' => '', @@ -62,7 +58,7 @@ if ( $options['extdb'] != '' ) { $success = true; if ( $options['t'] == 'concat' ) { - $success = compressWithConcat( $options['s'], $options['c'], $options['f'], $options['h'], $options['b'], + $success = compressWithConcat( $options['s'], $options['c'], $options['b'], $options['e'], $options['extdb'], $options['endid'] ); } else { compressOldPages( $options['s'], $options['extdb'] ); diff --git a/maintenance/storage/recompressTracked.php b/maintenance/storage/recompressTracked.php new file mode 100644 index 0000000000..9c32f126a7 --- /dev/null +++ b/maintenance/storage/recompressTracked.php @@ -0,0 +1,624 @@ + [... ...]\n"; + echo "Moves blobs indexed by trackBlobs.php to a specified list of destination +clusters, and recompresses them in the process. Restartable.\n"; + exit( 1 ); +} + +$job = RecompressTracked::newFromCommandLine( $args, $options ); +$job->execute(); + +class RecompressTracked { + var $destClusters; + var $batchSize = 1000; + var $reportingInterval = 10; + var $numProcs = 8; + var $slavePipes, $slaveProcs, $prevSlaveId; + var $blobClass = 'DiffHistoryBlob'; + var $copyOnly = false; + var $isChild = false; + var $slaveId = false; + var $store; + + static $optionsWithArgs = array( 'procs', 'class' ); + static $cmdLineOptionMap = array( + 'procs' => 'numProcs', + 'class' => 'blobClass', + 'copy-only' => 'copyOnly', + 'child' => 'isChild', + 'slave-id' => 'slaveId', + ); + + static function getOptionsWithArgs() { + return self::$optionsWithArgs; + } + + static function newFromCommandLine( $args, $options ) { + $jobOptions = array( 'destClusters' => $args ); + foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) { + if ( isset( $options[$cmdOption] ) ) { + $jobOptions[$classOption] = $options[$cmdOption]; + } + } + return new self( $jobOptions ); + } + + function __construct( $options ) { + foreach ( $options as $name => $value ) { + $this->$name = $value; + } + $this->store = new ExternalStoreDB; + } + + function debug( $msg ) { + if ( $this->slaveId !== false ) { + $msg = "{$this->slaveId}: $msg"; + } + $msg .= "\n"; + wfDebug( $msg ); + } + + /** + * Wait until the selected slave has caught up to the master. + * This allows us to use the slave for things that were committed in a + * previous part of this batch process. + */ + function syncDBs() { + $dbw = wfGetDB( DB_MASTER ); + $dbr = wfGetDB( DB_SLAVE ); + $pos = $dbw->getMasterPos(); + $dbr->masterPosWait( $pos, 100000 ); + } + + /** + * Execute parent or child depending on the isChild option + */ + function execute() { + if ( $this->isChild ) { + $this->executeChild(); + } else { + $this->executeParent(); + } + } + + /** + * Execute the parent process + */ + function executeParent() { + if ( !$this->checkTrackingTable() ) { + return; + } + + $this->syncDBs(); + $this->startSlaveProcs(); + $this->doAllPages(); + $this->doAllOrphans(); + $this->killSlaveProcs(); + } + + /** + * Make sure the tracking table exists and isn't empty + */ + function checkTrackingTable() { + $dbr = wfGetDB( DB_SLAVE ); + if ( !$dbr->tableExists( 'blob_tracking' ) ) { + echo "Error: blob_tracking table does not exist\n"; + return false; + } + $row = $dbr->selectRow( 'blob_tracking', '*', false, __METHOD__ ); + if ( !$row ) { + echo "Warning: blob_tracking table contains no rows, skipping this wiki.\n"; + return false; + } + return true; + } + + /** + * Start the worker processes. + * These processes will listen on stdin for commands. + * This necessary because text recompression is slow: loading, compressing and + * writing are all slow. + */ + function startSlaveProcs() { + $cmd = 'php ' . wfEscapeShellArg( __FILE__ ); + foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) { + if ( in_array( $cmdOption, self::$optionsWithArgs ) ) { + $cmd .= " --$cmdOption " . wfEscapeShellArg( $this->$classOption ); + } elseif ( $this->$classOption ) { + $cmd .= " --$cmdOption"; + } + } + $cmd .= ' --child' . + ' --wiki ' . wfEscapeShellArg( wfWikiID() ) . + ' ' . call_user_func_array( 'wfEscapeShellArg', $this->destClusters ); + + $this->slavePipes = $this->slaveProcs = array(); + for ( $i = 0; $i < $this->numProcs; $i++ ) { + $pipes = false; + $spec = array( + array( 'pipe', 'r' ), + array( 'file', '/dev/stdout', 'w' ), + array( 'file', '/dev/stderr', 'w' ) + ); + wfSuppressWarnings(); + $proc = proc_open( $cmd, $spec, $pipes ); + wfRestoreWarnings(); + if ( !$proc ) { + echo "Error opening slave process\n"; + exit( 1 ); + } + $this->slaveProcs[$i] = $proc; + $this->slavePipes[$i] = $pipes[0]; + } + $this->prevSlaveId = -1; + } + + /** + * Gracefully terminate the child processes + */ + function killSlaveProcs() { + for ( $i = 0; $i < $this->numProcs; $i++ ) { + $this->dispatchToSlave( $i, 'quit' ); + } + for ( $i = 0; $i < $this->numProcs; $i++ ) { + proc_close( $this->slaveProcs[$i] ); + } + } + + /** + * Dispatch a command to the next available slave. + * This may block until a slave finishes its work and becomes available. + */ + function dispatch( /*...*/ ) { + $args = func_get_args(); + $pipes = $this->slavePipes; + $numPipes = stream_select( $x=array(), $pipes, $y=array(), 3600 ); + if ( !$numPipes ) { + echo "Error waiting to write to slaves. Aborting\n"; + exit( 1 ); + } + for ( $i = 0; $i < $this->numProcs; $i++ ) { + $slaveId = ( $i + $this->prevSlaveId + 1 ) % $this->numProcs; + if ( isset( $pipes[$slaveId] ) ) { + $this->prevSlaveId = $slaveId; + $this->dispatchToSlave( $slaveId, $args ); + return; + } + } + echo "Unreachable\n"; + exit( 1 ); + } + + /** + * Dispatch a command to a specified slave + */ + function dispatchToSlave( $slaveId, $args ) { + $args = (array)$args; + $cmd = implode( ' ', $args ); + fwrite( $this->slavePipes[$slaveId], "$cmd\n" ); + } + + /** + * Move all tracked pages to the new clusters + */ + function doAllPages() { + $dbr = wfGetDB( DB_SLAVE ); + $startId = 0; + $endId = $dbr->selectField( 'blob_tracking', 'MAX(bt_page)', + # A condition is required so that this query uses the index + array( 'bt_moved' => 0 ), + __METHOD__ ); + echo "Moving pages...\n"; + while ( true ) { + $res = $dbr->select( 'blob_tracking', + array( 'bt_page' ), + array( + 'bt_moved' => 0, + 'bt_page > ' . $dbr->addQuotes( $startId ) + ), + __METHOD__, + array( + 'DISTINCT', + 'ORDER BY' => 'bt_page', + 'LIMIT' => $this->batchSize, + ) + ); + if ( !$res->numRows() ) { + break; + } + foreach ( $res as $row ) { + $this->dispatch( 'doPage', $row->bt_page ); + } + $startId = $row->bt_page; + $this->report( $startId, $endId ); + } + echo "Done moving pages.\n"; + } + + /** + * Display a progress report + */ + function report( $start, $end ) { + $this->numBatches++; + if ( $this->numBatches >= $this->reportingInterval ) { + $this->numBatches = 0; + echo "$start / $end\n"; + wfWaitForSlaves( 5 ); + } + } + + /** + * Move all orphan text to the new clusters + */ + function doAllOrphans() { + $dbr = wfGetDB( DB_SLAVE ); + $startId = 0; + $endId = $dbr->selectField( 'blob_tracking', 'MAX(bt_text_id)', + array( 'bt_moved' => 0, 'bt_page' => 0 ), + __METHOD__ ); + if ( !$endId ) { + return; + } + echo "Moving orphans...\n"; + + while ( true ) { + $res = $dbr->select( 'blob_tracking', + array( 'bt_text_id' ), + array( + 'bt_moved' => 0, + 'bt_page' => 0, + 'bt_text_id > ' . $dbr->addQuotes( $startId ) + ), + __METHOD__, + array( + 'DISTINCT', + 'ORDER BY' => 'bt_text_id', + 'LIMIT' => $this->batchSize + ) + ); + if ( !$res->numRows() ) { + break; + } + $args = array( 'doOrphanList' ); + foreach ( $res as $row ) { + $args[] = $row->bt_text_id; + } + call_user_func_array( array( $this, 'dispatch' ), $args ); + $startId = $row->bt_text_id; + $this->report( $startId, $endId ); + } + echo "Done moving orphans.\n"; + } + + /** + * Main entry point for worker processes + */ + function executeChild() { + $this->syncDBs(); + + while ( !feof( STDIN ) ) { + $line = rtrim( fgets( STDIN ) ); + if ( $line == '' ) { + continue; + } + $args = explode( ' ', $line ); + $cmd = array_shift( $args ); + switch ( $cmd ) { + case 'doPage': + $this->doPage( intval( $args[0] ) ); + break; + case 'doOrphanList': + $this->doOrphanList( array_map( 'intval', $args ) ); + break; + case 'quit': + return; + } + } + } + + /** + * Move tracked text in a given page + */ + function doPage( $pageId ) { + $dbr = wfGetDB( DB_SLAVE ); + + // Finish any incomplete transactions + if ( !$this->copyOnly ) { + $this->finishIncompleteMoves(); + } + + $startId = 0; + $trx = new CgzCopyTransaction( $this ); + + while ( true ) { + $res = $dbr->select( + array( 'blob_tracking', 'text' ), + '*', + array( + 'bt_page' => $pageId, + 'bt_text_id > ' . $dbr->addQuotes( $startId ), + 'bt_moved' => 0, + 'bt_new_url' => '', + 'bt_text_id=old_id', + ), + __METHOD__, + array( + 'ORDER BY' => 'bt_text_id', + 'LIMIT' => $this->batchSize + ) + ); + if ( !$res->numRows() ) { + break; + } + + $lastTextId = 0; + foreach ( $res as $row ) { + if ( $lastTextId == $row->bt_text_id ) { + // Duplicate (null edit) + continue; + } + $lastTextId = $row->bt_text_id; + // Load the text + $text = Revision::getRevisionText( $row ); + if ( $text === false ) { + echo "Error loading {$row->bt_rev_id}/{$row->bt_text_id}\n"; + continue; + } + + // Queue it + if ( !$trx->addItem( $text, $row->bt_text_id ) ) { + $trx->commit(); + $trx = new CgzCopyTransaction( $this ); + } + } + $startId = $row->bt_text_id; + } + $trx->commit(); + } + + /** + * Atomic move operation. + * + * Write the new URL to the text table and set the bt_moved flag. + * + * This is done in a single transaction to provide restartable behaviour + * without data loss. + * + * The transaction is kept short to reduce locking. + */ + function moveTextRow( $textId, $url ) { + $dbw = wfGetDB( DB_MASTER ); + $dbw->begin(); + $dbw->update( 'text', + array( // set + 'old_text' => $url, + 'old_flags' => 'external,utf8', + ), + array( // where + 'old_id' => $textId + ), + __METHOD__ + ); + $dbw->update( 'blob_tracking', + array( 'bt_moved' => 1 ), + array( 'bt_text_id' => $textId ), + __METHOD__ + ); + $dbw->commit(); + } + + /** + * Moves are done in two phases: bt_new_url and then bt_moved. + * - bt_new_url indicates that the text has been copied to the new cluster. + * - bt_moved indicates that the text table has been updated. + * + * This function completes any moves that only have done bt_new_url. This + * can happen when the script is interrupted, or when --copy-only is used. + */ + function finishIncompleteMoves() { + $dbr = wfGetDB( DB_SLAVE ); + + $startId = 0; + while ( true ) { + $res = $dbr->select( 'blob_tracking', + '*', + array( + 'bt_text_id > ' . $dbr->addQuotes( $startId ), + 'bt_moved' => 0, + "bt_new_url <> ''", + ), + __METHOD__, + array( + 'ORDER BY' => 'bt_text_id', + 'LIMIT' => $this->batchSize, + ) + ); + if ( !$res->numRows() ) { + break; + } + foreach ( $res as $row ) { + $this->moveTextRow( $row->bt_text_id, $row->bt_new_url ); + } + $startId = $row->bt_text_id; + } + } + + /** + * Returns the name of the next target cluster + */ + function getTargetCluster() { + $cluster = next( $this->destClusters ); + if ( $cluster === false ) { + $cluster = reset( $this->destClusters ); + } + return $cluster; + } + + /** + * Gets a DB master connection for the given external cluster name + */ + function getExtDB( $cluster ) { + $lb = wfGetLBFactory()->getExternalLB( $cluster ); + return $lb->getConnection( DB_MASTER ); + } + + /** + * Move an orphan text_id to the new cluster + */ + function doOrphanList( $textIds ) { + $trx = new CgzCopyTransaction( $this ); + foreach ( $textIds as $textId ) { + $row = wfGetDB( DB_SLAVE )->selectRow( 'text', array( 'old_text', 'old_flags' ), + array( 'old_id' => $textId ), __METHOD__ ); + $text = Revision::getRevisionText( $row ); + if ( $text === false ) { + echo "Error: cannot load revision text for $textId\n"; + continue; + } + + if ( !$trx->addItem( $text, $textId ) ) { + $trx->commit(); + $trx = new CgzCopyTransaction( $this ); + } + } + } +} + +/** + * Class to represent a recompression operation for a single CGZ blob + */ +class CgzCopyTransaction { + var $blobClass; + var $cgz; + var $referrers; + + /** + * Create a transaction from a RecompressTracked object + */ + function __construct( $parent ) { + $this->blobClass = $parent->blobClass; + $this->cgz = false; + $this->texts = array(); + } + + /** + * Add text. + * Returns false if it's ready to commit. + */ + function addItem( $text, $textId ) { + if ( !$this->cgz ) { + $class = $this->blobClass; + $this->cgz = new $class; + } + $hash = $this->cgz->addItem( $text ); + $this->referrers[$textId] = $hash; + $this->texts[$textId] = $text; + return $this->cgz->isHappy(); + } + + /** + * Recompress text after some aberrant modification + */ + function recompress() { + $class = $this->blobClass; + $this->cgz = new $class; + $this->referrers = array(); + foreach ( $this->texts as $textId => $text ) { + $hash = $this->cgz->addItem( $text ); + $this->referrers[$textId] = $hash; + } + } + + /** + * Commit the blob. + * Does nothing if no text items have been added. + * May skip the move if --copy-only is set. + */ + function commit() { + $originalCount = count( $this->texts ); + if ( !$originalCount ) { + return; + } + + // Check to see if the target text_ids have been moved already. + // + // We originally read from the slave, so this can happen when a single + // text_id is shared between multiple pages. It's rare, but possible + // if a delete/move/undelete cycle splits up a null edit. + // + // We do a locking read to prevent closer-run race conditions. + $dbw = wfGetDB( DB_MASTER ); + $dbw->begin(); + $dirty = false; + foreach ( $this->referrers as $textId => $hash ) { + $moved = $dbw->selectField( 'blob_tracking', 'bt_moved', + array( 'bt_text_id' => $textId ), + __METHOD__, + array( 'FOR UPDATE' ) + ); + if ( !$moved ) { + # This row has already been moved, remove it + unset( $this->texts[$textId] ); + $dirty = true; + } + } + + // Recompress the blob if necessary + if ( $dirty ) { + if ( !count( $this->texts ) ) { + // All have been moved already + if ( $originalCount > 1 ) { + // This is suspcious, make noise + echo "Warning: concurrent operation detected, are there two conflicting\n" . + "processes running, doing the same job?\n"; + } + return; + } + $this->recompress(); + } + + // Insert the data into the destination cluster + $targetCluster = $this->parent->getTargetCluster(); + $store = $this->parent->store; + $targetDB = $store->getMaster( $targetCluster ); + $targetDB->clearFlag( DBO_TRX ); // we manage the transactions + $targetDB->begin(); + $baseUrl = $this->parent->store->store( $targetCluster, serialize( $this->cgz ) ); + + // Write the new URLs to the blob_tracking table + foreach ( $this->referrers as $textId => $hash ) { + $url = $baseUrl . '/' . $hash; + $dbw->update( 'blob_tracking', + array( 'bt_new_url' => $url ), + array( + 'bt_text_id' => $textId, + 'bt_moved' => 0, # Check for concurrent conflicting update + ), + __METHOD__ + ); + } + + $targetDB->commit(); + // Critical section here: interruption at this point causes blob duplication + // Reversing the order of the commits would cause data loss instead + $dbw->commit(); + + // Write the new URLs to the text table and set the moved flag + if ( !$this->parent->copyOnly ) { + foreach ( $this->referrers as $textId => $hash ) { + $url = $baseUrl . '/' . $hash; + $this->parent->moveTextRow( $textId, $url ); + } + } + } + + function signalHandler() { + $this->signalled = true; + } +} +