rdbms: Add callback for atomic section cancellation
[lhc/web/wiklou.git] / includes / libs / rdbms / database / Database.php
index 971257f..c86adac 100644 (file)
@@ -38,6 +38,7 @@ use InvalidArgumentException;
 use UnexpectedValueException;
 use Exception;
 use RuntimeException;
+use Throwable;
 
 /**
  * Relational database abstraction object
@@ -148,6 +149,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        private $trxPreCommitCallbacks = [];
        /** @var array[] List of (callable, method name, atomic section id) */
        private $trxEndCallbacks = [];
+       /** @var array[] List of (callable, method name, atomic section id) */
+       private $trxSectionCancelCallbacks = [];
        /** @var callable[] Map of (name => callable) */
        private $trxRecurringCallbacks = [];
        /** @var bool Whether to suppress triggering of transaction end callbacks */
@@ -625,7 +628,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        $this->trxDoneWrites ||
                        $this->trxIdleCallbacks ||
                        $this->trxPreCommitCallbacks ||
-                       $this->trxEndCallbacks
+                       $this->trxEndCallbacks ||
+                       $this->trxSectionCancelCallbacks
                );
        }
 
@@ -698,7 +702,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                foreach ( [
                        $this->trxIdleCallbacks,
                        $this->trxPreCommitCallbacks,
-                       $this->trxEndCallbacks
+                       $this->trxEndCallbacks,
+                       $this->trxSectionCancelCallbacks
                ] as $callbacks ) {
                        foreach ( $callbacks as $callback ) {
                                $fnames[] = $callback[1];
@@ -3376,6 +3381,13 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                }
        }
 
+       final public function onAtomicSectionCancel( callable $callback, $fname = __METHOD__ ) {
+               if ( !$this->trxLevel || !$this->trxAtomicLevels ) {
+                       throw new DBUnexpectedError( $this, "No atomic section is open (got $fname)." );
+               }
+               $this->trxSectionCancelCallbacks[] = [ $callback, $fname, $this->currentAtomicSectionId() ];
+       }
+
        /**
         * @return AtomicSectionIdentifier|null ID of the topmost atomic section level
         */
@@ -3390,6 +3402,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        }
 
        /**
+        * Hoist callback ownership for callbacks in a section to a parent section.
+        * All callbacks should have an owner that is present in trxAtomicLevels.
         * @param AtomicSectionIdentifier $old
         * @param AtomicSectionIdentifier $new
         */
@@ -3411,13 +3425,35 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                                $this->trxEndCallbacks[$key][2] = $new;
                        }
                }
+               foreach ( $this->trxSectionCancelCallbacks as $key => $info ) {
+                       if ( $info[2] === $old ) {
+                               $this->trxSectionCancelCallbacks[$key][2] = $new;
+                       }
+               }
        }
 
        /**
+        * Update callbacks that were owned by cancelled atomic sections.
+        *
+        * Callbacks for "on commit" should never be run if they're owned by a
+        * section that won't be committed.
+        *
+        * Callbacks for "on resolution" need to reflect that the section was
+        * rolled back, even if the transaction as a whole commits successfully.
+        *
+        * Callbacks for "on section cancel" should already have been consumed,
+        * but errors during the cancellation itself can prevent that while still
+        * destroying the section. Hoist any such callbacks to the new top section,
+        * which we assume will itself have to be cancelled or rolled back to
+        * resolve the error.
+        *
         * @param AtomicSectionIdentifier[] $sectionIds ID of an actual savepoint
+        * @param AtomicSectionIdentifier|null $newSectionId New top section ID.
         * @throws UnexpectedValueException
         */
-       private function modifyCallbacksForCancel( array $sectionIds ) {
+       private function modifyCallbacksForCancel(
+               array $sectionIds, AtomicSectionIdentifier $newSectionId = null
+       ) {
                // Cancel the "on commit" callbacks owned by this savepoint
                $this->trxIdleCallbacks = array_filter(
                        $this->trxIdleCallbacks,
@@ -3436,8 +3472,17 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        if ( in_array( $entry[2], $sectionIds, true ) ) {
                                $callback = $entry[0];
                                $this->trxEndCallbacks[$key][0] = function () use ( $callback ) {
+                                       // @phan-suppress-next-line PhanInfiniteRecursion No recursion at all here, phan is confused
                                        return $callback( self::TRIGGER_ROLLBACK, $this );
                                };
+                               // This "on resolution" callback no longer belongs to a section.
+                               $this->trxEndCallbacks[$key][2] = null;
+                       }
+               }
+               // Hoist callback ownership for section cancel callbacks to the new top section
+               foreach ( $this->trxSectionCancelCallbacks as $key => $entry ) {
+                       if ( in_array( $entry[2], $sectionIds, true ) ) {
+                               $this->trxSectionCancelCallbacks[$key][2] = $newSectionId;
                        }
                }
        }
@@ -3492,6 +3537,14 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        );
                        $this->trxIdleCallbacks = []; // consumed (and recursion guard)
                        $this->trxEndCallbacks = []; // consumed (recursion guard)
