Moves blobs indexed by trackBlobs.php to a specified list of destination clusters, and recompresses them in the process. Restartable.
Options:
- --procs <procs> Set the number of child processes (default 8)
- --copy-only Copy only, do not update the text table. Restart without this option to complete.
+ --procs <procs> Set the number of child processes (default 1)
+ --copy-only Copy only, do not update the text table. Restart without this option to complete.
+ --debug-log <file> Log debugging data to the specified file
+ --info-log <file> Log progress messages to the specified file
+ --critical-log <file> Log error messages to the specified file
";
exit( 1 );
}
class RecompressTracked {
var $destClusters;
var $batchSize = 1000;
+ var $orphanBatchSize = 1000;
var $reportingInterval = 10;
- var $numProcs = 8;
+ var $numProcs = 1;
var $useDiff, $pageBlobClass, $orphanBlobClass;
var $slavePipes, $slaveProcs, $prevSlaveId;
var $copyOnly = false;
var $isChild = false;
var $slaveId = false;
+ var $debugLog, $infoLog, $criticalLog;
var $store;
- static $optionsWithArgs = array( 'procs', 'slave-id' );
+ static $optionsWithArgs = array( 'procs', 'slave-id', 'debug-log', 'info-log', 'critical-log' );
static $cmdLineOptionMap = array(
'procs' => 'numProcs',
'copy-only' => 'copyOnly',
'child' => 'isChild',
'slave-id' => 'slaveId',
+ 'debug-log' => 'debugLog',
+ 'info-log' => 'infoLog',
+ 'critical-log' => 'criticalLog',
);
static function getOptionsWithArgs() {
function debug( $msg ) {
wfDebug( "$msg\n" );
+ // Also append the message to the --debug-log file when one was given.
+ if ( $this->debugLog ) {
+ $this->logToFile( $msg, $this->debugLog );
+ }
+
+ }
+
+ /**
+  * Print a progress message to stdout and, if an --info-log file was
+  * given, append the same message to that file.
+  */
+ function info( $msg ) {
+ echo "$msg\n";
+ if ( !$this->infoLog ) {
+ return;
+ }
+ $this->logToFile( $msg, $this->infoLog );
+ }
+
+ /**
+  * Print an error message to stdout and, if a --critical-log file was
+  * given, append the same message to that file.
+  */
+ function critical( $msg ) {
+ echo "$msg\n";
+ if ( !$this->criticalLog ) {
+ return;
+ }
+ $this->logToFile( $msg, $this->criticalLog );
+ }
+
+ /**
+  * Append one line to $file via wfErrorLog(). The line consists of a header
+  * (timestamp, hostname, PID with the slave ID in parentheses when this is
+  * a child, wiki ID) left-justified to at least 50 characters, followed by
+  * the message.
+  *
+  * NOTE(review): the timestamp format 'd\TH:i:s' records only the day of
+  * the month and the time of day -- confirm that omitting year and month
+  * is intended.
+  */
+ function logToFile( $msg, $file ) {
+ $pidTag = posix_getpid();
+ if ( $this->slaveId !== false ) {
+ $pidTag .= "({$this->slaveId})";
+ }
+ $header = sprintf( '[%s] %s %s %s', date('d\TH:i:s'), wfHostname(), $pidTag, wfWikiID() );
+ wfErrorLog( sprintf( "%-50s %s\n", $header, $msg ), $file );
}
/**
function checkTrackingTable() {
$dbr = wfGetDB( DB_SLAVE );
if ( !$dbr->tableExists( 'blob_tracking' ) ) {
- echo "Error: blob_tracking table does not exist\n";
+ $this->critical( "Error: blob_tracking table does not exist" );
return false;
}
$row = $dbr->selectRow( 'blob_tracking', '*', false, __METHOD__ );
if ( !$row ) {
- echo "Warning: blob_tracking table contains no rows, skipping this wiki.\n";
+ $this->info( "Warning: blob_tracking table contains no rows, skipping this wiki." );
return false;
}
return true;
$proc = proc_open( "$cmd --slave-id $i", $spec, $pipes );
wfRestoreWarnings();
if ( !$proc ) {
- echo "Error opening slave process\n";
+ $this->critical( "Error opening slave process" );
exit( 1 );
}
$this->slaveProcs[$i] = $proc;
* Gracefully terminate the child processes
*/
function killSlaveProcs() {
+ $this->info( "Waiting for slave processes to finish..." );
+ // First pass: send the 'quit' command to every child.
for ( $i = 0; $i < $this->numProcs; $i++ ) {
$this->dispatchToSlave( $i, 'quit' );
}
+ // Second pass: close each process handle and report any non-zero result.
for ( $i = 0; $i < $this->numProcs; $i++ ) {
- proc_close( $this->slaveProcs[$i] );
+ $status = proc_close( $this->slaveProcs[$i] );
+ if ( $status ) {
+ $this->critical( "Warning: child #$i exited with status $status" );
+ }
}
+ $this->info( "Done." );
}
/**
$pipes = $this->slavePipes;
$numPipes = stream_select( $x=array(), $pipes, $y=array(), 3600 );
if ( !$numPipes ) {
- echo "Error waiting to write to slaves. Aborting\n";
+ $this->critical( "Error waiting to write to slaves. Aborting" );
exit( 1 );
}
for ( $i = 0; $i < $this->numProcs; $i++ ) {
return;
}
}
- echo "Unreachable\n";
+ $this->critical( "Unreachable" );
exit( 1 );
}
*/
function doAllPages() {
$dbr = wfGetDB( DB_SLAVE );
+ $i = 0;
$startId = 0;
- $endId = $dbr->selectField( 'blob_tracking', 'MAX(bt_page)',
+ $numPages = $dbr->selectField( 'blob_tracking',
+ 'COUNT(DISTINCT bt_page)',
# A condition is required so that this query uses the index
array( 'bt_moved' => 0 ),
- __METHOD__ );
- echo "Moving pages...\n";
+ __METHOD__
+ );
+ if ( $this->copyOnly ) {
+ $this->info( "Copying pages..." );
+ } else {
+ $this->info( "Moving pages..." );
+ }
while ( true ) {
$res = $dbr->select( 'blob_tracking',
array( 'bt_page' ),
}
foreach ( $res as $row ) {
$this->dispatch( 'doPage', $row->bt_page );
+ $i++;
}
$startId = $row->bt_page;
- $this->report( $startId, $endId );
+ $this->report( 'pages', $i, $numPages );
+ }
+ $this->report( 'pages', $i, $numPages );
+ if ( $this->copyOnly ) {
+ $this->info( "All page copies queued." );
+ } else {
+ $this->info( "All page moves queued." );
}
- echo "Done moving pages.\n";
}
/**
* Display a progress report
*/
- function report( $start, $end ) {
+ function report( $label, $current, $end ) {
$this->numBatches++;
- if ( $this->numBatches >= $this->reportingInterval ) {
+ // Print when $current has reached $end, or once every
+ // reportingInterval calls; the call counter is reset each time.
+ if ( $current == $end || $this->numBatches >= $this->reportingInterval ) {
$this->numBatches = 0;
- echo "$start / $end\n";
+ $this->info( "$label: $current / $end" );
wfWaitForSlaves( 5 );
}
}
function doAllOrphans() {
$dbr = wfGetDB( DB_SLAVE );
$startId = 0;
- $endId = $dbr->selectField( 'blob_tracking', 'MAX(bt_text_id)',
+ $i = 0;
+ // Number of distinct unmoved text IDs with bt_page = 0; used for the
+ // early exit below and as the denominator of the progress report.
+ $numOrphans = $dbr->selectField( 'blob_tracking',
+ 'COUNT(DISTINCT bt_text_id)',
array( 'bt_moved' => 0, 'bt_page' => 0 ),
__METHOD__ );
- if ( !$endId ) {
+ if ( !$numOrphans ) {
return;
}
- echo "Moving orphans...\n";
+ if ( $this->copyOnly ) {
+ $this->info( "Copying orphans..." );
+ } else {
+ $this->info( "Moving orphans..." );
+ }
+ // IDs read from blob_tracking accumulate here across query batches until
+ // there are enough of them to dispatch a chunk of orphanBatchSize.
+ $ids = array();
while ( true ) {
$res = $dbr->select( 'blob_tracking',
if ( !$res->numRows() ) {
break;
}
- $args = array( 'doOrphanList' );
foreach ( $res as $row ) {
- $args[] = $row->bt_text_id;
+ $ids[] = $row->bt_text_id;
+ $i++;
+ }
+ // Need to send enough orphan IDs to the child at a time to fill a blob,
+ // so orphanBatchSize needs to be at least ~100.
+ // batchSize can be smaller or larger.
+ while ( count( $ids ) > $this->orphanBatchSize ) {
+ $args = array_slice( $ids, 0, $this->orphanBatchSize );
+ $ids = array_slice( $ids, $this->orphanBatchSize );
+ array_unshift( $args, 'doOrphanList' );
+ call_user_func_array( array( $this, 'dispatch' ), $args );
}
- call_user_func_array( array( $this, 'dispatch' ), $args );
$startId = $row->bt_text_id;
- $this->report( $startId, $endId );
+ $this->report( 'orphans', $i, $numOrphans );
}
- echo "Done moving orphans.\n";
+ // The loop above only dispatches while more than orphanBatchSize IDs are
+ // pending, so up to orphanBatchSize IDs are still in $ids here. Dispatch
+ // them now; otherwise they would never be sent to a child.
+ if ( count( $ids ) ) {
+ $args = $ids;
+ array_unshift( $args, 'doOrphanList' );
+ call_user_func_array( array( $this, 'dispatch' ), $args );
+ }
+ $this->report( 'orphans', $i, $numOrphans );
+ $this->info( "All orphans queued." );
}
/**
// Finish any incomplete transactions
if ( !$this->copyOnly ) {
$this->finishIncompleteMoves( array( 'bt_page' => $pageId ) );
+ $this->syncDBs();
}
$startId = 0;
// Load the text
$text = Revision::getRevisionText( $row );
if ( $text === false ) {
- echo "Error loading {$row->bt_rev_id}/{$row->bt_text_id}\n";
+ $this->critical( "Error loading {$row->bt_rev_id}/{$row->bt_text_id}" );
continue;
}
* The transaction is kept short to reduce locking.
*/
function moveTextRow( $textId, $url ) {
+ if ( $this->copyOnly ) {
+ $this->critical( "Internal error: can't call moveTextRow() in --copy-only mode" );
+ exit( 1 );
+ }
$dbw = wfGetDB( DB_MASTER );
$dbw->begin();
$dbw->update( 'text',
*/
function doOrphanList( $textIds ) {
// Finish incomplete moves
- $this->finishIncompleteMoves( array( 'bt_text_id' => $textIds ) );
+ if ( !$this->copyOnly ) {
+ $this->finishIncompleteMoves( array( 'bt_text_id' => $textIds ) );
+ $this->syncDBs();
+ }
$trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );
- foreach ( $textIds as $textId ) {
- $row = wfGetDB( DB_SLAVE )->selectRow( 'text', array( 'old_text', 'old_flags' ),
- array( 'old_id' => $textId ), __METHOD__ );
+
+ // Fetch, in one query, the requested text rows that blob_tracking still
+ // lists with bt_moved = 0.
+ $res = wfGetDB( DB_SLAVE )->select(
+ array( 'text', 'blob_tracking' ),
+ array( 'old_id', 'old_text', 'old_flags' ),
+ array(
+ 'old_id' => $textIds,
+ 'bt_text_id=old_id',
+ 'bt_moved' => 0,
+ ),
+ __METHOD__,
+ array( 'DISTINCT' )
+ );
+
+ foreach ( $res as $row ) {
$text = Revision::getRevisionText( $row );
if ( $text === false ) {
- echo "Error: cannot load revision text for $textId\n";
+ // The loop now iterates over result rows, so $textId is no longer
+ // defined here; report the ID carried by the row itself.
+ $this->critical( "Error: cannot load revision text for old_id={$row->old_id}" );
continue;
}
- if ( !$trx->addItem( $text, $textId ) ) {
+ if ( !$trx->addItem( $text, $row->old_id ) ) {
$this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" );
$trx->commit();
$trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );
// All have been moved already
if ( $originalCount > 1 ) {
// This is suspcious, make noise
- echo "Warning: concurrent operation detected, are there two conflicting " .
- "processes running, doing the same job?\n";
+ $this->critical( "Warning: concurrent operation detected, are there two conflicting " .
+ "processes running, doing the same job?" );
}
return;
}