3 use Wikimedia\Rdbms\IDatabase
;
4 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface
;
5 use MediaWiki\Linker\LinkTarget
;
6 use MediaWiki\MediaWikiServices
;
7 use Wikimedia\Assert\Assert
;
8 use Wikimedia\ScopedCallback
;
9 use Wikimedia\Rdbms\LoadBalancer
;
12 * Storage layer class for WatchedItems.
13 * Database interaction & caching
14 * TODO caching should be factored out into a CachingWatchedItemStore class
16 * Uses database because this uses User::isAnon
23 class WatchedItemStore
implements WatchedItemStoreInterface
, StatsdAwareInterface
{
28 private $loadBalancer;
33 private $readOnlyMode;
41 * @var array[] Looks like $cacheIndex[Namespace ID][Target DB Key][User Id] => 'key'
42 * The index is needed so that on mass changes all relevant items can be un-cached.
43 * For example: Clearing a user's watchlist of all items or updating notification timestamps
44 * for all users watching a single target.
46 private $cacheIndex = [];
51 private $deferredUpdatesAddCallableUpdateCallback;
56 private $revisionGetTimestampFromIdCallback;
59 * @var StatsdDataFactoryInterface
64 * @param LoadBalancer $loadBalancer
65 * @param HashBagOStuff $cache
66 * @param ReadOnlyMode $readOnlyMode
68 public function __construct(
69 LoadBalancer
$loadBalancer,
71 ReadOnlyMode
$readOnlyMode
73 $this->loadBalancer
= $loadBalancer;
74 $this->cache
= $cache;
75 $this->readOnlyMode
= $readOnlyMode;
76 $this->stats
= new NullStatsdDataFactory();
77 $this->deferredUpdatesAddCallableUpdateCallback
= [ 'DeferredUpdates', 'addCallableUpdate' ];
78 $this->revisionGetTimestampFromIdCallback
= [ 'Revision', 'getTimestampFromId' ];
/**
 * Swaps the stats collector that this store sends its counters to.
 *
 * @param StatsdDataFactoryInterface $stats
 */
public function setStatsdDataFactory( StatsdDataFactoryInterface $stats ) {
	$this->stats = $stats;
}
86 * Overrides the DeferredUpdates::addCallableUpdate callback
87 * This is intended for use while testing and will fail if MW_PHPUNIT_TEST is not defined.
89 * @param callable $callback
91 * @see DeferredUpdates::addCallableUpdate for callback signature
93 * @return ScopedCallback to reset the overridden value
96 public function overrideDeferredUpdatesAddCallableUpdateCallback( callable
$callback ) {
97 if ( !defined( 'MW_PHPUNIT_TEST' ) ) {
98 throw new MWException(
99 'Cannot override DeferredUpdates::addCallableUpdate callback in operation.'
102 $previousValue = $this->deferredUpdatesAddCallableUpdateCallback
;
103 $this->deferredUpdatesAddCallableUpdateCallback
= $callback;
104 return new ScopedCallback( function () use ( $previousValue ) {
105 $this->deferredUpdatesAddCallableUpdateCallback
= $previousValue;
110 * Overrides the Revision::getTimestampFromId callback
111 * This is intended for use while testing and will fail if MW_PHPUNIT_TEST is not defined.
113 * @param callable $callback
114 * @see Revision::getTimestampFromId for callback signature
116 * @return ScopedCallback to reset the overridden value
117 * @throws MWException
119 public function overrideRevisionGetTimestampFromIdCallback( callable
$callback ) {
120 if ( !defined( 'MW_PHPUNIT_TEST' ) ) {
121 throw new MWException(
122 'Cannot override Revision::getTimestampFromId callback in operation.'
125 $previousValue = $this->revisionGetTimestampFromIdCallback
;
126 $this->revisionGetTimestampFromIdCallback
= $callback;
127 return new ScopedCallback( function () use ( $previousValue ) {
128 $this->revisionGetTimestampFromIdCallback
= $previousValue;
132 private function getCacheKey( User
$user, LinkTarget
$target ) {
133 return $this->cache
->makeKey(
134 (string)$target->getNamespace(),
136 (string)$user->getId()
/**
 * Stores a WatchedItem in the process cache and records its key in
 * $cacheIndex so that it can be found again for bulk invalidation.
 *
 * @param WatchedItem $item
 */
private function cache( WatchedItem $item ) {
	$owner = $item->getUser();
	$linkTarget = $item->getLinkTarget();
	$cacheKey = $this->getCacheKey( $owner, $linkTarget );

	$this->cache->set( $cacheKey, $item );

	// Index layout: [namespace][DB key][user id] => cache key
	$ns = $linkTarget->getNamespace();
	$dbKey = $linkTarget->getDBkey();
	$userId = $owner->getId();
	$this->cacheIndex[$ns][$dbKey][$userId] = $cacheKey;

	$this->stats->increment( 'WatchedItemStore.cache' );
}
/**
 * Removes the cached item for one user/target pair, together with its
 * entry in $cacheIndex.
 *
 * @param User $user
 * @param LinkTarget $target
 */
private function uncache( User $user, LinkTarget $target ) {
	$cacheKey = $this->getCacheKey( $user, $target );
	$this->cache->delete( $cacheKey );

	// Index layout: [namespace][DB key][user id] => cache key
	$ns = $target->getNamespace();
	$dbKey = $target->getDBkey();
	$userId = $user->getId();
	unset( $this->cacheIndex[$ns][$dbKey][$userId] );

	$this->stats->increment( 'WatchedItemStore.uncache' );
}
155 private function uncacheLinkTarget( LinkTarget
$target ) {
156 $this->stats
->increment( 'WatchedItemStore.uncacheLinkTarget' );
157 if ( !isset( $this->cacheIndex
[$target->getNamespace()][$target->getDBkey()] ) ) {
160 foreach ( $this->cacheIndex
[$target->getNamespace()][$target->getDBkey()] as $key ) {
161 $this->stats
->increment( 'WatchedItemStore.uncacheLinkTarget.items' );
162 $this->cache
->delete( $key );
/**
 * Deletes from the process cache every item that $cacheIndex records for
 * the given user, across all namespaces and titles.
 *
 * Only the cached values are deleted; the matching $cacheIndex entries are
 * left in place.
 *
 * @param User $user
 */
private function uncacheUser( User $user ) {
	$this->stats->increment( 'WatchedItemStore.uncacheUser' );

	foreach ( $this->cacheIndex as $keysByDbKey ) {
		foreach ( $keysByDbKey as $keysByUserId ) {
			$userId = $user->getId();
			if ( !isset( $keysByUserId[$userId] ) ) {
				// This title has no cached item for the user.
				continue;
			}

			$this->stats->increment( 'WatchedItemStore.uncacheUser.items' );
			$this->cache->delete( $keysByUserId[$userId] );
		}
	}
}
/**
 * Fetches the process-cached item for a user/target pair.
 *
 * @param User $user
 * @param LinkTarget $target
 *
 * @return WatchedItem|false Whatever the cache holds under the pair's key
 */
private function getCached( User $user, LinkTarget $target ) {
	$cacheKey = $this->getCacheKey( $user, $target );

	return $this->cache->get( $cacheKey );
}
189 * Return an array of conditions to select or update the appropriate database
193 * @param LinkTarget $target
197 private function dbCond( User
$user, LinkTarget
$target ) {
199 'wl_user' => $user->getId(),
200 'wl_namespace' => $target->getNamespace(),
201 'wl_title' => $target->getDBkey(),
/**
 * Asks the load balancer for a connection reference in the 'watchlist'
 * query group.
 *
 * @param int $dbIndex DB_MASTER or DB_REPLICA
 *
 * @return IDatabase Presumed from this file's IDatabase import; confirm
 *  against LoadBalancer::getConnectionRef()
 * @throws MWException
 */
private function getConnectionRef( $dbIndex ) {
	$queryGroups = [ 'watchlist' ];

	return $this->loadBalancer->getConnectionRef( $dbIndex, $queryGroups );
}
216 * Queues a job that will clear the user's watchlist using the Job Queue.
222 public function clearUserWatchedItemsUsingJobQueue( User
$user ) {
223 $job = ClearUserWatchlistJob
::newForUser( $user, $this->getMaxId() );
225 JobQueueGroup
::singleton()->push( $job );
230 * @return int The maximum current wl_id
232 public function getMaxId() {
233 $dbr = $this->getConnectionRef( DB_REPLICA
);
234 return (int)$dbr->selectField(
245 public function countWatchedItems( User
$user ) {
246 $dbr = $this->getConnectionRef( DB_REPLICA
);
247 $return = (int)$dbr->selectField(
251 'wl_user' => $user->getId()
262 public function countWatchers( LinkTarget
$target ) {
263 $dbr = $this->getConnectionRef( DB_REPLICA
);
264 $return = (int)$dbr->selectField(
268 'wl_namespace' => $target->getNamespace(),
269 'wl_title' => $target->getDBkey(),
280 public function countVisitingWatchers( LinkTarget
$target, $threshold ) {
281 $dbr = $this->getConnectionRef( DB_REPLICA
);
282 $visitingWatchers = (int)$dbr->selectField(
286 'wl_namespace' => $target->getNamespace(),
287 'wl_title' => $target->getDBkey(),
288 'wl_notificationtimestamp >= ' .
289 $dbr->addQuotes( $dbr->timestamp( $threshold ) ) .
290 ' OR wl_notificationtimestamp IS NULL'
295 return $visitingWatchers;
301 public function countWatchersMultiple( array $targets, array $options = [] ) {
302 $dbOptions = [ 'GROUP BY' => [ 'wl_namespace', 'wl_title' ] ];
304 $dbr = $this->getConnectionRef( DB_REPLICA
);
306 if ( array_key_exists( 'minimumWatchers', $options ) ) {
307 $dbOptions['HAVING'] = 'COUNT(*) >= ' . (int)$options['minimumWatchers'];
310 $lb = new LinkBatch( $targets );
313 [ 'wl_title', 'wl_namespace', 'watchers' => 'COUNT(*)' ],
314 [ $lb->constructSet( 'wl', $dbr ) ],
320 foreach ( $targets as $linkTarget ) {
321 $watchCounts[$linkTarget->getNamespace()][$linkTarget->getDBkey()] = 0;
324 foreach ( $res as $row ) {
325 $watchCounts[$row->wl_namespace
][$row->wl_title
] = (int)$row->watchers
;
334 public function countVisitingWatchersMultiple(
335 array $targetsWithVisitThresholds,
336 $minimumWatchers = null
338 $dbr = $this->getConnectionRef( DB_REPLICA
);
340 $conds = $this->getVisitingWatchersCondition( $dbr, $targetsWithVisitThresholds );
342 $dbOptions = [ 'GROUP BY' => [ 'wl_namespace', 'wl_title' ] ];
343 if ( $minimumWatchers !== null ) {
344 $dbOptions['HAVING'] = 'COUNT(*) >= ' . (int)$minimumWatchers;
348 [ 'wl_namespace', 'wl_title', 'watchers' => 'COUNT(*)' ],
355 foreach ( $targetsWithVisitThresholds as list( $target ) ) {
356 /* @var LinkTarget $target */
357 $watcherCounts[$target->getNamespace()][$target->getDBkey()] = 0;
360 foreach ( $res as $row ) {
361 $watcherCounts[$row->wl_namespace
][$row->wl_title
] = (int)$row->watchers
;
364 return $watcherCounts;
368 * Generates condition for the query used in a batch count visiting watchers.
370 * @param IDatabase $db
371 * @param array $targetsWithVisitThresholds array of pairs (LinkTarget, last visit threshold)
374 private function getVisitingWatchersCondition(
376 array $targetsWithVisitThresholds
378 $missingTargets = [];
379 $namespaceConds = [];
380 foreach ( $targetsWithVisitThresholds as list( $target, $threshold ) ) {
381 if ( $threshold === null ) {
382 $missingTargets[] = $target;
385 /* @var LinkTarget $target */
386 $namespaceConds[$target->getNamespace()][] = $db->makeList( [
387 'wl_title = ' . $db->addQuotes( $target->getDBkey() ),
389 'wl_notificationtimestamp >= ' . $db->addQuotes( $db->timestamp( $threshold ) ),
390 'wl_notificationtimestamp IS NULL'
396 foreach ( $namespaceConds as $namespace => $pageConds ) {
397 $conds[] = $db->makeList( [
398 'wl_namespace = ' . $namespace,
399 '(' . $db->makeList( $pageConds, LIST_OR
) . ')'
403 if ( $missingTargets ) {
404 $lb = new LinkBatch( $missingTargets );
405 $conds[] = $lb->constructSet( 'wl', $db );
408 return $db->makeList( $conds, LIST_OR
);
414 public function getWatchedItem( User
$user, LinkTarget
$target ) {
415 if ( $user->isAnon() ) {
419 $cached = $this->getCached( $user, $target );
421 $this->stats
->increment( 'WatchedItemStore.getWatchedItem.cached' );
424 $this->stats
->increment( 'WatchedItemStore.getWatchedItem.load' );
425 return $this->loadWatchedItem( $user, $target );
431 public function loadWatchedItem( User
$user, LinkTarget
$target ) {
432 // Only logged-in users can have a watchlist
433 if ( $user->isAnon() ) {
437 $dbr = $this->getConnectionRef( DB_REPLICA
);
438 $row = $dbr->selectRow(
440 'wl_notificationtimestamp',
441 $this->dbCond( $user, $target ),
449 $item = new WatchedItem(
452 wfTimestampOrNull( TS_MW
, $row->wl_notificationtimestamp
)
454 $this->cache( $item );
462 public function getWatchedItemsForUser( User
$user, array $options = [] ) {
463 $options +
= [ 'forWrite' => false ];
466 if ( array_key_exists( 'sort', $options ) ) {
468 ( in_array( $options['sort'], [ self
::SORT_ASC
, self
::SORT_DESC
] ) ),
469 '$options[\'sort\']',
470 'must be SORT_ASC or SORT_DESC'
472 $dbOptions['ORDER BY'] = [
473 "wl_namespace {$options['sort']}",
474 "wl_title {$options['sort']}"
477 $db = $this->getConnectionRef( $options['forWrite'] ? DB_MASTER
: DB_REPLICA
);
481 [ 'wl_namespace', 'wl_title', 'wl_notificationtimestamp' ],
482 [ 'wl_user' => $user->getId() ],
488 foreach ( $res as $row ) {
489 // @todo: Should we add these to the process cache?
490 $watchedItems[] = new WatchedItem(
492 new TitleValue( (int)$row->wl_namespace
, $row->wl_title
),
493 $row->wl_notificationtimestamp
497 return $watchedItems;
/**
 * Reports whether the user has a watchlist entry for the target.
 *
 * @param User $user
 * @param LinkTarget $target
 *
 * @return bool Boolean cast of the result of getWatchedItem()
 */
public function isWatched( User $user, LinkTarget $target ) {
	$watchedItem = $this->getWatchedItem( $user, $target );

	return (bool)$watchedItem;
}
510 public function getNotificationTimestampsBatch( User
$user, array $targets ) {
512 foreach ( $targets as $target ) {
513 $timestamps[$target->getNamespace()][$target->getDBkey()] = false;
516 if ( $user->isAnon() ) {
521 foreach ( $targets as $target ) {
522 $cachedItem = $this->getCached( $user, $target );
524 $timestamps[$target->getNamespace()][$target->getDBkey()] =
525 $cachedItem->getNotificationTimestamp();
527 $targetsToLoad[] = $target;
531 if ( !$targetsToLoad ) {
535 $dbr = $this->getConnectionRef( DB_REPLICA
);
537 $lb = new LinkBatch( $targetsToLoad );
540 [ 'wl_namespace', 'wl_title', 'wl_notificationtimestamp' ],
542 $lb->constructSet( 'wl', $dbr ),
543 'wl_user' => $user->getId(),
548 foreach ( $res as $row ) {
549 $timestamps[$row->wl_namespace
][$row->wl_title
] =
550 wfTimestampOrNull( TS_MW
, $row->wl_notificationtimestamp
);
/**
 * Watches a single target for the user by handing a one-element batch to
 * addWatchBatchForUser().
 *
 * @param User $user
 * @param LinkTarget $target
 */
public function addWatch( User $user, LinkTarget $target ) {
	$singleTargetBatch = [ $target ];
	$this->addWatchBatchForUser( $user, $singleTargetBatch );
}
566 public function addWatchBatchForUser( User
$user, array $targets ) {
567 if ( $this->readOnlyMode
->isReadOnly() ) {
570 // Only logged-in users can have a watchlist
571 if ( $user->isAnon() ) {
581 foreach ( $targets as $target ) {
583 'wl_user' => $user->getId(),
584 'wl_namespace' => $target->getNamespace(),
585 'wl_title' => $target->getDBkey(),
586 'wl_notificationtimestamp' => null,
588 $items[] = new WatchedItem(
593 $this->uncache( $user, $target );
596 $dbw = $this->getConnectionRef( DB_MASTER
);
597 foreach ( array_chunk( $rows, 100 ) as $toInsert ) {
598 // Use INSERT IGNORE to avoid overwriting the notification timestamp
599 // if there's already an entry for this page
600 $dbw->insert( 'watchlist', $toInsert, __METHOD__
, 'IGNORE' );
602 // Update process cache to ensure skin doesn't claim that the current
603 // page is unwatched in the response of action=watch itself (T28292).
604 // This would otherwise be re-queried from a replica by isWatched().
605 foreach ( $items as $item ) {
606 $this->cache( $item );
615 public function removeWatch( User
$user, LinkTarget
$target ) {
616 // Only logged in user can have a watchlist
617 if ( $this->readOnlyMode
->isReadOnly() ||
$user->isAnon() ) {
621 $this->uncache( $user, $target );
623 $dbw = $this->getConnectionRef( DB_MASTER
);
624 $dbw->delete( 'watchlist',
626 'wl_user' => $user->getId(),
627 'wl_namespace' => $target->getNamespace(),
628 'wl_title' => $target->getDBkey(),
631 $success = (bool)$dbw->affectedRows();
639 public function setNotificationTimestampsForUser( User
$user, $timestamp, array $targets = [] ) {
640 // Only logged-in users can have a watchlist
641 if ( $user->isAnon() ) {
645 $dbw = $this->getConnectionRef( DB_MASTER
);
647 $conds = [ 'wl_user' => $user->getId() ];
649 $batch = new LinkBatch( $targets );
650 $conds[] = $batch->constructSet( 'wl', $dbw );
653 if ( $timestamp !== null ) {
654 $timestamp = $dbw->timestamp( $timestamp );
657 $success = $dbw->update(
659 [ 'wl_notificationtimestamp' => $timestamp ],
664 $this->uncacheUser( $user );
672 public function updateNotificationTimestamp( User
$editor, LinkTarget
$target, $timestamp ) {
673 $dbw = $this->getConnectionRef( DB_MASTER
);
674 $uids = $dbw->selectFieldValues(
678 'wl_user != ' . intval( $editor->getId() ),
679 'wl_namespace' => $target->getNamespace(),
680 'wl_title' => $target->getDBkey(),
681 'wl_notificationtimestamp IS NULL',
686 $watchers = array_map( 'intval', $uids );
688 // Update wl_notificationtimestamp for all watching users except the editor
690 DeferredUpdates
::addCallableUpdate(
691 function () use ( $timestamp, $watchers, $target, $fname ) {
692 global $wgUpdateRowsPerQuery;
694 $dbw = $this->getConnectionRef( DB_MASTER
);
695 $factory = MediaWikiServices
::getInstance()->getDBLoadBalancerFactory();
696 $ticket = $factory->getEmptyTransactionTicket( __METHOD__
);
698 $watchersChunks = array_chunk( $watchers, $wgUpdateRowsPerQuery );
699 foreach ( $watchersChunks as $watchersChunk ) {
700 $dbw->update( 'watchlist',
702 'wl_notificationtimestamp' => $dbw->timestamp( $timestamp )
703 ], [ /* WHERE - TODO Use wl_id T130067 */
704 'wl_user' => $watchersChunk,
705 'wl_namespace' => $target->getNamespace(),
706 'wl_title' => $target->getDBkey(),
709 if ( count( $watchersChunks ) > 1 ) {
710 $factory->commitAndWaitForReplication(
711 __METHOD__
, $ticket, [ 'domain' => $dbw->getDomainID() ]
715 $this->uncacheLinkTarget( $target );
717 DeferredUpdates
::POSTSEND
,
728 public function resetNotificationTimestamp( User
$user, Title
$title, $force = '', $oldid = 0 ) {
729 // Only logged-in users can have a watchlist
730 if ( $this->readOnlyMode
->isReadOnly() ||
$user->isAnon() ) {
735 if ( $force != 'force' ) {
736 $item = $this->loadWatchedItem( $user, $title );
737 if ( !$item ||
$item->getNotificationTimestamp() === null ) {
742 // If the page is watched by the user (or may be watched), update the timestamp
743 $job = new ActivityUpdateJob(
746 'type' => 'updateWatchlistNotification',
747 'userid' => $user->getId(),
748 'notifTime' => $this->getNotificationTimestamp( $user, $title, $item, $force, $oldid ),
753 // Try to run this post-send
754 // Calls DeferredUpdates::addCallableUpdate in normal operation
756 $this->deferredUpdatesAddCallableUpdateCallback
,
757 function () use ( $job ) {
762 $this->uncache( $user, $title );
767 private function getNotificationTimestamp( User
$user, Title
$title, $item, $force, $oldid ) {
769 // No oldid given, assuming latest revision; clear the timestamp.
773 if ( !$title->getNextRevisionID( $oldid ) ) {
774 // Oldid given and is the latest revision for this title; clear the timestamp.
778 if ( $item === null ) {
779 $item = $this->loadWatchedItem( $user, $title );
783 // This can only happen if $force is enabled.
787 // Oldid given and isn't the latest; update the timestamp.
788 // This will result in no further notification emails being sent!
789 // Calls Revision::getTimestampFromId in normal operation
790 $notificationTimestamp = call_user_func(
791 $this->revisionGetTimestampFromIdCallback
,
796 // We need to go one second to the future because of various strict comparisons
797 // throughout the codebase
798 $ts = new MWTimestamp( $notificationTimestamp );
799 $ts->timestamp
->add( new DateInterval( 'PT1S' ) );
800 $notificationTimestamp = $ts->getTimestamp( TS_MW
);
802 if ( $notificationTimestamp < $item->getNotificationTimestamp() ) {
803 if ( $force != 'force' ) {
806 // This is a little silly…
807 return $item->getNotificationTimestamp();
811 return $notificationTimestamp;
817 public function countUnreadNotifications( User
$user, $unreadLimit = null ) {
819 if ( $unreadLimit !== null ) {
820 $unreadLimit = (int)$unreadLimit;
821 $queryOptions['LIMIT'] = $unreadLimit;
824 $dbr = $this->getConnectionRef( DB_REPLICA
);
825 $rowCount = $dbr->selectRowCount(
829 'wl_user' => $user->getId(),
830 'wl_notificationtimestamp IS NOT NULL',
836 if ( !isset( $unreadLimit ) ) {
840 if ( $rowCount >= $unreadLimit ) {
850 public function duplicateAllAssociatedEntries( LinkTarget
$oldTarget, LinkTarget
$newTarget ) {
851 $oldTarget = Title
::newFromLinkTarget( $oldTarget );
852 $newTarget = Title
::newFromLinkTarget( $newTarget );
854 $this->duplicateEntry( $oldTarget->getSubjectPage(), $newTarget->getSubjectPage() );
855 $this->duplicateEntry( $oldTarget->getTalkPage(), $newTarget->getTalkPage() );
861 public function duplicateEntry( LinkTarget
$oldTarget, LinkTarget
$newTarget ) {
862 $dbw = $this->getConnectionRef( DB_MASTER
);
864 $result = $dbw->select(
866 [ 'wl_user', 'wl_notificationtimestamp' ],
868 'wl_namespace' => $oldTarget->getNamespace(),
869 'wl_title' => $oldTarget->getDBkey(),
875 $newNamespace = $newTarget->getNamespace();
876 $newDBkey = $newTarget->getDBkey();
878 # Construct array to replace into the watchlist
880 foreach ( $result as $row ) {
882 'wl_user' => $row->wl_user
,
883 'wl_namespace' => $newNamespace,
884 'wl_title' => $newDBkey,
885 'wl_notificationtimestamp' => $row->wl_notificationtimestamp
,
889 if ( !empty( $values ) ) {
891 # Note that multi-row replace is very efficient for MySQL but may be inefficient for
892 # some other DBMSes, mostly due to poor simulation by us
895 [ [ 'wl_user', 'wl_namespace', 'wl_title' ] ],