* Added recompressTracked.php, the second part of the recompression project. Uses...
[lhc/web/wiklou.git] / maintenance / storage / recompressTracked.php
1 <?php
2
// Must be set before including commandLine.inc, which presumably uses it to
// know which options consume a value.
$optionsWithArgs = RecompressTracked::getOptionsWithArgs();
require( dirname( __FILE__ ) . '/../commandLine.inc' );

// At least one destination cluster is required.
if ( count( $args ) === 0 ) {
	echo "Usage: php recompressTracked.php <cluster> [... <cluster>...]\n",
		"Moves blobs indexed by trackBlobs.php to a specified list of destination
clusters, and recompresses them in the process. Restartable.\n";
	exit( 1 );
}

$job = RecompressTracked::newFromCommandLine( $args, $options );
$job->execute();
15
/**
 * Moves blobs indexed by trackBlobs.php (the blob_tracking table) to a list of
 * destination external storage clusters, recompressing them in the process.
 *
 * The same class runs either as the parent, which walks blob_tracking and hands
 * out work, or as a child (--child), which reads commands from stdin and does
 * the copying.
 */
class RecompressTracked {
	/** Destination cluster names, used round-robin by getTargetCluster() */
	var $destClusters;
	/** Maximum number of rows fetched per query */
	var $batchSize = 1000;
	/** Number of batches between progress reports */
	var $reportingInterval = 10;
	/** Batches processed since the last progress report (used by report()) */
	var $numBatches = 0;
	/** Number of child processes started by the parent */
	var $numProcs = 8;
	var $slavePipes, $slaveProcs, $prevSlaveId;
	/** Name of the blob class handed to CgzCopyTransaction */
	var $blobClass = 'DiffHistoryBlob';
	/** If true, only copy blobs (set bt_new_url); don't rewrite the text table */
	var $copyOnly = false;
	/** True when this process is a child worker */
	var $isChild = false;
	/** Child process identifier, or false in the parent */
	var $slaveId = false;
	/** ExternalStoreDB instance used to write the new blobs */
	var $store;

	/** Command-line options that take a value */
	static $optionsWithArgs = array( 'procs', 'slave-id', 'class' );
	/** Map of command-line option name => class property name */
	static $cmdLineOptionMap = array(
		'procs' => 'numProcs',
		'class' => 'blobClass',
		'copy-only' => 'copyOnly',
		'child' => 'isChild',
		'slave-id' => 'slaveId',
	);

	/**
	 * @return array Names of the command-line options that take a value
	 */
	static function getOptionsWithArgs() {
		return self::$optionsWithArgs;
	}

