From db5ad07b24fcffbc110e792a55184e99b171260e Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Fri, 29 Aug 2014 19:48:09 -0700 Subject: [PATCH] Improved job backoff handling to be more properly per-server Change-Id: I7c8e44a474ca8d05771477665ba508a358c5747d --- includes/jobqueue/JobRunner.php | 90 ++++++++++++++++++++++----------- 1 file changed, 61 insertions(+), 29 deletions(-) diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php index 617a3a31df..8462924019 100644 --- a/includes/jobqueue/JobRunner.php +++ b/includes/jobqueue/JobRunner.php @@ -79,8 +79,9 @@ class JobRunner { // Flush any pending DB writes for sanity wfGetLBFactory()->commitMasterChanges(); - $backoffs = $this->loadBackoffs(); // map of (type => UNIX expiry) - $startingBackoffs = $backoffs; // avoid unnecessary writes + // Some jobs types should not run until a certain timestamp + $backoffs = $this->loadBackoffs( array(), 'wait' ); // map of (type => UNIX expiry) + $backoffDeltas = array(); // map of (type => seconds) $backoffExpireFunc = function ( $t ) { return $t > time(); }; @@ -88,7 +89,7 @@ class JobRunner { $jobsRun = 0; // counter $timeMsTotal = 0; $flags = JobQueueGroup::USE_CACHE; - $startTime = microtime( true ); // time since jobs started running + $sTime = microtime( true ); // time since jobs started running $lastTime = microtime( true ); // time since last slave check do { $backoffs = array_filter( $backoffs, $backoffExpireFunc ); @@ -107,7 +108,7 @@ class JobRunner { // Run the job... wfProfileIn( __METHOD__ . '-' . get_class( $job ) ); - $t = microtime( true ); + $sTime = microtime( true ); try { ++$jobsRun; $status = $job->run(); @@ -119,7 +120,7 @@ class JobRunner { $error = get_class( $e ) . ': ' . $e->getMessage(); MWExceptionHandler::logException( $e ); } - $timeMs = intval( ( microtime( true ) - $t ) * 1000 ); + $timeMs = intval( ( microtime( true ) - $sTime ) * 1000 ); wfProfileOut( __METHOD__ . '-' . get_class( $job ) ); $timeMsTotal += $timeMs; @@ -128,6 +129,24 @@ class JobRunner { $group->ack( $job ); // done } + // Back off of certain jobs for a while (for throttling and for errors) + $ttw = $this->getBackoffTimeToWait( $job ); + if ( $status === false && mt_rand( 0, 49 ) == 0 ) { + $ttw = max( $ttw, 30 ); // too many errors + } + if ( $ttw > 0 ) { + // Always add the delta for other runners in case the time running the + // job negated the backoff for each individually but not collectively. + $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] ) + ? $backoffDeltas[$jType] + $ttw + : $ttw; + } + + // Sync the persistent backoffs with concurrent runners + $backoffs = $backoffDeltas + ? $this->syncBackoffDeltas( $backoffs, $backoffDeltas, 'nowait' ) + : $this->loadBackoffs( $backoffs, 'nowait' ); + if ( $status === false ) { $this->runJobsLog( $job->toString() . " t=$timeMs error={$error}" ); } else { @@ -141,21 +160,11 @@ class JobRunner { 'time' => $timeMs ); - // Back off of certain jobs for a while (for throttling and for errors) - $ttw = $this->getBackoffTimeToWait( $job ); - if ( $status === false && mt_rand( 0, 49 ) == 0 ) { - $ttw = max( $ttw, 30 ); - } - if ( $ttw > 0 ) { - $backoffs[$jType] = isset( $backoffs[$jType] ) ? $backoffs[$jType] : 0; - $backoffs[$jType] = max( $backoffs[$jType], time() + $ttw ); - } - // Break out if we hit the job count or wall time limits... if ( $maxJobs && $jobsRun >= $maxJobs ) { $response['reached'] = 'job-limit'; break; - } elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) { + } elseif ( $maxTime && ( microtime( true ) - $sTime ) > $maxTime ) { $response['reached'] = 'time-limit'; break; } @@ -177,9 +186,8 @@ class JobRunner { } while ( $job ); // stop when there are no jobs // Sync the persistent backoffs for the next runJobs.php pass - $backoffs = array_filter( $backoffs, $backoffExpireFunc ); - if ( $backoffs !== $startingBackoffs ) { - $this->syncBackoffs( $backoffs ); + if ( $backoffDeltas ) { + $this->syncBackoffDeltas( $backoffs, $backoffDeltas, 'wait' ); } $response['backoffs'] = $backoffs; @@ -221,21 +229,29 @@ class JobRunner { /** * Get the previous backoff expiries from persistent storage + * On I/O or lock acquisition failure this returns the original $backoffs. * + * @param array $backoffs Map of (job type => UNIX timestamp) + * @param string $mode Lock wait mode - "wait" or "nowait" * @return array Map of (job type => backoff expiry timestamp) */ - private function loadBackoffs() { + private function loadBackoffs( array $backoffs, $mode = 'wait' ) { $section = new ProfileSection( __METHOD__ ); - $backoffs = array(); $file = wfTempDir() . '/mw-runJobs-backoffs.json'; if ( is_file( $file ) ) { + $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0; $handle = fopen( $file, 'rb' ); - flock( $handle, LOCK_SH ); + if ( !flock( $handle, LOCK_SH | $noblock ) ) { + fclose( $handle ); + return $backoffs; // don't wait on lock + } $content = stream_get_contents( $handle ); flock( $handle, LOCK_UN ); fclose( $handle ); $backoffs = json_decode( $content, true ) ?: array(); + } else { + $backoffs = array(); } return $backoffs; @@ -244,24 +260,40 @@ class JobRunner { /** * Merge the current backoff expiries from persistent storage * - * @param array $backoffs Map of (job type => backoff expiry timestamp) + * The $deltas map is set to an empty array on success. + * On I/O or lock acquisition failure this returns the original $backoffs. + * + * @param array $backoffs Map of (job type => UNIX timestamp) + * @param array $deltas Map of (job type => seconds) + * @param string $mode Lock wait mode - "wait" or "nowait" + * @return array The new backoffs account for $backoffs and the latest file data */ - private function syncBackoffs( array $backoffs ) { + private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) { $section = new ProfileSection( __METHOD__ ); + $ctime = time(); + $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0; $file = wfTempDir() . '/mw-runJobs-backoffs.json'; $handle = fopen( $file, 'wb+' ); - flock( $handle, LOCK_EX ); + if ( !flock( $handle, LOCK_EX | $noblock ) ) { + fclose( $handle ); + return $backoffs; // don't wait on lock + } $content = stream_get_contents( $handle ); $cBackoffs = json_decode( $content, true ) ?: array(); - foreach ( $backoffs as $type => $timestamp ) { - $cBackoffs[$type] = isset( $cBackoffs[$type] ) ? $cBackoffs[$type] : 0; - $cBackoffs[$type] = max( $cBackoffs[$type], $backoffs[$type] ); + foreach ( $deltas as $type => $seconds ) { + $cBackoffs[$type] = isset( $cBackoffs[$type] ) && $cBackoffs[$type] >= $ctime + ? $cBackoffs[$type] + $seconds + : $ctime + $seconds; } ftruncate( $handle, 0 ); - fwrite( $handle, json_encode( $backoffs ) ); + fwrite( $handle, json_encode( $cBackoffs ) ); flock( $handle, LOCK_UN ); fclose( $handle ); + + $deltas = array(); + + return $cBackoffs; } /** -- 2.20.1