Insert blob_orphans rows in batches.
[lhc/web/wiklou.git] / maintenance / storage / trackBlobs.php
1 <?php
2
3 require( dirname( __FILE__ ) .'/../commandLine.inc' );
4
5
// Script entry point. $args is not defined in this file; presumably it is
// populated by commandLine.inc with the positional command-line arguments
// (verify against that include).
if ( !count( $args ) ) {
	// No cluster was named: print the usage text and exit with a failure status.
	echo "Usage: php trackBlobs.php <cluster> [... <cluster>]\n",
		"Adds blobs from a given ES cluster to the blob_tracking table\n",
		"Automatically deletes the tracking table and starts from the start again when restarted.\n";

	exit( 1 );
}

// Every argument is treated as a cluster name to scan.
$tracker = new TrackBlobs( $args );
$tracker->run();
15
/**
 * Builds the blob_tracking and blob_orphans tables for a set of external
 * storage clusters.
 *
 * run() performs up to four passes:
 *  1. (re)create the tracking tables,
 *  2. record every revision whose text row is a DB:// pointer into one of the
 *     requested clusters,
 *  3. record text rows with such pointers that no blob_tracking row refers to,
 *  4. if the gmp extension is loaded, record blobs that exist on a cluster
 *     but were not seen in passes 2 and 3.
 */
class TrackBlobs {
	/** Cluster names to scan, as given to the constructor */
	public $clusters;

	/** Lazily built SQL condition matching pointers into $clusters */
	public $textClause;

	/** True when gmp is loaded, i.e. when orphan blob detection can run */
	public $doBlobOrphans = false;

	/** Map of cluster name => GMP bitfield with one bit set per tracked blob ID */
	public $trackedBlobs = array();

	/** Maximum number of rows fetched per SELECT and written per INSERT */
	public $batchSize = 1000;

	/** Number of batches between progress reports */
	public $reportingInterval = 10;

	/**
	 * @param $clusters Array of cluster names to scan
	 */
	function __construct( $clusters ) {
		$this->clusters = $clusters;
		if ( extension_loaded( 'gmp' ) ) {
			$this->doBlobOrphans = true;
			// Start every cluster with an empty bitfield of tracked blob IDs
			foreach ( $clusters as $cluster ) {
				$this->trackedBlobs[$cluster] = gmp_init( 0 );
			}
		} else {
			echo "Warning: the gmp extension is needed to find orphan blobs\n";
		}
	}

	/**
	 * Run all passes in order. The orphan blob pass is skipped when gmp is
	 * not available.
	 */
	function run() {
		$this->initTrackingTable();
		$this->trackRevisions();
		$this->trackOrphanText();
		if ( $this->doBlobOrphans ) {
			$this->findOrphanBlobs();
		}
	}

	/**
	 * Drop any existing tracking tables and recreate them from
	 * blob_tracking.sql.
	 *
	 * Each table is checked individually, so a leftover blob_orphans table is
	 * removed even if blob_tracking is missing, and a missing blob_orphans
	 * table does not make the DROP fail.
	 */
	function initTrackingTable() {
		$dbw = wfGetDB( DB_MASTER );
		foreach ( array( 'blob_tracking', 'blob_orphans' ) as $table ) {
			if ( $dbw->tableExists( $table ) ) {
				$dbw->query( 'DROP TABLE ' . $dbw->tableName( $table ) );
			}
		}
		$dbw->sourceFile( dirname( __FILE__ ) . '/blob_tracking.sql' );
	}

	/**
	 * Get the SQL condition selecting text rows whose old_text starts with
	 * "DB://<cluster>/" for any of the requested clusters. The condition is
	 * built on first use and cached in $this->textClause.
	 *
	 * @return String: LIKE conditions joined with OR
	 */
	function getTextClause() {
		if ( !$this->textClause ) {
			$dbr = wfGetDB( DB_SLAVE );
			$likes = array();
			foreach ( $this->clusters as $cluster ) {
				$likes[] = 'old_text LIKE ' .
					$dbr->addQuotes( $dbr->escapeLike( "DB://$cluster/" ) . '%' );
			}
			$this->textClause = implode( ' OR ', $likes );
		}
		return $this->textClause;
	}

	/**
	 * Parse a pointer of the form DB://<cluster>/<id> or
	 * DB://<cluster>/<id>/<hex hash>.
	 *
	 * @param $text String: the pointer text
	 * @return Array with keys 'cluster' (string), 'id' (int) and 'hash'
	 *   (string, or null when the pointer has no hash part), or false if
	 *   $text does not match the expected format.
	 */
	function interpretPointer( $text ) {
		if ( !preg_match( '!^DB://(\w+)/(\d+)(?:/([0-9a-fA-F]+)|)$!', $text, $m ) ) {
			return false;
		}
		return array(
			'cluster' => $m[1],
			'id' => intval( $m[2] ),
			// Group 3 is only present when the pointer has a hash component
			'hash' => isset( $m[3] ) ? $m[3] : null
		);
	}

