fix a couple bad lines in previous commit from bad merge attempt
[lhc/web/wiklou.git] / maintenance / dumpTextPass.php
1 <?php
2 /**
3 * Script that postprocesses XML dumps from dumpBackup.php to add page text
4 *
5 * Copyright (C) 2005 Brion Vibber <brion@pobox.com>
6 * http://www.mediawiki.org/
7 *
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License along
19 * with this program; if not, write to the Free Software Foundation, Inc.,
20 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21 * http://www.gnu.org/copyleft/gpl.html
22 *
23 * @file
24 * @ingroup Maintenance
25 */
26
27 $originalDir = getcwd();
28
29 require_once( dirname( __FILE__ ) . '/commandLine.inc' );
30 require_once( 'backup.inc' );
31
32 /**
33 * @ingroup Maintenance
34 */
35 class TextPassDumper extends BackupDumper {
36 var $prefetch = null;
37 var $input = "php://stdin";
38 var $history = WikiExporter::FULL;
39 var $fetchCount = 0;
40 var $prefetchCount = 0;
41 var $lastTime = 0;
42 var $pageCountLast = 0;
43 var $revCountLast = 0;
44 var $prefetchCountLast = 0;
45 var $fetchCountLast = 0;
46
47 var $failures = 0;
48 var $maxFailures = 5;
49 var $failedTextRetrievals = 0;
50 var $maxConsecutiveFailedTextRetrievals = 200;
51 var $failureTimeout = 5; // Seconds to sleep after db failure
52
53 var $php = "php";
54 var $spawn = false;
55 var $spawnProc = false;
56 var $spawnWrite = false;
57 var $spawnRead = false;
58 var $spawnErr = false;
59
60 var $ID = 0;
61
62 var $xmlwriterobj = false;
63
64 # when we spend more than maxTimeAllowed seconds on this run, we continue
65 # processing until we write out the next complete page, then save output file(s),
66 # rename it/them and open new one(s)
67 var $maxTimeAllowed = 0; // 0 = no limit
68 var $timeExceeded = false;
69 var $firstPageWritten = false;
70 var $lastPageWritten = false;
71 var $checkpointJustWritten = false;
72 var $checkpointFiles = array();
73
/**
 * Initialise progress tracking for this run.
 *
 * Calls the parent initialiser, then seeds the last-report time and the
 * last-checkpoint time from $this->startTime and records this process's pid.
 *
 * @param $history Accepted for signature compatibility; not used here and
 *   not forwarded to the parent.
 */
function initProgress( $history ) {
	parent::initProgress();
	// Both timers start from the same instant as the run itself
	$this->timeOfCheckpoint = $this->lastTime = $this->startTime;
	$this->ID = getmypid();
}
80
/**
 * Run the text pass: read the stub XML from $this->input, fill in revision
 * text and send the result through the egress filter.
 *
 * Dies via wfDie() if the input cannot be opened or if readDump() reports an
 * XML parse error.
 *
 * @param $history Unused; $this->history is passed to initProgress() instead.
 * @param $text Unused.
 */
function dump( $history, $text = WikiExporter::TEXT ) {
	# This shouldn't happen if on console... ;)
	header( 'Content-type: text/html; charset=UTF-8' );

	# Notice messages will foul up your XML output even if they're
	# relatively harmless.
	if ( ini_get( 'display_errors' ) ) {
		ini_set( 'display_errors', 'stderr' );
	}

	$this->initProgress( $this->history );

	$this->db = $this->backupDb();

	$this->egress = new ExportProgressFilter( $this->sink, $this );

	# it would be nice to do it in the constructor, oh well. need egress set
	$this->finalOptionCheck();

	# we only want this so we know how to close a stream :-P
	$this->xmlwriterobj = new XmlDumpWriter();

	$input = fopen( $this->input, "rt" );
	if ( $input === false ) {
		// Without this check a missing/unreadable stub would be parsed as
		// empty input instead of being reported.
		wfDie( "Unable to open input {$this->input} for reading.\n" );
	}
	$result = $this->readDump( $input );
	fclose( $input );

	if ( WikiError::isError( $result ) ) {
		wfDie( $result->getMessage() );
	}

	if ( $this->spawnProc ) {
		$this->closeSpawn();
	}

	$this->report( true );
}
115
/**
 * Apply one command-line option to this dumper.
 *
 * @param $opt Option name ('prefetch', 'stub', 'maxtime', 'checkpointfile',
 *   'current', 'full' or 'spawn'); any other name is ignored here.
 * @param $val Option value; for file options this is the compression type.
 * @param $param Option parameter; for file options this is the file list.
 */
function processOption( $opt, $val, $param ) {
	global $IP;
	// Computed for every option, although only 'prefetch' and 'stub' use it
	$url = $this->processFileOpt( $val, $param );

	if ( $opt == 'prefetch' ) {
		require_once "$IP/maintenance/backupPrefetch.inc";
		$this->prefetch = new BaseDump( $url );
	} elseif ( $opt == 'stub' ) {
		$this->input = $url;
	} elseif ( $opt == 'maxtime' ) {
		// Given in minutes, stored in seconds
		$this->maxTimeAllowed = intval( $val ) * 60;
	} elseif ( $opt == 'checkpointfile' ) {
		$this->checkpointFiles[] = $val;
	} elseif ( $opt == 'current' ) {
		$this->history = WikiExporter::CURRENT;
	} elseif ( $opt == 'full' ) {
		$this->history = WikiExporter::FULL;
	} elseif ( $opt == 'spawn' ) {
		$this->spawn = true;
		if ( $val ) {
			// An explicit value overrides the php binary used for the child
			$this->php = $val;
		}
	}
}
148
/**
 * Turn a ';'-separated list of paths into a ';'-separated list of stream
 * URIs for the given compression type.
 *
 * @param $val Compression type: "file", "gzip", "bzip2" or "7zip". Any other
 *   value is treated like "file" (paths are passed through unchanged).
 * @param $param One or more paths separated by ';'.
 * @return string The converted list, joined with ';'.
 */
function processFileOpt( $val, $param ) {
	// Stream-wrapper prefix per compressed type; plain files need none
	$wrappers = array(
		'gzip' => 'compress.zlib://',
		'bzip2' => 'compress.bzip2://',
		'7zip' => 'mediawiki.compress.7z://',
	);
	$prefix = ( is_string( $val ) && isset( $wrappers[$val] ) ) ? $wrappers[$val] : '';

	$converted = array();
	foreach ( explode( ';', $param ) as $path ) {
		$converted[] = $prefix . $path;
	}
	return implode( ';', $converted );
}
173
/**
 * Overridden to include prefetch ratio if enabled.
 *
 * Without a prefetch source this defers entirely to the parent. Otherwise,
 * when reporting is on, it emits one progress line with overall and
 * since-last-report page/revision rates, the share of text fetches served
 * from the prefetch dump, and an ETA extrapolated from revisions done
 * versus $this->maxCount. Values that cannot be computed are passed as '-'.
 */
function showReport() {
	if ( !$this->prefetch ) {
		return parent::showReport();
	}
	if ( !$this->reporting ) {
		return;
	}

	// $now is a formatted timestamp for display only; all arithmetic uses
	// the numeric $nowts so that lastTime stays a number (it is also read
	// by checkIfTimeExceeded()).
	$now = wfTimestamp( TS_DB );
	$nowts = wfTime();
	$deltaAll = $nowts - $this->startTime;
	$deltaPart = $nowts - $this->lastTime;
	$this->pageCountPart = $this->pageCount - $this->pageCountLast;
	$this->revCountPart = $this->revCount - $this->revCountLast;
	// Fetch activity since the previous report
	$fetchCountPart = $this->fetchCount - $this->fetchCountLast;
	$prefetchCountPart = $this->prefetchCount - $this->prefetchCountLast;

	if ( $deltaAll ) {
		// An ETA needs a known total and at least one revision processed,
		// otherwise the extrapolation divides by zero.
		if ( $this->maxCount && $this->revCount ) {
			$portion = $this->revCount / $this->maxCount;
			$eta = $this->startTime + $deltaAll / $portion;
			$etats = wfTimestamp( TS_DB, intval( $eta ) );
		} else {
			$etats = '-';
		}
		if ( $this->fetchCount ) {
			$fetchRate = 100.0 * $this->prefetchCount / $this->fetchCount;
		} else {
			$fetchRate = '-';
		}
		$pageRate = $this->pageCount / $deltaAll;
		$revRate = $this->revCount / $deltaAll;
	} else {
		$pageRate = '-';
		$revRate = '-';
		$etats = '-';
		$fetchRate = '-';
	}

	if ( $deltaPart ) {
		if ( $fetchCountPart ) {
			$fetchRatePart = 100.0 * $prefetchCountPart / $fetchCountPart;
		} else {
			$fetchRatePart = '-';
		}
		$pageRatePart = $this->pageCountPart / $deltaPart;
		$revRatePart = $this->revCountPart / $deltaPart;
	} else {
		$fetchRatePart = '-';
		$pageRatePart = '-';
		$revRatePart = '-';
	}

	$this->progress( sprintf( "%s: %s (ID %d) %d pages (%0.1f|%0.1f/sec all|curr), %d revs (%0.1f|%0.1f/sec all|curr), %0.1f%%|%0.1f%% prefetched (all|curr), ETA %s [max %d]",
		$now, wfWikiID(), $this->ID, $this->pageCount, $pageRate, $pageRatePart, $this->revCount, $revRate, $revRatePart, $fetchRate, $fetchRatePart, $etats, $this->maxCount ) );

	// Snapshot everything the next report's "curr" figures are measured from
	$this->lastTime = $nowts;
	$this->pageCountLast = $this->pageCount;
	$this->revCountLast = $this->revCount;
	$this->prefetchCountLast = $this->prefetchCount;
	$this->fetchCountLast = $this->fetchCount;
}
230
/**
 * Flag that the time budget for the current checkpoint has run out.
 * endElement() reads this flag when a page closes.
 */
function setTimeExceeded() {
	$this->timeExceeded = true;
}
234
/**
 * Tell whether more than maxTimeAllowed seconds lie between the last
 * checkpoint and the last recorded report time.
 *
 * @return bool Always false when maxTimeAllowed is 0 (no limit).
 */
function checkIfTimeExceeded() {
	if ( !$this->maxTimeAllowed ) {
		return false;
	}
	$sinceCheckpoint = $this->lastTime - $this->timeOfCheckpoint;
	return $sinceCheckpoint > $this->maxTimeAllowed;
}
241
/**
 * Validate the checkpoint-related options once $this->egress is available.
 *
 * Dies via wfDie() when:
 *  - only one of maxtime / checkpointfile was given,
 *  - a checkpoint filename pattern does not contain exactly two '%s',
 *  - checkpointing is enabled and the number of checkpoint patterns differs
 *    from the number of output files.
 */
function finalOptionCheck() {
	$haveCheckpointFiles = (bool)$this->checkpointFiles;
	$haveMaxTime = (bool)$this->maxTimeAllowed;
	if ( $haveCheckpointFiles != $haveMaxTime ) {
		wfDie( "Options checkpointfile and maxtime must be specified together.\n" );
	}

	foreach ( $this->checkpointFiles as $checkpointFile ) {
		$count = substr_count( $checkpointFile, "%s" );
		if ( $count != 2 ) {
			wfDie( "Option checkpointfile must contain two '%s' for substitution of first and last pageids, count is $count instead, file is $checkpointFile.\n" );
		}
	}

	// Only compare against the output list when checkpointing is in use;
	// otherwise a plain run (no checkpoint files, one output) would always
	// fail this check.
	if ( $haveCheckpointFiles ) {
		$filenameList = $this->egress->getFilename();
		if ( !is_array( $filenameList ) ) {
			$filenameList = array( $filenameList );
		}
		if ( count( $filenameList ) != count( $this->checkpointFiles ) ) {
			wfDie( "One checkpointfile must be specified for each output option, if maxtime is used.\n" );
		}
	}
}
262
/**
 * Stream the stub XML from $input through an XML parser whose handlers
 * (startElement / endElement / characterData) produce the output.
 *
 * Before each chunk is read, the checkpoint timer is consulted and the
 * timeExceeded flag raised if needed. After the input is exhausted and
 * checkpointing is enabled, whatever was written since the last checkpoint
 * is renamed to its final checkpoint filename(s).
 *
 * @param $input Open, readable stream handle.
 * @return true on success, or a WikiXmlError describing the parse failure.
 */
function readDump( $input ) {
	$this->buffer = "";
	$this->openElement = false;
	$this->atStart = true;
	$this->state = "";
	$this->lastName = "";
	$this->thisPage = 0;
	$this->thisRev = 0;

	$parser = xml_parser_create( "UTF-8" );
	xml_parser_set_option( $parser, XML_OPTION_CASE_FOLDING, false );

	xml_set_element_handler( $parser, array( $this, 'startElement' ), array( $this, 'endElement' ) );
	xml_set_character_data_handler( $parser, array( $this, 'characterData' ) );

	$offset = 0; // for context extraction on error reporting
	$bufferSize = 512 * 1024;
	do {
		if ( $this->checkIfTimeExceeded() ) {
			$this->setTimeExceeded();
		}
		$chunk = fread( $input, $bufferSize );
		if ( !xml_parse( $parser, $chunk, feof( $input ) ) ) {
			wfDebug( "TextDumpPass::readDump encountered XML parsing error\n" );
			return new WikiXmlError( $parser, 'XML import parse failure', $chunk, $offset );
		}
		$offset += strlen( $chunk );
	} while ( $chunk !== false && !feof( $input ) );

	if ( $this->maxTimeAllowed ) {
		$filenameList = $this->egress->getFilename();
		# we wrote some stuff after last checkpoint that needs renamed
		if ( !is_array( $filenameList ) ) {
			$filenameList = array( $filenameList );
		}
		if ( file_exists( $filenameList[0] ) ) {
			$newFilenames = array();
			// Page ids are zero-padded to 9 digits in checkpoint filenames
			$firstPageID = str_pad( $this->firstPageWritten, 9, "0", STR_PAD_LEFT );
			$lastPageID = str_pad( $this->lastPageWritten, 9, "0", STR_PAD_LEFT );
			for ( $i = 0; $i < count( $filenameList ); $i++ ) {
				$checkpointNameFilledIn = sprintf( $this->checkpointFiles[$i], $firstPageID, $lastPageID );
				$fileinfo = pathinfo( $filenameList[$i] );
				// Keep the checkpoint file in the same directory as the output file
				$newFilenames[] = $fileinfo['dirname'] . '/' . $checkpointNameFilledIn;
			}
			$this->egress->rename( $newFilenames );
		}
	}
	xml_parser_free( $parser );

	return true;
}
313
/**
 * Get the text for the revision currently being processed.
 *
 * The prefetch source (if any) is tried first. Its text is only accepted
 * when its byte length equals rev_len for the current revision in the
 * database; otherwise, or when prefetch has no entry, the text is loaded
 * through doGetText().
 *
 * @param $id Text id taken from the stub's <text id="..."> attribute.
 * @return string Revision text.
 */
function getText( $id ) {
	$this->fetchCount++;

	$prefetched = isset( $this->prefetch )
		? $this->prefetch->prefetch( $this->thisPage, $this->thisRev )
		: null;
	if ( $prefetched === null ) {
		// No prefetch source, or no entry for this revision in it
		return $this->doGetText( $id );
	}

	$dbr = wfGetDB( DB_SLAVE );
	$conds = array( 'rev_id' => intval( $this->thisRev ) );
	$revLength = $dbr->selectField( 'revision', 'rev_len', $conds );
	// A length mismatch means the older dump's text is suspect, so reload
	// it rather than carry broken data forward.
	if ( strlen( $prefetched ) == $revLength ) {
		$this->prefetchCount++;
		return $prefetched;
	}
	return $this->doGetText( $id );
}
332
/**
 * Load revision text by text id, retrying on failure.
 *
 * Each failed attempt is followed by a pause of failureTimeout seconds.
 * Once the failure count exceeds maxFailures an empty string is returned
 * for this id, unless more than maxConsecutiveFailedTextRetrievals ids in
 * a row have been given up on, in which case an MWException is thrown.
 *
 * @param $id Text id.
 * @return string Text, or "" if retrieval was abandoned for this id.
 */
private function doGetText( $id ) {
	$id = intval( $id );
	$this->failures = 0;
	$ex = new MWException( "Graceful storage failure" );

	for ( ; ; ) {
		if ( !$this->spawn ) {
			$text = $this->getTextDbSafe( $id );
		} else {
			if ( $this->failures ) {
				// we don't know why it failed, could be the child process
				// borked, could be db entry busted, could be db server out to lunch,
				// so cover all bases
				$this->closeSpawn();
				$this->openSpawn();
			}
			$text = $this->getTextSpawned( $id );
		}

		if ( $text !== false ) {
			// Success resets the run of abandoned retrievals
			$this->failedTextRetrievals = 0;
			return $text;
		}

		$this->failures++;
		if ( $this->failures <= $this->maxFailures ) {
			$this->progress( "Error $this->failures " .
				"of allowed $this->maxFailures retrieving revision text for text id $id! " .
				"Pausing $this->failureTimeout seconds before retry..." );
			sleep( $this->failureTimeout );
			continue;
		}

		$this->progress( "Failed to retrieve revision text for text id ".
			"$id after $this->maxFailures tries, giving up" );
		// were there so many bad retrievals in a row we want to bail?
		// at some point we have to declare the dump irretrievably broken
		$this->failedTextRetrievals++;
		if ( $this->failedTextRetrievals > $this->maxConsecutiveFailedTextRetrievals ) {
			throw $ex;
		}
		// would be nice to return something better to the caller someday,
		// log what we know about the failure and about the revision
		return "";
	}
}
380
/**
 * Fetch a text revision from the database, turning a DBQueryError into a
 * plain false return value.
 *
 * This makes a single attempt; it does not retry by itself.
 *
 * @param $id Text id.
 * @return string|false Text, or false on failure.
 */
private function getTextDbSafe( $id ) {
	try {
		return $this->getTextDb( $id );
	} catch ( DBQueryError $ex ) {
		return false;
	}
}
396
/**
 * Read one row from the text table and return its normalised text.
 * May throw a database error if, say, the server dies during query.
 *
 * @param $id Value matched against text.old_id.
 * @return string|false Text with "\r" removed and passed through
 *   $wgContLang->normalize(), or false if Revision::getRevisionText()
 *   returned false.
 */
private function getTextDb( $id ) {
	global $wgContLang;

	$fields = array( 'old_text', 'old_flags' );
	$conds = array( 'old_id' => $id );
	$row = $this->db->selectRow( 'text', $fields, $conds, __METHOD__ );

	$text = Revision::getRevisionText( $row );
	if ( $text === false ) {
		return false;
	}
	return $wgContLang->normalize( str_replace( "\r", "", $text ) );
}
414
/**
 * Fetch text through the helper subprocess, starting it on first use.
 *
 * PHP warnings are suppressed for the duration of the call. If the
 * subprocess cannot be started, false is returned without attempting to
 * talk to it.
 *
 * @param $id Text id.
 * @return string|false Text, or false on failure.
 */
private function getTextSpawned( $id ) {
	wfSuppressWarnings();
	$text = false;
	// openSpawn() reports failure by returning false; in that case there
	// are no pipes to write to, so skip the request entirely.
	if ( $this->spawnProc || $this->openSpawn() ) {
		$text = $this->getTextSpawnedOnce( $id );
	}
	wfRestoreWarnings();
	return $text;
}
425
/**
 * Start the fetchText.php helper subprocess for the current wiki and keep
 * its stdin/stdout pipes in $this->spawnWrite / $this->spawnRead.
 * The child's stderr is appended to /dev/null.
 *
 * @return bool True if the process was started, false otherwise.
 */
function openSpawn() {
	global $IP;

	$args = array(
		$this->php,
		"$IP/maintenance/fetchText.php",
		'--wiki',
		wfWikiID(),
	);
	$cmd = implode( " ", array_map( 'wfEscapeShellArg', $args ) );

	$descriptors = array(
		0 => array( "pipe", "r" ),
		1 => array( "pipe", "w" ),
		2 => array( "file", "/dev/null", "a" ),
	);
	$pipes = array();

	$this->progress( "Spawning database subprocess: $cmd" );
	$this->spawnProc = proc_open( $cmd, $descriptors, $pipes );
	if ( !$this->spawnProc ) {
		$this->progress( "Subprocess spawn failed." );
		return false;
	}

	$this->spawnWrite = $pipes[0]; // -> child's stdin
	$this->spawnRead = $pipes[1]; // <- child's stdout

	return true;
}
455
/**
 * Shut down the helper subprocess: close whichever pipes are open, then
 * close the process handle, and reset all spawn handles to false.
 * PHP warnings are suppressed while doing so.
 */
private function closeSpawn() {
	wfSuppressWarnings();
	if ( $this->spawnRead ) {
		fclose( $this->spawnRead );
	}
	$this->spawnRead = false;
	if ( $this->spawnWrite ) {
		fclose( $this->spawnWrite );
	}
	$this->spawnWrite = false;
	if ( $this->spawnErr ) {
		fclose( $this->spawnErr );
	}
	$this->spawnErr = false;
	if ( $this->spawnProc ) {
		// The handle comes from proc_open(), so it must be released with
		// proc_close(); pclose() only accepts popen() handles. The pipes
		// are closed first so the child sees EOF on its stdin.
		proc_close( $this->spawnProc );
	}
	$this->spawnProc = false;
	wfRestoreWarnings();
}
472
/**
 * Make one request to the helper subprocess and read its reply.
 *
 * The id is written on its own line; the reply is read as a line holding
 * the text id, a line holding the byte length, and then that many bytes.
 *
 * @param $id Text id.
 * @return string|false Text with "\r" removed and passed through
 *   $wgContLang->normalize(), or false if writing fails, the echoed id
 *   differs, the length is negative, or fewer bytes than announced arrive.
 */
private function getTextSpawnedOnce( $id ) {
	global $wgContLang;

	$ok = fwrite( $this->spawnWrite, "$id\n" );
	// $this->progress( ">> $id" );
	if ( !$ok ) {
		return false;
	}

	$ok = fflush( $this->spawnWrite );
	// $this->progress( ">> [flush]" );
	if ( !$ok ) {
		return false;
	}

	// check that the text id they are sending is the one we asked for
	// this avoids out of sync revision text errors we have encountered in the past
	$newId = fgets( $this->spawnRead );
	if ( $newId === false ) {
		return false;
	}
	if ( $id != intval( $newId ) ) {
		return false;
	}

	$len = fgets( $this->spawnRead );
	// $this->progress( "<< " . trim( $len ) );
	if ( $len === false ) {
		return false;
	}

	$nbytes = intval( $len );
	// actual error, not zero-length text
	if ( $nbytes < 0 ) {
		return false;
	}

	$text = "";

	// Subprocess may not send everything at once, we have to loop.
	while ( $nbytes > strlen( $text ) ) {
		$buffer = fread( $this->spawnRead, $nbytes - strlen( $text ) );
		if ( $buffer === false ) {
			break;
		}
		// At end of stream fread() yields "" rather than false; without
		// this check a child that exits mid-reply would make us spin forever.
		if ( $buffer === '' && feof( $this->spawnRead ) ) {
			break;
		}
		$text .= $buffer;
	}

	$gotbytes = strlen( $text );
	if ( $gotbytes != $nbytes ) {
		$this->progress( "Expected $nbytes bytes from database subprocess, got $gotbytes " );
		return false;
	}

	// Do normalization in the dump thread...
	$stripped = str_replace( "\r", "", $text );
	$normalized = $wgContLang->normalize( $stripped );
	return $normalized;
}
522
/**
 * XML parser callback for an opening tag.
 *
 * Flushes any pending open element into the buffer, hands buffered output
 * to the egress filter at <revision> and at the first <page>, and for
 * <text id="..."> stubs replaces the id attribute with the fetched text.
 *
 * @param $parser XML parser handle.
 * @param $name Element name.
 * @param $attribs Attribute map of the element.
 */
function startElement( $parser, $name, $attribs ) {
	$this->checkpointJustWritten = false;

	$this->clearOpenElement( null );
	$this->lastName = $name;

	switch ( $name ) {
		case 'revision':
			$this->state = $name;
			$this->egress->writeOpenPage( null, $this->buffer );
			$this->buffer = "";
			break;
		case 'page':
			$this->state = $name;
			if ( $this->atStart ) {
				// Everything buffered before the first page is the stream header
				$this->egress->writeOpenStream( $this->buffer );
				$this->buffer = "";
				$this->atStart = false;
			}
			break;
	}

	$isTextStub = ( $name == "text" && isset( $attribs['id'] ) );
	if ( !$isTextStub ) {
		// Remember the tag; it is emitted once we know whether it has content
		$this->openElement = array( $name, $attribs );
		return;
	}

	$text = $this->getText( $attribs['id'] );
	$this->openElement = array( $name, array( 'xml:space' => 'preserve' ) );
	if ( strlen( $text ) > 0 ) {
		$this->characterData( $parser, $text );
	}
}
552
/**
 * XML parser callback for a closing tag.
 *
 * Emits the closing markup, hands completed revisions and pages to the
 * egress filter, and, when the time limit was exceeded, uses the end of a
 * page to close the current output file(s), rename them to their
 * checkpoint names and start new one(s).
 *
 * @param $parser XML parser handle.
 * @param $name Element name.
 */
function endElement( $parser, $name ) {
	$this->checkpointJustWritten = false;

	if ( $this->openElement ) {
		// Element had no content: emit it as an empty tag
		$this->clearOpenElement( "" );
	} else {
		$this->buffer .= "</$name>";
	}

	if ( $name == 'revision' ) {
		$this->egress->writeRevision( null, $this->buffer );
		$this->buffer = "";
		$this->thisRev = "";
	} elseif ( $name == 'page' ) {
		if ( !$this->firstPageWritten ) {
			$this->firstPageWritten = trim( $this->thisPage );
		}
		$this->lastPageWritten = trim( $this->thisPage );
		if ( $this->timeExceeded ) {
			$this->egress->writeClosePage( $this->buffer );
			# nasty hack, we can't just write the chardata after the
			# page tag, it will include leading blanks from the next line
			$this->egress->sink->write( "\n" );

			$this->buffer = $this->xmlwriterobj->closeStream();
			$this->egress->writeCloseStream( $this->buffer );

			$this->buffer = "";
			$this->thisPage = "";

			/* this could be more than one file if we had more than one output arg */
			$filenameList = $this->egress->getFilename();
			if ( !is_array( $filenameList ) ) {
				$filenameList = array( $filenameList );
			}
			$newFilenames = array();
			// Page ids are zero-padded to 9 digits in checkpoint filenames
			$firstPageID = str_pad( $this->firstPageWritten, 9, "0", STR_PAD_LEFT );
			$lastPageID = str_pad( $this->lastPageWritten, 9, "0", STR_PAD_LEFT );
			for ( $i = 0; $i < count( $filenameList ); $i++ ) {
				$checkpointNameFilledIn = sprintf( $this->checkpointFiles[$i], $firstPageID, $lastPageID );
				$fileinfo = pathinfo( $filenameList[$i] );
				// Keep the checkpoint file in the same directory as the output file
				$newFilenames[] = $fileinfo['dirname'] . '/' . $checkpointNameFilledIn;
			}
			$this->egress->closeRenameAndReopen( $newFilenames );

			// Start the next file with a fresh stream header and restart
			// the checkpoint timer and page-range tracking.
			$this->buffer = $this->xmlwriterobj->openStream();
			$this->timeExceeded = false;
			$this->timeOfCheckpoint = $this->lastTime;
			$this->firstPageWritten = false;
			$this->checkpointJustWritten = true;
		} else {
			$this->egress->writeClosePage( $this->buffer );
			$this->buffer = "";
			$this->thisPage = "";
		}
	} elseif ( $name == 'mediawiki' ) {
		$this->egress->writeCloseStream( $this->buffer );
		$this->buffer = "";
	}
}
615
/**
 * XML parser callback for character data (also called directly by
 * startElement() to inject fetched revision text).
 *
 * Text inside an <id> element is accumulated as the current page or
 * revision id, depending on parser state. The data is then appended to the
 * output buffer, escaped with htmlspecialchars().
 *
 * @param $parser XML parser handle.
 * @param $data Character data.
 */
function characterData( $parser, $data ) {
	$this->clearOpenElement( null );

	if ( $this->lastName == "id" ) {
		switch ( $this->state ) {
			case "revision":
				$this->thisRev .= $data;
				break;
			case "page":
				$this->thisPage .= $data;
				break;
		}
	}

	# have to skip the newline left over from closepagetag line of
	# end of checkpoint files. nasty hack!!
	if ( $this->checkpointJustWritten ) {
		$this->checkpointJustWritten = false;
		if ( $data[0] == "\n" ) {
			$data = substr( $data, 1 );
		}
	}

	$this->buffer .= htmlspecialchars( $data );
}
635
/**
 * If an element is pending in $this->openElement, render it into the
 * buffer with Xml::element() and clear the pending slot.
 *
 * @param $style Passed through as the third argument of Xml::element();
 *   callers in this class pass null from startElement()/characterData()
 *   and "" from endElement().
 */
function clearOpenElement( $style ) {
	if ( !$this->openElement ) {
		return;
	}
	list( $tag, $attribs ) = $this->openElement;
	$this->buffer .= Xml::element( $tag, $attribs, $style );
	$this->openElement = false;
}
642 }
643
644
// Entry point: build the dumper from the command line, then either run the
// text pass or, with --help, print usage through the progress channel.
$dumper = new TextPassDumper( $argv );

