3 * Script that postprocesses XML dumps from dumpBackup.php to add page text
5 * Copyright (C) 2005 Brion Vibber <brion@pobox.com>
6 * http://www.mediawiki.org/
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License along
19 * with this program; if not, write to the Free Software Foundation, Inc.,
20 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21 * http://www.gnu.org/copyleft/gpl.html
24 * @ingroup Maintenance
27 $originalDir = getcwd();
29 require_once( dirname( __FILE__
) . '/commandLine.inc' );
30 require_once( 'backup.inc' );
33 * @ingroup Maintenance
35 class TextPassDumper
extends BackupDumper
{
37 var $input = "php://stdin";
38 var $history = WikiExporter
::FULL
;
40 var $prefetchCount = 0;
42 var $pageCountLast = 0;
43 var $revCountLast = 0;
44 var $prefetchCountLast = 0;
45 var $fetchCountLast = 0;
49 var $failedTextRetrievals = 0;
50 var $maxConsecutiveFailedTextRetrievals = 200;
51 var $failureTimeout = 5; // Seconds to sleep after db failure
55 var $spawnProc = false;
56 var $spawnWrite = false;
57 var $spawnRead = false;
58 var $spawnErr = false;
62 var $xmlwriterobj = false;
64 # when we spend more than maxTimeAllowed seconds on this run, we continue
65 # processing until we write out the next complete page, then save output file(s),
66 # rename it/them and open new one(s)
67 var $maxTimeAllowed = 0; // 0 = no limit
68 var $timeExceeded = false;
69 var $firstPageWritten = false;
70 var $lastPageWritten = false;
71 var $checkpointJustWritten = false;
72 var $checkpointFiles = array();
/**
 * Set up the progress-tracking state for this pass.
 *
 * Runs the parent initialisation, records this process's ID, and seeds both
 * the last-report time and the last-checkpoint time from the start time.
 *
 * @param $history Not used here; the parent initialiser is called without arguments.
 */
function initProgress( $history ) {
	parent::initProgress();

	$this->ID = getmypid();

	// Both interval clocks begin at the same instant as the run itself.
	$runStart = $this->startTime;
	$this->lastTime = $runStart;
	$this->timeOfCheckpoint = $runStart;
}
81 function dump( $history, $text = WikiExporter
::TEXT
) {
82 # This shouldn't happen if on console... ;)
83 header( 'Content-type: text/html; charset=UTF-8' );
85 # Notice messages will foul up your XML output even if they're
86 # relatively harmless.
87 if ( ini_get( 'display_errors' ) )
88 ini_set( 'display_errors', 'stderr' );
90 $this->initProgress( $this->history
);
92 $this->db
= $this->backupDb();
94 $this->egress
= new ExportProgressFilter( $this->sink
, $this );
96 # it would be nice to do it in the constructor, oh well. need egress set
97 $this->finalOptionCheck();
99 # we only want this so we know how to close a stream :-P
100 $this->xmlwriterobj
= new XmlDumpWriter();
102 $input = fopen( $this->input
, "rt" );
103 $result = $this->readDump( $input );
105 if ( WikiError
::isError( $result ) ) {
106 wfDie( $result->getMessage() );
109 if ( $this->spawnProc
) {
113 $this->report( true );
116 function processOption( $opt, $val, $param ) {
118 $url = $this->processFileOpt( $val, $param );
122 require_once "$IP/maintenance/backupPrefetch.inc";
123 $this->prefetch
= new BaseDump( $url );
129 $this->maxTimeAllowed
= intval($val)*60;
131 case 'checkpointfile':
132 $this->checkpointFiles
[] = $val;
135 $this->history
= WikiExporter
::CURRENT
;
138 $this->history
= WikiExporter
::FULL
;
149 function processFileOpt( $val, $param ) {
150 $fileURIs = explode(';',$param);
151 foreach ( $fileURIs as $URI ) {
157 $newURI = "compress.zlib://$URI";
160 $newURI = "compress.bzip2://$URI";
163 $newURI = "mediawiki.compress.7z://$URI";
168 $newFileURIs[] = $newURI;
170 $val = implode( ';', $newFileURIs );
/**
 * Overridden to include prefetch ratio if enabled.
 *
 * Without a prefetch source the parent report is used unchanged. Otherwise,
 * when reporting is on, a single progress line is emitted containing page and
 * revision rates and prefetch percentages, each for the whole run ("all") and
 * for the span since the previous report ("curr"), followed by an ETA.
 *
 * Any figure that cannot be computed (no elapsed time, nothing fetched yet,
 * no revisions counted) is passed to the format string as '-'.
 */
function showReport() {
	if ( !$this->prefetch ) {
		return parent::showReport();
	}

	if ( $this->reporting ) {
		// Display string for the report line only.
		$now = wfTimestamp( TS_DB );
		// Numeric clock reading; this is what all interval arithmetic uses.
		$nowts = wfTime();

		$deltaAll = $nowts - $this->startTime;
		$deltaPart = $nowts - $this->lastTime;
		$this->pageCountPart = $this->pageCount - $this->pageCountLast;
		$this->revCountPart = $this->revCount - $this->revCountLast;

		// Whole-run figures.
		$pageRate = '-';
		$revRate = '-';
		$fetchRate = '-';
		$etats = '-';
		if ( $deltaAll ) {
			$pageRate = $this->pageCount / $deltaAll;
			$revRate = $this->revCount / $deltaAll;
			if ( $this->fetchCount ) {
				$fetchRate = 100.0 * $this->prefetchCount / $this->fetchCount;
			}
			// An ETA needs a non-zero completed portion; skip it rather than divide by zero.
			if ( $this->revCount && $this->maxCount ) {
				$portion = $this->revCount / $this->maxCount;
				$eta = $this->startTime + $deltaAll / $portion;
				$etats = wfTimestamp( TS_DB, intval( $eta ) );
			}
		}

		// Figures for the span since the previous report.
		$pageRatePart = '-';
		$revRatePart = '-';
		$fetchRatePart = '-';
		if ( $deltaPart ) {
			if ( $this->fetchCountLast ) {
				$fetchRatePart = 100.0 * $this->prefetchCountLast / $this->fetchCountLast;
			}
			$pageRatePart = $this->pageCountPart / $deltaPart;
			$revRatePart = $this->revCountPart / $deltaPart;
		}

		$this->progress( sprintf(
			"%s: %s (ID %d) %d pages (%0.1f|%0.1f/sec all|curr), %d revs (%0.1f|%0.1f/sec all|curr), %0.1f%%|%0.1f%% prefetched (all|curr), ETA %s [max %d]",
			$now, wfWikiID(), $this->ID, $this->pageCount, $pageRate, $pageRatePart,
			$this->revCount, $revRate, $revRatePart, $fetchRate, $fetchRatePart,
			$etats, $this->maxCount
		) );

		// lastTime is subtracted from wfTime() above and compared against the
		// checkpoint time elsewhere in this class, so it has to hold the
		// numeric reading, not the formatted TS_DB string.
		$this->lastTime = $nowts;
		// Advance the page baseline together with the revision baseline so the
		// "curr" page rate covers the same span as the "curr" revision rate.
		$this->pageCountLast = $this->pageCount;
		$this->revCountLast = $this->revCount;
		$this->prefetchCountLast = $this->prefetchCount;
		$this->fetchCountLast = $this->fetchCount;
	}
}
/**
 * Raise the timeExceeded flag.
 *
 * The flag is read when a page element is closed, to decide whether the
 * current output should be finished off as a checkpoint file.
 */
function setTimeExceeded() {
	$this->timeExceeded = true;
}
235 function checkIfTimeExceeded() {
236 if ( $this->maxTimeAllowed
&& ( $this->lastTime
- $this->timeOfCheckpoint
> $this->maxTimeAllowed
) ) {
/**
 * Validate the checkpoint-related options.
 *
 * Terminates through wfDie() when:
 *  - only one of the checkpointfile / maxtime options was given,
 *  - a checkpoint file pattern does not contain exactly two '%s' placeholders,
 *  - checkpointing is in use and the number of patterns differs from the
 *    number of output files.
 *
 * $this->egress must already be set, because the output file name(s) are
 * read from it.
 */
function finalOptionCheck() {
	$haveCheckpointFiles = (bool)$this->checkpointFiles;
	$haveMaxTime = (bool)$this->maxTimeAllowed;

	// The two options only make sense as a pair.
	if ( $haveCheckpointFiles != $haveMaxTime ) {
		wfDie( "Options checkpointfile and maxtime must be specified together.\n" );
	}

	foreach ( $this->checkpointFiles as $checkpointFile ) {
		$count = substr_count( $checkpointFile, "%s" );
		if ( $count != 2 ) {
			wfDie( "Option checkpointfile must contain two '%s' for substitution of first and last pageids, count is $count instead, file is $checkpointFile.\n" );
		}
	}

	// One pattern per output file is only a requirement for checkpointed
	// runs. Comparing the counts unconditionally would reject every run
	// that has an output file but uses no checkpoint options at all.
	if ( $haveCheckpointFiles ) {
		$filenameList = $this->egress->getFilename();
		if ( !is_array( $filenameList ) ) {
			$filenameList = array( $filenameList );
		}
		if ( count( $filenameList ) != count( $this->checkpointFiles ) ) {
			wfDie( "One checkpointfile must be specified for each output option, if maxtime is used.\n" );
		}
	}
}
263 function readDump( $input ) {
265 $this->openElement
= false;
266 $this->atStart
= true;
268 $this->lastName
= "";
272 $parser = xml_parser_create( "UTF-8" );
273 xml_parser_set_option( $parser, XML_OPTION_CASE_FOLDING
, false );
275 xml_set_element_handler( $parser, array( &$this, 'startElement' ), array( &$this, 'endElement' ) );
276 xml_set_character_data_handler( $parser, array( &$this, 'characterData' ) );
278 $offset = 0; // for context extraction on error reporting
279 $bufferSize = 512 * 1024;
281 if ($this->checkIfTimeExceeded()) {
282 $this->setTimeExceeded();
284 $chunk = fread( $input, $bufferSize );
285 if ( !xml_parse( $parser, $chunk, feof( $input ) ) ) {
286 wfDebug( "TextDumpPass::readDump encountered XML parsing error\n" );
287 return new WikiXmlError( $parser, 'XML import parse failure', $chunk, $offset );
289 $offset +
= strlen( $chunk );
290 } while ( $chunk !== false && !feof( $input ) );
291 if ($this->maxTimeAllowed
) {
292 $filenameList = $this->egress
->getFilename();
293 # we wrote some stuff after the last checkpoint that needs to be renamed
294 if (! is_array($filenameList)) {
295 $filenameList = array( $filenameList );
297 if (file_exists($filenameList[0])) {
298 $newFilenames = array();
299 $firstPageID = str_pad($this->firstPageWritten
,9,"0",STR_PAD_LEFT
);
300 $lastPageID = str_pad($this->lastPageWritten
,9,"0",STR_PAD_LEFT
);
301 for ($i =0; $i < count($filenameList); $i++
) {
302 $checkpointNameFilledIn = sprintf($this->checkpointFiles
[$i], $firstPageID, $lastPageID);
303 $fileinfo = pathinfo($filenameList[$i]);
304 $newFilenames[] = $fileinfo{'dirname'} . '/' . $checkpointNameFilledIn;
306 $this->egress
->rename( $newFilenames );
309 xml_parser_free( $parser );
314 function getText( $id ) {
316 if ( isset( $this->prefetch
) ) {
317 $text = $this->prefetch
->prefetch( $this->thisPage
, $this->thisRev
);
318 if ( $text !== null ) { // Entry missing from prefetch dump
319 $dbr = wfGetDB( DB_SLAVE
);
320 $revID = intval( $this->thisRev
);
321 $revLength = $dbr->selectField( 'revision', 'rev_len', array( 'rev_id' => $revID ) );
322 // if length of rev text in file doesn't match length in db, we reload
323 // this avoids carrying forward broken data from previous xml dumps
324 if( strlen( $text ) == $revLength ) {
325 $this->prefetchCount++
;
330 return $this->doGetText( $id );
333 private function doGetText( $id ) {
337 $ex = new MWException( "Graceful storage failure" );
339 if ( $this->spawn
) {
340 if ($this->failures
) {
341 // we don't know why it failed, could be the child process
342 // borked, could be db entry busted, could be db server out to lunch,
343 // so cover all bases
347 $text = $this->getTextSpawned( $id );
349 $text = $this->getTextDbSafe( $id );
351 if ( $text === false ) {
353 if ( $this->failures
> $this->maxFailures
) {
354 $this->progress( "Failed to retrieve revision text for text id ".
355 "$id after $this->maxFailures tries, giving up" );
356 // were there so many bad retrievals in a row we want to bail?
357 // at some point we have to declare the dump irretrievably broken
358 $this->failedTextRetrievals++
;
359 if ($this->failedTextRetrievals
> $this->maxConsecutiveFailedTextRetrievals
) {
363 // would be nice to return something better to the caller someday,
364 // log what we know about the failure and about the revision
368 $this->progress( "Error $this->failures " .
369 "of allowed $this->maxFailures retrieving revision text for text id $id! " .
370 "Pausing $this->failureTimeout seconds before retry..." );
371 sleep( $this->failureTimeout
);
374 $this->failedTextRetrievals
= 0;
382 * Fetch a text revision from the database, retrying in case of failure.
383 * This may survive some transitory errors by reconnecting, but
384 * may not survive a long-term server outage.
386 private function getTextDbSafe( $id ) {
389 $text = $this->getTextDb( $id );
390 } catch ( DBQueryError
$ex ) {
398 * May throw a database error if, say, the server dies during query.
400 private function getTextDb( $id ) {
402 $row = $this->db
->selectRow( 'text',
403 array( 'old_text', 'old_flags' ),
404 array( 'old_id' => $id ),
406 $text = Revision
::getRevisionText( $row );
407 if ( $text === false ) {
410 $stripped = str_replace( "\r", "", $text );
411 $normalized = $wgContLang->normalize( $stripped );
415 private function getTextSpawned( $id ) {
416 wfSuppressWarnings();
417 if ( !$this->spawnProc
) {
421 $text = $this->getTextSpawnedOnce( $id );
426 function openSpawn() {
430 array_map( 'wfEscapeShellArg',
433 "$IP/maintenance/fetchText.php",
434 '--wiki', wfWikiID() ) ) );
436 0 => array( "pipe", "r" ),
437 1 => array( "pipe", "w" ),
438 2 => array( "file", "/dev/null", "a" ) );
441 $this->progress( "Spawning database subprocess: $cmd" );
442 $this->spawnProc
= proc_open( $cmd, $spec, $pipes );
443 if ( !$this->spawnProc
) {
445 $this->progress( "Subprocess spawn failed." );
449 $this->spawnWrite
, // -> stdin
450 $this->spawnRead
, // <- stdout
/**
 * Shut down the text-fetching subprocess and reset every spawn handle.
 *
 * Each pipe that is still open is closed, then the process handle is
 * released. All four handle properties are false afterwards, whether or not
 * they were open. Warnings raised while closing are suppressed.
 */
private function closeSpawn() {
	wfSuppressWarnings();

	// Close the pipes before the process handle, in the order read, write, error.
	foreach ( array( 'spawnRead', 'spawnWrite', 'spawnErr' ) as $pipe ) {
		if ( $this->$pipe ) {
			fclose( $this->$pipe );
		}
		$this->$pipe = false;
	}

	if ( $this->spawnProc ) {
		// The handle is created with proc_open(), so it has to be released
		// with proc_close(); pclose() only accepts handles from popen().
		proc_close( $this->spawnProc );
	}
	$this->spawnProc = false;

	// Undo the suppression requested at the top of this method.
	wfRestoreWarnings();
}
473 private function getTextSpawnedOnce( $id ) {
476 $ok = fwrite( $this->spawnWrite
, "$id\n" );
477 // $this->progress( ">> $id" );
478 if ( !$ok ) return false;
480 $ok = fflush( $this->spawnWrite
);
481 // $this->progress( ">> [flush]" );
482 if ( !$ok ) return false;
484 // check that the text id they are sending is the one we asked for
485 // this avoids out of sync revision text errors we have encountered in the past
486 $newId = fgets( $this->spawnRead
);
487 if ( $newId === false ) {
490 if ( $id != intval( $newId ) ) {
494 $len = fgets( $this->spawnRead
);
495 // $this->progress( "<< " . trim( $len ) );
496 if ( $len === false ) return false;
498 $nbytes = intval( $len );
499 // actual error, not zero-length text
500 if ($nbytes < 0 ) return false;
504 // Subprocess may not send everything at once, we have to loop.
505 while ( $nbytes > strlen( $text ) ) {
506 $buffer = fread( $this->spawnRead
, $nbytes - strlen( $text ) );
507 if ( $buffer === false ) break;
511 $gotbytes = strlen( $text );
512 if ( $gotbytes != $nbytes ) {
513 $this->progress( "Expected $nbytes bytes from database subprocess, got $gotbytes " );
517 // Do normalization in the dump thread...
518 $stripped = str_replace( "\r", "", $text );
519 $normalized = $wgContLang->normalize( $stripped );
523 function startElement( $parser, $name, $attribs ) {
524 $this->checkpointJustWritten
= false;
526 $this->clearOpenElement( null );
527 $this->lastName
= $name;
529 if ( $name == 'revision' ) {
530 $this->state
= $name;
531 $this->egress
->writeOpenPage( null, $this->buffer
);
533 } elseif ( $name == 'page' ) {
534 $this->state
= $name;
535 if ( $this->atStart
) {
536 $this->egress
->writeOpenStream( $this->buffer
);
538 $this->atStart
= false;
542 if ( $name == "text" && isset( $attribs['id'] ) ) {
543 $text = $this->getText( $attribs['id'] );
544 $this->openElement
= array( $name, array( 'xml:space' => 'preserve' ) );
545 if ( strlen( $text ) > 0 ) {
546 $this->characterData( $parser, $text );
549 $this->openElement
= array( $name, $attribs );
553 function endElement( $parser, $name ) {
554 $this->checkpointJustWritten
= false;
556 if ( $this->openElement
) {
557 $this->clearOpenElement( "" );
559 $this->buffer
.= "</$name>";
562 if ( $name == 'revision' ) {
563 $this->egress
->writeRevision( null, $this->buffer
);
566 } elseif ( $name == 'page' ) {
567 if (! $this->firstPageWritten
) {
568 $this->firstPageWritten
= trim($this->thisPage
);
570 $this->lastPageWritten
= trim($this->thisPage
);
571 if ($this->timeExceeded
) {
572 $this->egress
->writeClosePage( $this->buffer
);
573 # nasty hack, we can't just write the chardata after the
574 # page tag, it will include leading blanks from the next line
575 $this->egress
->sink
->write("\n");
577 $this->buffer
= $this->xmlwriterobj
->closeStream();
578 $this->egress
->writeCloseStream( $this->buffer
);
581 $this->thisPage
= "";
582 /* this could be more than one file if we had more than one output arg */
583 $checkpointFilenames = array();
584 $filenameList = $this->egress
->getFilename();
586 if (! is_array($filenameList)) {
587 $filenameList = array( $filenameList );
589 $newFilenames = array();
590 $firstPageID = str_pad($this->firstPageWritten
,9,"0",STR_PAD_LEFT
);
591 $lastPageID = str_pad($this->lastPageWritten
,9,"0",STR_PAD_LEFT
);
592 for ($i =0; $i < count($filenameList); $i++
) {
593 $checkpointNameFilledIn = sprintf($this->checkpointFiles
[$i], $firstPageID, $lastPageID);
594 $fileinfo = pathinfo($filenameList[$i]);
595 $newFilenames[] = $fileinfo{'dirname'} . '/' . $checkpointNameFilledIn;
597 $this->egress
->closeRenameAndReopen( $newFilenames );
598 $this->buffer
= $this->xmlwriterobj
->openStream();
599 $this->timeExceeded
= false;
600 $this->timeOfCheckpoint
= $this->lastTime
;
601 $this->firstPageWritten
= false;
602 $this->checkpointJustWritten
= true;
605 $this->egress
->writeClosePage( $this->buffer
);
607 $this->thisPage
= "";
610 } elseif ( $name == 'mediawiki' ) {
611 $this->egress
->writeCloseStream( $this->buffer
);
/**
 * Character-data callback for the XML parser.
 *
 * Emits any pending open tag, accumulates the text of <id> elements into the
 * current revision or page id depending on the parser state, and appends the
 * HTML-escaped data to the output buffer.
 *
 * @param $parser The XML parser (not used)
 * @param $data String of character data to process
 */
function characterData( $parser, $data ) {
	$this->clearOpenElement( null );

	if ( $this->lastName == "id" ) {
		if ( $this->state == "revision" ) {
			$this->thisRev .= $data;
		} elseif ( $this->state == "page" ) {
			$this->thisPage .= $data;
		}
	}

	# have to skip the newline left over from closepagetag line of
	# end of checkpoint files. nasty hack!!
	if ( $this->checkpointJustWritten ) {
		// Check for an empty string first: reading offset 0 of '' emits an
		// "Uninitialized string offset" notice, and stray notices can end up
		// in the XML output.
		if ( $data !== '' && $data[0] == "\n" ) {
			$data = substr( $data, 1 );
		}
		$this->checkpointJustWritten = false;
	}

	$this->buffer .= htmlspecialchars( $data );
}
/**
 * Write out the pending element tag, if there is one, and forget it.
 *
 * The pending element is stored as array( name, attributes ).
 *
 * @param $style Passed through as the third argument of Xml::element()
 */
function clearOpenElement( $style ) {
	// Nothing pending, nothing to emit.
	if ( !$this->openElement ) {
		return;
	}

	list( $tagName, $tagAttribs ) = $this->openElement;
	$this->buffer .= Xml::element( $tagName, $tagAttribs, $style );
	$this->openElement = false;
}
645 $dumper = new TextPassDumper( $argv );
647 if ( !isset( $options['help'] ) ) {
648 $dumper->dump( true );
650 $dumper->progress( <<<ENDS
651 This script postprocesses XML dumps from dumpBackup.php to add
652 page text which was stubbed out (using --stub).
654 XML input is accepted on stdin.
655 XML output is sent to stdout; progress reports are sent to stderr.
657 Usage: php dumpTextPass.php [<options>]
659 --stub=<type>:<file> To load a compressed stub dump instead of stdin
660 --prefetch=<type>:<file> Use a prior dump file as a text source, to save
661 pressure on the database.
662 (Requires the XMLReader extension)
663 --maxtime=<minutes> Write out checkpoint file after this many minutes (writing
664 out complete page, closing xml file properly, and opening new one
665 with header). This option requires the checkpointfile option.
666 --checkpointfile=<filenamepattern> Use this string for checkpoint filenames,
667 substituting first pageid written for the first %s (required) and the
668 last pageid written for the second %s if it exists.
669 --quiet Don't dump status reports to stderr.
670 --report=n Report position and speed after every n pages processed.
672 --server=h Force reading from MySQL server h
673 --current Base ETA on number of pages in database instead of all revisions
674 --spawn Spawn a subprocess for loading text records
675 --help Display this help message