	/**
	 * Build a job from parsed command-line arguments.
	 * @param $args array Positional arguments (destination cluster names)
	 * @param $options array Parsed options, keyed by option name
	 * @return RecompressTracked
	 */
	static function newFromCommandLine( $args, $options ) {
		$jobOptions = array( 'destClusters' => $args );
		foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
			if ( isset( $options[$cmdOption] ) ) {
				$jobOptions[$classOption] = $options[$cmdOption];
			}
		}
		return new self( $jobOptions );
	}

	/**
	 * @param $options array Map of property name => value to set on the job
	 */
	function __construct( $options ) {
		foreach ( $options as $name => $value ) {
			$this->$name = $value;
		}
		$this->store = new ExternalStoreDB;
	}

	/**
	 * Write a debug message, prefixed with the child ID when running as a child.
	 */
	function debug( $msg ) {
		if ( $this->slaveId !== false ) {
			$msg = "{$this->slaveId}: $msg";
		}
		$msg .= "\n";
		wfDebug( $msg );
	}

	/**
	 * Wait until the selected slave has caught up to the master.
	 * This allows us to use the slave for things that were committed in a
	 * previous part of this batch process.
	 */
	function syncDBs() {
		$dbw = wfGetDB( DB_MASTER );
		$dbr = wfGetDB( DB_SLAVE );
		$pos = $dbw->getMasterPos();
		$dbr->masterPosWait( $pos, 100000 );
	}

	/**
	 * Execute parent or child depending on the isChild option
	 */
	function execute() {
		if ( $this->isChild ) {
			$this->executeChild();
		} else {
			$this->executeParent();
		}
	}

	/**
	 * Execute the parent process
	 */
	function executeParent() {
		if ( !$this->checkTrackingTable() ) {
			return;
		}

		$this->syncDBs();
		$this->startSlaveProcs();
		$this->doAllPages();
		$this->doAllOrphans();
		$this->killSlaveProcs();
	}

	/**
	 * Make sure the tracking table exists and isn't empty
	 * @return bool
	 */
	function checkTrackingTable() {
		$dbr = wfGetDB( DB_SLAVE );
		if ( !$dbr->tableExists( 'blob_tracking' ) ) {
			echo "Error: blob_tracking table does not exist\n";
			return false;
		}
		$row = $dbr->selectRow( 'blob_tracking', '*', false, __METHOD__ );
		if ( !$row ) {
			echo "Warning: blob_tracking table contains no rows, skipping this wiki.\n";
			return false;
		}
		return true;
	}

	/**
	 * Start the worker processes.
	 * These processes will listen on stdin for commands.
	 * This is necessary because text recompression is slow: loading, compressing
	 * and writing are all slow.
	 *
	 * Each child is given its own --slave-id (0 .. numProcs-1) so that its
	 * debug output can be told apart.
	 */
	function startSlaveProcs() {
		$cmd = 'php ' . wfEscapeShellArg( __FILE__ );
		foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
			if ( $cmdOption == 'slave-id' ) {
				// Assigned per child below
				continue;
			} elseif ( in_array( $cmdOption, self::$optionsWithArgs, true ) ) {
				$cmd .= " --$cmdOption " . wfEscapeShellArg( $this->$classOption );
			} elseif ( $this->$classOption ) {
				$cmd .= " --$cmdOption";
			}
		}
		$cmd .= ' --child' .
			' --wiki ' . wfEscapeShellArg( wfWikiID() );
		$clusterArgs = call_user_func_array( 'wfEscapeShellArg', $this->destClusters );

		$this->slavePipes = $this->slaveProcs = array();
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			$pipes = false;
			$spec = array(
				array( 'pipe', 'r' ),
				array( 'file', '/dev/stdout', 'w' ),
				array( 'file', '/dev/stderr', 'w' )
			);
			wfSuppressWarnings();
			$proc = proc_open( "$cmd --slave-id $i $clusterArgs", $spec, $pipes );
			wfRestoreWarnings();
			if ( !$proc ) {
				echo "Error opening slave process\n";
				exit( 1 );
			}
			$this->slaveProcs[$i] = $proc;
			$this->slavePipes[$i] = $pipes[0];
		}
		$this->prevSlaveId = -1;
	}

	/**
	 * Gracefully terminate the child processes
	 */
	function killSlaveProcs() {
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			$this->dispatchToSlave( $i, 'quit' );
		}
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			proc_close( $this->slaveProcs[$i] );
		}
	}

	/**
	 * Dispatch a command to the next available slave.
	 * This may block until a slave finishes its work and becomes available.
	 * Arguments are the command name followed by its parameters.
	 */
	function dispatch( /*...*/ ) {
		$args = func_get_args();
		$pipes = $this->slavePipes;
		// stream_select() takes all three arrays by reference, so they must be
		// real variables rather than expressions.
		$read = null;
		$except = null;
		$numPipes = stream_select( $read, $pipes, $except, 3600 );
		if ( !$numPipes ) {
			echo "Error waiting to write to slaves. Aborting\n";
			exit( 1 );
		}
		// stream_select() keeps the keys of writable pipes; start searching
		// after the last slave used so work is spread round-robin.
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			$slaveId = ( $i + $this->prevSlaveId + 1 ) % $this->numProcs;
			if ( isset( $pipes[$slaveId] ) ) {
				$this->prevSlaveId = $slaveId;
				$this->dispatchToSlave( $slaveId, $args );
				return;
			}
		}
		echo "Unreachable\n";
		exit( 1 );
	}

	/**
	 * Dispatch a command to a specified slave
	 * @param $slaveId int Index into slavePipes
	 * @param $args string|array Command name and parameters, space-joined
	 */
	function dispatchToSlave( $slaveId, $args ) {
		$args = (array)$args;
		$cmd = implode( ' ', $args );
		fwrite( $this->slavePipes[$slaveId], "$cmd\n" );
	}

	/**
	 * Move all tracked pages to the new clusters
	 */
	function doAllPages() {
		$dbr = wfGetDB( DB_SLAVE );
		$startId = 0;
		$endId = $dbr->selectField( 'blob_tracking', 'MAX(bt_page)',
			# A condition is required so that this query uses the index
			array( 'bt_moved' => 0 ),
			__METHOD__ );
		echo "Moving pages...\n";
		while ( true ) {
			$res = $dbr->select( 'blob_tracking',
				array( 'bt_page' ),
				array(
					'bt_moved' => 0,
					'bt_page > ' . $dbr->addQuotes( $startId )
				),
				__METHOD__,
				array(
					'DISTINCT',
					'ORDER BY' => 'bt_page',
					'LIMIT' => $this->batchSize,
				)
			);
			if ( !$res->numRows() ) {
				break;
			}
			foreach ( $res as $row ) {
				$this->dispatch( 'doPage', $row->bt_page );
			}
			$startId = $row->bt_page;
			$this->report( $startId, $endId );
		}
		echo "Done moving pages.\n";
	}

	/**
	 * Display a progress report every reportingInterval batches
	 */
	function report( $start, $end ) {
		$this->numBatches++;
		if ( $this->numBatches >= $this->reportingInterval ) {
			$this->numBatches = 0;
			echo "$start / $end\n";
			wfWaitForSlaves( 5 );
		}
	}

	/**
	 * Move all orphan text to the new clusters
	 */
	function doAllOrphans() {
		$dbr = wfGetDB( DB_SLAVE );
		$startId = 0;
		$endId = $dbr->selectField( 'blob_tracking', 'MAX(bt_text_id)',
			array( 'bt_moved' => 0, 'bt_page' => 0 ),
			__METHOD__ );
		if ( !$endId ) {
			return;
		}
		echo "Moving orphans...\n";

		while ( true ) {
			$res = $dbr->select( 'blob_tracking',
				array( 'bt_text_id' ),
				array(
					'bt_moved' => 0,
					'bt_page' => 0,
					'bt_text_id > ' . $dbr->addQuotes( $startId )
				),
				__METHOD__,
				array(
					'DISTINCT',
					'ORDER BY' => 'bt_text_id',
					'LIMIT' => $this->batchSize
				)
			);
			if ( !$res->numRows() ) {
				break;
			}
			$args = array( 'doOrphanList' );
			foreach ( $res as $row ) {
				$args[] = $row->bt_text_id;
			}
			call_user_func_array( array( $this, 'dispatch' ), $args );
			$startId = $row->bt_text_id;
			$this->report( $startId, $endId );
		}
		echo "Done moving orphans.\n";
	}

	/**
	 * Main entry point for worker processes: read one command per line from
	 * stdin until "quit" or EOF.
	 */
	function executeChild() {
		$this->syncDBs();

		while ( !feof( STDIN ) ) {
			$line = rtrim( fgets( STDIN ) );
			if ( $line == '' ) {
				continue;
			}
			$args = explode( ' ', $line );
			$cmd = array_shift( $args );
			switch ( $cmd ) {
			case 'doPage':
				$this->doPage( intval( $args[0] ) );
				break;
			case 'doOrphanList':
				$this->doOrphanList( array_map( 'intval', $args ) );
				break;
			case 'quit':
				return;
			}
		}
	}

	/**
	 * Move tracked text in a given page
	 * @param $pageId int
	 */
	function doPage( $pageId ) {
		$dbr = wfGetDB( DB_SLAVE );

		// Finish any incomplete moves belonging to this page. Other pages'
		// incomplete moves are handled when those pages are dispatched.
		if ( !$this->copyOnly ) {
			$this->finishIncompleteMoves( array( 'bt_page' => $pageId ) );
		}

		$startId = 0;
		$trx = new CgzCopyTransaction( $this );

		while ( true ) {
			$res = $dbr->select(
				array( 'blob_tracking', 'text' ),
				'*',
				array(
					'bt_page' => $pageId,
					'bt_text_id > ' . $dbr->addQuotes( $startId ),
					'bt_moved' => 0,
					'bt_new_url' => '',
					'bt_text_id=old_id',
				),
				__METHOD__,
				array(
					'ORDER BY' => 'bt_text_id',
					'LIMIT' => $this->batchSize
				)
			);
			if ( !$res->numRows() ) {
				break;
			}

			$lastTextId = 0;
			foreach ( $res as $row ) {
				if ( $lastTextId == $row->bt_text_id ) {
					// Duplicate (null edit)
					continue;
				}
				$lastTextId = $row->bt_text_id;
				// Load the text
				$text = Revision::getRevisionText( $row );
				if ( $text === false ) {
					echo "Error loading {$row->bt_rev_id}/{$row->bt_text_id}\n";
					continue;
				}

				// Queue it; addItem() returns false when the blob is full
				if ( !$trx->addItem( $text, $row->bt_text_id ) ) {
					$trx->commit();
					$trx = new CgzCopyTransaction( $this );
				}
			}
			$startId = $row->bt_text_id;
		}
		$trx->commit();
	}

	/**
	 * Atomic move operation.
	 *
	 * Write the new URL to the text table and set the bt_moved flag.
	 *
	 * This is done in a single transaction to provide restartable behaviour
	 * without data loss.
	 *
	 * The transaction is kept short to reduce locking.
	 */
	function moveTextRow( $textId, $url ) {
		$dbw = wfGetDB( DB_MASTER );
		$dbw->begin();
		$dbw->update( 'text',
			array( // set
				'old_text' => $url,
				'old_flags' => 'external,utf8',
			),
			array( // where
				'old_id' => $textId
			),
			__METHOD__
		);
		$dbw->update( 'blob_tracking',
			array( 'bt_moved' => 1 ),
			array( 'bt_text_id' => $textId ),
			__METHOD__
		);
		$dbw->commit();
	}

	/**
	 * Moves are done in two phases: bt_new_url and then bt_moved.
	 * - bt_new_url indicates that the text has been copied to the new cluster.
	 * - bt_moved indicates that the text table has been updated.
	 *
	 * This function completes any moves that only have done bt_new_url. This
	 * can happen when the script is interrupted, or when --copy-only is used.
	 *
	 * @param $conds array Extra blob_tracking conditions restricting the rows
	 *   considered, e.g. array( 'bt_page' => $pageId ). Default: all rows.
	 */
	function finishIncompleteMoves( $conds = array() ) {
		$dbr = wfGetDB( DB_SLAVE );
		$conds = array_merge( (array)$conds, array(
			'bt_moved' => 0,
			"bt_new_url <> ''",
		) );

		$startId = 0;
		while ( true ) {
			$res = $dbr->select( 'blob_tracking',
				'*',
				array_merge( $conds, array(
					'bt_text_id > ' . $dbr->addQuotes( $startId ),
				) ),
				__METHOD__,
				array(
					'ORDER BY' => 'bt_text_id',
					'LIMIT' => $this->batchSize,
				)
			);
			if ( !$res->numRows() ) {
				break;
			}
			foreach ( $res as $row ) {
				$this->moveTextRow( $row->bt_text_id, $row->bt_new_url );
			}
			$startId = $row->bt_text_id;
		}
	}

	/**
	 * Returns the name of the next target cluster, cycling through
	 * destClusters and wrapping around at the end.
	 */
	function getTargetCluster() {
		$cluster = next( $this->destClusters );
		if ( $cluster === false ) {
			$cluster = reset( $this->destClusters );
		}
		return $cluster;
	}

	/**
	 * Gets a DB master connection for the given external cluster name
	 */
	function getExtDB( $cluster ) {
		$lb = wfGetLBFactory()->getExternalLB( $cluster );
		return $lb->getConnection( DB_MASTER );
	}

	/**
	 * Move a list of orphan text_ids to the new clusters
	 * @param $textIds array of int
	 */
	function doOrphanList( $textIds ) {
		if ( !$textIds ) {
			return;
		}

		// Finish any incomplete moves for these IDs rather than copying again
		if ( !$this->copyOnly ) {
			$this->finishIncompleteMoves( array( 'bt_text_id' => $textIds ) );
		}

		$dbr = wfGetDB( DB_SLAVE );
		$trx = new CgzCopyTransaction( $this );
		foreach ( $textIds as $textId ) {
			$row = $dbr->selectRow( 'text', array( 'old_text', 'old_flags' ),
				array( 'old_id' => $textId ), __METHOD__ );
			if ( !$row ) {
				echo "Error: cannot find text row for $textId\n";
				continue;
			}
			$text = Revision::getRevisionText( $row );
			if ( $text === false ) {
				echo "Error: cannot load revision text for $textId\n";
				continue;
			}

			if ( !$trx->addItem( $text, $textId ) ) {
				$trx->commit();
				$trx = new CgzCopyTransaction( $this );
			}
		}
		// Commit the final, partially-filled blob
		$trx->commit();
	}
}
491
/**
 * Class to represent a recompression operation for a single CGZ blob
 */
