From d83419782177813f65400f68a15ebdad9df3eaa5 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Wed, 8 Apr 2015 13:41:10 -0700 Subject: [PATCH] Moved ActiveUsers updates to recent changes jobs * This avoids writes on view and is more reliable * Also made the wfWaitForSlaves() there actually work Bug: T95501 Bug: T92357 Bug: T89027 Change-Id: I0a006fc92a9268feb185c9d88aa04002ea51ecd3 --- includes/DefaultSettings.php | 3 +- includes/api/ApiQueryAllUsers.php | 5 - .../jobqueue/jobs/RecentChangesUpdateJob.php | 132 ++++++++++++- includes/page/WikiPage.php | 8 +- includes/specials/SpecialActiveusers.php | 176 ++---------------- 5 files changed, 150 insertions(+), 174 deletions(-) diff --git a/includes/DefaultSettings.php b/includes/DefaultSettings.php index b08fe4d65a..3cfeb8c2fe 100644 --- a/includes/DefaultSettings.php +++ b/includes/DefaultSettings.php @@ -6474,8 +6474,7 @@ $wgJobQueueAggregator = array( * Expensive Querypages are already updated. */ $wgSpecialPageCacheUpdates = array( - 'Statistics' => array( 'SiteStatsUpdate', 'cacheUpdate' ), - 'Activeusers' => array( 'SpecialActiveUsers', 'cacheUpdate' ), + 'Statistics' => array( 'SiteStatsUpdate', 'cacheUpdate' ) ); /** diff --git a/includes/api/ApiQueryAllUsers.php b/includes/api/ApiQueryAllUsers.php index d7354e2660..516885917f 100644 --- a/includes/api/ApiQueryAllUsers.php +++ b/includes/api/ApiQueryAllUsers.php @@ -48,11 +48,6 @@ class ApiQueryAllUsers extends ApiQueryBase { $params = $this->extractRequestParams(); $activeUserDays = $this->getConfig()->get( 'ActiveUserDays' ); - if ( $params['activeusers'] ) { - // Update active user cache - SpecialActiveUsers::mergeActiveUsers( 300, $activeUserDays ); - } - $db = $this->getDB(); $prop = $params['prop']; diff --git a/includes/jobqueue/jobs/RecentChangesUpdateJob.php b/includes/jobqueue/jobs/RecentChangesUpdateJob.php index b550f50b2c..cc04595d75 100644 --- a/includes/jobqueue/jobs/RecentChangesUpdateJob.php +++ 
b/includes/jobqueue/jobs/RecentChangesUpdateJob.php @@ -46,11 +46,24 @@ class RecentChangesUpdateJob extends Job { ); } + /** + * @return RecentChangesUpdateJob + * @since 1.26 + */ + final public static function newCacheUpdateJob() { + return new self( + SpecialPage::getTitleFor( 'Recentchanges' ), array( 'type' => 'cacheUpdate' ) + ); + } + public function run() { if ( $this->params['type'] === 'purge' ) { $this->purgeExpiredRows(); + } elseif ( $this->params['type'] === 'cacheUpdate' ) { + $this->updateActiveUsers(); } else { - throw new Exception( "Invalid 'type' parameter '{$this->params['type']}'." ); + throw new InvalidArgumentException( + "Invalid 'type' parameter '{$this->params['type']}'." ); } return true; @@ -78,7 +91,7 @@ class RecentChangesUpdateJob extends Job { if ( $rcIds ) { $dbw->delete( 'recentchanges', array( 'rc_id' => $rcIds ), __METHOD__ ); } - // No need for this to be in a transaction. + // Commit in chunks to avoid slave lag $dbw->commit( __METHOD__, 'flush' ); if ( count( $rcIds ) === $batchSize ) { @@ -92,4 +105,119 @@ class RecentChangesUpdateJob extends Job { $dbw->unlock( $lockKey, __METHOD__ ); } + + protected function updateActiveUsers() { + global $wgActiveUserDays; + + // Users that made edits at most this many days ago are "active" + $days = $wgActiveUserDays; + // Pull in the full window of active users in this update + $window = $wgActiveUserDays * 86400; + + $dbw = wfGetDB( DB_MASTER ); + // JobRunner uses DBO_TRX, but doesn't call begin/commit itself; + // onTransactionIdle() will run immediately since there is no trx. + $dbw->onTransactionIdle( function() use ( $dbw, $days, $window ) { + // Avoid disconnect/ping() cycle that makes locks fall off + $dbw->setSessionOptions( array( 'connTimeout' => 900 ) ); + + $lockKey = wfWikiID() . 
'-activeusers'; + if ( !$dbw->lock( $lockKey, __METHOD__, 1 ) ) { + return false; // exclusive update (avoids duplicate entries) + } + + $nowUnix = time(); + // Get the last-updated timestamp for the cache + $cTime = $dbw->selectField( 'querycache_info', + 'qci_timestamp', + array( 'qci_type' => 'activeusers' ) + ); + $cTimeUnix = $cTime ? wfTimestamp( TS_UNIX, $cTime ) : 1; + + // Pick the date range to fetch from. This is normally from the last + // update until the present time, but has a limited window for sanity. + // If the window is limited, multiple runs are needed to fully populate it. + $sTimestamp = max( $cTimeUnix, $nowUnix - $days * 86400 ); + $eTimestamp = min( $sTimestamp + $window, $nowUnix ); + + // Get all the users active since the last update + $res = $dbw->select( + array( 'recentchanges' ), + array( 'rc_user_text', 'lastedittime' => 'MAX(rc_timestamp)' ), + array( + 'rc_user > 0', // actual accounts + 'rc_type != ' . $dbw->addQuotes( RC_EXTERNAL ), // no wikidata + 'rc_log_type IS NULL OR rc_log_type != ' . $dbw->addQuotes( 'newusers' ), + 'rc_timestamp >= ' . $dbw->addQuotes( $dbw->timestamp( $sTimestamp ) ), + 'rc_timestamp <= ' . $dbw->addQuotes( $dbw->timestamp( $eTimestamp ) ) + ), + __METHOD__, + array( + 'GROUP BY' => array( 'rc_user_text' ), + 'ORDER BY' => 'NULL' // avoid filesort + ) + ); + $names = array(); + foreach ( $res as $row ) { + $names[$row->rc_user_text] = $row->lastedittime; + } + + // Rotate out users that have not edited in too long (according to old data set) + $dbw->delete( 'querycachetwo', + array( + 'qcc_type' => 'activeusers', + 'qcc_value < ' . 
$dbw->addQuotes( $nowUnix - $days * 86400 ) // TS_UNIX + ), + __METHOD__ + ); + + // Find which of the recently active users are already accounted for + if ( count( $names ) ) { + $res = $dbw->select( 'querycachetwo', + array( 'user_name' => 'qcc_title' ), + array( + 'qcc_type' => 'activeusers', + 'qcc_namespace' => NS_USER, + 'qcc_title' => array_keys( $names ) ), + __METHOD__ + ); + foreach ( $res as $row ) { + unset( $names[$row->user_name] ); + } + } + + // Insert the users that need to be added to the list + if ( count( $names ) ) { + $newRows = array(); + foreach ( $names as $name => $lastEditTime ) { + $newRows[] = array( + 'qcc_type' => 'activeusers', + 'qcc_namespace' => NS_USER, + 'qcc_title' => $name, + 'qcc_value' => wfTimestamp( TS_UNIX, $lastEditTime ), + 'qcc_namespacetwo' => 0, // unused + 'qcc_titletwo' => '' // unused + ); + } + foreach ( array_chunk( $newRows, 500 ) as $rowBatch ) { + $dbw->insert( 'querycachetwo', $rowBatch, __METHOD__ ); + wfWaitForSlaves(); + } + } + + // If a transaction was already started, it might have an old + // snapshot, so kludge the timestamp range back as needed. 
+ $asOfTimestamp = min( $eTimestamp, (int)$dbw->trxTimestamp() ); + + // Touch the data freshness timestamp + $dbw->replace( 'querycache_info', + array( 'qci_type' ), + array( 'qci_type' => 'activeusers', + 'qci_timestamp' => $dbw->timestamp( $asOfTimestamp ) ), // not always $nowUnix + __METHOD__ + ); + + $dbw->unlock( $lockKey, __METHOD__ ); + } ); + } } diff --git a/includes/page/WikiPage.php b/includes/page/WikiPage.php index 059a3f8b20..752565da15 100644 --- a/includes/page/WikiPage.php +++ b/includes/page/WikiPage.php @@ -2205,8 +2205,12 @@ class WikiPage implements Page, IDBAccessObject { Hooks::run( 'ArticleEditUpdates', array( &$this, &$editInfo, $options['changed'] ) ); if ( Hooks::run( 'ArticleEditUpdatesDeleteFromRecentchanges', array( &$this ) ) ) { - // Flush old entries from the `recentchanges` table - JobQueueGroup::singleton()->push( RecentChangesUpdateJob::newPurgeJob() ); + JobQueueGroup::singleton()->push( array( + // Flush old entries from the `recentchanges` table + RecentChangesUpdateJob::newPurgeJob(), + // Update the cached list of active users + RecentChangesUpdateJob::newCacheUpdateJob() + ) ); } if ( !$this->exists() ) { diff --git a/includes/specials/SpecialActiveusers.php b/includes/specials/SpecialActiveusers.php index 2c00175061..5e2ee1c2d7 100644 --- a/includes/specials/SpecialActiveusers.php +++ b/includes/specials/SpecialActiveusers.php @@ -267,12 +267,21 @@ class SpecialActiveUsers extends SpecialPage { $out->wrapWikiMsg( "
\n$1\n
", array( 'activeusers-intro', $this->getLanguage()->formatNum( $days ) ) ); - // Occasionally merge in new updates - $seconds = min( self::mergeActiveUsers( 300, $days ), $days * 86400 ); - if ( $seconds > 0 ) { + // Get the timestamp of the last cache update + $dbr = wfGetDB( DB_SLAVE, 'recentchanges' ); + $cTime = $dbr->selectField( 'querycache_info', + 'qci_timestamp', + array( 'qci_type' => 'activeusers' ) + ); + + $secondsOld = $cTime + ? time() - wfTimestamp( TS_UNIX, $cTime ) + : $days * 86400; // fully stale :) + + if ( $secondsOld > 0 ) { // Mention the level of staleness $out->addWikiMsg( 'cachedspecial-viewing-cached-ttl', - $this->getLanguage()->formatDuration( $seconds ) ); + $this->getLanguage()->formatDuration( $secondsOld ) ); } $up = new ActiveUsersPager( $this->getContext(), null, $par ); @@ -295,163 +304,4 @@ class SpecialActiveUsers extends SpecialPage { protected function getGroupName() { return 'users'; } - - /** - * @param int $period Seconds (do updates no more often than this) - * @param int $days How many days user must be idle before he is considered inactive - * @return int How many seconds old the cache is - */ - public static function mergeActiveUsers( $period, $days ) { - $dbr = wfGetDB( DB_SLAVE, 'recentchanges' ); - $cTime = $dbr->selectField( 'querycache_info', - 'qci_timestamp', - array( 'qci_type' => 'activeusers' ) - ); - - if ( !wfReadOnly() ) { - if ( !$cTime || ( time() - wfTimestamp( TS_UNIX, $cTime ) ) > $period ) { - $dbw = wfGetDB( DB_MASTER ); - $cond = $cTime - ? array( 'rc_timestamp > ' . $dbw->addQuotes( $cTime ) ) - : array(); - if ( $dbw->estimateRowCount( 'recentchanges', '*', $cond ) <= 10000 ) { - $window = $days * 86400; // small wiki - } else { - $window = $period * 2; - } - $cTime = self::doQueryCacheUpdate( $dbw, $days, $window ) ?: $cTime; - } - } - - return ( time() - - ( $cTime ? 
wfTimestamp( TS_UNIX, $cTime ) : $days * 86400 ) ); - } - - /** - * @param IDatabase $dbw Passed in from updateSpecialPages.php - * @return void - */ - public static function cacheUpdate( IDatabase $dbw ) { - global $wgActiveUserDays; - - self::doQueryCacheUpdate( $dbw, $wgActiveUserDays, $wgActiveUserDays * 86400 ); - } - - /** - * Update the query cache as needed - * - * @param IDatabase $dbw - * @param int $days How many days user must be idle before he is considered inactive - * @param int $window Maximum time range of new data to scan (in seconds) - * @return int|bool UNIX timestamp the cache is now up-to-date as of (false on error) - */ - protected static function doQueryCacheUpdate( IDatabase $dbw, $days, $window ) { - $dbw->startAtomic( __METHOD__ ); - - $lockKey = wfWikiID() . '-activeusers'; - if ( !$dbw->lock( $lockKey, __METHOD__, 1 ) ) { - return false; // exclusive update (avoids duplicate entries) - } - - $nowUnix = time(); - // Get the last-updated timestamp for the cache - $cTime = $dbw->selectField( 'querycache_info', - 'qci_timestamp', - array( 'qci_type' => 'activeusers' ) - ); - $cTimeUnix = $cTime ? wfTimestamp( TS_UNIX, $cTime ) : 1; - - // Pick the date range to fetch from. This is normally from the last - // update to till the present time, but has a limited window for sanity. - // If the window is limited, multiple runs are need to fully populate it. - $sTimestamp = max( $cTimeUnix, $nowUnix - $days * 86400 ); - $eTimestamp = min( $sTimestamp + $window, $nowUnix ); - - // Get all the users active since the last update - $res = $dbw->select( - array( 'recentchanges' ), - array( 'rc_user_text', 'lastedittime' => 'MAX(rc_timestamp)' ), - array( - 'rc_user > 0', // actual accounts - 'rc_type != ' . $dbw->addQuotes( RC_EXTERNAL ), // no wikidata - 'rc_log_type IS NULL OR rc_log_type != ' . $dbw->addQuotes( 'newusers' ), - 'rc_timestamp >= ' . $dbw->addQuotes( $dbw->timestamp( $sTimestamp ) ), - 'rc_timestamp <= ' . 
$dbw->addQuotes( $dbw->timestamp( $eTimestamp ) ) - ), - __METHOD__, - array( - 'GROUP BY' => array( 'rc_user_text' ), - 'ORDER BY' => 'NULL' // avoid filesort - ) - ); - $names = array(); - foreach ( $res as $row ) { - $names[$row->rc_user_text] = $row->lastedittime; - } - - // Rotate out users that have not edited in too long (according to old data set) - $dbw->delete( 'querycachetwo', - array( - 'qcc_type' => 'activeusers', - 'qcc_value < ' . $dbw->addQuotes( $nowUnix - $days * 86400 ) // TS_UNIX - ), - __METHOD__ - ); - - // Find which of the recently active users are already accounted for - if ( count( $names ) ) { - $res = $dbw->select( 'querycachetwo', - array( 'user_name' => 'qcc_title' ), - array( - 'qcc_type' => 'activeusers', - 'qcc_namespace' => NS_USER, - 'qcc_title' => array_keys( $names ) ), - __METHOD__, - // See the latest data (ignoring trx snapshot) to avoid - // duplicates if this method was called in a transaction - array( 'LOCK IN SHARE MODE' ) - ); - foreach ( $res as $row ) { - unset( $names[$row->user_name] ); - } - } - - // Insert the users that need to be added to the list (which their last edit time - if ( count( $names ) ) { - $newRows = array(); - foreach ( $names as $name => $lastEditTime ) { - $newRows[] = array( - 'qcc_type' => 'activeusers', - 'qcc_namespace' => NS_USER, - 'qcc_title' => $name, - 'qcc_value' => wfTimestamp( TS_UNIX, $lastEditTime ), - 'qcc_namespacetwo' => 0, // unused - 'qcc_titletwo' => '' // unused - ); - } - foreach ( array_chunk( $newRows, 500 ) as $rowBatch ) { - $dbw->insert( 'querycachetwo', $rowBatch, __METHOD__ ); - if ( !$dbw->trxLevel() ) { - wfWaitForSlaves(); - } - } - } - - // If a transaction was already started, it might have an old - // snapshot, so kludge the timestamp range back as needed. 
- $asOfTimestamp = min( $eTimestamp, (int)$dbw->trxTimestamp() ); - - // Touch the data freshness timestamp - $dbw->replace( 'querycache_info', - array( 'qci_type' ), - array( 'qci_type' => 'activeusers', - 'qci_timestamp' => $dbw->timestamp( $asOfTimestamp ) ), // not always $now - __METHOD__ - ); - - $dbw->unlock( $lockKey, __METHOD__ ); - $dbw->endAtomic( __METHOD__ ); - - return $eTimestamp; - } } -- 2.20.1