X-Git-Url: https://git.cyclocoop.org/%7B%24admin_url%7Dcompta/operations/supprimer.php?a=blobdiff_plain;f=includes%2Flibs%2Fobjectcache%2FMemcachedPeclBagOStuff.php;h=cc7ee2a5f5a4c983cd89706aca6cb0e137193a96;hb=35093297075bb55490b0c24653e0978333bc2fec;hp=221bc82ce32cb0da0742728b57cd6b3249d88bd0;hpb=3e39ec72a693cacc6bb92364fcf610de0e8c5a25;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/libs/objectcache/MemcachedPeclBagOStuff.php b/includes/libs/objectcache/MemcachedPeclBagOStuff.php index 221bc82ce3..cc7ee2a5f5 100644 --- a/includes/libs/objectcache/MemcachedPeclBagOStuff.php +++ b/includes/libs/objectcache/MemcachedPeclBagOStuff.php @@ -28,7 +28,25 @@ */ class MemcachedPeclBagOStuff extends MemcachedBagOStuff { /** @var Memcached */ - protected $client; + protected $syncClient; + /** @var Memcached|null */ + protected $asyncClient; + + /** @var bool Whether the non-buffering client is locked from use */ + protected $syncClientIsBuffering = false; + /** @var bool Whether the non-buffering client should be flushed before use */ + protected $hasUnflushedChanges = false; + + /** @var array Memcached options */ + private static $OPTS_SYNC_WRITES = [ + Memcached::OPT_NO_BLOCK => false, // async I/O (using TCP buffers) + Memcached::OPT_BUFFER_WRITES => false // libmemcached buffers + ]; + /** @var array Memcached options */ + private static $OPTS_ASYNC_WRITES = [ + Memcached::OPT_NO_BLOCK => true, // async I/O (using TCP buffers) + Memcached::OPT_BUFFER_WRITES => true // libmemcached buffers + ]; /** * Available parameters are: @@ -63,15 +81,22 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff { // The Memcached object is essentially shared for each pool ID. // We can only reuse a pool ID if we keep the config consistent. $connectionPoolId = md5( serialize( $params ) ); - $client = new Memcached( $connectionPoolId ); - $this->initializeClient( $client, $params ); + $syncClient = new Memcached( "$connectionPoolId-sync" ); + // Avoid clobbering the main thread-shared Memcached instance + $asyncClient = new Memcached( "$connectionPoolId-async" ); } else { - $client = new Memcached; - $this->initializeClient( $client, $params ); + $syncClient = new Memcached(); + $asyncClient = null; } - $this->client = $client; + $this->initializeClient( $syncClient, $params, self::$OPTS_SYNC_WRITES ); + if ( $asyncClient ) { + $this->initializeClient( $asyncClient, $params, self::$OPTS_ASYNC_WRITES ); + } + // Set the main client and any dedicated one for buffered writes + $this->syncClient = $syncClient; + $this->asyncClient = $asyncClient; // The compression threshold is an undocumented php.ini option for some // reason. There's probably not much harm in setting it globally, for // compatibility with the settings for the PHP client. @@ -84,9 +109,10 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff { * * @param Memcached $client * @param array $params + * @param array $options Base options for Memcached::setOptions() * @throws RuntimeException */ - private function initializeClient( Memcached $client, array $params ) { + private function initializeClient( Memcached $client, array $params, array $options ) { if ( $client->getServerList() ) { $this->logger->debug( __METHOD__ . ": pre-initialized client instance." ); @@ -95,7 +121,9 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff { $this->logger->debug( __METHOD__ . ": initializing new client instance." ); - $options = [ + $options += [ + Memcached::OPT_NO_BLOCK => false, + Memcached::OPT_BUFFER_WRITES => false, // Network protocol (ASCII or binary) Memcached::OPT_BINARY_PROTOCOL => $params['use_binary_protocol'], // Set various network timeouts @@ -150,10 +178,12 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff { protected function doGet( $key, $flags = 0, &$casToken = null ) { $this->debug( "get($key)" ); + + $client = $this->acquireSyncClient(); if ( defined( Memcached::class . '::GET_EXTENDED' ) ) { // v3.0.0 /** @noinspection PhpUndefinedClassConstantInspection */ $flags = Memcached::GET_EXTENDED; - $res = $this->client->get( $this->validateKeyEncoding( $key ), null, $flags ); + $res = $client->get( $this->validateKeyEncoding( $key ), null, $flags ); if ( is_array( $res ) ) { $result = $res['value']; $casToken = $res['cas']; @@ -162,62 +192,77 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff { $casToken = null; } } else { - $result = $this->client->get( $this->validateKeyEncoding( $key ), null, $casToken ); + $result = $client->get( $this->validateKeyEncoding( $key ), null, $casToken ); } - $result = $this->checkResult( $key, $result ); - return $result; + + return $this->checkResult( $key, $result ); } protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) { $this->debug( "set($key)" ); - $result = $this->client->set( + + $client = $this->acquireSyncClient(); + $result = $client->set( $this->validateKeyEncoding( $key ), $value, $this->fixExpiry( $exptime ) ); - if ( $result === false && $this->client->getResultCode() === Memcached::RES_NOTSTORED ) { + + return ( $result === false && $client->getResultCode() === Memcached::RES_NOTSTORED ) // "Not stored" is always used as the mcrouter response with AllAsyncRoute - return true; - } - return $this->checkResult( $key, $result ); + ? true + : $this->checkResult( $key, $result ); } protected function cas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) { $this->debug( "cas($key)" ); - $result = $this->client->cas( $casToken, $this->validateKeyEncoding( $key ), - $value, $this->fixExpiry( $exptime ) ); + + $result = $this->acquireSyncClient()->cas( + $casToken, + $this->validateKeyEncoding( $key ), + $value, $this->fixExpiry( $exptime ) + ); + return $this->checkResult( $key, $result ); } protected function doDelete( $key, $flags = 0 ) { $this->debug( "delete($key)" ); - $result = $this->client->delete( $this->validateKeyEncoding( $key ) ); - if ( $result === false && $this->client->getResultCode() === Memcached::RES_NOTFOUND ) { + + $client = $this->acquireSyncClient(); + $result = $client->delete( $this->validateKeyEncoding( $key ) ); + + return ( $result === false && $client->getResultCode() === Memcached::RES_NOTFOUND ) // "Not found" is counted as success in our interface - return true; - } - return $this->checkResult( $key, $result ); + ? true + : $this->checkResult( $key, $result ); } public function add( $key, $value, $exptime = 0, $flags = 0 ) { $this->debug( "add($key)" ); - $result = $this->client->add( + + $result = $this->acquireSyncClient()->add( $this->validateKeyEncoding( $key ), $value, $this->fixExpiry( $exptime ) ); + return $this->checkResult( $key, $result ); } public function incr( $key, $value = 1 ) { $this->debug( "incr($key)" ); - $result = $this->client->increment( $key, $value ); + + $result = $this->acquireSyncClient()->increment( $key, $value ); + return $this->checkResult( $key, $result ); } public function decr( $key, $value = 1 ) { $this->debug( "decr($key)" ); - $result = $this->client->decrement( $key, $value ); + + $result = $this->acquireSyncClient()->decrement( $key, $value ); + return $this->checkResult( $key, $result ); } @@ -236,22 +281,25 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff { if ( $result !== false ) { return $result; } - switch ( $this->client->getResultCode() ) { + + $client = $this->syncClient; + switch ( $client->getResultCode() ) { case Memcached::RES_SUCCESS: break; case Memcached::RES_DATA_EXISTS: case Memcached::RES_NOTSTORED: case Memcached::RES_NOTFOUND: - $this->debug( "result: " . $this->client->getResultMessage() ); + $this->debug( "result: " . $client->getResultMessage() ); break; default: - $msg = $this->client->getResultMessage(); + $msg = $client->getResultMessage(); $logCtx = []; if ( $key !== false ) { - $server = $this->client->getServerByKey( $key ); + $server = $client->getServerByKey( $key ); $logCtx['memcached-server'] = "{$server['host']}:{$server['port']}"; $logCtx['memcached-key'] = $key; - $msg = "Memcached error for key \"{memcached-key}\" on server \"{memcached-server}\": $msg"; + $msg = "Memcached error for key \"{memcached-key}\" " . + "on server \"{memcached-server}\": $msg"; } else { $msg = "Memcached error: $msg"; } @@ -263,41 +311,71 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff { protected function doGetMulti( array $keys, $flags = 0 ) { $this->debug( 'getMulti(' . implode( ', ', $keys ) . ')' ); + foreach ( $keys as $key ) { $this->validateKeyEncoding( $key ); } - $result = $this->client->getMulti( $keys ) ?: []; + + // The PECL implementation uses "gets" which works as well as a pipeline + $result = $this->acquireSyncClient()->getMulti( $keys ) ?: []; + return $this->checkResult( false, $result ); } protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) { $this->debug( 'setMulti(' . implode( ', ', array_keys( $data ) ) . ')' ); + + $exptime = $this->fixExpiry( $exptime ); foreach ( array_keys( $data ) as $key ) { $this->validateKeyEncoding( $key ); } - $result = $this->client->setMulti( $data, $this->fixExpiry( $exptime ) ); + + // The PECL implementation is a naïve for-loop so use async I/O to pipeline; + // https://github.com/php-memcached-dev/php-memcached/blob/master/php_memcached.c#L1852 + if ( ( $flags & self::WRITE_BACKGROUND ) == self::WRITE_BACKGROUND ) { + $client = $this->acquireAsyncClient(); + $result = $client->setMulti( $data, $exptime ); + $this->releaseAsyncClient( $client ); + } else { + $result = $this->acquireSyncClient()->setMulti( $data, $exptime ); + } + return $this->checkResult( false, $result ); } protected function doDeleteMulti( array $keys, $flags = 0 ) { $this->debug( 'deleteMulti(' . implode( ', ', $keys ) . ')' ); + foreach ( $keys as $key ) { $this->validateKeyEncoding( $key ); } - $result = $this->client->deleteMulti( $keys ) ?: []; - $ok = true; - foreach ( $result as $code ) { + + // The PECL implementation is a naïve for-loop so use async I/O to pipeline; + // https://github.com/php-memcached-dev/php-memcached/blob/7443d16d02fb73cdba2e90ae282446f80969229c/php_memcached.c#L1852 + if ( ( $flags & self::WRITE_BACKGROUND ) == self::WRITE_BACKGROUND ) { + $client = $this->acquireAsyncClient(); + $resultArray = $client->deleteMulti( $keys ) ?: []; + $this->releaseAsyncClient( $client ); + } else { + $resultArray = $this->acquireSyncClient()->deleteMulti( $keys ) ?: []; + } + + $result = true; + foreach ( $resultArray as $code ) { if ( !in_array( $code, [ true, Memcached::RES_NOTFOUND ], true ) ) { // "Not found" is counted as success in our interface - $ok = false; + $result = false; } } - return $this->checkResult( false, $ok ); + + return $this->checkResult( false, $result ); } protected function doChangeTTL( $key, $exptime, $flags ) { $this->debug( "touch($key)" ); - $result = $this->client->touch( $key, $exptime ); + + $result = $this->acquireSyncClient()->touch( $key, $this->fixExpiry( $exptime ) ); + return $this->checkResult( $key, $result ); } @@ -306,7 +384,7 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff { return $value; } - $serializer = $this->client->getOption( Memcached::OPT_SERIALIZER ); + $serializer = $this->syncClient->getOption( Memcached::OPT_SERIALIZER ); if ( $serializer === Memcached::SERIALIZER_PHP ) { return serialize( $value ); } elseif ( $serializer === Memcached::SERIALIZER_IGBINARY ) { @@ -321,7 +399,7 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff { return (int)$value; } - $serializer = $this->client->getOption( Memcached::OPT_SERIALIZER ); + $serializer = $this->syncClient->getOption( Memcached::OPT_SERIALIZER ); if ( $serializer === Memcached::SERIALIZER_PHP ) { return unserialize( $value ); } elseif ( $serializer === Memcached::SERIALIZER_IGBINARY ) { @@ -330,4 +408,52 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff { throw new UnexpectedValueException( __METHOD__ . ": got serializer '$serializer'." ); } + + /** + * @return Memcached + */ + private function acquireSyncClient() { + if ( $this->syncClientIsBuffering ) { + throw new RuntimeException( "The main (unbuffered I/O) client is locked" ); + } + + if ( $this->hasUnflushedChanges ) { + // Force a synchronous flush of async writes so that their changes are visible + $this->syncClient->fetch(); + if ( $this->asyncClient ) { + $this->asyncClient->fetch(); + } + $this->hasUnflushedChanges = false; + } + + return $this->syncClient; + } + + /** + * @return Memcached + */ + private function acquireAsyncClient() { + if ( $this->asyncClient ) { + return $this->asyncClient; // dedicated buffering instance + } + + // Modify the main instance to temporarily buffer writes + $this->syncClientIsBuffering = true; + $this->syncClient->setOptions( self::$OPTS_ASYNC_WRITES ); + + return $this->syncClient; + } + + /** + * @param Memcached $client + */ + private function releaseAsyncClient( $client ) { + $this->hasUnflushedChanges = true; + + if ( !$this->asyncClient ) { + // This is the main instance; make it stop buffering writes again + $client->setOptions( self::$OPTS_SYNC_WRITES ); + $this->syncClientIsBuffering = false; + } + } }