*/
class ReplicatedBagOStuff extends BagOStuff {
/** @var BagOStuff */
- protected $writeStore;
+ private $writeStore;
/** @var BagOStuff */
- protected $readStore;
+ private $readStore;
+
+ /** @var int Seconds to read from the master source for a key after writing to it */
+ private $consistencyWindow;
+ /** @var float[] Map of (key => UNIX timestamp) */
+ private $lastKeyWrites = [];
+
+ /** @var int Max expected delay (in seconds) for writes to reach replicas */
+ const MAX_WRITE_DELAY = 5;
/**
* Constructor. Parameters are:
- * - writeFactory : ObjectFactory::getObjectFromSpec array yeilding BagOStuff.
- * This object will be used for writes (e.g. the master DB).
- * - readFactory : ObjectFactory::getObjectFromSpec array yeilding BagOStuff.
- * This object will be used for reads (e.g. a replica DB).
+ * - writeFactory: ObjectFactory::getObjectFromSpec array yeilding BagOStuff.
+ * This object will be used for writes (e.g. the master DB).
+ * - readFactory: ObjectFactory::getObjectFromSpec array yeilding BagOStuff.
+ * This object will be used for reads (e.g. a replica DB).
+ * - sessionConsistencyWindow: Seconds to read from the master source for a key
+ * after writing to it. [Default: ReplicatedBagOStuff::MAX_WRITE_DELAY]
*
* @param array $params
* @throws InvalidArgumentException
if ( !isset( $params['writeFactory'] ) ) {
throw new InvalidArgumentException(
__METHOD__ . ': the "writeFactory" parameter is required' );
- }
- if ( !isset( $params['readFactory'] ) ) {
+ } elseif ( !isset( $params['readFactory'] ) ) {
throw new InvalidArgumentException(
__METHOD__ . ': the "readFactory" parameter is required' );
}
- $opts = [ 'reportDupes' => false ]; // redundant
+ $this->consistencyWindow = $params['sessionConsistencyWindow'] ?? self::MAX_WRITE_DELAY;
$this->writeStore = ( $params['writeFactory'] instanceof BagOStuff )
? $params['writeFactory']
- : ObjectFactory::getObjectFromSpec( $opts + $params['writeFactory'] );
+ : ObjectFactory::getObjectFromSpec( $params['writeFactory'] );
$this->readStore = ( $params['readFactory'] instanceof BagOStuff )
? $params['readFactory']
- : ObjectFactory::getObjectFromSpec( $opts + $params['readFactory'] );
+ : ObjectFactory::getObjectFromSpec( $params['readFactory'] );
$this->attrMap = $this->mergeFlagMaps( [ $this->readStore, $this->writeStore ] );
}
}
public function get( $key, $flags = 0 ) {
- return $this->fieldHasFlags( $flags, self::READ_LATEST )
+ return (
+ $this->hadRecentSessionWrite( [ $key ] ) ||
+ $this->fieldHasFlags( $flags, self::READ_LATEST )
+ )
? $this->writeStore->get( $key, $flags )
: $this->readStore->get( $key, $flags );
}
public function set( $key, $value, $exptime = 0, $flags = 0 ) {
+ $this->remarkRecentSessionWrite( [ $key ] );
+
return $this->writeStore->set( $key, $value, $exptime, $flags );
}
public function delete( $key, $flags = 0 ) {
+ $this->remarkRecentSessionWrite( [ $key ] );
+
return $this->writeStore->delete( $key, $flags );
}
public function add( $key, $value, $exptime = 0, $flags = 0 ) {
+ $this->remarkRecentSessionWrite( [ $key ] );
+
return $this->writeStore->add( $key, $value, $exptime, $flags );
}
public function merge( $key, callable $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
+ $this->remarkRecentSessionWrite( [ $key ] );
+
return $this->writeStore->merge( $key, $callback, $exptime, $attempts, $flags );
}
public function changeTTL( $key, $exptime = 0, $flags = 0 ) {
+ $this->remarkRecentSessionWrite( [ $key ] );
+
return $this->writeStore->changeTTL( $key, $exptime, $flags );
}
}
public function getMulti( array $keys, $flags = 0 ) {
- return $this->fieldHasFlags( $flags, self::READ_LATEST )
+ return (
+ $this->hadRecentSessionWrite( $keys ) ||
+ $this->fieldHasFlags( $flags, self::READ_LATEST )
+ )
? $this->writeStore->getMulti( $keys, $flags )
: $this->readStore->getMulti( $keys, $flags );
}
public function setMulti( array $data, $exptime = 0, $flags = 0 ) {
+ $this->remarkRecentSessionWrite( array_keys( $data ) );
+
return $this->writeStore->setMulti( $data, $exptime, $flags );
}
public function deleteMulti( array $keys, $flags = 0 ) {
+ $this->remarkRecentSessionWrite( $keys );
+
return $this->writeStore->deleteMulti( $keys, $flags );
}
public function changeTTLMulti( array $keys, $exptime, $flags = 0 ) {
+ $this->remarkRecentSessionWrite( $keys );
+
return $this->writeStore->changeTTLMulti( $keys, $exptime, $flags );
}
public function incr( $key, $value = 1, $flags = 0 ) {
+ $this->remarkRecentSessionWrite( [ $key ] );
+
return $this->writeStore->incr( $key, $value, $flags );
}
public function decr( $key, $value = 1, $flags = 0 ) {
+ $this->remarkRecentSessionWrite( [ $key ] );
+
return $this->writeStore->decr( $key, $value, $flags );
}
public function incrWithInit( $key, $exptime, $value = 1, $init = null, $flags = 0 ) {
+ $this->remarkRecentSessionWrite( [ $key ] );
+
return $this->writeStore->incrWithInit( $key, $exptime, $value, $init, $flags );
}
public function getLastError() {
- return ( $this->writeStore->getLastError() != self::ERR_NONE )
+ return ( $this->writeStore->getLastError() !== self::ERR_NONE )
? $this->writeStore->getLastError()
: $this->readStore->getLastError();
}
$this->writeStore->setMockTime( $time );
$this->readStore->setMockTime( $time );
}
+
+ /**
+ * @param string[] $keys
+ * @return bool
+ */
+ private function hadRecentSessionWrite( array $keys ) {
+ $now = $this->getCurrentTime();
+ foreach ( $keys as $key ) {
+ $ts = $this->lastKeyWrites[$key] ?? 0;
+ if ( $ts && ( $now - $ts ) <= $this->consistencyWindow ) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * @param string[] $keys
+ */
+ private function remarkRecentSessionWrite( array $keys ) {
+ $now = $this->getCurrentTime();
+ foreach ( $keys as $key ) {
+ unset( $this->lastKeyWrites[$key] ); // move to the end
+ $this->lastKeyWrites[$key] = $now;
+ }
+ // Prune out the map if the first key is obsolete
+ if ( ( $now - reset( $this->lastKeyWrites ) ) > $this->consistencyWindow ) {
+ $this->lastKeyWrites = array_filter(
+ $this->lastKeyWrites,
+ function ( $timestamp ) use ( $now ) {
+ return ( ( $now - $timestamp ) <= $this->consistencyWindow );
+ }
+ );
+ }
+ }
}