More tweaks to job backoff code
authorAaron Schulz <aschulz@wikimedia.org>
Wed, 3 Sep 2014 18:50:12 +0000 (11:50 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Wed, 3 Sep 2014 19:37:50 +0000 (19:37 +0000)
* Replace one time() call with microtime() in syncBackoffDeltas().
  Also moved the call down slightly to not count flock() delay.
* Moved read-only case logic into syncBackoffDeltas().
* Moved $backoffExpireFunc logic into syncBackoffDeltas().
* Tightened the syncBackoffDeltas() checks around pop() for
  better accuracy.

Change-Id: Ifed3d24ba62277c0e0f52cdc1051990a590be18a

includes/jobqueue/JobRunner.php

index 8462924..8a708f1 100644 (file)
@@ -80,11 +80,9 @@ class JobRunner {
                wfGetLBFactory()->commitMasterChanges();
 
                // Some jobs types should not run until a certain timestamp
-               $backoffs = $this->loadBackoffs( array(), 'wait' ); // map of (type => UNIX expiry)
+               $backoffs = array(); // map of (type => UNIX expiry)
                $backoffDeltas = array(); // map of (type => seconds)
-               $backoffExpireFunc = function ( $t ) {
-                       return $t > time();
-               };
+               $wait = 'wait'; // block to read backoffs the first time
 
                $jobsRun = 0; // counter
                $timeMsTotal = 0;
@@ -92,8 +90,11 @@ class JobRunner {
                $sTime = microtime( true ); // time since jobs started running
                $lastTime = microtime( true ); // time since last slave check
                do {
-                       $backoffs = array_filter( $backoffs, $backoffExpireFunc );
+                       // Sync the persistent backoffs with concurrent runners
+                       $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
                        $blacklist = $noThrottle ? array() : array_keys( $backoffs );
+                       $wait = 'nowait'; // less important now
+
                        if ( $type === false ) {
                                $job = $group->pop( JobQueueGroup::TYPE_DEFAULT, $flags, $blacklist );
                        } elseif ( in_array( $type, $blacklist ) ) {
@@ -101,9 +102,21 @@ class JobRunner {
                        } else {
                                $job = $group->pop( $type ); // job from a single queue
                        }
+
                        if ( $job ) { // found a job
                                $jType = $job->getType();
 
+                               // Back off of certain jobs for a while (for throttling and for errors)
+                               $ttw = $this->getBackoffTimeToWait( $job );
+                               if ( $ttw > 0 ) {
+                                       // Always add the delta for other runners in case the time running the
+                                       // job negated the backoff for each individually but not collectively.
+                                       $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
+                                               ? $backoffDeltas[$jType] + $ttw
+                                               : $ttw;
+                                       $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
+                               }
+
                                $this->runJobsLog( $job->toString() . " STARTING" );
 
                                // Run the job...
@@ -130,23 +143,13 @@ class JobRunner {
                                }
 
                                // Back off of certain jobs for a while (for throttling and for errors)
-                               $ttw = $this->getBackoffTimeToWait( $job );
                                if ( $status === false && mt_rand( 0, 49 ) == 0 ) {
                                        $ttw = max( $ttw, 30 ); // too many errors
-                               }
-                               if ( $ttw > 0 ) {
-                                       // Always add the delta for other runners in case the time running the
-                                       // job negated the backoff for each individually but not collectively.
                                        $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
                                                ? $backoffDeltas[$jType] + $ttw
                                                : $ttw;
                                }
 
-                               // Sync the persistent backoffs with concurrent runners
-                               $backoffs = $backoffDeltas
-                                       ? $this->syncBackoffDeltas( $backoffs, $backoffDeltas, 'nowait' )
-                                       : $this->loadBackoffs( $backoffs, 'nowait' );
-
                                if ( $status === false ) {
                                        $this->runJobsLog( $job->toString() . " t=$timeMs error={$error}" );
                                } else {
@@ -249,12 +252,18 @@ class JobRunner {
                        $content = stream_get_contents( $handle );
                        flock( $handle, LOCK_UN );
                        fclose( $handle );
-                       $backoffs = json_decode( $content, true ) ?: array();
+                       $ctime = microtime( true );
+                       $cBackoffs = json_decode( $content, true ) ?: array();
+                       foreach ( $cBackoffs as $type => $timestamp ) {
+                               if ( $timestamp < $ctime ) {
+                                       unset( $cBackoffs[$type] );
+                               }
+                       }
                } else {
-                       $backoffs = array();
+                       $cBackoffs = array();
                }
 
-               return $backoffs;
+               return $cBackoffs;
        }
 
        /**
@@ -271,7 +280,10 @@ class JobRunner {
        private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) {
                $section = new ProfileSection( __METHOD__ );
 
-               $ctime = time();
+               if ( !$deltas ) {
+                       return $this->loadBackoffs( $backoffs, $mode );
+               }
+
                $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
                $file = wfTempDir() . '/mw-runJobs-backoffs.json';
                $handle = fopen( $file, 'wb+' );
@@ -279,6 +291,7 @@ class JobRunner {
                        fclose( $handle );
                        return $backoffs; // don't wait on lock
                }
+               $ctime = microtime( true );
                $content = stream_get_contents( $handle );
                $cBackoffs = json_decode( $content, true ) ?: array();
                foreach ( $deltas as $type => $seconds ) {
@@ -286,6 +299,11 @@ class JobRunner {
                                ? $cBackoffs[$type] + $seconds
                                : $ctime + $seconds;
                }
+               foreach ( $cBackoffs as $type => $timestamp ) {
+                       if ( $timestamp < $ctime ) {
+                               unset( $cBackoffs[$type] );
+                       }
+               }
                ftruncate( $handle, 0 );
                fwrite( $handle, json_encode( $cBackoffs ) );
                flock( $handle, LOCK_UN );