objectcache: allow for callbacks to mask SYNC_WRITE latency
authorAaron Schulz <aschulz@wikimedia.org>
Mon, 29 Aug 2016 17:00:05 +0000 (10:00 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Thu, 1 Sep 2016 20:44:18 +0000 (13:44 -0700)
Change-Id: I908222ad3788ebe330aa58831cda139da32becd8

autoload.php
includes/libs/WaitConditionLoop.php [new file with mode: 0644]
includes/libs/objectcache/BagOStuff.php
includes/objectcache/SqlBagOStuff.php
tests/phpunit/includes/libs/WaitConditionLoopTest.php [new file with mode: 0644]

index dec7219..652535c 100644 (file)
@@ -1498,6 +1498,7 @@ $wgAutoloadLocalClasses = [
        'VirtualRESTService' => __DIR__ . '/includes/libs/virtualrest/VirtualRESTService.php',
        'VirtualRESTServiceClient' => __DIR__ . '/includes/libs/virtualrest/VirtualRESTServiceClient.php',
        'WANObjectCache' => __DIR__ . '/includes/libs/objectcache/WANObjectCache.php',
+       'WaitConditionLoop' => __DIR__ . '/includes/libs/WaitConditionLoop.php',
        'WantedCategoriesPage' => __DIR__ . '/includes/specials/SpecialWantedcategories.php',
        'WantedFilesPage' => __DIR__ . '/includes/specials/SpecialWantedfiles.php',
        'WantedPagesPage' => __DIR__ . '/includes/specials/SpecialWantedpages.php',
diff --git a/includes/libs/WaitConditionLoop.php b/includes/libs/WaitConditionLoop.php
new file mode 100644 (file)
index 0000000..3cc6236
--- /dev/null
@@ -0,0 +1,171 @@
+<?php
+/**
+ * Wait loop that reaches a condition or times out.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Wait loop that reaches a condition or times out
+ * @since 1.28
+ */
+class WaitConditionLoop {
+       /** @var callable */
+       private $condition;
+       /** @var callable[] */
+       private $busyCallbacks = [];
+       /** @var float Seconds */
+       private $timeout;
+       /** @var float Seconds */
+       private $lastWaitTime;
+
+       const CONDITION_REACHED = 1;
+       const CONDITION_CONTINUE = 0; // evaluates as falsey
+       const CONDITION_TIMED_OUT = -1;
+       const CONDITION_ABORTED = -2;
+
+       /**
+        * @param callable $condition Callback that returns a WaitConditionLoop::CONDITION_ constant
+        * @param float $timeout Timeout in seconds
+        * @param array &$busyCallbacks List of callbacks to do useful work (by reference)
+        */
+       public function __construct( callable $condition, $timeout = 5.0, &$busyCallbacks = [] ) {
+               $this->condition = $condition;
+               $this->timeout = $timeout;
+               $this->busyCallbacks =& $busyCallbacks;
+       }
+
+       /**
+        * Invoke the loop and continue until either:
+        *   - a) The condition callback does not return either CONDITION_CONTINUE or true
+        *   - b) The timeout is reached
+        * This a condition callback can return true (stop) or false (continue) for convenience.
+        * In such cases, the halting result of "true" will be converted to CONDITION_REACHED.
+        *
+        * Exceptions in callbacks will be caught and the callback will be swapped with
+        * one that simply rethrows that exception back to the caller when invoked.
+        *
+        * @return integer WaitConditionLoop::CONDITION_* constant
+        * @throws Exception Any error from the condition callback
+        */
+       public function invoke() {
+               $elapsed = 0.0; // seconds
+               $sleepUs = 0; // microseconds to sleep each time
+               $lastCheck = false;
+               $finalResult = self::CONDITION_TIMED_OUT;
+               do {
+                       $checkStartTime = $this->getWallTime();
+                       // Check if the condition is met yet
+                       $realStart = $this->getWallTime();
+                       $cpuStart = $this->getCpuTime();
+                       $checkResult = call_user_func( $this->condition );
+                       $cpu = $this->getCpuTime() - $cpuStart;
+                       $real = $this->getWallTime() - $realStart;
+                       // Exit if the condition is reached
+                       if ( (int)$checkResult !== self::CONDITION_CONTINUE ) {
+                               $finalResult = is_int( $checkResult ) ? $checkResult : self::CONDITION_REACHED;
+                               break;
+                       } elseif ( $lastCheck ) {
+                               break; // timeout
+                       }
+                       // Detect if condition callback seems to block or if justs burns CPU
+                       $conditionUsesInterrupts = ( $real > 0.100 && $cpu <= $real * .03 );
+                       if ( !$this->popAndRunBusyCallback() && !$conditionUsesInterrupts ) {
+                               // 10 queries = 10(10+100)/2 ms = 550ms, 14 queries = 1050ms
+                               $sleepUs = min( $sleepUs + 10 * 1e3, 1e6 ); // stop incrementing at ~1s
+                               $this->usleep( $sleepUs );
+                       }
+                       $checkEndTime = $this->getWallTime();
+                       // The max() protects against the clock getting set back
+                       $elapsed += max( $checkEndTime - $checkStartTime, 0.010 );
+                       // Do not let slow callbacks timeout without checking the condition one more time
+                       $lastCheck = ( $elapsed >= $this->timeout );
+               } while ( true );
+
+               $this->lastWaitTime = $elapsed;
+
+               return $finalResult;
+       }
+
+       /**
+        * @return float Seconds
+        */
+       public function getLastWaitTime() {
+               return $this->lastWaitTime;
+       }
+
+       /**
+        * @param integer $microseconds
+        */
+       protected function usleep( $microseconds ) {
+               usleep( $microseconds );
+       }
+
+       /**
+        * @return float
+        */
+       protected function getWallTime() {
+               return microtime( true );
+       }
+
+       /**
+        * @return float Returns 0.0 if not supported (Windows on PHP < 7)
+        */
+       protected function getCpuTime() {
+               $time = 0.0;
+
+               if ( defined( 'HHVM_VERSION' ) && PHP_OS === 'Linux' ) {
+                       $ru = getrusage( 2 /* RUSAGE_THREAD */ );
+               } else {
+                       $ru = getrusage( 0 /* RUSAGE_SELF */ );
+               }
+               if ( $ru ) {
+                       $time += $ru['ru_utime.tv_sec'] + $ru['ru_utime.tv_usec'] / 1e6;
+                       $time += $ru['ru_stime.tv_sec'] + $ru['ru_stime.tv_usec'] / 1e6;
+               }
+
+               return $time;
+       }
+
+       /**
+        * Run one of the callbacks that does work ahead of time for another caller
+        *
+        * @return bool Whether a callback was executed
+        */
+       private function popAndRunBusyCallback() {
+               if ( $this->busyCallbacks ) {
+                       reset( $this->busyCallbacks );
+                       $key = key( $this->busyCallbacks );
+                       /** @var callable $workCallback */
+                       $workCallback =& $this->busyCallbacks[$key];
+                       try {
+                               $workCallback();
+                       } catch ( Exception $e ) {
+                               $workCallback = function () use ( $e ) {
+                                       throw $e;
+                               };
+                       }
+                       unset( $this->busyCallbacks[$key] ); // consume
+
+                       return true;
+               }
+
+               return false;
+       }
+}
index 5472e83..a679be8 100644 (file)
@@ -45,31 +45,29 @@ use Psr\Log\NullLogger;
 abstract class BagOStuff implements IExpiringStore, LoggerAwareInterface {
        /** @var array[] Lock tracking */
        protected $locks = [];
-
        /** @var integer ERR_* class constant */
        protected $lastError = self::ERR_NONE;
-
        /** @var string */
        protected $keyspace = 'local';
-
        /** @var LoggerInterface */
        protected $logger;
-
        /** @var callback|null */
        protected $asyncHandler;
+       /** @var integer Seconds */
+       protected $syncTimeout;
 
        /** @var bool */
        private $debugMode = false;
-
        /** @var array */
        private $duplicateKeyLookups = [];
-
        /** @var bool */
        private $reportDupes = false;
-
        /** @var bool */
        private $dupeTrackScheduled = false;
 
+       /** @var callable[] */
+       protected $busyCallbacks = [];
+
        /** @var integer[] Map of (ATTR_* class constant => QOS_* class constant) */
        protected $attrMap = [];
 
@@ -94,6 +92,7 @@ abstract class BagOStuff implements IExpiringStore, LoggerAwareInterface {
         *      In CLI mode, it should run the task immediately.
         *   - reportDupes: Whether to emit warning log messages for all keys that were
         *      requested more than once (requires an asyncHandler).
+        *   - syncTimeout: How long to wait with WRITE_SYNC in seconds.
         * @param array $params
         */
        public function __construct( array $params = [] ) {
@@ -114,6 +113,8 @@ abstract class BagOStuff implements IExpiringStore, LoggerAwareInterface {
                if ( !empty( $params['reportDupes'] ) && is_callable( $this->asyncHandler ) ) {
                        $this->reportDupes = true;
                }
+
+               $this->syncTimeout = isset( $params['syncTimeout'] ) ? $params['syncTimeout'] : 3;
        }
 
        /**
@@ -642,6 +643,30 @@ abstract class BagOStuff implements IExpiringStore, LoggerAwareInterface {
                $this->lastError = $err;
        }
 
+       /**
+        * Let a callback be run to avoid wasting time on special blocking calls
+        *
+        * The callbacks may or may not be called ever, in any particular order.
+        * They are likely to be invoked when something WRITE_SYNC is used used.
+        * They should follow a caching pattern as shown below, so that any code
+        * using the word will get it's result no matter what happens.
+        * @code
+        *     $result = null;
+        *     $workCallback = function () use ( &$result ) {
+        *         if ( !$result ) {
+        *             $result = ....
+        *         }
+        *         return $result;
+        *     }
+        * @endcode
+        *
+        * @param callable $workCallback
+        * @since 1.28
+        */
+       public function addBusyCallback( callable $workCallback ) {
+               $this->busyCallbacks[] = $workCallback;
+       }
+
        /**
         * Modify a cache update operation array for EventRelayer::notify()
         *
index 73f8280..0abe64c 100644 (file)
@@ -377,7 +377,7 @@ class SqlBagOStuff extends BagOStuff {
        public function set( $key, $value, $exptime = 0, $flags = 0 ) {
                $ok = $this->setMulti( [ $key => $value ], $exptime );
                if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) {
-                       $ok = $ok && $this->waitForSlaves();
+                       $ok = $this->waitForReplication() && $ok;
                }
 
                return $ok;
@@ -489,7 +489,7 @@ class SqlBagOStuff extends BagOStuff {
        public function merge( $key, callable $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
                $ok = $this->mergeViaCas( $key, $callback, $exptime, $attempts );
                if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) {
-                       $ok = $ok && $this->waitForSlaves();
+                       $ok = $this->waitForReplication() && $ok;
                }
 
                return $ok;
@@ -797,26 +797,30 @@ class SqlBagOStuff extends BagOStuff {
                return !$this->serverInfos;
        }
 
-       protected function waitForSlaves() {
-               if ( $this->usesMainDB() ) {
-                       $lb = $this->getSeparateMainLB()
-                               ?: MediaWikiServices::getInstance()->getDBLoadBalancer();
-                       // Return if there are no slaves
-                       if ( $lb->getServerCount() <= 1 ) {
-                               return true;
-                       }
-                       // Main LB is used; wait for any slaves to catch up
-                       try {
-                               $pos = $lb->getMasterPos();
-                               if ( $pos ) {
-                                       return $lb->waitForAll( $pos, 3 );
-                               }
-                       } catch ( DBReplicationWaitError $e ) {
-                               return false;
-                       }
+       protected function waitForReplication() {
+               if ( !$this->usesMainDB() ) {
+                       // Custom DB server list; probably doesn't use replication
+                       return true;
                }
 
-               // Custom DB server list; probably doesn't use replication
-               return true;
+               $lb = $this->getSeparateMainLB()
+                       ?: MediaWikiServices::getInstance()->getDBLoadBalancer();
+
+               if ( $lb->getServerCount() <= 1 ) {
+                       return true; // no slaves
+               }
+
+               // Main LB is used; wait for any slaves to catch up
+               $masterPos = $lb->getMasterPos();
+
+               $loop = new WaitConditionLoop(
+                       function () use ( $lb, $masterPos ) {
+                               return $lb->waitForAll( $masterPos, 1 );
+                       },
+                       $this->syncTimeout,
+                       $this->busyCallbacks
+               );
+
+               return ( $loop->invoke() === $loop::CONDITION_REACHED );
        }
 }
diff --git a/tests/phpunit/includes/libs/WaitConditionLoopTest.php b/tests/phpunit/includes/libs/WaitConditionLoopTest.php
new file mode 100644 (file)
index 0000000..b75adca
--- /dev/null
@@ -0,0 +1,154 @@
+<?php
+
+class WaitConditionLoopFakeTime extends WaitConditionLoop {
+       protected $wallClock = 1;
+
+       function __construct( callable $condition, $timeout, array $busyCallbacks ) {
+               parent::__construct( $condition, $timeout, $busyCallbacks );
+       }
+
+       function usleep( $microseconds ) {
+               $this->wallClock += $microseconds / 1e6;
+       }
+
+       function getCpuTime() {
+               return 0.0;
+       }
+
+       function getWallTime() {
+               return $this->wallClock;
+       }
+
+       public function setWallClock( &$timestamp ) {
+               $this->wallClock =& $timestamp;
+       }
+}
+
+class WaitConditionLoopTest extends PHPUnit_Framework_TestCase {
+       public function testCallbackReached() {
+               $wallClock = microtime( true );
+
+               $count = 0;
+               $status = new StatusValue();
+               $loop = new WaitConditionLoopFakeTime(
+                       function () use ( &$count, $status ) {
+                               ++$count;
+                               $status->value = 'cookie';
+
+                               return WaitConditionLoop::CONDITION_REACHED;
+                       },
+                       10.0,
+                       $this->newBusyWork( $x, $y, $z )
+               );
+               $this->assertEquals( $loop::CONDITION_REACHED, $loop->invoke() );
+               $this->assertEquals( 1, $count );
+               $this->assertEquals( 'cookie', $status->value );
+               $this->assertEquals( [ 0, 0, 0 ], [ $x, $y, $z ], "No busy work done" );
+
+               $count = 0;
+               $loop = new WaitConditionLoopFakeTime(
+                       function () use ( &$count, &$wallClock ) {
+                               $wallClock += 1;
+                               ++$count;
+
+                               return $count >= 2 ? WaitConditionLoop::CONDITION_REACHED : false;
+                       },
+                       7.0,
+                       $this->newBusyWork( $x, $y, $z, $wallClock )
+               );
+               $this->assertEquals( $loop::CONDITION_REACHED, $loop->invoke(),
+                       "Busy work did not cause timeout" );
+               $this->assertEquals( [ 1, 0, 0 ], [ $x, $y, $z ] );
+
+               $count = 0;
+               $loop = new WaitConditionLoopFakeTime(
+                       function () use ( &$count, &$wallClock ) {
+                               $wallClock += .1;
+                               ++$count;
+
+                               return $count > 80 ? true : false;
+                       },
+                       50.0,
+                       $this->newBusyWork( $x, $y, $z, $wallClock, $dontCallMe, $badCalls )
+               );
+               $this->assertEquals( 0, $badCalls, "Callback exception not yet called" );
+               $this->assertEquals( $loop::CONDITION_REACHED, $loop->invoke() );
+               $this->assertEquals( [ 1, 1, 1 ], [ $x, $y, $z ], "Busy work done" );
+               $this->assertEquals( 1, $badCalls, "Bad callback ran and was exception caught" );
+
+               try {
+                       $e = null;
+                       $dontCallMe();
+               } catch ( Exception $e ) {
+               }
+
+               $this->assertInstanceOf( 'RunTimeException', $e );
+               $this->assertEquals( 1, $badCalls, "Callback exception cached" );
+       }
+
+       public function testCallbackTimeout() {
+               $count = 0;
+               $wallClock = microtime( true );
+               $loop = new WaitConditionLoopFakeTime(
+                       function () use ( &$count, &$wallClock ) {
+                               $wallClock += 3;
+                               ++$count;
+
+                               return $count > 300 ? true : false;
+                       },
+                       50.0,
+                       $this->newBusyWork( $x, $y, $z, $wallClock )
+               );
+               $loop->setWallClock( $wallClock );
+               $this->assertEquals( $loop::CONDITION_TIMED_OUT, $loop->invoke() );
+               $this->assertEquals( [ 1, 1, 1 ], [ $x, $y, $z ], "Busy work done" );
+       }
+
+       public function testCallbackAborted() {
+               $x = 0;
+               $wallClock = microtime( true );
+               $loop = new WaitConditionLoopFakeTime(
+                       function () use ( &$x, &$wallClock ) {
+                               $wallClock += 2;
+                               ++$x;
+
+                               return $x > 2 ? WaitConditionLoop::CONDITION_ABORTED : false;
+                       },
+                       10.0,
+                       $this->newBusyWork( $x, $y, $z, $wallClock )
+               );
+               $loop->setWallClock( $wallClock );
+               $this->assertEquals( $loop::CONDITION_ABORTED, $loop->invoke() );
+       }
+
+       private function newBusyWork(
+               &$x, &$y, &$z, &$wallClock = 1, &$dontCallMe = null, &$badCalls = 0
+       ) {
+               $x = $y = $z = 0;
+               $badCalls = 0;
+
+               $list = [];
+               $list[] = function () use ( &$x, &$wallClock ) {
+                       $wallClock += 1;
+
+                       return ++$x;
+               };
+               $dontCallMe = function () use ( &$badCalls ) {
+                       ++$badCalls;
+                       throw new RuntimeException( "TrollyMcTrollFace" );
+               };
+               $list[] =& $dontCallMe;
+               $list[] = function () use ( &$y, &$wallClock ) {
+                       $wallClock += 15;
+
+                       return ++$y;
+               };
+               $list[] = function () use ( &$z, &$wallClock ) {
+                       $wallClock += 0.1;
+
+                       return ++$z;
+               };
+
+               return $list;
+       }
+}