From e2b03d5d30a8a6c62569a336622b865f47e148d0 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Tue, 12 Mar 2019 00:38:56 -0700 Subject: [PATCH] objectcache: add BagOStuff::deleteMulti() method for consistency Also: * Make the BagOStuff tests actually pass for memcached and sql * Add more unit tests for BagOStuff * Make SqlBagOStuff::add() more atomic Change-Id: Ic1eec0990a66b595b57c646498c3bd229442230c --- includes/libs/objectcache/BagOStuff.php | 18 ++- includes/libs/objectcache/RedisBagOStuff.php | 39 ++++++ .../libs/objectcache/ReplicatedBagOStuff.php | 8 ++ includes/objectcache/SqlBagOStuff.php | 95 +++++++++----- .../libs/objectcache/BagOStuffTest.php | 118 +++++++++++++++--- 5 files changed, 228 insertions(+), 50 deletions(-) diff --git a/includes/libs/objectcache/BagOStuff.php b/includes/libs/objectcache/BagOStuff.php index 2f107082b5..9c75a2f1f6 100644 --- a/includes/libs/objectcache/BagOStuff.php +++ b/includes/libs/objectcache/BagOStuff.php @@ -463,7 +463,7 @@ abstract class BagOStuff implements IExpiringStore, LoggerAwareInterface { function () use ( $key, $expiry, $fname ) { $this->clearLastError(); if ( $this->add( "{$key}:lock", 1, $expiry ) ) { - return true; // locked! + return WaitConditionLoop::CONDITION_REACHED; // locked! } elseif ( $this->getLastError() ) { $this->logger->warning( $fname . ' failed due to I/O error for {key}.', @@ -606,6 +606,22 @@ abstract class BagOStuff implements IExpiringStore, LoggerAwareInterface { return $res; } + /** + * Batch deletion + * @param string[] $keys List of keys + * @param int $flags Bitfield of BagOStuff::WRITE_* constants + * @return bool Success + * @since 1.33 + */ + public function deleteMulti( array $keys, $flags = 0 ) { + $res = true; + foreach ( $keys as $key ) { + $res = $this->delete( $key, $flags ) && $res; + } + + return $res; + } + /** * Insertion * @param string $key diff --git a/includes/libs/objectcache/RedisBagOStuff.php b/includes/libs/objectcache/RedisBagOStuff.php index 2bf0c48d32..f64fe7e780 100644 --- a/includes/libs/objectcache/RedisBagOStuff.php +++ b/includes/libs/objectcache/RedisBagOStuff.php @@ -233,6 +233,45 @@ class RedisBagOStuff extends BagOStuff { return $result; } + public function deleteMulti( array $keys, $flags = 0 ) { + $batches = []; + $conns = []; + foreach ( $keys as $key ) { + list( $server, $conn ) = $this->getConnection( $key ); + if ( !$conn ) { + continue; + } + $conns[$server] = $conn; + $batches[$server][] = $key; + } + + $result = true; + foreach ( $batches as $server => $batchKeys ) { + $conn = $conns[$server]; + try { + $conn->multi( Redis::PIPELINE ); + foreach ( $batchKeys as $key ) { + $conn->delete( $key ); + } + $batchResult = $conn->exec(); + if ( $batchResult === false ) { + $this->debug( "deleteMulti request to $server failed" ); + continue; + } + foreach ( $batchResult as $value ) { + if ( $value === false ) { + $result = false; + } + } + } catch ( RedisException $e ) { + $this->handleException( $conn, $e ); + $result = false; + } + } + + return $result; + } + public function add( $key, $value, $expiry = 0, $flags = 0 ) { list( $server, $conn ) = $this->getConnection( $key ); if ( !$conn ) { diff --git a/includes/libs/objectcache/ReplicatedBagOStuff.php b/includes/libs/objectcache/ReplicatedBagOStuff.php index 6afdc1cfaf..ea380a6423 100644 --- a/includes/libs/objectcache/ReplicatedBagOStuff.php +++ b/includes/libs/objectcache/ReplicatedBagOStuff.php @@ -90,10 +90,18 @@ class ReplicatedBagOStuff extends BagOStuff { return $this->writeStore->set( $key, $value, $exptime, $flags ); } + public function setMulti( array $keys, $exptime = 0, $flags = 0 ) { + return $this->writeStore->setMulti( $keys, $exptime, $flags ); + } + public function delete( $key, $flags = 0 ) { return $this->writeStore->delete( $key, $flags ); } + public function deleteMulti( array $keys, $flags = 0 ) { + return $this->writeStore->deleteMulti( $keys, $flags ); + } + public function add( $key, $value, $exptime = 0, $flags = 0 ) { return $this->writeStore->add( $key, $value, $exptime ); } diff --git a/includes/objectcache/SqlBagOStuff.php b/includes/objectcache/SqlBagOStuff.php index 66b488e194..b2d61a8925 100644 --- a/includes/objectcache/SqlBagOStuff.php +++ b/includes/objectcache/SqlBagOStuff.php @@ -312,6 +312,10 @@ class SqlBagOStuff extends BagOStuff { } public function setMulti( array $data, $expiry = 0, $flags = 0 ) { + return $this->insertMulti( $data, $expiry, $flags, true ); + } + + private function insertMulti( array $data, $expiry, $flags, $replace ) { $keysByTable = []; foreach ( $data as $key => $value ) { list( $serverIndex, $tableName ) = $this->getTableByKey( $key ); @@ -354,19 +358,22 @@ class SqlBagOStuff extends BagOStuff { } try { - $db->replace( - $tableName, - [ 'keyname' ], - $rows, - __METHOD__ - ); + if ( $replace ) { + $db->replace( $tableName, [ 'keyname' ], $rows, __METHOD__ ); + } else { + $db->insert( $tableName, $rows, __METHOD__, [ 'IGNORE' ] ); + $result = ( $db->affectedRows() > 0 && $result ); + } } catch ( DBError $e ) { $this->handleWriteError( $e, $db, $serverIndex ); $result = false; } } + } + if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) { + $result = $this->waitForReplication() && $result; } return $result; @@ -374,13 +381,16 @@ class SqlBagOStuff extends BagOStuff { public function set( $key, $value, $exptime = 0, $flags = 0 ) { $ok = $this->setMulti( [ $key => $value ], $exptime ); - if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) { - $ok = $this->waitForReplication() && $ok; - } return $ok; } + public function add( $key, $value, $exptime = 0, $flags = 0 ) { + $added = $this->insertMulti( [ $key => $value ], $exptime, $flags, false ); + + return $added; + } + protected function cas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) { list( $serverIndex, $tableName ) = $this->getTableByKey( $key ); $db = null; @@ -423,26 +433,46 @@ class SqlBagOStuff extends BagOStuff { return (bool)$db->affectedRows(); } - public function delete( $key, $flags = 0 ) { - $ok = true; + public function deleteMulti( array $keys, $flags = 0 ) { + $keysByTable = []; + foreach ( $keys as $key ) { + list( $serverIndex, $tableName ) = $this->getTableByKey( $key ); + $keysByTable[$serverIndex][$tableName][] = $key; + } - list( $serverIndex, $tableName ) = $this->getTableByKey( $key ); - $db = null; + $result = true; $silenceScope = $this->silenceTransactionProfiler(); - try { - $db = $this->getDB( $serverIndex ); - $db->delete( - $tableName, - [ 'keyname' => $key ], - __METHOD__ ); - } catch ( DBError $e ) { - $this->handleWriteError( $e, $db, $serverIndex ); - $ok = false; + foreach ( $keysByTable as $serverIndex => $serverKeys ) { + $db = null; + try { + $db = $this->getDB( $serverIndex ); + } catch ( DBError $e ) { + $this->handleWriteError( $e, $db, $serverIndex ); + $result = false; + continue; + } + + foreach ( $serverKeys as $tableName => $tableKeys ) { + try { + $db->delete( $tableName, [ 'keyname' => $tableKeys ], __METHOD__ ); + } catch ( DBError $e ) { + $this->handleWriteError( $e, $db, $serverIndex ); + $result = false; + } + + } } + if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) { - $ok = $this->waitForReplication() && $ok; + $result = $this->waitForReplication() && $result; } + return $result; + } + + public function delete( $key, $flags = 0 ) { + $ok = $this->deleteMulti( [ $key ], $flags ); + return $ok; } @@ -458,31 +488,34 @@ class SqlBagOStuff extends BagOStuff { [ 'value', 'exptime' ], [ 'keyname' => $key ], __METHOD__, - [ 'FOR UPDATE' ] ); + [ 'FOR UPDATE' ] + ); if ( $row === false ) { // Missing - - return null; + return false; } $db->delete( $tableName, [ 'keyname' => $key ], __METHOD__ ); if ( $this->isExpired( $db, $row->exptime ) ) { // Expired, do not reinsert - - return null; + return false; } $oldValue = intval( $this->unserialize( $db->decodeBlob( $row->value ) ) ); $newValue = $oldValue + $step; - $db->insert( $tableName, + $db->insert( + $tableName, [ 'keyname' => $key, 'value' => $db->encodeBlob( $this->serialize( $newValue ) ), 'exptime' => $row->exptime - ], __METHOD__, 'IGNORE' ); + ], + __METHOD__, + 'IGNORE' + ); if ( $db->affectedRows() == 0 ) { // Race condition. See T30611 - $newValue = null; + $newValue = false; } } catch ( DBError $e ) { $this->handleWriteError( $e, $db, $serverIndex ); diff --git a/tests/phpunit/includes/libs/objectcache/BagOStuffTest.php b/tests/phpunit/includes/libs/objectcache/BagOStuffTest.php index f0f55fb6ac..b68ffaf8d6 100644 --- a/tests/phpunit/includes/libs/objectcache/BagOStuffTest.php +++ b/tests/phpunit/includes/libs/objectcache/BagOStuffTest.php @@ -26,6 +26,7 @@ class BagOStuffTest extends MediaWikiTestCase { } $this->cache->delete( $this->cache->makeKey( self::TEST_KEY ) ); + $this->cache->delete( $this->cache->makeKey( self::TEST_KEY ) . ':lock' ); } /** @@ -68,10 +69,25 @@ class BagOStuffTest extends MediaWikiTestCase { * @covers BagOStuff::mergeViaCas */ public function testMerge() { - $calls = 0; $key = $this->cache->makeKey( self::TEST_KEY ); - $callback = function ( BagOStuff $cache, $key, $oldVal ) use ( &$calls ) { + $locks = false; + $checkLockingCallback = function ( BagOStuff $cache, $key, $oldVal ) use ( &$locks ) { + $locks = $cache->get( "$key:lock" ); + + return false; + }; + + $this->cache->merge( $key, $checkLockingCallback, 5 ); + $this->assertFalse( $this->cache->get( $key ) ); + + $calls = 0; + $casRace = false; // emulate a race + $callback = function ( BagOStuff $cache, $key, $oldVal ) use ( &$calls, &$casRace ) { ++$calls; + if ( $casRace ) { + // Uses CAS instead? + $cache->set( $key, 'conflict', 5 ); + } return ( $oldVal === false ) ? 'merged' : $oldVal . 'merged'; }; @@ -87,21 +103,43 @@ class BagOStuffTest extends MediaWikiTestCase { $this->assertEquals( 'mergedmerged', $this->cache->get( $key ) ); $calls = 0; - $this->cache->lock( $key ); - $this->assertFalse( $this->cache->merge( $key, $callback, 1 ), 'Non-blocking merge' ); - $this->cache->unlock( $key ); - $this->assertEquals( 0, $calls ); + if ( $locks ) { + // merge were something else already was merging (e.g. had the lock) + $this->cache->lock( $key ); + $this->assertFalse( + $this->cache->merge( $key, $callback, 5, 1 ), + 'Non-blocking merge (locking)' + ); + $this->cache->unlock( $key ); + $this->assertEquals( 0, $calls ); + } else { + $casRace = true; + $this->assertFalse( + $this->cache->merge( $key, $callback, 5, 1 ), + 'Non-blocking merge (CAS)' + ); + $this->assertEquals( 1, $calls ); + } } /** * @covers BagOStuff::merge * @covers BagOStuff::mergeViaLock + * @dataProvider provideTestMerge_fork */ - public function testMerge_fork() { + public function testMerge_fork( $exists, $winsLocking, $resLocking, $resCAS ) { $key = $this->cache->makeKey( self::TEST_KEY ); - $callback = function ( BagOStuff $cache, $key, $oldVal ) { - return ( $oldVal === false ) ? 'merged' : $oldVal . 'merged'; + $pCallback = function ( BagOStuff $cache, $key, $oldVal ) { + return ( $oldVal === false ) ? 'init-parent' : $oldVal . '-merged-parent'; + }; + $cCallback = function ( BagOStuff $cache, $key, $oldVal ) { + return ( $oldVal === false ) ? 'init-child' : $oldVal . '-merged-child'; }; + + if ( $exists ) { + $this->cache->set( $key, 'x', 5 ); + } + /* * Test concurrent merges by forking this process, if: * - not manually called with --use-bagostuff @@ -115,17 +153,21 @@ class BagOStuffTest extends MediaWikiTestCase { $fork &= !$this->cache instanceof MultiWriteBagOStuff; if ( $fork ) { $pid = null; + $locked = false; // Function to start merge(), run another merge() midway through, then finish - $outerFunc = function ( BagOStuff $cache, $key, $oldVal ) use ( $callback, &$pid ) { + $func = function ( BagOStuff $cache, $key, $cur ) + use ( $pCallback, $cCallback, &$pid, &$locked ) + { $pid = pcntl_fork(); if ( $pid == -1 ) { return false; } elseif ( $pid ) { + $locked = $cache->get( "$key:lock" ); // parent has lock? pcntl_wait( $status ); - return $callback( $cache, $key, $oldVal ); + return $pCallback( $cache, $key, $cur ); } else { - $this->cache->merge( $key, $callback, 0, 1 ); + $this->cache->merge( $key, $cCallback, 0, 1 ); // Bail out of the outer merge() in the child process since it does not // need to attempt to write anything. Success is checked by the parent. parent::tearDown(); // avoid phpunit notices @@ -134,22 +176,34 @@ class BagOStuffTest extends MediaWikiTestCase { }; // attempt a merge - this should fail - $merged = $this->cache->merge( $key, $outerFunc, 0, 1 ); + $merged = $this->cache->merge( $key, $func, 0, 1 ); if ( $pid == -1 ) { return; // can't fork, ignore this test... } - // merge has failed because child process was merging (and we only attempted once) - $this->assertFalse( $merged ); - - // make sure the child's merge is completed and verify - $this->assertEquals( $this->cache->get( $key ), 'mergedmerged' ); + if ( $locked ) { + // merge succeed since child was locked out + $this->assertEquals( $winsLocking, $merged ); + $this->assertEquals( $this->cache->get( $key ), $resLocking ); + } else { + // merge has failed because child process was merging (and we only attempted once) + $this->assertEquals( !$winsLocking, $merged ); + $this->assertEquals( $this->cache->get( $key ), $resCAS ); + } } else { $this->markTestSkipped( 'No pcntl methods available' ); } } + function provideTestMerge_fork() { + return [ + // (already exists, parent wins if locking, result if locking, result if CAS) + [ false, true, 'init-parent', 'init-child' ], + [ true, true, 'x-merged-parent', 'x-merged-child' ] + ]; + } + /** * @covers BagOStuff::changeTTL */ @@ -266,6 +320,34 @@ class BagOStuffTest extends MediaWikiTestCase { $this->cache->delete( $key4 ); } + /** + * @covers BagOStuff::setMulti + * @covers BagOStuff::deleteMulti + */ + public function testSetDeleteMulti() { + $map = [ + $this->cache->makeKey( 'test-1' ) => 'Siberian', + $this->cache->makeKey( 'test-2' ) => [ 'Huskies' ], + $this->cache->makeKey( 'test-3' ) => [ 'are' => 'the' ], + $this->cache->makeKey( 'test-4' ) => (object)[ 'greatest' => 'animal' ], + $this->cache->makeKey( 'test-5' ) => 4, + $this->cache->makeKey( 'test-6' ) => 'ever' + ]; + + $this->cache->setMulti( $map, 5 ); + $this->assertEquals( + $map, + $this->cache->getMulti( array_keys( $map ) ) + ); + + $this->assertTrue( $this->cache->deleteMulti( array_keys( $map ), 5 ) ); + + $this->assertEquals( + [], + $this->cache->getMulti( array_keys( $map ) ) + ); + } + /** * @covers BagOStuff::getScopedLock */ -- 2.20.1