objectcache: deleteObjectsExpiringBefore() signature and code improvements
authorAaron Schulz <aschulz@wikimedia.org>
Tue, 9 Jul 2019 06:02:05 +0000 (23:02 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Tue, 9 Jul 2019 09:32:06 +0000 (02:32 -0700)
Make the method use a callable type hint for $progressCallback.

Also make the SQLBagOStuff subclass shuffle() the array of server and table
indexes in case the limit keeps getting applied. Only garbage collect on DBs
that were going to be written to anyway. Also use DeferredUpdates if possible.

Change-Id: I723e6377c26750ff98e33f7ab103c6162ae65f43

includes/libs/objectcache/BagOStuff.php
includes/libs/objectcache/CachedBagOStuff.php
includes/libs/objectcache/MultiWriteBagOStuff.php
includes/libs/objectcache/ReplicatedBagOStuff.php
includes/objectcache/SqlBagOStuff.php

index 00bf57d..7ebbe8b 100644 (file)
@@ -652,15 +652,19 @@ abstract class BagOStuff implements IExpiringStore, IStoreKeyEncoder, LoggerAwar
 
        /**
         * Delete all objects expiring before a certain date.
-        * @param string $date The reference date in MW format
-        * @param callable|bool $progressCallback Optional, a function which will be called
+        * @param string|int $timestamp The reference date in MW or TS_UNIX format
+        * @param callable|null $progressCallback Optional, a function which will be called
         *     regularly during long-running operations with the percentage progress
         *     as the first parameter. [optional]
         * @param int $limit Maximum number of keys to delete [default: INF]
         *
         * @return bool Success, false if unimplemented
         */
-       public function deleteObjectsExpiringBefore( $date, $progressCallback = false, $limit = INF ) {
+       public function deleteObjectsExpiringBefore(
+               $timestamp,
+               callable $progressCallback = null,
+               $limit = INF
+       ) {
                // stub
                return false;
        }
index 0bdd349..e193497 100644 (file)
@@ -82,9 +82,18 @@ class CachedBagOStuff extends HashBagOStuff {
                $this->backend->setDebug( $bool );
        }
 
-       public function deleteObjectsExpiringBefore( $date, $progressCallback = false, $limit = INF ) {
-               parent::deleteObjectsExpiringBefore( $date, $progressCallback, $limit );
-               return $this->backend->deleteObjectsExpiringBefore( $date, $progressCallback, $limit );
+       public function deleteObjectsExpiringBefore(
+               $timestamp,
+               callable $progressCallback = null,
+               $limit = INF
+       ) {
+               parent::deleteObjectsExpiringBefore( $timestamp, $progressCallback, $limit );
+
+               return $this->backend->deleteObjectsExpiringBefore(
+                       $timestamp,
+                       $progressCallback,
+                       $limit
+               );
        }
 
        public function makeKeyInternal( $keyspace, $args ) {
index 7ca04ee..4c6750f 100644 (file)
@@ -208,10 +208,14 @@ class MultiWriteBagOStuff extends BagOStuff {
                return $this->caches[0]->unlock( $key );
        }
 
-       public function deleteObjectsExpiringBefore( $date, $progressCallback = false, $limit = INF ) {
+       public function deleteObjectsExpiringBefore(
+               $timestamp,
+               callable $progressCallback = null,
+               $limit = INF
+       ) {
                $ret = false;
                foreach ( $this->caches as $cache ) {
-                       if ( $cache->deleteObjectsExpiringBefore( $date, $progressCallback, $limit ) ) {
+                       if ( $cache->deleteObjectsExpiringBefore( $timestamp, $progressCallback, $limit ) ) {
                                $ret = true;
                        }
                }
index 6fac0ad..8502ce2 100644 (file)
@@ -108,8 +108,16 @@ class ReplicatedBagOStuff extends BagOStuff {
                return $this->writeStore->unlock( $key );
        }
 
-       public function deleteObjectsExpiringBefore( $date, $progressCallback = false, $limit = INF ) {
-               return $this->writeStore->deleteObjectsExpiringBefore( $date, $progressCallback );
+       public function deleteObjectsExpiringBefore(
+               $timestamp,
+               callable $progressCallback = null,
+               $limit = INF
+       ) {
+               return $this->writeStore->deleteObjectsExpiringBefore(
+                       $timestamp,
+                       $progressCallback,
+                       $limit
+               );
        }
 
        public function getMulti( array $keys, $flags = 0 ) {
index 2ef94c4..c3a5897 100644 (file)
@@ -43,8 +43,8 @@ class SqlBagOStuff extends BagOStuff {
        protected $serverTags;
        /** @var int */
        protected $numServers;
-       /** @var int */
-       protected $lastExpireAll = 0;
+       /** @var int UNIX timestamp */
+       protected $lastGarbageCollect = 0;
        /** @var int */
        protected $purgePeriod = 10;
        /** @var int */
@@ -67,6 +67,9 @@ class SqlBagOStuff extends BagOStuff {
        /** @var array Exceptions */
        protected $connFailureErrors = [];
 
+       /** @var int */
+       const GARBAGE_COLLECT_DELAY_SEC = 1;
+
        /**
         * Constructor. Parameters are:
         *   - server:      A server info structure in the format required by each
@@ -338,8 +341,6 @@ class SqlBagOStuff extends BagOStuff {
                        $keysByTable[$serverIndex][$tableName][] = $key;
                }
 
-               $this->garbageCollect(); // expire old entries if any
-
                $result = true;
                $exptime = (int)$expiry;
                /** @noinspection PhpUnusedLocalVariableInspection */
@@ -348,6 +349,7 @@ class SqlBagOStuff extends BagOStuff {
                        $db = null;
                        try {
                                $db = $this->getDB( $serverIndex );
+                               $this->occasionallyGarbageCollect( $db );
                        } catch ( DBError $e ) {
                                $this->handleWriteError( $e, $db, $serverIndex );
                                $result = false;
@@ -601,7 +603,10 @@ class SqlBagOStuff extends BagOStuff {
         * @return bool
         */
        protected function isExpired( $db, $exptime ) {
-               return $exptime != $this->getMaxDateTime( $db ) && wfTimestamp( TS_UNIX, $exptime ) < time();
+               return (
+                       $exptime != $this->getMaxDateTime( $db ) &&
+                       wfTimestamp( TS_UNIX, $exptime ) < time()
+               );
        }
 
        /**
@@ -616,116 +621,147 @@ class SqlBagOStuff extends BagOStuff {
                }
        }
 
-       protected function garbageCollect() {
-               if ( !$this->purgePeriod || $this->replicaOnly ) {
-                       // Disabled
-                       return;
-               }
-               // Only purge on one in every $this->purgePeriod writes
-               if ( $this->purgePeriod !== 1 && mt_rand( 0, $this->purgePeriod - 1 ) ) {
-                       return;
-               }
-               $now = time();
-               // Avoid repeating the delete within a few seconds
-               if ( $now > ( $this->lastExpireAll + 1 ) ) {
-                       $this->lastExpireAll = $now;
-                       $this->deleteObjectsExpiringBefore(
-                               wfTimestamp( TS_MW, $now ),
-                               false,
-                               $this->purgeLimit
-                       );
+       /**
+        * @param IDatabase $db
+        * @throws DBError
+        */
+       protected function occasionallyGarbageCollect( IDatabase $db ) {
+               if (
+                       // Random purging is enabled
+                       $this->purgePeriod &&
+                       // This is not using a replica DB
+                       !$this->replicaOnly &&
+                       // Only purge on one in every $this->purgePeriod writes
+                       mt_rand( 0, $this->purgePeriod - 1 ) == 0 &&
+                       // Avoid repeating the delete within a few seconds
+                       ( time() - $this->lastGarbageCollect ) > self::GARBAGE_COLLECT_DELAY_SEC
+               ) {
+                       $garbageCollector = function () use ( $db ) {
+                               $this->deleteServerObjectsExpiringBefore( $db, time(), null, $this->purgeLimit );
+                               $this->lastGarbageCollect = time();
+                       };
+                       if ( $this->asyncHandler ) {
+                               $this->lastGarbageCollect = time(); // avoid duplicate enqueues
+                               ( $this->asyncHandler )( $garbageCollector );
+                       } else {
+                               $garbageCollector();
+                       }
                }
        }
 
        public function expireAll() {
-               $this->deleteObjectsExpiringBefore( wfTimestampNow() );
+               $this->deleteObjectsExpiringBefore( time() );
        }
 
        public function deleteObjectsExpiringBefore(
                $timestamp,
-               $progressCallback = false,
+               callable $progressCallback = null,
                $limit = INF
        ) {
                /** @noinspection PhpUnusedLocalVariableInspection */
                $silenceScope = $this->silenceTransactionProfiler();
 
-               $count = 0;
-               for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) {
+               $serverIndexes = range( 0, $this->numServers - 1 );
+               shuffle( $serverIndexes );
+
+               $ok = true;
+
+               $keysDeletedCount = 0;
+               foreach ( $serverIndexes as $numServersDone => $serverIndex ) {
                        $db = null;
                        try {
                                $db = $this->getDB( $serverIndex );
-                               $dbTimestamp = $db->timestamp( $timestamp );
-                               $totalSeconds = false;
-                               $baseConds = [ 'exptime < ' . $db->addQuotes( $dbTimestamp ) ];
-                               for ( $i = 0; $i < $this->shards; $i++ ) {
-                                       $maxExpTime = false;
-                                       while ( true ) {
-                                               $conds = $baseConds;
-                                               if ( $maxExpTime !== false ) {
-                                                       $conds[] = 'exptime >= ' . $db->addQuotes( $maxExpTime );
-                                               }
-                                               $rows = $db->select(
-                                                       $this->getTableNameByShard( $i ),
-                                                       [ 'keyname', 'exptime' ],
-                                                       $conds,
-                                                       __METHOD__,
-                                                       [ 'LIMIT' => 100, 'ORDER BY' => 'exptime' ]
-                                               );
-                                               if ( $rows === false || !$rows->numRows() ) {
-                                                       break;
-                                               }
-                                               $keys = [];
-                                               $row = $rows->current();
-                                               $minExpTime = $row->exptime;
-                                               if ( $totalSeconds === false ) {
-                                                       $totalSeconds = wfTimestamp( TS_UNIX, $timestamp )
-                                                               - wfTimestamp( TS_UNIX, $minExpTime );
-                                               }
-                                               foreach ( $rows as $row ) {
-                                                       $keys[] = $row->keyname;
-                                                       $maxExpTime = $row->exptime;
-                                               }
-
-                                               $db->delete(
-                                                       $this->getTableNameByShard( $i ),
-                                                       [
-                                                               'exptime >= ' . $db->addQuotes( $minExpTime ),
-                                                               'exptime < ' . $db->addQuotes( $dbTimestamp ),
-                                                               'keyname' => $keys
-                                                       ],
-                                                       __METHOD__
-                                               );
-                                               $count += $db->affectedRows();
-                                               if ( $count >= $limit ) {
-                                                       return true;
-                                               }
-
-                                               if ( is_callable( $progressCallback ) ) {
-                                                       if ( intval( $totalSeconds ) === 0 ) {
-                                                               $percent = 0;
-                                                       } else {
-                                                               $remainingSeconds = wfTimestamp( TS_UNIX, $timestamp )
-                                                                       - wfTimestamp( TS_UNIX, $maxExpTime );
-                                                               if ( $remainingSeconds > $totalSeconds ) {
-                                                                       $totalSeconds = $remainingSeconds;
-                                                               }
-                                                               $processedSeconds = $totalSeconds - $remainingSeconds;
-                                                               $percent = ( $i + $processedSeconds / $totalSeconds )
-                                                                       / $this->shards * 100;
-                                                       }
-                                                       $percent = ( $percent / $this->numServers )
-                                                               + ( $serverIndex / $this->numServers * 100 );
-                                                       call_user_func( $progressCallback, $percent );
-                                               }
-                                       }
-                               }
+                               $this->deleteServerObjectsExpiringBefore(
+                                       $db,
+                                       $timestamp,
+                                       $progressCallback,
+                                       $limit,
+                                       $numServersDone,
+                                       $keysDeletedCount
+                               );
                        } catch ( DBError $e ) {
                                $this->handleWriteError( $e, $db, $serverIndex );
-                               return false;
+                               $ok = false;
                        }
                }
 
-               return true;
+               return $ok;
+       }
+
+       /**
+        * @param IDatabase $db
+        * @param string|int $timestamp
+        * @param callable|null $progressCallback
+        * @param int $limit
+        * @param int $serversDoneCount
+        * @param int &$keysDeletedCount
+        * @throws DBError
+        */
+       private function deleteServerObjectsExpiringBefore(
+               IDatabase $db,
+               $timestamp,
+               $progressCallback,
+               $limit,
+               $serversDoneCount = 0,
+               &$keysDeletedCount = 0
+       ) {
+               $cutoffUnix = wfTimestamp( TS_UNIX, $timestamp );
+               $shardIndexes = range( 0, $this->shards - 1 );
+               shuffle( $shardIndexes );
+
+               foreach ( $shardIndexes as $numShardsDone => $shardIndex ) {
+                       $continue = null; // last exptime
+                       $lag = null; // purge lag
+                       do {
+                               $res = $db->select(
+                                       $this->getTableNameByShard( $shardIndex ),
+                                       [ 'keyname', 'exptime' ],
+                                       array_merge(
+                                               [ 'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ) ],
+                                               $continue ? [ 'exptime >= ' . $db->addQuotes( $continue ) ] : []
+                                       ),
+                                       __METHOD__,
+                                       [ 'LIMIT' => min( $limit, 100 ), 'ORDER BY' => 'exptime' ]
+                               );
+
+                               if ( $res->numRows() ) {
+                                       $row = $res->current();
+                                       if ( $lag === null ) {
+                                               $lag = max( $cutoffUnix - wfTimestamp( TS_UNIX, $row->exptime ), 1 );
+                                       }
+
+                                       $keys = [];
+                                       foreach ( $res as $row ) {
+                                               $keys[] = $row->keyname;
+                                               $continue = $row->exptime;
+                                       }
+
+                                       $db->delete(
+                                               $this->getTableNameByShard( $shardIndex ),
+                                               [
+                                                       'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ),
+                                                       'keyname' => $keys
+                                               ],
+                                               __METHOD__
+                                       );
+                                       $keysDeletedCount += $db->affectedRows();
+                               }
+
+                               if ( is_callable( $progressCallback ) ) {
+                                       if ( $lag ) {
+                                               $remainingLag = $cutoffUnix - wfTimestamp( TS_UNIX, $continue );
+                                               $processedLag = max( $lag - $remainingLag, 0 );
+                                               $doneRatio = ( $numShardsDone + $processedLag / $lag ) / $this->shards;
+                                       } else {
+                                               $doneRatio = 1;
+                                       }
+
+                                       $overallRatio = ( $doneRatio / $this->numServers )
+                                               + ( $serversDoneCount / $this->numServers );
+                                       call_user_func( $progressCallback, $overallRatio * 100 );
+                               }
+                       } while ( $res->numRows() && $keysDeletedCount < $limit );
+               }
        }
 
        /**
@@ -763,6 +799,8 @@ class SqlBagOStuff extends BagOStuff {
                }
 
                list( $serverIndex ) = $this->getTableByKey( $key );
+
+               $db = null;
                try {
                        $db = $this->getDB( $serverIndex );
                        $ok = $db->lock( $key, __METHOD__, $timeout );
@@ -793,6 +831,8 @@ class SqlBagOStuff extends BagOStuff {
                        unset( $this->locks[$key] );
 
                        list( $serverIndex ) = $this->getTableByKey( $key );
+
+                       $db = null;
                        try {
                                $db = $this->getDB( $serverIndex );
                                $ok = $db->unlock( $key, __METHOD__ );