3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
21 use Wikimedia\ObjectFactory
;
24 * A cache class that directs writes to one set of servers and reads to
25 * another. This assumes that the servers used for reads are setup to replica DB
26 * those that writes go to. This can easily be used with redis for example.
28 * In the WAN scenario (e.g. multi-datacenter case), this is useful when
29 * writes are rare or they usually take place in the primary datacenter.
34 class ReplicatedBagOStuff
extends BagOStuff
{
40 /** @var int Seconds to read from the master source for a key after writing to it */
41 private $consistencyWindow;
42 /** @var float[] Map of (key => UNIX timestamp) */
43 private $lastKeyWrites = [];
45 /** @var int Max expected delay (in seconds) for writes to reach replicas */
46 const MAX_WRITE_DELAY
= 5;
49 * Constructor. Parameters are:
50 * - writeFactory: ObjectFactory::getObjectFromSpec array yielding BagOStuff.
51 * This object will be used for writes (e.g. the master DB).
52 * - readFactory: ObjectFactory::getObjectFromSpec array yielding BagOStuff.
53 * This object will be used for reads (e.g. a replica DB).
54 * - sessionConsistencyWindow: Seconds to read from the master source for a key
55 * after writing to it. [Default: ReplicatedBagOStuff::MAX_WRITE_DELAY]
57 * @param array $params
58 * @throws InvalidArgumentException
60 public function __construct( $params ) {
61 parent
::__construct( $params );
63 if ( !isset( $params['writeFactory'] ) ) {
64 throw new InvalidArgumentException(
65 __METHOD__
. ': the "writeFactory" parameter is required' );
66 } elseif ( !isset( $params['readFactory'] ) ) {
67 throw new InvalidArgumentException(
68 __METHOD__
. ': the "readFactory" parameter is required' );
71 $this->consistencyWindow
= $params['sessionConsistencyWindow'] ?? self
::MAX_WRITE_DELAY
;
72 $this->writeStore
= ( $params['writeFactory'] instanceof BagOStuff
)
73 ?
$params['writeFactory']
74 : ObjectFactory
::getObjectFromSpec( $params['writeFactory'] );
75 $this->readStore
= ( $params['readFactory'] instanceof BagOStuff
)
76 ?
$params['readFactory']
77 : ObjectFactory
::getObjectFromSpec( $params['readFactory'] );
78 $this->attrMap
= $this->mergeFlagMaps( [ $this->readStore
, $this->writeStore
] );
81 public function setDebug( $enabled ) {
82 parent
::setDebug( $enabled );
83 $this->writeStore
->setDebug( $enabled );
84 $this->readStore
->setDebug( $enabled );
87 public function get( $key, $flags = 0 ) {
89 $this->hadRecentSessionWrite( [ $key ] ) ||
90 $this->fieldHasFlags( $flags, self
::READ_LATEST
)
92 ?
$this->writeStore
->get( $key, $flags )
93 : $this->readStore
->get( $key, $flags );
96 public function set( $key, $value, $exptime = 0, $flags = 0 ) {
97 $this->remarkRecentSessionWrite( [ $key ] );
99 return $this->writeStore
->set( $key, $value, $exptime, $flags );
102 public function delete( $key, $flags = 0 ) {
103 $this->remarkRecentSessionWrite( [ $key ] );
105 return $this->writeStore
->delete( $key, $flags );
108 public function add( $key, $value, $exptime = 0, $flags = 0 ) {
109 $this->remarkRecentSessionWrite( [ $key ] );
111 return $this->writeStore
->add( $key, $value, $exptime, $flags );
114 public function merge( $key, callable
$callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
115 $this->remarkRecentSessionWrite( [ $key ] );
117 return $this->writeStore
->merge( $key, $callback, $exptime, $attempts, $flags );
120 public function changeTTL( $key, $exptime = 0, $flags = 0 ) {
121 $this->remarkRecentSessionWrite( [ $key ] );
123 return $this->writeStore
->changeTTL( $key, $exptime, $flags );
126 public function lock( $key, $timeout = 6, $expiry = 6, $rclass = '' ) {
127 return $this->writeStore
->lock( $key, $timeout, $expiry, $rclass );
130 public function unlock( $key ) {
131 return $this->writeStore
->unlock( $key );
134 public function deleteObjectsExpiringBefore(
136 callable
$progress = null,
139 return $this->writeStore
->deleteObjectsExpiringBefore( $timestamp, $progress, $limit );
142 public function getMulti( array $keys, $flags = 0 ) {
144 $this->hadRecentSessionWrite( $keys ) ||
145 $this->fieldHasFlags( $flags, self
::READ_LATEST
)
147 ?
$this->writeStore
->getMulti( $keys, $flags )
148 : $this->readStore
->getMulti( $keys, $flags );
151 public function setMulti( array $data, $exptime = 0, $flags = 0 ) {
152 $this->remarkRecentSessionWrite( array_keys( $data ) );
154 return $this->writeStore
->setMulti( $data, $exptime, $flags );
157 public function deleteMulti( array $keys, $flags = 0 ) {
158 $this->remarkRecentSessionWrite( $keys );
160 return $this->writeStore
->deleteMulti( $keys, $flags );
163 public function changeTTLMulti( array $keys, $exptime, $flags = 0 ) {
164 $this->remarkRecentSessionWrite( $keys );
166 return $this->writeStore
->changeTTLMulti( $keys, $exptime, $flags );
169 public function incr( $key, $value = 1, $flags = 0 ) {
170 $this->remarkRecentSessionWrite( [ $key ] );
172 return $this->writeStore
->incr( $key, $value, $flags );
175 public function decr( $key, $value = 1, $flags = 0 ) {
176 $this->remarkRecentSessionWrite( [ $key ] );
178 return $this->writeStore
->decr( $key, $value, $flags );
181 public function incrWithInit( $key, $exptime, $value = 1, $init = null, $flags = 0 ) {
182 $this->remarkRecentSessionWrite( [ $key ] );
184 return $this->writeStore
->incrWithInit( $key, $exptime, $value, $init, $flags );
187 public function getLastError() {
188 return ( $this->writeStore
->getLastError() !== self
::ERR_NONE
)
189 ?
$this->writeStore
->getLastError()
190 : $this->readStore
->getLastError();
193 public function clearLastError() {
194 $this->writeStore
->clearLastError();
195 $this->readStore
->clearLastError();
198 public function makeKeyInternal( $keyspace, $args ) {
199 return $this->writeStore
->makeKeyInternal( ...func_get_args() );
202 public function makeKey( $class, ...$components ) {
203 return $this->writeStore
->makeKey( ...func_get_args() );
206 public function makeGlobalKey( $class, ...$components ) {
207 return $this->writeStore
->makeGlobalKey( ...func_get_args() );
210 public function addBusyCallback( callable
$workCallback ) {
211 $this->writeStore
->addBusyCallback( $workCallback );
214 public function setMockTime( &$time ) {
215 parent
::setMockTime( $time );
216 $this->writeStore
->setMockTime( $time );
217 $this->readStore
->setMockTime( $time );
221 * @param string[] $keys
224 private function hadRecentSessionWrite( array $keys ) {
225 $now = $this->getCurrentTime();
226 foreach ( $keys as $key ) {
227 $ts = $this->lastKeyWrites
[$key] ??
0;
228 if ( $ts && ( $now - $ts ) <= $this->consistencyWindow
) {
237 * @param string[] $keys
239 private function remarkRecentSessionWrite( array $keys ) {
240 $now = $this->getCurrentTime();
241 foreach ( $keys as $key ) {
242 unset( $this->lastKeyWrites
[$key] ); // move to the end
243 $this->lastKeyWrites
[$key] = $now;
245 // Prune out the map if the first key is obsolete
246 if ( ( $now - reset( $this->lastKeyWrites
) ) > $this->consistencyWindow
) {
247 $this->lastKeyWrites
= array_filter(
248 $this->lastKeyWrites
,
249 function ( $timestamp ) use ( $now ) {
250 return ( ( $now - $timestamp ) <= $this->consistencyWindow
);