if ( !isset( $options['help'] ) ) {
	$dumper->dump( true );
} else {
	// The checkpointfile description states that both %s placeholders are
	// required, matching what finalOptionCheck() enforces.
	$dumper->progress( <<<ENDS
This script postprocesses XML dumps from dumpBackup.php to add
page text which was stubbed out (using --stub).

XML input is accepted on stdin.
XML output is sent to stdout; progress reports are sent to stderr.

Usage: php dumpTextPass.php [<options>]
Options:
--stub=<type>:<file> To load a compressed stub dump instead of stdin
--prefetch=<type>:<file> Use a prior dump file as a text source, to save
pressure on the database.
(Requires the XMLReader extension)
--maxtime=<minutes> Write out checkpoint file after this many minutes (writing
out complete page, closing xml file properly, and opening new one
with header). This option requires the checkpointfile option.
--checkpointfile=<filenamepattern> Use this string for checkpoint filenames,
substituting first pageid written for the first %s and the last pageid
written for the second %s (both are required). This option requires the
maxtime option.
--quiet Don't dump status reports to stderr.
--report=n Report position and speed after every n pages processed.
(Default: 100)
--server=h Force reading from MySQL server h
--current Base ETA on number of pages in database instead of all revisions
--spawn Spawn a subprocess for loading text records
--help Display this help message
ENDS
);
}
679
680