	/**
	 * Scan the revision table for rows stored in the specified clusters and
	 * add one blob_tracking row per matching revision.
	 */
	function trackRevisions() {
		$dbw = wfGetDB( DB_MASTER );
		$dbr = wfGetDB( DB_SLAVE );

		$textClause = $this->getTextClause();
		$startId = 0;
		// Only used for progress output
		$endId = $dbr->selectField( 'revision', 'MAX(rev_id)', false, __METHOD__ );
		$batchesDone = 0;
		$rowsInserted = 0;

		echo "Finding revisions...\n";

		while ( true ) {
			$res = $dbr->select( array( 'revision', 'text' ),
				array( 'rev_id', 'rev_page', 'old_id', 'old_flags', 'old_text' ),
				array(
					'rev_id > ' . $dbr->addQuotes( $startId ),
					'rev_text_id=old_id',
					$textClause,
					"old_flags LIKE '%external%'",
				),
				__METHOD__,
				array(
					'ORDER BY' => 'rev_id',
					'LIMIT' => $this->batchSize
				)
			);
			if ( !$res->numRows() ) {
				break;
			}

			$insertBatch = array();
			foreach ( $res as $row ) {
				// Advance the cursor even for rejected rows so the next batch
				// starts after them
				$startId = $row->rev_id;
				$info = $this->interpretPointer( $row->old_text );
				if ( !$info ) {
					echo "Invalid DB:// URL in rev_id {$row->rev_id}\n";
					continue;
				}
				if ( !in_array( $info['cluster'], $this->clusters ) ) {
					echo "Invalid cluster returned in SQL query: {$info['cluster']}\n";
					continue;
				}
				$insertBatch[] = array(
					'bt_page' => $row->rev_page,
					'bt_rev_id' => $row->rev_id,
					'bt_text_id' => $row->old_id,
					'bt_cluster' => $info['cluster'],
					'bt_blob_id' => $info['id'],
					'bt_cgz_hash' => $info['hash']
				);
				if ( $this->doBlobOrphans ) {
					gmp_setbit( $this->trackedBlobs[$info['cluster']], $info['id'] );
				}
			}
			// Every row of the batch may have been rejected above
			if ( $insertBatch ) {
				$dbw->insert( 'blob_tracking', $insertBatch, __METHOD__ );
				$rowsInserted += count( $insertBatch );
			}

			++$batchesDone;
			if ( $batchesDone >= $this->reportingInterval ) {
				$batchesDone = 0;
				echo "$startId / $endId\n";
				wfWaitForSlaves( 5 );
			}
		}
		echo "Found $rowsInserted revisions\n";
	}

	/**
	 * Scan the text table for orphan text
	 * Orphan text here does not imply DB corruption -- deleted text tracked by the
	 * archive table counts as orphan for our purposes.
	 *
	 * Orphan rows are added to blob_tracking with bt_page and bt_rev_id set to 0.
	 */
	function trackOrphanText() {
		# Wait until the blob_tracking table is available in the slave
		$dbw = wfGetDB( DB_MASTER );
		$dbr = wfGetDB( DB_SLAVE );
		$pos = $dbw->getMasterPos();
		$dbr->masterPosWait( $pos, 100000 );

		$textClause = $this->getTextClause();
		$startId = 0;
		// Only used for progress output
		$endId = $dbr->selectField( 'text', 'MAX(old_id)', false, __METHOD__ );
		$rowsInserted = 0;
		$batchesDone = 0;

		echo "Finding orphan text...\n";

		# Scan the text table for orphan text
		while ( true ) {
			// LEFT JOIN + "bt_text_id IS NULL" keeps only the text rows that no
			// blob_tracking row refers to
			$res = $dbr->select( array( 'text', 'blob_tracking' ),
				array( 'old_id', 'old_flags', 'old_text' ),
				array(
					'old_id>' . $dbr->addQuotes( $startId ),
					$textClause,
					"old_flags LIKE '%external%'",
					'bt_text_id IS NULL'
				),
				__METHOD__,
				array(
					'ORDER BY' => 'old_id',
					'LIMIT' => $this->batchSize
				),
				array( 'blob_tracking' => array( 'LEFT JOIN', 'bt_text_id=old_id' ) )
			);

			if ( !$res->numRows() ) {
				break;
			}

			$insertBatch = array();
			foreach ( $res as $row ) {
				// Advance the cursor even for rejected rows so the next batch
				// starts after them
				$startId = $row->old_id;
				$info = $this->interpretPointer( $row->old_text );
				if ( !$info ) {
					echo "Invalid DB:// URL in old_id {$row->old_id}\n";
					continue;
				}
				if ( !in_array( $info['cluster'], $this->clusters ) ) {
					echo "Invalid cluster returned in SQL query\n";
					continue;
				}

				$insertBatch[] = array(
					'bt_page' => 0,
					'bt_rev_id' => 0,
					'bt_text_id' => $row->old_id,
					'bt_cluster' => $info['cluster'],
					'bt_blob_id' => $info['id'],
					'bt_cgz_hash' => $info['hash']
				);
				if ( $this->doBlobOrphans ) {
					gmp_setbit( $this->trackedBlobs[$info['cluster']], $info['id'] );
				}
			}
			// Every row of the batch may have been rejected above
			if ( $insertBatch ) {
				$dbw->insert( 'blob_tracking', $insertBatch, __METHOD__ );
				$rowsInserted += count( $insertBatch );
			}

			++$batchesDone;
			if ( $batchesDone >= $this->reportingInterval ) {
				$batchesDone = 0;
				echo "$startId / $endId\n";
				wfWaitForSlaves( 5 );
			}
		}
		echo "Found $rowsInserted orphan text rows\n";
	}

