*/
public function lock( $lockName, $method, $timeout = 5 ) {
$key = $this->addQuotes( $this->bigintFromLockName( $lockName ) );
- for ( $attempts = 1; $attempts <= $timeout; ++$attempts ) {
- $result = $this->query(
- "SELECT pg_try_advisory_lock($key) AS lockstatus", $method );
- $row = $this->fetchObject( $result );
- if ( $row->lockstatus === 't' ) {
- parent::lock( $lockName, $method, $timeout ); // record
- return true;
- } else {
- sleep( 1 );
- }
- }
+ $loop = new WaitConditionLoop(
+ function () use ( $lockName, $key, $timeout, $method ) {
+ $res = $this->query( "SELECT pg_try_advisory_lock($key) AS lockstatus", $method );
+ $row = $this->fetchObject( $res );
+ if ( $row->lockstatus === 't' ) {
+ parent::lock( $lockName, $method, $timeout ); // record
+ return true;
+ }
- wfDebug( __METHOD__ . " failed to acquire lock\n" );
+ return WaitConditionLoop::CONDITION_CONTINUE;
+ },
+ $timeout
+ );
- return false;
+ return ( $loop->invoke() === $loop::CONDITION_REACHED );
}
/**
*/
final public function lockByType( array $pathsByType, $timeout = 0 ) {
$pathsByType = $this->normalizePathsByType( $pathsByType );
- $msleep = [ 0, 50, 100, 300, 500 ]; // retry backoff times
- $start = microtime( true );
- do {
- $status = $this->doLockByType( $pathsByType );
- $elapsed = microtime( true ) - $start;
- if ( $status->isOK() || $elapsed >= $timeout || $elapsed < 0 ) {
- break; // success, timeout, or clock set back
- }
- usleep( 1e3 * ( next( $msleep ) ?: 1000 ) ); // use 1 sec after enough times
- $elapsed = microtime( true ) - $start;
- } while ( $elapsed < $timeout && $elapsed >= 0 );
+
+ $status = null;
+ $loop = new WaitConditionLoop(
+ function () use ( &$status, $pathsByType ) {
+ $status = $this->doLockByType( $pathsByType );
+
+ return $status->isOK() ?: WaitConditionLoop::CONDITION_CONTINUE;
+ },
+ $timeout
+ );
+ $loop->invoke();
return $status;
}
* @return MemcachedBagOStuff|null
*/
protected function getCache( $lockSrv ) {
+ /** @var BagOStuff $memc */
$memc = null;
if ( isset( $this->bagOStuffs[$lockSrv] ) ) {
$memc = $this->bagOStuffs[$lockSrv];
// Try to quickly loop to acquire the keys, but back off after a few rounds.
// This reduces memcached spam, especially in the rare case where a server acquires
// some lock keys and dies without releasing them. Lock keys expire after a few minutes.
- $rounds = 0;
- $start = microtime( true );
- do {
- if ( ( ++$rounds % 4 ) == 0 ) {
- usleep( 1000 * 50 ); // 50 ms
- }
- foreach ( array_diff( $keys, $lockedKeys ) as $key ) {
- if ( $memc->add( "$key:mutex", 1, 180 ) ) { // lock record
- $lockedKeys[] = $key;
- } else {
- continue; // acquire in order
+ $loop = new WaitConditionLoop(
+ function () use ( $memc, $keys, &$lockedKeys ) {
+ foreach ( array_diff( $keys, $lockedKeys ) as $key ) {
+ if ( $memc->add( "$key:mutex", 1, 180 ) ) { // lock record
+ $lockedKeys[] = $key;
+ }
}
- }
- } while ( count( $lockedKeys ) < count( $keys ) && ( microtime( true ) - $start ) <= 3 );
+
+ return array_diff( $keys, $lockedKeys )
+ ? WaitConditionLoop::CONDITION_CONTINUE
+ : true;
+ },
+ 3.0 // timeout
+ );
+ $loop->invoke();
if ( count( $lockedKeys ) != count( $keys ) ) {
$this->releaseMutexes( $memc, $lockedKeys ); // failed; release what was locked
const CONDITION_REACHED = 1;
const CONDITION_CONTINUE = 0; // evaluates as falsey
- const CONDITION_TIMED_OUT = -1;
- const CONDITION_ABORTED = -2;
+ const CONDITION_FAILED = -1;
+ const CONDITION_TIMED_OUT = -2;
+ const CONDITION_ABORTED = -3;
/**
* @param callable $condition Callback that returns a WaitConditionLoop::CONDITION_ constant
/**
* Invoke the loop and continue until either:
- * - a) The condition callback does not return either CONDITION_CONTINUE or true
+ * - a) The condition callback returns neither CONDITION_CONTINUE nor false
* - b) The timeout is reached
* This a condition callback can return true (stop) or false (continue) for convenience.
* In such cases, the halting result of "true" will be converted to CONDITION_REACHED.
*
+ * If $timeout is 0, then only the condition callback will be called (no busy callbacks),
+ * and this will immediately return CONDITION_FAILED if the condition was not met.
+ *
* Exceptions in callbacks will be caught and the callback will be swapped with
* one that simply rethrows that exception back to the caller when invoked.
*
$checkResult = call_user_func( $this->condition );
$cpu = $this->getCpuTime() - $cpuStart;
$real = $this->getWallTime() - $realStart;
- // Exit if the condition is reached
- if ( (int)$checkResult !== self::CONDITION_CONTINUE ) {
- $finalResult = is_int( $checkResult ) ? $checkResult : self::CONDITION_REACHED;
+ // Exit if the condition is reached, and error occurs, or this is non-blocking
+ if ( $this->timeout <= 0 ) {
+ $finalResult = $checkResult ? self::CONDITION_REACHED : self::CONDITION_FAILED;
+ break;
+ } elseif ( (int)$checkResult !== self::CONDITION_CONTINUE ) {
+ if ( is_int( $checkResult ) ) {
+ $finalResult = $checkResult;
+ } else {
+ $finalResult = self::CONDITION_REACHED;
+ }
break;
} elseif ( $lastCheck ) {
- break; // timeout
+ break; // timeout reached
}
// Detect if condition callback seems to block or if justs burns CPU
$conditionUsesInterrupts = ( $real > 0.100 && $cpu <= $real * .03 );
}
$expiry = min( $expiry ?: INF, self::TTL_DAY );
-
- $this->clearLastError();
- $timestamp = microtime( true ); // starting UNIX timestamp
- if ( $this->add( "{$key}:lock", 1, $expiry ) ) {
- $locked = true;
- } elseif ( $this->getLastError() || $timeout <= 0 ) {
- $locked = false; // network partition or non-blocking
- } else {
- // Estimate the RTT (us); use 1ms minimum for sanity
- $uRTT = max( 1e3, ceil( 1e6 * ( microtime( true ) - $timestamp ) ) );
- $sleep = 2 * $uRTT; // rough time to do get()+set()
-
- $attempts = 0; // failed attempts
- do {
- if ( ++$attempts >= 3 && $sleep <= 5e5 ) {
- // Exponentially back off after failed attempts to avoid network spam.
- // About 2*$uRTT*(2^n-1) us of "sleep" happen for the next n attempts.
- $sleep *= 2;
- }
- usleep( $sleep ); // back off
+ $loop = new WaitConditionLoop(
+ function () use ( $key, $timeout, $expiry ) {
$this->clearLastError();
- $locked = $this->add( "{$key}:lock", 1, $expiry );
- if ( $this->getLastError() ) {
- $locked = false; // network partition
- break;
+ if ( $this->add( "{$key}:lock", 1, $expiry ) ) {
+ return true; // locked!
+ } elseif ( $this->getLastError() ) {
+ return WaitConditionLoop::CONDITION_ABORTED; // network partition?
}
- } while ( !$locked && ( microtime( true ) - $timestamp ) < $timeout );
- }
+ return WaitConditionLoop::CONDITION_CONTINUE;
+ },
+ $timeout
+ );
+
+ $locked = ( $loop->invoke() === $loop::CONDITION_REACHED );
if ( $locked ) {
$this->locks[$key] = [ 'class' => $rclass, 'depth' => 1 ];
}
$loop->setWallClock( $wallClock );
$this->assertEquals( $loop::CONDITION_TIMED_OUT, $loop->invoke() );
$this->assertEquals( [ 1, 1, 1 ], [ $x, $y, $z ], "Busy work done" );
+
+ $loop = new WaitConditionLoopFakeTime(
+ function () use ( &$count, &$wallClock ) {
+ $wallClock += 3;
+ ++$count;
+
+ return true;
+ },
+ 0.0,
+ $this->newBusyWork( $x, $y, $z, $wallClock )
+ );
+ $this->assertEquals( $loop::CONDITION_REACHED, $loop->invoke() );
+
+ $count = 0;
+ $loop = new WaitConditionLoopFakeTime(
+ function () use ( &$count, &$wallClock ) {
+ $wallClock += 3;
+ ++$count;
+
+ return $count > 10 ? true : false;
+ },
+ 0,
+ $this->newBusyWork( $x, $y, $z, $wallClock )
+ );
+ $this->assertEquals( $loop::CONDITION_FAILED, $loop->invoke() );
}
public function testCallbackAborted() {