From: Aaron Schulz Date: Mon, 27 Apr 2015 19:35:58 +0000 (-0700) Subject: Moved WAN cache and relayer to /libs X-Git-Tag: 1.31.0-rc.0~11581^2 X-Git-Url: http://git.cyclocoop.org/fichier?a=commitdiff_plain;h=c7bf955bca42c3a8937db9593f3f36414b163ace;p=lhc%2Fweb%2Fwiklou.git Moved WAN cache and relayer to /libs Change-Id: I9586d22890ce1cda7f74869ff701de73d65eaaef --- diff --git a/autoload.php b/autoload.php index 71bc7b2e9f..81ee8b1833 100644 --- a/autoload.php +++ b/autoload.php @@ -382,9 +382,9 @@ $wgAutoloadLocalClasses = array( 'EnqueueJob' => __DIR__ . '/includes/jobqueue/jobs/EnqueueJob.php', 'EraseArchivedFile' => __DIR__ . '/maintenance/eraseArchivedFile.php', 'ErrorPageError' => __DIR__ . '/includes/exception/ErrorPageError.php', - 'EventRelayer' => __DIR__ . '/includes/eventrelayer/EventRelayer.php', - 'EventRelayerMCRD' => __DIR__ . '/includes/eventrelayer/EventRelayerMCRD.php', - 'EventRelayerNull' => __DIR__ . '/includes/eventrelayer/EventRelayer.php', + 'EventRelayer' => __DIR__ . '/includes/libs/eventrelayer/EventRelayer.php', + 'EventRelayerMCRD' => __DIR__ . '/includes/libs/eventrelayer/EventRelayerMCRD.php', + 'EventRelayerNull' => __DIR__ . '/includes/libs/eventrelayer/EventRelayer.php', 'Exif' => __DIR__ . '/includes/media/Exif.php', 'ExifBitmapHandler' => __DIR__ . '/includes/media/ExifBitmap.php', 'ExplodeIterator' => __DIR__ . '/includes/libs/ExplodeIterator.php', @@ -1316,7 +1316,7 @@ $wgAutoloadLocalClasses = array( 'ViewAction' => __DIR__ . '/includes/actions/ViewAction.php', 'VirtualRESTService' => __DIR__ . '/includes/libs/virtualrest/VirtualRESTService.php', 'VirtualRESTServiceClient' => __DIR__ . '/includes/libs/virtualrest/VirtualRESTServiceClient.php', - 'WANObjectCache' => __DIR__ . '/includes/objectcache/WANObjectCache.php', + 'WANObjectCache' => __DIR__ . '/includes/libs/objectcache/WANObjectCache.php', 'WaitForSlave' => __DIR__ . '/maintenance/waitForSlave.php', 'WantedCategoriesPage' => __DIR__ . '/includes/specials/SpecialWantedcategories.php', 'WantedFilesPage' => __DIR__ . '/includes/specials/SpecialWantedfiles.php', diff --git a/includes/eventrelayer/EventRelayer.php b/includes/eventrelayer/EventRelayer.php deleted file mode 100755 index a3d214f80c..0000000000 --- a/includes/eventrelayer/EventRelayer.php +++ /dev/null @@ -1,64 +0,0 @@ -doNotify( $channel, array( $event ) ); - } - - /** - * @param string $channel - * @param array $events List of event data maps - * @return bool Success - */ - final public function notifyMulti( $channel, $events ) { - return $this->doNotify( $channel, $events ); - } - - /** - * @param string $channel - * @param array $events List of event data maps - * @return bool Success - */ - abstract protected function doNotify( $channel, array $events ); -} - -/** - * No-op class for publishing messages into a PubSub system - */ -class EventRelayerNull extends EventRelayer { - public function doNotify( $channel, array $events ) { - return true; - } -} diff --git a/includes/eventrelayer/EventRelayerMCRD.php b/includes/eventrelayer/EventRelayerMCRD.php deleted file mode 100755 index 1e8b2a4058..0000000000 --- a/includes/eventrelayer/EventRelayerMCRD.php +++ /dev/null @@ -1,66 +0,0 @@ -baseUrl = $params['mcrdConfig']['url']; - - $httpConfig = isset( $params['httpConfig'] ) ? $params['httpConfig'] : array(); - if ( !isset( $httpConfig['connTimeout'] ) ) { - $httpConfig['connTimeout'] = 1; - } - if ( !isset( $httpConfig['reqTimeout'] ) ) { - $httpConfig['reqTimeout'] = .25; - } - - $this->http = new MultiHttpClient( $httpConfig ); - } - - protected function doNotify( $channel, array $events ) { - if ( !count( $events ) ) { - return true; - } - - $response = $this->http->run( array( - 'url' => "{$this->baseUrl}/relayer/api/v1.0/" . rawurlencode( $channel ), - 'method' => 'POST', - 'body' => json_encode( array( 'events' => $events ) ), - 'headers' => array( 'content-type' => 'application/json' ) - ) ); - - return $response['code'] == 201; - } -} diff --git a/includes/libs/eventrelayer/EventRelayer.php b/includes/libs/eventrelayer/EventRelayer.php new file mode 100755 index 0000000000..a3d214f80c --- /dev/null +++ b/includes/libs/eventrelayer/EventRelayer.php @@ -0,0 +1,64 @@ +doNotify( $channel, array( $event ) ); + } + + /** + * @param string $channel + * @param array $events List of event data maps + * @return bool Success + */ + final public function notifyMulti( $channel, $events ) { + return $this->doNotify( $channel, $events ); + } + + /** + * @param string $channel + * @param array $events List of event data maps + * @return bool Success + */ + abstract protected function doNotify( $channel, array $events ); +} + +/** + * No-op class for publishing messages into a PubSub system + */ +class EventRelayerNull extends EventRelayer { + public function doNotify( $channel, array $events ) { + return true; + } +} diff --git a/includes/libs/eventrelayer/EventRelayerMCRD.php b/includes/libs/eventrelayer/EventRelayerMCRD.php new file mode 100755 index 0000000000..1e8b2a4058 --- /dev/null +++ b/includes/libs/eventrelayer/EventRelayerMCRD.php @@ -0,0 +1,66 @@ +baseUrl = $params['mcrdConfig']['url']; + + $httpConfig = isset( $params['httpConfig'] ) ? $params['httpConfig'] : array(); + if ( !isset( $httpConfig['connTimeout'] ) ) { + $httpConfig['connTimeout'] = 1; + } + if ( !isset( $httpConfig['reqTimeout'] ) ) { + $httpConfig['reqTimeout'] = .25; + } + + $this->http = new MultiHttpClient( $httpConfig ); + } + + protected function doNotify( $channel, array $events ) { + if ( !count( $events ) ) { + return true; + } + + $response = $this->http->run( array( + 'url' => "{$this->baseUrl}/relayer/api/v1.0/" . rawurlencode( $channel ), + 'method' => 'POST', + 'body' => json_encode( array( 'events' => $events ) ), + 'headers' => array( 'content-type' => 'application/json' ) + ) ); + + return $response['code'] == 201; + } +} diff --git a/includes/libs/objectcache/WANObjectCache.php b/includes/libs/objectcache/WANObjectCache.php new file mode 100755 index 0000000000..6e5cad6257 --- /dev/null +++ b/includes/libs/objectcache/WANObjectCache.php @@ -0,0 +1,600 @@ +cache = $params['cache']; + $this->pool = $params['pool']; + $this->relayer = $params['relayer']; + } + + /** + * Fetch the value of a key from cache + * + * If passed in, $curTTL is set to the remaining TTL (current time left): + * - a) INF; if the key exists and has no TTL + * - b) float (>=0); if the key exists and has a TTL + * - c) float (<0); if the key is tombstoned or expired by $checkKeys + * - d) null; if the key does not exist and is not tombstoned + * + * If a key is tombstoned, $curTTL will reflect the time since delete(). + * + * The timestamp of $key will be checked against the last-purge timestamp + * of each of $checkKeys. Those $checkKeys not in cache will have the last-purge + * initialized to the current timestamp. If any of $checkKeys have a timestamp + * greater than that of $key, then $curTTL will reflect how long ago $key + * became invalid. Callers can use $curTTL to know when the value is stale. + * The $checkKeys parameter allow mass invalidations by updating a single key: + * - a) Each "check" key represents "last purged" of some source data + * - b) Callers pass in relevant "check" keys as $checkKeys in get() + * - c) When the source data that "check" keys represent changes, + * the touchCheckKey() method is called on them + * + * For keys that are hot/expensive, consider using getWithSetCallback() instead. + * + * @param string $key Cache key + * @param mixed $curTTL Approximate TTL left on the key if present [returned] + * @param array $checkKeys List of "check" keys + * @return mixed Value of cache key or false on failure + */ + final public function get( $key, &$curTTL = null, array $checkKeys = array() ) { + $curTTLs = array(); + $values = $this->getMulti( array( $key ), $curTTLs, $checkKeys ); + $curTTL = isset( $curTTLs[$key] ) ? $curTTLs[$key] : null; + + return isset( $values[$key] ) ? $values[$key] : false; + } + + /** + * Fetch the value of several keys from cache + * + * @see WANObjectCache::get() + * + * @param array $keys List of cache keys + * @param array $curTTLs Map of (key => approximate TTL left) for existing keys [returned] + * @param array $checkKeys List of "check" keys + * @return array Map of (key => value) for keys that exist + */ + final public function getMulti( + array $keys, &$curTTLs = array(), array $checkKeys = array() + ) { + $result = array(); + $curTTLs = array(); + + $vPrefixLen = strlen( self::VALUE_KEY_PREFIX ); + $valueKeys = self::prefixCacheKeys( $keys, self::VALUE_KEY_PREFIX ); + $checkKeys = self::prefixCacheKeys( $checkKeys, self::TIME_KEY_PREFIX ); + + // Fetch all of the raw values + $wrappedValues = $this->cache->getMulti( array_merge( $valueKeys, $checkKeys ) ); + $now = microtime( true ); + + // Get/initialize the timestamp of all the "check" keys + $checkKeyTimes = array(); + foreach ( $checkKeys as $checkKey ) { + $timestamp = isset( $wrappedValues[$checkKey] ) + ? self::parsePurgeValue( $wrappedValues[$checkKey] ) + : false; + if ( !is_float( $timestamp ) ) { + // Key is not set or invalid; regenerate + $this->cache->add( $checkKey, + self::PURGE_VAL_PREFIX . $now, self::CHECK_KEY_TTL ); + $timestamp = $now; + } + + $checkKeyTimes[] = $timestamp; + } + + // Get the main cache value for each key and validate them + foreach ( $valueKeys as $vKey ) { + if ( !isset( $wrappedValues[$vKey] ) ) { + continue; // not found + } + + $key = substr( $vKey, $vPrefixLen ); // unprefix + + list( $value, $curTTL ) = $this->unwrap( $wrappedValues[$vKey], $now ); + if ( $value !== false ) { + $result[$key] = $value; + foreach ( $checkKeyTimes as $checkKeyTime ) { + // Force dependant keys to be invalid for a while after purging + // to reduce race conditions involving stale data getting cached + $safeTimestamp = $checkKeyTime + self::HOLDOFF_TTL; + if ( $safeTimestamp >= $wrappedValues[$vKey][self::FLD_TIME] ) { + $curTTL = min( $curTTL, $checkKeyTime - $now ); + } + } + } + + $curTTLs[$key] = $curTTL; + } + + return $result; + } + + /** + * Set the value of a key from cache + * + * Simply calling this method when source data changes is not valid because + * the changes do not replicate to the other WAN sites. In that case, delete() + * should be used instead. This method is intended for use on cache misses. + * + * @param string $key Cache key + * @param mixed $value + * @param integer $ttl Seconds to live [0=forever] + * @return bool Success + */ + final public function set( $key, $value, $ttl = 0 ) { + $key = self::VALUE_KEY_PREFIX . $key; + $wrapped = $this->wrap( $value, $ttl ); + + $func = function ( $cache, $key, $cWrapped ) use ( $wrapped ) { + return ( is_string( $cWrapped ) ) + ? false // key is tombstoned; do nothing + : $wrapped; + }; + + return $this->cache->merge( $key, $func, $ttl, 1 ); + } + + /** + * Purge a key from all clusters + * + * This instantiates a hold-off period where the key cannot be + * written to avoid race conditions where dependent keys get updated + * with a stale value (e.g. from a DB slave). + * + * This should only be called when the underlying data (being cached) + * changes in a significant way. If called twice on the same key, then + * the last TTL takes precedence. + * + * @param string $key Cache key + * @param integer $ttl How long to block writes to the key [seconds] + * @return bool True if the item was purged or not found, false on failure + */ + final public function delete( $key, $ttl = self::HOLDOFF_TTL ) { + $key = self::VALUE_KEY_PREFIX . $key; + // Update the local cluster immediately + $ok = $this->cache->set( $key, self::PURGE_VAL_PREFIX . microtime( true ), $ttl ); + // Publish the purge to all clusters + return $this->relayPurge( $key, $ttl ) && $ok; + } + + /** + * Fetch the value of a timestamp "check" key + * + * @param string $key + * @return float|bool TS_UNIX timestamp of the key; false if not present + */ + final public function getCheckKeyTime( $key ) { + return self::parsePurgeValue( $this->cache->get( self::TIME_KEY_PREFIX . $key ) ); + } + + /** + * Purge a "check" key from all clusters, invalidating keys that use it + * + * This should only be called when the underlying data (being cached) + * changes in a significant way, and it is impractical to call delete() + * on all keys that should be changed. When get() is called on those + * keys, the relevant "check" keys must be supplied for this to work. + * + * The "check" key essentially represents a last-modified field. + * It is set in the future a few seconds when this is called, to + * avoid race conditions where dependent keys get updated with a + * stale value (e.g. from a DB slave). + * + * @see WANObjectCache::get() + * + * @param string $key Cache key + * @return bool True if the item was purged or not found, false on failure + */ + final public function touchCheckKey( $key ) { + $key = self::TIME_KEY_PREFIX . $key; + // Update the local cluster immediately + $ok = $this->cache->set( $key, + self::PURGE_VAL_PREFIX . microtime( true ), self::CHECK_KEY_TTL ); + // Publish the purge to all clusters + return $this->relayPurge( $key, self::CHECK_KEY_TTL ) && $ok; + } + + /** + * Method to fetch/regenerate cache keys + * + * On cache miss, the key will be set to the callback result. + * The callback function returns the new value given the current + * value (false if not present). If false is returned, then nothing + * will be saved to cache. + * + * Most callers should ignore the current value, but it can be used + * to maintain "most recent X" values that come from time or sequence + * based source data, provided that the "as of" id/time is tracked. + * + * Usage of $checkKeys is the same as with get(). + * + * The simplest way to avoid stampedes for hot keys is to use + * the 'lockTSE' option in $opts. If cache purges are needed, also: + * a) Pass $key into $checkKeys + * b) Use touchCheckKey( $key ) instead of delete( $key ) + * Following this pattern lets the old cache be used until a + * single thread updates it as needed. Also consider tweaking + * the 'lowTTL' parameter. + * + * Example usage: + * + * $key = wfMemcKey( 'cat-recent-actions', $catId ); + * // Function that derives the new key value given the old value + * $callback = function( $cValue ) { ... }; + * // Get the key value from cache or from source on cache miss; + * // try to only let one cluster thread manage doing cache updates + * $opts = array( 'lockTSE' => 5, 'lowTTL' => 10 ); + * $value = $cache->getWithSetCallback( $key, $callback, 60, array(), $opts ); + * + * + * Example usage: + * + * $key = wfMemcKey( 'cat-state', $catId ); + * // The "check" keys that represent things the value depends on; + * // Calling touchCheckKey() on them invalidates "cat-state" + * $checkKeys = array( + * wfMemcKey( 'water-bowls', $houseId ), + * wfMemcKey( 'food-bowls', $houseId ), + * wfMemcKey( 'people-present', $houseId ) + * ); + * // Function that derives the new key value + * $callback = function() { ... }; + * // Get the key value from cache or from source on cache miss; + * // try to only let one cluster thread manage doing cache updates + * $opts = array( 'lockTSE' => 5, 'lowTTL' => 10 ); + * $value = $cache->getWithSetCallback( $key, $callback, 60, $checkKeys, $opts ); + * + * + * @see WANObjectCache::get() + * + * @param string $key Cache key + * @param callable $callback Value generation function + * @param integer $ttl Seconds to live when the key is updated [0=forever] + * @param array $checkKeys List of "check" keys + * @param array $opts Options map: + * - lowTTL : consider pre-emptive updates when the current TTL (sec) + * of the key is less than this. It becomes more likely + * over time, becoming a certainty once the key is expired. + * - lockTSE : if the key is tombstoned or expired less (by $checkKeys) + * than this many seconds ago, then try to have a single + * thread handle cache regeneration at any given time. + * Other threads will try to use stale values if possible. + * - tempTTL : when 'lockTSE' is set, this determines the TTL of the temp + * key used to cache values while a key is tombstoned. + * This avoids excessive regeneration of hot keys on delete() but + * may result in stale values. + * @return mixed Value to use for the key + */ + final public function getWithSetCallback( + $key, $callback, $ttl, array $checkKeys = array(), array $opts = array() + ) { + $lowTTL = isset( $opts['lowTTL'] ) ? $opts['lowTTL'] : min( 10, $ttl ); + $lockTSE = isset( $opts['lockTSE'] ) ? $opts['lockTSE'] : -1; + $tempTTL = isset( $opts['tempTTL'] ) ? $opts['tempTTL'] : 5; + + // Get the current key value + $curTTL = null; + $cValue = $this->get( $key, $curTTL, $checkKeys ); // current value + $value = $cValue; // return value + + // Determine if a regeneration is desired + if ( $value !== false && $curTTL > 0 && !$this->worthRefresh( $curTTL, $lowTTL ) ) { + return $value; + } + + if ( !is_callable( $callback ) ) { + throw new InvalidArgumentException( "Invalid cache miss callback provided." ); + } + + // Assume a key is hot if requested soon after invalidation + $isHot = ( $curTTL !== null && $curTTL <= 0 && abs( $curTTL ) <= $lockTSE ); + $isTombstone = ( $curTTL !== null && $value === false ); + + $locked = false; + if ( $isHot || $isTombstone ) { + // Acquire a cluster-local non-blocking lock + if ( $this->cache->lock( $key, 0, self::LOCK_TTL ) ) { + // Lock acquired; this thread should update the key + $locked = true; + } elseif ( $value !== false ) { + // If it cannot be acquired; then the stale value can be used + return $value; + } else { + // Either another thread has the lock or the lock failed. + // Use the stash value, which is likely from the prior thread. + $value = $this->cache->get( self::STASH_KEY_PREFIX . $key ); + // Regenerate on timeout or if the other thread failed + if ( $value !== false ) { + return $value; + } + } + } + + // Generate the new value from the callback... + $value = call_user_func( $callback, $cValue ); + // When delete() is called, writes are write-holed by the tombstone, + // so use a special stash key to pass the new value around threads. + if ( $value !== false && ( $isHot || $isTombstone ) ) { + $this->cache->set( self::STASH_KEY_PREFIX . $key, $value, $tempTTL ); + } + + if ( $locked ) { + $this->cache->unlock( $key ); + } + + if ( $value !== false ) { + // Update the cache; this will fail if the key is tombstoned + $this->set( $key, $value, $ttl ); + } + + return $value; + } + + /** + * Get the "last error" registered; clearLastError() should be called manually + * @return int ERR_* constant for the "last error" registry + */ + final public function getLastError() { + if ( $this->lastRelayError ) { + // If the cache and the relayer failed, focus on the later. + // An update not making it to the relayer means it won't show up + // in other DCs (nor will consistent re-hashing see up-to-date values). + // On the other hand, if just the cache update failed, then it should + // eventually be applied by the relayer. + return $this->lastRelayError; + } + + $code = $this->cache->getLastError(); + switch ( $code ) { + case BagOStuff::ERR_NONE: + return self::ERR_NONE; + case BagOStuff::ERR_NO_RESPONSE: + return self::ERR_NO_RESPONSE; + case BagOStuff::ERR_UNREACHABLE: + return self::ERR_UNREACHABLE; + default: + return self::ERR_UNEXPECTED; + } + } + + /** + * Clear the "last error" registry + */ + final public function clearLastError() { + $this->cache->clearLastError(); + $this->lastRelayError = self::ERR_NONE; + } + + /** + * Do the actual async bus purge of a key + * + * This must set the key to "PURGED:" + * + * @param string $key Cache key + * @param integer $ttl How long to keep the tombstone [seconds] + * @return bool Success + */ + protected function relayPurge( $key, $ttl ) { + $event = $this->cache->modifySimpleRelayEvent( array( + 'cmd' => 'set', + 'key' => $key, + 'val' => 'PURGED:$UNIXTIME$', + 'ttl' => max( $ttl, 1 ), + 'sbt' => true, // substitute $UNIXTIME$ with actual microtime + ) ); + + $ok = $this->relayer->notify( "{$this->pool}:purge", $event ); + if ( !$ok ) { + $this->lastRelayError = self::ERR_RELAY; + } + + return $ok; + } + + /** + * Check if a key should be regenerated (using random probability) + * + * This returns false if $curTTL >= $lowTTL. Otherwise, the chance + * of returning true increases steadily from 0% to 100% as the $curTTL + * moves from $lowTTL to 0 seconds. This handles widely varying + * levels of cache access traffic. + * + * @param float|INF $curTTL Approximate TTL left on the key if present + * @param float $lowTTL Consider a refresh when $curTTL is less than this + * @return bool + */ + protected function worthRefresh( $curTTL, $lowTTL ) { + if ( $curTTL >= $lowTTL ) { + return false; + } elseif ( $curTTL <= 0 ) { + return true; + } + + $chance = ( 1 - $curTTL / $lowTTL ); + + return mt_rand( 1, 1e9 ) <= 1e9 * $chance; + } + + /** + * Do not use this method outside WANObjectCache + * + * @param mixed $value + * @param integer $ttl [0=forever] + * @return string + */ + protected function wrap( $value, $ttl ) { + return array( + self::FLD_VERSION => self::VERSION, + self::FLD_VALUE => $value, + self::FLD_TTL => $ttl, + self::FLD_TIME => microtime( true ) + ); + } + + /** + * Do not use this method outside WANObjectCache + * + * @param array|string|bool $wrapped + * @param float $now Unix Current timestamp (preferrable pre-query) + * @return array (mixed; false if absent/invalid, current time left) + */ + protected function unwrap( $wrapped, $now ) { + // Check if the value is a tombstone + $purgeTimestamp = self::parsePurgeValue( $wrapped ); + if ( is_float( $purgeTimestamp ) ) { + // Purged values should always have a negative current $ttl + $curTTL = min( -0.000001, $purgeTimestamp - $now ); + return array( false, $curTTL ); + } + + if ( !is_array( $wrapped ) // not found + || !isset( $wrapped[self::FLD_VERSION] ) // wrong format + || $wrapped[self::FLD_VERSION] !== self::VERSION // wrong version + ) { + return array( false, null ); + } + + if ( $wrapped[self::FLD_TTL] > 0 ) { + // Get the approximate time left on the key + $age = $now - $wrapped[self::FLD_TIME]; + $curTTL = max( $wrapped[self::FLD_TTL] - $age, 0.0 ); + } else { + // Key had no TTL, so the time left is unbounded + $curTTL = INF; + } + + return array( $wrapped[self::FLD_VALUE], $curTTL ); + } + + /** + * @param array $keys + * @param string $prefix + * @return string + */ + protected static function prefixCacheKeys( array $keys, $prefix ) { + $res = array(); + foreach ( $keys as $key ) { + $res[] = $prefix . $key; + } + + return $res; + } + + /** + * @param string $value String like "PURGED:" + * @return float|bool UNIX timestamp or false on failure + */ + protected static function parsePurgeValue( $value ) { + $m = array(); + if ( is_string( $value ) && + preg_match( '/^' . self::PURGE_VAL_PREFIX . '([^:]+)$/', $value, $m ) + ) { + return (float)$m[1]; + } else { + return false; + } + } +} diff --git a/includes/objectcache/WANObjectCache.php b/includes/objectcache/WANObjectCache.php deleted file mode 100755 index 6e5cad6257..0000000000 --- a/includes/objectcache/WANObjectCache.php +++ /dev/null @@ -1,600 +0,0 @@ -cache = $params['cache']; - $this->pool = $params['pool']; - $this->relayer = $params['relayer']; - } - - /** - * Fetch the value of a key from cache - * - * If passed in, $curTTL is set to the remaining TTL (current time left): - * - a) INF; if the key exists and has no TTL - * - b) float (>=0); if the key exists and has a TTL - * - c) float (<0); if the key is tombstoned or expired by $checkKeys - * - d) null; if the key does not exist and is not tombstoned - * - * If a key is tombstoned, $curTTL will reflect the time since delete(). - * - * The timestamp of $key will be checked against the last-purge timestamp - * of each of $checkKeys. Those $checkKeys not in cache will have the last-purge - * initialized to the current timestamp. If any of $checkKeys have a timestamp - * greater than that of $key, then $curTTL will reflect how long ago $key - * became invalid. Callers can use $curTTL to know when the value is stale. - * The $checkKeys parameter allow mass invalidations by updating a single key: - * - a) Each "check" key represents "last purged" of some source data - * - b) Callers pass in relevant "check" keys as $checkKeys in get() - * - c) When the source data that "check" keys represent changes, - * the touchCheckKey() method is called on them - * - * For keys that are hot/expensive, consider using getWithSetCallback() instead. - * - * @param string $key Cache key - * @param mixed $curTTL Approximate TTL left on the key if present [returned] - * @param array $checkKeys List of "check" keys - * @return mixed Value of cache key or false on failure - */ - final public function get( $key, &$curTTL = null, array $checkKeys = array() ) { - $curTTLs = array(); - $values = $this->getMulti( array( $key ), $curTTLs, $checkKeys ); - $curTTL = isset( $curTTLs[$key] ) ? $curTTLs[$key] : null; - - return isset( $values[$key] ) ? $values[$key] : false; - } - - /** - * Fetch the value of several keys from cache - * - * @see WANObjectCache::get() - * - * @param array $keys List of cache keys - * @param array $curTTLs Map of (key => approximate TTL left) for existing keys [returned] - * @param array $checkKeys List of "check" keys - * @return array Map of (key => value) for keys that exist - */ - final public function getMulti( - array $keys, &$curTTLs = array(), array $checkKeys = array() - ) { - $result = array(); - $curTTLs = array(); - - $vPrefixLen = strlen( self::VALUE_KEY_PREFIX ); - $valueKeys = self::prefixCacheKeys( $keys, self::VALUE_KEY_PREFIX ); - $checkKeys = self::prefixCacheKeys( $checkKeys, self::TIME_KEY_PREFIX ); - - // Fetch all of the raw values - $wrappedValues = $this->cache->getMulti( array_merge( $valueKeys, $checkKeys ) ); - $now = microtime( true ); - - // Get/initialize the timestamp of all the "check" keys - $checkKeyTimes = array(); - foreach ( $checkKeys as $checkKey ) { - $timestamp = isset( $wrappedValues[$checkKey] ) - ? self::parsePurgeValue( $wrappedValues[$checkKey] ) - : false; - if ( !is_float( $timestamp ) ) { - // Key is not set or invalid; regenerate - $this->cache->add( $checkKey, - self::PURGE_VAL_PREFIX . $now, self::CHECK_KEY_TTL ); - $timestamp = $now; - } - - $checkKeyTimes[] = $timestamp; - } - - // Get the main cache value for each key and validate them - foreach ( $valueKeys as $vKey ) { - if ( !isset( $wrappedValues[$vKey] ) ) { - continue; // not found - } - - $key = substr( $vKey, $vPrefixLen ); // unprefix - - list( $value, $curTTL ) = $this->unwrap( $wrappedValues[$vKey], $now ); - if ( $value !== false ) { - $result[$key] = $value; - foreach ( $checkKeyTimes as $checkKeyTime ) { - // Force dependant keys to be invalid for a while after purging - // to reduce race conditions involving stale data getting cached - $safeTimestamp = $checkKeyTime + self::HOLDOFF_TTL; - if ( $safeTimestamp >= $wrappedValues[$vKey][self::FLD_TIME] ) { - $curTTL = min( $curTTL, $checkKeyTime - $now ); - } - } - } - - $curTTLs[$key] = $curTTL; - } - - return $result; - } - - /** - * Set the value of a key from cache - * - * Simply calling this method when source data changes is not valid because - * the changes do not replicate to the other WAN sites. In that case, delete() - * should be used instead. This method is intended for use on cache misses. - * - * @param string $key Cache key - * @param mixed $value - * @param integer $ttl Seconds to live [0=forever] - * @return bool Success - */ - final public function set( $key, $value, $ttl = 0 ) { - $key = self::VALUE_KEY_PREFIX . $key; - $wrapped = $this->wrap( $value, $ttl ); - - $func = function ( $cache, $key, $cWrapped ) use ( $wrapped ) { - return ( is_string( $cWrapped ) ) - ? false // key is tombstoned; do nothing - : $wrapped; - }; - - return $this->cache->merge( $key, $func, $ttl, 1 ); - } - - /** - * Purge a key from all clusters - * - * This instantiates a hold-off period where the key cannot be - * written to avoid race conditions where dependent keys get updated - * with a stale value (e.g. from a DB slave). - * - * This should only be called when the underlying data (being cached) - * changes in a significant way. If called twice on the same key, then - * the last TTL takes precedence. - * - * @param string $key Cache key - * @param integer $ttl How long to block writes to the key [seconds] - * @return bool True if the item was purged or not found, false on failure - */ - final public function delete( $key, $ttl = self::HOLDOFF_TTL ) { - $key = self::VALUE_KEY_PREFIX . $key; - // Update the local cluster immediately - $ok = $this->cache->set( $key, self::PURGE_VAL_PREFIX . microtime( true ), $ttl ); - // Publish the purge to all clusters - return $this->relayPurge( $key, $ttl ) && $ok; - } - - /** - * Fetch the value of a timestamp "check" key - * - * @param string $key - * @return float|bool TS_UNIX timestamp of the key; false if not present - */ - final public function getCheckKeyTime( $key ) { - return self::parsePurgeValue( $this->cache->get( self::TIME_KEY_PREFIX . $key ) ); - } - - /** - * Purge a "check" key from all clusters, invalidating keys that use it - * - * This should only be called when the underlying data (being cached) - * changes in a significant way, and it is impractical to call delete() - * on all keys that should be changed. When get() is called on those - * keys, the relevant "check" keys must be supplied for this to work. - * - * The "check" key essentially represents a last-modified field. - * It is set in the future a few seconds when this is called, to - * avoid race conditions where dependent keys get updated with a - * stale value (e.g. from a DB slave). - * - * @see WANObjectCache::get() - * - * @param string $key Cache key - * @return bool True if the item was purged or not found, false on failure - */ - final public function touchCheckKey( $key ) { - $key = self::TIME_KEY_PREFIX . $key; - // Update the local cluster immediately - $ok = $this->cache->set( $key, - self::PURGE_VAL_PREFIX . microtime( true ), self::CHECK_KEY_TTL ); - // Publish the purge to all clusters - return $this->relayPurge( $key, self::CHECK_KEY_TTL ) && $ok; - } - - /** - * Method to fetch/regenerate cache keys - * - * On cache miss, the key will be set to the callback result. - * The callback function returns the new value given the current - * value (false if not present). If false is returned, then nothing - * will be saved to cache. - * - * Most callers should ignore the current value, but it can be used - * to maintain "most recent X" values that come from time or sequence - * based source data, provided that the "as of" id/time is tracked. - * - * Usage of $checkKeys is the same as with get(). - * - * The simplest way to avoid stampedes for hot keys is to use - * the 'lockTSE' option in $opts. If cache purges are needed, also: - * a) Pass $key into $checkKeys - * b) Use touchCheckKey( $key ) instead of delete( $key ) - * Following this pattern lets the old cache be used until a - * single thread updates it as needed. Also consider tweaking - * the 'lowTTL' parameter. - * - * Example usage: - * - * $key = wfMemcKey( 'cat-recent-actions', $catId ); - * // Function that derives the new key value given the old value - * $callback = function( $cValue ) { ... }; - * // Get the key value from cache or from source on cache miss; - * // try to only let one cluster thread manage doing cache updates - * $opts = array( 'lockTSE' => 5, 'lowTTL' => 10 ); - * $value = $cache->getWithSetCallback( $key, $callback, 60, array(), $opts ); - * - * - * Example usage: - * - * $key = wfMemcKey( 'cat-state', $catId ); - * // The "check" keys that represent things the value depends on; - * // Calling touchCheckKey() on them invalidates "cat-state" - * $checkKeys = array( - * wfMemcKey( 'water-bowls', $houseId ), - * wfMemcKey( 'food-bowls', $houseId ), - * wfMemcKey( 'people-present', $houseId ) - * ); - * // Function that derives the new key value - * $callback = function() { ... }; - * // Get the key value from cache or from source on cache miss; - * // try to only let one cluster thread manage doing cache updates - * $opts = array( 'lockTSE' => 5, 'lowTTL' => 10 ); - * $value = $cache->getWithSetCallback( $key, $callback, 60, $checkKeys, $opts ); - * - * - * @see WANObjectCache::get() - * - * @param string $key Cache key - * @param callable $callback Value generation function - * @param integer $ttl Seconds to live when the key is updated [0=forever] - * @param array $checkKeys List of "check" keys - * @param array $opts Options map: - * - lowTTL : consider pre-emptive updates when the current TTL (sec) - * of the key is less than this. It becomes more likely - * over time, becoming a certainty once the key is expired. - * - lockTSE : if the key is tombstoned or expired less (by $checkKeys) - * than this many seconds ago, then try to have a single - * thread handle cache regeneration at any given time. - * Other threads will try to use stale values if possible. - * - tempTTL : when 'lockTSE' is set, this determines the TTL of the temp - * key used to cache values while a key is tombstoned. - * This avoids excessive regeneration of hot keys on delete() but - * may result in stale values. - * @return mixed Value to use for the key - */ - final public function getWithSetCallback( - $key, $callback, $ttl, array $checkKeys = array(), array $opts = array() - ) { - $lowTTL = isset( $opts['lowTTL'] ) ? $opts['lowTTL'] : min( 10, $ttl ); - $lockTSE = isset( $opts['lockTSE'] ) ? $opts['lockTSE'] : -1; - $tempTTL = isset( $opts['tempTTL'] ) ? $opts['tempTTL'] : 5; - - // Get the current key value - $curTTL = null; - $cValue = $this->get( $key, $curTTL, $checkKeys ); // current value - $value = $cValue; // return value - - // Determine if a regeneration is desired - if ( $value !== false && $curTTL > 0 && !$this->worthRefresh( $curTTL, $lowTTL ) ) { - return $value; - } - - if ( !is_callable( $callback ) ) { - throw new InvalidArgumentException( "Invalid cache miss callback provided." ); - } - - // Assume a key is hot if requested soon after invalidation - $isHot = ( $curTTL !== null && $curTTL <= 0 && abs( $curTTL ) <= $lockTSE ); - $isTombstone = ( $curTTL !== null && $value === false ); - - $locked = false; - if ( $isHot || $isTombstone ) { - // Acquire a cluster-local non-blocking lock - if ( $this->cache->lock( $key, 0, self::LOCK_TTL ) ) { - // Lock acquired; this thread should update the key - $locked = true; - } elseif ( $value !== false ) { - // If it cannot be acquired; then the stale value can be used - return $value; - } else { - // Either another thread has the lock or the lock failed. - // Use the stash value, which is likely from the prior thread. - $value = $this->cache->get( self::STASH_KEY_PREFIX . $key ); - // Regenerate on timeout or if the other thread failed - if ( $value !== false ) { - return $value; - } - } - } - - // Generate the new value from the callback... - $value = call_user_func( $callback, $cValue ); - // When delete() is called, writes are write-holed by the tombstone, - // so use a special stash key to pass the new value around threads. - if ( $value !== false && ( $isHot || $isTombstone ) ) { - $this->cache->set( self::STASH_KEY_PREFIX . $key, $value, $tempTTL ); - } - - if ( $locked ) { - $this->cache->unlock( $key ); - } - - if ( $value !== false ) { - // Update the cache; this will fail if the key is tombstoned - $this->set( $key, $value, $ttl ); - } - - return $value; - } - - /** - * Get the "last error" registered; clearLastError() should be called manually - * @return int ERR_* constant for the "last error" registry - */ - final public function getLastError() { - if ( $this->lastRelayError ) { - // If the cache and the relayer failed, focus on the later. - // An update not making it to the relayer means it won't show up - // in other DCs (nor will consistent re-hashing see up-to-date values). - // On the other hand, if just the cache update failed, then it should - // eventually be applied by the relayer. - return $this->lastRelayError; - } - - $code = $this->cache->getLastError(); - switch ( $code ) { - case BagOStuff::ERR_NONE: - return self::ERR_NONE; - case BagOStuff::ERR_NO_RESPONSE: - return self::ERR_NO_RESPONSE; - case BagOStuff::ERR_UNREACHABLE: - return self::ERR_UNREACHABLE; - default: - return self::ERR_UNEXPECTED; - } - } - - /** - * Clear the "last error" registry - */ - final public function clearLastError() { - $this->cache->clearLastError(); - $this->lastRelayError = self::ERR_NONE; - } - - /** - * Do the actual async bus purge of a key - * - * This must set the key to "PURGED:" - * - * @param string $key Cache key - * @param integer $ttl How long to keep the tombstone [seconds] - * @return bool Success - */ - protected function relayPurge( $key, $ttl ) { - $event = $this->cache->modifySimpleRelayEvent( array( - 'cmd' => 'set', - 'key' => $key, - 'val' => 'PURGED:$UNIXTIME$', - 'ttl' => max( $ttl, 1 ), - 'sbt' => true, // substitute $UNIXTIME$ with actual microtime - ) ); - - $ok = $this->relayer->notify( "{$this->pool}:purge", $event ); - if ( !$ok ) { - $this->lastRelayError = self::ERR_RELAY; - } - - return $ok; - } - - /** - * Check if a key should be regenerated (using random probability) - * - * This returns false if $curTTL >= $lowTTL. Otherwise, the chance - * of returning true increases steadily from 0% to 100% as the $curTTL - * moves from $lowTTL to 0 seconds. This handles widely varying - * levels of cache access traffic. - * - * @param float|INF $curTTL Approximate TTL left on the key if present - * @param float $lowTTL Consider a refresh when $curTTL is less than this - * @return bool - */ - protected function worthRefresh( $curTTL, $lowTTL ) { - if ( $curTTL >= $lowTTL ) { - return false; - } elseif ( $curTTL <= 0 ) { - return true; - } - - $chance = ( 1 - $curTTL / $lowTTL ); - - return mt_rand( 1, 1e9 ) <= 1e9 * $chance; - } - - /** - * Do not use this method outside WANObjectCache - * - * @param mixed $value - * @param integer $ttl [0=forever] - * @return string - */ - protected function wrap( $value, $ttl ) { - return array( - self::FLD_VERSION => self::VERSION, - self::FLD_VALUE => $value, - self::FLD_TTL => $ttl, - self::FLD_TIME => microtime( true ) - ); - } - - /** - * Do not use this method outside WANObjectCache - * - * @param array|string|bool $wrapped - * @param float $now Unix Current timestamp (preferrable pre-query) - * @return array (mixed; false if absent/invalid, current time left) - */ - protected function unwrap( $wrapped, $now ) { - // Check if the value is a tombstone - $purgeTimestamp = self::parsePurgeValue( $wrapped ); - if ( is_float( $purgeTimestamp ) ) { - // Purged values should always have a negative current $ttl - $curTTL = min( -0.000001, $purgeTimestamp - $now ); - return array( false, $curTTL ); - } - - if ( !is_array( $wrapped ) // not found - || !isset( $wrapped[self::FLD_VERSION] ) // wrong format - || $wrapped[self::FLD_VERSION] !== self::VERSION // wrong version - ) { - return array( false, null ); - } - - if ( $wrapped[self::FLD_TTL] > 0 ) { - // Get the approximate time left on the key - $age = $now - $wrapped[self::FLD_TIME]; - $curTTL = max( $wrapped[self::FLD_TTL] - $age, 0.0 ); - } else { - // Key had no TTL, so the time left is unbounded - $curTTL = INF; - } - - return array( $wrapped[self::FLD_VALUE], $curTTL ); - } - - /** - * @param array $keys - * @param string $prefix - * @return string - */ - protected static function prefixCacheKeys( array $keys, $prefix ) { - $res = array(); - foreach ( $keys as $key ) { - $res[] = $prefix . $key; - } - - return $res; - } - - /** - * @param string $value String like "PURGED:" - * @return float|bool UNIX timestamp or false on failure - */ - protected static function parsePurgeValue( $value ) { - $m = array(); - if ( is_string( $value ) && - preg_match( '/^' . self::PURGE_VAL_PREFIX . '([^:]+)$/', $value, $m ) - ) { - return (float)$m[1]; - } else { - return false; - } - } -}