	/**
	 * Scan the blobs table for rows not registered in blob_tracking (and thus not
	 * registered in the text table).
	 *
	 * Orphan blobs are indicative of DB corruption. They are inaccessible and
	 * should probably be deleted.
	 *
	 * One blob_orphans row is written per orphan, in batches of at most
	 * $this->batchSize rows. Clusters that cannot be connected to are
	 * reported and skipped.
	 */
	function findOrphanBlobs() {
		if ( !extension_loaded( 'gmp' ) ) {
			echo "Can't find orphan blobs, need bitfield support provided by GMP.\n";
			return;
		}

		$dbw = wfGetDB( DB_MASTER );

		foreach ( $this->clusters as $cluster ) {
			echo "Searching for orphan blobs in $cluster...\n";
			$lb = wfGetLBFactory()->getExternalLB( $cluster );
			try {
				$extDB = $lb->getConnection( DB_SLAVE );
			} catch ( DBConnectionError $e ) {
				if ( strpos( $e->error, 'Unknown database' ) !== false ) {
					echo "No database on $cluster\n";
				} else {
					echo "Error on $cluster: " . $e->getMessage() . "\n";
				}
				continue;
			}
			// The table name may be overridden per connection; default to 'blobs'
			$table = $extDB->getLBInfo( 'blobs table' );
			if ( is_null( $table ) ) {
				$table = 'blobs';
			}
			$startId = 0;
			$batchesDone = 0;
			$actualBlobs = gmp_init( 0 );
			// Only used for progress output
			$endId = $extDB->selectField( $table, 'MAX(blob_id)', false, __METHOD__ );

			// Build a bitmap of actual blob rows
			while ( true ) {
				$res = $extDB->select( $table,
					array( 'blob_id' ),
					array( 'blob_id > ' . $extDB->addQuotes( $startId ) ),
					__METHOD__,
					array( 'LIMIT' => $this->batchSize, 'ORDER BY' => 'blob_id' )
				);

				if ( !$res->numRows() ) {
					break;
				}

				foreach ( $res as $row ) {
					gmp_setbit( $actualBlobs, $row->blob_id );
					// Rows are ordered by blob_id, so the last one seen is the
					// cursor for the next batch
					$startId = $row->blob_id;
				}

				++$batchesDone;
				if ( $batchesDone >= $this->reportingInterval ) {
					$batchesDone = 0;
					echo "$startId / $endId\n";
				}
			}

			// Find actual blobs that weren't tracked by the previous passes
			// This is a set-theoretic difference A \ B, or in bitwise terms, A & ~B
			$orphans = gmp_and( $actualBlobs, gmp_com( $this->trackedBlobs[$cluster] ) );

			// Traverse the orphan list
			$insertBatch = array();
			$id = 0;
			$numOrphans = 0;
			while ( true ) {
				// Next set bit at or after $id; -1 means there are none left
				$id = gmp_scan1( $orphans, $id );
				if ( $id == -1 ) {
					break;
				}
				$insertBatch[] = array(
					'bo_cluster' => $cluster,
					'bo_blob_id' => $id
				);
				// Flush as soon as the batch is full
				if ( count( $insertBatch ) >= $this->batchSize ) {
					$dbw->insert( 'blob_orphans', $insertBatch, __METHOD__ );
					$insertBatch = array();
				}

				++$id;
				++$numOrphans;
			}
			// Write whatever is left in the final, partial batch
			if ( $insertBatch ) {
				$dbw->insert( 'blob_orphans', $insertBatch, __METHOD__ );
			}
			echo "Found $numOrphans orphan(s) in $cluster\n";
		}
	}
}