const LOGS = 8;
const RANGE = 16;
- const BUFFER = 0;
- const STREAM = 1;
-
const TEXT = 0;
const STUB = 1;
- /** @var int */
- public $buffer;
+ const BATCH_SIZE = 1000;
/** @var int */
public $text;
}
/**
- * If using WikiExporter::STREAM to stream a large amount of data,
- * provide a database connection which is not managed by
- * LoadBalancer to read from: some history blob types will
- * make additional queries to pull source data while the
- * main query is still running.
- *
* @param IDatabase $db
* @param int|array $history One of WikiExporter::FULL, WikiExporter::CURRENT,
* WikiExporter::RANGE or WikiExporter::STABLE, or an associative array:
* - offset: non-inclusive offset at which to start the query
* - limit: maximum number of rows to return
* - dir: "asc" or "desc" timestamp order
- * @param int $buffer One of WikiExporter::BUFFER or WikiExporter::STREAM
* @param int $text One of WikiExporter::TEXT or WikiExporter::STUB
*/
- function __construct( $db, $history = self::CURRENT,
- $buffer = self::BUFFER, $text = self::TEXT ) {
+ function __construct( $db, $history = self::CURRENT, $text = self::TEXT ) {
$this->db = $db;
$this->history = $history;
- $this->buffer = $buffer;
$this->writer = new XmlDumpWriter();
$this->sink = new DumpOutput();
$this->text = $text;
* @throws Exception
*/
protected function dumpFrom( $cond = '', $orderRevs = false ) {
- global $wgMultiContentRevisionSchemaMigrationStage;
-
- # For logging dumps...
if ( $this->history & self::LOGS ) {
- $where = [];
- # Hide private logs
- $hideLogs = LogEventsList::getExcludeClause( $this->db );
- if ( $hideLogs ) {
- $where[] = $hideLogs;
- }
- # Add on any caller specified conditions
- if ( $cond ) {
- $where[] = $cond;
- }
- # Get logging table name for logging.* clause
- $logging = $this->db->tableName( 'logging' );
+ $this->dumpLogs( $cond );
+ } else {
+ $this->dumpPages( $cond, $orderRevs );
+ }
+ }
- if ( $this->buffer == self::STREAM ) {
- $prev = $this->db->bufferResults( false );
+ /**
+ * @param string $cond
+ * @throws Exception
+ */
+ protected function dumpLogs( $cond ) {
+ $where = [];
+ # Hide private logs
+ $hideLogs = LogEventsList::getExcludeClause( $this->db );
+ if ( $hideLogs ) {
+ $where[] = $hideLogs;
+ }
+ # Add on any caller specified conditions
+ if ( $cond ) {
+ $where[] = $cond;
+ }
+ # Get logging table name for logging.* clause
+ $logging = $this->db->tableName( 'logging' );
+
+ $result = null; // Ensure $result is defined even if an exception occurs early
+
+ $commentQuery = CommentStore::getStore()->getJoin( 'log_comment' );
+ $actorQuery = ActorMigration::newMigration()->getJoin( 'log_user' );
+
+ $lastLogId = 0;
+ while ( true ) {
+ $result = $this->db->select(
+ array_merge( [ 'logging' ], $commentQuery['tables'], $actorQuery['tables'], [ 'user' ] ),
+ [ "{$logging}.*", 'user_name' ] + $commentQuery['fields'] + $actorQuery['fields'],
+ array_merge( $where, [ 'log_id > ' . intval( $lastLogId ) ] ),
+ __METHOD__,
+ [
+ 'ORDER BY' => 'log_id',
+ 'USE INDEX' => [ 'logging' => 'PRIMARY' ],
+ 'LIMIT' => self::BATCH_SIZE,
+ ],
+ [
+ 'user' => [ 'JOIN', 'user_id = ' . $actorQuery['fields']['log_user'] ]
+ ] + $commentQuery['joins'] + $actorQuery['joins']
+ );
+
+ if ( !$result->numRows() ) {
+ break;
}
- $result = null; // Assuring $result is not undefined, if exception occurs early
-
- $commentQuery = CommentStore::getStore()->getJoin( 'log_comment' );
- $actorQuery = ActorMigration::newMigration()->getJoin( 'log_user' );
-
- try {
- $result = $this->db->select(
- array_merge( [ 'logging' ], $commentQuery['tables'], $actorQuery['tables'], [ 'user' ] ),
- [ "{$logging}.*", 'user_name' ] + $commentQuery['fields'] + $actorQuery['fields'],
- $where,
- __METHOD__,
- [ 'ORDER BY' => 'log_id', 'USE INDEX' => [ 'logging' => 'PRIMARY' ] ],
- [
- 'user' => [ 'JOIN', 'user_id = ' . $actorQuery['fields']['log_user'] ]
- ] + $commentQuery['joins'] + $actorQuery['joins']
- );
- $this->outputLogStream( $result );
- if ( $this->buffer == self::STREAM ) {
- $this->db->bufferResults( $prev );
- }
- } catch ( Exception $e ) {
- // Throwing the exception does not reliably free the resultset, and
- // would also leave the connection in unbuffered mode.
-
- // Freeing result
- try {
- if ( $result ) {
- $result->free();
- }
- } catch ( Exception $e2 ) {
- // Already in panic mode -> ignoring $e2 as $e has
- // higher priority
- }
- // Putting database back in previous buffer mode
- try {
- if ( $this->buffer == self::STREAM ) {
- $this->db->bufferResults( $prev );
- }
- } catch ( Exception $e2 ) {
- // Already in panic mode -> ignoring $e2 as $e has
- // higher priority
- }
+ $lastLogId = $this->outputLogStream( $result );
+ };
+ }
- // Inform caller about problem
- throw $e;
- }
- # For page dumps...
- } else {
- if ( !( $wgMultiContentRevisionSchemaMigrationStage & SCHEMA_COMPAT_WRITE_OLD ) ) {
- // TODO: Make XmlDumpWriter use a RevisionStore! (see T198706 and T174031)
- throw new MWException(
- 'Cannot use WikiExporter with SCHEMA_COMPAT_WRITE_OLD mode disabled!'
- . ' Support for dumping from the new schema is not implemented yet!'
- );
- }
+ /**
+ * @param string $cond
+ * @param bool $orderRevs
+ * @throws MWException
+ * @throws Exception
+ */
+ protected function dumpPages( $cond, $orderRevs ) {
+ global $wgMultiContentRevisionSchemaMigrationStage;
+ if ( !( $wgMultiContentRevisionSchemaMigrationStage & SCHEMA_COMPAT_WRITE_OLD ) ) {
+ // TODO: Make XmlDumpWriter use a RevisionStore! (see T198706 and T174031)
+ throw new MWException(
+ 'Cannot use WikiExporter with SCHEMA_COMPAT_WRITE_OLD mode disabled!'
+ . ' Support for dumping from the new schema is not implemented yet!'
+ );
+ }
- $revOpts = [ 'page' ];
- if ( $this->text != self::STUB ) {
- // TODO: remove the text and make XmlDumpWriter use a RevisionStore instead! (T198706)
- $revOpts[] = 'text';
- }
- $revQuery = Revision::getQueryInfo( $revOpts );
+ $revOpts = [ 'page' ];
+ if ( $this->text != self::STUB ) {
+ // TODO: remove the text and make XmlDumpWriter use a RevisionStore instead! (T198706)
+ $revOpts[] = 'text';
+ }
+ $revQuery = Revision::getQueryInfo( $revOpts );
- // We want page primary rather than revision
- $tables = array_merge( [ 'page' ], array_diff( $revQuery['tables'], [ 'page' ] ) );
- $join = $revQuery['joins'] + [
+ // We want page primary rather than revision
+ $tables = array_merge( [ 'page' ], array_diff( $revQuery['tables'], [ 'page' ] ) );
+ $join = $revQuery['joins'] + [
'revision' => $revQuery['joins']['page']
];
- unset( $join['page'] );
+ unset( $join['page'] );
- // TODO: remove rev_text_id and make XmlDumpWriter use a RevisionStore instead! (T198706)
- $fields = array_merge( $revQuery['fields'], [ 'page_restrictions, rev_text_id' ] );
+ // TODO: remove rev_text_id and make XmlDumpWriter use a RevisionStore instead! (T198706)
+ $fields = array_merge( $revQuery['fields'], [ 'page_restrictions, rev_text_id' ] );
- $conds = [];
- if ( $cond !== '' ) {
- $conds[] = $cond;
- }
- $opts = [ 'ORDER BY' => 'page_id ASC' ];
- $opts['USE INDEX'] = [];
- if ( is_array( $this->history ) ) {
- # Time offset/limit for all pages/history...
- # Set time order
- if ( $this->history['dir'] == 'asc' ) {
- $op = '>';
- $opts['ORDER BY'] = 'rev_timestamp ASC';
- } else {
- $op = '<';
- $opts['ORDER BY'] = 'rev_timestamp DESC';
- }
- # Set offset
- if ( !empty( $this->history['offset'] ) ) {
- $conds[] = "rev_timestamp $op " .
- $this->db->addQuotes( $this->db->timestamp( $this->history['offset'] ) );
- }
- # Set query limit
- if ( !empty( $this->history['limit'] ) ) {
- $opts['LIMIT'] = intval( $this->history['limit'] );
- }
- } elseif ( $this->history & self::FULL ) {
- # Full history dumps...
- # query optimization for history stub dumps
- if ( $this->text == self::STUB && $orderRevs ) {
- $tables = $revQuery['tables'];
- $opts['ORDER BY'] = [ 'rev_page ASC', 'rev_id ASC' ];
- $opts['USE INDEX']['revision'] = 'rev_page_id';
- unset( $join['revision'] );
- $join['page'] = [ 'INNER JOIN', 'rev_page=page_id' ];
- }
- } elseif ( $this->history & self::CURRENT ) {
- # Latest revision dumps...
- if ( $this->list_authors && $cond != '' ) { // List authors, if so desired
- $this->do_list_authors( $cond );
- }
- $join['revision'] = [ 'INNER JOIN', 'page_id=rev_page AND page_latest=rev_id' ];
- } elseif ( $this->history & self::STABLE ) {
- # "Stable" revision dumps...
- # Default JOIN, to be overridden...
- $join['revision'] = [ 'INNER JOIN', 'page_id=rev_page AND page_latest=rev_id' ];
- # One, and only one hook should set this, and return false
- if ( Hooks::run( 'WikiExporter::dumpStableQuery', [ &$tables, &$opts, &$join ] ) ) {
- throw new MWException( __METHOD__ . " given invalid history dump type." );
- }
- } elseif ( $this->history & self::RANGE ) {
- # Dump of revisions within a specified range
- $opts['ORDER BY'] = [ 'rev_page ASC', 'rev_id ASC' ];
+ $conds = [];
+ if ( $cond !== '' ) {
+ $conds[] = $cond;
+ }
+ $opts = [ 'ORDER BY' => [ 'rev_page ASC', 'rev_id ASC' ] ];
+ $opts['USE INDEX'] = [];
+
+ $op = '>';
+ if ( is_array( $this->history ) ) {
+ # Time offset/limit for all pages/history...
+ # Set time order
+ if ( $this->history['dir'] == 'asc' ) {
+ $opts['ORDER BY'] = 'rev_timestamp ASC';
} else {
- # Unknown history specification parameter?
+ $op = '<';
+ $opts['ORDER BY'] = 'rev_timestamp DESC';
+ }
+ # Set offset
+ if ( !empty( $this->history['offset'] ) ) {
+ $conds[] = "rev_timestamp $op " .
+ $this->db->addQuotes( $this->db->timestamp( $this->history['offset'] ) );
+ }
+ # Set query limit
+ if ( !empty( $this->history['limit'] ) ) {
+ $maxRowCount = intval( $this->history['limit'] );
+ }
+ } elseif ( $this->history & self::FULL ) {
+ # Full history dumps...
+ # query optimization for history stub dumps
+ if ( $this->text == self::STUB && $orderRevs ) {
+ $tables = $revQuery['tables'];
+ $opts['USE INDEX']['revision'] = 'rev_page_id';
+ unset( $join['revision'] );
+ $join['page'] = [ 'INNER JOIN', 'rev_page=page_id' ];
+ }
+ } elseif ( $this->history & self::CURRENT ) {
+ # Latest revision dumps...
+ if ( $this->list_authors && $cond != '' ) { // List authors, if so desired
+ $this->do_list_authors( $cond );
+ }
+ $join['revision'] = [ 'INNER JOIN', 'page_id=rev_page AND page_latest=rev_id' ];
+ } elseif ( $this->history & self::STABLE ) {
+ # "Stable" revision dumps...
+ # Default JOIN, to be overridden...
+ $join['revision'] = [ 'INNER JOIN', 'page_id=rev_page AND page_latest=rev_id' ];
+ # One, and only one hook should set this, and return false
+ if ( Hooks::run( 'WikiExporter::dumpStableQuery', [ &$tables, &$opts, &$join ] ) ) {
throw new MWException( __METHOD__ . " given invalid history dump type." );
}
+ } elseif ( $this->history & self::RANGE ) {
+ # Dump of revisions within a specified range. Condition already set in revsByRange().
+ } else {
+ # Unknown history specification parameter?
+ throw new MWException( __METHOD__ . " given invalid history dump type." );
+ }
- if ( $this->buffer == self::STREAM ) {
- $prev = $this->db->bufferResults( false );
- }
- $result = null; // Assuring $result is not undefined, if exception occurs early
- try {
- Hooks::run( 'ModifyExportQuery',
- [ $this->db, &$tables, &$cond, &$opts, &$join ] );
-
- # Do the query!
- $result = $this->db->select(
- $tables,
- $fields,
- $conds,
- __METHOD__,
- $opts,
- $join
- );
- # Output dump results
- $this->outputPageStream( $result );
-
- if ( $this->buffer == self::STREAM ) {
- $this->db->bufferResults( $prev );
- }
- } catch ( Exception $e ) {
- // Throwing the exception does not reliably free the resultset, and
- // would also leave the connection in unbuffered mode.
-
- // Freeing result
- try {
- if ( $result ) {
- $result->free();
- }
- } catch ( Exception $e2 ) {
- // Already in panic mode -> ignoring $e2 as $e has
- // higher priority
- }
+ $result = null; // Ensure $result is defined even if an exception occurs early
+ $done = false;
+ $lastRow = null;
+ $revPage = 0;
+ $revId = 0;
+ $rowCount = 0;
- // Putting database back in previous buffer mode
- try {
- if ( $this->buffer == self::STREAM ) {
- $this->db->bufferResults( $prev );
- }
- } catch ( Exception $e2 ) {
- // Already in panic mode -> ignoring $e2 as $e has
- // higher priority
- }
+ $opts['LIMIT'] = self::BATCH_SIZE;
- // Inform caller about problem
- throw $e;
+ Hooks::run( 'ModifyExportQuery',
+ [ $this->db, &$tables, &$cond, &$opts, &$join ] );
+
+ while ( !$done ) {
+ // If necessary, impose the overall maximum and stop looping after this iteration.
+ if ( !empty( $maxRowCount ) && $rowCount + self::BATCH_SIZE > $maxRowCount ) {
+ $opts['LIMIT'] = $maxRowCount - $rowCount;
+ $done = true;
+ }
+
+ $queryConds = $conds;
+ $queryConds[] = 'rev_page>' . intval( $revPage ) . ' OR (rev_page=' .
+ intval( $revPage ) . ' AND rev_id' . $op . intval( $revId ) . ')';
+
+ # Do the query!
+ $result = $this->db->select(
+ $tables,
+ $fields,
+ $queryConds,
+ __METHOD__,
+ $opts,
+ $join
+ );
+ # Output dump results, get new max ids.
+ $lastRow = $this->outputPageStream( $result, $lastRow );
+
+ if ( !$result->numRows() || !$lastRow ) {
+ $done = true;
+ } else {
+ $rowCount += $result->numRows();
+ $revPage = $lastRow->rev_page;
+ $revId = $lastRow->rev_id;
}
}
}
* The result set should be sorted/grouped by page to avoid duplicate
* page records in the output.
*
- * Should be safe for
- * streaming (non-buffered) queries, as long as it was made on a
- * separate database connection not managed by LoadBalancer; some
- * blob storage types will make queries to pull source data.
- *
* @param ResultWrapper $resultset
+ * @param object|null $lastRow the last row output from the previous call (or null if none)
+ * @return object|null the last row processed, or null if the final page element was closed
*/
- protected function outputPageStream( $resultset ) {
- $last = null;
- foreach ( $resultset as $row ) {
- if ( $last === null ||
- $last->page_namespace != $row->page_namespace ||
- $last->page_title != $row->page_title ) {
- if ( $last !== null ) {
- $output = '';
- if ( $this->dumpUploads ) {
- $output .= $this->writer->writeUploads( $last, $this->dumpUploadFileContents );
+ protected function outputPageStream( $resultset, $lastRow ) {
+ if ( $resultset->numRows() ) {
+ foreach ( $resultset as $row ) {
+ if ( $lastRow === null ||
+ $lastRow->page_namespace != $row->page_namespace ||
+ $lastRow->page_title != $row->page_title ) {
+ if ( $lastRow !== null ) {
+ $output = '';
+ if ( $this->dumpUploads ) {
+ $output .= $this->writer->writeUploads( $lastRow, $this->dumpUploadFileContents );
+ }
+ $output .= $this->writer->closePage();
+ $this->sink->writeClosePage( $output );
}
- $output .= $this->writer->closePage();
- $this->sink->writeClosePage( $output );
+ $output = $this->writer->openPage( $row );
+ $this->sink->writeOpenPage( $row, $output );
}
- $output = $this->writer->openPage( $row );
- $this->sink->writeOpenPage( $row, $output );
- $last = $row;
+ $output = $this->writer->writeRevision( $row );
+ $this->sink->writeRevision( $row, $output );
+ $lastRow = $row;
}
- $output = $this->writer->writeRevision( $row );
- $this->sink->writeRevision( $row, $output );
- }
- if ( $last !== null ) {
+ } elseif ( $lastRow !== null ) {
+ // Empty resultset means done with all batches. Close off final page element (if any).
$output = '';
if ( $this->dumpUploads ) {
- $output .= $this->writer->writeUploads( $last, $this->dumpUploadFileContents );
+ $output .= $this->writer->writeUploads( $lastRow, $this->dumpUploadFileContents );
}
$output .= $this->author_list;
$output .= $this->writer->closePage();
$this->sink->writeClosePage( $output );
+ $lastRow = null;
}
+
+ return $lastRow;
}
/**
* @param ResultWrapper $resultset
+ * @return int|null the log_id value of the last item output, or null if none
*/
protected function outputLogStream( $resultset ) {
foreach ( $resultset as $row ) {
$output = $this->writer->writeLogItem( $row );
$this->sink->writeLogItem( $row, $output );
}
+ return isset( $row ) ? $row->log_id : null;
}
}