Refactored and added orphan blob search
author Tim Starling <tstarling@users.mediawiki.org>
Sun, 19 Oct 2008 06:10:53 +0000 (06:10 +0000)
committer Tim Starling <tstarling@users.mediawiki.org>
Sun, 19 Oct 2008 06:10:53 +0000 (06:10 +0000)
maintenance/storage/blob_tracking.sql
maintenance/storage/trackBlobs.php

index 119bd3b..007aa8f 100644 (file)
@@ -34,3 +34,11 @@ CREATE TABLE /*$wgDBprefix*/blob_tracking (
        KEY (bt_cluster, bt_blob_id, bt_cgz_hash)
 ) /*$wgDBTableOptions*/;
 
+-- Tracking table for blob rows that aren't tracked by the text table: one row per orphan (cluster name, blob_id)
+CREATE TABLE /*$wgDBprefix*/blob_orphans (
+       bo_cluster varbinary(255) not null,
+       bo_blob_id integer not null,
+
+       PRIMARY KEY (bo_cluster, bo_blob_id)
+) /*$wgDBTableOptions*/;
+
index 36eb731..653a8c3 100644 (file)
@@ -8,193 +8,300 @@ if ( count( $args ) < 1 ) {
        echo "Adds blobs from a given ES cluster to the blob_tracking table\n";
        exit( 1 );
 }
+$tracker = new TrackBlobs( $args );
+$tracker->trackBlobs();
 
