From: Tim Starling Date: Mon, 3 Nov 2008 21:06:15 +0000 (+0000) Subject: Assorted tweaks on further review and local testing: X-Git-Tag: 1.31.0-rc.0~44445 X-Git-Url: http://git.cyclocoop.org/%7B%24admin_url%7Dcompta/comptes/journal.php?a=commitdiff_plain;h=54db57f69dae06acd9d6a3550410b971da30484c;p=lhc%2Fweb%2Fwiklou.git Assorted tweaks on further review and local testing: * Default 1 child process * Introduce a log system so that critical errors can be filtered out of the progress messages * Warn on abnormal child termination * Use the actual row count for progress instead of the end ID. I think we can afford to count rows. * Check for bt_moved=0 in doOrphanList() after finishIncompleteMoves(), to avoid attempted double-move and resulting spurious "conflict detected" warning --- diff --git a/maintenance/storage/recompressTracked.php b/maintenance/storage/recompressTracked.php index d98b42f1df..e1866a8c37 100644 --- a/maintenance/storage/recompressTracked.php +++ b/maintenance/storage/recompressTracked.php @@ -8,8 +8,11 @@ if ( count( $args ) < 1 ) { Moves blobs indexed by trackBlobs.php to a specified list of destination clusters, and recompresses them in the process. Restartable. Options: - --procs Set the number of child processes (default 8) - --copy-only Copy only, do not update the text table. Restart without this option to complete. + --procs Set the number of child processes (default 1) + --copy-only Copy only, do not update the text table. Restart without this option to complete. + --debug-log Log debugging data to the specified file + --info-log Log progress messages to the specified file + --critical-log Log error messages to the specified file "; exit( 1 ); } @@ -20,21 +23,26 @@ $job->execute(); class RecompressTracked { var $destClusters; var $batchSize = 1000; + var $orphanBatchSize = 1000; var $reportingInterval = 10; - var $numProcs = 8; + var $numProcs = 1; var $useDiff, $pageBlobClass, $orphanBlobClass; var $slavePipes, $slaveProcs, $prevSlaveId; var $copyOnly = false; var $isChild = false; var $slaveId = false; + var $debugLog, $infoLog, $criticalLog; var $store; - static $optionsWithArgs = array( 'procs', 'slave-id' ); + static $optionsWithArgs = array( 'procs', 'slave-id', 'debug-log', 'info-log', 'critical-log' ); static $cmdLineOptionMap = array( 'procs' => 'numProcs', 'copy-only' => 'copyOnly', 'child' => 'isChild', 'slave-id' => 'slaveId', + 'debug-log' => 'debugLog', + 'info-log' => 'infoLog', + 'critical-log' => 'criticalLog', ); static function getOptionsWithArgs() { @@ -68,6 +76,33 @@ class RecompressTracked { function debug( $msg ) { wfDebug( "$msg\n" ); + if ( $this->debugLog ) { + $this->logToFile( $msg, $this->debugLog ); + } + + } + + function info( $msg ) { + echo "$msg\n"; + if ( $this->infoLog ) { + $this->logToFile( $msg, $this->infoLog ); + } + } + + function critical( $msg ) { + echo "$msg\n"; + if ( $this->criticalLog ) { + $this->logToFile( $msg, $this->criticalLog ); + } + } + + function logToFile( $msg, $file ) { + $header = '[' . date('d\TH:i:s') . '] ' . wfHostname() . ' ' . posix_getpid(); + if ( $this->slaveId !== false ) { + $header .= "({$this->slaveId})"; + } + $header .= ' ' . wfWikiID(); + wfErrorLog( sprintf( "%-50s %s\n", $header, $msg ), $file ); } /** @@ -114,12 +149,12 @@ class RecompressTracked { function checkTrackingTable() { $dbr = wfGetDB( DB_SLAVE ); if ( !$dbr->tableExists( 'blob_tracking' ) ) { - echo "Error: blob_tracking table does not exist\n"; + $this->critical( "Error: blob_tracking table does not exist" ); return false; } $row = $dbr->selectRow( 'blob_tracking', '*', false, __METHOD__ ); if ( !$row ) { - echo "Warning: blob_tracking table contains no rows, skipping this wiki.\n"; + $this->info( "Warning: blob_tracking table contains no rows, skipping this wiki." ); return false; } return true; @@ -156,7 +191,7 @@ class RecompressTracked { $proc = proc_open( "$cmd --slave-id $i", $spec, $pipes ); wfRestoreWarnings(); if ( !$proc ) { - echo "Error opening slave process\n"; + $this->critical( "Error opening slave process" ); exit( 1 ); } $this->slaveProcs[$i] = $proc; @@ -169,12 +204,17 @@ class RecompressTracked { * Gracefully terminate the child processes */ function killSlaveProcs() { + $this->info( "Waiting for slave processes to finish..." ); for ( $i = 0; $i < $this->numProcs; $i++ ) { $this->dispatchToSlave( $i, 'quit' ); } for ( $i = 0; $i < $this->numProcs; $i++ ) { - proc_close( $this->slaveProcs[$i] ); + $status = proc_close( $this->slaveProcs[$i] ); + if ( $status ) { + $this->critical( "Warning: child #$i exited with status $status" ); + } } + $this->info( "Done." ); } /** @@ -186,7 +226,7 @@ class RecompressTracked { $pipes = $this->slavePipes; $numPipes = stream_select( $x=array(), $pipes, $y=array(), 3600 ); if ( !$numPipes ) { - echo "Error waiting to write to slaves. Aborting\n"; + $this->critical( "Error waiting to write to slaves. Aborting" ); exit( 1 ); } for ( $i = 0; $i < $this->numProcs; $i++ ) { @@ -197,7 +237,7 @@ class RecompressTracked { return; } } - echo "Unreachable\n"; + $this->critical( "Unreachable" ); exit( 1 ); } @@ -215,12 +255,19 @@ class RecompressTracked { */ function doAllPages() { $dbr = wfGetDB( DB_SLAVE ); + $i = 0; $startId = 0; - $endId = $dbr->selectField( 'blob_tracking', 'MAX(bt_page)', + $numPages = $dbr->selectField( 'blob_tracking', + 'COUNT(DISTINCT bt_page)', # A condition is required so that this query uses the index array( 'bt_moved' => 0 ), - __METHOD__ ); - echo "Moving pages...\n"; + __METHOD__ + ); + if ( $this->copyOnly ) { + $this->info( "Copying pages..." ); + } else { + $this->info( "Moving pages..." ); + } while ( true ) { $res = $dbr->select( 'blob_tracking', array( 'bt_page' ), @@ -240,21 +287,27 @@ class RecompressTracked { } foreach ( $res as $row ) { $this->dispatch( 'doPage', $row->bt_page ); + $i++; } $startId = $row->bt_page; - $this->report( $startId, $endId ); + $this->report( 'pages', $i, $numPages ); + } + $this->report( 'pages', $i, $numPages ); + if ( $this->copyOnly ) { + $this->info( "All page copies queued." ); + } else { + $this->info( "All page moves queued." ); } - echo "Done moving pages.\n"; } /** * Display a progress report */ - function report( $start, $end ) { + function report( $label, $current, $end ) { $this->numBatches++; - if ( $this->numBatches >= $this->reportingInterval ) { + if ( $current == $end || $this->numBatches >= $this->reportingInterval ) { $this->numBatches = 0; - echo "$start / $end\n"; + $this->info( "$label: $current / $end" ); wfWaitForSlaves( 5 ); } } @@ -265,13 +318,20 @@ class RecompressTracked { function doAllOrphans() { $dbr = wfGetDB( DB_SLAVE ); $startId = 0; - $endId = $dbr->selectField( 'blob_tracking', 'MAX(bt_text_id)', + $i = 0; + $numOrphans = $dbr->selectField( 'blob_tracking', + 'COUNT(DISTINCT bt_text_id)', array( 'bt_moved' => 0, 'bt_page' => 0 ), __METHOD__ ); - if ( !$endId ) { + if ( !$numOrphans ) { return; } - echo "Moving orphans...\n"; + if ( $this->copyOnly ) { + $this->info( "Copying orphans..." ); + } else { + $this->info( "Moving orphans..." ); + } + $ids = array(); while ( true ) { $res = $dbr->select( 'blob_tracking', @@ -291,15 +351,24 @@ class RecompressTracked { if ( !$res->numRows() ) { break; } - $args = array( 'doOrphanList' ); foreach ( $res as $row ) { - $args[] = $row->bt_text_id; + $ids[] = $row->bt_text_id; + $i++; + } + // Need to send enough orphan IDs to the child at a time to fill a blob, + // so orphanBatchSize needs to be at least ~100. + // batchSize can be smaller or larger. + while ( count( $ids ) > $this->orphanBatchSize ) { + $args = array_slice( $ids, 0, $this->orphanBatchSize ); + $ids = array_slice( $ids, $this->orphanBatchSize ); + array_unshift( $args, 'doOrphanList' ); + call_user_func_array( array( $this, 'dispatch' ), $args ); } - call_user_func_array( array( $this, 'dispatch' ), $args ); $startId = $row->bt_text_id; - $this->report( $startId, $endId ); + $this->report( 'orphans', $i, $numOrphans ); } - echo "Done moving orphans.\n"; + $this->report( 'orphans', $i, $numOrphans ); + $this->info( "All orphans queued." ); } /** @@ -345,6 +414,7 @@ class RecompressTracked { // Finish any incomplete transactions if ( !$this->copyOnly ) { $this->finishIncompleteMoves( array( 'bt_page' => $pageId ) ); + $this->syncDBs(); } $startId = 0; @@ -381,7 +451,7 @@ class RecompressTracked { // Load the text $text = Revision::getRevisionText( $row ); if ( $text === false ) { - echo "Error loading {$row->bt_rev_id}/{$row->bt_text_id}\n"; + $this->critical( "Error loading {$row->bt_rev_id}/{$row->bt_text_id}" ); continue; } @@ -410,6 +480,10 @@ class RecompressTracked { * The transaction is kept short to reduce locking. */ function moveTextRow( $textId, $url ) { + if ( $this->copyOnly ) { + $this->critical( "Internal error: can't call moveTextRow() in --copy-only mode" ); + exit( 1 ); + } $dbw = wfGetDB( DB_MASTER ); $dbw->begin(); $dbw->update( 'text', @@ -491,19 +565,33 @@ class RecompressTracked { */ function doOrphanList( $textIds ) { // Finish incomplete moves - $this->finishIncompleteMoves( array( 'bt_text_id' => $textIds ) ); + if ( !$this->copyOnly ) { + $this->finishIncompleteMoves( array( 'bt_text_id' => $textIds ) ); + $this->syncDBs(); + } $trx = new CgzCopyTransaction( $this, $this->orphanBlobClass ); - foreach ( $textIds as $textId ) { - $row = wfGetDB( DB_SLAVE )->selectRow( 'text', array( 'old_text', 'old_flags' ), - array( 'old_id' => $textId ), __METHOD__ ); + + $res = wfGetDB( DB_SLAVE )->select( + array( 'text', 'blob_tracking' ), + array( 'old_id', 'old_text', 'old_flags' ), + array( + 'old_id' => $textIds, + 'bt_text_id=old_id', + 'bt_moved' => 0, + ), + __METHOD__, + array( 'DISTINCT' ) + ); + + foreach ( $res as $row ) { $text = Revision::getRevisionText( $row ); if ( $text === false ) { - echo "Error: cannot load revision text for $textId\n"; + $this->critical( "Error: cannot load revision text for old_id=$textId" ); continue; } - if ( !$trx->addItem( $text, $textId ) ) { + if ( !$trx->addItem( $text, $row->old_id ) ) { $this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" ); $trx->commit(); $trx = new CgzCopyTransaction( $this, $this->orphanBlobClass ); @@ -605,8 +693,8 @@ class CgzCopyTransaction { // All have been moved already if ( $originalCount > 1 ) { // This is suspcious, make noise - echo "Warning: concurrent operation detected, are there two conflicting " . - "processes running, doing the same job?\n"; + $this->critical( "Warning: concurrent operation detected, are there two conflicting " . + "processes running, doing the same job?" ); } return; }