Refactored and added orphan blob search
[lhc/web/wiklou.git] / maintenance / storage / trackBlobs.php
1 <?php
2
require( dirname( __FILE__ ) . '/../commandLine.inc' );

# Command-line entry point.
# NOTE(review): $args is not defined in this file; presumably commandLine.inc
# fills it with the positional command-line arguments -- confirm.
$haveClusters = count( $args ) >= 1;
if ( !$haveClusters ) {
	# Nothing to do without at least one cluster name: print usage and fail.
	echo "Usage: php trackBlobs.php <cluster> [... <cluster>]\n" .
		"Adds blobs from a given ES cluster to the blob_tracking table\n";
	exit( 1 );
}

# Every argument is treated as the name of a cluster to scan.
$tracker = new TrackBlobs( $args );
$tracker->trackBlobs();
13
/**
 * Builds an index of external storage blob usage.
 *
 * The text table is scanned for pointers of the form
 * DB://<cluster>/<blob id>[/<hash>] that refer to one of the requested
 * clusters. Each pointer found is recorded in the blob_tracking table, first
 * for text rows reachable from the revision table and then for the remaining
 * ("orphan") text rows. When the gmp extension is available, a bitmap of all
 * tracked blob IDs is kept per cluster so that blobs which exist in a cluster
 * but were never referenced can be recorded in the blob_orphans table.
 */
class TrackBlobs {
	/** List of cluster names to scan, and the lazily built SQL condition matching them */
	var $clusters, $textClause;
	/** True when the gmp extension is loaded and the orphan blob search can run */
	var $doBlobOrphans = false;
	/** Map of cluster name => GMP bitmap with one bit set per tracked blob ID */
	var $trackedBlobs = array();

	/** Maximum number of rows selected (and inserted) per query */
	var $batchSize = 1000;
	/** Number of batches between progress reports */
	var $reportingInterval = 10;

	/**
	 * @param $clusters Array of cluster names whose blobs should be tracked
	 */
	function __construct( $clusters ) {
		$this->clusters = $clusters;
		if ( extension_loaded( 'gmp' ) ) {
			$this->doBlobOrphans = true;
			foreach ( $clusters as $cluster ) {
				$this->trackedBlobs[$cluster] = gmp_init( 0 );
			}
		} else {
			echo "Warning: the gmp extension is needed to find orphan blobs\n";
		}
	}

	/**
	 * Run all passes: create the tracking table if needed, track blobs
	 * referenced from revisions, track blobs referenced from other text rows,
	 * and finally (gmp only) look for blobs that nothing refers to.
	 */
	function trackBlobs() {
		$this->initTrackingTable();
		$this->trackRevisions();
		$this->trackOrphanText();
		if ( $this->doBlobOrphans ) {
			$this->findOrphanBlobs();
		}
	}

	/**
	 * Source blob_tracking.sql on the master if the blob_tracking table
	 * does not exist yet.
	 */
	function initTrackingTable() {
		$dbw = wfGetDB( DB_MASTER );
		if ( !$dbw->tableExists( 'blob_tracking' ) ) {
			$dbw->sourceFile( dirname( __FILE__ ) . '/blob_tracking.sql' );
		}
	}

	/**
	 * Get the SQL condition matching old_text values that point into any of
	 * the configured clusters. The condition is built once and then cached.
	 *
	 * @return String: "old_text LIKE ..." terms joined with OR
	 */
	function getTextClause() {
		if ( !$this->textClause ) {
			$dbr = wfGetDB( DB_SLAVE );
			$terms = array();
			foreach ( $this->clusters as $cluster ) {
				$terms[] = 'old_text LIKE ' .
					$dbr->addQuotes( $dbr->escapeLike( "DB://$cluster/" ) . '%' );
			}
			$this->textClause = implode( ' OR ', $terms );
		}
		return $this->textClause;
	}

	/**
	 * Parse an external storage pointer.
	 *
	 * @param $text String: pointer of the form DB://<cluster>/<id> or
	 *     DB://<cluster>/<id>/<hex hash>
	 * @return Array with keys 'cluster' (string), 'id' (integer) and 'hash'
	 *     (string, or null when the pointer has no hash part), or false if
	 *     $text is not a valid pointer
	 */
	function interpretPointer( $text ) {
		if ( !preg_match( '!^DB://(\w+)/(\d+)(?:/([0-9a-fA-F]+)|)$!', $text, $m ) ) {
			return false;
		}
		return array(
			'cluster' => $m[1],
			'id' => intval( $m[2] ),
			# Group 3 is only present in $m when the pointer has a hash part
			'hash' => isset( $m[3] ) ? $m[3] : null
		);
	}

