Improved job backoff handling to be more properly per-server
author Aaron Schulz <aschulz@wikimedia.org>
Sat, 30 Aug 2014 02:48:09 +0000 (19:48 -0700)
committer Nik Everett <neverett@wikimedia.org>
Wed, 3 Sep 2014 14:25:30 +0000 (10:25 -0400)
Change-Id: I7c8e44a474ca8d05771477665ba508a358c5747d

includes/jobqueue/JobRunner.php

index 617a3a3..8462924 100644 (file)
@@ -79,8 +79,9 @@ class JobRunner {
                // Flush any pending DB writes for sanity
                wfGetLBFactory()->commitMasterChanges();
 
-               $backoffs = $this->loadBackoffs(); // map of (type => UNIX expiry)
-               $startingBackoffs = $backoffs; // avoid unnecessary writes
+               // Some job types should not run until a certain timestamp
+               $backoffs = $this->loadBackoffs( array(), 'wait' ); // map of (type => UNIX expiry)
+               $backoffDeltas = array(); // map of (type => seconds)
                $backoffExpireFunc = function ( $t ) {
                        return $t > time();
                };
@@ -88,7 +89,7 @@ class JobRunner {
                $jobsRun = 0; // counter
                $timeMsTotal = 0;
                $flags = JobQueueGroup::USE_CACHE;
-               $startTime = microtime( true ); // time since jobs started running
+               $sTime = microtime( true ); // time since jobs started running
                $lastTime = microtime( true ); // time since last slave check
                do {
                        $backoffs = array_filter( $backoffs, $backoffExpireFunc );
@@ -107,7 +108,7 @@ class JobRunner {
 
                                // Run the job...
                                wfProfileIn( __METHOD__ . '-' . get_class( $job ) );
-                               $t = microtime( true );
+                               $sTime = microtime( true );
                                try {
                                        ++$jobsRun;
                                        $status = $job->run();
@@ -119,7 +120,7 @@ class JobRunner {
                                        $error = get_class( $e ) . ': ' . $e->getMessage();
                                        MWExceptionHandler::logException( $e );
                                }
-                               $timeMs = intval( ( microtime( true ) - $t ) * 1000 );
+                               $timeMs = intval( ( microtime( true ) - $sTime ) * 1000 );
                                wfProfileOut( __METHOD__ . '-' . get_class( $job ) );
                                $timeMsTotal += $timeMs;
 
@@ -128,6 +129,24 @@ class JobRunner {
                                        $group->ack( $job ); // done
                                }
 
+                               // Back off of certain jobs for a while (for throttling and for errors)
+                               $ttw = $this->getBackoffTimeToWait( $job );
+                               if ( $status === false && mt_rand( 0, 49 ) == 0 ) {
+                                       $ttw = max( $ttw, 30 ); // too many errors
+                               }
+                               if ( $ttw > 0 ) {
+                                       // Always add the delta for other runners in case the time running the
+                                       // job negated the backoff for each individually but not collectively.
+                                       $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
+                                               ? $backoffDeltas[$jType] + $ttw
+                                               : $ttw;
+                               }
+
+                               // Sync the persistent backoffs with concurrent runners
+                               $backoffs = $backoffDeltas
+                                       ? $this->syncBackoffDeltas( $backoffs, $backoffDeltas, 'nowait' )
+                                       : $this->loadBackoffs( $backoffs, 'nowait' );
+
                                if ( $status === false ) {
                                        $this->runJobsLog( $job->toString() . " t=$timeMs error={$error}" );
                                } else {
@@ -141,21 +160,11 @@ class JobRunner {
                                        'time'   => $timeMs
                                );
 
-                               // Back off of certain jobs for a while (for throttling and for errors)
-                               $ttw = $this->getBackoffTimeToWait( $job );
-                               if ( $status === false && mt_rand( 0, 49 ) == 0 ) {
-                                       $ttw = max( $ttw, 30 );
-                               }
-                               if ( $ttw > 0 ) {
-                                       $backoffs[$jType] = isset( $backoffs[$jType] ) ? $backoffs[$jType] : 0;
-                                       $backoffs[$jType] = max( $backoffs[$jType], time() + $ttw );
-                               }
-
                                // Break out if we hit the job count or wall time limits...
                                if ( $maxJobs && $jobsRun >= $maxJobs ) {
                                        $response['reached'] = 'job-limit';
                                        break;
-                               } elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) {
+                               } elseif ( $maxTime && ( microtime( true ) - $sTime ) > $maxTime ) {
                                        $response['reached'] = 'time-limit';
                                        break;
                                }
@@ -177,9 +186,8 @@ class JobRunner {
                } while ( $job ); // stop when there are no jobs
 
                // Sync the persistent backoffs for the next runJobs.php pass
-               $backoffs = array_filter( $backoffs, $backoffExpireFunc );
-               if ( $backoffs !== $startingBackoffs ) {
-                       $this->syncBackoffs( $backoffs );
+               if ( $backoffDeltas ) {
+                       $this->syncBackoffDeltas( $backoffs, $backoffDeltas, 'wait' );
                }
 
                $response['backoffs'] = $backoffs;
@@ -221,21 +229,29 @@ class JobRunner {
 
        /**
         * Get the previous backoff expiries from persistent storage
+        * On I/O or lock acquisition failure this returns the original $backoffs.
         *
+        * @param array $backoffs Map of (job type => UNIX timestamp)
+        * @param string $mode Lock wait mode - "wait" or "nowait"
         * @return array Map of (job type => backoff expiry timestamp)
         */
-       private function loadBackoffs() {
+       private function loadBackoffs( array $backoffs, $mode = 'wait' ) {
                $section = new ProfileSection( __METHOD__ );
 
-               $backoffs = array();
                $file = wfTempDir() . '/mw-runJobs-backoffs.json';
                if ( is_file( $file ) ) {
+                       $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
                        $handle = fopen( $file, 'rb' );
-                       flock( $handle, LOCK_SH );
+                       if ( !flock( $handle, LOCK_SH | $noblock ) ) {
+                               fclose( $handle );
+                               return $backoffs; // don't wait on lock
+                       }
                        $content = stream_get_contents( $handle );
                        flock( $handle, LOCK_UN );
                        fclose( $handle );
                        $backoffs = json_decode( $content, true ) ?: array();
+               } else {
+                       $backoffs = array();
                }
 
                return $backoffs;
@@ -244,24 +260,40 @@ class JobRunner {
        /**
         * Merge the current backoff expiries from persistent storage
         *
-        * @param array $backoffs Map of (job type => backoff expiry timestamp)
+        * The $deltas map is set to an empty array on success.
+        * On I/O or lock acquisition failure this returns the original $backoffs.
+        *
+        * @param array $backoffs Map of (job type => UNIX timestamp)
+        * @param array $deltas Map of (job type => seconds)
+        * @param string $mode Lock wait mode - "wait" or "nowait"
+        * @return array The new backoffs, accounting for $backoffs and the latest file data
         */
-       private function syncBackoffs( array $backoffs ) {
+       private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) {
                $section = new ProfileSection( __METHOD__ );
 
+               $ctime = time();
+               $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
                $file = wfTempDir() . '/mw-runJobs-backoffs.json';
                $handle = fopen( $file, 'wb+' );
-               flock( $handle, LOCK_EX );
+               if ( !flock( $handle, LOCK_EX | $noblock ) ) {
+                       fclose( $handle );
+                       return $backoffs; // don't wait on lock
+               }
                $content = stream_get_contents( $handle );
                $cBackoffs = json_decode( $content, true ) ?: array();
-               foreach ( $backoffs as $type => $timestamp ) {
-                       $cBackoffs[$type] = isset( $cBackoffs[$type] ) ? $cBackoffs[$type] : 0;
-                       $cBackoffs[$type] = max( $cBackoffs[$type], $backoffs[$type] );
+               foreach ( $deltas as $type => $seconds ) {
+                       $cBackoffs[$type] = isset( $cBackoffs[$type] ) && $cBackoffs[$type] >= $ctime
+                               ? $cBackoffs[$type] + $seconds
+                               : $ctime + $seconds;
                }
                ftruncate( $handle, 0 );
-               fwrite( $handle, json_encode( $backoffs ) );
+               fwrite( $handle, json_encode( $cBackoffs ) );
                flock( $handle, LOCK_UN );
                fclose( $handle );
+
+               $deltas = array();
+
+               return $cBackoffs;
        }
 
        /**