Database: Add batching to non-native insertSelect()
[lhc/web/wiklou.git] / includes / libs / rdbms / database / Database.php
index 572a798..fd7bfe9 100644 (file)
@@ -236,6 +236,9 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        /** @var TransactionProfiler */
        protected $trxProfiler;
 
+       /** @var int */
+       protected $nonNativeInsertSelectBatchSize = 10000;
+
        /**
         * Constructor and database handle and attempt to connect to the DB server
         *
@@ -278,6 +281,10 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                $this->queryLogger = $params['queryLogger'];
                $this->errorLogger = $params['errorLogger'];
 
+               if ( isset( $params['nonNativeInsertSelectBatchSize'] ) ) {
+                       $this->nonNativeInsertSelectBatchSize = $params['nonNativeInsertSelectBatchSize'];
+               }
+
                // Set initial dummy domain until open() sets the final DB/prefix
                $this->currentDomain = DatabaseDomain::newUnspecified();
 
@@ -331,6 +338,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
         *   - cliMode: Whether to consider the execution context that of a CLI script.
         *   - agent: Optional name used to identify the end-user in query profiling/logging.
         *   - srvCache: Optional BagOStuff instance to an APC-style cache.
+        *   - nonNativeInsertSelectBatchSize: Optional batch size for non-native INSERT SELECT emulation.
         * @return Database|null If the database driver or extension cannot be found
         * @throws InvalidArgumentException If the database driver or extension cannot be found
         * @since 1.18
@@ -2504,12 +2512,43 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        return false;
                }
 
-               $rows = [];
-               foreach ( $res as $row ) {
-                       $rows[] = (array)$row;
+               try {
+                       $affectedRowCount = 0;
+                       $this->startAtomic( $fname );
+                       $rows = [];
+                       $ok = true;
+                       foreach ( $res as $row ) {
+                               $rows[] = (array)$row;
+
+                               // Avoid inserts that are too huge
+                               if ( count( $rows ) >= $this->nonNativeInsertSelectBatchSize ) {
+                                       $ok = $this->insert( $destTable, $rows, $fname, $insertOptions );
+                                       if ( !$ok ) {
+                                               break;
+                                       }
+                                       $affectedRowCount += $this->affectedRows();
+                                       $rows = [];
+                               }
+                       }
+                       if ( $rows && $ok ) {
+                               $ok = $this->insert( $destTable, $rows, $fname, $insertOptions );
+                               if ( $ok ) {
+                                       $affectedRowCount += $this->affectedRows();
+                               }
+                       }
+                       if ( $ok ) {
+                               $this->endAtomic( $fname );
+                               $this->affectedRowCount = $affectedRowCount;
+                       } else {
+                               $this->rollback( $fname, self::FLUSHING_INTERNAL );
+                               $this->affectedRowCount = 0;
+                       }
+                       return $ok;
+               } catch ( Exception $e ) {
+                       $this->rollback( $fname, self::FLUSHING_INTERNAL );
+                       $this->affectedRowCount = 0;
+                       throw $e;
                }
-
-               return $this->insert( $destTable, $rows, $fname, $insertOptions );
        }
 
        /**