From 085b6e4787c3b204c4121ce18c58803c4f5efcff Mon Sep 17 00:00:00 2001
From: Bill Pirkle
Date: Tue, 11 Sep 2018 17:51:01 -0500
Subject: [PATCH] Replace WikiExporter streaming (unbuffered) mode with
 batched queries

WikiExporter has a streaming mode that relies on an unbuffered database
connection. We are moving away from that technique. Instead, issue
multiple ordinary buffered queries and retrieve the information in
batches.

Bug: T203424
Change-Id: I582240b67c91a8be993a68c23831c9d86617350e
---
 includes/export/WikiExporter.php      | 421 ++++++++++++--------------
 includes/specials/SpecialExport.php   |  20 +-
 maintenance/includes/BackupDumper.php |   2 +-
 3 files changed, 202 insertions(+), 241 deletions(-)

diff --git a/includes/export/WikiExporter.php b/includes/export/WikiExporter.php
index b0185843e2..1f2b81d3a8 100644
--- a/includes/export/WikiExporter.php
+++ b/includes/export/WikiExporter.php
@@ -52,14 +52,10 @@ class WikiExporter {
 	const LOGS = 8;
 	const RANGE = 16;
 
-	const BUFFER = 0;
-	const STREAM = 1;
-
 	const TEXT = 0;
 	const STUB = 1;
 
-	/** @var int */
-	public $buffer;
+	const BATCH_SIZE = 1000;
 
 	/** @var int */
 	public $text;
@@ -76,26 +72,17 @@ class WikiExporter {
 	}
 
 	/**
-	 * If using WikiExporter::STREAM to stream a large amount of data,
-	 * provide a database connection which is not managed by
-	 * LoadBalancer to read from: some history blob types will
-	 * make additional queries to pull source data while the
-	 * main query is still running.
-	 *
 	 * @param IDatabase $db
 	 * @param int|array $history One of WikiExporter::FULL, WikiExporter::CURRENT,
 	 *   WikiExporter::RANGE or WikiExporter::STABLE, or an associative array:
 	 *   - offset: non-inclusive offset at which to start the query
 	 *   - limit: maximum number of rows to return
 	 *   - dir: "asc" or "desc" timestamp order
-	 * @param int $buffer One of WikiExporter::BUFFER or WikiExporter::STREAM
 	 * @param int $text One of WikiExporter::TEXT or WikiExporter::STUB
 	 */
-	function __construct( $db, $history = self::CURRENT,
-		$buffer = self::BUFFER, $text = self::TEXT ) {
+	function __construct( $db, $history = self::CURRENT, $text = self::TEXT ) {
		$this->db = $db;
 		$this->history = $history;
-		$this->buffer = $buffer;
 		$this->writer = new XmlDumpWriter();
 		$this->sink = new DumpOutput();
 		$this->text = $text;
@@ -263,206 +250,191 @@ class WikiExporter {
 	 * @throws Exception
 	 */
 	protected function dumpFrom( $cond = '', $orderRevs = false ) {
-		global $wgMultiContentRevisionSchemaMigrationStage;
-
-		# For logging dumps...
 		if ( $this->history & self::LOGS ) {
-			$where = [];
-			# Hide private logs
-			$hideLogs = LogEventsList::getExcludeClause( $this->db );
-			if ( $hideLogs ) {
-				$where[] = $hideLogs;
-			}
-			# Add on any caller specified conditions
-			if ( $cond ) {
-				$where[] = $cond;
-			}
-			# Get logging table name for logging.* clause
-			$logging = $this->db->tableName( 'logging' );
+			$this->dumpLogs( $cond );
+		} else {
+			$this->dumpPages( $cond, $orderRevs );
+		}
+	}
 
-			if ( $this->buffer == self::STREAM ) {
-				$prev = $this->db->bufferResults( false );
+	/**
+	 * @param string $cond
+	 * @throws Exception
+	 */
+	protected function dumpLogs( $cond ) {
+		$where = [];
+		# Hide private logs
+		$hideLogs = LogEventsList::getExcludeClause( $this->db );
+		if ( $hideLogs ) {
+			$where[] = $hideLogs;
+		}
+		# Add on any caller-specified conditions
+		if ( $cond ) {
+			$where[] = $cond;
+		}
+		# Get logging table name for logging.* clause
+		$logging = $this->db->tableName( 'logging' );
+
+		$result = null; // Make sure $result is defined, in case an exception occurs early
+
+		$commentQuery = CommentStore::getStore()->getJoin( 'log_comment' );
+		$actorQuery = ActorMigration::newMigration()->getJoin( 'log_user' );
+
+		$lastLogId = 0;
+		while ( true ) {
+			$result = $this->db->select(
+				array_merge( [ 'logging' ], $commentQuery['tables'], $actorQuery['tables'], [ 'user' ] ),
+				[ "{$logging}.*", 'user_name' ] + $commentQuery['fields'] + $actorQuery['fields'],
+				array_merge( $where, [ 'log_id > ' . intval( $lastLogId ) ] ),
+				__METHOD__,
+				[
+					'ORDER BY' => 'log_id',
+					'USE INDEX' => [ 'logging' => 'PRIMARY' ],
+					'LIMIT' => self::BATCH_SIZE,
+				],
+				[
+					'user' => [ 'JOIN', 'user_id = ' . $actorQuery['fields']['log_user'] ]
+				] + $commentQuery['joins'] + $actorQuery['joins']
+			);
+
+			if ( !$result->numRows() ) {
+				break;
 			}
-			$result = null; // Assuring $result is not undefined, if exception occurs early
-
-			$commentQuery = CommentStore::getStore()->getJoin( 'log_comment' );
-			$actorQuery = ActorMigration::newMigration()->getJoin( 'log_user' );
-
-			try {
-				$result = $this->db->select(
-					array_merge( [ 'logging' ], $commentQuery['tables'], $actorQuery['tables'], [ 'user' ] ),
-					[ "{$logging}.*", 'user_name' ] + $commentQuery['fields'] + $actorQuery['fields'],
-					$where,
-					__METHOD__,
-					[ 'ORDER BY' => 'log_id', 'USE INDEX' => [ 'logging' => 'PRIMARY' ] ],
-					[
-						'user' => [ 'JOIN', 'user_id = ' . $actorQuery['fields']['log_user'] ]
-					] + $commentQuery['joins'] + $actorQuery['joins']
-				);
-				$this->outputLogStream( $result );
-				if ( $this->buffer == self::STREAM ) {
-					$this->db->bufferResults( $prev );
-				}
-			} catch ( Exception $e ) {
-				// Throwing the exception does not reliably free the resultset, and
-				// would also leave the connection in unbuffered mode.
-
-				// Freeing result
-				try {
-					if ( $result ) {
-						$result->free();
-					}
-				} catch ( Exception $e2 ) {
-					// Already in panic mode -> ignoring $e2 as $e has
-					// higher priority
-				}
-				// Putting database back in previous buffer mode
-				try {
-					if ( $this->buffer == self::STREAM ) {
-						$this->db->bufferResults( $prev );
-					}
-				} catch ( Exception $e2 ) {
-					// Already in panic mode -> ignoring $e2 as $e has
-					// higher priority
-				}
+			$lastLogId = $this->outputLogStream( $result );
+		}
+	}
 
-				// Inform caller about problem
-				throw $e;
-			}
-			# For page dumps...
-		} else {
-			if ( !( $wgMultiContentRevisionSchemaMigrationStage & SCHEMA_COMPAT_WRITE_OLD ) ) {
-				// TODO: Make XmlDumpWriter use a RevisionStore! (see T198706 and T174031)
-				throw new MWException(
-					'Cannot use WikiExporter with SCHEMA_COMPAT_WRITE_OLD mode disabled!'
-					. ' Support for dumping from the new schema is not implemented yet!'
-				);
-			}
+	/**
+	 * @param string $cond
+	 * @param bool $orderRevs
+	 * @throws MWException
+	 * @throws Exception
+	 */
+	protected function dumpPages( $cond, $orderRevs ) {
+		global $wgMultiContentRevisionSchemaMigrationStage;
+		if ( !( $wgMultiContentRevisionSchemaMigrationStage & SCHEMA_COMPAT_WRITE_OLD ) ) {
+			// TODO: Make XmlDumpWriter use a RevisionStore! (see T198706 and T174031)
+			throw new MWException(
+				'Cannot use WikiExporter with SCHEMA_COMPAT_WRITE_OLD mode disabled!'
+				. ' Support for dumping from the new schema is not implemented yet!'
+			);
+		}
 
-			$revOpts = [ 'page' ];
-			if ( $this->text != self::STUB ) {
-				// TODO: remove the text and make XmlDumpWriter use a RevisionStore instead! (T198706)
-				$revOpts[] = 'text';
-			}
-			$revQuery = Revision::getQueryInfo( $revOpts );
+		$revOpts = [ 'page' ];
+		if ( $this->text != self::STUB ) {
+			// TODO: remove the text and make XmlDumpWriter use a RevisionStore instead! (T198706)
+			$revOpts[] = 'text';
+		}
+		$revQuery = Revision::getQueryInfo( $revOpts );
 
-			// We want page primary rather than revision
-			$tables = array_merge( [ 'page' ], array_diff( $revQuery['tables'], [ 'page' ] ) );
-			$join = $revQuery['joins'] + [
+		// We want page primary rather than revision
+		$tables = array_merge( [ 'page' ], array_diff( $revQuery['tables'], [ 'page' ] ) );
+		$join = $revQuery['joins'] + [
 			'revision' => $revQuery['joins']['page']
 		];
-			unset( $join['page'] );
+		unset( $join['page'] );
 
-			// TODO: remove rev_text_id and make XmlDumpWriter use a RevisionStore instead! (T198706)
-			$fields = array_merge( $revQuery['fields'], [ 'page_restrictions, rev_text_id' ] );
+		// TODO: remove rev_text_id and make XmlDumpWriter use a RevisionStore instead! (T198706)
+		$fields = array_merge( $revQuery['fields'], [ 'page_restrictions, rev_text_id' ] );
 
-			$conds = [];
-			if ( $cond !== '' ) {
-				$conds[] = $cond;
-			}
-			$opts = [ 'ORDER BY' => 'page_id ASC' ];
-			$opts['USE INDEX'] = [];
-			if ( is_array( $this->history ) ) {
-				# Time offset/limit for all pages/history...
-				# Set time order
-				if ( $this->history['dir'] == 'asc' ) {
-					$op = '>';
-					$opts['ORDER BY'] = 'rev_timestamp ASC';
-				} else {
-					$op = '<';
-					$opts['ORDER BY'] = 'rev_timestamp DESC';
-				}
-				# Set offset
-				if ( !empty( $this->history['offset'] ) ) {
-					$conds[] = "rev_timestamp $op " .
-						$this->db->addQuotes( $this->db->timestamp( $this->history['offset'] ) );
-				}
-				# Set query limit
-				if ( !empty( $this->history['limit'] ) ) {
-					$opts['LIMIT'] = intval( $this->history['limit'] );
-				}
-			} elseif ( $this->history & self::FULL ) {
-				# Full history dumps...
-				# query optimization for history stub dumps
-				if ( $this->text == self::STUB && $orderRevs ) {
-					$tables = $revQuery['tables'];
-					$opts['ORDER BY'] = [ 'rev_page ASC', 'rev_id ASC' ];
-					$opts['USE INDEX']['revision'] = 'rev_page_id';
-					unset( $join['revision'] );
-					$join['page'] = [ 'INNER JOIN', 'rev_page=page_id' ];
-				}
-			} elseif ( $this->history & self::CURRENT ) {
-				# Latest revision dumps...
-				if ( $this->list_authors && $cond != '' ) { // List authors, if so desired
-					$this->do_list_authors( $cond );
-				}
-				$join['revision'] = [ 'INNER JOIN', 'page_id=rev_page AND page_latest=rev_id' ];
-			} elseif ( $this->history & self::STABLE ) {
-				# "Stable" revision dumps...
-				# Default JOIN, to be overridden...
-				$join['revision'] = [ 'INNER JOIN', 'page_id=rev_page AND page_latest=rev_id' ];
-				# One, and only one hook should set this, and return false
-				if ( Hooks::run( 'WikiExporter::dumpStableQuery', [ &$tables, &$opts, &$join ] ) ) {
-					throw new MWException( __METHOD__ . " given invalid history dump type." );
-				}
-			} elseif ( $this->history & self::RANGE ) {
-				# Dump of revisions within a specified range
-				$opts['ORDER BY'] = [ 'rev_page ASC', 'rev_id ASC' ];
+		$conds = [];
+		if ( $cond !== '' ) {
+			$conds[] = $cond;
+		}
+		$opts = [ 'ORDER BY' => [ 'rev_page ASC', 'rev_id ASC' ] ];
+		$opts['USE INDEX'] = [];
+
+		$op = '>';
+		if ( is_array( $this->history ) ) {
+			# Time offset/limit for all pages/history...
+			# Set time order
+			if ( $this->history['dir'] == 'asc' ) {
+				$opts['ORDER BY'] = 'rev_timestamp ASC';
 			} else {
-				# Unknown history specification parameter?
+				$op = '<';
+				$opts['ORDER BY'] = 'rev_timestamp DESC';
+			}
+			# Set offset
+			if ( !empty( $this->history['offset'] ) ) {
+				$conds[] = "rev_timestamp $op " .
+					$this->db->addQuotes( $this->db->timestamp( $this->history['offset'] ) );
+			}
+			# Set the overall maximum row count
+			if ( !empty( $this->history['limit'] ) ) {
+				$maxRowCount = intval( $this->history['limit'] );
+			}
+		} elseif ( $this->history & self::FULL ) {
+			# Full history dumps...
+			# query optimization for history stub dumps
+			if ( $this->text == self::STUB && $orderRevs ) {
+				$tables = $revQuery['tables'];
+				$opts['USE INDEX']['revision'] = 'rev_page_id';
+				unset( $join['revision'] );
+				$join['page'] = [ 'INNER JOIN', 'rev_page=page_id' ];
+			}
+		} elseif ( $this->history & self::CURRENT ) {
+			# Latest revision dumps...
+			if ( $this->list_authors && $cond != '' ) { // List authors, if so desired
+				$this->do_list_authors( $cond );
+			}
+			$join['revision'] = [ 'INNER JOIN', 'page_id=rev_page AND page_latest=rev_id' ];
+		} elseif ( $this->history & self::STABLE ) {
+			# "Stable" revision dumps...
+			# Default JOIN, to be overridden...
+			$join['revision'] = [ 'INNER JOIN', 'page_id=rev_page AND page_latest=rev_id' ];
+			# One, and only one, hook should set this and return false
+			if ( Hooks::run( 'WikiExporter::dumpStableQuery', [ &$tables, &$opts, &$join ] ) ) {
 				throw new MWException( __METHOD__ . " given invalid history dump type." );
 			}
+		} elseif ( $this->history & self::RANGE ) {
+			# Dump of revisions within a specified range. Condition already set in revsByRange().
+		} else {
+			# Unknown history specification parameter?
+			throw new MWException( __METHOD__ . " given invalid history dump type." );
+		}
 
-			if ( $this->buffer == self::STREAM ) {
-				$prev = $this->db->bufferResults( false );
-			}
-			$result = null; // Assuring $result is not undefined, if exception occurs early
-			try {
-				Hooks::run( 'ModifyExportQuery',
-					[ $this->db, &$tables, &$cond, &$opts, &$join ] );
-
-				# Do the query!
-				$result = $this->db->select(
-					$tables,
-					$fields,
-					$conds,
-					__METHOD__,
-					$opts,
-					$join
-				);
-				# Output dump results
-				$this->outputPageStream( $result );
-
-				if ( $this->buffer == self::STREAM ) {
-					$this->db->bufferResults( $prev );
-				}
-			} catch ( Exception $e ) {
-				// Throwing the exception does not reliably free the resultset, and
-				// would also leave the connection in unbuffered mode.
-
-				// Freeing result
-				try {
-					if ( $result ) {
-						$result->free();
-					}
-				} catch ( Exception $e2 ) {
-					// Already in panic mode -> ignoring $e2 as $e has
-					// higher priority
-				}
+		$result = null; // Make sure $result is defined, in case an exception occurs early
+		$done = false;
+		$lastRow = null;
+		$revPage = 0;
+		$revId = 0;
+		$rowCount = 0;
 
-				// Putting database back in previous buffer mode
-				try {
-					if ( $this->buffer == self::STREAM ) {
-						$this->db->bufferResults( $prev );
-					}
-				} catch ( Exception $e2 ) {
-					// Already in panic mode -> ignoring $e2 as $e has
-					// higher priority
-				}
+		$opts['LIMIT'] = self::BATCH_SIZE;
 
-				// Inform caller about problem
-				throw $e;
+		Hooks::run( 'ModifyExportQuery',
+			[ $this->db, &$tables, &$cond, &$opts, &$join ] );
+
+		while ( !$done ) {
+			// If necessary, impose the overall maximum and stop looping after this iteration.
+			if ( !empty( $maxRowCount ) && $rowCount + self::BATCH_SIZE > $maxRowCount ) {
+				$opts['LIMIT'] = $maxRowCount - $rowCount;
+				$done = true;
+			}
+
+			$queryConds = $conds;
+			$queryConds[] = 'rev_page>' . intval( $revPage ) . ' OR (rev_page=' .
+				intval( $revPage ) . ' AND rev_id' . $op . intval( $revId ) . ')';
+
+			# Do the query!
+			$result = $this->db->select(
+				$tables,
+				$fields,
+				$queryConds,
+				__METHOD__,
+				$opts,
+				$join
+			);
+			# Output dump results, get new max ids.
+			$lastRow = $this->outputPageStream( $result, $lastRow );
+
+			if ( !$result->numRows() || !$lastRow ) {
+				$done = true;
+			} else {
+				$rowCount += $result->numRows();
+				$revPage = $lastRow->rev_page;
+				$revId = $lastRow->rev_id;
 			}
 		}
 	}
@@ -472,52 +444,55 @@ class WikiExporter {
 	 * The result set should be sorted/grouped by page to avoid duplicate
 	 * page records in the output.
 	 *
-	 * Should be safe for
-	 * streaming (non-buffered) queries, as long as it was made on a
-	 * separate database connection not managed by LoadBalancer; some
-	 * blob storage types will make queries to pull source data.
-	 *
 	 * @param ResultWrapper $resultset
+	 * @param object|null $lastRow The last row output from the previous call, or null if none
+	 * @return object|null The last row processed, or null once the final page has been closed off
 	 */
-	protected function outputPageStream( $resultset ) {
-		$last = null;
-		foreach ( $resultset as $row ) {
-			if ( $last === null ||
-				$last->page_namespace != $row->page_namespace ||
-				$last->page_title != $row->page_title ) {
-				if ( $last !== null ) {
-					$output = '';
-					if ( $this->dumpUploads ) {
-						$output .= $this->writer->writeUploads( $last, $this->dumpUploadFileContents );
+	protected function outputPageStream( $resultset, $lastRow ) {
+		if ( $resultset->numRows() ) {
+			foreach ( $resultset as $row ) {
+				if ( $lastRow === null ||
+					$lastRow->page_namespace != $row->page_namespace ||
+					$lastRow->page_title != $row->page_title ) {
+					if ( $lastRow !== null ) {
+						$output = '';
+						if ( $this->dumpUploads ) {
+							$output .= $this->writer->writeUploads( $lastRow, $this->dumpUploadFileContents );
+						}
+						$output .= $this->writer->closePage();
+						$this->sink->writeClosePage( $output );
 					}
-					$output .= $this->writer->closePage();
-					$this->sink->writeClosePage( $output );
+					$output = $this->writer->openPage( $row );
+					$this->sink->writeOpenPage( $row, $output );
 				}
-				$output = $this->writer->openPage( $row );
-				$this->sink->writeOpenPage( $row, $output );
-				$last = $row;
+				$output = $this->writer->writeRevision( $row );
+				$this->sink->writeRevision( $row, $output );
+				$lastRow = $row;
 			}
-			$output = $this->writer->writeRevision( $row );
-			$this->sink->writeRevision( $row, $output );
-		}
-		if ( $last !== null ) {
+		} elseif ( $lastRow !== null ) {
+			// An empty result set means we are done with all batches. Close off the final page element (if any).
 			$output = '';
 			if ( $this->dumpUploads ) {
-				$output .= $this->writer->writeUploads( $last, $this->dumpUploadFileContents );
+				$output .= $this->writer->writeUploads( $lastRow, $this->dumpUploadFileContents );
 			}
 			$output .= $this->author_list;
 			$output .= $this->writer->closePage();
 			$this->sink->writeClosePage( $output );
+			$lastRow = null;
 		}
+
+		return $lastRow;
 	}
 
 	/**
 	 * @param ResultWrapper $resultset
+	 * @return int|null The log_id value of the last item output, or null if none
 	 */
 	protected function outputLogStream( $resultset ) {
 		foreach ( $resultset as $row ) {
 			$output = $this->writer->writeLogItem( $row );
 			$this->sink->writeLogItem( $row, $output );
 		}
+		return isset( $row ) ? $row->log_id : null;
 	}
 }
diff --git a/includes/specials/SpecialExport.php b/includes/specials/SpecialExport.php
index 3a7e9cdf9d..513e7a96b8 100644
--- a/includes/specials/SpecialExport.php
+++ b/includes/specials/SpecialExport.php
@@ -23,7 +23,6 @@
  * @ingroup SpecialPage
  */
 
-use MediaWiki\MediaWikiServices;
 use MediaWiki\Logger\LoggerFactory;
 
 /**
@@ -379,23 +378,10 @@ class SpecialExport extends SpecialPage {
 		}
 
 		/* Ok, let's get to it... */
-		if ( $history == WikiExporter::CURRENT ) {
-			$lb = false;
-			$db = wfGetDB( DB_REPLICA );
-			$buffer = WikiExporter::BUFFER;
-		} else {
-			// Use an unbuffered query; histories may be very long!
-			$lb = MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->newMainLB();
-			$db = $lb->getConnection( DB_REPLICA );
-			$buffer = WikiExporter::STREAM;
-
-			// This might take a while... :D
-			Wikimedia\suppressWarnings();
-			set_time_limit( 0 );
-			Wikimedia\restoreWarnings();
-		}
+		$lb = false;
+		$db = wfGetDB( DB_REPLICA );
 
-		$exporter = new WikiExporter( $db, $history, $buffer );
+		$exporter = new WikiExporter( $db, $history );
 		$exporter->list_authors = $list_authors;
 		$exporter->openStream();
 
diff --git a/maintenance/includes/BackupDumper.php b/maintenance/includes/BackupDumper.php
index e8993e4200..4c2b64c903 100644
--- a/maintenance/includes/BackupDumper.php
+++ b/maintenance/includes/BackupDumper.php
@@ -257,7 +257,7 @@ abstract class BackupDumper extends Maintenance {
 		$this->initProgress( $history );
 
 		$db = $this->backupDb();
-		$exporter = new WikiExporter( $db, $history, WikiExporter::STREAM, $text );
+		$exporter = new WikiExporter( $db, $history, $text );
 		$exporter->dumpUploads = $this->dumpUploads;
 		$exporter->dumpUploadFileContents = $this->dumpUploadFileContents;
 
-- 
2.20.1
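
A note on the pattern (not part of the patch itself): the loops above
implement keyset (seek) pagination. Each query orders by an indexed key,
limits the batch to BATCH_SIZE rows, and resumes strictly after the last
key value already emitted, so no server-side cursor or unbuffered
connection has to stay open between batches. Below is a minimal
standalone sketch of the single-column variant used by dumpLogs(). It is
illustrative only: the table name (items), the column names (item_id,
payload), the connection details, and the process() helper are all
hypothetical, and plain PDO stands in for MediaWiki's IDatabase wrapper.

	<?php
	// Keyset-pagination sketch (hypothetical schema, not MediaWiki code).
	// Mirrors dumpLogs() above: ORDER BY + LIMIT + "key > last seen".
	function process( object $row ): void {
		echo $row->item_id, "\n"; // placeholder for real output handling
	}

	$pdo = new PDO( 'mysql:host=localhost;dbname=example', 'user', 'password' );
	$pdo->setAttribute( PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION );

	$batchSize = 1000;
	$lastId = 0;

	$stmt = $pdo->prepare(
		'SELECT item_id, payload FROM items' .
		' WHERE item_id > :lastId ORDER BY item_id' .
		' LIMIT ' . $batchSize // LIMIT is concatenated as an int, not bound
	);

	while ( true ) {
		$stmt->execute( [ 'lastId' => $lastId ] );
		$rows = $stmt->fetchAll( PDO::FETCH_OBJ );
		if ( !$rows ) {
			break; // an empty batch means every row has been processed
		}
		foreach ( $rows as $row ) {
			process( $row ); // stand-in for outputLogStream()
		}
		// Resume strictly after the highest key emitted so far.
		$lastId = end( $rows )->item_id;
	}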
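
dumpPages() uses the same idea with a compound key: the resume point is
the pair (rev_page, rev_id), and the tuple comparison is written out as a
disjunction, exactly as in the patch. A sketch under the same caveats
follows (hypothetical standalone revision table, plain PDO; distinct
placeholder names :p and :p2 are used because PDO does not reliably allow
reusing one named parameter):

	<?php
	// Compound-key keyset pagination (hypothetical schema, not MediaWiki code).
	// Rows are visited in ( rev_page, rev_id ) lexicographic order; each batch
	// resumes strictly after the last ( page, id ) pair already seen.
	$pdo = new PDO( 'mysql:host=localhost;dbname=example', 'user', 'password' );
	$pdo->setAttribute( PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION );

	$batchSize = 1000;
	$revPage = 0;
	$revId = 0;
	$total = 0;

	$stmt = $pdo->prepare(
		'SELECT rev_page, rev_id FROM revision' .
		' WHERE rev_page > :p OR ( rev_page = :p2 AND rev_id > :r )' .
		' ORDER BY rev_page ASC, rev_id ASC' .
		' LIMIT ' . $batchSize
	);

	while ( true ) {
		$stmt->execute( [ 'p' => $revPage, 'p2' => $revPage, 'r' => $revId ] );
		$rows = $stmt->fetchAll( PDO::FETCH_OBJ );
		if ( !$rows ) {
			break;
		}
		$total += count( $rows );
		$last = end( $rows );
		$revPage = $last->rev_page; // resume point for the next batch
		$revId = $last->rev_id;
	}

	echo "visited $total revision rows\n";

With a composite index on (rev_page, rev_id), each batch is a bounded
index range scan, so the per-batch cost stays flat across the whole dump
instead of growing the way an OFFSET-based loop would.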