From 4245a2c2005990ee63688bc8153907a7486e2316 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Mon, 8 Jul 2019 23:02:05 -0700 Subject: [PATCH] objectcache: deleteObjectsExpiringBefore() signature and code improvements Make the method use a callable type hint for $progressCallback. Also make the SQLBagOStuff subclass shuffle() the array of server and table indexes in case the limit keeps getting applied. Only garbage collect on DBs that were going to be written to anyway. Also use DeferredUpdates if possible. Change-Id: I723e6377c26750ff98e33f7ab103c6162ae65f43 --- includes/libs/objectcache/BagOStuff.php | 10 +- includes/libs/objectcache/CachedBagOStuff.php | 15 +- .../libs/objectcache/MultiWriteBagOStuff.php | 8 +- .../libs/objectcache/ReplicatedBagOStuff.php | 12 +- includes/objectcache/SqlBagOStuff.php | 228 ++++++++++-------- 5 files changed, 169 insertions(+), 104 deletions(-) diff --git a/includes/libs/objectcache/BagOStuff.php b/includes/libs/objectcache/BagOStuff.php index 00bf57d66d..7ebbe8bcb7 100644 --- a/includes/libs/objectcache/BagOStuff.php +++ b/includes/libs/objectcache/BagOStuff.php @@ -652,15 +652,19 @@ abstract class BagOStuff implements IExpiringStore, IStoreKeyEncoder, LoggerAwar /** * Delete all objects expiring before a certain date. - * @param string $date The reference date in MW format - * @param callable|bool $progressCallback Optional, a function which will be called + * @param string|int $timestamp The reference date in MW or TS_UNIX format + * @param callable|null $progressCallback Optional, a function which will be called * regularly during long-running operations with the percentage progress * as the first parameter. [optional] * @param int $limit Maximum number of keys to delete [default: INF] * * @return bool Success, false if unimplemented */ - public function deleteObjectsExpiringBefore( $date, $progressCallback = false, $limit = INF ) { + public function deleteObjectsExpiringBefore( + $timestamp, + callable $progressCallback = null, + $limit = INF + ) { // stub return false; } diff --git a/includes/libs/objectcache/CachedBagOStuff.php b/includes/libs/objectcache/CachedBagOStuff.php index 0bdd34914c..e193497c7f 100644 --- a/includes/libs/objectcache/CachedBagOStuff.php +++ b/includes/libs/objectcache/CachedBagOStuff.php @@ -82,9 +82,18 @@ class CachedBagOStuff extends HashBagOStuff { $this->backend->setDebug( $bool ); } - public function deleteObjectsExpiringBefore( $date, $progressCallback = false, $limit = INF ) { - parent::deleteObjectsExpiringBefore( $date, $progressCallback, $limit ); - return $this->backend->deleteObjectsExpiringBefore( $date, $progressCallback, $limit ); + public function deleteObjectsExpiringBefore( + $timestamp, + callable $progressCallback = null, + $limit = INF + ) { + parent::deleteObjectsExpiringBefore( $timestamp, $progressCallback, $limit ); + + return $this->backend->deleteObjectsExpiringBefore( + $timestamp, + $progressCallback, + $limit + ); } public function makeKeyInternal( $keyspace, $args ) { diff --git a/includes/libs/objectcache/MultiWriteBagOStuff.php b/includes/libs/objectcache/MultiWriteBagOStuff.php index 7ca04ee7ca..4c6750fcbe 100644 --- a/includes/libs/objectcache/MultiWriteBagOStuff.php +++ b/includes/libs/objectcache/MultiWriteBagOStuff.php @@ -208,10 +208,14 @@ class MultiWriteBagOStuff extends BagOStuff { return $this->caches[0]->unlock( $key ); } - public function deleteObjectsExpiringBefore( $date, $progressCallback = false, $limit = INF ) { + public function deleteObjectsExpiringBefore( + $timestamp, + callable $progressCallback = null, + $limit = INF + ) { $ret = false; foreach ( $this->caches as $cache ) { - if ( $cache->deleteObjectsExpiringBefore( $date, $progressCallback, $limit ) ) { + if ( $cache->deleteObjectsExpiringBefore( $timestamp, $progressCallback, $limit ) ) { $ret = true; } } diff --git a/includes/libs/objectcache/ReplicatedBagOStuff.php b/includes/libs/objectcache/ReplicatedBagOStuff.php index 6fac0ade3b..8502ce2704 100644 --- a/includes/libs/objectcache/ReplicatedBagOStuff.php +++ b/includes/libs/objectcache/ReplicatedBagOStuff.php @@ -108,8 +108,16 @@ class ReplicatedBagOStuff extends BagOStuff { return $this->writeStore->unlock( $key ); } - public function deleteObjectsExpiringBefore( $date, $progressCallback = false, $limit = INF ) { - return $this->writeStore->deleteObjectsExpiringBefore( $date, $progressCallback ); + public function deleteObjectsExpiringBefore( + $timestamp, + callable $progressCallback = null, + $limit = INF + ) { + return $this->writeStore->deleteObjectsExpiringBefore( + $timestamp, + $progressCallback, + $limit + ); } public function getMulti( array $keys, $flags = 0 ) { diff --git a/includes/objectcache/SqlBagOStuff.php b/includes/objectcache/SqlBagOStuff.php index 2ef94c4297..c3a5897daa 100644 --- a/includes/objectcache/SqlBagOStuff.php +++ b/includes/objectcache/SqlBagOStuff.php @@ -43,8 +43,8 @@ class SqlBagOStuff extends BagOStuff { protected $serverTags; /** @var int */ protected $numServers; - /** @var int */ - protected $lastExpireAll = 0; + /** @var int UNIX timestamp */ + protected $lastGarbageCollect = 0; /** @var int */ protected $purgePeriod = 10; /** @var int */ @@ -67,6 +67,9 @@ class SqlBagOStuff extends BagOStuff { /** @var array Exceptions */ protected $connFailureErrors = []; + /** @var int */ + const GARBAGE_COLLECT_DELAY_SEC = 1; + /** * Constructor. Parameters are: * - server: A server info structure in the format required by each @@ -338,8 +341,6 @@ class SqlBagOStuff extends BagOStuff { $keysByTable[$serverIndex][$tableName][] = $key; } - $this->garbageCollect(); // expire old entries if any - $result = true; $exptime = (int)$expiry; /** @noinspection PhpUnusedLocalVariableInspection */ @@ -348,6 +349,7 @@ class SqlBagOStuff extends BagOStuff { $db = null; try { $db = $this->getDB( $serverIndex ); + $this->occasionallyGarbageCollect( $db ); } catch ( DBError $e ) { $this->handleWriteError( $e, $db, $serverIndex ); $result = false; @@ -601,7 +603,10 @@ class SqlBagOStuff extends BagOStuff { * @return bool */ protected function isExpired( $db, $exptime ) { - return $exptime != $this->getMaxDateTime( $db ) && wfTimestamp( TS_UNIX, $exptime ) < time(); + return ( + $exptime != $this->getMaxDateTime( $db ) && + wfTimestamp( TS_UNIX, $exptime ) < time() + ); } /** @@ -616,116 +621,147 @@ class SqlBagOStuff extends BagOStuff { } } - protected function garbageCollect() { - if ( !$this->purgePeriod || $this->replicaOnly ) { - // Disabled - return; - } - // Only purge on one in every $this->purgePeriod writes - if ( $this->purgePeriod !== 1 && mt_rand( 0, $this->purgePeriod - 1 ) ) { - return; - } - $now = time(); - // Avoid repeating the delete within a few seconds - if ( $now > ( $this->lastExpireAll + 1 ) ) { - $this->lastExpireAll = $now; - $this->deleteObjectsExpiringBefore( - wfTimestamp( TS_MW, $now ), - false, - $this->purgeLimit - ); + /** + * @param IDatabase $db + * @throws DBError + */ + protected function occasionallyGarbageCollect( IDatabase $db ) { + if ( + // Random purging is enabled + $this->purgePeriod && + // This is not using a replica DB + !$this->replicaOnly && + // Only purge on one in every $this->purgePeriod writes + mt_rand( 0, $this->purgePeriod - 1 ) == 0 && + // Avoid repeating the delete within a few seconds + ( time() - $this->lastGarbageCollect ) > self::GARBAGE_COLLECT_DELAY_SEC + ) { + $garbageCollector = function () use ( $db ) { + $this->deleteServerObjectsExpiringBefore( $db, time(), null, $this->purgeLimit ); + $this->lastGarbageCollect = time(); + }; + if ( $this->asyncHandler ) { + $this->lastGarbageCollect = time(); // avoid duplicate enqueues + ( $this->asyncHandler )( $garbageCollector ); + } else { + $garbageCollector(); + } } } public function expireAll() { - $this->deleteObjectsExpiringBefore( wfTimestampNow() ); + $this->deleteObjectsExpiringBefore( time() ); } public function deleteObjectsExpiringBefore( $timestamp, - $progressCallback = false, + callable $progressCallback = null, $limit = INF ) { /** @noinspection PhpUnusedLocalVariableInspection */ $silenceScope = $this->silenceTransactionProfiler(); - $count = 0; - for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) { + $serverIndexes = range( 0, $this->numServers - 1 ); + shuffle( $serverIndexes ); + + $ok = true; + + $keysDeletedCount = 0; + foreach ( $serverIndexes as $numServersDone => $serverIndex ) { $db = null; try { $db = $this->getDB( $serverIndex ); - $dbTimestamp = $db->timestamp( $timestamp ); - $totalSeconds = false; - $baseConds = [ 'exptime < ' . $db->addQuotes( $dbTimestamp ) ]; - for ( $i = 0; $i < $this->shards; $i++ ) { - $maxExpTime = false; - while ( true ) { - $conds = $baseConds; - if ( $maxExpTime !== false ) { - $conds[] = 'exptime >= ' . $db->addQuotes( $maxExpTime ); - } - $rows = $db->select( - $this->getTableNameByShard( $i ), - [ 'keyname', 'exptime' ], - $conds, - __METHOD__, - [ 'LIMIT' => 100, 'ORDER BY' => 'exptime' ] - ); - if ( $rows === false || !$rows->numRows() ) { - break; - } - $keys = []; - $row = $rows->current(); - $minExpTime = $row->exptime; - if ( $totalSeconds === false ) { - $totalSeconds = wfTimestamp( TS_UNIX, $timestamp ) - - wfTimestamp( TS_UNIX, $minExpTime ); - } - foreach ( $rows as $row ) { - $keys[] = $row->keyname; - $maxExpTime = $row->exptime; - } - - $db->delete( - $this->getTableNameByShard( $i ), - [ - 'exptime >= ' . $db->addQuotes( $minExpTime ), - 'exptime < ' . $db->addQuotes( $dbTimestamp ), - 'keyname' => $keys - ], - __METHOD__ - ); - $count += $db->affectedRows(); - if ( $count >= $limit ) { - return true; - } - - if ( is_callable( $progressCallback ) ) { - if ( intval( $totalSeconds ) === 0 ) { - $percent = 0; - } else { - $remainingSeconds = wfTimestamp( TS_UNIX, $timestamp ) - - wfTimestamp( TS_UNIX, $maxExpTime ); - if ( $remainingSeconds > $totalSeconds ) { - $totalSeconds = $remainingSeconds; - } - $processedSeconds = $totalSeconds - $remainingSeconds; - $percent = ( $i + $processedSeconds / $totalSeconds ) - / $this->shards * 100; - } - $percent = ( $percent / $this->numServers ) - + ( $serverIndex / $this->numServers * 100 ); - call_user_func( $progressCallback, $percent ); - } - } - } + $this->deleteServerObjectsExpiringBefore( + $db, + $timestamp, + $progressCallback, + $limit, + $numServersDone, + $keysDeletedCount + ); } catch ( DBError $e ) { $this->handleWriteError( $e, $db, $serverIndex ); - return false; + $ok = false; } } - return true; + return $ok; + } + + /** + * @param IDatabase $db + * @param string|int $timestamp + * @param callable|null $progressCallback + * @param int $limit + * @param int $serversDoneCount + * @param int &$keysDeletedCount + * @throws DBError + */ + private function deleteServerObjectsExpiringBefore( + IDatabase $db, + $timestamp, + $progressCallback, + $limit, + $serversDoneCount = 0, + &$keysDeletedCount = 0 + ) { + $cutoffUnix = wfTimestamp( TS_UNIX, $timestamp ); + $shardIndexes = range( 0, $this->shards - 1 ); + shuffle( $shardIndexes ); + + foreach ( $shardIndexes as $numShardsDone => $shardIndex ) { + $continue = null; // last exptime + $lag = null; // purge lag + do { + $res = $db->select( + $this->getTableNameByShard( $shardIndex ), + [ 'keyname', 'exptime' ], + array_merge( + [ 'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ) ], + $continue ? [ 'exptime >= ' . $db->addQuotes( $continue ) ] : [] + ), + __METHOD__, + [ 'LIMIT' => min( $limit, 100 ), 'ORDER BY' => 'exptime' ] + ); + + if ( $res->numRows() ) { + $row = $res->current(); + if ( $lag === null ) { + $lag = max( $cutoffUnix - wfTimestamp( TS_UNIX, $row->exptime ), 1 ); + } + + $keys = []; + foreach ( $res as $row ) { + $keys[] = $row->keyname; + $continue = $row->exptime; + } + + $db->delete( + $this->getTableNameByShard( $shardIndex ), + [ + 'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ), + 'keyname' => $keys + ], + __METHOD__ + ); + $keysDeletedCount += $db->affectedRows(); + } + + if ( is_callable( $progressCallback ) ) { + if ( $lag ) { + $remainingLag = $cutoffUnix - wfTimestamp( TS_UNIX, $continue ); + $processedLag = max( $lag - $remainingLag, 0 ); + $doneRatio = ( $numShardsDone + $processedLag / $lag ) / $this->shards; + } else { + $doneRatio = 1; + } + + $overallRatio = ( $doneRatio / $this->numServers ) + + ( $serversDoneCount / $this->numServers ); + call_user_func( $progressCallback, $overallRatio * 100 ); + } + } while ( $res->numRows() && $keysDeletedCount < $limit ); + } } /** @@ -763,6 +799,8 @@ class SqlBagOStuff extends BagOStuff { } list( $serverIndex ) = $this->getTableByKey( $key ); + + $db = null; try { $db = $this->getDB( $serverIndex ); $ok = $db->lock( $key, __METHOD__, $timeout ); @@ -793,6 +831,8 @@ class SqlBagOStuff extends BagOStuff { unset( $this->locks[$key] ); list( $serverIndex ) = $this->getTableByKey( $key ); + + $db = null; try { $db = $this->getDB( $serverIndex ); $ok = $db->unlock( $key, __METHOD__ ); -- 2.20.1