Replace WikiExporter streaming (unbuffered) mode with batched queries
author Bill Pirkle <bpirkle@wikimedia.org>
Tue, 11 Sep 2018 22:51:01 +0000 (17:51 -0500)
committer Bill Pirkle <bpirkle@wikimedia.org>
Fri, 28 Sep 2018 15:55:05 +0000 (10:55 -0500)
WikiExporter offers a streaming mode that puts the database connection
into unbuffered mode. We are moving away from this technique.
Instead, run multiple normal buffered queries and retrieve the
information in batches.

Bug: T203424
Change-Id: I582240b67c91a8be993a68c23831c9d86617350e
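
For illustration, a minimal sketch of the batching pattern this change adopts,
modeled on the new dumpLogs() loop below (not a verbatim excerpt; $db is
assumed to be an IDatabase handle, and BATCH_SIZE is the new class constant):

    $lastLogId = 0;
    while ( true ) {
        // Each iteration is an ordinary buffered query for the next batch,
        // keyed on the primary key so rows are neither skipped nor repeated.
        $res = $db->select(
            'logging',
            '*',
            [ 'log_id > ' . intval( $lastLogId ) ],
            __METHOD__,
            [ 'ORDER BY' => 'log_id', 'LIMIT' => WikiExporter::BATCH_SIZE ]
        );
        if ( !$res->numRows() ) {
            break; // no more rows: the dump is complete
        }
        foreach ( $res as $row ) {
            // ... write $row to the dump sink ...
            $lastLogId = $row->log_id;
        }
    }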

includes/export/WikiExporter.php
includes/specials/SpecialExport.php
maintenance/includes/BackupDumper.php

index b018584..1f2b81d 100644 (file)
@@ -52,14 +52,10 @@ class WikiExporter {
        const LOGS = 8;
        const RANGE = 16;
 
-       const BUFFER = 0;
-       const STREAM = 1;
-
        const TEXT = 0;
        const STUB = 1;
 
-       /** @var int */
-       public $buffer;
+       const BATCH_SIZE = 1000;
 
        /** @var int */
        public $text;
@@ -76,26 +72,17 @@ class WikiExporter {
        }
 
        /**
-        * If using WikiExporter::STREAM to stream a large amount of data,
-        * provide a database connection which is not managed by
-        * LoadBalancer to read from: some history blob types will
-        * make additional queries to pull source data while the
-        * main query is still running.
-        *
         * @param IDatabase $db
         * @param int|array $history One of WikiExporter::FULL, WikiExporter::CURRENT,
         *   WikiExporter::RANGE or WikiExporter::STABLE, or an associative array:
         *   - offset: non-inclusive offset at which to start the query
         *   - limit: maximum number of rows to return
         *   - dir: "asc" or "desc" timestamp order
-        * @param int $buffer One of WikiExporter::BUFFER or WikiExporter::STREAM
         * @param int $text One of WikiExporter::TEXT or WikiExporter::STUB
         */
-       function __construct( $db, $history = self::CURRENT,
-                       $buffer = self::BUFFER, $text = self::TEXT ) {
+       function __construct( $db, $history = self::CURRENT, $text = self::TEXT ) {
                $this->db = $db;
                $this->history = $history;
-               $this->buffer = $buffer;
                $this->writer = new XmlDumpWriter();
                $this->sink = new DumpOutput();
                $this->text = $text;
@@ -263,206 +250,191 @@ class WikiExporter {
         * @throws Exception
         */
        protected function dumpFrom( $cond = '', $orderRevs = false ) {
-               global $wgMultiContentRevisionSchemaMigrationStage;
-
-               # For logging dumps...
                if ( $this->history & self::LOGS ) {
-                       $where = [];
-                       # Hide private logs
-                       $hideLogs = LogEventsList::getExcludeClause( $this->db );
-                       if ( $hideLogs ) {
-                               $where[] = $hideLogs;
-                       }
-                       # Add on any caller specified conditions
-                       if ( $cond ) {
-                               $where[] = $cond;
-                       }
-                       # Get logging table name for logging.* clause
-                       $logging = $this->db->tableName( 'logging' );
+                       $this->dumpLogs( $cond );
+               } else {
+                       $this->dumpPages( $cond, $orderRevs );
+               }
+       }
 
-                       if ( $this->buffer == self::STREAM ) {
-                               $prev = $this->db->bufferResults( false );
+       /**
+        * @param string $cond
+        * @throws Exception
+        */
+       protected function dumpLogs( $cond ) {
+               $where = [];
+               # Hide private logs
+               $hideLogs = LogEventsList::getExcludeClause( $this->db );
+               if ( $hideLogs ) {
+                       $where[] = $hideLogs;
+               }
+               # Add on any caller specified conditions
+               if ( $cond ) {
+                       $where[] = $cond;
+               }
+               # Get logging table name for logging.* clause
+               $logging = $this->db->tableName( 'logging' );
+
+               $result = null; // Assuring $result is not undefined, if exception occurs early
+
+               $commentQuery = CommentStore::getStore()->getJoin( 'log_comment' );
+               $actorQuery = ActorMigration::newMigration()->getJoin( 'log_user' );
+
+               $lastLogId = 0;
+               while ( true ) {
+                       $result = $this->db->select(
+                               array_merge( [ 'logging' ], $commentQuery['tables'], $actorQuery['tables'], [ 'user' ] ),
+                               [ "{$logging}.*", 'user_name' ] + $commentQuery['fields'] + $actorQuery['fields'],
+                               array_merge( $where, [ 'log_id > ' . intval( $lastLogId ) ] ),
+                               __METHOD__,
+                               [
+                                       'ORDER BY' => 'log_id',
+                                       'USE INDEX' => [ 'logging' => 'PRIMARY' ],
+                                       'LIMIT' => self::BATCH_SIZE,
+                               ],
+                               [
+                                       'user' => [ 'JOIN', 'user_id = ' . $actorQuery['fields']['log_user'] ]
+                               ] + $commentQuery['joins'] + $actorQuery['joins']
+                       );
+
+                       if ( !$result->numRows() ) {
+                               break;
                        }
-                       $result = null; // Assuring $result is not undefined, if exception occurs early
-
-                       $commentQuery = CommentStore::getStore()->getJoin( 'log_comment' );
-                       $actorQuery = ActorMigration::newMigration()->getJoin( 'log_user' );
-
-                       try {
-                               $result = $this->db->select(
-                                       array_merge( [ 'logging' ], $commentQuery['tables'], $actorQuery['tables'], [ 'user' ] ),
-                                       [ "{$logging}.*", 'user_name' ] + $commentQuery['fields'] + $actorQuery['fields'],
-                                       $where,
-                                       __METHOD__,
-                                       [ 'ORDER BY' => 'log_id', 'USE INDEX' => [ 'logging' => 'PRIMARY' ] ],
-                                       [
-                                               'user' => [ 'JOIN', 'user_id = ' . $actorQuery['fields']['log_user'] ]
-                                       ] + $commentQuery['joins'] + $actorQuery['joins']
-                               );
-                               $this->outputLogStream( $result );
-                               if ( $this->buffer == self::STREAM ) {
-                                       $this->db->bufferResults( $prev );
-                               }
-                       } catch ( Exception $e ) {
-                               // Throwing the exception does not reliably free the resultset, and
-                               // would also leave the connection in unbuffered mode.
-
-                               // Freeing result
-                               try {
-                                       if ( $result ) {
-                                               $result->free();
-                                       }
-                               } catch ( Exception $e2 ) {
-                                       // Already in panic mode -> ignoring $e2 as $e has
-                                       // higher priority
-                               }
 
-                               // Putting database back in previous buffer mode
-                               try {
-                                       if ( $this->buffer == self::STREAM ) {
-                                               $this->db->bufferResults( $prev );
-                                       }
-                               } catch ( Exception $e2 ) {
-                                       // Already in panic mode -> ignoring $e2 as $e has
-                                       // higher priority
-                               }
+                       $lastLogId = $this->outputLogStream( $result );
+               };
+       }
 
-                               // Inform caller about problem
-                               throw $e;
-                       }
-               # For page dumps...
-               } else {
-                       if ( !( $wgMultiContentRevisionSchemaMigrationStage & SCHEMA_COMPAT_WRITE_OLD ) ) {
-                               // TODO: Make XmlDumpWriter use a RevisionStore! (see T198706 and T174031)
-                               throw new MWException(
-                                       'Cannot use WikiExporter with SCHEMA_COMPAT_WRITE_OLD mode disabled!'
-                                       . ' Support for dumping from the new schema is not implemented yet!'
-                               );
-                       }
+       /**
+        * @param string $cond
+        * @param bool $orderRevs
+        * @throws MWException
+        * @throws Exception
+        */
+       protected function dumpPages( $cond, $orderRevs ) {
+               global $wgMultiContentRevisionSchemaMigrationStage;
+               if ( !( $wgMultiContentRevisionSchemaMigrationStage & SCHEMA_COMPAT_WRITE_OLD ) ) {
+                       // TODO: Make XmlDumpWriter use a RevisionStore! (see T198706 and T174031)
+                       throw new MWException(
+                               'Cannot use WikiExporter with SCHEMA_COMPAT_WRITE_OLD mode disabled!'
+                               . ' Support for dumping from the new schema is not implemented yet!'
+                       );
+               }
 
-                       $revOpts = [ 'page' ];
-                       if ( $this->text != self::STUB ) {
-                               // TODO: remove the text and make XmlDumpWriter use a RevisionStore instead! (T198706)
-                               $revOpts[] = 'text';
-                       }
-                       $revQuery = Revision::getQueryInfo( $revOpts );
+               $revOpts = [ 'page' ];
+               if ( $this->text != self::STUB ) {
+                       // TODO: remove the text and make XmlDumpWriter use a RevisionStore instead! (T198706)
+                       $revOpts[] = 'text';
+               }
+               $revQuery = Revision::getQueryInfo( $revOpts );
 
-                       // We want page primary rather than revision
-                       $tables = array_merge( [ 'page' ], array_diff( $revQuery['tables'], [ 'page' ] ) );
-                       $join = $revQuery['joins'] + [
+               // We want page primary rather than revision
+               $tables = array_merge( [ 'page' ], array_diff( $revQuery['tables'], [ 'page' ] ) );
+               $join = $revQuery['joins'] + [
                                'revision' => $revQuery['joins']['page']
                        ];
-                       unset( $join['page'] );
+               unset( $join['page'] );
 
-                       // TODO: remove rev_text_id and make XmlDumpWriter use a RevisionStore instead! (T198706)
-                       $fields = array_merge( $revQuery['fields'], [ 'page_restrictions, rev_text_id' ] );
+               // TODO: remove rev_text_id and make XmlDumpWriter use a RevisionStore instead! (T198706)
+               $fields = array_merge( $revQuery['fields'], [ 'page_restrictions, rev_text_id' ] );
 
-                       $conds = [];
-                       if ( $cond !== '' ) {
-                               $conds[] = $cond;
-                       }
-                       $opts = [ 'ORDER BY' => 'page_id ASC' ];
-                       $opts['USE INDEX'] = [];
-                       if ( is_array( $this->history ) ) {
-                               # Time offset/limit for all pages/history...
-                               # Set time order
-                               if ( $this->history['dir'] == 'asc' ) {
-                                       $op = '>';
-                                       $opts['ORDER BY'] = 'rev_timestamp ASC';
-                               } else {
-                                       $op = '<';
-                                       $opts['ORDER BY'] = 'rev_timestamp DESC';
-                               }
-                               # Set offset
-                               if ( !empty( $this->history['offset'] ) ) {
-                                       $conds[] = "rev_timestamp $op " .
-                                               $this->db->addQuotes( $this->db->timestamp( $this->history['offset'] ) );
-                               }
-                               # Set query limit
-                               if ( !empty( $this->history['limit'] ) ) {
-                                       $opts['LIMIT'] = intval( $this->history['limit'] );
-                               }
-                       } elseif ( $this->history & self::FULL ) {
-                               # Full history dumps...
-                               # query optimization for history stub dumps
-                               if ( $this->text == self::STUB && $orderRevs ) {
-                                       $tables = $revQuery['tables'];
-                                       $opts['ORDER BY'] = [ 'rev_page ASC', 'rev_id ASC' ];
-                                       $opts['USE INDEX']['revision'] = 'rev_page_id';
-                                       unset( $join['revision'] );
-                                       $join['page'] = [ 'INNER JOIN', 'rev_page=page_id' ];
-                               }
-                       } elseif ( $this->history & self::CURRENT ) {
-                               # Latest revision dumps...
-                               if ( $this->list_authors && $cond != '' ) { // List authors, if so desired
-                                       $this->do_list_authors( $cond );
-                               }
-                               $join['revision'] = [ 'INNER JOIN', 'page_id=rev_page AND page_latest=rev_id' ];
-                       } elseif ( $this->history & self::STABLE ) {
-                               # "Stable" revision dumps...
-                               # Default JOIN, to be overridden...
-                               $join['revision'] = [ 'INNER JOIN', 'page_id=rev_page AND page_latest=rev_id' ];
-                               # One, and only one hook should set this, and return false
-                               if ( Hooks::run( 'WikiExporter::dumpStableQuery', [ &$tables, &$opts, &$join ] ) ) {
-                                       throw new MWException( __METHOD__ . " given invalid history dump type." );
-                               }
-                       } elseif ( $this->history & self::RANGE ) {
-                               # Dump of revisions within a specified range
-                               $opts['ORDER BY'] = [ 'rev_page ASC', 'rev_id ASC' ];
+               $conds = [];
+               if ( $cond !== '' ) {
+                       $conds[] = $cond;
+               }
+               $opts = [ 'ORDER BY' => [ 'rev_page ASC', 'rev_id ASC' ] ];
+               $opts['USE INDEX'] = [];
+
+               $op = '>';
+               if ( is_array( $this->history ) ) {
+                       # Time offset/limit for all pages/history...
+                       # Set time order
+                       if ( $this->history['dir'] == 'asc' ) {
+                               $opts['ORDER BY'] = 'rev_timestamp ASC';
                        } else {
-                               # Unknown history specification parameter?
+                               $op = '<';
+                               $opts['ORDER BY'] = 'rev_timestamp DESC';
+                       }
+                       # Set offset
+                       if ( !empty( $this->history['offset'] ) ) {
+                               $conds[] = "rev_timestamp $op " .
+                                       $this->db->addQuotes( $this->db->timestamp( $this->history['offset'] ) );
+                       }
+                       # Set query limit
+                       if ( !empty( $this->history['limit'] ) ) {
+                               $maxRowCount = intval( $this->history['limit'] );
+                       }
+               } elseif ( $this->history & self::FULL ) {
+                       # Full history dumps...
+                       # query optimization for history stub dumps
+                       if ( $this->text == self::STUB && $orderRevs ) {
+                               $tables = $revQuery['tables'];
+                               $opts['USE INDEX']['revision'] = 'rev_page_id';
+                               unset( $join['revision'] );
+                               $join['page'] = [ 'INNER JOIN', 'rev_page=page_id' ];
+                       }
+               } elseif ( $this->history & self::CURRENT ) {
+                       # Latest revision dumps...
+                       if ( $this->list_authors && $cond != '' ) { // List authors, if so desired
+                               $this->do_list_authors( $cond );
+                       }
+                       $join['revision'] = [ 'INNER JOIN', 'page_id=rev_page AND page_latest=rev_id' ];
+               } elseif ( $this->history & self::STABLE ) {
+                       # "Stable" revision dumps...
+                       # Default JOIN, to be overridden...
+                       $join['revision'] = [ 'INNER JOIN', 'page_id=rev_page AND page_latest=rev_id' ];
+                       # One, and only one hook should set this, and return false
+                       if ( Hooks::run( 'WikiExporter::dumpStableQuery', [ &$tables, &$opts, &$join ] ) ) {
                                throw new MWException( __METHOD__ . " given invalid history dump type." );
                        }
+               } elseif ( $this->history & self::RANGE ) {
+                       # Dump of revisions within a specified range.  Condition already set in revsByRange().
+               } else {
+                       # Unknown history specification parameter?
+                       throw new MWException( __METHOD__ . " given invalid history dump type." );
+               }
 
-                       if ( $this->buffer == self::STREAM ) {
-                               $prev = $this->db->bufferResults( false );
-                       }
-                       $result = null; // Assuring $result is not undefined, if exception occurs early
-                       try {
-                               Hooks::run( 'ModifyExportQuery',
-                                               [ $this->db, &$tables, &$cond, &$opts, &$join ] );
-
-                               # Do the query!
-                               $result = $this->db->select(
-                                       $tables,
-                                       $fields,
-                                       $conds,
-                                       __METHOD__,
-                                       $opts,
-                                       $join
-                               );
-                               # Output dump results
-                               $this->outputPageStream( $result );
-
-                               if ( $this->buffer == self::STREAM ) {
-                                       $this->db->bufferResults( $prev );
-                               }
-                       } catch ( Exception $e ) {
-                               // Throwing the exception does not reliably free the resultset, and
-                               // would also leave the connection in unbuffered mode.
-
-                               // Freeing result
-                               try {
-                                       if ( $result ) {
-                                               $result->free();
-                                       }
-                               } catch ( Exception $e2 ) {
-                                       // Already in panic mode -> ignoring $e2 as $e has
-                                       // higher priority
-                               }
+               $result = null; // Assuring $result is not undefined, if exception occurs early
+               $done = false;
+               $lastRow = null;
+               $revPage = 0;
+               $revId = 0;
+               $rowCount = 0;
 
-                               // Putting database back in previous buffer mode
-                               try {
-                                       if ( $this->buffer == self::STREAM ) {
-                                               $this->db->bufferResults( $prev );
-                                       }
-                               } catch ( Exception $e2 ) {
-                                       // Already in panic mode -> ignoring $e2 as $e has
-                                       // higher priority
-                               }
+               $opts['LIMIT'] = self::BATCH_SIZE;
 
-                               // Inform caller about problem
-                               throw $e;
+               Hooks::run( 'ModifyExportQuery',
+                       [ $this->db, &$tables, &$cond, &$opts, &$join ] );
+
+               while ( !$done ) {
+                       // If necessary, impose the overall maximum and stop looping after this iteration.
+                       if ( !empty( $maxRowCount ) && $rowCount + self::BATCH_SIZE > $maxRowCount ) {
+                               $opts['LIMIT'] = $maxRowCount - $rowCount;
+                               $done = true;
+                       }
+
+                       $queryConds = $conds;
+                       $queryConds[] = 'rev_page>' . intval( $revPage ) . ' OR (rev_page=' .
+                               intval( $revPage ) . ' AND rev_id' . $op . intval( $revId ) . ')';
+
+                       # Do the query!
+                       $result = $this->db->select(
+                               $tables,
+                               $fields,
+                               $queryConds,
+                               __METHOD__,
+                               $opts,
+                               $join
+                       );
+                       # Output dump results, get new max ids.
+                       $lastRow = $this->outputPageStream( $result, $lastRow );
+
+                       if ( !$result->numRows() || !$lastRow ) {
+                               $done = true;
+                       } else {
+                               $rowCount += $result->numRows();
+                               $revPage = $lastRow->rev_page;
+                               $revId = $lastRow->rev_id;
                        }
                }
        }
@@ -472,52 +444,55 @@ class WikiExporter {
         * The result set should be sorted/grouped by page to avoid duplicate
         * page records in the output.
         *
-        * Should be safe for
-        * streaming (non-buffered) queries, as long as it was made on a
-        * separate database connection not managed by LoadBalancer; some
-        * blob storage types will make queries to pull source data.
-        *
         * @param ResultWrapper $resultset
+        * @param object $lastRow the last row output from the previous call (or null if none)
+        * @return object the last row processed
         */
-       protected function outputPageStream( $resultset ) {
-               $last = null;
-               foreach ( $resultset as $row ) {
-                       if ( $last === null ||
-                               $last->page_namespace != $row->page_namespace ||
-                               $last->page_title != $row->page_title ) {
-                               if ( $last !== null ) {
-                                       $output = '';
-                                       if ( $this->dumpUploads ) {
-                                               $output .= $this->writer->writeUploads( $last, $this->dumpUploadFileContents );
+       protected function outputPageStream( $resultset, $lastRow ) {
+               if ( $resultset->numRows() ) {
+                       foreach ( $resultset as $row ) {
+                               if ( $lastRow === null ||
+                                       $lastRow->page_namespace != $row->page_namespace ||
+                                       $lastRow->page_title != $row->page_title ) {
+                                       if ( $lastRow !== null ) {
+                                               $output = '';
+                                               if ( $this->dumpUploads ) {
+                                                       $output .= $this->writer->writeUploads( $lastRow, $this->dumpUploadFileContents );
+                                               }
+                                               $output .= $this->writer->closePage();
+                                               $this->sink->writeClosePage( $output );
                                        }
-                                       $output .= $this->writer->closePage();
-                                       $this->sink->writeClosePage( $output );
+                                       $output = $this->writer->openPage( $row );
+                                       $this->sink->writeOpenPage( $row, $output );
                                }
-                               $output = $this->writer->openPage( $row );
-                               $this->sink->writeOpenPage( $row, $output );
-                               $last = $row;
+                               $output = $this->writer->writeRevision( $row );
+                               $this->sink->writeRevision( $row, $output );
+                               $lastRow = $row;
                        }
-                       $output = $this->writer->writeRevision( $row );
-                       $this->sink->writeRevision( $row, $output );
-               }
-               if ( $last !== null ) {
+               } elseif ( $lastRow !== null ) {
+                       // Empty resultset means done with all batches. Close off final page element (if any).
                        $output = '';
                        if ( $this->dumpUploads ) {
-                               $output .= $this->writer->writeUploads( $last, $this->dumpUploadFileContents );
+                               $output .= $this->writer->writeUploads( $lastRow, $this->dumpUploadFileContents );
                        }
                        $output .= $this->author_list;
                        $output .= $this->writer->closePage();
                        $this->sink->writeClosePage( $output );
+                       $lastRow = null;
                }
+
+               return $lastRow;
        }
 
        /**
         * @param ResultWrapper $resultset
+        * @return int the log_id value of the last item output, or null if none
         */
        protected function outputLogStream( $resultset ) {
                foreach ( $resultset as $row ) {
                        $output = $this->writer->writeLogItem( $row );
                        $this->sink->writeLogItem( $row, $output );
                }
+               return isset( $row ) ? $row->log_id : null;
        }
 }
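
For the page dump, a single-column key is not enough: batches are ordered by
(rev_page, rev_id), so dumpPages() above resumes each batch strictly after the
last (rev_page, rev_id) pair already written. A hedged sketch of that compound
condition (the helper name is hypothetical; $op is '>' except for descending
range dumps):

    function resumeCondition( $lastPage, $lastRev, $op = '>' ) {
        // Rows from later pages, or later revisions of the same page.
        return 'rev_page > ' . intval( $lastPage ) .
            ' OR (rev_page = ' . intval( $lastPage ) .
            ' AND rev_id ' . $op . ' ' . intval( $lastRev ) . ')';
    }

    // e.g. resumeCondition( 42, 1007 ) yields
    //   rev_page > 42 OR (rev_page = 42 AND rev_id > 1007)
    // which, with ORDER BY rev_page ASC, rev_id ASC and LIMIT BATCH_SIZE,
    // selects exactly the next batch.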
index 3a7e9cd..513e7a9 100644 (file)
@@ -23,7 +23,6 @@
  * @ingroup SpecialPage
  */
 
-use MediaWiki\MediaWikiServices;
 use MediaWiki\Logger\LoggerFactory;
 
 /**
@@ -379,23 +378,10 @@ class SpecialExport extends SpecialPage {
                }
 
                /* Ok, let's get to it... */
-               if ( $history == WikiExporter::CURRENT ) {
-                       $lb = false;
-                       $db = wfGetDB( DB_REPLICA );
-                       $buffer = WikiExporter::BUFFER;
-               } else {
-                       // Use an unbuffered query; histories may be very long!
-                       $lb = MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->newMainLB();
-                       $db = $lb->getConnection( DB_REPLICA );
-                       $buffer = WikiExporter::STREAM;
-
-                       // This might take a while... :D
-                       Wikimedia\suppressWarnings();
-                       set_time_limit( 0 );
-                       Wikimedia\restoreWarnings();
-               }
+               $lb = false;
+               $db = wfGetDB( DB_REPLICA );
 
-               $exporter = new WikiExporter( $db, $history, $buffer );
+               $exporter = new WikiExporter( $db, $history );
                $exporter->list_authors = $list_authors;
                $exporter->openStream();
 
index e8993e4..4c2b64c 100644 (file)
@@ -257,7 +257,7 @@ abstract class BackupDumper extends Maintenance {
                $this->initProgress( $history );
 
                $db = $this->backupDb();
-               $exporter = new WikiExporter( $db, $history, WikiExporter::STREAM, $text );
+               $exporter = new WikiExporter( $db, $history, $text );
                $exporter->dumpUploads = $this->dumpUploads;
                $exporter->dumpUploadFileContents = $this->dumpUploadFileContents;
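
Callers previously passed a $buffer argument (WikiExporter::BUFFER or
WikiExporter::STREAM) to the constructor; after this change they pass only the
history and text modes, as the two hunks above show. A hedged usage sketch,
assuming the existing openStream()/allPages()/closeStream() entry points are
otherwise unchanged:

    // Before: $exporter = new WikiExporter( $db, $history, WikiExporter::STREAM, $text );
    // After:
    $exporter = new WikiExporter( $db, WikiExporter::FULL, WikiExporter::TEXT );
    $exporter->openStream();
    $exporter->allPages();    // internally fetched in batches of BATCH_SIZE rows
    $exporter->closeStream();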