*/
private $loadBalancer;
+ /**
+ * @var JobQueueGroup
+ */
+ private $queueGroup;
+
+ /**
+ * @var BagOStuff
+ */
+ private $stash;
+
/**
* @var ReadOnlyMode
*/
*/
private $cache;
+ /**
+ * @var HashBagOStuff
+ */
+ private $latestUpdateCache;
+
/**
* @var array[] Looks like $cacheIndex[Namespace ID][Target DB Key][User Id] => 'key'
* The index is needed so that on mass changes all relevant items can be un-cached.
/**
* @param ILBFactory $lbFactory
+ * @param JobQueueGroup $queueGroup
+ * @param BagOStuff $stash
* @param HashBagOStuff $cache
* @param ReadOnlyMode $readOnlyMode
* @param int $updateRowsPerQuery
*/
public function __construct(
ILBFactory $lbFactory,
+ JobQueueGroup $queueGroup,
+ BagOStuff $stash,
HashBagOStuff $cache,
ReadOnlyMode $readOnlyMode,
$updateRowsPerQuery
) {
$this->lbFactory = $lbFactory;
$this->loadBalancer = $lbFactory->getMainLB();
+ $this->queueGroup = $queueGroup;
+ $this->stash = $stash;
$this->cache = $cache;
$this->readOnlyMode = $readOnlyMode;
$this->stats = new NullStatsdDataFactory();
$this->revisionGetTimestampFromIdCallback =
[ Revision::class, 'getTimestampFromId' ];
$this->updateRowsPerQuery = $updateRowsPerQuery;
+
+ $this->latestUpdateCache = new HashBagOStuff( [ 'maxKeys' => 3 ] );
}
/**
}
}
}
+
+ $pageSeenKey = $this->getPageSeenTimestampsKey( $user );
+ $this->latestUpdateCache->delete( $pageSeenKey );
+ $this->stash->delete( $pageSeenKey );
}
/**
*/
public function clearUserWatchedItemsUsingJobQueue( User $user ) {
$job = ClearUserWatchlistJob::newForUser( $user, $this->getMaxId() );
- // TODO inject me.
- JobQueueGroup::singleton()->push( $job );
+ $this->queueGroup->push( $job );
}
/**
}
$dbr = $this->getConnectionRef( DB_REPLICA );
+
$row = $dbr->selectRow(
'watchlist',
'wl_notificationtimestamp',
$item = new WatchedItem(
$user,
$target,
- wfTimestampOrNull( TS_MW, $row->wl_notificationtimestamp )
+ $this->getLatestNotificationTimestamp( $row->wl_notificationtimestamp, $user, $target )
);
$this->cache( $item );
$watchedItems = [];
foreach ( $res as $row ) {
+ $target = new TitleValue( (int)$row->wl_namespace, $row->wl_title );
// @todo: Should we add these to the process cache?
$watchedItems[] = new WatchedItem(
$user,
new TitleValue( (int)$row->wl_namespace, $row->wl_title ),
- $row->wl_notificationtimestamp
+ $this->getLatestNotificationTimestamp(
+ $row->wl_notificationtimestamp, $user, $target )
);
}
);
foreach ( $res as $row ) {
+ $target = new TitleValue( (int)$row->wl_namespace, $row->wl_title );
$timestamps[$row->wl_namespace][$row->wl_title] =
- wfTimestampOrNull( TS_MW, $row->wl_notificationtimestamp );
+ $this->getLatestNotificationTimestamp(
+ $row->wl_notificationtimestamp, $user, $target );
}
return $timestamps;
}
/**
+ * Set the "last viewed" timestamps for certain titles on a user's watchlist.
+ *
+ * If the $targets parameter is omitted or set to [], this method simply wraps
+ * resetAllNotificationTimestampsForUser(), and in that case you should instead call that method
+ * directly; support for omitting $targets is for backwards compatibility.
+ *
+ * If $targets is omitted or set to [], timestamps will be updated for every title on the user's
+ * watchlist, and this will be done through a DeferredUpdate. If $targets is a non-empty array,
+ * only the specified titles will be updated, and this will be done immediately (not deferred).
+ *
* @since 1.27
* @param User $user
- * @param string|int $timestamp
- * @param LinkTarget[] $targets
+ * @param string|int $timestamp Value to set the "last viewed" timestamp to (null to clear)
+ * @param LinkTarget[] $targets Titles to set the timestamp for; [] means the entire watchlist
* @return bool
*/
public function setNotificationTimestampsForUser( User $user, $timestamp, array $targets = [] ) {
// Only loggedin user can have a watchlist
- if ( $user->isAnon() ) {
+ if ( $user->isAnon() || $this->readOnlyMode->isReadOnly() ) {
return false;
}
- $dbw = $this->getConnectionRef( DB_MASTER );
-
- $conds = [ 'wl_user' => $user->getId() ];
- if ( $targets ) {
- $batch = new LinkBatch( $targets );
- $conds[] = $batch->constructSet( 'wl', $dbw );
+ if ( !$targets ) {
+ // Backwards compatibility
+ $this->resetAllNotificationTimestampsForUser( $user, $timestamp );
+ return true;
}
+ $rows = $this->getTitleDbKeysGroupedByNamespace( $targets );
+
+ $dbw = $this->getConnectionRef( DB_MASTER );
if ( $timestamp !== null ) {
$timestamp = $dbw->timestamp( $timestamp );
}
+ $ticket = $this->lbFactory->getEmptyTransactionTicket( __METHOD__ );
+ $affectedSinceWait = 0;
- $success = $dbw->update(
- 'watchlist',
- [ 'wl_notificationtimestamp' => $timestamp ],
- $conds,
- __METHOD__
- );
+ // Batch update items per namespace
+ foreach ( $rows as $namespace => $namespaceTitles ) {
+ $rowBatches = array_chunk( $namespaceTitles, $this->updateRowsPerQuery );
+ foreach ( $rowBatches as $toUpdate ) {
+ $dbw->update(
+ 'watchlist',
+ [ 'wl_notificationtimestamp' => $timestamp ],
+ [
+ 'wl_user' => $user->getId(),
+ 'wl_namespace' => $namespace,
+ 'wl_title' => $toUpdate
+ ]
+ );
+ $affectedSinceWait += $dbw->affectedRows();
+ // Wait for replication every time we've touched updateRowsPerQuery rows
+ if ( $affectedSinceWait >= $this->updateRowsPerQuery ) {
+ $this->lbFactory->commitAndWaitForReplication( __METHOD__, $ticket );
+ $affectedSinceWait = 0;
+ }
+ }
+ }
$this->uncacheUser( $user );
- return $success;
+ return true;
+ }
+
+ public function getLatestNotificationTimestamp( $timestamp, User $user, LinkTarget $target ) {
+ $timestamp = wfTimestampOrNull( TS_MW, $timestamp );
+ if ( $timestamp === null ) {
+ return null; // no notification
+ }
+
+ $seenTimestamps = $this->getPageSeenTimestamps( $user );
+ if (
+ $seenTimestamps &&
+ $seenTimestamps->get( $this->getPageSeenKey( $target ) ) >= $timestamp
+ ) {
+ // If a reset job did not yet run, then the "seen" timestamp will be higher
+ return null;
+ }
+
+ return $timestamp;
}
- public function resetAllNotificationTimestampsForUser( User $user ) {
+ /**
+ * Schedule a DeferredUpdate that sets all of the "last viewed" timestamps for a given user
+ * to the same value.
+ * @param User $user
+ * @param string|int|null $timestamp Value to set all timestamps to, null to clear them
+ */
+ public function resetAllNotificationTimestampsForUser( User $user, $timestamp = null ) {
// Only loggedin user can have a watchlist
if ( $user->isAnon() ) {
return;
// If the page is watched by the user (or may be watched), update the timestamp
$job = new ClearWatchlistNotificationsJob(
$user->getUserPage(),
- [ 'userId' => $user->getId(), 'casTime' => time() ]
+ [ 'userId' => $user->getId(), 'timestamp' => $timestamp, 'casTime' => time() ]
);
// Try to run this post-send
* @return bool
*/
public function resetNotificationTimestamp( User $user, Title $title, $force = '', $oldid = 0 ) {
+ $time = time();
+
// Only loggedin user can have a watchlist
if ( $this->readOnlyMode->isReadOnly() || $user->isAnon() ) {
return false;
}
- if ( ! Hooks::run( 'BeforeResetNotificationTimestamp', [ &$user, &$title, $force, &$oldid ] ) ) {
+ if ( !Hooks::run( 'BeforeResetNotificationTimestamp', [ &$user, &$title, $force, &$oldid ] ) ) {
return false;
}
}
}
+ // Get the timestamp (TS_MW) of this revision to track the latest one seen
+ $seenTime = call_user_func(
+ $this->revisionGetTimestampFromIdCallback,
+ $title,
+ $oldid ?: $title->getLatestRevID()
+ );
+
+ // Mark the item as read immediately in lightweight storage
+ $this->stash->merge(
+ $this->getPageSeenTimestampsKey( $user ),
+ function ( $cache, $key, $current ) use ( $title, $seenTime ) {
+ $value = $current ?: new MapCacheLRU( 300 );
+ $subKey = $this->getPageSeenKey( $title );
+
+ if ( $seenTime > $value->get( $subKey ) ) {
+ // Revision is newer than the last one seen
+ $value->set( $subKey, $seenTime );
+ $this->latestUpdateCache->set( $key, $value, IExpiringStore::TTL_PROC_LONG );
+ } elseif ( $seenTime === false ) {
+ // Revision does not exist
+ $value->set( $subKey, wfTimestamp( TS_MW ) );
+ $this->latestUpdateCache->set( $key, $value, IExpiringStore::TTL_PROC_LONG );
+ } else {
+ return false; // nothing to update
+ }
+
+ return $value;
+ },
+ IExpiringStore::TTL_HOUR
+ );
+
// If the page is watched by the user (or may be watched), update the timestamp
$job = new ActivityUpdateJob(
$title,
'type' => 'updateWatchlistNotification',
'userid' => $user->getId(),
'notifTime' => $this->getNotificationTimestamp( $user, $title, $item, $force, $oldid ),
- 'curTime' => time()
+ 'curTime' => $time
]
);
+ // Try to enqueue this post-send
+ $this->queueGroup->lazyPush( $job );
- // Try to run this post-send
- // Calls DeferredUpdates::addCallableUpdate in normal operation
- call_user_func(
- $this->deferredUpdatesAddCallableUpdateCallback,
- function () use ( $job ) {
- $job->run();
+ $this->uncache( $user, $title );
+
+ return true;
+ }
+
+ /**
+ * @param User $user
+ * @return MapCacheLRU|null The map contains prefixed title keys and TS_MW values
+ */
+ private function getPageSeenTimestamps( User $user ) {
+ $key = $this->getPageSeenTimestampsKey( $user );
+
+ return $this->latestUpdateCache->getWithSetCallback(
+ $key,
+ IExpiringStore::TTL_PROC_LONG,
+ function () use ( $key ) {
+ return $this->stash->get( $key ) ?: null;
}
);
+ }
- $this->uncache( $user, $title );
+ /**
+ * @param User $user
+ * @return string
+ */
+ private function getPageSeenTimestampsKey( User $user ) {
+ return $this->stash->makeGlobalKey(
+ 'watchlist-recent-updates',
+ $this->lbFactory->getLocalDomainID(),
+ $user->getId()
+ );
+ }
- return true;
+ /**
+ * @param LinkTarget $target
+ * @return string
+ */
+ private function getPageSeenKey( LinkTarget $target ) {
+ return "{$target->getNamespace()}:{$target->getDBkey()}";
}
private function getNotificationTimestamp( User $user, Title $title, $item, $force, $oldid ) {
* @return int|bool
*/
public function countUnreadNotifications( User $user, $unreadLimit = null ) {
+ $dbr = $this->getConnectionRef( DB_REPLICA );
+
$queryOptions = [];
if ( $unreadLimit !== null ) {
$unreadLimit = (int)$unreadLimit;
$queryOptions['LIMIT'] = $unreadLimit;
}
- $dbr = $this->getConnectionRef( DB_REPLICA );
- $rowCount = $dbr->selectRowCount(
- 'watchlist',
- '1',
- [
- 'wl_user' => $user->getId(),
- 'wl_notificationtimestamp IS NOT NULL',
- ],
- __METHOD__,
- $queryOptions
- );
+ $conds = [
+ 'wl_user' => $user->getId(),
+ 'wl_notificationtimestamp IS NOT NULL'
+ ];
- if ( !isset( $unreadLimit ) ) {
+ $rowCount = $dbr->selectRowCount( 'watchlist', '1', $conds, __METHOD__, $queryOptions );
+
+ if ( $unreadLimit === null ) {
return $rowCount;
}
}
/**
- * @param TitleValue[] $titles
+ * @param LinkTarget[] $titles
* @return array
*/
private function getTitleDbKeysGroupedByNamespace( array $titles ) {