[Database] Added an upsert() function to perform/emulate ON DUPLICATE KEY UPDATE.
authorAaron Schulz <aschulz@wikimedia.org>
Thu, 2 May 2013 22:50:29 +0000 (15:50 -0700)
committerGerrit Code Review <gerrit@wikimedia.org>
Tue, 4 Jun 2013 06:20:41 +0000 (06:20 +0000)
Change-Id: Id7fc6652268a439af0457309a678677351867f37

includes/db/Database.php
includes/db/DatabaseMysql.php

index 799e168..2f82e65 100644 (file)
@@ -2529,6 +2529,92 @@ abstract class DatabaseBase implements DatabaseType {
                return $this->query( $sql, $fname );
        }
 
+       /**
+        * INSERT ON DUPLICATE KEY UPDATE wrapper, upserts an array into a table.
+        *
+        * This updates any conflicting rows (according to the unique indexes) using
+        * the provided SET clause and inserts any remaining (non-conflicted) rows.
+        *
+        * $rows may be either:
+        *   - A single associative array. The array keys are the field names, and
+        *     the values are the values to insert. The values are treated as data
+        *     and will be quoted appropriately. If NULL is inserted, this will be
+        *     converted to a database NULL.
+        *   - An array with numeric keys, holding a list of associative arrays.
+        *     This causes a multi-row INSERT on DBMSs that support it. The keys in
+        *     each subarray must be identical to each other, and in the same order.
+        *
+        * It may be more efficient to leave off unique indexes which are unlikely
+        * to collide. However if you do this, you run the risk of encountering
+        * errors which wouldn't have occurred in MySQL.
+        *
+        * Usually throws a DBQueryError on failure. If errors are explicitly ignored,
+        * returns success.
+        *
+        * @param string $table Table name. This will be passed through DatabaseBase::tableName().
+        * @param array $rows A single row or list of rows to insert
+        * @param array $uniqueIndexes List of single field names or field name tuples
+        * @param array $set An array of values to SET. For each array element,
+        *                the key gives the field name, and the value gives the data
+        *                to set that field to. The data will be quoted by
+        *                DatabaseBase::addQuotes().
+        * @param string $fname Calling function name (use __METHOD__) for logs/profiling
+        * @param array $options of options
+        *
+        * @return bool
+        * @since 1.22
+        */
+       public function upsert(
+               $table, array $rows, array $uniqueIndexes, array $set, $fname = 'DatabaseBase::upsert'
+       ) {
+               if ( !count( $rows ) ) {
+                       return true; // nothing to do
+               }
+               $rows = is_array( reset( $rows ) ) ? $rows : array( $rows );
+
+               if ( count( $uniqueIndexes ) ) {
+                       $clauses = array(); // list WHERE clauses that each identify a single row
+                       foreach ( $rows as $row ) {
+                               foreach ( $uniqueIndexes as $index ) {
+                                       $index = is_array( $index ) ? $index : array( $index ); // columns
+                                       $rowKey = array(); // unique key to this row
+                                       foreach ( $index as $column ) {
+                                               $rowKey[$column] = $row[$column];
+                                       }
+                                       $clauses[] = $this->makeList( $rowKey, LIST_AND );
+                               }
+                       }
+                       $where = array( $this->makeList( $clauses, LIST_OR ) );
+               } else {
+                       $where = false;
+               }
+
+               $useTrx = !$this->mTrxLevel;
+               if ( $useTrx ) {
+                       $this->begin();
+               }
+               try {
+                       # Update any existing conflicting row(s)
+                       if ( $where !== false ) {
+                               $ok = $this->update( $table, $set, $where, $fname );
+                       } else {
+                               $ok = true;
+                       }
+                       # Now insert any non-conflicting row(s)
+                       $ok = $this->insert( $table, $rows, $fname, array( 'IGNORE' ) ) && $ok;
+               } catch ( Exception $e ) {
+                       if ( $useTrx ) {
+                               $this->rollback();
+                       }
+                       throw $e;
+               }
+               if ( $useTrx ) {
+                       $this->commit();
+               }
+
+               return $ok;
+       }
+
        /**
         * DELETE where the condition is a join.
         *
index ca5a2b4..8bc975f 100644 (file)
@@ -352,6 +352,37 @@ class DatabaseMysql extends DatabaseBase {
                return $this->nativeReplace( $table, $rows, $fname );
        }
 
+       /**
+        * @param string $table
+        * @param array $rows
+        * @param array $uniqueIndexes
+        * @param array $set
+        * @param string $fname
+        * @param array $options
+        * @return bool
+        */
+       public function upsert(
+               $table, array $rows, array $uniqueIndexes, array $set, $fname = 'DatabaseMysql::upsert'
+       ) {
+               if ( !count( $rows ) ) {
+                       return true; // nothing to do
+               }
+               $rows = is_array( reset( $rows ) ) ? $rows : array( $rows );
+
+               $table = $this->tableName( $table );
+               $columns = array_keys( $rows[0] );
+
+               $sql = "INSERT INTO $table (" . implode( ',', $columns ) . ') VALUES ';
+               $rowTuples = array();
+               foreach ( $rows as $row ) {
+                       $rowTuples[] = '(' . $this->makeList( $row ) . ')';
+               }
+               $sql .= implode( ',', $rowTuples );
+               $sql .= " ON DUPLICATE KEY UPDATE " . $this->makeList( $set, LIST_SET );
+
+               return (bool)$this->query( $sql, $fname );
+       }
+
        /**
         * Estimate rows in dataset
         * Returns estimated count, based on EXPLAIN output