+
+                       // Only run trxSectionCancelCallbacks on rollback, not commit.
+                       // But always consume them.
+                       if ( $trigger === self::TRIGGER_ROLLBACK ) {
+                               $callbacks = array_merge( $callbacks, $this->trxSectionCancelCallbacks );
+                       }
+                       $this->trxSectionCancelCallbacks = []; // consumed (recursion guard)
+
                        foreach ( $callbacks as $callback ) {
                                ++$count;
                                list( $phpCallback ) = $callback;
@@ -3559,6 +3612,46 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                return $count;
        }
 
+       /**
+        * Actually run any "atomic section cancel" callbacks.
+        *
+        * @param int $trigger IDatabase::TRIGGER_* constant
+        * @param AtomicSectionIdentifier[]|null $sectionId Section IDs to cancel,
+        *  null on transaction rollback
+        */
+       private function runOnAtomicSectionCancelCallbacks(
+               $trigger, array $sectionIds = null
+       ) {
+               /** @var Exception|Throwable $e */
+               $e = null; // first exception
+
+               $notCancelled = [];
+               do {
+                       $callbacks = $this->trxSectionCancelCallbacks;
+                       $this->trxSectionCancelCallbacks = []; // consumed (recursion guard)
+                       foreach ( $callbacks as $entry ) {
+                               if ( $sectionIds === null || in_array( $entry[2], $sectionIds, true ) ) {
+                                       try {
+                                               $entry[0]( $trigger, $this );
+                                       } catch ( Exception $ex ) {
+                                               ( $this->errorLogger )( $ex );
+                                               $e = $e ?: $ex;
+                                       } catch ( Throwable $ex ) {
+                                               // @todo: Log?
+                                               $e = $e ?: $ex;
+                                       }
+                               } else {
+                                       $notCancelled[] = $entry;
+                               }
+                       }
+               } while ( count( $this->trxSectionCancelCallbacks ) );
+               $this->trxSectionCancelCallbacks = $notCancelled;
+
+               if ( $e !== null ) {
+                       throw $e; // re-throw any first Exception/Throwable
+               }
+       }
+
        /**
         * Actually run any "transaction listener" callbacks.
         *
@@ -3722,67 +3815,79 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        throw new DBUnexpectedError( $this, "No atomic section is open (got $fname)." );
                }
 
-               $excisedFnames = [];
-               if ( $sectionId !== null ) {
-                       // Find the (last) section with the given $sectionId
-                       $pos = -1;
-                       foreach ( $this->trxAtomicLevels as $i => list( $asFname, $asId, $spId ) ) {
-                               if ( $asId === $sectionId ) {
-                                       $pos = $i;
+               $excisedIds = [];
+               $newTopSection = $this->currentAtomicSectionId();
+               try {
+                       $excisedFnames = [];
+                       if ( $sectionId !== null ) {
+                               // Find the (last) section with the given $sectionId
+                               $pos = -1;
+                               foreach ( $this->trxAtomicLevels as $i => list( $asFname, $asId, $spId ) ) {
+                                       if ( $asId === $sectionId ) {
+                                               $pos = $i;
+                                       }
                                }
+                               if ( $pos < 0 ) {
+                                       throw new DBUnexpectedError( $this, "Atomic section not found (for $fname)" );
+                               }
+                               // Remove all descendant sections and re-index the array
+                               $len = count( $this->trxAtomicLevels );
+                               for ( $i = $pos + 1; $i < $len; ++$i ) {
+                                       $excisedFnames[] = $this->trxAtomicLevels[$i][0];
+                                       $excisedIds[] = $this->trxAtomicLevels[$i][1];
+                               }
+                               $this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos + 1 );
+                               $newTopSection = $this->currentAtomicSectionId();
                        }
-                       if ( $pos < 0 ) {
-                               throw new DBUnexpectedError( $this, "Atomic section not found (for $fname)" );
-                       }
-                       // Remove all descendant sections and re-index the array
-                       $excisedIds = [];
-                       $len = count( $this->trxAtomicLevels );
-                       for ( $i = $pos + 1; $i < $len; ++$i ) {
-                               $excisedFnames[] = $this->trxAtomicLevels[$i][0];
-                               $excisedIds[] = $this->trxAtomicLevels[$i][1];
-                       }
-                       $this->trxAtomicLevels = array_slice( $this->trxAtomicLevels, 0, $pos + 1 );
-                       $this->modifyCallbacksForCancel( $excisedIds );
-               }
 
-               // Check if the current section matches $fname
-               $pos = count( $this->trxAtomicLevels ) - 1;
-               list( $savedFname, $savedSectionId, $savepointId ) = $this->trxAtomicLevels[$pos];
+                       // Check if the current section matches $fname
+                       $pos = count( $this->trxAtomicLevels ) - 1;
+                       list( $savedFname, $savedSectionId, $savepointId ) = $this->trxAtomicLevels[$pos];
 
-               if ( $excisedFnames ) {
-                       $this->queryLogger->debug( "cancelAtomic: canceling level $pos ($savedFname) " .
-                               "and descendants " . implode( ', ', $excisedFnames ) );
-               } else {
-                       $this->queryLogger->debug( "cancelAtomic: canceling level $pos ($savedFname)" );
-               }
+                       if ( $excisedFnames ) {
+                               $this->queryLogger->debug( "cancelAtomic: canceling level $pos ($savedFname) " .
+                                       "and descendants " . implode( ', ', $excisedFnames ) );
+                       } else {
+                               $this->queryLogger->debug( "cancelAtomic: canceling level $pos ($savedFname)" );
+                       }
 
-               if ( $savedFname !== $fname ) {
-                       throw new DBUnexpectedError(
-                               $this,
-                               "Invalid atomic section ended (got $fname but expected $savedFname)."
-                       );
-               }
+                       if ( $savedFname !== $fname ) {
+                               throw new DBUnexpectedError(
+                                       $this,
+                                       "Invalid atomic section ended (got $fname but expected $savedFname)."
+                               );
+                       }
 
-               // Remove the last section (no need to re-index the array)
-               array_pop( $this->trxAtomicLevels );
-               $this->modifyCallbacksForCancel( [ $savedSectionId ] );
+                       // Remove the last section (no need to re-index the array)
+                       array_pop( $this->trxAtomicLevels );
+                       $excisedIds[] = $savedSectionId;
+                       $newTopSection = $this->currentAtomicSectionId();
 
-               if ( $savepointId !== null ) {
-                       // Rollback the transaction to the state just before this atomic section
-                       if ( $savepointId === self::$NOT_APPLICABLE ) {
-                               $this->rollback( $fname, self::FLUSHING_INTERNAL );
-                       } else {
-                               $this->doRollbackToSavepoint( $savepointId, $fname );
-                               $this->trxStatus = self::STATUS_TRX_OK; // no exception; recovered
-                               $this->trxStatusIgnoredCause = null;
+                       if ( $savepointId !== null ) {
+                               // Rollback the transaction to the state just before this atomic section
+                               if ( $savepointId === self::$NOT_APPLICABLE ) {
+                                       $this->rollback( $fname, self::FLUSHING_INTERNAL );
+                                       // Note: rollback() will run trxSectionCancelCallbacks
+                               } else {
+                                       $this->doRollbackToSavepoint( $savepointId, $fname );
+                                       $this->trxStatus = self::STATUS_TRX_OK; // no exception; recovered
+                                       $this->trxStatusIgnoredCause = null;
+
+                                       // Run trxSectionCancelCallbacks now.
+                                       $this->runOnAtomicSectionCancelCallbacks( self::TRIGGER_CANCEL, $excisedIds );
+                               }
+                       } elseif ( $this->trxStatus > self::STATUS_TRX_ERROR ) {
+                               // Put the transaction into an error state if it's not already in one
+                               $this->trxStatus = self::STATUS_TRX_ERROR;
+                               $this->trxStatusCause = new DBUnexpectedError(
+                                       $this,
+                                       "Uncancelable atomic section canceled (got $fname)."
+                               );
                        }
-               } elseif ( $this->trxStatus > self::STATUS_TRX_ERROR ) {
-                       // Put the transaction into an error state if it's not already in one
-                       $this->trxStatus = self::STATUS_TRX_ERROR;
-                       $this->trxStatusCause = new DBUnexpectedError(
-                               $this,
-                               "Uncancelable atomic section canceled (got $fname)."
-                       );
+               } finally {
+                       // Fix up callbacks owned by the sections that were just cancelled.
+                       // All callbacks should have an owner that is present in trxAtomicLevels.
+                       $this->modifyCallbacksForCancel( $excisedIds, $newTopSection );
                }
 
                $this->affectedRowCount = 0; // for the sake of consistency
@@ -4697,6 +4802,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        // Open a new connection resource without messing with the old one
                        $this->conn = false;
                        $this->trxEndCallbacks = []; // don't copy
+                       $this->trxSectionCancelCallbacks = []; // don't copy
                        $this->handleSessionLossPreconnect(); // no trx or locks anymore
                        $this->open(
                                $this->server,