3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
22 namespace Wikimedia\Rdbms
;
24 use Psr\Log\LoggerInterface
;
25 use Psr\Log\NullLogger
;
26 use Wikimedia\ScopedCallback
;
31 * Basic DB load monitor with no external dependencies
32 * Uses memcached to cache the replication lag for a short time
36 class LoadMonitor
implements ILoadMonitor
{
37 /** @var ILoadBalancer */
41 /** @var WANObjectCache */
43 /** @var LoggerInterface */
44 protected $replLogger;
46 /** @var float Moving average ratio (e.g. 0.1 for 10% weight to new weight) */
47 private $movingAveRatio;
48 /** @var int Amount of replication lag in seconds before warnings are logged */
49 private $lagWarnThreshold;
51 /** @var int cache key version */
53 /** @var int Default 'max lag' in seconds when unspecified */
54 const LAG_WARN_THRESHOLD
= 10;
/**
 * Create a load monitor bound to a load balancer and its two caches
 *
 * @param ILoadBalancer $lb Load balancer whose servers are monitored
 * @param BagOStuff $srvCache Server-local cache for the computed server states
 * @param WANObjectCache $wCache Shared cache for the computed server states
 * @param array $options
 *   - movingAveRatio: moving average constant for server weight updates based on lag
 *   - lagWarnThreshold: how many seconds of lag trigger warnings
 */
public function __construct(
	ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, array $options = []
) {
	// The other methods of this class dereference $this->lb, so it must be stored here
	$this->lb = $lb;
	$this->srvCache = $srvCache;
	$this->wanCache = $wCache;
	// Log nowhere until setLogger() supplies a real logger
	$this->replLogger = new NullLogger();

	// Fall back to the built-in defaults for any option that is absent or null
	$this->movingAveRatio = $options['movingAveRatio'] ?? 0.1;
	$this->lagWarnThreshold = $options['lagWarnThreshold'] ?? self::LAG_WARN_THRESHOLD;
}
/**
 * Replace the logger that receives this monitor's log messages
 *
 * @param LoggerInterface $logger
 */
public function setLogger( LoggerInterface $logger ) {
	$this->replLogger = $logger;
}
/**
 * Scale the given server weights in place by the per-server weight scales
 *
 * A server without a known weight scale keeps its weight unchanged and is
 * reported through the logger.
 *
 * @param array &$weightByServer Map of (server index => weight), modified by reference
 * @param mixed $domain Passed through unchanged to getServerStates()
 */
final public function scaleLoads( array &$weightByServer, $domain ) {
	$serverIndexes = array_keys( $weightByServer );
	$states = $this->getServerStates( $serverIndexes, $domain );
	$newScalesByServer = $states['weightScales'];
	foreach ( $weightByServer as $i => $weight ) {
		if ( isset( $newScalesByServer[$i] ) ) {
			$weightByServer[$i] = $weight * $newScalesByServer[$i];
		} else { // server recently added to config?
			$host = $this->lb->getServerName( $i );
			$this->replLogger->error( __METHOD__ . ": host $host not in cache" );
		}
	}
}
/**
 * Get the 'lagTimes' entry of the server states for the given servers
 *
 * @param array $serverIndexes Server indexes to report on
 * @param mixed $domain Passed through unchanged to getServerStates()
 * @return array The 'lagTimes' value produced by getServerStates()
 */
final public function getLagTimes( array $serverIndexes, $domain ) {
	return $this->getServerStates( $serverIndexes, $domain )['lagTimes'];
}
/**
 * Get the lag times and weight scales of the given servers, using cached values when fresh
 *
 * The visible statements check the server-local cache, then the shared cache, and
 * otherwise query each server and store the result in both caches.
 *
 * NOTE(review): this copy of the method lacks statements that the remaining code
 * depends on (array literal delimiters, closing braces, the $staleTTL assignment,
 * several branch conditions and some return statements). Restore them from the
 * canonical file before changing any logic here.
 *
 * @param array $serverIndexes Server indexes to report on
 * @param mixed $domain Not referenced by any statement visible here
 * @return array Presumably a map with 'lagTimes' and 'weightScales' entries (callers read
 *  both); values read from or written to the caches also carry a 'timestamp' entry
 */
98 protected function getServerStates( array $serverIndexes, $domain ) {
99 $writerIndex = $this->lb
->getWriterIndex();
# Shortcut for the case where the only requested server is the writer
100 if ( count( $serverIndexes ) == 1 && reset( $serverIndexes ) == $writerIndex ) {
101 # Single server only, just return zero without caching
# NOTE(review): the statement that opens (and presumably returns) this array literal is not visible here
103 'lagTimes' => [ $writerIndex => 0 ],
104 'weightScales' => [ $writerIndex => 1.0 ]
# NOTE(review): the end of the array literal and of the if-block is not visible here
108 $key = $this->getCacheKey( $serverIndexes );
109 # Randomize TTLs to reduce stampedes (4.0 - 5.0 sec)
110 // @phan-suppress-next-line PhanTypeMismatchArgumentInternal
111 $ttl = mt_rand( 4e6
, 5e6
) / 1e6
;
112 # Keep keys around longer as fallbacks
# NOTE(review): $staleTTL is passed to the cache set() calls below, but its assignment is not visible here
115 # (a) Check the local APC cache
116 $value = $this->srvCache
->get( $key );
# A cached value only counts as fresh if its timestamp is less than $ttl seconds old
117 if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl ) ) {
118 $this->replLogger
->debug( __METHOD__
. ": got lag times ($key) from local cache" );
119 return $value; // cache hit
# Keep an expired local value as a fallback (false when there is none)
121 $staleValue = $value ?
: false;
123 # (b) Check the shared cache and backfill APC
124 $value = $this->wanCache
->get( $key );
125 if ( $value && $value['timestamp'] > ( microtime( true ) - $ttl ) ) {
126 $this->srvCache
->set( $key, $value, $staleTTL );
127 $this->replLogger
->debug( __METHOD__
. ": got lag times ($key) from main cache" );
129 return $value; // cache hit
# An expired shared-cache value takes precedence over the expired local one as the fallback
131 $staleValue = $value ?
: $staleValue;
133 # (c) Cache key missing or expired; regenerate and backfill
# NOTE(review): lock() is called with ( $key, 0, 10 ); the meaning of 0 and 10 is not shown here — confirm against the cache class
134 if ( $this->srvCache
->lock( $key, 0, 10 ) ) {
135 # Let only this process update the cache value on this server
# The closure below releases the lock; NOTE(review): presumably it runs when $unlocker is destroyed — confirm against ScopedCallback
136 $sCache = $this->srvCache
;
137 /** @noinspection PhpUnusedLocalVariableInspection */
138 $unlocker = new ScopedCallback( function () use ( $sCache, $key ) {
139 $sCache->unlock( $key );
141 } elseif ( $staleValue ) {
142 # Could not acquire lock but an old cache exists, so use it
# NOTE(review): the statement acting on $staleValue and any initialisation of $lagTimes / $weightScales are not visible here
148 $movAveRatio = $this->movingAveRatio
;
149 foreach ( $serverIndexes as $i ) {
150 if ( $i == $this->lb
->getWriterIndex() ) {
151 $lagTimes[$i] = 0; // master always has no lag
152 $weightScales[$i] = 1.0; // nominal weight
# NOTE(review): the end of the writer branch is not visible here
156 # Handles with open transactions are avoided since they might be subject
157 # to REPEATABLE-READ snapshots, which could affect the lag estimate query.
158 $flags = ILoadBalancer
::CONN_TRX_AUTOCOMMIT | ILoadBalancer
::CONN_SILENCE_ERRORS
;
159 $conn = $this->lb
->getAnyOpenConnection( $i, $flags );
# $close records whether the connection was newly opened by this method
# NOTE(review): the condition choosing between the two assignments below is not visible here
161 $close = false; // already open
163 // Get a connection to this server without triggering other server connections
164 $conn = $this->lb
->getServerConnection( $i, ILoadBalancer
::DOMAIN_ANY
, $flags );
165 $close = true; // new connection
# Blend the new coefficient with the previously cached scale (1.0 when none is cached)
168 $lastWeight = $staleValue['weightScales'][$i] ??
1.0;
169 $coefficient = $this->getWeightScale( $i, $conn ?
: null );
170 $newWeight = $movAveRatio * $coefficient +
( 1 - $movAveRatio ) * $lastWeight;
172 // Scale from 10% to 100% of nominal weight
173 $weightScales[$i] = max( $newWeight, 0.10 );
175 $host = $this->lb
->getServerName( $i );
# NOTE(review): the condition guarding this "unreachable" branch is not visible here
178 $lagTimes[$i] = false;
179 $this->replLogger
->error(
180 __METHOD__
. ": host {db_server} is unreachable",
181 [ 'db_server' => $host ]
# NOTE(review): the end of the error() call and of the unreachable branch is not visible here
186 $lagTimes[$i] = $conn->getLag();
# A lag value of false is logged as the server not replicating
187 if ( $lagTimes[$i] === false ) {
188 $this->replLogger
->error(
189 __METHOD__
. ": host {db_server} is not replicating?",
190 [ 'db_server' => $host ]
# The comparison below is strict (>), although the log message text says ">="
192 } elseif ( $lagTimes[$i] > $this->lagWarnThreshold
) {
193 $this->replLogger
->warning(
194 "Server {host} has {lag} seconds of lag (>= {maxlag})",
# NOTE(review): the message has a {host} placeholder, but no 'host' context entry is visible here
197 'lag' => $lagTimes[$i],
198 'maxlag' => $this->lagWarnThreshold
204 # Close the connection to avoid sleeper connections piling up.
205 # Note that the caller will pick one of these DBs and reconnect,
206 # which is slightly inefficient, but this only matters for the lag
207 # time cache miss case, which is far less common than cache hits.
# NOTE(review): presumably guarded by $close — the condition is not visible here
208 $this->lb
->closeConnection( $conn );
212 # Add a timestamp key so we know when it was cached
# NOTE(review): the assignment that opens this array literal (to $value, judging by the set() calls) is not visible here
214 'lagTimes' => $lagTimes,
215 'weightScales' => $weightScales,
216 'timestamp' => microtime( true )
# Store the regenerated value in both the shared and the server-local cache
218 $this->wanCache
->set( $key, $value, $staleTTL );
219 $this->srvCache
->set( $key, $value, $staleTTL );
220 $this->replLogger
->info( __METHOD__
. ": re-calculated lag times ($key)" );
/**
 * Get the coefficient used when updating the weight scale of a server
 *
 * @param int $index Server index
 * @param IDatabase|null $conn Connection handle or null on connection failure
 * @return float 1.0 when a connection handle is given, 0.0 otherwise
 */
protected function getWeightScale( $index, IDatabase $conn = null ) {
	return $conn ? 1.0 : 0.0;
}
234 private function getCacheKey( array $serverIndexes ) {
235 sort( $serverIndexes );
236 // Lag is per-server, not per-DB, so key on the master DB name
237 return $this->srvCache
->makeGlobalKey(
240 $this->lb
->getServerName( $this->lb
->getWriterIndex() ),
241 implode( '-', $serverIndexes )