* Added $wgDebugLogPrefix, to allow users (or in this case, multiprocess command...
[lhc/web/wiklou.git] / maintenance / storage / recompressTracked.php
index 9c32f12..1bb15b7 100644 (file)
@@ -4,9 +4,13 @@ $optionsWithArgs = RecompressTracked::getOptionsWithArgs();
 require( dirname( __FILE__ ) .'/../commandLine.inc' );
 
 if ( count( $args ) < 1 ) {
-       echo "Usage: php recompressTracked.php <cluster> [... <cluster>...]\n";
-       echo "Moves blobs indexed by trackBlobs.php to a specified list of destination 
-clusters, and recompresses them in the process. Restartable.\n";
+       echo "Usage: php recompressTracked.php [options] <cluster> [... <cluster>...]
+Moves blobs indexed by trackBlobs.php to a specified list of destination clusters, and recompresses them in the process. Restartable.
+
+Options: 
+    --procs <procs>     Set the number of child processes (default 8)
+    --copy-only         Copy only, do not update the text table. Restart without this option to complete.
+";
        exit( 1 );
 }
 
@@ -18,17 +22,16 @@ class RecompressTracked {
        var $batchSize = 1000;
        var $reportingInterval = 10;
        var $numProcs = 8;
+       var $useDiff, $pageBlobClass, $orphanBlobClass;
        var $slavePipes, $slaveProcs, $prevSlaveId;
-       var $blobClass = 'DiffHistoryBlob';
        var $copyOnly = false;
        var $isChild = false;
        var $slaveId = false;
        var $store;
 
-       static $optionsWithArgs = array( 'procs', 'class' );
+       static $optionsWithArgs = array( 'procs', 'slave-id' );
        static $cmdLineOptionMap = array(
                'procs' => 'numProcs',
-               'class' => 'blobClass',
                'copy-only' => 'copyOnly',
                'child' => 'isChild',
                'slave-id' => 'slaveId',
@@ -53,14 +56,18 @@ class RecompressTracked {
                        $this->$name = $value;
                }
                $this->store = new ExternalStoreDB;
+               if ( !$this->isChild ) {
+                       $GLOBALS['wgDebugLogPrefix'] = "RCT M: ";
+               } elseif ( $this->slaveId !== false ) {
+                       $GLOBALS['wgDebugLogPrefix'] = "RCT {$this->slaveId}: ";
+               }
+               $this->useDiff = function_exists( 'xdiff_string_bdiff' );
+               $this->pageBlobClass = $this->useDiff ? 'DiffHistoryBlob' : 'ConcatenatedGzipHistoryBlob';
+               $this->orphanBlobClass = 'ConcatenatedGzipHistoryBlob';
        }
 
        function debug( $msg ) {
-               if ( $this->slaveId !== false ) {
-                       $msg = "{$this->slaveId}: $msg";
-               }
-               $msg .= "\n";
-               wfDebug( $msg );
+               wfDebug( "$msg\n" );
        }
 
        /**
@@ -146,7 +153,7 @@ class RecompressTracked {
                                array( 'file', '/dev/stderr', 'w' )
                        );
                        wfSuppressWarnings();
-                       $proc = proc_open( $cmd, $spec, $pipes );
+                       $proc = proc_open( "$cmd --slave-id $i", $spec, $pipes );
                        wfRestoreWarnings();
                        if ( !$proc ) {
                                echo "Error opening slave process\n";
@@ -299,6 +306,7 @@ class RecompressTracked {
         * Main entry point for worker processes
         */
        function executeChild() {
+               $this->debug( 'starting' );
                $this->syncDBs();
 
                while ( !feof( STDIN ) ) {
@@ -306,6 +314,7 @@ class RecompressTracked {
                        if ( $line == '' ) {
                                continue;
                        }
+                       $this->debug( $line );
                        $args = explode( ' ', $line );
                        $cmd = array_shift( $args );
                        switch ( $cmd ) {
@@ -325,15 +334,21 @@ class RecompressTracked {
         * Move tracked text in a given page
         */
        function doPage( $pageId ) {
+               $title = Title::newFromId( $pageId );
+               if ( $title ) {
+                       $titleText = $title->getPrefixedText();
+               } else {
+                       $titleText = '[deleted]';
+               }
                $dbr = wfGetDB( DB_SLAVE );
 
                // Finish any incomplete transactions
                if ( !$this->copyOnly ) {
-                       $this->finishIncompleteMoves();
+                       $this->finishIncompleteMoves( array( 'bt_page' => $pageId ) );
                }
 
                $startId = 0;
-               $trx = new CgzCopyTransaction( $this );
+               $trx = new CgzCopyTransaction( $this, $this->pageBlobClass );
 
                while ( true ) {
                        $res = $dbr->select( 
@@ -343,7 +358,7 @@ class RecompressTracked {
                                        'bt_page' => $pageId,
                                        'bt_text_id > ' . $dbr->addQuotes( $startId ),
                                        'bt_moved' => 0,
-                                       'bt_new_url' => '',
+                                       'bt_new_url IS NULL',
                                        'bt_text_id=old_id',
                                ),
                                __METHOD__,
@@ -372,12 +387,15 @@ class RecompressTracked {
 
                                // Queue it
                                if ( !$trx->addItem( $text, $row->bt_text_id ) ) {
+                                       $this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" );
                                        $trx->commit();
-                                       $trx = new CgzCopyTransaction( $this );
+                                       $trx = new CgzCopyTransaction( $this, $this->pageBlobClass );
                                }
                        }
                        $startId = $row->bt_text_id;
                }
+
+               $this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" );
                $trx->commit();
        }
 
@@ -420,18 +438,18 @@ class RecompressTracked {
         * This function completes any moves that only have done bt_new_url. This
         * can happen when the script is interrupted, or when --copy-only is used.
         */
-       function finishIncompleteMoves() {
+       function finishIncompleteMoves( $conds ) {
                $dbr = wfGetDB( DB_SLAVE );
 
                $startId = 0;
+               $conds = array_merge( $conds, array( 
+                       'bt_moved' => 0,
+                       'bt_new_url IS NOT NULL'
+               ));
                while ( true ) {
                        $res = $dbr->select( 'blob_tracking',
                                '*',
-                               array(
-                                       'bt_text_id > ' . $dbr->addQuotes( $startId ),
-                                       'bt_moved' => 0,
-                                       "bt_new_url <> ''",
-                               ),
+                               array_merge( $conds, array( 'bt_text_id > ' . $dbr->addQuotes( $startId ) ) ),
                                __METHOD__,
                                array( 
                                        'ORDER BY' => 'bt_text_id',
@@ -441,6 +459,7 @@ class RecompressTracked {
                        if ( !$res->numRows() ) {
                                break;
                        }
+                       $this->debug( 'Incomplete: ' . $row->numRows() . ' rows' );
                        foreach ( $res as $row ) {
                                $this->moveTextRow( $row->bt_text_id, $row->bt_new_url );
                        }
@@ -471,7 +490,10 @@ class RecompressTracked {
         * Move an orphan text_id to the new cluster
         */
        function doOrphanList( $textIds ) {
-               $trx = new CgzCopyTransaction( $this );
+               // Finish incomplete moves
+               $this->finishIncompleteMoves( array( 'bt_text_id' => $textIds ) );
+               
+               $trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );
                foreach ( $textIds as $textId ) {
                        $row = wfGetDB( DB_SLAVE )->selectRow( 'text', array( 'old_text', 'old_flags' ), 
                                array( 'old_id' => $textId ), __METHOD__ );
@@ -482,10 +504,13 @@ class RecompressTracked {
                        }
                        
                        if ( !$trx->addItem( $text, $textId ) ) {
+                               $this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" );
                                $trx->commit();
-                               $trx = new CgzCopyTransaction( $this );
+                               $trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );
                        }
                }
+               $this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" );
+               $trx->commit();
        }
 }
 
@@ -493,6 +518,7 @@ class RecompressTracked {
  * Class to represent a recompression operation for a single CGZ blob
  */
 class CgzCopyTransaction {
+       var $parent;
        var $blobClass;
        var $cgz;
        var $referrers;
@@ -500,10 +526,11 @@ class CgzCopyTransaction {
        /**
         * Create a transaction from a RecompressTracked object
         */
-       function __construct( $parent ) {
-               $this->blobClass = $parent->blobClass;
+       function __construct( $parent, $blobClass ) {
+               $this->blobClass = $blobClass;
                $this->cgz = false;
                $this->texts = array();
+               $this->parent = $parent;
        }
 
        /**
@@ -521,6 +548,10 @@ class CgzCopyTransaction {
                return $this->cgz->isHappy();
        }
 
+       function getSize() {
+               return count( $this->texts );
+       }
+
        /**
         * Recompress text after some aberrant modification
         */
@@ -554,16 +585,16 @@ class CgzCopyTransaction {
                // We do a locking read to prevent closer-run race conditions.
                $dbw = wfGetDB( DB_MASTER );
                $dbw->begin();
+               $res = $dbw->select( 'blob_tracking', 
+                       array( 'bt_text_id', 'bt_moved' ),
+                       array( 'bt_text_id' => array_keys( $this->referrers ) ),
+                       __METHOD__, array( 'FOR UPDATE' ) );
                $dirty = false;
-               foreach ( $this->referrers as $textId => $hash ) {
-                       $moved = $dbw->selectField( 'blob_tracking', 'bt_moved',
-                               array( 'bt_text_id' => $textId ),
-                               __METHOD__,
-                               array( 'FOR UPDATE' )
-                       );
-                       if ( !$moved ) {
+               foreach ( $res as $row ) {
+                       if ( $row->bt_moved ) {
                                # This row has already been moved, remove it
-                               unset( $this->texts[$textId] );
+                               $this->parent->debug( "TRX: conflict detected in old_id={$row->bt_text_id}" );
+                               unset( $this->texts[$row->bt_text_id] );
                                $dirty = true;
                        }
                }
@@ -574,7 +605,7 @@ class CgzCopyTransaction {
                                // All have been moved already
                                if ( $originalCount > 1 ) {
                                        // This is suspcious, make noise
-                                       echo "Warning: concurrent operation detected, are there two conflicting\n" .
+                                       echo "Warning: concurrent operation detected, are there two conflicting " .
                                                "processes running, doing the same job?\n";
                                }
                                return;
@@ -616,9 +647,5 @@ class CgzCopyTransaction {
                        }
                }
        }
-
-       function signalHandler() {
-               $this->signalled = true;
-       }
 }