move some member vars to parent class since they are needed there now, set lastTime...
[lhc/web/wiklou.git] / maintenance / dumpTextPass.php
1 <?php
2 /**
3 * Script that postprocesses XML dumps from dumpBackup.php to add page text
4 *
5 * Copyright (C) 2005 Brion Vibber <brion@pobox.com>
6 * http://www.mediawiki.org/
7 *
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License along
19 * with this program; if not, write to the Free Software Foundation, Inc.,
20 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21 * http://www.gnu.org/copyleft/gpl.html
22 *
23 * @file
24 * @ingroup Maintenance
25 */
26
27 $originalDir = getcwd();
28
29 require_once( dirname( __FILE__ ) . '/commandLine.inc' );
30 require_once( 'backup.inc' );
31
32 /**
33 * @ingroup Maintenance
34 */
35 class TextPassDumper extends BackupDumper {
36 var $prefetch = null;
37 var $input = "php://stdin";
38 var $history = WikiExporter::FULL;
39 var $fetchCount = 0;
40 var $prefetchCount = 0;
41 var $prefetchCountLast = 0;
42 var $fetchCountLast = 0;
43
44 var $failures = 0;
45 var $maxFailures = 5;
46 var $failedTextRetrievals = 0;
47 var $maxConsecutiveFailedTextRetrievals = 200;
48 var $failureTimeout = 5; // Seconds to sleep after db failure
49
50 var $php = "php";
51 var $spawn = false;
52 var $spawnProc = false;
53 var $spawnWrite = false;
54 var $spawnRead = false;
55 var $spawnErr = false;
56
57 var $xmlwriterobj = false;
58
59 # when we spend more than maxTimeAllowed seconds on this run, we continue
60 # processing until we write out the next complete page, then save output file(s),
61 # rename it/them and open new one(s)
62 var $maxTimeAllowed = 0; // 0 = no limit
63 var $timeExceeded = false;
64 var $firstPageWritten = false;
65 var $lastPageWritten = false;
66 var $checkpointJustWritten = false;
67 var $checkpointFiles = array();
68
/**
 * Start progress tracking via the parent class, then start the clock
 * for the first checkpoint interval.
 *
 * @param $history Not used by this override.
 */
function initProgress( $history ) {
	parent::initProgress();

	// The first checkpoint interval is measured from the start of the run.
	$this->timeOfCheckpoint = $this->startTime;
}
73
/**
 * Run the text pass: read the stub dump named by $this->input, let the
 * XML handlers fill in revision text, and emit a final progress report.
 *
 * Dies via wfDie() when the input cannot be opened or when readDump()
 * reports a parse error.
 *
 * @param $history Not used here; the history mode is taken from $this->history.
 * @param $text Not used in this method.
 */
function dump( $history, $text = WikiExporter::TEXT ) {
	# This shouldn't happen if on console... ;)
	header( 'Content-type: text/html; charset=UTF-8' );

	# Notice messages will foul up your XML output even if they're
	# relatively harmless.
	if ( ini_get( 'display_errors' ) ) {
		ini_set( 'display_errors', 'stderr' );
	}

	$this->initProgress( $this->history );

	$this->db = $this->backupDb();

	$this->egress = new ExportProgressFilter( $this->sink, $this );

	# it would be nice to do it in the constructor, oh well. need egress set
	$this->finalOptionCheck();

	# we only want this so we know how to close a stream :-P
	$this->xmlwriterobj = new XmlDumpWriter();

	$input = fopen( $this->input, "rt" );
	if ( $input === false ) {
		# Without this check an unreadable stub would be treated as empty input.
		wfDie( "Unable to open input {$this->input} for reading.\n" );
	}
	$result = $this->readDump( $input );
	fclose( $input );

	if ( WikiError::isError( $result ) ) {
		wfDie( $result->getMessage() );
	}

	if ( $this->spawnProc ) {
		$this->closeSpawn();
	}

	$this->report( true );
}
108
/**
 * Apply one command-line option to this dumper.
 *
 * @param $opt Option name
 * @param $val Option value; for file options this is the compression type
 * @param $param Option parameter; for file options this is the file list
 */
function processOption( $opt, $val, $param ) {
	global $IP;

	// Resolved for every option; only 'prefetch' and 'stub' consume it.
	$url = $this->processFileOpt( $val, $param );

	if ( $opt == 'prefetch' ) {
		require_once "$IP/maintenance/backupPrefetch.inc";
		$this->prefetch = new BaseDump( $url );
	} elseif ( $opt == 'stub' ) {
		$this->input = $url;
	} elseif ( $opt == 'maxtime' ) {
		// The option is given in minutes, the limit is kept in seconds.
		$this->maxTimeAllowed = intval( $val ) * 60;
	} elseif ( $opt == 'checkpointfile' ) {
		$this->checkpointFiles[] = $val;
	} elseif ( $opt == 'current' ) {
		$this->history = WikiExporter::CURRENT;
	} elseif ( $opt == 'full' ) {
		$this->history = WikiExporter::FULL;
	} elseif ( $opt == 'spawn' ) {
		$this->spawn = true;
		if ( $val ) {
			// An explicit value overrides the PHP binary used for the child.
			$this->php = $val;
		}
	}
}
141
/**
 * Turn a compression type plus a ';'-separated file list into a
 * ';'-separated list of stream URIs.
 *
 * @param $val Compression type: "file", "gzip", "bzip2" or "7zip";
 *   anything else leaves the names unchanged
 * @param $param One or more file names separated by ';'
 * @return string The rewritten list, joined with ';'
 */
function processFileOpt( $val, $param ) {
	// The type is the same for every file, so pick the wrapper prefix once.
	switch ( $val ) {
		case "file":
			$scheme = "";
			break;
		case "gzip":
			$scheme = "compress.zlib://";
			break;
		case "bzip2":
			$scheme = "compress.bzip2://";
			break;
		case "7zip":
			$scheme = "mediawiki.compress.7z://";
			break;
		default:
			$scheme = "";
	}

	$rewritten = array();
	foreach ( explode( ';', $param ) as $target ) {
		$rewritten[] = $scheme . $target;
	}
	return implode( ';', $rewritten );
}
166
/**
 * Overridden to include prefetch ratio if enabled.
 *
 * Without a prefetch source this defers to the parent implementation.
 * Otherwise it reports overall and per-interval page, revision and
 * prefetch rates plus an ETA, then records the current counters as the
 * baseline for the next interval.
 */
function showReport() {
	if ( !$this->prefetch ) {
		return parent::showReport();
	}
	if ( !$this->reporting ) {
		return;
	}

	$now = wfTimestamp( TS_DB );
	$nowts = wfTime();
	$deltaAll = $nowts - $this->startTime;
	$deltaPart = $nowts - $this->lastTime;
	$this->pageCountPart = $this->pageCount - $this->pageCountLast;
	$this->revCountPart = $this->revCount - $this->revCountLast;
	// Fetch activity since the previous report, not since the start of the run.
	$fetchCountPart = $this->fetchCount - $this->fetchCountLast;
	$prefetchCountPart = $this->prefetchCount - $this->prefetchCountLast;

	if ( $deltaAll ) {
		// An ETA needs a non-zero fraction of work done; otherwise either
		// division below would be a division by zero.
		if ( $this->maxCount && $this->revCount ) {
			$portion = $this->revCount / $this->maxCount;
			$eta = $this->startTime + $deltaAll / $portion;
			$etats = wfTimestamp( TS_DB, intval( $eta ) );
		} else {
			$etats = '-';
		}
		if ( $this->fetchCount ) {
			$fetchRate = 100.0 * $this->prefetchCount / $this->fetchCount;
		} else {
			$fetchRate = '-';
		}
		$pageRate = $this->pageCount / $deltaAll;
		$revRate = $this->revCount / $deltaAll;
	} else {
		$pageRate = '-';
		$revRate = '-';
		$etats = '-';
		$fetchRate = '-';
	}

	if ( $deltaPart ) {
		if ( $fetchCountPart ) {
			$fetchRatePart = 100.0 * $prefetchCountPart / $fetchCountPart;
		} else {
			$fetchRatePart = '-';
		}
		$pageRatePart = $this->pageCountPart / $deltaPart;
		$revRatePart = $this->revCountPart / $deltaPart;
	} else {
		$fetchRatePart = '-';
		$pageRatePart = '-';
		$revRatePart = '-';
	}

	$this->progress( sprintf( "%s: %s (ID %d) %d pages (%0.1f|%0.1f/sec all|curr), %d revs (%0.1f|%0.1f/sec all|curr), %0.1f%%|%0.1f%% prefetched (all|curr), ETA %s [max %d]",
		$now, wfWikiID(), $this->ID, $this->pageCount, $pageRate, $pageRatePart, $this->revCount, $revRate, $revRatePart, $fetchRate, $fetchRatePart, $etats, $this->maxCount ) );

	// Current totals become the baseline for the next "curr" figures.
	$this->lastTime = $nowts;
	$this->pageCountLast = $this->pageCount;
	$this->revCountLast = $this->revCount;
	$this->prefetchCountLast = $this->prefetchCount;
	$this->fetchCountLast = $this->fetchCount;
}
224
/**
 * Flag that the time limit for the current checkpoint interval has passed.
 * endElement() reads this flag when a page closes.
 */
function setTimeExceeded() {
	$this->timeExceeded = true;
}
228
/**
 * Tell whether more than maxTimeAllowed seconds lie between the last
 * checkpoint and the last recorded report time.
 *
 * @return bool Always false when maxTimeAllowed is 0 (no limit)
 */
function checkIfTimeExceeded() {
	$elapsed = $this->lastTime - $this->timeOfCheckpoint;
	return $this->maxTimeAllowed && ( $elapsed > $this->maxTimeAllowed );
}
235
/**
 * Validate the checkpoint-related options once the output sink exists.
 *
 * Dies via wfDie() when only one of maxtime/checkpointfile was given,
 * when a checkpoint pattern does not contain exactly two '%s', or when
 * the number of checkpoint patterns differs from the number of outputs.
 */
function finalOptionCheck() {
	if ( ( $this->checkpointFiles && !$this->maxTimeAllowed ) ||
		( $this->maxTimeAllowed && !$this->checkpointFiles ) ) {
		wfDie("Options checkpointfile and maxtime must be specified together.\n");
	}

	foreach ( $this->checkpointFiles as $checkpointFile ) {
		$count = substr_count( $checkpointFile, "%s" );
		if ( $count != 2 ) {
			wfDie("Option checkpointfile must contain two '%s' for substitution of first and last pageids, count is $count instead, file is $checkpointFile.\n");
		}
	}

	// The per-output check only applies when checkpointing was requested;
	// comparing against an empty checkpoint list would reject every plain run.
	if ( !$this->checkpointFiles ) {
		return;
	}

	$filenameList = $this->egress->getFilename();
	if ( !is_array( $filenameList ) ) {
		$filenameList = array( $filenameList );
	}
	if ( count( $filenameList ) != count( $this->checkpointFiles ) ) {
		wfDie("One checkpointfile must be specified for each output option, if maxtime is used.\n");
	}
}
256
/**
 * Feed the stub dump through an XML parser whose callbacks are this
 * object's startElement/endElement/characterData methods.
 *
 * When checkpointing is enabled, output written after the last checkpoint
 * is closed and renamed using the first and last page ids written.
 *
 * @param $input resource Open, readable stream with the stub XML
 * @return true on success, or a WikiXmlError on a parse failure
 */
function readDump( $input ) {
	$this->buffer = "";
	$this->openElement = false;
	$this->atStart = true;
	$this->state = "";
	$this->lastName = "";
	$this->thisPage = 0;
	$this->thisRev = 0;

	$parser = xml_parser_create( "UTF-8" );
	xml_parser_set_option( $parser, XML_OPTION_CASE_FOLDING, false );

	// Objects are passed by handle, so the callbacks need no reference to $this.
	xml_set_element_handler( $parser, array( $this, 'startElement' ), array( $this, 'endElement' ) );
	xml_set_character_data_handler( $parser, array( $this, 'characterData' ) );

	$offset = 0; // for context extraction on error reporting
	$bufferSize = 512 * 1024;
	do {
		// The flag is only set here; endElement() acts on it once the
		// current page has been written out completely.
		if ( $this->checkIfTimeExceeded() ) {
			$this->setTimeExceeded();
		}
		$chunk = fread( $input, $bufferSize );
		if ( !xml_parse( $parser, $chunk, feof( $input ) ) ) {
			wfDebug( "TextDumpPass::readDump encountered XML parsing error\n" );
			return new WikiXmlError( $parser, 'XML import parse failure', $chunk, $offset );
		}
		$offset += strlen( $chunk );
	} while ( $chunk !== false && !feof( $input ) );

	if ( $this->maxTimeAllowed ) {
		$filenameList = $this->egress->getFilename();
		# we wrote some stuff after last checkpoint that needs renaming
		if ( !is_array( $filenameList ) ) {
			$filenameList = array( $filenameList );
		}
		if ( file_exists( $filenameList[0] ) ) {
			$newFilenames = array();
			$firstPageID = str_pad( $this->firstPageWritten, 9, "0", STR_PAD_LEFT );
			$lastPageID = str_pad( $this->lastPageWritten, 9, "0", STR_PAD_LEFT );
			$fileCount = count( $filenameList );
			for ( $i = 0; $i < $fileCount; $i++ ) {
				$checkpointNameFilledIn = sprintf( $this->checkpointFiles[$i], $firstPageID, $lastPageID );
				$fileinfo = pathinfo( $filenameList[$i] );
				// Square brackets: curly-brace offsets are gone in PHP 8.
				$newFilenames[] = $fileinfo['dirname'] . '/' . $checkpointNameFilledIn;
			}
			$this->egress->closeAndRename( $newFilenames );
		}
	}
	xml_parser_free( $parser );

	return true;
}
307
/**
 * Get the text for a text id, preferring the prefetch dump when it holds
 * a copy whose length matches rev_len in the database.
 *
 * @param $id Text id to fall back on when the prefetch copy is unusable
 * @return Whatever doGetText() returns, or the prefetched text
 */
function getText( $id ) {
	$this->fetchCount++;

	if ( !isset( $this->prefetch ) ) {
		return $this->doGetText( $id );
	}

	$text = $this->prefetch->prefetch( $this->thisPage, $this->thisRev );
	if ( $text === null ) {
		// Entry missing from prefetch dump
		return $this->doGetText( $id );
	}

	$dbr = wfGetDB( DB_SLAVE );
	$revID = intval( $this->thisRev );
	$revLength = $dbr->selectField( 'revision', 'rev_len', array( 'rev_id' => $revID ) );

	// if length of rev text in file doesn't match length in db, we reload
	// this avoids carrying forward broken data from previous xml dumps
	if ( strlen( $text ) == $revLength ) {
		$this->prefetchCount++;
		return $text;
	}

	return $this->doGetText( $id );
}
326
/**
 * Fetch revision text directly or through the spawned subprocess,
 * retrying with a pause between attempts.
 *
 * After more than maxFailures failed attempts an empty string is returned,
 * unless more than maxConsecutiveFailedTextRetrievals texts in a row have
 * been given up on, in which case an MWException is thrown.
 *
 * @param $id Text id
 * @return string The text, or "" when retrieval was abandoned
 */
private function doGetText( $id ) {
	$id = intval( $id );
	$this->failures = 0;
	$ex = new MWException( "Graceful storage failure" );

	for ( ;; ) {
		if ( !$this->spawn ) {
			$text = $this->getTextDbSafe( $id );
		} else {
			if ( $this->failures ) {
				// we don't know why it failed, could be the child process
				// borked, could be db entry busted, could be db server out to lunch,
				// so cover all bases
				$this->closeSpawn();
				$this->openSpawn();
			}
			$text = $this->getTextSpawned( $id );
		}

		if ( $text !== false ) {
			// A success ends the run of consecutive abandoned retrievals.
			$this->failedTextRetrievals= 0;
			return( $text );
		}

		$this->failures++;
		if ( $this->failures <= $this->maxFailures ) {
			$this->progress( "Error $this->failures " .
				"of allowed $this->maxFailures retrieving revision text for text id $id! " .
				"Pausing $this->failureTimeout seconds before retry..." );
			sleep( $this->failureTimeout );
			continue;
		}

		$this->progress( "Failed to retrieve revision text for text id ".
			"$id after $this->maxFailures tries, giving up" );
		// were there so many bad retrievals in a row we want to bail?
		// at some point we have to declare the dump irretrievably broken
		$this->failedTextRetrievals++;
		if ( $this->failedTextRetrievals > $this->maxConsecutiveFailedTextRetrievals ) {
			throw $ex;
		}
		// would be nice to return something better to the caller someday,
		// log what we know about the failure and about the revision
		return("");
	}
}
374
/**
 * Fetch a text revision from the database with a single attempt,
 * turning a DBQueryError into a false return value.
 * Retrying is left to the caller.
 *
 * @param $id Text id
 * @return The result of getTextDb(), or false on a DBQueryError
 */
private function getTextDbSafe( $id ) {
	try {
		return $this->getTextDb( $id );
	} catch ( DBQueryError $ex ) {
		return false;
	}
}
390
/**
 * Load one row from the text table and return its text with carriage
 * returns removed and content-language normalization applied.
 * May throw a database error if, say, the server dies during query.
 *
 * @param $id Value matched against old_id
 * @return The normalized text, or false when Revision::getRevisionText() fails
 */
private function getTextDb( $id ) {
	global $wgContLang;

	$fields = array( 'old_text', 'old_flags' );
	$conds = array( 'old_id' => $id );
	$row = $this->db->selectRow( 'text', $fields, $conds, __METHOD__ );

	$text = Revision::getRevisionText( $row );
	if ( $text === false ) {
		return false;
	}
	return $wgContLang->normalize( str_replace( "\r", "", $text ) );
}
408
/**
 * Fetch text through the subprocess, starting it on first use.
 * PHP warnings are suppressed for the duration of the call.
 *
 * @param $id Text id
 * @return The result of getTextSpawnedOnce()
 */
private function getTextSpawned( $id ) {
	wfSuppressWarnings();

	// No process handle yet means this is the first request.
	if ( !$this->spawnProc ) {
		$this->openSpawn();
	}
	$fetched = $this->getTextSpawnedOnce( $id );

	wfRestoreWarnings();
	return $fetched;
}
419
/**
 * Start maintenance/fetchText.php as a child process for the current wiki
 * and keep the pipes to its stdin and stdout. Its stderr is appended to
 * /dev/null.
 *
 * @return bool True when the process was started, false otherwise
 */
function openSpawn() {
	global $IP;

	$argv = array(
		$this->php,
		"$IP/maintenance/fetchText.php",
		'--wiki', wfWikiID() );
	$cmd = implode( " ", array_map( 'wfEscapeShellArg', $argv ) );

	$descriptors = array(
		0 => array( "pipe", "r" ),
		1 => array( "pipe", "w" ),
		2 => array( "file", "/dev/null", "a" ) );
	$pipes = array();

	$this->progress( "Spawning database subprocess: $cmd" );
	$this->spawnProc = proc_open( $cmd, $descriptors, $pipes );
	if ( $this->spawnProc ) {
		$this->spawnWrite = $pipes[0]; // -> stdin
		$this->spawnRead = $pipes[1]; // <- stdout
		return true;
	}

	$this->progress( "Subprocess spawn failed." );
	return false;
}
449
/**
 * Close the pipes to the text-fetching subprocess and release its process
 * handle, resetting all related members to false.
 * PHP warnings are suppressed while doing so.
 */
private function closeSpawn() {
	wfSuppressWarnings();

	// Pipes are closed before the process handle is released.
	if ( $this->spawnRead ) {
		fclose( $this->spawnRead );
	}
	$this->spawnRead = false;

	if ( $this->spawnWrite ) {
		fclose( $this->spawnWrite );
	}
	$this->spawnWrite = false;

	if ( $this->spawnErr ) {
		fclose( $this->spawnErr );
	}
	$this->spawnErr = false;

	if ( $this->spawnProc ) {
		// The handle comes from proc_open(), so it must be released with
		// proc_close(); pclose() only accepts popen() streams.
		proc_close( $this->spawnProc );
	}
	$this->spawnProc = false;

	wfRestoreWarnings();
}
466
/**
 * Perform one request/response exchange with the subprocess.
 *
 * The id is written on a line of its own; the reply is read as an id
 * line, a byte-length line, then that many bytes of text.
 *
 * @param $id Text id to request
 * @return The normalized text, or false on any write/read failure, an id
 *   mismatch, a negative length or a short read
 */
private function getTextSpawnedOnce( $id ) {
	global $wgContLang;

	$ok = fwrite( $this->spawnWrite, "$id\n" );
	// $this->progress( ">> $id" );
	if ( !$ok ) {
		return false;
	}

	$ok = fflush( $this->spawnWrite );
	// $this->progress( ">> [flush]" );
	if ( !$ok ) {
		return false;
	}

	// check that the text id they are sending is the one we asked for
	// this avoids out of sync revision text errors we have encountered in the past
	$newId = fgets( $this->spawnRead );
	if ( $newId === false ) {
		return false;
	}
	if ( $id != intval( $newId ) ) {
		return false;
	}

	$len = fgets( $this->spawnRead );
	// $this->progress( "<< " . trim( $len ) );
	if ( $len === false ) {
		return false;
	}

	$nbytes = intval( $len );
	// actual error, not zero-length text
	if ( $nbytes < 0 ) {
		return false;
	}

	$text = "";

	// Subprocess may not send everything at once, we have to loop.
	while ( $nbytes > strlen( $text ) ) {
		$buffer = fread( $this->spawnRead, $nbytes - strlen( $text ) );
		if ( $buffer === false ) {
			break;
		}
		// At end of stream fread() yields an empty string rather than false;
		// stop there so a dead child cannot keep this loop spinning forever.
		if ( $buffer === '' && feof( $this->spawnRead ) ) {
			break;
		}
		$text .= $buffer;
	}

	$gotbytes = strlen( $text );
	if ( $gotbytes != $nbytes ) {
		$this->progress( "Expected $nbytes bytes from database subprocess, got $gotbytes " );
		return false;
	}

	// Do normalization in the dump thread...
	$stripped = str_replace( "\r", "", $text );
	$normalized = $wgContLang->normalize( $stripped );
	return $normalized;
}
516
/**
 * XML parser callback for an opening tag.
 *
 * Flushes buffered output at page/revision boundaries and, for a <text>
 * element carrying an id attribute, substitutes the fetched revision text.
 *
 * @param $parser XML parser handle
 * @param $name Element name
 * @param $attribs Attribute map of the element
 */
function startElement( $parser, $name, $attribs ) {
	$this->checkpointJustWritten = false;

	$this->clearOpenElement( null );
	$this->lastName = $name;

	switch ( $name ) {
		case 'revision':
			$this->state = $name;
			$this->egress->writeOpenPage( null, $this->buffer );
			$this->buffer = "";
			break;
		case 'page':
			$this->state = $name;
			if ( $this->atStart ) {
				// Everything buffered before the first page is the stream header.
				$this->egress->writeOpenStream( $this->buffer );
				$this->buffer = "";
				$this->atStart = false;
			}
			break;
	}

	if ( $name != "text" || !isset( $attribs['id'] ) ) {
		$this->openElement = array( $name, $attribs );
		return;
	}

	// The stub's attributes are replaced by xml:space and the text is
	// written as the element's character data.
	$text = $this->getText( $attribs['id'] );
	$this->openElement = array( $name, array( 'xml:space' => 'preserve' ) );
	if ( strlen( $text ) > 0 ) {
		$this->characterData( $parser, $text );
	}
}
546
/**
 * XML parser callback for a closing tag.
 *
 * Writes out completed revisions and pages. When the time limit was
 * exceeded, closing a page also closes the current output stream, renames
 * the output file(s) to their checkpoint names and opens a new stream.
 *
 * @param $parser XML parser handle
 * @param $name Element name
 */
function endElement( $parser, $name ) {
	$this->checkpointJustWritten = false;

	if ( $this->openElement ) {
		// Nothing was written since the opening tag; emit it with an empty
		// string as the element contents.
		$this->clearOpenElement( "" );
	} else {
		$this->buffer .= "</$name>";
	}

	if ( $name == 'revision' ) {
		$this->egress->writeRevision( null, $this->buffer );
		$this->buffer = "";
		$this->thisRev = "";
	} elseif ( $name == 'page' ) {
		if ( !$this->firstPageWritten ) {
			$this->firstPageWritten = trim( $this->thisPage );
		}
		$this->lastPageWritten = trim( $this->thisPage );

		if ( $this->timeExceeded ) {
			$this->egress->writeClosePage( $this->buffer );
			# nasty hack, we can't just write the chardata after the
			# page tag, it will include leading blanks from the next line
			$this->egress->sink->write("\n");

			$this->buffer = $this->xmlwriterobj->closeStream();
			$this->egress->writeCloseStream( $this->buffer );

			$this->buffer = "";
			$this->thisPage = "";

			/* this could be more than one file if we had more than one output arg */
			$filenameList = $this->egress->getFilename();
			if ( !is_array( $filenameList ) ) {
				$filenameList = array( $filenameList );
			}

			$newFilenames = array();
			$firstPageID = str_pad( $this->firstPageWritten, 9, "0", STR_PAD_LEFT );
			$lastPageID = str_pad( $this->lastPageWritten, 9, "0", STR_PAD_LEFT );
			$fileCount = count( $filenameList );
			for ( $i = 0; $i < $fileCount; $i++ ) {
				$checkpointNameFilledIn = sprintf( $this->checkpointFiles[$i], $firstPageID, $lastPageID );
				$fileinfo = pathinfo( $filenameList[$i] );
				// Square brackets: curly-brace offsets are gone in PHP 8.
				$newFilenames[] = $fileinfo['dirname'] . '/' . $checkpointNameFilledIn;
			}
			$this->egress->closeRenameAndReopen( $newFilenames );

			// Start the next checkpoint interval with a fresh stream header.
			$this->buffer = $this->xmlwriterobj->openStream();
			$this->timeExceeded = false;
			$this->timeOfCheckpoint = $this->lastTime;
			$this->firstPageWritten = false;
			$this->checkpointJustWritten = true;
		} else {
			$this->egress->writeClosePage( $this->buffer );
			$this->buffer = "";
			$this->thisPage = "";
		}
	} elseif ( $name == 'mediawiki' ) {
		$this->egress->writeCloseStream( $this->buffer );
		$this->buffer = "";
	}
}
609
/**
 * XML parser callback for character data.
 *
 * Collects page and revision ids while inside an <id> element and appends
 * the escaped data to the output buffer.
 *
 * @param $parser XML parser handle
 * @param $data Character data
 */
function characterData( $parser, $data ) {
	$this->clearOpenElement( null );

	if ( $this->lastName == "id" ) {
		// The same element name is used for page and revision ids; the
		// current state tells them apart.
		switch ( $this->state ) {
			case "revision":
				$this->thisRev .= $data;
				break;
			case "page":
				$this->thisPage .= $data;
				break;
		}
	}

	# have to skip the newline left over from closepagetag line of
	# end of checkpoint files. nasty hack!!
	if ( $this->checkpointJustWritten ) {
		$this->checkpointJustWritten = false;
		if ( $data[0] == "\n" ) {
			$data = substr( $data, 1 );
		}
	}

	$this->buffer .= htmlspecialchars( $data );
}
629
/**
 * Write the pending opening element, if there is one, to the buffer and
 * clear it.
 *
 * @param $style Passed to Xml::element() as the element contents
 */
function clearOpenElement( $style ) {
	if ( !$this->openElement ) {
		return;
	}

	$tag = $this->openElement[0];
	$attribs = $this->openElement[1];
	$this->buffer .= Xml::element( $tag, $attribs, $style );
	$this->openElement = false;
}
636 }
637
638
// Entry point: run the text pass, or print usage when --help was given.
$dumper = new TextPassDumper( $argv );

