3 use Wikimedia\Rdbms\IDatabase
;
4 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface
;
5 use MediaWiki\Linker\LinkTarget
;
6 use MediaWiki\MediaWikiServices
;
7 use Wikimedia\Assert\Assert
;
8 use Wikimedia\ScopedCallback
;
9 use Wikimedia\Rdbms\LoadBalancer
;
12 * Storage layer class for WatchedItems.
13 * Database interaction & caching
14 * TODO caching should be factored out into a CachingWatchedItemStore class
16 * Uses database because this uses User::isAnon
23 class WatchedItemStore
implements WatchedItemStoreInterface
, StatsdAwareInterface
{
28 private $loadBalancer;
33 private $readOnlyMode;
41 * @var array[] Looks like $cacheIndex[Namespace ID][Target DB Key][User Id] => 'key'
42 * The index is needed so that on mass changes all relevant items can be un-cached.
43 * For example: Clearing a user's watchlist of all items or updating notification timestamps
44 * for all users watching a single target.
46 private $cacheIndex = [];
51 private $deferredUpdatesAddCallableUpdateCallback;
56 private $revisionGetTimestampFromIdCallback;
59 * @var StatsdDataFactoryInterface
64 * @param LoadBalancer $loadBalancer
65 * @param HashBagOStuff $cache
66 * @param ReadOnlyMode $readOnlyMode
68 public function __construct(
69 LoadBalancer
$loadBalancer,
71 ReadOnlyMode
$readOnlyMode
73 $this->loadBalancer
= $loadBalancer;
74 $this->cache
= $cache;
75 $this->readOnlyMode
= $readOnlyMode;
76 $this->stats
= new NullStatsdDataFactory();
77 $this->deferredUpdatesAddCallableUpdateCallback
= [ DeferredUpdates
::class, 'addCallableUpdate' ];
78 $this->revisionGetTimestampFromIdCallback
= [ Revision
::class, 'getTimestampFromId' ];
82 * @param StatsdDataFactoryInterface $stats
84 public function setStatsdDataFactory( StatsdDataFactoryInterface
$stats ) {
85 $this->stats
= $stats;
89 * Overrides the DeferredUpdates::addCallableUpdate callback
90 * This is intended for use while testing and will fail if MW_PHPUNIT_TEST is not defined.
92 * @param callable $callback
94 * @see DeferredUpdates::addCallableUpdate for callback signature
96 * @return ScopedCallback to reset the overridden value
99 public function overrideDeferredUpdatesAddCallableUpdateCallback( callable
$callback ) {
100 if ( !defined( 'MW_PHPUNIT_TEST' ) ) {
101 throw new MWException(
102 'Cannot override DeferredUpdates::addCallableUpdate callback in operation.'
105 $previousValue = $this->deferredUpdatesAddCallableUpdateCallback
;
106 $this->deferredUpdatesAddCallableUpdateCallback
= $callback;
107 return new ScopedCallback( function () use ( $previousValue ) {
108 $this->deferredUpdatesAddCallableUpdateCallback
= $previousValue;
113 * Overrides the Revision::getTimestampFromId callback
114 * This is intended for use while testing and will fail if MW_PHPUNIT_TEST is not defined.
116 * @param callable $callback
117 * @see Revision::getTimestampFromId for callback signature
119 * @return ScopedCallback to reset the overridden value
120 * @throws MWException
122 public function overrideRevisionGetTimestampFromIdCallback( callable
$callback ) {
123 if ( !defined( 'MW_PHPUNIT_TEST' ) ) {
124 throw new MWException(
125 'Cannot override Revision::getTimestampFromId callback in operation.'
128 $previousValue = $this->revisionGetTimestampFromIdCallback
;
129 $this->revisionGetTimestampFromIdCallback
= $callback;
130 return new ScopedCallback( function () use ( $previousValue ) {
131 $this->revisionGetTimestampFromIdCallback
= $previousValue;
135 private function getCacheKey( User
$user, LinkTarget
$target ) {
136 return $this->cache
->makeKey(
137 (string)$target->getNamespace(),
139 (string)$user->getId()
143 private function cache( WatchedItem
$item ) {
144 $user = $item->getUser();
145 $target = $item->getLinkTarget();
146 $key = $this->getCacheKey( $user, $target );
147 $this->cache
->set( $key, $item );
148 $this->cacheIndex
[$target->getNamespace()][$target->getDBkey()][$user->getId()] = $key;
149 $this->stats
->increment( 'WatchedItemStore.cache' );
152 private function uncache( User
$user, LinkTarget
$target ) {
153 $this->cache
->delete( $this->getCacheKey( $user, $target ) );
154 unset( $this->cacheIndex
[$target->getNamespace()][$target->getDBkey()][$user->getId()] );
155 $this->stats
->increment( 'WatchedItemStore.uncache' );
158 private function uncacheLinkTarget( LinkTarget
$target ) {
159 $this->stats
->increment( 'WatchedItemStore.uncacheLinkTarget' );
160 if ( !isset( $this->cacheIndex
[$target->getNamespace()][$target->getDBkey()] ) ) {
163 foreach ( $this->cacheIndex
[$target->getNamespace()][$target->getDBkey()] as $key ) {
164 $this->stats
->increment( 'WatchedItemStore.uncacheLinkTarget.items' );
165 $this->cache
->delete( $key );
169 private function uncacheUser( User
$user ) {
170 $this->stats
->increment( 'WatchedItemStore.uncacheUser' );
171 foreach ( $this->cacheIndex
as $ns => $dbKeyArray ) {
172 foreach ( $dbKeyArray as $dbKey => $userArray ) {
173 if ( isset( $userArray[$user->getId()] ) ) {
174 $this->stats
->increment( 'WatchedItemStore.uncacheUser.items' );
175 $this->cache
->delete( $userArray[$user->getId()] );
183 * @param LinkTarget $target
185 * @return WatchedItem|false
187 private function getCached( User
$user, LinkTarget
$target ) {
188 return $this->cache
->get( $this->getCacheKey( $user, $target ) );
192 * Return an array of conditions to select or update the appropriate database
196 * @param LinkTarget $target
200 private function dbCond( User
$user, LinkTarget
$target ) {
202 'wl_user' => $user->getId(),
203 'wl_namespace' => $target->getNamespace(),
204 'wl_title' => $target->getDBkey(),
209 * @param int $dbIndex DB_MASTER or DB_REPLICA
212 * @throws MWException
214 private function getConnectionRef( $dbIndex ) {
215 return $this->loadBalancer
->getConnectionRef( $dbIndex, [ 'watchlist' ] );
219 * Queues a job that will clear the user's watchlist using the Job Queue.
225 public function clearUserWatchedItemsUsingJobQueue( User
$user ) {
226 $job = ClearUserWatchlistJob
::newForUser( $user, $this->getMaxId() );
228 JobQueueGroup
::singleton()->push( $job );
233 * @return int The maximum current wl_id
235 public function getMaxId() {
236 $dbr = $this->getConnectionRef( DB_REPLICA
);
237 return (int)$dbr->selectField(
250 public function countWatchedItems( User
$user ) {
251 $dbr = $this->getConnectionRef( DB_REPLICA
);
252 $return = (int)$dbr->selectField(
256 'wl_user' => $user->getId()
266 * @param LinkTarget $target
269 public function countWatchers( LinkTarget
$target ) {
270 $dbr = $this->getConnectionRef( DB_REPLICA
);
271 $return = (int)$dbr->selectField(
275 'wl_namespace' => $target->getNamespace(),
276 'wl_title' => $target->getDBkey(),
286 * @param LinkTarget $target
287 * @param string|int $threshold
290 public function countVisitingWatchers( LinkTarget
$target, $threshold ) {
291 $dbr = $this->getConnectionRef( DB_REPLICA
);
292 $visitingWatchers = (int)$dbr->selectField(
296 'wl_namespace' => $target->getNamespace(),
297 'wl_title' => $target->getDBkey(),
298 'wl_notificationtimestamp >= ' .
299 $dbr->addQuotes( $dbr->timestamp( $threshold ) ) .
300 ' OR wl_notificationtimestamp IS NULL'
305 return $visitingWatchers;
310 * @param LinkTarget[] $targets
311 * @param array $options
314 public function countWatchersMultiple( array $targets, array $options = [] ) {
315 $dbOptions = [ 'GROUP BY' => [ 'wl_namespace', 'wl_title' ] ];
317 $dbr = $this->getConnectionRef( DB_REPLICA
);
319 if ( array_key_exists( 'minimumWatchers', $options ) ) {
320 $dbOptions['HAVING'] = 'COUNT(*) >= ' . (int)$options['minimumWatchers'];
323 $lb = new LinkBatch( $targets );
326 [ 'wl_title', 'wl_namespace', 'watchers' => 'COUNT(*)' ],
327 [ $lb->constructSet( 'wl', $dbr ) ],
333 foreach ( $targets as $linkTarget ) {
334 $watchCounts[$linkTarget->getNamespace()][$linkTarget->getDBkey()] = 0;
337 foreach ( $res as $row ) {
338 $watchCounts[$row->wl_namespace
][$row->wl_title
] = (int)$row->watchers
;
346 * @param array $targetsWithVisitThresholds
347 * @param int|null $minimumWatchers
350 public function countVisitingWatchersMultiple(
351 array $targetsWithVisitThresholds,
352 $minimumWatchers = null
354 $dbr = $this->getConnectionRef( DB_REPLICA
);
356 $conds = $this->getVisitingWatchersCondition( $dbr, $targetsWithVisitThresholds );
358 $dbOptions = [ 'GROUP BY' => [ 'wl_namespace', 'wl_title' ] ];
359 if ( $minimumWatchers !== null ) {
360 $dbOptions['HAVING'] = 'COUNT(*) >= ' . (int)$minimumWatchers;
364 [ 'wl_namespace', 'wl_title', 'watchers' => 'COUNT(*)' ],
371 foreach ( $targetsWithVisitThresholds as list( $target ) ) {
372 /* @var LinkTarget $target */
373 $watcherCounts[$target->getNamespace()][$target->getDBkey()] = 0;
376 foreach ( $res as $row ) {
377 $watcherCounts[$row->wl_namespace
][$row->wl_title
] = (int)$row->watchers
;
380 return $watcherCounts;
384 * Generates condition for the query used in a batch count visiting watchers.
386 * @param IDatabase $db
387 * @param array $targetsWithVisitThresholds array of pairs (LinkTarget, last visit threshold)
390 private function getVisitingWatchersCondition(
392 array $targetsWithVisitThresholds
394 $missingTargets = [];
395 $namespaceConds = [];
396 foreach ( $targetsWithVisitThresholds as list( $target, $threshold ) ) {
397 if ( $threshold === null ) {
398 $missingTargets[] = $target;
401 /* @var LinkTarget $target */
402 $namespaceConds[$target->getNamespace()][] = $db->makeList( [
403 'wl_title = ' . $db->addQuotes( $target->getDBkey() ),
405 'wl_notificationtimestamp >= ' . $db->addQuotes( $db->timestamp( $threshold ) ),
406 'wl_notificationtimestamp IS NULL'
412 foreach ( $namespaceConds as $namespace => $pageConds ) {
413 $conds[] = $db->makeList( [
414 'wl_namespace = ' . $namespace,
415 '(' . $db->makeList( $pageConds, LIST_OR
) . ')'
419 if ( $missingTargets ) {
420 $lb = new LinkBatch( $missingTargets );
421 $conds[] = $lb->constructSet( 'wl', $db );
424 return $db->makeList( $conds, LIST_OR
);
430 * @param LinkTarget $target
433 public function getWatchedItem( User
$user, LinkTarget
$target ) {
434 if ( $user->isAnon() ) {
438 $cached = $this->getCached( $user, $target );
440 $this->stats
->increment( 'WatchedItemStore.getWatchedItem.cached' );
443 $this->stats
->increment( 'WatchedItemStore.getWatchedItem.load' );
444 return $this->loadWatchedItem( $user, $target );
450 * @param LinkTarget $target
453 public function loadWatchedItem( User
$user, LinkTarget
$target ) {
454 // Only logged-in users can have a watchlist
455 if ( $user->isAnon() ) {
459 $dbr = $this->getConnectionRef( DB_REPLICA
);
460 $row = $dbr->selectRow(
462 'wl_notificationtimestamp',
463 $this->dbCond( $user, $target ),
471 $item = new WatchedItem(
474 wfTimestampOrNull( TS_MW
, $row->wl_notificationtimestamp
)
476 $this->cache( $item );
484 * @param array $options
485 * @return WatchedItem[]
487 public function getWatchedItemsForUser( User
$user, array $options = [] ) {
488 $options +
= [ 'forWrite' => false ];
491 if ( array_key_exists( 'sort', $options ) ) {
493 ( in_array( $options['sort'], [ self
::SORT_ASC
, self
::SORT_DESC
] ) ),
494 '$options[\'sort\']',
495 'must be SORT_ASC or SORT_DESC'
497 $dbOptions['ORDER BY'] = [
498 "wl_namespace {$options['sort']}",
499 "wl_title {$options['sort']}"
502 $db = $this->getConnectionRef( $options['forWrite'] ? DB_MASTER
: DB_REPLICA
);
506 [ 'wl_namespace', 'wl_title', 'wl_notificationtimestamp' ],
507 [ 'wl_user' => $user->getId() ],
513 foreach ( $res as $row ) {
514 // @todo: Should we add these to the process cache?
515 $watchedItems[] = new WatchedItem(
517 new TitleValue( (int)$row->wl_namespace
, $row->wl_title
),
518 $row->wl_notificationtimestamp
522 return $watchedItems;
528 * @param LinkTarget $target
531 public function isWatched( User
$user, LinkTarget
$target ) {
532 return (bool)$this->getWatchedItem( $user, $target );
538 * @param LinkTarget[] $targets
541 public function getNotificationTimestampsBatch( User
$user, array $targets ) {
543 foreach ( $targets as $target ) {
544 $timestamps[$target->getNamespace()][$target->getDBkey()] = false;
547 if ( $user->isAnon() ) {
552 foreach ( $targets as $target ) {
553 $cachedItem = $this->getCached( $user, $target );
555 $timestamps[$target->getNamespace()][$target->getDBkey()] =
556 $cachedItem->getNotificationTimestamp();
558 $targetsToLoad[] = $target;
562 if ( !$targetsToLoad ) {
566 $dbr = $this->getConnectionRef( DB_REPLICA
);
568 $lb = new LinkBatch( $targetsToLoad );
571 [ 'wl_namespace', 'wl_title', 'wl_notificationtimestamp' ],
573 $lb->constructSet( 'wl', $dbr ),
574 'wl_user' => $user->getId(),
579 foreach ( $res as $row ) {
580 $timestamps[$row->wl_namespace
][$row->wl_title
] =
581 wfTimestampOrNull( TS_MW
, $row->wl_notificationtimestamp
);
590 * @param LinkTarget $target
592 public function addWatch( User
$user, LinkTarget
$target ) {
593 $this->addWatchBatchForUser( $user, [ $target ] );
599 * @param LinkTarget[] $targets
602 public function addWatchBatchForUser( User
$user, array $targets ) {
603 if ( $this->readOnlyMode
->isReadOnly() ) {
606 // Only logged-in users can have a watchlist
607 if ( $user->isAnon() ) {
617 foreach ( $targets as $target ) {
619 'wl_user' => $user->getId(),
620 'wl_namespace' => $target->getNamespace(),
621 'wl_title' => $target->getDBkey(),
622 'wl_notificationtimestamp' => null,
624 $items[] = new WatchedItem(
629 $this->uncache( $user, $target );
632 $dbw = $this->getConnectionRef( DB_MASTER
);
633 foreach ( array_chunk( $rows, 100 ) as $toInsert ) {
634 // Use INSERT IGNORE to avoid overwriting the notification timestamp
635 // if there's already an entry for this page
636 $dbw->insert( 'watchlist', $toInsert, __METHOD__
, 'IGNORE' );
638 // Update process cache to ensure skin doesn't claim that the current
639 // page is unwatched in the response of action=watch itself (T28292).
640 // This would otherwise be re-queried from a replica by isWatched().
641 foreach ( $items as $item ) {
642 $this->cache( $item );
651 * @param LinkTarget $target
654 public function removeWatch( User
$user, LinkTarget
$target ) {
655 // Only logged in user can have a watchlist
656 if ( $this->readOnlyMode
->isReadOnly() ||
$user->isAnon() ) {
660 $this->uncache( $user, $target );
662 $dbw = $this->getConnectionRef( DB_MASTER
);
663 $dbw->delete( 'watchlist',
665 'wl_user' => $user->getId(),
666 'wl_namespace' => $target->getNamespace(),
667 'wl_title' => $target->getDBkey(),
670 $success = (bool)$dbw->affectedRows();
678 * @param string|int $timestamp
679 * @param LinkTarget[] $targets
682 public function setNotificationTimestampsForUser( User
$user, $timestamp, array $targets = [] ) {
683 // Only logged-in users can have a watchlist
684 if ( $user->isAnon() ) {
688 $dbw = $this->getConnectionRef( DB_MASTER
);
690 $conds = [ 'wl_user' => $user->getId() ];
692 $batch = new LinkBatch( $targets );
693 $conds[] = $batch->constructSet( 'wl', $dbw );
696 if ( $timestamp !== null ) {
697 $timestamp = $dbw->timestamp( $timestamp );
700 $success = $dbw->update(
702 [ 'wl_notificationtimestamp' => $timestamp ],
707 $this->uncacheUser( $user );
714 * @param User $editor
715 * @param LinkTarget $target
716 * @param string|int $timestamp
719 public function updateNotificationTimestamp( User
$editor, LinkTarget
$target, $timestamp ) {
720 $dbw = $this->getConnectionRef( DB_MASTER
);
721 $uids = $dbw->selectFieldValues(
725 'wl_user != ' . intval( $editor->getId() ),
726 'wl_namespace' => $target->getNamespace(),
727 'wl_title' => $target->getDBkey(),
728 'wl_notificationtimestamp IS NULL',
733 $watchers = array_map( 'intval', $uids );
735 // Update wl_notificationtimestamp for all watching users except the editor
737 DeferredUpdates
::addCallableUpdate(
738 function () use ( $timestamp, $watchers, $target, $fname ) {
739 global $wgUpdateRowsPerQuery;
741 $dbw = $this->getConnectionRef( DB_MASTER
);
742 $factory = MediaWikiServices
::getInstance()->getDBLoadBalancerFactory();
743 $ticket = $factory->getEmptyTransactionTicket( __METHOD__
);
745 $watchersChunks = array_chunk( $watchers, $wgUpdateRowsPerQuery );
746 foreach ( $watchersChunks as $watchersChunk ) {
747 $dbw->update( 'watchlist',
749 'wl_notificationtimestamp' => $dbw->timestamp( $timestamp )
750 ], [ /* WHERE - TODO Use wl_id T130067 */
751 'wl_user' => $watchersChunk,
752 'wl_namespace' => $target->getNamespace(),
753 'wl_title' => $target->getDBkey(),
756 if ( count( $watchersChunks ) > 1 ) {
757 $factory->commitAndWaitForReplication(
758 __METHOD__
, $ticket, [ 'domain' => $dbw->getDomainID() ]
762 $this->uncacheLinkTarget( $target );
764 DeferredUpdates
::POSTSEND
,
775 * @param Title $title
776 * @param string $force
780 public function resetNotificationTimestamp( User
$user, Title
$title, $force = '', $oldid = 0 ) {
781 // Only logged-in users can have a watchlist
782 if ( $this->readOnlyMode
->isReadOnly() ||
$user->isAnon() ) {
787 if ( $force != 'force' ) {
788 $item = $this->loadWatchedItem( $user, $title );
789 if ( !$item ||
$item->getNotificationTimestamp() === null ) {
794 // If the page is watched by the user (or may be watched), update the timestamp
795 $job = new ActivityUpdateJob(
798 'type' => 'updateWatchlistNotification',
799 'userid' => $user->getId(),
800 'notifTime' => $this->getNotificationTimestamp( $user, $title, $item, $force, $oldid ),
805 // Try to run this post-send
806 // Calls DeferredUpdates::addCallableUpdate in normal operation
808 $this->deferredUpdatesAddCallableUpdateCallback
,
809 function () use ( $job ) {
814 $this->uncache( $user, $title );
819 private function getNotificationTimestamp( User
$user, Title
$title, $item, $force, $oldid ) {
821 // No oldid given, assuming latest revision; clear the timestamp.
825 if ( !$title->getNextRevisionID( $oldid ) ) {
826 // Oldid given and is the latest revision for this title; clear the timestamp.
830 if ( $item === null ) {
831 $item = $this->loadWatchedItem( $user, $title );
835 // This can only happen if $force is enabled.
839 // Oldid given and isn't the latest; update the timestamp.
840 // This will result in no further notification emails being sent!
841 // Calls Revision::getTimestampFromId in normal operation
842 $notificationTimestamp = call_user_func(
843 $this->revisionGetTimestampFromIdCallback
,
848 // We need to go one second to the future because of various strict comparisons
849 // throughout the codebase
850 $ts = new MWTimestamp( $notificationTimestamp );
851 $ts->timestamp
->add( new DateInterval( 'PT1S' ) );
852 $notificationTimestamp = $ts->getTimestamp( TS_MW
);
854 if ( $notificationTimestamp < $item->getNotificationTimestamp() ) {
855 if ( $force != 'force' ) {
858 // This is a little silly…
859 return $item->getNotificationTimestamp();
863 return $notificationTimestamp;
869 * @param int|null $unreadLimit
872 public function countUnreadNotifications( User
$user, $unreadLimit = null ) {
874 if ( $unreadLimit !== null ) {
875 $unreadLimit = (int)$unreadLimit;
876 $queryOptions['LIMIT'] = $unreadLimit;
879 $dbr = $this->getConnectionRef( DB_REPLICA
);
880 $rowCount = $dbr->selectRowCount(
884 'wl_user' => $user->getId(),
885 'wl_notificationtimestamp IS NOT NULL',
891 if ( !isset( $unreadLimit ) ) {
895 if ( $rowCount >= $unreadLimit ) {
904 * @param LinkTarget $oldTarget
905 * @param LinkTarget $newTarget
907 public function duplicateAllAssociatedEntries( LinkTarget
$oldTarget, LinkTarget
$newTarget ) {
908 $oldTarget = Title
::newFromLinkTarget( $oldTarget );
909 $newTarget = Title
::newFromLinkTarget( $newTarget );
911 $this->duplicateEntry( $oldTarget->getSubjectPage(), $newTarget->getSubjectPage() );
912 $this->duplicateEntry( $oldTarget->getTalkPage(), $newTarget->getTalkPage() );
917 * @param LinkTarget $oldTarget
918 * @param LinkTarget $newTarget
920 public function duplicateEntry( LinkTarget
$oldTarget, LinkTarget
$newTarget ) {
921 $dbw = $this->getConnectionRef( DB_MASTER
);
923 $result = $dbw->select(
925 [ 'wl_user', 'wl_notificationtimestamp' ],
927 'wl_namespace' => $oldTarget->getNamespace(),
928 'wl_title' => $oldTarget->getDBkey(),
934 $newNamespace = $newTarget->getNamespace();
935 $newDBkey = $newTarget->getDBkey();
937 # Construct array to replace into the watchlist
939 foreach ( $result as $row ) {
941 'wl_user' => $row->wl_user
,
942 'wl_namespace' => $newNamespace,
943 'wl_title' => $newDBkey,
944 'wl_notificationtimestamp' => $row->wl_notificationtimestamp
,
948 if ( !empty( $values ) ) {
950 # Note that multi-row replace is very efficient for MySQL but may be inefficient for
951 # some other DBMSes, mostly due to poor simulation by us
954 [ [ 'wl_user', 'wl_namespace', 'wl_title' ] ],