From e8cb20737acbf4b0de0e9420d91ae0813bc0a523 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Sat, 21 Dec 2013 12:57:45 -0800 Subject: [PATCH] Added $wgJobBackoffThrottling to replace wmf sleep() hack * This also adds a Job::workItemCount() method * Removed unused USE_PRIORITY constant * A few small cleanups in runJobs.php Change-Id: Ife9370e488fa63dcd1f702ed98f3b7f26057f10c --- includes/DefaultSettings.php | 10 +++ includes/job/Job.php | 9 ++ includes/job/JobQueueGroup.php | 26 +++--- includes/job/jobs/HTMLCacheUpdateJob.php | 4 + includes/job/jobs/RefreshLinksJob.php | 4 + maintenance/runJobs.php | 110 +++++++++++++++++++++-- 6 files changed, 144 insertions(+), 19 deletions(-) diff --git a/includes/DefaultSettings.php b/includes/DefaultSettings.php index b1265c1b2c..e705ab46be 100644 --- a/includes/DefaultSettings.php +++ b/includes/DefaultSettings.php @@ -6093,9 +6093,19 @@ $wgJobClasses = array( * - Jobs that you would never want to run as part of a page rendering request. * - Jobs that you want to run on specialized machines ( like transcoding, or a particular * machine on your cluster has 'outside' web access you could restrict uploadFromUrl ) + * These settings should be global to all wikis. */ $wgJobTypesExcludedFromDefaultQueue = array( 'AssembleUploadChunks', 'PublishStashedFile' ); +/** + * Map of job types to how many job "work items" should be run per second + * on each job runner process. The meaning of "work items" varies per job, + * but typically would be something like "pages to update". A single job + * may have a variable number of work items, as is the case with batch jobs. + * These settings should be global to all wikis. + */ +$wgJobBackoffThrottling = array(); + /** * Map of job types to configuration arrays. * This determines which queue class and storage system is used for each job type. diff --git a/includes/job/Job.php b/includes/job/Job.php index 77652a4d4e..067ede1ad7 100644 --- a/includes/job/Job.php +++ b/includes/job/Job.php @@ -197,6 +197,15 @@ abstract class Job { return true; } + /** + * @return integer Number of actually "work items" handled in this job + * @see $wgJobBackoffThrottling + * @since 1.23 + */ + public function workItemCount() { + return 1; + } + /** * Subclasses may need to override this to make duplication detection work. * The resulting map conveys everything that makes the job unique. This is diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php index 9d206a3778..d71df15d15 100644 --- a/includes/job/JobQueueGroup.php +++ b/includes/job/JobQueueGroup.php @@ -44,7 +44,6 @@ class JobQueueGroup { const TYPE_ANY = 2; // integer; any job const USE_CACHE = 1; // integer; use process or persistent cache - const USE_PRIORITY = 2; // integer; respect deprioritization const PROC_CACHE_TTL = 15; // integer; seconds @@ -149,18 +148,21 @@ class JobQueueGroup { * This pops a job off a queue as specified by $wgJobTypeConf and * updates the aggregate job queue information cache as needed. * - * @param int|string $qtype JobQueueGroup::TYPE_DEFAULT or type string + * @param int|string $qtype JobQueueGroup::TYPE_* constant or job type string * @param int $flags Bitfield of JobQueueGroup::USE_* constants + * @param array $blacklist List of job types to ignore * @return Job|bool Returns false on failure */ - public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0 ) { + public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = array() ) { + $job = false; + if ( is_string( $qtype ) ) { // specific job type - $job = $this->get( $qtype )->pop(); - if ( !$job ) { - JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype ); + if ( !in_array( $qtype, $blacklist ) ) { + $job = $this->get( $qtype )->pop(); + if ( !$job ) { + JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype ); + } } - - return $job; } else { // any job in the "default" jobs types if ( $flags & self::USE_CACHE ) { if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) { @@ -174,20 +176,22 @@ class JobQueueGroup { if ( $qtype == self::TYPE_DEFAULT ) { $types = array_intersect( $types, $this->getDefaultQueueTypes() ); } + + $types = array_diff( $types, $blacklist ); // avoid selected types shuffle( $types ); // avoid starvation foreach ( $types as $type ) { // for each queue... $job = $this->get( $type )->pop(); if ( $job ) { // found - return $job; + break; } else { // not found JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type ); $this->cache->clear( 'queues-ready' ); } } - - return false; // no jobs found } + + return $job; } /** diff --git a/includes/job/jobs/HTMLCacheUpdateJob.php b/includes/job/jobs/HTMLCacheUpdateJob.php index 833616dbbe..61d100c4d1 100644 --- a/includes/job/jobs/HTMLCacheUpdateJob.php +++ b/includes/job/jobs/HTMLCacheUpdateJob.php @@ -154,4 +154,8 @@ class HTMLCacheUpdateJob extends Job { } } } + + public function workItemCount() { + return isset( $this->params['pages'] ) ? count( $this->params['pages'] ) : 1; + } } diff --git a/includes/job/jobs/RefreshLinksJob.php b/includes/job/jobs/RefreshLinksJob.php index d8526ee09b..bdf0fdf07c 100644 --- a/includes/job/jobs/RefreshLinksJob.php +++ b/includes/job/jobs/RefreshLinksJob.php @@ -170,4 +170,8 @@ class RefreshLinksJob extends Job { return $info; } + + public function workItemCount() { + return isset( $this->params['pages'] ) ? count( $this->params['pages'] ) : 1; + } } diff --git a/maintenance/runJobs.php b/maintenance/runJobs.php index 8a4a5ab0cc..deea5ed961 100644 --- a/maintenance/runJobs.php +++ b/maintenance/runJobs.php @@ -64,12 +64,12 @@ class RunJobs extends Maintenance { } } } + + $type = $this->getOption( 'type', false ); $maxJobs = $this->getOption( 'maxjobs', false ); $maxTime = $this->getOption( 'maxtime', false ); $startTime = time(); - $type = $this->getOption( 'type', false ); $wgTitle = Title::newFromText( 'RunJobs.php' ); - $jobsRun = 0; // counter $group = JobQueueGroup::singleton(); // Handle any required periodic queue maintenance @@ -78,12 +78,21 @@ class RunJobs extends Maintenance { $this->runJobsLog( "Executed $count periodic queue task(s)." ); } - $flags = JobQueueGroup::USE_CACHE | JobQueueGroup::USE_PRIORITY; + $backoffs = $this->loadBackoffs(); // map of (type => UNIX expiry) + $startingBackoffs = $backoffs; // avoid unnecessary writes + $backoffExpireFunc = function( $t ) { return $t > time(); }; + + $jobsRun = 0; // counter + $flags = JobQueueGroup::USE_CACHE; $lastTime = time(); // time since last slave check do { - $job = ( $type === false ) - ? $group->pop( JobQueueGroup::TYPE_DEFAULT, $flags ) - : $group->pop( $type ); // job from a single queue + if ( $type === false ) { + $backoffs = array_filter( $backoffs, $backoffExpireFunc ); + $blacklist = array_keys( $backoffs ); + $job = $group->pop( JobQueueGroup::TYPE_DEFAULT, $flags, $blacklist ); + } else { + $group->pop( $type ); // job from a single queue + } if ( $job ) { // found a job ++$jobsRun; $this->runJobsLog( $job->toString() . " STARTING" ); @@ -117,6 +126,14 @@ class RunJobs extends Maintenance { $this->runJobsLog( $job->toString() . " t=$timeMs good" ); } + // Back off of certain jobs for a while + $ttw = $this->getBackoffTimeToWait( $job ); + if ( $ttw > 0 ) { + $jType = $job->getType(); + $backoffs[$jType] = isset( $backoffs[$jType] ) ? $backoffs[$jType] : 0; + $backoffs[$jType] = max( $backoffs[$jType], time() + $ttw ); + } + // Break out if we hit the job count or wall time limits... if ( $maxJobs && $jobsRun >= $maxJobs ) { break; @@ -139,10 +156,87 @@ class RunJobs extends Maintenance { $this->assertMemoryOK(); } } while ( $job ); // stop when there are no jobs + // Sync the persistent backoffs for the next runJobs.php pass + $backoffs = array_filter( $backoffs, $backoffExpireFunc ); + if ( $backoffs !== $startingBackoffs ) { + $this->syncBackoffs( $backoffs ); + } + } + + /** + * @param Job $job + * @return integer Seconds for this runner to avoid doing more jobs of this type + * @see $wgJobBackoffThrottling + */ + private function getBackoffTimeToWait( Job $job ) { + global $wgJobBackoffThrottling; + + if ( !isset( $wgJobBackoffThrottling[$job->getType()] ) ) { + return 0; // not throttled + } + $itemsPerSecond = $wgJobBackoffThrottling[$job->getType()]; + if ( $itemsPerSecond <= 0 ) { + return 0; // not throttled + } + + $seconds = 0; + if ( $job->workItemCount() > 0 ) { + $seconds = floor( $job->workItemCount() / $itemsPerSecond ); + $remainder = $job->workItemCount() % $itemsPerSecond; + $seconds += ( mt_rand( 1, $itemsPerSecond ) <= $remainder ) ? 1 : 0; + } + + return (int)$seconds; + } + + /** + * Get the previous backoff expiries from persistent storage + * + * @return array Map of (job type => backoff expiry timestamp) + */ + private function loadBackoffs() { + $section = new ProfileSection( __METHOD__ ); + + $backoffs = array(); + $file = wfTempDir() . '/mw-runJobs-backoffs.json'; + if ( is_file( $file ) ) { + $handle = fopen( $file, 'rb' ); + flock( $handle, LOCK_SH ); + $content = stream_get_contents( $handle ); + flock( $handle, LOCK_UN ); + fclose( $handle ); + $backoffs = json_decode( $content, true ) ?: array(); + } + + return $backoffs; + } + + /** + * Merge the current backoff expiries from persistent storage + * + * @param array $backoffs Map of (job type => backoff expiry timestamp) + */ + private function syncBackoffs( array $backoffs ) { + $section = new ProfileSection( __METHOD__ ); + + $file = wfTempDir() . '/mw-runJobs-backoffs.json'; + $handle = fopen( $file, 'wb+' ); + flock( $handle, LOCK_EX ); + $content = stream_get_contents( $handle ); + $cBackoffs = json_decode( $content, true ) ?: array(); + foreach ( $backoffs as $type => $timestamp ) { + $cBackoffs[$type] = isset( $cBackoffs[$type] ) ? $cBackoffs[$type] : 0; + $cBackoffs[$type] = max( $cBackoffs[$type], $backoffs[$type] ); + } + ftruncate( $handle, 0 ); + fwrite( $handle, json_encode( $backoffs ) ); + flock( $handle, LOCK_UN ); + fclose( $handle ); } /** - * Make sure that this script is not too close to the memory usage limit + * Make sure that this script is not too close to the memory usage limit. + * It is better to die in between jobs than OOM right in the middle of one. * @throws MWException */ private function assertMemoryOK() { @@ -151,7 +245,7 @@ class RunJobs extends Maintenance { $m = array(); if ( preg_match( '!^(\d+)(k|m|g|)$!i', ini_get( 'memory_limit' ), $m ) ) { list( , $num, $unit ) = $m; - $conv = array( 'g' => 1024 * 1024 * 1024, 'm' => 1024 * 1024, 'k' => 1024, '' => 1 ); + $conv = array( 'g' => 1073741824, 'm' => 1048576, 'k' => 1024, '' => 1 ); $maxBytes = $num * $conv[strtolower( $unit )]; } else { $maxBytes = 0; -- 2.20.1