	/**
	 * Scan the revision table for rows stored in the specified clusters
	 */
	function trackRevisions() {
		$dbw = wfGetDB( DB_MASTER );
		$dbr = wfGetDB( DB_SLAVE );

		$textClause = $this->getTextClause();
		$startId = 0;
		$endId = $dbr->selectField( 'revision', 'MAX(rev_id)', false, __METHOD__ );
		$batchesDone = 0;
		$rowsInserted = 0;

		echo "Finding revisions...\n";

		while ( true ) {
			$res = $dbr->select( array( 'revision', 'text' ),
				array( 'rev_id', 'rev_page', 'old_id', 'old_flags', 'old_text' ),
				array(
					'rev_id > ' . $dbr->addQuotes( $startId ),
					'rev_text_id=old_id',
					$textClause,
					"old_flags LIKE '%external%'",
				),
				__METHOD__,
				array(
					'ORDER BY' => 'rev_id',
					'LIMIT' => $this->batchSize
				)
			);
			if ( !$res->numRows() ) {
				break;
			}

			$insertBatch = array();
			foreach ( $res as $row ) {
				# Advance the cursor even for rows that are skipped below, so
				# that a batch of bad rows cannot be selected again
				$startId = $row->rev_id;
				$info = $this->interpretPointer( $row->old_text );
				if ( !$info ) {
					echo "Invalid DB:// URL in rev_id {$row->rev_id}\n";
					continue;
				}
				if ( !in_array( $info['cluster'], $this->clusters ) ) {
					echo "Invalid cluster returned in SQL query: {$info['cluster']}\n";
					continue;
				}
				$insertBatch[] = array(
					'bt_page' => $row->rev_page,
					'bt_rev_id' => $row->rev_id,
					'bt_text_id' => $row->old_id,
					'bt_cluster' => $info['cluster'],
					'bt_blob_id' => $info['id'],
					'bt_cgz_hash' => $info['hash']
				);
				if ( $this->doBlobOrphans ) {
					gmp_setbit( $this->trackedBlobs[$info['cluster']], $info['id'] );
				}
			}
			# Every row of the batch may have been rejected above
			if ( $insertBatch ) {
				$dbw->insert( 'blob_tracking', $insertBatch, __METHOD__ );
				$rowsInserted += count( $insertBatch );
			}

			++$batchesDone;
			if ( $batchesDone >= $this->reportingInterval ) {
				$batchesDone = 0;
				echo "$startId / $endId\n";
				wfWaitForSlaves( 5 );
			}
		}
		echo "Found $rowsInserted revisions\n";
	}

	/**
	 * Scan the text table for orphan text
	 * Orphan text here does not imply DB corruption -- deleted text tracked by the
	 * archive table counts as orphan for our purposes.
	 *
	 * Rows found here are recorded with bt_page and bt_rev_id set to zero.
	 */
	function trackOrphanText() {
		# Wait until the blob_tracking table is available in the slave
		$dbw = wfGetDB( DB_MASTER );
		$dbr = wfGetDB( DB_SLAVE );
		$pos = $dbw->getMasterPos();
		$dbr->masterPosWait( $pos, 100000 );

		$textClause = $this->getTextClause();
		$startId = 0;
		$endId = $dbr->selectField( 'text', 'MAX(old_id)', false, __METHOD__ );
		$rowsInserted = 0;
		$batchesDone = 0;

		echo "Finding orphan text...\n";

		# Scan the text table for orphan text
		while ( true ) {
			# The LEFT JOIN plus "bt_text_id IS NULL" selects only text rows
			# that have no blob_tracking row yet
			$res = $dbr->select( array( 'text', 'blob_tracking' ),
				array( 'old_id', 'old_flags', 'old_text' ),
				array(
					'old_id>' . $dbr->addQuotes( $startId ),
					$textClause,
					"old_flags LIKE '%external%'",
					'bt_text_id IS NULL'
				),
				__METHOD__,
				array(
					'ORDER BY' => 'old_id',
					'LIMIT' => $this->batchSize
				),
				array( 'blob_tracking' => array( 'LEFT JOIN', 'bt_text_id=old_id' ) )
			);

			if ( !$res->numRows() ) {
				break;
			}

			$insertBatch = array();
			foreach ( $res as $row ) {
				# Advance the cursor even for rows that are skipped below
				$startId = $row->old_id;
				$info = $this->interpretPointer( $row->old_text );
				if ( !$info ) {
					echo "Invalid DB:// URL in old_id {$row->old_id}\n";
					continue;
				}
				if ( !in_array( $info['cluster'], $this->clusters ) ) {
					echo "Invalid cluster returned in SQL query\n";
					continue;
				}

				$insertBatch[] = array(
					'bt_page' => 0,
					'bt_rev_id' => 0,
					'bt_text_id' => $row->old_id,
					'bt_cluster' => $info['cluster'],
					'bt_blob_id' => $info['id'],
					'bt_cgz_hash' => $info['hash']
				);
				if ( $this->doBlobOrphans ) {
					gmp_setbit( $this->trackedBlobs[$info['cluster']], $info['id'] );
				}
			}
			# Every row of the batch may have been rejected above
			if ( $insertBatch ) {
				$dbw->insert( 'blob_tracking', $insertBatch, __METHOD__ );
				$rowsInserted += count( $insertBatch );
			}

			++$batchesDone;
			if ( $batchesDone >= $this->reportingInterval ) {
				$batchesDone = 0;
				echo "$startId / $endId\n";
				wfWaitForSlaves( 5 );
			}
		}
		echo "Found $rowsInserted orphan text rows\n";
	}