-trackBlobs( $args );
+class TrackBlobs {
+       var $clusters, $textClause;
+       var $doBlobOrphans;
+       var $trackedBlobs = array();
 
-function trackBlobs( $clusters ) {
-       initTrackingTable();
-       trackRevisions( $clusters );
-       trackOrphans( $clusters );
-}
+       var $batchSize = 1000;
+       var $reportingInterval = 10;
 
-function initTrackingTable() {
-       $dbw = wfGetDB( DB_MASTER );
-       if ( !$dbw->tableExists( 'blob_tracking' ) ) {
-               $dbw->sourceFile( dirname( __FILE__ ) . '/blob_tracking.sql' );
+       function __construct( $clusters ) { // $clusters: cluster names to track (the script passes its $args)
+               $this->clusters = $clusters;
+               if ( extension_loaded( 'gmp' ) ) {
+                       $this->doBlobOrphans = true;
+                       foreach ( $clusters as $cluster ) {
+                               $this->trackedBlobs[$cluster] = gmp_init( 0 ); // empty bitmap; the scan passes set one bit per tracked blob ID
+                       }
+               } else {
+                       echo "Warning: the gmp extension is needed to find orphan blobs\n"; // doBlobOrphans stays unset, so trackBlobs() skips the orphan blob pass
+               }
        }
-}
 
-function getTextClause( $clusters ) {
-       $dbr = wfGetDB( DB_SLAVE );
-       $textClause = '';
-       foreach ( $clusters as $cluster ) {
-               if ( $textClause != '' ) {
-                       $textClause .= ' OR ';
+       function trackBlobs() { // Entry point: runs every tracking pass in order
+               $this->initTrackingTable();
+               $this->trackRevisions();
+               $this->trackOrphanText();
+               if ( $this->doBlobOrphans ) { // only set by the constructor when the gmp extension is loaded
+                       $this->findOrphanBlobs();
+               }
+       }
+
+       function initTrackingTable() {
+               $dbw = wfGetDB( DB_MASTER );
+               if ( !$dbw->tableExists( 'blob_tracking' ) ) {
+                       $dbw->sourceFile( dirname( __FILE__ ) . '/blob_tracking.sql' );
                }
-               $textClause .= 'old_text LIKE ' . $dbr->addQuotes( $dbr->escapeLike( "DB://$cluster/" ) . '%' );
        }
-       return $textClause;
-}
 
-function interpretPointer( $text ) {
-       if ( !preg_match( '!^DB://(\w+)/(\d+)(?:/([0-9a-fA-F]+)|)$!', $text, $m ) ) {
-               return false;
+       function getTextClause() { // Returns the SQL condition matching old_text pointers into any of $this->clusters
+               if ( !$this->textClause ) { // built on first call, then reused from $this->textClause
+                       $dbr = wfGetDB( DB_SLAVE );
+                       $this->textClause = '';
+                       foreach ( $this->clusters as $cluster ) {
+                               if ( $this->textClause != '' ) {
+                                       $this->textClause .= ' OR '; // separator between per-cluster LIKE terms
+                               }
+                               $this->textClause .= 'old_text LIKE ' . $dbr->addQuotes( $dbr->escapeLike( "DB://$cluster/" ) . '%' );
+                       }
+               }
+               return $this->textClause;
        }
-       return array(
-               'cluster' => $m[1],
-               'id' => intval( $m[2] ),
-               'hash' => isset( $m[3] ) ? $m[2] : null
-       );
-}
 
-/**
- *  Scan the revision table for rows stored in the specified clusters
- */
-function trackRevisions( $clusters ) {
-       $dbw = wfGetDB( DB_MASTER );
-       $dbr = wfGetDB( DB_SLAVE );
-       $batchSize = 10;
-       $reportingInterval = 10;
-
-       $textClause = getTextClause( $clusters );
-       $startId = 0;
-       $endId = $dbr->selectField( 'revision', 'MAX(rev_id)', false, __METHOD__ );
-       $batchesDone = 0;
-       $rowsInserted = 0;
-
-       echo "Finding revisions...\n";
-
-       while ( true ) {
-               $res = $dbr->select( array( 'revision', 'text' ),
-                       array( 'rev_id', 'rev_page', 'old_id', 'old_flags', 'old_text' ),
-                       array(
-                               'rev_id > ' . $dbr->addQuotes( $startId ),
-                               'rev_text_id=old_id',
-                               $textClause,
-                               "old_flags LIKE '%external%'",
-                       ),
-                       __METHOD__,
-                       array(
-                               'ORDER BY' => 'rev_id',
-                               'LIMIT' => $batchSize
-                       )
-               );
-               if ( !$res->numRows() ) {
-                       break;
+       function interpretPointer( $text ) { // Parse "DB://cluster/id[/hash]" into cluster/id/hash; returns false if $text does not match
+               if ( !preg_match( '!^DB://(\w+)/(\d+)(?:/([0-9a-fA-F]+)|)$!', $text, $m ) ) {
+                       return false;
                }
+               return array(
+                       'cluster' => $m[1],
+                       'id' => intval( $m[2] ),
+                       'hash' => isset( $m[3] ) ? $m[3] : null // group 3 is the optional hex hash (group 2 is the blob ID); null when absent
+               );
+       }
+
+       /**
+        *  Scan the revision table for rows stored in the specified clusters
+        */
+       function trackRevisions() {
+               $dbw = wfGetDB( DB_MASTER );
+               $dbr = wfGetDB( DB_SLAVE );
 
-               $insertBatch = array();
-               foreach ( $res as $row ) {
-                       $startId = $row->rev_id;
-                       $info = interpretPointer( $row->old_text );
-                       if ( !$info ) {
-                               echo "Invalid DB:// URL in rev_id {$row->rev_id}\n";
-                               continue;
+               $textClause = $this->getTextClause();
+               $startId = 0;
+               $endId = $dbr->selectField( 'revision', 'MAX(rev_id)', false, __METHOD__ );
+               $batchesDone = 0;
+               $rowsInserted = 0;
+
+               echo "Finding revisions...\n";
+
+               while ( true ) {
+                       $res = $dbr->select( array( 'revision', 'text' ),
+                               array( 'rev_id', 'rev_page', 'old_id', 'old_flags', 'old_text' ),
+                               array(
+                                       'rev_id > ' . $dbr->addQuotes( $startId ),
+                                       'rev_text_id=old_id',
+                                       $textClause,
+                                       "old_flags LIKE '%external%'",
+                               ),
+                               __METHOD__,
+                               array(
+                                       'ORDER BY' => 'rev_id',
+                                       'LIMIT' => $this->batchSize
+                               )
+                       );
+                       if ( !$res->numRows() ) {
+                               break;
                        }
-                       if ( !in_array( $info['cluster'], $clusters ) ) {
-                               echo "Invalid cluster returned in SQL query: {$info['cluster']}\n";
-                               continue;
+
+                       $insertBatch = array();
+                       foreach ( $res as $row ) {
+                               $startId = $row->rev_id;
+                               $info = $this->interpretPointer( $row->old_text );
+                               if ( !$info ) {
+                                       echo "Invalid DB:// URL in rev_id {$row->rev_id}\n";
+                                       continue;
+                               }
+                               if ( !in_array( $info['cluster'], $this->clusters ) ) {
+                                       echo "Invalid cluster returned in SQL query: {$info['cluster']}\n";
+                                       continue;
+                               }
+                               $insertBatch[] = array(
+                                       'bt_page' => $row->rev_page,
+                                       'bt_rev_id' => $row->rev_id,
+                                       'bt_text_id' => $row->old_id,
+                                       'bt_cluster' => $info['cluster'],
+                                       'bt_blob_id' => $info['id'],
+                                       'bt_cgz_hash' => $info['hash']
+                               );
+                               if ( $this->doBlobOrphans ) {
+                                       gmp_setbit( $this->trackedBlobs[$info['cluster']], $info['id'] );
+                               }
                        }
-                       $insertBatch[] = array(
-                               'bt_page' => $row->rev_page,
-                               'bt_rev_id' => $row->rev_id,
-                               'bt_text_id' => $row->old_id,
-                               'bt_cluster' => $info['cluster'],
-                               'bt_blob_id' => $info['id'],
-                               'bt_cgz_hash' => $info['hash']
-                       );
-               }
-               $dbw->insert( 'blob_tracking', $insertBatch, __METHOD__ );
-               $rowsInserted += count( $insertBatch );
+                       $dbw->insert( 'blob_tracking', $insertBatch, __METHOD__ );
+                       $rowsInserted += count( $insertBatch );
 
-               ++$batchesDone;
-               if ( $batchesDone >= $reportingInterval ) {
-                       $batchesDone = 0;
-                       echo "$startId / $endId\n";
-                       wfWaitForSlaves( 5 );
+                       ++$batchesDone;
+                       if ( $batchesDone >= $this->reportingInterval ) {
+                               $batchesDone = 0;
+                               echo "$startId / $endId\n";
+                               wfWaitForSlaves( 5 );
+                       }
                }
+               echo "Found $rowsInserted revisions\n";
        }
-       echo "Found $rowsInserted revisions\n";
-}
 
-/**
- * Scan the text table for orphan text
- */
-function trackOrphans( $clusters ) {
-       # Wait until the blob_tracking table is available in the slave
-       $dbw = wfGetDB( DB_MASTER );
-       $dbr = wfGetDB( DB_SLAVE );
-       $pos = $dbw->getMasterPos();
-       $dbr->masterPosWait( $pos, 100000 );
-
-       $batchSize = 10;
-       $reportingInterval = 10;
-
-       $textClause = getTextClause( $clusters );
-       $startId = 0;
-       $endId = $dbr->selectField( 'text', 'MAX(old_id)', false, __METHOD__ );
-       $rowsInserted = 0;
-       $batchesDone = 0;
-
-       echo "Finding orphan text...\n";
-
-       # Scan the text table for orphan text
-       while ( true ) {
-               $res = $dbr->select( array( 'text', 'blob_tracking' ), 
-                       array( 'old_id', 'old_flags', 'old_text' ),
-                       array( 
-                               'old_id>' . $dbr->addQuotes( $startId ),
-                               $textClause,
-                               "old_flags LIKE '%external%'",
-                               'bt_text_id IS NULL'
-                       ),
-                       __METHOD__,
-                       array(
-                               'ORDER BY' => 'old_id',
-                               'LIMIT' => $batchSize 
-                       ),
-                       array( 'blob_tracking' => array( 'LEFT JOIN', 'bt_text_id=old_id' ) )
-               );
-               $ids = array();
-               foreach ( $res as $row ) {
-                       $ids[] = $row->old_id;
-               }
+       /**
+        * Scan the text table for orphan text
+        * Orphan text here does not imply DB corruption -- deleted text tracked by the
+        * archive table counts as orphan for our purposes.
+        */
+       function trackOrphanText() {
+               # Wait until the blob_tracking table is available in the slave
+               $dbw = wfGetDB( DB_MASTER );
+               $dbr = wfGetDB( DB_SLAVE );
+               $pos = $dbw->getMasterPos();
+               $dbr->masterPosWait( $pos, 100000 );
 
-               if ( !$res->numRows() ) {
-                       break;
-               }
+               $textClause = $this->getTextClause( $this->clusters );
+               $startId = 0;
+               $endId = $dbr->selectField( 'text', 'MAX(old_id)', false, __METHOD__ );
+               $rowsInserted = 0;
+               $batchesDone = 0;
+
+               echo "Finding orphan text...\n";
+
+               # Scan the text table for orphan text
+               while ( true ) {
+                       $res = $dbr->select( array( 'text', 'blob_tracking' ), 
+                               array( 'old_id', 'old_flags', 'old_text' ),
+                               array( 
+                                       'old_id>' . $dbr->addQuotes( $startId ),
+                                       $textClause,
+                                       "old_flags LIKE '%external%'",
+                                       'bt_text_id IS NULL'
+                               ),
+                               __METHOD__,
+                               array(
+                                       'ORDER BY' => 'old_id',
+                                       'LIMIT' => $this->batchSize 
+                               ),
+                               array( 'blob_tracking' => array( 'LEFT JOIN', 'bt_text_id=old_id' ) )
+                       );
+                       $ids = array();
+                       foreach ( $res as $row ) {
+                               $ids[] = $row->old_id;
+                       }
 
-               $insertBatch = array();
-               foreach ( $res as $row ) {
-                       $startId = $row->old_id;
-                       $info = interpretPointer( $row->old_text );
-                       if ( !$info ) {
-                               echo "Invalid DB:// URL in old_id {$row->old_id}\n";
-                               continue;
+                       if ( !$res->numRows() ) {
+                               break;
                        }
-                       if ( !in_array( $info['cluster'], $clusters ) ) {
-                               echo "Invalid cluster returned in SQL query\n";
-                               continue;
+
+                       $insertBatch = array();
+                       foreach ( $res as $row ) {
+                               $startId = $row->old_id;
+                               $info = $this->interpretPointer( $row->old_text );
+                               if ( !$info ) {
+                                       echo "Invalid DB:// URL in old_id {$row->old_id}\n";
+                                       continue;
+                               }
+                               if ( !in_array( $info['cluster'], $this->clusters ) ) {
+                                       echo "Invalid cluster returned in SQL query\n";
+                                       continue;
+                               }
+
+                               $insertBatch[] = array(
+                                       'bt_page' => 0,
+                                       'bt_rev_id' => 0,
+                                       'bt_text_id' => $row->old_id,
+                                       'bt_cluster' => $info['cluster'],
+                                       'bt_blob_id' => $info['id'],
+                                       'bt_cgz_hash' => $info['hash']
+                               );
+                               if ( $this->doBlobOrphans ) {
+                                       gmp_setbit( $this->trackedBlobs[$info['cluster']], $info['id'] );
+                               }
                        }
+                       $dbw->insert( 'blob_tracking', $insertBatch, __METHOD__ );
 
-                       $insertBatch[] = array(
-                               'bt_page' => 0,
-                               'bt_rev_id' => 0,
-                               'bt_text_id' => $row->old_id,
-                               'bt_cluster' => $info['cluster'],
-                               'bt_blob_id' => $info['id'],
-                               'bt_cgz_hash' => $info['hash']
-                       );
+                       $rowsInserted += count( $insertBatch );
+                       ++$batchesDone;
+                       if ( $batchesDone >= $this->reportingInterval ) {
+                               $batchesDone = 0;
+                               echo "$startId / $endId\n";
+                               wfWaitForSlaves( 5 );
+                       }
                }
-               $dbw->insert( 'blob_tracking', $insertBatch, __METHOD__ );
+               echo "Found $rowsInserted orphan text rows\n";
+       }
 
-               $rowsInserted += count( $insertBatch );
-               ++$batchesDone;
-               if ( $batchesDone >= $reportingInterval ) {
+       /**
+        * Scan the blobs table for rows not registered in blob_tracking (and thus not
+        * registered in the text table).
+        *
+        * Orphan blobs are indicative of DB corruption. They are inaccessible and
+        * should probably be deleted.
+        */
+       function findOrphanBlobs() {
+               if ( !extension_loaded( 'gmp' ) ) {
+                       echo "Can't find orphan blobs, need bitfield support provided by GMP.\n";
+                       return;
+               }
+
+               # Wait until the blob_tracking table is available in the slave
+               $dbw = wfGetDB( DB_MASTER );
+               $dbr = wfGetDB( DB_SLAVE );
+               $pos = $dbw->getMasterPos();
+               $dbr->masterPosWait( $pos, 100000 );
+
+               foreach ( $this->clusters as $cluster ) {
+                       echo "Searching for orphan blobs in $cluster...\n";
+                       $lb = wfGetLBFactory()->getExternalLB( $cluster );
+                       $extDB = $lb->getConnection( DB_SLAVE );
+                       $startId = 0;
                        $batchesDone = 0;
-                       echo "$startId / $endId\n";
-                       wfWaitForSlaves( 5 );
+                       $actualBlobs = gmp_init( 0 );
+                       $endId = $extDB->selectField( 'blobs', 'MAX(blob_id)', false, __METHOD__ );
+
+                       // Build a bitmap of actual blob rows
+                       while ( true ) {
+                               $res = $extDB->select( 'blobs', 
+                                       array( 'blob_id' ), 
+                                       array( 'blob_id > ' . $extDB->addQuotes( $startId ) ),
+                                       __METHOD__,
+                                       array( 'LIMIT' => $this->batchSize, 'ORDER BY' => 'blob_id' )
+                               );
+
+                               if ( !$res->numRows() ) {
+                                       break;
+                               }
+
+                               foreach ( $res as $row ) {
+                                       gmp_setbit( $actualBlobs, $row->blob_id );
+                               }
+                               $startId = $row->blob_id;
+
+                               ++$batchesDone;
+                               if ( $batchesDone >= $this->reportingInterval ) {
+                                       $batchesDone = 0;
+                                       echo "$startId / $endId\n";
+                               }
+                       }
+
+                       // Find actual blobs that weren't tracked by the previous passes
+                       // This is a set-theoretic difference A \ B, or in bitwise terms, A & ~B
+                       $orphans = gmp_and( $actualBlobs, gmp_com( $this->trackedBlobs[$cluster] ) );
+                       
+                       // Traverse the orphan list
+                       $insertBatch = array();
+                       $id = 0;
+                       while ( true ) {
+                               $id = gmp_scan1( $orphans, $id );
+                               if ( $id == -1 ) {
+                                       break;
+                               }
+                               $insertBatch[] = array(
+                                       'bo_cluster' => $cluster,
+                                       'bo_blob_id' => $id
+                               );
+                               ++$id;
+                       }
+
+                       // Insert the batch
+                       echo "Found " . count( $insertBatch ) . " orphan(s) in $cluster\n";
+                       $dbw->insert( 'blob_orphans', $insertBatch, __METHOD__ );
                }
        }
-       echo "Found $rowsInserted orphan text rows\n";
 }