if ( !isset( $options['help'] ) ) {
	$dumper->dump( true );
} else {
	// The checkpointfile description states that both %s placeholders are
	// required, which is what finalOptionCheck() enforces.
	$dumper->progress( <<<ENDS
This script postprocesses XML dumps from dumpBackup.php to add
page text which was stubbed out (using --stub).

XML input is accepted on stdin.
XML output is sent to stdout; progress reports are sent to stderr.

Usage: php dumpTextPass.php [<options>]
Options:
  --stub=<type>:<file> To load a compressed stub dump instead of stdin
  --prefetch=<type>:<file> Use a prior dump file as a text source, to save
              pressure on the database.
              (Requires the XMLReader extension)
  --maxtime=<minutes> Write out checkpoint file after this many minutes (writing
              out complete page, closing xml file properly, and opening new one
              with header).  This option requires the checkpointfile option.
  --checkpointfile=<filenamepattern> Use this string for checkpoint filenames,
              substituting first pageid written for the first %s and the
              last pageid written for the second %s (both are required).
              This option requires the maxtime option.
  --quiet     Don't dump status reports to stderr.
  --report=n  Report position and speed after every n pages processed.
              (Default: 100)
  --server=h  Force reading from MySQL server h
  --current   Base ETA on number of pages in database instead of all revisions
  --spawn     Spawn a subprocess for loading text records
  --help      Display this help message
ENDS
);
}
673
674