class CgzCopyTransaction {
	/** The RecompressTracked job that created this transaction */
	var $parent;
	/** Name of the blob class to instantiate */
	var $blobClass;
	/** Blob object being filled, or false until the first item is added */
	var $cgz;
	/** Map of text ID => hash returned by the blob's addItem() */
	var $referrers;
	/** Map of text ID => uncompressed text, kept so the blob can be rebuilt */
	var $texts;
	/** Set to true by signalHandler() */
	var $signalled = false;

	/**
	 * Create a transaction from a RecompressTracked object
	 */
	function __construct( $parent ) {
		$this->parent = $parent;
		$this->blobClass = $parent->blobClass;
		$this->cgz = false;
		$this->texts = array();
		$this->referrers = array();
	}

	/**
	 * Add text.
	 * Returns false if it's ready to commit (the blob's isHappy() is false).
	 */
	function addItem( $text, $textId ) {
		if ( !$this->cgz ) {
			$class = $this->blobClass;
			$this->cgz = new $class;
		}
		$hash = $this->cgz->addItem( $text );
		$this->referrers[$textId] = $hash;
		$this->texts[$textId] = $text;
		return $this->cgz->isHappy();
	}

	/**
	 * Rebuild the blob from the remaining texts, e.g. after some were removed
	 */
	function recompress() {
		$class = $this->blobClass;
		$this->cgz = new $class;
		$this->referrers = array();
		foreach ( $this->texts as $textId => $text ) {
			$hash = $this->cgz->addItem( $text );
			$this->referrers[$textId] = $hash;
		}
	}

