/* WHERE */ [ 'cl_from' => $this->next ],
__METHOD__ . '-1'
);
- foreach ( $res as $o ) {
- $k = $o->cl_to;
+ foreach ( $res as $row ) {
+ $k = $row->cl_to;
# Update parent tree
- if ( !isset( $this->parents[$o->cl_from] ) ) {
- $this->parents[$o->cl_from] = [];
+ if ( !isset( $this->parents[$row->cl_from] ) ) {
+ $this->parents[$row->cl_from] = [];
}
- $this->parents[$o->cl_from][$k] = $o;
+ $this->parents[$row->cl_from][$k] = $row;
# Ignore those we already have
if ( in_array( $k, $this->deadend ) ) {
/* WHERE */ [ 'page_namespace' => NS_CATEGORY, 'page_title' => $layer ],
__METHOD__ . '-2'
);
- foreach ( $res as $o ) {
- $id = $o->page_id;
- $name = $o->page_title;
+ foreach ( $res as $row ) {
+ $id = $row->page_id;
+ $name = $row->page_title;
$this->name2id[$name] = $id;
$this->next[] = $id;
unset( $layer[$name] );
$wgEnableWANCacheReaper = false;
/**
- * Main object stash type. This should be a fast storage system for storing
- * lightweight data like hit counters and user activity. Sites with multiple
- * data-centers should have this use a store that replicates all writes. The
- * store should have enough consistency for CAS operations to be usable.
- * Reads outside of those needed for merge() may be eventually consistent.
+ * The object store type of the main stash.
+ *
+ * This store should be a very fast storage system optimized for holding lightweight data
+ * like incrementable hit counters and current user activity. The store should replicate the
+ * dataset among all data-centers. Any add(), merge(), lock(), and unlock() operations should
+ * maintain "best effort" linearizability; as long as connectivity is strong, latency is low,
+ * and there is no eviction pressure prompted by low free space, those operations should be
+ * linearizable. In terms of PACELC (https://en.wikipedia.org/wiki/PACELC_theorem), the store
+ * should act as a PA/EL distributed system for these operations. One optimization for these
+ * operations is to route them to a "primary" data-center (e.g. one that serves HTTP POST) for
+ * synchronous execution and then replicate to the others asynchronously. This means that at
+ * least calls to these operations during HTTP POST requests would quickly return.
+ *
+ * All other operations, such as get(), set(), delete(), changeTTL(), incr(), and decr(),
+ * should be synchronous in the local data-center, replicating asynchronously to the others.
+ * This behavior can be overridden by the use of the WRITE_SYNC and READ_LATEST flags.
+ *
+ * The store should *preferably* have eventual consistency to handle network partitions.
+ *
+ * Modules that rely on the stash should be prepared for:
+ * - add(), merge(), lock(), and unlock() to be slower than other write operations,
+ * at least in "secondary" data-centers (e.g. one that only serves HTTP GET/HEAD)
+ * - Other write operations to have race conditions across data-centers
+ * - Read operations to have race conditions across data-centers
+ * - Consistency to be either eventual (with Last-Write-Wins) or just "best effort"
+ *
+ * In general, this means avoiding updates during idempotent HTTP requests (GET/HEAD) and
+ * avoiding assumptions of true linearizability (e.g. accepting anomalies). Modules that need
+ * these kinds of guarantees should use other storage mediums.
*
* The options are:
* - db: Store cache objects in the DB
* up going to the HashBagOStuff used for the in-memory cache).
*
* @ingroup Cache
- * @TODO: Make this class use composition instead of calling super
*/
-class CachedBagOStuff extends HashBagOStuff {
+class CachedBagOStuff extends BagOStuff {
/** @var BagOStuff */
protected $backend;
+ /** @var HashBagOStuff */
+ protected $procCache;
/**
* @param BagOStuff $backend Permanent backend to use
* @param array $params Parameters for HashBagOStuff
*/
- function __construct( BagOStuff $backend, $params = [] ) {
+ public function __construct( BagOStuff $backend, $params = [] ) {
unset( $params['reportDupes'] ); // useless here
parent::__construct( $params );
$this->backend = $backend;
+ $this->procCache = new HashBagOStuff( $params );
$this->attrMap = $backend->attrMap;
}
- public function get( $key, $flags = 0 ) {
- $ret = parent::get( $key, $flags );
- if ( $ret === false && !$this->hasKey( $key ) ) {
+ protected function doGet( $key, $flags = 0, &$casToken = null ) {
+ $ret = $this->procCache->get( $key, $flags );
+ if ( $ret === false && !$this->procCache->hasKey( $key ) ) {
$ret = $this->backend->get( $key, $flags );
- $this->set( $key, $ret, 0, self::WRITE_CACHE_ONLY );
+ $this->set( $key, $ret, self::TTL_INDEFINITE, self::WRITE_CACHE_ONLY );
}
+
return $ret;
}
- public function set( $key, $value, $exptime = 0, $flags = 0 ) {
- parent::set( $key, $value, $exptime, $flags );
- if ( !( $flags & self::WRITE_CACHE_ONLY ) ) {
- $this->backend->set( $key, $value, $exptime, $flags & ~self::WRITE_CACHE_ONLY );
+ protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
+ $this->procCache->set( $key, $value, $exptime, $flags );
+ if ( ( $flags & self::WRITE_CACHE_ONLY ) != self::WRITE_CACHE_ONLY ) {
+ $this->backend->set( $key, $value, $exptime, $flags );
}
+
return true;
}
- public function delete( $key, $flags = 0 ) {
- parent::delete( $key, $flags );
- if ( !( $flags & self::WRITE_CACHE_ONLY ) ) {
- $this->backend->delete( $key );
+ protected function doDelete( $key, $flags = 0 ) {
+ $this->procCache->delete( $key, $flags );
+ if ( ( $flags & self::WRITE_CACHE_ONLY ) != self::WRITE_CACHE_ONLY ) {
+ $this->backend->delete( $key, $flags );
}
return true;
}
- public function setDebug( $bool ) {
- parent::setDebug( $bool );
- $this->backend->setDebug( $bool );
- }
-
public function deleteObjectsExpiringBefore(
$timestamp,
callable $progressCallback = null,
$limit = INF
) {
- parent::deleteObjectsExpiringBefore( $timestamp, $progressCallback, $limit );
+ $this->procCache->deleteObjectsExpiringBefore( $timestamp, $progressCallback, $limit );
return $this->backend->deleteObjectsExpiringBefore(
$timestamp,
);
}
- public function makeKeyInternal( $keyspace, $args ) {
- return $this->backend->makeKeyInternal( ...func_get_args() );
- }
-
- public function makeKey( $class, $component = null ) {
- return $this->backend->makeKey( ...func_get_args() );
- }
-
- public function makeGlobalKey( $class, $component = null ) {
- return $this->backend->makeGlobalKey( ...func_get_args() );
- }
-
// These just call the backend (tested elsewhere)
// @codeCoverageIgnoreStart
public function incr( $key, $value = 1 ) {
$n = $this->backend->incr( $key, $value );
- parent::delete( $key );
+
+ $this->procCache->delete( $key );
return $n;
}
return $this->backend->unlock( $key );
}
+ public function makeKeyInternal( $keyspace, $args ) {
+ return $this->backend->makeKeyInternal( ...func_get_args() );
+ }
+
+ public function makeKey( $class, $component = null ) {
+ return $this->backend->makeKey( ...func_get_args() );
+ }
+
+ public function makeGlobalKey( $class, $component = null ) {
+ return $this->backend->makeGlobalKey( ...func_get_args() );
+ }
+
+ public function setDebug( $bool ) {
+ parent::setDebug( $bool );
+ $this->backend->setDebug( $bool );
+ }
+
public function getLastError() {
return $this->backend->getLastError();
}
* @return bool
* @since 1.27
*/
- protected function hasKey( $key ) {
+ public function hasKey( $key ) {
return isset( $this->bag[$key] );
}
}
$this->segmentationSize = $params['maxPreferedKeySize'] ?? 917504; // < 1MiB
}
- /**
- * Fill in some defaults for missing keys in $params.
- *
- * @param array $params
- * @return array
- */
- protected function applyDefaultParams( $params ) {
- return $params + [
- 'compress_threshold' => 1500,
- 'connect_timeout' => 0.5,
- 'debug' => false
- ];
- }
-
/**
* Construct a cache key.
*
/**
* Available parameters are:
- * - servers: The list of IP:port combinations holding the memcached servers.
- * - persistent: Whether to use a persistent connection
- * - compress_threshold: The minimum size an object must be before it is compressed
- * - timeout: The read timeout in microseconds
- * - connect_timeout: The connect timeout in seconds
- * - retry_timeout: Time in seconds to wait before retrying a failed connect attempt
- * - server_failure_limit: Limit for server connect failures before it is removed
- * - serializer: May be either "php" or "igbinary". Igbinary produces more compact
- * values, but serialization is much slower unless the php.ini option
- * igbinary.compact_strings is off.
- * - use_binary_protocol Whether to enable the binary protocol (default is ASCII) (boolean)
+ * - servers: List of IP:port combinations holding the memcached servers.
+ * - persistent: Whether to use a persistent connection
+ * - compress_threshold: The minimum size an object must be before it is compressed
+ * - timeout: The read timeout in microseconds
+ * - connect_timeout: The connect timeout in seconds
+ * - retry_timeout: Time in seconds to wait before retrying a failed connect attempt
+ * - server_failure_limit: Limit for server connect failures before it is removed
+ * - serializer: Either "php" or "igbinary". Igbinary produces more compact
+ * values, but serialization is much slower unless the php.ini
+ * option igbinary.compact_strings is off.
+ * - use_binary_protocol Whether to enable the binary protocol (default is ASCII)
+ * - allow_tcp_nagle_delay Whether to permit Nagle's algorithm for reducing packet count
* @param array $params
- * @throws InvalidArgumentException
*/
function __construct( $params ) {
parent::__construct( $params );
- $params = $this->applyDefaultParams( $params );
+
+ // Default class-specific parameters
+ $params += [
+ 'compress_threshold' => 1500,
+ 'connect_timeout' => 0.5,
+ 'serializer' => 'php',
+ 'use_binary_protocol' => false,
+ 'allow_tcp_nagle_delay' => true
+ ];
if ( $params['persistent'] ) {
// The pool ID must be unique to the server/option combination.
// The Memcached object is essentially shared for each pool ID.
// We can only reuse a pool ID if we keep the config consistent.
- $this->client = new Memcached( md5( serialize( $params ) ) );
- if ( count( $this->client->getServerList() ) ) {
- $this->logger->debug( __METHOD__ . ": persistent Memcached object already loaded." );
- return; // already initialized; don't add duplicate servers
- }
+ $connectionPoolId = md5( serialize( $params ) );
+ $client = new Memcached( $connectionPoolId );
+ $this->initializeClient( $client, $params );
} else {
- $this->client = new Memcached;
+ $client = new Memcached;
+ $this->initializeClient( $client, $params );
}
- if ( $params['use_binary_protocol'] ) {
- $this->client->setOption( Memcached::OPT_BINARY_PROTOCOL, true );
- }
-
- if ( isset( $params['retry_timeout'] ) ) {
- $this->client->setOption( Memcached::OPT_RETRY_TIMEOUT, $params['retry_timeout'] );
- }
-
- if ( isset( $params['server_failure_limit'] ) ) {
- $this->client->setOption( Memcached::OPT_SERVER_FAILURE_LIMIT, $params['server_failure_limit'] );
- }
+ $this->client = $client;
// The compression threshold is an undocumented php.ini option for some
// reason. There's probably not much harm in setting it globally, for
// compatibility with the settings for the PHP client.
ini_set( 'memcached.compression_threshold', $params['compress_threshold'] );
+ }
- // Set timeouts
- $this->client->setOption( Memcached::OPT_CONNECT_TIMEOUT, $params['connect_timeout'] * 1000 );
- $this->client->setOption( Memcached::OPT_SEND_TIMEOUT, $params['timeout'] );
- $this->client->setOption( Memcached::OPT_RECV_TIMEOUT, $params['timeout'] );
- $this->client->setOption( Memcached::OPT_POLL_TIMEOUT, $params['timeout'] / 1000 );
+ /**
+ * Initialize the client only if needed and reuse it otherwise.
+ * This avoids duplicate servers in the list and new connections.
+ *
+ * @param Memcached $client
+ * @param array $params
+ * @throws RuntimeException
+ */
+ private function initializeClient( Memcached $client, array $params ) {
+ if ( $client->getServerList() ) {
+ $this->logger->debug( __METHOD__ . ": pre-initialized client instance." );
- // Set libketama mode since it's recommended by the documentation and
- // is as good as any. There's no way to configure libmemcached to use
- // hashes identical to the ones currently in use by the PHP client, and
- // even implementing one of the libmemcached hashes in pure PHP for
- // forwards compatibility would require MemcachedClient::get_sock() to be
- // rewritten.
- $this->client->setOption( Memcached::OPT_LIBKETAMA_COMPATIBLE, true );
+ return; // preserve persistent handle
+ }
- // Set the serializer
- $ok = false;
+ $this->logger->debug( __METHOD__ . ": initializing new client instance." );
+
+ $options = [
+ // Network protocol (ASCII or binary)
+ Memcached::OPT_BINARY_PROTOCOL => $params['use_binary_protocol'],
+ // Set various network timeouts
+ Memcached::OPT_CONNECT_TIMEOUT => $params['connect_timeout'] * 1000,
+ Memcached::OPT_SEND_TIMEOUT => $params['timeout'],
+ Memcached::OPT_RECV_TIMEOUT => $params['timeout'],
+ Memcached::OPT_POLL_TIMEOUT => $params['timeout'] / 1000,
+ // Avoid pointless delay when sending/fetching large blobs
+ Memcached::OPT_TCP_NODELAY => !$params['allow_tcp_nagle_delay'],
+ // Set libketama mode since it's recommended by the documentation
+ Memcached::OPT_LIBKETAMA_COMPATIBLE => true
+ ];
+ if ( isset( $params['retry_timeout'] ) ) {
+ $options[Memcached::OPT_RETRY_TIMEOUT] = $params['retry_timeout'];
+ }
+ if ( isset( $params['server_failure_limit'] ) ) {
+ $options[Memcached::OPT_SERVER_FAILURE_LIMIT] = $params['server_failure_limit'];
+ }
if ( $params['serializer'] === 'php' ) {
- $ok = $this->client->setOption( Memcached::OPT_SERIALIZER, Memcached::SERIALIZER_PHP );
+ $options[Memcached::OPT_SERIALIZER] = Memcached::SERIALIZER_PHP;
} elseif ( $params['serializer'] === 'igbinary' ) {
if ( !Memcached::HAVE_IGBINARY ) {
- throw new InvalidArgumentException(
+ throw new RuntimeException(
__CLASS__ . ': the igbinary extension is not available ' .
'but igbinary serialization was requested.'
);
}
- $ok = $this->client->setOption( Memcached::OPT_SERIALIZER, Memcached::SERIALIZER_IGBINARY );
+ $options[Memcached::OPT_SERIALIZER] = Memcached::SERIALIZER_IGBINARY;
}
- if ( !$ok ) {
- throw new InvalidArgumentException( __CLASS__ . ': invalid serializer parameter' );
+
+ if ( !$client->setOptions( $options ) ) {
+ throw new RuntimeException(
+ "Invalid options: " . json_encode( $options, JSON_PRETTY_PRINT )
+ );
}
$servers = [];
$servers[] = [ $host, false ]; // (ip or path, port)
}
}
- $this->client->addServers( $servers );
- }
-
- protected function applyDefaultParams( $params ) {
- $params = parent::applyDefaultParams( $params );
-
- if ( !isset( $params['use_binary_protocol'] ) ) {
- $params['use_binary_protocol'] = false;
- }
- if ( !isset( $params['serializer'] ) ) {
- $params['serializer'] = 'php';
+ if ( !$client->addServers( $servers ) ) {
+ throw new RuntimeException( "Failed to inject server address list" );
}
-
- return $params;
}
protected function doGet( $key, $flags = 0, &$casToken = null ) {
$this->debug( "get($key)" );
if ( defined( Memcached::class . '::GET_EXTENDED' ) ) { // v3.0.0
+ /** @noinspection PhpUndefinedClassConstantInspection */
$flags = Memcached::GET_EXTENDED;
$res = $this->client->get( $this->validateKeyEncoding( $key ), null, $flags );
if ( is_array( $res ) ) {
/**
* Available parameters are:
* - servers: The list of IP:port combinations holding the memcached servers.
- * - debug: Whether to set the debug flag in the underlying client.
* - persistent: Whether to use a persistent connection
* - compress_threshold: The minimum size an object must be before it is compressed
* - timeout: The read timeout in microseconds
*/
function __construct( $params ) {
parent::__construct( $params );
- $params = $this->applyDefaultParams( $params );
+
+ // Default class-specific parameters
+ $params += [
+ 'compress_threshold' => 1500,
+ 'connect_timeout' => 0.5
+ ];
$this->client = new MemcachedClient( $params );
$this->client->set_servers( $params['servers'] );
- $this->client->set_debug( $params['debug'] );
}
public function setDebug( $debug ) {
$this->server,
$this->user,
$this->password,
- $this->getDBname(),
- $this->dbSchema(),
+ $this->currentDomain->getDatabase(),
+ $this->currentDomain->getSchema(),
$this->tablePrefix()
);
$this->lastPing = microtime( true );
$this->server,
$this->user,
$this->password,
- $this->getDBname(),
- $this->dbSchema(),
+ $this->currentDomain->getDatabase(),
+ $this->currentDomain->getSchema(),
$this->tablePrefix()
);
$this->lastPing = microtime( true );
/**
* Get the cache object for the main stash.
*
- * Stash objects are BagOStuff instances suitable for storing light
- * weight data that is not canonically stored elsewhere (such as RDBMS).
- * Stashes should be configured to propagate changes to all data-centers.
- *
- * Callers should be prepared for:
- * - a) Writes to be slower in non-"primary" (e.g. HTTP GET/HEAD only) DCs
- * - b) Reads to be eventually consistent, e.g. for get()/getMulti()
- * In general, this means avoiding updates on idempotent HTTP requests and
- * avoiding an assumption of perfect serializability (or accepting anomalies).
- * Reads may be eventually consistent or data might rollback as nodes flap.
- * Callers can use BagOStuff:READ_LATEST to see the latest available data.
- *
* @return BagOStuff
* @since 1.26
* @deprecated Since 1.28 Use MediaWikiServices::getInstance()->getMainObjectStash()
public function execute() {
global $wgMainCacheType, $wgMemCachedTimeout, $wgObjectCaches;
+ $memcachedTypes = [ CACHE_MEMCACHED, 'memcached-php', 'memcached-pecl' ];
+
$cache = $this->getOption( 'cache' );
$iterations = $this->getOption( 'i', 100 );
if ( $cache ) {
$servers = $wgObjectCaches[$cache]['servers'];
} elseif ( $this->hasArg( 0 ) ) {
$servers = [ $this->getArg( 0 ) ];
- } elseif ( $wgMainCacheType === CACHE_MEMCACHED ) {
+ } elseif ( in_array( $wgMainCacheType, $memcachedTypes, true ) ) {
global $wgMemCachedServers;
$servers = $wgMemCachedServers;
} elseif ( isset( $wgObjectCaches[$wgMainCacheType]['servers'] ) ) {
/**
* Rebuild pass 1: Insert `recentchanges` entries for page revisions.
+ *
+ * @param ILBFactory $lbFactory
*/
private function rebuildRecentChangesTablePass1( ILBFactory $lbFactory ) {
$dbw = $this->getDB( DB_MASTER );
/**
* Rebuild pass 2: Enhance entries for page revisions with references to the previous revision
* (rc_last_oldid, rc_new etc.) and size differences (rc_old_len, rc_new_len).
+ *
+ * @param ILBFactory $lbFactory
*/
private function rebuildRecentChangesTablePass2( ILBFactory $lbFactory ) {
$dbw = $this->getDB( DB_MASTER );
$lastOldId = 0;
$lastSize = null;
$updated = 0;
- foreach ( $res as $obj ) {
+ foreach ( $res as $row ) {
$new = 0;
- if ( $obj->rc_cur_id != $lastCurId ) {
+ if ( $row->rc_cur_id != $lastCurId ) {
# Switch! Look up the previous last edit, if any
- $lastCurId = intval( $obj->rc_cur_id );
- $emit = $obj->rc_timestamp;
+ $lastCurId = intval( $row->rc_cur_id );
+ $emit = $row->rc_timestamp;
- $row = $dbw->selectRow(
+ $revRow = $dbw->selectRow(
'revision',
[ 'rev_id', 'rev_len' ],
[ 'rev_page' => $lastCurId, "rev_timestamp < " . $dbw->addQuotes( $emit ) ],
__METHOD__,
[ 'ORDER BY' => 'rev_timestamp DESC' ]
);
- if ( $row ) {
- $lastOldId = intval( $row->rev_id );
+ if ( $revRow ) {
+ $lastOldId = intval( $revRow->rev_id );
# Grab the last text size if available
- $lastSize = !is_null( $row->rev_len ) ? intval( $row->rev_len ) : null;
+ $lastSize = !is_null( $revRow->rev_len ) ? intval( $revRow->rev_len ) : null;
} else {
# No previous edit
$lastOldId = 0;
$size = (int)$dbw->selectField(
'revision',
'rev_len',
- [ 'rev_id' => $obj->rc_this_oldid ],
+ [ 'rev_id' => $row->rc_this_oldid ],
__METHOD__
);
],
[
'rc_cur_id' => $lastCurId,
- 'rc_this_oldid' => $obj->rc_this_oldid,
- 'rc_timestamp' => $obj->rc_timestamp // index usage
+ 'rc_this_oldid' => $row->rc_this_oldid,
+ 'rc_timestamp' => $row->rc_timestamp // index usage
],
__METHOD__
);
- $lastOldId = intval( $obj->rc_this_oldid );
+ $lastOldId = intval( $row->rc_this_oldid );
$lastSize = $size;
if ( ( ++$updated % $this->getBatchSize() ) == 0 ) {
/**
* Rebuild pass 3: Insert `recentchanges` entries for action logs.
+ *
+ * @param ILBFactory $lbFactory
*/
private function rebuildRecentChangesTablePass3( ILBFactory $lbFactory ) {
global $wgLogRestrictions, $wgFilterLogTypes;
/**
* Rebuild pass 4: Mark bot and autopatrolled entries.
+ *
+ * @param ILBFactory $lbFactory
*/
private function rebuildRecentChangesTablePass4( ILBFactory $lbFactory ) {
global $wgUseRCPatrol, $wgMiserMode;
);
$botusers = [];
- foreach ( $res as $obj ) {
- $botusers[] = User::newFromRow( $obj );
+ foreach ( $res as $row ) {
+ $botusers[] = User::newFromRow( $row );
}
# Fill in the rc_bot field
[ 'user_groups' => [ 'JOIN', 'user_id = ug_user' ] ] + $userQuery['joins']
);
- foreach ( $res as $obj ) {
- $patrolusers[] = User::newFromRow( $obj );
+ foreach ( $res as $row ) {
+ $patrolusers[] = User::newFromRow( $row );
}
# Fill in the rc_patrolled field
}
/**
- * Rebuild pass 5: Delete duplicate entries where we generate both a page revision and a log entry
- * for a single action (upload only, at the moment, but potentially also move, protect, ...).
+ * Rebuild pass 5: Delete duplicate entries where we generate both a page revision and a log
+ * entry for a single action (upload only, at the moment, but potentially move, protect, ...).
+ *
+ * @param ILBFactory $lbFactory
*/
private function rebuildRecentChangesTablePass5( ILBFactory $lbFactory ) {
$dbw = wfGetDB( DB_MASTER );
);
$updates = 0;
- foreach ( $res as $obj ) {
- $rev_id = $obj->ls_value;
- $log_id = $obj->ls_log_id;
+ foreach ( $res as $row ) {
+ $rev_id = $row->ls_value;
+ $log_id = $row->ls_log_id;
// Mark the logging row as having an associated rev id
$dbw->update(
$hashes = [];
$maxSize = 0;
- foreach ( $res as $boRow ) {
- $extDB = $this->getDB( $boRow->bo_cluster );
+ foreach ( $res as $row ) {
+ $extDB = $this->getDB( $row->bo_cluster );
$blobRow = $extDB->selectRow(
'blobs',
'*',
- [ 'blob_id' => $boRow->bo_blob_id ],
+ [ 'blob_id' => $row->bo_blob_id ],
__METHOD__
);