3 * Script that postprocesses XML dumps from dumpBackup.php to add page text
5 * Copyright (C) 2005 Brion Vibber <brion@pobox.com>
6 * http://www.mediawiki.org/
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License along
19 * with this program; if not, write to the Free Software Foundation, Inc.,
20 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21 * http://www.gnu.org/copyleft/gpl.html
24 * @ingroup Maintenance
27 $originalDir = getcwd();
29 require_once( dirname( __FILE__
) . '/commandLine.inc' );
30 require_once( 'backup.inc' );
33 * @ingroup Maintenance
35 class TextPassDumper
extends BackupDumper
{
37 var $input = "php://stdin";
38 var $history = WikiExporter
::FULL
;
40 var $prefetchCount = 0;
42 var $pageCountLast = 0;
43 var $revCountLast = 0;
44 var $prefetchCountLast = 0;
45 var $fetchCountLast = 0;
49 var $failedTextRetrievals = 0;
50 var $maxConsecutiveFailedTextRetrievals = 200;
51 var $failureTimeout = 5; // Seconds to sleep after db failure
55 var $spawnProc = false;
56 var $spawnWrite = false;
57 var $spawnRead = false;
58 var $spawnErr = false;
62 var $xmlwriterobj = false;
64 # when we spend more than maxTimeAllowed seconds on this run, we continue
65 # processing until we write out the next complete page, then save output file(s),
66 # rename it/them and open new one(s)
67 var $maxTimeAllowed = 0; // 0 = no limit
68 var $timeExceeded = false;
69 var $firstPageWritten = false;
70 var $lastPageWritten = false;
71 var $checkpointJustWritten = false;
72 var $checkpointFiles = array();
/**
 * Prepare progress tracking for this text pass.
 *
 * Delegates to the parent's initProgress(), then remembers this process's
 * PID (shown in progress reports) and starts both the "last report" clock
 * and the checkpoint clock at the run's start time.
 *
 * @param $history Not used by this method.
 */
function initProgress( $history ) {
	parent::initProgress();

	$runStart = $this->startTime;
	$this->timeOfCheckpoint = $runStart;
	$this->lastTime = $runStart;
	$this->ID = getmypid();
}
81 function dump( $history, $text = WikiExporter
::TEXT
) {
82 # This shouldn't happen if on console... ;)
83 header( 'Content-type: text/html; charset=UTF-8' );
85 # Notice messages will foul up your XML output even if they're
86 # relatively harmless.
87 if ( ini_get( 'display_errors' ) )
88 ini_set( 'display_errors', 'stderr' );
90 $this->initProgress( $this->history
);
92 $this->db
= $this->backupDb();
94 $this->egress
= new ExportProgressFilter( $this->sink
, $this );
96 # it would be nice to do it in the constructor, oh well. need egress set
97 $this->finalOptionCheck();
99 # we only want this so we know how to close a stream :-P
100 $this->xmlwriterobj
= new XmlDumpWriter();
102 $input = fopen( $this->input
, "rt" );
103 $result = $this->readDump( $input );
105 if ( WikiError
::isError( $result ) ) {
106 wfDie( $result->getMessage() );
109 if ( $this->spawnProc
) {
113 $this->report( true );
116 function processOption( $opt, $val, $param ) {
118 $url = $this->processFileOpt( $val, $param );
122 require_once "$IP/maintenance/backupPrefetch.inc";
123 $this->prefetch
= new BaseDump( $url );
129 $this->maxTimeAllowed
= intval($val)*60;
131 case 'checkpointfile':
132 $this->checkpointFiles
[] = $val;
135 $this->history
= WikiExporter
::CURRENT
;
138 $this->history
= WikiExporter
::FULL
;
149 function processFileOpt( $val, $param ) {
150 $fileURIs = explode(';',$param);
151 foreach ( $fileURIs as $URI ) {
157 $newURI = "compress.zlib://$URI";
160 $newURI = "compress.bzip2://$URI";
163 $newURI = "mediawiki.compress.7z://$URI";
168 $newFileURIs[] = $newURI;
170 $val = implode( ';', $newFileURIs );
175 * Overridden to include prefetch ratio if enabled.
177 function showReport() {
178 if ( !$this->prefetch
) {
179 return parent
::showReport();
182 if ( $this->reporting
) {
183 $now = wfTimestamp( TS_DB
);
185 $deltaAll = wfTime() - $this->startTime
;
186 $deltaPart = wfTime() - $this->lastTime
;
187 $this->pageCountPart
= $this->pageCount
- $this->pageCountLast
;
188 $this->revCountPart
= $this->revCount
- $this->revCountLast
;
191 $portion = $this->revCount
/ $this->maxCount
;
192 $eta = $this->startTime +
$deltaAll / $portion;
193 $etats = wfTimestamp( TS_DB
, intval( $eta ) );
194 if ( $this->fetchCount
) {
195 $fetchRate = 100.0 * $this->prefetchCount
/ $this->fetchCount
;
200 $pageRate = $this->pageCount
/ $deltaAll;
201 $revRate = $this->revCount
/ $deltaAll;
209 if ( $this->fetchCountLast
) {
210 $fetchRatePart = 100.0 * $this->prefetchCountLast
/ $this->fetchCountLast
;
213 $fetchRatePart = '-';
215 $pageRatePart = $this->pageCountPart
/ $deltaPart;
216 $revRatePart = $this->revCountPart
/ $deltaPart;
219 $fetchRatePart = '-';
223 $this->progress( sprintf( "%s: %s (ID %d) %d pages (%0.1f|%0.1f/sec all|curr), %d revs (%0.1f|%0.1f/sec all|curr), %0.1f%%|%0.1f%% prefetched (all|curr), ETA %s [max %d]",
224 $now, wfWikiID(), $this->ID
, $this->pageCount
, $pageRate, $pageRatePart, $this->revCount
, $revRate, $revRatePart, $fetchRate, $fetchRatePart, $etats, $this->maxCount
) );
225 $this->lastTime
= $nowts;
226 $this->revCountLast
= $this->revCount
;
227 $this->prefetchCountLast
= $this->prefetchCount
;
228 $this->fetchCountLast
= $this->fetchCount
;
/**
 * Mark the time budget as used up.
 *
 * endElement() checks this flag when a page closes and, if it is set,
 * finishes the current output file(s) and starts new ones.
 */
function setTimeExceeded() {
	$this->timeExceeded = true;
}
// Tests whether a time limit is configured (maxTimeAllowed is non-zero) and
// more than that many seconds lie between the last checkpoint and lastTime.
// readDump() uses the result as a boolean; the return statements themselves
// are not visible in this excerpt.
236 function checkIfTimeExceeded() {
// NOTE(review): $m1..$m4 are assigned but never read in the visible code;
// they look like leftover debugging locals and can probably be removed.
237 $m1 = $this->maxTimeAllowed
;
238 $m2 = $this->lastTime
;
239 $m3 = $this->timeOfCheckpoint
;
240 $m4 = $this->lastTime
- $this->timeOfCheckpoint
;
// A maxTimeAllowed of 0 means "no limit", so the first operand short-circuits.
241 if ( $this->maxTimeAllowed
&& ( $this->lastTime
- $this->timeOfCheckpoint
> $this->maxTimeAllowed
) ) {
/**
 * Validate the checkpoint-related options once the output sink exists.
 *
 * Calls wfDie() when:
 *  - only one of the checkpointfile / maxtime options was supplied,
 *  - a checkpoint pattern does not contain exactly two '%s' placeholders,
 *  - checkpointing is in use but the number of patterns differs from the
 *    number of output files reported by the egress filter.
 *
 * When checkpointing is not in use the output-file count is not examined;
 * previously a run without maxtime compared at least one output file against
 * zero patterns and died.
 */
function finalOptionCheck() {
	$haveCheckpointFiles = (bool)$this->checkpointFiles;
	$haveMaxTime = (bool)$this->maxTimeAllowed;

	// The two options only make sense as a pair.
	if ( $haveCheckpointFiles != $haveMaxTime ) {
		wfDie( "Options checkpointfile and maxtime must be specified together.\n" );
	}

	foreach ( $this->checkpointFiles as $checkpointFile ) {
		// One placeholder each for the first and the last page id.
		$count = substr_count( $checkpointFile, "%s" );
		if ( $count != 2 ) {
			wfDie( "Option checkpointfile must contain two '%s' for substitution of first and last pageids, count is $count instead, file is $checkpointFile.\n" );
		}
	}

	// Nothing to pair output files with unless checkpointing was requested.
	if ( !$haveCheckpointFiles ) {
		return;
	}

	// getFilename() may hand back a single name or a list of names.
	$filenameList = $this->egress->getFilename();
	if ( !is_array( $filenameList ) ) {
		$filenameList = array( $filenameList );
	}
	if ( count( $filenameList ) != count( $this->checkpointFiles ) ) {
		wfDie( "One checkpointfile must be specified for each output option, if maxtime is used.\n" );
	}
}
// Feeds the XML read from $input through an expat parser whose callbacks are
// this object's startElement(), endElement() and characterData().
// Returns a WikiXmlError when xml_parse() reports a failure; the success
// return value is not visible in this excerpt.
268 function readDump( $input ) {
270 $this->openElement
= false;
271 $this->atStart
= true;
273 $this->lastName
= "";
277 $parser = xml_parser_create( "UTF-8" );
// Keep element names in their original case so the comparisons against
// 'page', 'revision', 'text' etc. in the handlers match.
278 xml_parser_set_option( $parser, XML_OPTION_CASE_FOLDING
, false );
280 xml_set_element_handler( $parser, array( &$this, 'startElement' ), array( &$this, 'endElement' ) );
281 xml_set_character_data_handler( $parser, array( &$this, 'characterData' ) );
283 $offset = 0; // for context extraction on error reporting
// Input is consumed in 512 KiB chunks.
284 $bufferSize = 512 * 1024;
// Raise the time-exceeded flag before parsing the next chunk so that the
// page-end handler can act on it.
286 if ($this->checkIfTimeExceeded()) {
287 $this->setTimeExceeded();
289 $chunk = fread( $input, $bufferSize );
// The third argument tells the parser whether this is the final chunk.
290 if ( !xml_parse( $parser, $chunk, feof( $input ) ) ) {
// NOTE(review): the message names "TextDumpPass" but the class is TextPassDumper.
291 wfDebug( "TextDumpPass::readDump encountered XML parsing error\n" );
292 return new WikiXmlError( $parser, 'XML import parse failure', $chunk, $offset );
294 $offset +
= strlen( $chunk );
295 } while ( $chunk !== false && !feof( $input ) );
// With checkpointing enabled, whatever was written since the last checkpoint
// still sits under the working output name(s) and gets renamed below.
296 if ($this->maxTimeAllowed
) {
297 $filenameList = $this->egress
->getFilename();
298 # we wrote some stuff after the last checkpoint that needs to be renamed
299 if (! is_array($filenameList)) {
300 $filenameList = array( $filenameList );
// Only the first output file is probed for existence.
302 if (file_exists($filenameList[0])) {
303 $newFilenames = array();
// Page ids are zero-padded to nine digits for the checkpoint file names.
304 $firstPageID = str_pad($this->firstPageWritten
,9,"0",STR_PAD_LEFT
);
305 $lastPageID = str_pad($this->lastPageWritten
,9,"0",STR_PAD_LEFT
);
306 for ($i =0; $i < count($filenameList); $i++
) {
307 $checkpointNameFilledIn = sprintf($this->checkpointFiles
[$i], $firstPageID, $lastPageID);
308 $fileinfo = pathinfo($filenameList[$i]);
// NOTE(review): curly-brace offset access ($fileinfo{'dirname'}) is deprecated
// since PHP 7.4 and a fatal error in PHP 8; this should be $fileinfo['dirname'].
309 $newFilenames[] = $fileinfo{'dirname'} . '/' . $checkpointNameFilledIn;
311 $this->egress
->closeAndRename( $newFilenames );
314 xml_parser_free( $parser );
319 function getText( $id ) {
321 if ( isset( $this->prefetch
) ) {
322 $text = $this->prefetch
->prefetch( $this->thisPage
, $this->thisRev
);
323 if ( $text !== null ) { // Entry missing from prefetch dump
324 $dbr = wfGetDB( DB_SLAVE
);
325 $revID = intval( $this->thisRev
);
326 $revLength = $dbr->selectField( 'revision', 'rev_len', array( 'rev_id' => $revID ) );
327 // if length of rev text in file doesn't match length in db, we reload
328 // this avoids carrying forward broken data from previous xml dumps
329 if( strlen( $text ) == $revLength ) {
330 $this->prefetchCount++
;
335 return $this->doGetText( $id );
338 private function doGetText( $id ) {
342 $ex = new MWException( "Graceful storage failure" );
344 if ( $this->spawn
) {
345 if ($this->failures
) {
346 // we don't know why it failed, could be the child process
347 // borked, could be db entry busted, could be db server out to lunch,
348 // so cover all bases
352 $text = $this->getTextSpawned( $id );
354 $text = $this->getTextDbSafe( $id );
356 if ( $text === false ) {
358 if ( $this->failures
> $this->maxFailures
) {
359 $this->progress( "Failed to retrieve revision text for text id ".
360 "$id after $this->maxFailures tries, giving up" );
361 // were there so many bad retrievals in a row we want to bail?
362 // at some point we have to declare the dump irretrievably broken
363 $this->failedTextRetrievals++
;
364 if ($this->failedTextRetrievals
> $this->maxConsecutiveFailedTextRetrievals
) {
368 // would be nice to return something better to the caller someday,
369 // log what we know about the failure and about the revision
373 $this->progress( "Error $this->failures " .
374 "of allowed $this->maxFailures retrieving revision text for text id $id! " .
375 "Pausing $this->failureTimeout seconds before retry..." );
376 sleep( $this->failureTimeout
);
379 $this->failedTextRetrievals
= 0;
387 * Fetch a text revision from the database, retrying in case of failure.
388 * This may survive some transitory errors by reconnecting, but
389 * may not survive a long-term server outage.
391 private function getTextDbSafe( $id ) {
394 $text = $this->getTextDb( $id );
395 } catch ( DBQueryError
$ex ) {
403 * May throw a database error if, say, the server dies during query.
405 private function getTextDb( $id ) {
407 $row = $this->db
->selectRow( 'text',
408 array( 'old_text', 'old_flags' ),
409 array( 'old_id' => $id ),
411 $text = Revision
::getRevisionText( $row );
412 if ( $text === false ) {
415 $stripped = str_replace( "\r", "", $text );
416 $normalized = $wgContLang->normalize( $stripped );
420 private function getTextSpawned( $id ) {
421 wfSuppressWarnings();
422 if ( !$this->spawnProc
) {
426 $text = $this->getTextSpawnedOnce( $id );
431 function openSpawn() {
435 array_map( 'wfEscapeShellArg',
438 "$IP/maintenance/fetchText.php",
439 '--wiki', wfWikiID() ) ) );
441 0 => array( "pipe", "r" ),
442 1 => array( "pipe", "w" ),
443 2 => array( "file", "/dev/null", "a" ) );
446 $this->progress( "Spawning database subprocess: $cmd" );
447 $this->spawnProc
= proc_open( $cmd, $spec, $pipes );
448 if ( !$this->spawnProc
) {
450 $this->progress( "Subprocess spawn failed." );
454 $this->spawnWrite
, // -> stdin
455 $this->spawnRead
, // <- stdout
/**
 * Shut down the text-fetching subprocess and release its pipes.
 *
 * Each handle is closed only if it is currently set and is then reset to
 * false, so the method is safe to call when nothing (or only part of the
 * subprocess state) is open. Warnings are suppressed for the duration
 * because the child may already have gone away.
 *
 * The process handle comes from proc_open(), so it has to be released with
 * proc_close(); pclose() only accepts handles created by popen().
 */
private function closeSpawn() {
	wfSuppressWarnings();

	// Close the pipes first: proc_close() waits for the child to exit, and
	// the child may not exit while its pipes are still open.
	if ( $this->spawnRead ) {
		fclose( $this->spawnRead );
	}
	$this->spawnRead = false;

	if ( $this->spawnWrite ) {
		fclose( $this->spawnWrite );
	}
	$this->spawnWrite = false;

	if ( $this->spawnErr ) {
		fclose( $this->spawnErr );
	}
	$this->spawnErr = false;

	if ( $this->spawnProc ) {
		proc_close( $this->spawnProc );
	}
	$this->spawnProc = false;

	// Re-enable the warnings suppressed on entry.
	wfRestoreWarnings();
}
478 private function getTextSpawnedOnce( $id ) {
481 $ok = fwrite( $this->spawnWrite
, "$id\n" );
482 // $this->progress( ">> $id" );
483 if ( !$ok ) return false;
485 $ok = fflush( $this->spawnWrite
);
486 // $this->progress( ">> [flush]" );
487 if ( !$ok ) return false;
489 // check that the text id they are sending is the one we asked for
490 // this avoids out of sync revision text errors we have encountered in the past
491 $newId = fgets( $this->spawnRead
);
492 if ( $newId === false ) {
495 if ( $id != intval( $newId ) ) {
499 $len = fgets( $this->spawnRead
);
500 // $this->progress( "<< " . trim( $len ) );
501 if ( $len === false ) return false;
503 $nbytes = intval( $len );
504 // actual error, not zero-length text
505 if ($nbytes < 0 ) return false;
509 // Subprocess may not send everything at once, we have to loop.
510 while ( $nbytes > strlen( $text ) ) {
511 $buffer = fread( $this->spawnRead
, $nbytes - strlen( $text ) );
512 if ( $buffer === false ) break;
516 $gotbytes = strlen( $text );
517 if ( $gotbytes != $nbytes ) {
518 $this->progress( "Expected $nbytes bytes from database subprocess, got $gotbytes " );
522 // Do normalization in the dump thread...
523 $stripped = str_replace( "\r", "", $text );
524 $normalized = $wgContLang->normalize( $stripped );
528 function startElement( $parser, $name, $attribs ) {
529 $this->checkpointJustWritten
= false;
531 $this->clearOpenElement( null );
532 $this->lastName
= $name;
534 if ( $name == 'revision' ) {
535 $this->state
= $name;
536 $this->egress
->writeOpenPage( null, $this->buffer
);
538 } elseif ( $name == 'page' ) {
539 $this->state
= $name;
540 if ( $this->atStart
) {
541 $this->egress
->writeOpenStream( $this->buffer
);
543 $this->atStart
= false;
547 if ( $name == "text" && isset( $attribs['id'] ) ) {
548 $text = $this->getText( $attribs['id'] );
549 $this->openElement
= array( $name, array( 'xml:space' => 'preserve' ) );
550 if ( strlen( $text ) > 0 ) {
551 $this->characterData( $parser, $text );
554 $this->openElement
= array( $name, $attribs );
// End-tag callback for the XML parser. Flushes the buffered output to the
// egress filter when a <revision>, <page> or <mediawiki> element closes and,
// if the time budget was exceeded, turns the end of a page into a checkpoint:
// the current output file(s) are closed, renamed after the first and last
// page ids they contain, and fresh ones are opened.
558 function endElement( $parser, $name ) {
559 $this->checkpointJustWritten
= false;
561 if ( $this->openElement
) {
562 $this->clearOpenElement( "" );
// NOTE(review): the line between these two statements is not visible here;
// presumably the closing tag is only appended when no element was pending.
564 $this->buffer
.= "</$name>";
567 if ( $name == 'revision' ) {
568 $this->egress
->writeRevision( null, $this->buffer
);
571 } elseif ( $name == 'page' ) {
// Track the first and the most recent page id written to the current file(s).
572 if (! $this->firstPageWritten
) {
573 $this->firstPageWritten
= trim($this->thisPage
);
575 $this->lastPageWritten
= trim($this->thisPage
);
// Checkpoint path: finish the current file(s) cleanly before switching.
576 if ($this->timeExceeded
) {
577 $this->egress
->writeClosePage( $this->buffer
);
578 # nasty hack, we can't just write the chardata after the
579 # page tag, it will include leading blanks from the next line
580 $this->egress
->sink
->write("\n");
582 $this->buffer
= $this->xmlwriterobj
->closeStream();
583 $this->egress
->writeCloseStream( $this->buffer
);
586 $this->thisPage
= "";
587 /* this could be more than one file if we had more than one output arg */
// NOTE(review): $checkpointFilenames is never read in the visible code.
588 $checkpointFilenames = array();
589 $filenameList = $this->egress
->getFilename();
591 if (! is_array($filenameList)) {
592 $filenameList = array( $filenameList );
594 $newFilenames = array();
// Page ids are zero-padded to nine digits for the checkpoint file names.
595 $firstPageID = str_pad($this->firstPageWritten
,9,"0",STR_PAD_LEFT
);
596 $lastPageID = str_pad($this->lastPageWritten
,9,"0",STR_PAD_LEFT
);
597 for ($i =0; $i < count($filenameList); $i++
) {
598 $checkpointNameFilledIn = sprintf($this->checkpointFiles
[$i], $firstPageID, $lastPageID);
599 $fileinfo = pathinfo($filenameList[$i]);
// NOTE(review): curly-brace offset access ($fileinfo{'dirname'}) is deprecated
// since PHP 7.4 and a fatal error in PHP 8; this should be $fileinfo['dirname'].
600 $newFilenames[] = $fileinfo{'dirname'} . '/' . $checkpointNameFilledIn;
602 $this->egress
->closeRenameAndReopen( $newFilenames );
// Start the new file with a fresh stream header and reset checkpoint state.
603 $this->buffer
= $this->xmlwriterobj
->openStream();
604 $this->timeExceeded
= false;
605 $this->timeOfCheckpoint
= $this->lastTime
;
606 $this->firstPageWritten
= false;
607 $this->checkpointJustWritten
= true;
// NOTE(review): presumably the non-checkpoint branch; the line introducing it
// is not visible in this excerpt.
610 $this->egress
->writeClosePage( $this->buffer
);
612 $this->thisPage
= "";
615 } elseif ( $name == 'mediawiki' ) {
616 $this->egress
->writeCloseStream( $this->buffer
);
/**
 * Character-data callback for the XML parser.
 *
 * Flushes any pending opening tag, accumulates the text of <id> elements
 * into thisRev or thisPage depending on the current state, and appends the
 * HTML-escaped data to the output buffer.
 *
 * @param $parser The XML parser handle (unused).
 * @param $data string Raw character data.
 */
function characterData( $parser, $data ) {
	$this->clearOpenElement( null );

	if ( $this->lastName == "id" ) {
		if ( $this->state == "revision" ) {
			$this->thisRev .= $data;
		} elseif ( $this->state == "page" ) {
			$this->thisPage .= $data;
		}
	}

	# have to skip the newline left over from closepagetag line of
	# end of checkpoint files. nasty hack!!
	if ( $this->checkpointJustWritten ) {
		// Check for emptiness before indexing: $data[0] on an empty string
		// raises an "uninitialized string offset" notice/warning.
		if ( $data !== '' && $data[0] == "\n" ) {
			// substr() returns false on older PHP when nothing is left;
			// keep $data a string in that case.
			$data = (string)substr( $data, 1 );
		}
		$this->checkpointJustWritten = false;
	}

	$this->buffer .= htmlspecialchars( $data );
}
/**
 * Write out a pending opening tag, if there is one.
 *
 * openElement holds array( tag name, attributes ) while a start tag has been
 * seen but not yet emitted. This renders it through Xml::element() into the
 * output buffer and clears the pending state.
 *
 * @param $style Passed through as the third argument of Xml::element();
 *   callers in this class pass null or "".
 */
function clearOpenElement( $style ) {
	$pending = $this->openElement;
	if ( !$pending ) {
		return;
	}

	$this->buffer .= Xml::element( $pending[0], $pending[1], $style );
	$this->openElement = false;
}
650 $dumper = new TextPassDumper( $argv );
652 if ( !isset( $options['help'] ) ) {
653 $dumper->dump( true );
655 $dumper->progress( <<<ENDS
656 This script postprocesses XML dumps from dumpBackup.php to add
657 page text which was stubbed out (using --stub).
659 XML input is accepted on stdin.
660 XML output is sent to stdout; progress reports are sent to stderr.
662 Usage: php dumpTextPass.php [<options>]
664 --stub=<type>:<file> To load a compressed stub dump instead of stdin
665 --prefetch=<type>:<file> Use a prior dump file as a text source, to save
666 pressure on the database.
667 (Requires the XMLReader extension)
668 --maxtime=<minutes> Write out checkpoint file after this many minutes (writing
669 out complete page, closing xml file properly, and opening new one
670 with header). This option requires the checkpointfile option.
671 --checkpointfile=<filenamepattern> Use this string for checkpoint filenames,
672 substituting first pageid written for the first %s (required) and the
673 last pageid written for the second %s if it exists.
674 --quiet Don't dump status reports to stderr.
675 --report=n Report position and speed after every n pages processed.
677 --server=h Force reading from MySQL server h
678 --current Base ETA on number of pages in database instead of all revisions
679 --spawn Spawn a subprocess for loading text records
680 --help Display this help message