From 7c12727fff020b7154875c992f942214a7bbb2bf Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Fri, 2 Mar 2018 16:21:36 -0800 Subject: [PATCH] Stash WatchedItem changes so that the jobs run from the queue Previously, the jobs ran at post-send instead of via runners. The stash is still written to immediately and will quickly replicate to all datacenters. Since the stash and DB data is merged on access, users should still almost always see their changes on the next request. There is still the small chance of unusually high latency at the in-memory stash layer, which should be rare. Also inject the JobQueueGroup into the store instance instead of adding more singleton references. Bug: T188801 Change-Id: Ic1c2b5a3592469b5b8386012a9c3365fdcf5b8e1 --- includes/ServiceWiring.php | 5 +- .../watcheditem/NoWriteWatchedItemStore.php | 3 + .../watcheditem/WatchedItemQueryService.php | 21 +- .../WatchedItemQueryServiceExtension.php | 4 +- includes/watcheditem/WatchedItemStore.php | 143 +++++++-- .../watcheditem/WatchedItemStoreInterface.php | 14 + .../WatchedItemQueryServiceUnitTest.php | 19 +- .../WatchedItemStoreIntegrationTest.php | 7 + .../watcheditem/WatchedItemStoreUnitTest.php | 289 +++++++++++------- 9 files changed, 354 insertions(+), 151 deletions(-) diff --git a/includes/ServiceWiring.php b/includes/ServiceWiring.php index 46dd9133b1..e5f891e312 100644 --- a/includes/ServiceWiring.php +++ b/includes/ServiceWiring.php @@ -598,13 +598,16 @@ return [ return new WatchedItemQueryService( $services->getDBLoadBalancer(), $services->getCommentStore(), - $services->getActorMigration() + $services->getActorMigration(), + $services->getWatchedItemStore() ); }, 'WatchedItemStore' => function ( MediaWikiServices $services ) : WatchedItemStore { $store = new WatchedItemStore( $services->getDBLoadBalancerFactory(), + JobQueueGroup::singleton(), + $services->getMainObjectStash(), new HashBagOStuff( [ 'maxKeys' => 100 ] ), $services->getReadOnlyMode(), $services->getMainConfig()->get( 'UpdateRowsPerQuery' ) diff --git a/includes/watcheditem/NoWriteWatchedItemStore.php b/includes/watcheditem/NoWriteWatchedItemStore.php index 28012078d0..39d7a5d2f8 100644 --- a/includes/watcheditem/NoWriteWatchedItemStore.php +++ b/includes/watcheditem/NoWriteWatchedItemStore.php @@ -152,4 +152,7 @@ class NoWriteWatchedItemStore implements WatchedItemStoreInterface { throw new DBReadOnlyError( null, self::DB_READONLY_ERROR ); } + public function getLatestNotificationTimestamp( $timestamp, User $user, LinkTarget $target ) { + return wfTimestampOrNull( TS_MW, $timestamp ); + } } diff --git a/includes/watcheditem/WatchedItemQueryService.php b/includes/watcheditem/WatchedItemQueryService.php index a85e7e8f10..f44d6fd8d7 100644 --- a/includes/watcheditem/WatchedItemQueryService.php +++ b/includes/watcheditem/WatchedItemQueryService.php @@ -65,14 +65,19 @@ class WatchedItemQueryService { /** @var ActorMigration */ private $actorMigration; + /** @var WatchedItemStoreInterface */ + private $watchedItemStore; + public function __construct( LoadBalancer $loadBalancer, CommentStore $commentStore, - ActorMigration $actorMigration + ActorMigration $actorMigration, + WatchedItemStoreInterface $watchedItemStore ) { $this->loadBalancer = $loadBalancer; $this->commentStore = $commentStore; $this->actorMigration = $actorMigration; + $this->watchedItemStore = $watchedItemStore; } /** @@ -228,11 +233,14 @@ class WatchedItemQueryService { break; } + $target = new TitleValue( (int)$row->rc_namespace, $row->rc_title ); $items[] = [ new WatchedItem( $user, - new TitleValue( (int)$row->rc_namespace, $row->rc_title ), - $row->wl_notificationtimestamp + $target, + $this->watchedItemStore->getLatestNotificationTimestamp( + $row->wl_notificationtimestamp, $user, $target + ) ), $this->getRecentChangeFieldsFromRow( $row ) ]; @@ -307,11 +315,14 @@ class WatchedItemQueryService { $watchedItems = []; foreach ( $res as $row ) { + $target = new TitleValue( (int)$row->wl_namespace, $row->wl_title ); // todo these could all be cached at some point? $watchedItems[] = new WatchedItem( $user, - new TitleValue( (int)$row->wl_namespace, $row->wl_title ), - $row->wl_notificationtimestamp + $target, + $this->watchedItemStore->getLatestNotificationTimestamp( + $row->wl_notificationtimestamp, $user, $target + ) ); } diff --git a/includes/watcheditem/WatchedItemQueryServiceExtension.php b/includes/watcheditem/WatchedItemQueryServiceExtension.php index 873ae2d76c..a0e64c5d12 100644 --- a/includes/watcheditem/WatchedItemQueryServiceExtension.php +++ b/includes/watcheditem/WatchedItemQueryServiceExtension.php @@ -1,6 +1,6 @@ 'key' * The index is needed so that on mass changes all relevant items can be un-cached. @@ -68,18 +83,24 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac /** * @param ILBFactory $lbFactory + * @param JobQueueGroup $queueGroup + * @param BagOStuff $stash * @param HashBagOStuff $cache * @param ReadOnlyMode $readOnlyMode * @param int $updateRowsPerQuery */ public function __construct( ILBFactory $lbFactory, + JobQueueGroup $queueGroup, + BagOStuff $stash, HashBagOStuff $cache, ReadOnlyMode $readOnlyMode, $updateRowsPerQuery ) { $this->lbFactory = $lbFactory; $this->loadBalancer = $lbFactory->getMainLB(); + $this->queueGroup = $queueGroup; + $this->stash = $stash; $this->cache = $cache; $this->readOnlyMode = $readOnlyMode; $this->stats = new NullStatsdDataFactory(); @@ -88,6 +109,8 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac $this->revisionGetTimestampFromIdCallback = [ Revision::class, 'getTimestampFromId' ]; $this->updateRowsPerQuery = $updateRowsPerQuery; + + $this->latestUpdateCache = new HashBagOStuff( [ 'maxKeys' => 3 ] ); } /** @@ -286,8 +309,7 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac */ public function clearUserWatchedItemsUsingJobQueue( User $user ) { $job = ClearUserWatchlistJob::newForUser( $user, $this->getMaxId() ); - // TODO inject me. - JobQueueGroup::singleton()->push( $job ); + $this->queueGroup->push( $job ); } /** @@ -568,6 +590,7 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac } $dbr = $this->getConnectionRef( DB_REPLICA ); + $row = $dbr->selectRow( 'watchlist', 'wl_notificationtimestamp', @@ -582,7 +605,7 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac $item = new WatchedItem( $user, $target, - wfTimestampOrNull( TS_MW, $row->wl_notificationtimestamp ) + $this->getLatestNotificationTimestamp( $row->wl_notificationtimestamp, $user, $target ) ); $this->cache( $item ); @@ -622,11 +645,13 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac $watchedItems = []; foreach ( $res as $row ) { + $target = new TitleValue( (int)$row->wl_namespace, $row->wl_title ); // @todo: Should we add these to the process cache? $watchedItems[] = new WatchedItem( $user, new TitleValue( (int)$row->wl_namespace, $row->wl_title ), - $row->wl_notificationtimestamp + $this->getLatestNotificationTimestamp( + $row->wl_notificationtimestamp, $user, $target ) ); } @@ -688,8 +713,10 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac ); foreach ( $res as $row ) { + $target = new TitleValue( (int)$row->wl_namespace, $row->wl_title ); $timestamps[$row->wl_namespace][$row->wl_title] = - wfTimestampOrNull( TS_MW, $row->wl_notificationtimestamp ); + $this->getLatestNotificationTimestamp( + $row->wl_notificationtimestamp, $user, $target ); } return $timestamps; @@ -802,7 +829,7 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac $timestamp = $dbw->timestamp( $timestamp ); } - $success = $dbw->update( + $dbw->update( 'watchlist', [ 'wl_notificationtimestamp' => $timestamp ], $conds, @@ -811,7 +838,25 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac $this->uncacheUser( $user ); - return $success; + return true; + } + + public function getLatestNotificationTimestamp( $timestamp, User $user, LinkTarget $target ) { + $timestamp = wfTimestampOrNull( TS_MW, $timestamp ); + if ( $timestamp === null ) { + return null; // no notification + } + + $seenTimestamps = $this->getPageSeenTimestamps( $user ); + if ( + $seenTimestamps && + $seenTimestamps->get( $this->getPageSeenKey( $target ) ) >= $timestamp + ) { + // If a reset job did not yet run, then the "seen" timestamp will be higher + return null; + } + + return $timestamp; } public function resetAllNotificationTimestampsForUser( User $user ) { @@ -902,6 +947,8 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac * @return bool */ public function resetNotificationTimestamp( User $user, Title $title, $force = '', $oldid = 0 ) { + $time = time(); + // Only loggedin user can have a watchlist if ( $this->readOnlyMode->isReadOnly() || $user->isAnon() ) { return false; @@ -919,6 +966,20 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac } } + // Mark the item as read immediately in lightweight storage + $this->stash->merge( + $this->getPageSeenTimestampsKey( $user ), + function ( $cache, $key, $current ) use ( $time, $title ) { + $value = $current ?: new MapCacheLRU( 300 ); + $value->set( $this->getPageSeenKey( $title ), wfTimestamp( TS_MW, $time ) ); + + $this->latestUpdateCache->set( $key, $value, IExpiringStore::TTL_PROC_LONG ); + + return $value; + }, + IExpiringStore::TTL_HOUR + ); + // If the page is watched by the user (or may be watched), update the timestamp $job = new ActivityUpdateJob( $title, @@ -926,22 +987,51 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac 'type' => 'updateWatchlistNotification', 'userid' => $user->getId(), 'notifTime' => $this->getNotificationTimestamp( $user, $title, $item, $force, $oldid ), - 'curTime' => time() + 'curTime' => $time ] ); + // Try to enqueue this post-send + $this->queueGroup->lazyPush( $job ); - // Try to run this post-send - // Calls DeferredUpdates::addCallableUpdate in normal operation - call_user_func( - $this->deferredUpdatesAddCallableUpdateCallback, - function () use ( $job ) { - $job->run(); + $this->uncache( $user, $title ); + + return true; + } + + /** + * @param User $user + * @return MapCacheLRU|null + */ + private function getPageSeenTimestamps( User $user ) { + $key = $this->getPageSeenTimestampsKey( $user ); + + return $this->latestUpdateCache->getWithSetCallback( + $key, + IExpiringStore::TTL_PROC_LONG, + function () use ( $key ) { + return $this->stash->get( $key ) ?: null; } ); + } - $this->uncache( $user, $title ); + /** + * @param User $user + * @return string + */ + private function getPageSeenTimestampsKey( User $user ) { + return $this->stash->makeGlobalKey( + 'watchlist-recent-updates', + $this->lbFactory->getLocalDomainID(), + $user->getId() + ); + } - return true; + /** + * @param LinkTarget $target + * @return string + */ + private function getPageSeenKey( LinkTarget $target ) { + return "{$target->getNamespace()}:{$target->getDBkey()}"; } private function getNotificationTimestamp( User $user, Title $title, $item, $force, $oldid ) { @@ -998,25 +1088,22 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac * @return int|bool */ public function countUnreadNotifications( User $user, $unreadLimit = null ) { + $dbr = $this->getConnectionRef( DB_REPLICA ); + $queryOptions = []; if ( $unreadLimit !== null ) { $unreadLimit = (int)$unreadLimit; $queryOptions['LIMIT'] = $unreadLimit; } - $dbr = $this->getConnectionRef( DB_REPLICA ); - $rowCount = $dbr->selectRowCount( - 'watchlist', - '1', - [ - 'wl_user' => $user->getId(), - 'wl_notificationtimestamp IS NOT NULL', - ], - __METHOD__, - $queryOptions - ); + $conds = [ + 'wl_user' => $user->getId(), + 'wl_notificationtimestamp IS NOT NULL' + ]; + + $rowCount = $dbr->selectRowCount( 'watchlist', '1', $conds, __METHOD__, $queryOptions ); - if ( !isset( $unreadLimit ) ) { + if ( $unreadLimit === null ) { return $rowCount; } diff --git a/includes/watcheditem/WatchedItemStoreInterface.php b/includes/watcheditem/WatchedItemStoreInterface.php index 274d3f4812..349d98a231 100644 --- a/includes/watcheditem/WatchedItemStoreInterface.php +++ b/includes/watcheditem/WatchedItemStoreInterface.php @@ -326,4 +326,18 @@ interface WatchedItemStoreInterface { */ public function removeWatchBatchForUser( User $user, array $targets ); + /** + * Convert $timestamp to TS_MW or return null if the page was visited since then by $user + * + * Use this only on single-user methods (having higher read-after-write expectations) + * and not in places involving arbitrary batches of different users + * + * Usage of this method should be limited to WatchedItem* classes + * + * @param string|null $timestamp Value of wl_notificationtimestamp from the DB + * @param User $user + * @param LinkTarget $target + * @return string TS_MW timestamp or null + */ + public function getLatestNotificationTimestamp( $timestamp, User $user, LinkTarget $target ); } diff --git a/tests/phpunit/includes/watcheditem/WatchedItemQueryServiceUnitTest.php b/tests/phpunit/includes/watcheditem/WatchedItemQueryServiceUnitTest.php index 50e6c202f4..17694b0eda 100644 --- a/tests/phpunit/includes/watcheditem/WatchedItemQueryServiceUnitTest.php +++ b/tests/phpunit/includes/watcheditem/WatchedItemQueryServiceUnitTest.php @@ -72,7 +72,8 @@ class WatchedItemQueryServiceUnitTest extends MediaWikiTestCase { return new WatchedItemQueryService( $this->getMockLoadBalancer( $mockDb ), $this->getMockCommentStore(), - $this->getMockActorMigration() + $this->getMockActorMigration(), + $this->getMockWatchedItemStore() ); } @@ -139,6 +140,22 @@ class WatchedItemQueryServiceUnitTest extends MediaWikiTestCase { return $mock; } + /** + * @param PHPUnit_Framework_MockObject_MockObject|Database $mockDb + * @return PHPUnit_Framework_MockObject_MockObject|WatchedItemStore + */ + private function getMockWatchedItemStore() { + $mock = $this->getMockBuilder( WatchedItemStore::class ) + ->disableOriginalConstructor() + ->getMock(); + $mock->expects( $this->any() ) + ->method( 'getLatestNotificationTimestamp' ) + ->will( $this->returnCallback( function ( $timestamp ) { + return $timestamp; + } ) ); + return $mock; + } + /** * @param int $id * @return PHPUnit_Framework_MockObject_MockObject|User diff --git a/tests/phpunit/includes/watcheditem/WatchedItemStoreIntegrationTest.php b/tests/phpunit/includes/watcheditem/WatchedItemStoreIntegrationTest.php index 3102929ec7..6a383a2211 100644 --- a/tests/phpunit/includes/watcheditem/WatchedItemStoreIntegrationTest.php +++ b/tests/phpunit/includes/watcheditem/WatchedItemStoreIntegrationTest.php @@ -167,6 +167,13 @@ class WatchedItemStoreIntegrationTest extends MediaWikiTestCase { [ $title->getNamespace() => [ $title->getDBkey() => null ] ], $store->getNotificationTimestampsBatch( $user, [ $title ] ) ); + + // Run the job queue + JobQueueGroup::destroySingletons(); + $jobs = new RunJobs; + $jobs->loadParamsAndArgs( null, [ 'quiet' => true ], null ); + $jobs->execute(); + $this->assertEquals( $initialVisitingWatchers, $store->countVisitingWatchers( $title, '20150202020202' ) diff --git a/tests/phpunit/includes/watcheditem/WatchedItemStoreUnitTest.php b/tests/phpunit/includes/watcheditem/WatchedItemStoreUnitTest.php index 240b3f520f..280ad907f3 100644 --- a/tests/phpunit/includes/watcheditem/WatchedItemStoreUnitTest.php +++ b/tests/phpunit/includes/watcheditem/WatchedItemStoreUnitTest.php @@ -59,6 +59,26 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { return $mock; } + /** + * @return PHPUnit_Framework_MockObject_MockObject|JobQueueGroup + */ + private function getMockJobQueueGroup() { + $mock = $this->getMockBuilder( JobQueueGroup::class ) + ->disableOriginalConstructor() + ->getMock(); + $mock->expects( $this->any() ) + ->method( 'push' ) + ->will( $this->returnCallback( function ( Job $job ) { + $job->run(); + } ) ); + $mock->expects( $this->any() ) + ->method( 'lazyPush' ) + ->will( $this->returnCallback( function ( Job $job ) { + $job->run(); + } ) ); + return $mock; + } + /** * @return PHPUnit_Framework_MockObject_MockObject|HashBagOStuff */ @@ -118,11 +138,16 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { return $fakeRow; } - private function newWatchedItemStore( LBFactory $lbFactory, HashBagOStuff $cache, + private function newWatchedItemStore( + LBFactory $lbFactory, + JobQueueGroup $queueGroup, + HashBagOStuff $cache, ReadOnlyMode $readOnlyMode ) { return new WatchedItemStore( $lbFactory, + $queueGroup, + new HashBagOStuff(), $cache, $readOnlyMode, 1000 @@ -161,6 +186,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -193,6 +219,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -223,6 +250,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -254,6 +282,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -306,6 +335,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -373,6 +403,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -422,6 +453,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -504,6 +536,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -609,6 +642,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -663,6 +697,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -701,6 +736,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -736,6 +772,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -774,6 +811,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -805,6 +843,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $this->getMockCache(), $this->getMockReadOnlyMode() ); @@ -864,6 +903,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -911,6 +951,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1005,6 +1046,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1038,6 +1080,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1059,6 +1102,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1072,6 +1116,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { public function testAddWatchBatchForUser_readOnlyDBReturnsFalse() { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $this->getMockDb() ), + $this->getMockJobQueueGroup(), $this->getMockCache(), $this->getMockReadOnlyMode( true ) ); @@ -1122,6 +1167,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1147,6 +1193,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1171,6 +1218,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1206,6 +1254,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1241,6 +1290,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1264,6 +1314,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1313,6 +1364,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1364,6 +1416,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1389,6 +1442,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1434,6 +1488,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1469,6 +1524,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1507,6 +1563,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1531,6 +1588,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1572,6 +1630,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1623,6 +1682,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $mockLoadBalancer, + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1637,6 +1697,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { public function testGetWatchedItemsForUser_badSortOptionThrowsException() { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $this->getMockDb() ), + $this->getMockJobQueueGroup(), $this->getMockCache(), $this->getMockReadOnlyMode() ); @@ -1679,6 +1740,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1716,6 +1778,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1740,6 +1803,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1808,6 +1872,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1859,6 +1924,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1921,6 +1987,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1962,6 +2029,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -1989,6 +2057,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -2014,6 +2083,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -2048,6 +2118,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -2092,29 +2163,26 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { ->method( 'delete' ) ->with( '0:SomeDbKey:1' ); + $mockQueueGroup = $this->getMockJobQueueGroup(); + $mockQueueGroup->expects( $this->once() ) + ->method( 'lazyPush' ) + ->willReturnCallback( function ( ActivityUpdateJob $job ) { + // don't run + } ); + $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $mockQueueGroup, $mockCache, $this->getMockReadOnlyMode() ); - // Note: This does not actually assert the job is correct - $callableCallCounter = 0; - $mockCallback = function ( $callable ) use ( &$callableCallCounter ) { - $callableCallCounter++; - $this->assertInternalType( 'callable', $callable ); - }; - $scopedOverride = $store->overrideDeferredUpdatesAddCallableUpdateCallback( $mockCallback ); - $this->assertTrue( $store->resetNotificationTimestamp( $user, $title ) ); - $this->assertEquals( 1, $callableCallCounter ); - - ScopedCallback::consume( $scopedOverride ); } public function testResetNotificationTimestamp_noItemForced() { @@ -2132,19 +2200,19 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { ->method( 'delete' ) ->with( '0:SomeDbKey:1' ); + $mockQueueGroup = $this->getMockJobQueueGroup(); $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $mockQueueGroup, $mockCache, $this->getMockReadOnlyMode() ); - // Note: This does not actually assert the job is correct - $callableCallCounter = 0; - $mockCallback = function ( $callable ) use ( &$callableCallCounter ) { - $callableCallCounter++; - $this->assertInternalType( 'callable', $callable ); - }; - $scopedOverride = $store->overrideDeferredUpdatesAddCallableUpdateCallback( $mockCallback ); + $mockQueueGroup->expects( $this->any() ) + ->method( 'lazyPush' ) + ->will( $this->returnCallback( function ( ActivityUpdateJob $job ) { + // don't run + } ) ); $this->assertTrue( $store->resetNotificationTimestamp( @@ -2153,9 +2221,6 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { 'force' ) ); - $this->assertEquals( 1, $callableCallCounter ); - - ScopedCallback::consume( $scopedOverride ); } /** @@ -2179,20 +2244,11 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { } private function verifyCallbackJob( - $callback, + ActivityUpdateJob $job, LinkTarget $expectedTitle, $expectedUserId, callable $notificationTimestampCondition ) { - $this->assertInternalType( 'callable', $callback ); - - $callbackReflector = new ReflectionFunction( $callback ); - $vars = $callbackReflector->getStaticVariables(); - $this->assertArrayHasKey( 'job', $vars ); - $this->assertInstanceOf( ActivityUpdateJob::class, $vars['job'] ); - - /** @var ActivityUpdateJob $job */ - $job = $vars['job']; $this->assertEquals( $expectedTitle->getDBkey(), $job->getTitle()->getDBkey() ); $this->assertEquals( $expectedTitle->getNamespace(), $job->getTitle()->getNamespace() ); @@ -2225,26 +2281,28 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { ->method( 'delete' ) ->with( '0:SomeTitle:1' ); + $mockQueueGroup = $this->getMockJobQueueGroup(); $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $mockQueueGroup, $mockCache, $this->getMockReadOnlyMode() ); - $callableCallCounter = 0; - $scopedOverride = $store->overrideDeferredUpdatesAddCallableUpdateCallback( - function ( $callable ) use ( &$callableCallCounter, $title, $user ) { - $callableCallCounter++; - $this->verifyCallbackJob( - $callable, - $title, - $user->getId(), - function ( $time ) { - return $time === null; - } - ); - } - ); + $mockQueueGroup->expects( $this->any() ) + ->method( 'lazyPush' ) + ->will( $this->returnCallback( + function ( ActivityUpdateJob $job ) use ( $title, $user ) { + $this->verifyCallbackJob( + $job, + $title, + $user->getId(), + function ( $time ) { + return $time === null; + } + ); + } + ) ); $this->assertTrue( $store->resetNotificationTimestamp( @@ -2254,9 +2312,6 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $oldid ) ); - $this->assertEquals( 1, $callableCallCounter ); - - ScopedCallback::consume( $scopedOverride ); } public function testResetNotificationTimestamp_oldidSpecifiedNotLatestRevisionForced() { @@ -2293,26 +2348,28 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { ->method( 'delete' ) ->with( '0:SomeDbKey:1' ); + $mockQueueGroup = $this->getMockJobQueueGroup(); $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $mockQueueGroup, $mockCache, $this->getMockReadOnlyMode() ); - $addUpdateCallCounter = 0; - $scopedOverrideDeferred = $store->overrideDeferredUpdatesAddCallableUpdateCallback( - function ( $callable ) use ( &$addUpdateCallCounter, $title, $user ) { - $addUpdateCallCounter++; - $this->verifyCallbackJob( - $callable, - $title, - $user->getId(), - function ( $time ) { - return $time !== null && $time > '20151212010101'; - } - ); - } - ); + $mockQueueGroup->expects( $this->any() ) + ->method( 'lazyPush' ) + ->will( $this->returnCallback( + function ( ActivityUpdateJob $job ) use ( $title, $user ) { + $this->verifyCallbackJob( + $job, + $title, + $user->getId(), + function ( $time ) { + return $time !== null && $time > '20151212010101'; + } + ); + } + ) ); $getTimestampCallCounter = 0; $scopedOverrideRevision = $store->overrideRevisionGetTimestampFromIdCallback( @@ -2331,10 +2388,8 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $oldid ) ); - $this->assertEquals( 1, $addUpdateCallCounter ); $this->assertEquals( 1, $getTimestampCallCounter ); - ScopedCallback::consume( $scopedOverrideDeferred ); ScopedCallback::consume( $scopedOverrideRevision ); } @@ -2368,26 +2423,28 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { ->method( 'delete' ) ->with( '0:SomeDbKey:1' ); + $mockQueueGroup = $this->getMockJobQueueGroup(); $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $mockQueueGroup, $mockCache, $this->getMockReadOnlyMode() ); - $callableCallCounter = 0; - $scopedOverride = $store->overrideDeferredUpdatesAddCallableUpdateCallback( - function ( $callable ) use ( &$callableCallCounter, $title, $user ) { - $callableCallCounter++; - $this->verifyCallbackJob( - $callable, - $title, - $user->getId(), - function ( $time ) { - return $time === null; - } - ); - } - ); + $mockQueueGroup->expects( $this->any() ) + ->method( 'lazyPush' ) + ->will( $this->returnCallback( + function ( ActivityUpdateJob $job ) use ( $title, $user ) { + $this->verifyCallbackJob( + $job, + $title, + $user->getId(), + function ( $time ) { + return $time === null; + } + ); + } + ) ); $this->assertTrue( $store->resetNotificationTimestamp( @@ -2397,9 +2454,6 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $oldid ) ); - $this->assertEquals( 1, $callableCallCounter ); - - ScopedCallback::consume( $scopedOverride ); } public function testResetNotificationTimestamp_futureNotificationTimestampForced() { @@ -2436,26 +2490,28 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { ->method( 'delete' ) ->with( '0:SomeDbKey:1' ); + $mockQueueGroup = $this->getMockJobQueueGroup(); $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $mockQueueGroup, $mockCache, $this->getMockReadOnlyMode() ); - $addUpdateCallCounter = 0; - $scopedOverrideDeferred = $store->overrideDeferredUpdatesAddCallableUpdateCallback( - function ( $callable ) use ( &$addUpdateCallCounter, $title, $user ) { - $addUpdateCallCounter++; - $this->verifyCallbackJob( - $callable, - $title, - $user->getId(), - function ( $time ) { - return $time === '30151212010101'; - } - ); - } - ); + $mockQueueGroup->expects( $this->any() ) + ->method( 'lazyPush' ) + ->will( $this->returnCallback( + function ( ActivityUpdateJob $job ) use ( $title, $user ) { + $this->verifyCallbackJob( + $job, + $title, + $user->getId(), + function ( $time ) { + return $time === '30151212010101'; + } + ); + } + ) ); $getTimestampCallCounter = 0; $scopedOverrideRevision = $store->overrideRevisionGetTimestampFromIdCallback( @@ -2474,10 +2530,8 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $oldid ) ); - $this->assertEquals( 1, $addUpdateCallCounter ); $this->assertEquals( 1, $getTimestampCallCounter ); - ScopedCallback::consume( $scopedOverrideDeferred ); ScopedCallback::consume( $scopedOverrideRevision ); } @@ -2515,26 +2569,28 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { ->method( 'delete' ) ->with( '0:SomeDbKey:1' ); + $mockQueueGroup = $this->getMockJobQueueGroup(); $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $mockQueueGroup, $mockCache, $this->getMockReadOnlyMode() ); - $addUpdateCallCounter = 0; - $scopedOverrideDeferred = $store->overrideDeferredUpdatesAddCallableUpdateCallback( - function ( $callable ) use ( &$addUpdateCallCounter, $title, $user ) { - $addUpdateCallCounter++; - $this->verifyCallbackJob( - $callable, - $title, - $user->getId(), - function ( $time ) { - return $time === false; - } - ); - } - ); + $mockQueueGroup->expects( $this->any() ) + ->method( 'lazyPush' ) + ->will( $this->returnCallback( + function ( ActivityUpdateJob $job ) use ( $title, $user ) { + $this->verifyCallbackJob( + $job, + $title, + $user->getId(), + function ( $time ) { + return $time === false; + } + ); + } + ) ); $getTimestampCallCounter = 0; $scopedOverrideRevision = $store->overrideRevisionGetTimestampFromIdCallback( @@ -2553,16 +2609,15 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $oldid ) ); - $this->assertEquals( 1, $addUpdateCallCounter ); $this->assertEquals( 1, $getTimestampCallCounter ); - ScopedCallback::consume( $scopedOverrideDeferred ); ScopedCallback::consume( $scopedOverrideRevision ); } public function testSetNotificationTimestampsForUser_anonUser() { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $this->getMockDb() ), + $this->getMockJobQueueGroup(), $this->getMockCache(), $this->getMockReadOnlyMode() ); @@ -2590,6 +2645,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $this->getMockCache(), $this->getMockReadOnlyMode() ); @@ -2620,6 +2676,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $this->getMockCache(), $this->getMockReadOnlyMode() ); @@ -2659,6 +2716,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $this->getMockCache(), $this->getMockReadOnlyMode() ); @@ -2702,6 +2760,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -2743,6 +2802,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); @@ -2787,6 +2847,7 @@ class WatchedItemStoreUnitTest extends MediaWikiTestCase { $store = $this->newWatchedItemStore( $this->getMockLBFactory( $mockDb ), + $this->getMockJobQueueGroup(), $mockCache, $this->getMockReadOnlyMode() ); -- 2.20.1