	/**
	 * Commit the blob.
	 * Does nothing if no text items have been added.
	 * Skips the text table update if the parent's copyOnly flag is set.
	 */
	function commit() {
		$originalCount = count( $this->texts );
		if ( !$originalCount ) {
			return;
		}

		// Check to see if the target text_ids have been moved already.
		//
		// We originally read from the slave, so this can happen when a single
		// text_id is shared between multiple pages. It's rare, but possible
		// if a delete/move/undelete cycle splits up a null edit.
		//
		// We do a locking read to prevent closer-run race conditions.
		$dbw = wfGetDB( DB_MASTER );
		$dbw->begin();
		$dirty = false;
		foreach ( $this->referrers as $textId => $hash ) {
			$moved = $dbw->selectField( 'blob_tracking', 'bt_moved',
				array( 'bt_text_id' => $textId ),
				__METHOD__,
				array( 'FOR UPDATE' )
			);
			if ( $moved ) {
				# This row has already been moved, remove it
				unset( $this->texts[$textId] );
				$dirty = true;
			}
		}

		// Recompress the blob if necessary
		if ( $dirty ) {
			if ( !count( $this->texts ) ) {
				// All have been moved already
				if ( $originalCount > 1 ) {
					// This is suspicious, make noise
					echo "Warning: concurrent operation detected, are there two conflicting\n" .
						"processes running, doing the same job?\n";
				}
				// Nothing to write: end the transaction to release the row locks
				$dbw->commit();
				return;
			}
			$this->recompress();
		}

		// Insert the data into the destination cluster
		$targetCluster = $this->parent->getTargetCluster();
		$store = $this->parent->store;
		$targetDB = $store->getMaster( $targetCluster );
		$targetDB->clearFlag( DBO_TRX ); // we manage the transactions
		$targetDB->begin();
		$baseUrl = $store->store( $targetCluster, serialize( $this->cgz ) );

		// Write the new URLs to the blob_tracking table
		foreach ( $this->referrers as $textId => $hash ) {
			$url = $baseUrl . '/' . $hash;
			$dbw->update( 'blob_tracking',
				array( 'bt_new_url' => $url ),
				array(
					'bt_text_id' => $textId,
					'bt_moved' => 0, # Check for concurrent conflicting update
				),
				__METHOD__
			);
		}

		$targetDB->commit();
		// Critical section here: interruption at this point causes blob duplication
		// Reversing the order of the commits would cause data loss instead
		$dbw->commit();

		// Write the new URLs to the text table and set the moved flag
		if ( !$this->parent->copyOnly ) {
			foreach ( $this->referrers as $textId => $hash ) {
				$url = $baseUrl . '/' . $hash;
				$this->parent->moveTextRow( $textId, $url );
			}
		}
	}

	/**
	 * Record that a signal was received.
	 */
	function signalHandler() {
		$this->signalled = true;
	}
}
623 }
624