3 use Wikimedia\Rdbms\IDatabase
;
4 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface
;
5 use MediaWiki\Linker\LinkTarget
;
6 use MediaWiki\MediaWikiServices
;
7 use Wikimedia\Assert\Assert
;
8 use Wikimedia\ScopedCallback
;
9 use Wikimedia\Rdbms\LoadBalancer
;
12 * Storage layer class for WatchedItems.
13 * Database interaction & caching
14 * TODO caching should be factored out into a CachingWatchedItemStore class
16 * Uses database because this uses User::isAnon
23 class WatchedItemStore
implements WatchedItemStoreInterface
, StatsdAwareInterface
{
28 private $loadBalancer;
33 private $readOnlyMode;
41 * @var array[] Looks like $cacheIndex[Namespace ID][Target DB Key][User Id] => 'key'
42 * The index is needed so that on mass changes all relevant items can be un-cached.
43 * For example: Clearing a users watchlist of all items or updating notification timestamps
44 * for all users watching a single target.
46 private $cacheIndex = [];
51 private $deferredUpdatesAddCallableUpdateCallback;
56 private $revisionGetTimestampFromIdCallback;
59 * @var StatsdDataFactoryInterface
64 * @param LoadBalancer $loadBalancer
65 * @param HashBagOStuff $cache
66 * @param ReadOnlyMode $readOnlyMode
68 public function __construct(
69 LoadBalancer
$loadBalancer,
71 ReadOnlyMode
$readOnlyMode
73 $this->loadBalancer
= $loadBalancer;
74 $this->cache
= $cache;
75 $this->readOnlyMode
= $readOnlyMode;
76 $this->stats
= new NullStatsdDataFactory();
77 $this->deferredUpdatesAddCallableUpdateCallback
= [ 'DeferredUpdates', 'addCallableUpdate' ];
78 $this->revisionGetTimestampFromIdCallback
= [ 'Revision', 'getTimestampFromId' ];
81 public function setStatsdDataFactory( StatsdDataFactoryInterface
$stats ) {
82 $this->stats
= $stats;
86 * Overrides the DeferredUpdates::addCallableUpdate callback
87 * This is intended for use while testing and will fail if MW_PHPUNIT_TEST is not defined.
89 * @param callable $callback
91 * @see DeferredUpdates::addCallableUpdate for callback signiture
93 * @return ScopedCallback to reset the overridden value
96 public function overrideDeferredUpdatesAddCallableUpdateCallback( callable
$callback ) {
97 if ( !defined( 'MW_PHPUNIT_TEST' ) ) {
98 throw new MWException(
99 'Cannot override DeferredUpdates::addCallableUpdate callback in operation.'
102 $previousValue = $this->deferredUpdatesAddCallableUpdateCallback
;
103 $this->deferredUpdatesAddCallableUpdateCallback
= $callback;
104 return new ScopedCallback( function () use ( $previousValue ) {
105 $this->deferredUpdatesAddCallableUpdateCallback
= $previousValue;
110 * Overrides the Revision::getTimestampFromId callback
111 * This is intended for use while testing and will fail if MW_PHPUNIT_TEST is not defined.
113 * @param callable $callback
114 * @see Revision::getTimestampFromId for callback signiture
116 * @return ScopedCallback to reset the overridden value
117 * @throws MWException
119 public function overrideRevisionGetTimestampFromIdCallback( callable
$callback ) {
120 if ( !defined( 'MW_PHPUNIT_TEST' ) ) {
121 throw new MWException(
122 'Cannot override Revision::getTimestampFromId callback in operation.'
125 $previousValue = $this->revisionGetTimestampFromIdCallback
;
126 $this->revisionGetTimestampFromIdCallback
= $callback;
127 return new ScopedCallback( function () use ( $previousValue ) {
128 $this->revisionGetTimestampFromIdCallback
= $previousValue;
132 private function getCacheKey( User
$user, LinkTarget
$target ) {
133 return $this->cache
->makeKey(
134 (string)$target->getNamespace(),
136 (string)$user->getId()
140 private function cache( WatchedItem
$item ) {
141 $user = $item->getUser();
142 $target = $item->getLinkTarget();
143 $key = $this->getCacheKey( $user, $target );
144 $this->cache
->set( $key, $item );
145 $this->cacheIndex
[$target->getNamespace()][$target->getDBkey()][$user->getId()] = $key;
146 $this->stats
->increment( 'WatchedItemStore.cache' );
149 private function uncache( User
$user, LinkTarget
$target ) {
150 $this->cache
->delete( $this->getCacheKey( $user, $target ) );
151 unset( $this->cacheIndex
[$target->getNamespace()][$target->getDBkey()][$user->getId()] );
152 $this->stats
->increment( 'WatchedItemStore.uncache' );
155 private function uncacheLinkTarget( LinkTarget
$target ) {
156 $this->stats
->increment( 'WatchedItemStore.uncacheLinkTarget' );
157 if ( !isset( $this->cacheIndex
[$target->getNamespace()][$target->getDBkey()] ) ) {
160 foreach ( $this->cacheIndex
[$target->getNamespace()][$target->getDBkey()] as $key ) {
161 $this->stats
->increment( 'WatchedItemStore.uncacheLinkTarget.items' );
162 $this->cache
->delete( $key );
166 private function uncacheUser( User
$user ) {
167 $this->stats
->increment( 'WatchedItemStore.uncacheUser' );
168 foreach ( $this->cacheIndex
as $ns => $dbKeyArray ) {
169 foreach ( $dbKeyArray as $dbKey => $userArray ) {
170 if ( isset( $userArray[$user->getId()] ) ) {
171 $this->stats
->increment( 'WatchedItemStore.uncacheUser.items' );
172 $this->cache
->delete( $userArray[$user->getId()] );
180 * @param LinkTarget $target
182 * @return WatchedItem|false
184 private function getCached( User
$user, LinkTarget
$target ) {
185 return $this->cache
->get( $this->getCacheKey( $user, $target ) );
189 * Return an array of conditions to select or update the appropriate database
193 * @param LinkTarget $target
197 private function dbCond( User
$user, LinkTarget
$target ) {
199 'wl_user' => $user->getId(),
200 'wl_namespace' => $target->getNamespace(),
201 'wl_title' => $target->getDBkey(),
206 * @param int $dbIndex DB_MASTER or DB_REPLICA
209 * @throws MWException
211 private function getConnectionRef( $dbIndex ) {
212 return $this->loadBalancer
->getConnectionRef( $dbIndex, [ 'watchlist' ] );
218 public function countWatchedItems( User
$user ) {
219 $dbr = $this->getConnectionRef( DB_REPLICA
);
220 $return = (int)$dbr->selectField(
224 'wl_user' => $user->getId()
235 public function countWatchers( LinkTarget
$target ) {
236 $dbr = $this->getConnectionRef( DB_REPLICA
);
237 $return = (int)$dbr->selectField(
241 'wl_namespace' => $target->getNamespace(),
242 'wl_title' => $target->getDBkey(),
253 public function countVisitingWatchers( LinkTarget
$target, $threshold ) {
254 $dbr = $this->getConnectionRef( DB_REPLICA
);
255 $visitingWatchers = (int)$dbr->selectField(
259 'wl_namespace' => $target->getNamespace(),
260 'wl_title' => $target->getDBkey(),
261 'wl_notificationtimestamp >= ' .
262 $dbr->addQuotes( $dbr->timestamp( $threshold ) ) .
263 ' OR wl_notificationtimestamp IS NULL'
268 return $visitingWatchers;
274 public function countWatchersMultiple( array $targets, array $options = [] ) {
275 $dbOptions = [ 'GROUP BY' => [ 'wl_namespace', 'wl_title' ] ];
277 $dbr = $this->getConnectionRef( DB_REPLICA
);
279 if ( array_key_exists( 'minimumWatchers', $options ) ) {
280 $dbOptions['HAVING'] = 'COUNT(*) >= ' . (int)$options['minimumWatchers'];
283 $lb = new LinkBatch( $targets );
286 [ 'wl_title', 'wl_namespace', 'watchers' => 'COUNT(*)' ],
287 [ $lb->constructSet( 'wl', $dbr ) ],
293 foreach ( $targets as $linkTarget ) {
294 $watchCounts[$linkTarget->getNamespace()][$linkTarget->getDBkey()] = 0;
297 foreach ( $res as $row ) {
298 $watchCounts[$row->wl_namespace
][$row->wl_title
] = (int)$row->watchers
;
307 public function countVisitingWatchersMultiple(
308 array $targetsWithVisitThresholds,
309 $minimumWatchers = null
311 $dbr = $this->getConnectionRef( DB_REPLICA
);
313 $conds = $this->getVisitingWatchersCondition( $dbr, $targetsWithVisitThresholds );
315 $dbOptions = [ 'GROUP BY' => [ 'wl_namespace', 'wl_title' ] ];
316 if ( $minimumWatchers !== null ) {
317 $dbOptions['HAVING'] = 'COUNT(*) >= ' . (int)$minimumWatchers;
321 [ 'wl_namespace', 'wl_title', 'watchers' => 'COUNT(*)' ],
328 foreach ( $targetsWithVisitThresholds as list( $target ) ) {
329 /* @var LinkTarget $target */
330 $watcherCounts[$target->getNamespace()][$target->getDBkey()] = 0;
333 foreach ( $res as $row ) {
334 $watcherCounts[$row->wl_namespace
][$row->wl_title
] = (int)$row->watchers
;
337 return $watcherCounts;
341 * Generates condition for the query used in a batch count visiting watchers.
343 * @param IDatabase $db
344 * @param array $targetsWithVisitThresholds array of pairs (LinkTarget, last visit threshold)
347 private function getVisitingWatchersCondition(
349 array $targetsWithVisitThresholds
351 $missingTargets = [];
352 $namespaceConds = [];
353 foreach ( $targetsWithVisitThresholds as list( $target, $threshold ) ) {
354 if ( $threshold === null ) {
355 $missingTargets[] = $target;
358 /* @var LinkTarget $target */
359 $namespaceConds[$target->getNamespace()][] = $db->makeList( [
360 'wl_title = ' . $db->addQuotes( $target->getDBkey() ),
362 'wl_notificationtimestamp >= ' . $db->addQuotes( $db->timestamp( $threshold ) ),
363 'wl_notificationtimestamp IS NULL'
369 foreach ( $namespaceConds as $namespace => $pageConds ) {
370 $conds[] = $db->makeList( [
371 'wl_namespace = ' . $namespace,
372 '(' . $db->makeList( $pageConds, LIST_OR
) . ')'
376 if ( $missingTargets ) {
377 $lb = new LinkBatch( $missingTargets );
378 $conds[] = $lb->constructSet( 'wl', $db );
381 return $db->makeList( $conds, LIST_OR
);
387 public function getWatchedItem( User
$user, LinkTarget
$target ) {
388 if ( $user->isAnon() ) {
392 $cached = $this->getCached( $user, $target );
394 $this->stats
->increment( 'WatchedItemStore.getWatchedItem.cached' );
397 $this->stats
->increment( 'WatchedItemStore.getWatchedItem.load' );
398 return $this->loadWatchedItem( $user, $target );
404 public function loadWatchedItem( User
$user, LinkTarget
$target ) {
405 // Only loggedin user can have a watchlist
406 if ( $user->isAnon() ) {
410 $dbr = $this->getConnectionRef( DB_REPLICA
);
411 $row = $dbr->selectRow(
413 'wl_notificationtimestamp',
414 $this->dbCond( $user, $target ),
422 $item = new WatchedItem(
425 wfTimestampOrNull( TS_MW
, $row->wl_notificationtimestamp
)
427 $this->cache( $item );
435 public function getWatchedItemsForUser( User
$user, array $options = [] ) {
436 $options +
= [ 'forWrite' => false ];
439 if ( array_key_exists( 'sort', $options ) ) {
441 ( in_array( $options['sort'], [ self
::SORT_ASC
, self
::SORT_DESC
] ) ),
442 '$options[\'sort\']',
443 'must be SORT_ASC or SORT_DESC'
445 $dbOptions['ORDER BY'] = [
446 "wl_namespace {$options['sort']}",
447 "wl_title {$options['sort']}"
450 $db = $this->getConnectionRef( $options['forWrite'] ? DB_MASTER
: DB_REPLICA
);
454 [ 'wl_namespace', 'wl_title', 'wl_notificationtimestamp' ],
455 [ 'wl_user' => $user->getId() ],
461 foreach ( $res as $row ) {
462 // @todo: Should we add these to the process cache?
463 $watchedItems[] = new WatchedItem(
465 new TitleValue( (int)$row->wl_namespace
, $row->wl_title
),
466 $row->wl_notificationtimestamp
470 return $watchedItems;
476 public function isWatched( User
$user, LinkTarget
$target ) {
477 return (bool)$this->getWatchedItem( $user, $target );
483 public function getNotificationTimestampsBatch( User
$user, array $targets ) {
485 foreach ( $targets as $target ) {
486 $timestamps[$target->getNamespace()][$target->getDBkey()] = false;
489 if ( $user->isAnon() ) {
494 foreach ( $targets as $target ) {
495 $cachedItem = $this->getCached( $user, $target );
497 $timestamps[$target->getNamespace()][$target->getDBkey()] =
498 $cachedItem->getNotificationTimestamp();
500 $targetsToLoad[] = $target;
504 if ( !$targetsToLoad ) {
508 $dbr = $this->getConnectionRef( DB_REPLICA
);
510 $lb = new LinkBatch( $targetsToLoad );
513 [ 'wl_namespace', 'wl_title', 'wl_notificationtimestamp' ],
515 $lb->constructSet( 'wl', $dbr ),
516 'wl_user' => $user->getId(),
521 foreach ( $res as $row ) {
522 $timestamps[$row->wl_namespace
][$row->wl_title
] =
523 wfTimestampOrNull( TS_MW
, $row->wl_notificationtimestamp
);
532 public function addWatch( User
$user, LinkTarget
$target ) {
533 $this->addWatchBatchForUser( $user, [ $target ] );
539 public function addWatchBatchForUser( User
$user, array $targets ) {
540 if ( $this->readOnlyMode
->isReadOnly() ) {
543 // Only loggedin user can have a watchlist
544 if ( $user->isAnon() ) {
554 foreach ( $targets as $target ) {
556 'wl_user' => $user->getId(),
557 'wl_namespace' => $target->getNamespace(),
558 'wl_title' => $target->getDBkey(),
559 'wl_notificationtimestamp' => null,
561 $items[] = new WatchedItem(
566 $this->uncache( $user, $target );
569 $dbw = $this->getConnectionRef( DB_MASTER
);
570 foreach ( array_chunk( $rows, 100 ) as $toInsert ) {
571 // Use INSERT IGNORE to avoid overwriting the notification timestamp
572 // if there's already an entry for this page
573 $dbw->insert( 'watchlist', $toInsert, __METHOD__
, 'IGNORE' );
575 // Update process cache to ensure skin doesn't claim that the current
576 // page is unwatched in the response of action=watch itself (T28292).
577 // This would otherwise be re-queried from a slave by isWatched().
578 foreach ( $items as $item ) {
579 $this->cache( $item );
588 public function removeWatch( User
$user, LinkTarget
$target ) {
589 // Only logged in user can have a watchlist
590 if ( $this->readOnlyMode
->isReadOnly() ||
$user->isAnon() ) {
594 $this->uncache( $user, $target );
596 $dbw = $this->getConnectionRef( DB_MASTER
);
597 $dbw->delete( 'watchlist',
599 'wl_user' => $user->getId(),
600 'wl_namespace' => $target->getNamespace(),
601 'wl_title' => $target->getDBkey(),
604 $success = (bool)$dbw->affectedRows();
612 public function setNotificationTimestampsForUser( User
$user, $timestamp, array $targets = [] ) {
613 // Only loggedin user can have a watchlist
614 if ( $user->isAnon() ) {
618 $dbw = $this->getConnectionRef( DB_MASTER
);
620 $conds = [ 'wl_user' => $user->getId() ];
622 $batch = new LinkBatch( $targets );
623 $conds[] = $batch->constructSet( 'wl', $dbw );
626 if ( $timestamp !== null ) {
627 $timestamp = $dbw->timestamp( $timestamp );
630 $success = $dbw->update(
632 [ 'wl_notificationtimestamp' => $timestamp ],
637 $this->uncacheUser( $user );
645 public function updateNotificationTimestamp( User
$editor, LinkTarget
$target, $timestamp ) {
646 $dbw = $this->getConnectionRef( DB_MASTER
);
647 $uids = $dbw->selectFieldValues(
651 'wl_user != ' . intval( $editor->getId() ),
652 'wl_namespace' => $target->getNamespace(),
653 'wl_title' => $target->getDBkey(),
654 'wl_notificationtimestamp IS NULL',
659 $watchers = array_map( 'intval', $uids );
661 // Update wl_notificationtimestamp for all watching users except the editor
663 DeferredUpdates
::addCallableUpdate(
664 function () use ( $timestamp, $watchers, $target, $fname ) {
665 global $wgUpdateRowsPerQuery;
667 $dbw = $this->getConnectionRef( DB_MASTER
);
668 $factory = MediaWikiServices
::getInstance()->getDBLoadBalancerFactory();
669 $ticket = $factory->getEmptyTransactionTicket( __METHOD__
);
671 $watchersChunks = array_chunk( $watchers, $wgUpdateRowsPerQuery );
672 foreach ( $watchersChunks as $watchersChunk ) {
673 $dbw->update( 'watchlist',
675 'wl_notificationtimestamp' => $dbw->timestamp( $timestamp )
676 ], [ /* WHERE - TODO Use wl_id T130067 */
677 'wl_user' => $watchersChunk,
678 'wl_namespace' => $target->getNamespace(),
679 'wl_title' => $target->getDBkey(),
682 if ( count( $watchersChunks ) > 1 ) {
683 $factory->commitAndWaitForReplication(
684 __METHOD__
, $ticket, [ 'domain' => $dbw->getDomainID() ]
688 $this->uncacheLinkTarget( $target );
690 DeferredUpdates
::POSTSEND
,
701 public function resetNotificationTimestamp( User
$user, Title
$title, $force = '', $oldid = 0 ) {
702 // Only loggedin user can have a watchlist
703 if ( $this->readOnlyMode
->isReadOnly() ||
$user->isAnon() ) {
708 if ( $force != 'force' ) {
709 $item = $this->loadWatchedItem( $user, $title );
710 if ( !$item ||
$item->getNotificationTimestamp() === null ) {
715 // If the page is watched by the user (or may be watched), update the timestamp
716 $job = new ActivityUpdateJob(
719 'type' => 'updateWatchlistNotification',
720 'userid' => $user->getId(),
721 'notifTime' => $this->getNotificationTimestamp( $user, $title, $item, $force, $oldid ),
726 // Try to run this post-send
727 // Calls DeferredUpdates::addCallableUpdate in normal operation
729 $this->deferredUpdatesAddCallableUpdateCallback
,
730 function () use ( $job ) {
735 $this->uncache( $user, $title );
740 private function getNotificationTimestamp( User
$user, Title
$title, $item, $force, $oldid ) {
742 // No oldid given, assuming latest revision; clear the timestamp.
746 if ( !$title->getNextRevisionID( $oldid ) ) {
747 // Oldid given and is the latest revision for this title; clear the timestamp.
751 if ( $item === null ) {
752 $item = $this->loadWatchedItem( $user, $title );
756 // This can only happen if $force is enabled.
760 // Oldid given and isn't the latest; update the timestamp.
761 // This will result in no further notification emails being sent!
762 // Calls Revision::getTimestampFromId in normal operation
763 $notificationTimestamp = call_user_func(
764 $this->revisionGetTimestampFromIdCallback
,
769 // We need to go one second to the future because of various strict comparisons
770 // throughout the codebase
771 $ts = new MWTimestamp( $notificationTimestamp );
772 $ts->timestamp
->add( new DateInterval( 'PT1S' ) );
773 $notificationTimestamp = $ts->getTimestamp( TS_MW
);
775 if ( $notificationTimestamp < $item->getNotificationTimestamp() ) {
776 if ( $force != 'force' ) {
779 // This is a little silly…
780 return $item->getNotificationTimestamp();
784 return $notificationTimestamp;
790 public function countUnreadNotifications( User
$user, $unreadLimit = null ) {
792 if ( $unreadLimit !== null ) {
793 $unreadLimit = (int)$unreadLimit;
794 $queryOptions['LIMIT'] = $unreadLimit;
797 $dbr = $this->getConnectionRef( DB_REPLICA
);
798 $rowCount = $dbr->selectRowCount(
802 'wl_user' => $user->getId(),
803 'wl_notificationtimestamp IS NOT NULL',
809 if ( !isset( $unreadLimit ) ) {
813 if ( $rowCount >= $unreadLimit ) {
823 public function duplicateAllAssociatedEntries( LinkTarget
$oldTarget, LinkTarget
$newTarget ) {
824 $oldTarget = Title
::newFromLinkTarget( $oldTarget );
825 $newTarget = Title
::newFromLinkTarget( $newTarget );
827 $this->duplicateEntry( $oldTarget->getSubjectPage(), $newTarget->getSubjectPage() );
828 $this->duplicateEntry( $oldTarget->getTalkPage(), $newTarget->getTalkPage() );
834 public function duplicateEntry( LinkTarget
$oldTarget, LinkTarget
$newTarget ) {
835 $dbw = $this->getConnectionRef( DB_MASTER
);
837 $result = $dbw->select(
839 [ 'wl_user', 'wl_notificationtimestamp' ],
841 'wl_namespace' => $oldTarget->getNamespace(),
842 'wl_title' => $oldTarget->getDBkey(),
848 $newNamespace = $newTarget->getNamespace();
849 $newDBkey = $newTarget->getDBkey();
851 # Construct array to replace into the watchlist
853 foreach ( $result as $row ) {
855 'wl_user' => $row->wl_user
,
856 'wl_namespace' => $newNamespace,
857 'wl_title' => $newDBkey,
858 'wl_notificationtimestamp' => $row->wl_notificationtimestamp
,
862 if ( !empty( $values ) ) {
864 # Note that multi-row replace is very efficient for MySQL but may be inefficient for
865 # some other DBMSes, mostly due to poor simulation by us
868 [ [ 'wl_user', 'wl_namespace', 'wl_title' ] ],