	/**
	 * Scan the blobs table for rows not registered in blob_tracking (and thus not
	 * registered in the text table).
	 *
	 * Orphan blobs are indicative of DB corruption. They are inaccessible and
	 * should probably be deleted.
	 *
	 * The orphans found are written to the blob_orphans table, at most
	 * $batchSize rows per insert.
	 */
	function findOrphanBlobs() {
		if ( !extension_loaded( 'gmp' ) ) {
			echo "Can't find orphan blobs, need bitfield support provided by GMP.\n";
			return;
		}

		# Wait until the blob_tracking table is available in the slave
		$dbw = wfGetDB( DB_MASTER );
		$dbr = wfGetDB( DB_SLAVE );
		$pos = $dbw->getMasterPos();
		$dbr->masterPosWait( $pos, 100000 );

		foreach ( $this->clusters as $cluster ) {
			echo "Searching for orphan blobs in $cluster...\n";
			$lb = wfGetLBFactory()->getExternalLB( $cluster );
			$extDB = $lb->getConnection( DB_SLAVE );
			$startId = 0;
			$batchesDone = 0;
			$actualBlobs = gmp_init( 0 );
			$endId = $extDB->selectField( 'blobs', 'MAX(blob_id)', false, __METHOD__ );

			// Build a bitmap of actual blob rows
			while ( true ) {
				$res = $extDB->select( 'blobs',
					array( 'blob_id' ),
					array( 'blob_id > ' . $extDB->addQuotes( $startId ) ),
					__METHOD__,
					array( 'LIMIT' => $this->batchSize, 'ORDER BY' => 'blob_id' )
				);

				if ( !$res->numRows() ) {
					break;
				}

				foreach ( $res as $row ) {
					gmp_setbit( $actualBlobs, $row->blob_id );
					// Advance the cursor here rather than after the loop, so we
					// don't depend on $row outliving the foreach
					$startId = $row->blob_id;
				}

				++$batchesDone;
				if ( $batchesDone >= $this->reportingInterval ) {
					$batchesDone = 0;
					echo "$startId / $endId\n";
				}
			}

			// Find actual blobs that weren't tracked by the previous passes
			// This is a set-theoretic difference A \ B, or in bitwise terms, A & ~B
			$orphans = gmp_and( $actualBlobs, gmp_com( $this->trackedBlobs[$cluster] ) );

			// Traverse the orphan list, flushing to the DB in bounded batches so
			// that a badly damaged cluster does not produce one enormous INSERT
			$insertBatch = array();
			$numOrphans = 0;
			$id = 0;
			while ( true ) {
				// Index of the next set bit at or after $id, or -1 if none is left
				$id = gmp_scan1( $orphans, $id );
				if ( $id == -1 ) {
					break;
				}
				$insertBatch[] = array(
					'bo_cluster' => $cluster,
					'bo_blob_id' => $id
				);
				++$numOrphans;
				if ( count( $insertBatch ) >= $this->batchSize ) {
					$dbw->insert( 'blob_orphans', $insertBatch, __METHOD__ );
					$insertBatch = array();
				}
				++$id;
			}

			// Insert whatever is left over from the last partial batch
			if ( $insertBatch ) {
				$dbw->insert( 'blob_orphans', $insertBatch, __METHOD__ );
			}
			echo "Found $numOrphans orphan(s) in $cluster\n";
		}
	}
}