From 797c7c900544fabb15cfdc17deb3453293faf2ae Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Wed, 3 Sep 2014 11:50:12 -0700 Subject: [PATCH] More tweaks to job backoff code * Replace one time() call with microtime() in syncBackoffDeltas(). Also moved the call down slightly to not count flock() delay. * Moved read-only case logic into syncBackoffDeltas(). * Moved $backoffExpireFunc logic into syncBackoffDeltas(). * Tightened the syncBackoffDeltas() checks around pop() for better accuracy. Change-Id: Ifed3d24ba62277c0e0f52cdc1051990a590be18a --- includes/jobqueue/JobRunner.php | 56 ++++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 19 deletions(-) diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php index 8462924019..8a708f16c8 100644 --- a/includes/jobqueue/JobRunner.php +++ b/includes/jobqueue/JobRunner.php @@ -80,11 +80,9 @@ class JobRunner { wfGetLBFactory()->commitMasterChanges(); // Some jobs types should not run until a certain timestamp - $backoffs = $this->loadBackoffs( array(), 'wait' ); // map of (type => UNIX expiry) + $backoffs = array(); // map of (type => UNIX expiry) $backoffDeltas = array(); // map of (type => seconds) - $backoffExpireFunc = function ( $t ) { - return $t > time(); - }; + $wait = 'wait'; // block to read backoffs the first time $jobsRun = 0; // counter $timeMsTotal = 0; @@ -92,8 +90,11 @@ class JobRunner { $sTime = microtime( true ); // time since jobs started running $lastTime = microtime( true ); // time since last slave check do { - $backoffs = array_filter( $backoffs, $backoffExpireFunc ); + // Sync the persistent backoffs with concurrent runners + $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait ); $blacklist = $noThrottle ? array() : array_keys( $backoffs ); + $wait = 'nowait'; // less important now + if ( $type === false ) { $job = $group->pop( JobQueueGroup::TYPE_DEFAULT, $flags, $blacklist ); } elseif ( in_array( $type, $blacklist ) ) { @@ -101,9 +102,21 @@ class JobRunner { } else { $job = $group->pop( $type ); // job from a single queue } + if ( $job ) { // found a job $jType = $job->getType(); + // Back off of certain jobs for a while (for throttling and for errors) + $ttw = $this->getBackoffTimeToWait( $job ); + if ( $ttw > 0 ) { + // Always add the delta for other runners in case the time running the + // job negated the backoff for each individually but not collectively. + $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] ) + ? $backoffDeltas[$jType] + $ttw + : $ttw; + $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait ); + } + $this->runJobsLog( $job->toString() . " STARTING" ); // Run the job... @@ -130,23 +143,13 @@ class JobRunner { } // Back off of certain jobs for a while (for throttling and for errors) - $ttw = $this->getBackoffTimeToWait( $job ); if ( $status === false && mt_rand( 0, 49 ) == 0 ) { $ttw = max( $ttw, 30 ); // too many errors - } - if ( $ttw > 0 ) { - // Always add the delta for other runners in case the time running the - // job negated the backoff for each individually but not collectively. $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] ) ? $backoffDeltas[$jType] + $ttw : $ttw; } - // Sync the persistent backoffs with concurrent runners - $backoffs = $backoffDeltas - ? $this->syncBackoffDeltas( $backoffs, $backoffDeltas, 'nowait' ) - : $this->loadBackoffs( $backoffs, 'nowait' ); - if ( $status === false ) { $this->runJobsLog( $job->toString() . " t=$timeMs error={$error}" ); } else { @@ -249,12 +252,18 @@ class JobRunner { $content = stream_get_contents( $handle ); flock( $handle, LOCK_UN ); fclose( $handle ); - $backoffs = json_decode( $content, true ) ?: array(); + $ctime = microtime( true ); + $cBackoffs = json_decode( $content, true ) ?: array(); + foreach ( $cBackoffs as $type => $timestamp ) { + if ( $timestamp < $ctime ) { + unset( $cBackoffs[$type] ); + } + } } else { - $backoffs = array(); + $cBackoffs = array(); } - return $backoffs; + return $cBackoffs; } /** @@ -271,7 +280,10 @@ class JobRunner { private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) { $section = new ProfileSection( __METHOD__ ); - $ctime = time(); + if ( !$deltas ) { + return $this->loadBackoffs( $backoffs, $mode ); + } + $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0; $file = wfTempDir() . '/mw-runJobs-backoffs.json'; $handle = fopen( $file, 'wb+' ); @@ -279,6 +291,7 @@ class JobRunner { fclose( $handle ); return $backoffs; // don't wait on lock } + $ctime = microtime( true ); $content = stream_get_contents( $handle ); $cBackoffs = json_decode( $content, true ) ?: array(); foreach ( $deltas as $type => $seconds ) { @@ -286,6 +299,11 @@ class JobRunner { ? $cBackoffs[$type] + $seconds : $ctime + $seconds; } + foreach ( $cBackoffs as $type => $timestamp ) { + if ( $timestamp < $ctime ) { + unset( $cBackoffs[$type] ); + } + } ftruncate( $handle, 0 ); fwrite( $handle, json_encode( $cBackoffs ) ); flock( $handle, LOCK_UN ); -- 2.20.1