From 094d901b88ff59501b7b3f38fb7f7241379acfdf Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Thu, 24 Jul 2014 15:03:23 -0700 Subject: [PATCH] Refactored duplicated code into JobRunner.php * Also added an async flag to SpecialRunJobs so that it can be set to false to get a JSON blob back with a regular 200 status. Change-Id: I2f5763e017684c3c61f3d3f27ddf7f7834bdfce2 --- includes/AutoLoader.php | 1 + includes/jobqueue/JobRunner.php | 293 +++++++++++++++++++++++++++ includes/specials/SpecialRunJobs.php | 78 ++----- maintenance/runJobs.php | 215 +------------------- 4 files changed, 325 insertions(+), 262 deletions(-) create mode 100644 includes/jobqueue/JobRunner.php diff --git a/includes/AutoLoader.php b/includes/AutoLoader.php index 079cbd8eaf..f35b38052c 100644 --- a/includes/AutoLoader.php +++ b/includes/AutoLoader.php @@ -639,6 +639,7 @@ $wgAutoloadLocalClasses = array( 'JobQueueGroup' => 'includes/jobqueue/JobQueueGroup.php', 'JobQueueFederated' => 'includes/jobqueue/JobQueueFederated.php', 'JobQueueRedis' => 'includes/jobqueue/JobQueueRedis.php', + 'JobRunner' => 'includes/jobqueue/JobRunner.php', 'JobSpecification' => 'includes/jobqueue/JobSpecification.php', # includes/jobqueue/jobs diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php new file mode 100644 index 0000000000..0f585c7dc6 --- /dev/null +++ b/includes/jobqueue/JobRunner.php @@ -0,0 +1,293 @@ +debug = $debug; + } + + /** + * Run jobs of the specified number/type for the specified time + * + * The response map has a 'job' field that lists status of each job, including: + * - type : the job type + * - status : ok/failed + * - error : any error message string + * - time : the job run time in ms + * The response map also has: + * - backoffs : the (job type => seconds) map of backoff times + * - elapsed : the total time spent running tasks in ms + * - reached : the reason the script finished, one of (none-ready, job-limit, time-limit) + * + * @param array $options + * @return 
array Summary response that can easily be JSON serialized + */ + public function run( array $options ) { + $response = array( 'jobs' => array(), 'reached' => 'none-ready' ); + + $type = isset( $options['type'] ) ? $options['type'] : false; + $maxJobs = isset( $options['maxJobs'] ) ? $options['maxJobs'] : false; + $maxTime = isset( $options['maxTime'] ) ? $options['maxTime'] : false; + $noThrottle = isset( $options['throttle'] ) && !$options['throttle']; + + $group = JobQueueGroup::singleton(); + // Handle any required periodic queue maintenance + $count = $group->executeReadyPeriodicTasks(); + if ( $count > 0 ) { + $this->runJobsLog( "Executed $count periodic queue task(s)." ); + } + + // Flush any pending DB writes for sanity + wfGetLBFactory()->commitMasterChanges(); + + $backoffs = $this->loadBackoffs(); // map of (type => UNIX expiry) + $startingBackoffs = $backoffs; // avoid unnecessary writes + $backoffExpireFunc = function ( $t ) { + return $t > time(); + }; + + $jobsRun = 0; // counter + $timeMsTotal = 0; + $flags = JobQueueGroup::USE_CACHE; + $startTime = microtime( true ); // time since jobs started running + $lastTime = microtime( true ); // time since last slave check + do { + $backoffs = array_filter( $backoffs, $backoffExpireFunc ); + $blacklist = $noThrottle ? array() : array_keys( $backoffs ); + if ( $type === false ) { + $job = $group->pop( JobQueueGroup::TYPE_DEFAULT, $flags, $blacklist ); + } elseif ( in_array( $type, $blacklist ) ) { + $job = false; // requested queue in backoff state + } else { + $job = $group->pop( $type ); // job from a single queue + } + if ( $job ) { // found a job + $jType = $job->getType(); + + $this->runJobsLog( $job->toString() . " STARTING" ); + + // Run the job... + wfProfileIn( __METHOD__ . '-' . 
get_class( $job ) ); + $t = microtime( true ); + try { + ++$jobsRun; + $status = $job->run(); + $error = $job->getLastError(); + wfGetLBFactory()->commitMasterChanges(); + } catch ( MWException $e ) { + MWExceptionHandler::rollbackMasterChangesAndLog( $e ); + $status = false; + $error = get_class( $e ) . ': ' . $e->getMessage(); + $e->report(); // write error to STDERR and the log + } + $timeMs = intval( ( microtime( true ) - $t ) * 1000 ); + wfProfileOut( __METHOD__ . '-' . get_class( $job ) ); + $timeMsTotal += $timeMs; + + // Mark the job as done on success or when the job cannot be retried + if ( $status !== false || !$job->allowRetries() ) { + $group->ack( $job ); // done + } + + if ( $status === false ) { + $this->runJobsLog( $job->toString() . " t=$timeMs error={$error}" ); + } else { + $this->runJobsLog( $job->toString() . " t=$timeMs good" ); + } + + $response['jobs'][] = array( + 'type' => $jType, + 'status' => ( $status === false ) ? 'failed' : 'ok', + 'error' => $error, + 'time' => $timeMs + ); + + // Back off of certain jobs for a while (for throttling and for errors) + $ttw = $this->getBackoffTimeToWait( $job ); + if ( $status === false && mt_rand( 0, 49 ) == 0 ) { + $ttw = max( $ttw, 30 ); + } + if ( $ttw > 0 ) { + $backoffs[$jType] = isset( $backoffs[$jType] ) ? $backoffs[$jType] : 0; + $backoffs[$jType] = max( $backoffs[$jType], time() + $ttw ); + } + + // Break out if we hit the job count or wall time limits... 
+ if ( $maxJobs && $jobsRun >= $maxJobs ) { + $response['reached'] = 'job-limit'; + break; + } elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) { + $response['reached'] = 'time-limit'; + break; + } + + // Don't let any of the main DB slaves get backed up + $timePassed = microtime( true ) - $lastTime; + if ( $timePassed >= 5 || $timePassed < 0 ) { + wfWaitForSlaves( $lastTime ); + $lastTime = microtime( true ); + } + // Don't let any queue slaves/backups fall behind + if ( $jobsRun > 0 && ( $jobsRun % 100 ) == 0 ) { + $group->waitForBackups(); + } + + // Bail if near-OOM instead of in a job + $this->assertMemoryOK(); + } + } while ( $job ); // stop when there are no jobs + + // Sync the persistent backoffs for the next runJobs.php pass + $backoffs = array_filter( $backoffs, $backoffExpireFunc ); + if ( $backoffs !== $startingBackoffs ) { + $this->syncBackoffs( $backoffs ); + } + + $response['backoffs'] = $backoffs; + $response['elapsed'] = $timeMsTotal; + + return $response; + } + + /** + * @param Job $job + * @return int Seconds for this runner to avoid doing more jobs of this type + * @see $wgJobBackoffThrottling + */ + private function getBackoffTimeToWait( Job $job ) { + global $wgJobBackoffThrottling; + + if ( !isset( $wgJobBackoffThrottling[$job->getType()] ) || + $job instanceof DuplicateJob // no work was done + ) { + return 0; // not throttled + } + + $itemsPerSecond = $wgJobBackoffThrottling[$job->getType()]; + if ( $itemsPerSecond <= 0 ) { + return 0; // not throttled + } + + $seconds = 0; + if ( $job->workItemCount() > 0 ) { + $exactSeconds = $job->workItemCount() / $itemsPerSecond; + // use randomized rounding + $seconds = floor( $exactSeconds ); + $remainder = $exactSeconds - $seconds; + $seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 
1 : 0; + } + + return (int)$seconds; + } + + /** + * Get the previous backoff expiries from persistent storage + * + * @return array Map of (job type => backoff expiry timestamp) + */ + private function loadBackoffs() { + $section = new ProfileSection( __METHOD__ ); + + $backoffs = array(); + $file = wfTempDir() . '/mw-runJobs-backoffs.json'; + if ( is_file( $file ) ) { + $handle = fopen( $file, 'rb' ); + flock( $handle, LOCK_SH ); + $content = stream_get_contents( $handle ); + flock( $handle, LOCK_UN ); + fclose( $handle ); + $backoffs = json_decode( $content, true ) ? : array(); + } + + return $backoffs; + } + + /** + * Merge the current backoff expiries from persistent storage + * + * @param array $backoffs Map of (job type => backoff expiry timestamp) + */ + private function syncBackoffs( array $backoffs ) { + $section = new ProfileSection( __METHOD__ ); + + $file = wfTempDir() . '/mw-runJobs-backoffs.json'; + $handle = fopen( $file, 'wb+' ); + flock( $handle, LOCK_EX ); + $content = stream_get_contents( $handle ); + $cBackoffs = json_decode( $content, true ) ? : array(); + foreach ( $backoffs as $type => $timestamp ) { + $cBackoffs[$type] = isset( $cBackoffs[$type] ) ? $cBackoffs[$type] : 0; + $cBackoffs[$type] = max( $cBackoffs[$type], $backoffs[$type] ); + } + ftruncate( $handle, 0 ); + fwrite( $handle, json_encode( $cBackoffs ) ); + flock( $handle, LOCK_UN ); + fclose( $handle ); + } + + /** + * Make sure that this script is not too close to the memory usage limit. + * It is better to die in between jobs than OOM right in the middle of one. 
+ * @throws MWException + */ + private function assertMemoryOK() { + static $maxBytes = null; + if ( $maxBytes === null ) { + $m = array(); + if ( preg_match( '!^(\d+)(k|m|g|)$!i', ini_get( 'memory_limit' ), $m ) ) { + list( , $num, $unit ) = $m; + $conv = array( 'g' => 1073741824, 'm' => 1048576, 'k' => 1024, '' => 1 ); + $maxBytes = $num * $conv[strtolower( $unit )]; + } else { + $maxBytes = 0; + } + } + $usedBytes = memory_get_usage(); + if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) { + throw new MWException( "Detected excessive memory usage ($usedBytes/$maxBytes)." ); + } + } + + /** + * Log the job message + * @param string $msg The message to log + */ + private function runJobsLog( $msg ) { + if ( $this->debug ) { + call_user_func_array( $this->debug, array( wfTimestamp( TS_DB ) . " $msg\n" ) ); + } + wfDebugLog( 'runJobs', $msg ); + } +} diff --git a/includes/specials/SpecialRunJobs.php b/includes/specials/SpecialRunJobs.php index b977292412..54f224acec 100644 --- a/includes/specials/SpecialRunJobs.php +++ b/includes/specials/SpecialRunJobs.php @@ -47,7 +47,7 @@ class SpecialRunJobs extends UnlistedSpecialPage { return; } - $optional = array( 'maxjobs' => 0 ); + $optional = array( 'maxjobs' => 0, 'maxtime' => 30, 'type' => false, 'async' => true ); $required = array_flip( array( 'title', 'tasks', 'signature', 'sigexpiry' ) ); $params = array_intersect_key( $this->getRequest()->getValues(), $required + $optional ); @@ -75,18 +75,28 @@ class SpecialRunJobs extends UnlistedSpecialPage { // Apply any default parameter values $params += $optional; - // Client will usually disconnect before checking the response, - // but it needs to know when it is safe to disconnect. Until this - // reaches ignore_user_abort(), it is not safe as the jobs won't run. 
- ignore_user_abort( true ); // jobs may take a bit of time - header( "HTTP/1.0 202 Accepted" ); - ob_flush(); - flush(); - // Once the client receives this response, it can disconnect + if ( $params['async'] ) { + // Client will usually disconnect before checking the response, + // but it needs to know when it is safe to disconnect. Until this + // reaches ignore_user_abort(), it is not safe as the jobs won't run. + ignore_user_abort( true ); // jobs may take a bit of time + header( "HTTP/1.0 202 Accepted" ); + ob_flush(); + flush(); + // Once the client receives this response, it can disconnect + } // Do all of the specified tasks... if ( in_array( 'jobs', explode( '|', $params['tasks'] ) ) ) { - self::executeJobs( (int)$params['maxjobs'] ); + $runner = new JobRunner(); + $response = $runner->run( array( + 'type' => $params['type'], + 'maxJobs' => $params['maxjobs'] ? $params['maxjobs'] : 1, + 'maxTime' => $params['maxtime'] ? $params['maxtime'] : 30 + ) ); + if ( !$params['async'] ) { + print FormatJson::encode( $response, true ); + } } } @@ -100,52 +110,4 @@ class SpecialRunJobs extends UnlistedSpecialPage { ksort( $query ); // stable order return hash_hmac( 'sha1', wfArrayToCgi( $query ), $wgSecretKey ); } - - /** - * Run jobs from the job queue - * - * @note also called from Wiki.php - * - * @param int $maxJobs Maximum number of jobs to run - * @return void - */ - public static function executeJobs( $maxJobs ) { - $n = $maxJobs; // number of jobs to run - if ( $n < 1 ) { - return; - } - try { - $group = JobQueueGroup::singleton(); - $count = $group->executeReadyPeriodicTasks(); - if ( $count > 0 ) { - wfDebugLog( 'jobqueue', "Executed $count periodic queue task(s)." ); - } - - do { - $job = $group->pop( JobQueueGroup::TYPE_DEFAULT, JobQueueGroup::USE_CACHE ); - if ( $job ) { - $output = $job->toString() . "\n"; - $t = -microtime( true ); - wfProfileIn( __METHOD__ . '-' . get_class( $job ) ); - $success = $job->run(); - wfProfileOut( __METHOD__ . '-' . 
get_class( $job ) ); - $group->ack( $job ); // done - $t += microtime( true ); - $t = round( $t * 1000 ); - if ( $success === false ) { - $output .= "Error: " . $job->getLastError() . ", Time: $t ms\n"; - } else { - $output .= "Success, Time: $t ms\n"; - } - wfDebugLog( 'jobqueue', $output ); - } - } while ( --$n && $job ); - } catch ( MWException $e ) { - MWExceptionHandler::rollbackMasterChangesAndLog( $e ); - // We don't want exceptions thrown during job execution to - // be reported to the user since the output is already sent. - // Instead we just log them. - MWExceptionHandler::logException( $e ); - } - } } diff --git a/maintenance/runJobs.php b/maintenance/runJobs.php index e56640f93e..3e6fa52c8c 100644 --- a/maintenance/runJobs.php +++ b/maintenance/runJobs.php @@ -65,214 +65,21 @@ class RunJobs extends Maintenance { } } - $type = $this->getOption( 'type', false ); - $maxJobs = $this->getOption( 'maxjobs', false ); - $maxTime = $this->getOption( 'maxtime', false ); - $noThrottle = $this->hasOption( 'nothrottle' ); - $startTime = time(); - - $group = JobQueueGroup::singleton(); - // Handle any required periodic queue maintenance - $count = $group->executeReadyPeriodicTasks(); - if ( $count > 0 ) { - $this->runJobsLog( "Executed $count periodic queue task(s)." ); - } - - $backoffs = $this->loadBackoffs(); // map of (type => UNIX expiry) - $startingBackoffs = $backoffs; // avoid unnecessary writes - $backoffExpireFunc = function ( $t ) { - return $t > time(); - }; - - $jobsRun = 0; // counter - $flags = JobQueueGroup::USE_CACHE; - $lastTime = microtime( true ); // time since last slave check - do { - $backoffs = array_filter( $backoffs, $backoffExpireFunc ); - $blacklist = $noThrottle ? 
array() : array_keys( $backoffs ); - if ( $type === false ) { - $job = $group->pop( JobQueueGroup::TYPE_DEFAULT, $flags, $blacklist ); - } elseif ( in_array( $type, $blacklist ) ) { - $job = false; // requested queue in backoff state - } else { - $job = $group->pop( $type ); // job from a single queue - } - if ( $job ) { // found a job - ++$jobsRun; - $this->runJobsLog( $job->toString() . " STARTING" ); - - // Run the job... - wfProfileIn( __METHOD__ . '-' . get_class( $job ) ); - $t = microtime( true ); - try { - $status = $job->run(); - $error = $job->getLastError(); - } catch ( MWException $e ) { - MWExceptionHandler::rollbackMasterChangesAndLog( $e ); - $status = false; - $error = get_class( $e ) . ': ' . $e->getMessage(); - $e->report(); // write error to STDERR and the log - } - $timeMs = intval( ( microtime( true ) - $t ) * 1000 ); - wfProfileOut( __METHOD__ . '-' . get_class( $job ) ); - - // Mark the job as done on success or when the job cannot be retried - if ( $status !== false || !$job->allowRetries() ) { - $group->ack( $job ); // done - } - - if ( $status === false ) { - $this->runJobsLog( $job->toString() . " t=$timeMs error={$error}" ); - } else { - $this->runJobsLog( $job->toString() . " t=$timeMs good" ); - } - - // Back off of certain jobs for a while (for throttling and for errors) - $ttw = $this->getBackoffTimeToWait( $job ); - if ( $status === false && mt_rand( 0, 49 ) == 0 ) { - $ttw = max( $ttw, 30 ); - } - if ( $ttw > 0 ) { - $jType = $job->getType(); - $backoffs[$jType] = isset( $backoffs[$jType] ) ? $backoffs[$jType] : 0; - $backoffs[$jType] = max( $backoffs[$jType], time() + $ttw ); - } - - // Break out if we hit the job count or wall time limits... 
- if ( $maxJobs && $jobsRun >= $maxJobs ) { - break; - } elseif ( $maxTime && ( time() - $startTime ) > $maxTime ) { - break; - } - - // Don't let any of the main DB slaves get backed up - $timePassed = microtime( true ) - $lastTime; - if ( $timePassed >= 5 || $timePassed < 0 ) { - wfWaitForSlaves( $lastTime ); - $lastTime = microtime( true ); - } - // Don't let any queue slaves/backups fall behind - if ( $jobsRun > 0 && ( $jobsRun % 100 ) == 0 ) { - $group->waitForBackups(); - } - - // Bail if near-OOM instead of in a job - $this->assertMemoryOK(); - } - } while ( $job ); // stop when there are no jobs - // Sync the persistent backoffs for the next runJobs.php pass - $backoffs = array_filter( $backoffs, $backoffExpireFunc ); - if ( $backoffs !== $startingBackoffs ) { - $this->syncBackoffs( $backoffs ); - } - } - - /** - * @param Job $job - * @return int Seconds for this runner to avoid doing more jobs of this type - * @see $wgJobBackoffThrottling - */ - private function getBackoffTimeToWait( Job $job ) { - global $wgJobBackoffThrottling; - - if ( !isset( $wgJobBackoffThrottling[$job->getType()] ) || - $job instanceof DuplicateJob // no work was done - ) { - return 0; // not throttled - } - - $itemsPerSecond = $wgJobBackoffThrottling[$job->getType()]; - if ( $itemsPerSecond <= 0 ) { - return 0; // not throttled - } - - $seconds = 0; - if ( $job->workItemCount() > 0 ) { - $exactSeconds = $job->workItemCount() / $itemsPerSecond; - // use randomized rounding - $seconds = floor( $exactSeconds ); - $remainder = $exactSeconds - $seconds; - $seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 1 : 0; - } - - return (int)$seconds; - } - - /** - * Get the previous backoff expiries from persistent storage - * - * @return array Map of (job type => backoff expiry timestamp) - */ - private function loadBackoffs() { - $section = new ProfileSection( __METHOD__ ); - - $backoffs = array(); - $file = wfTempDir() . 
'/mw-runJobs-backoffs.json'; - if ( is_file( $file ) ) { - $handle = fopen( $file, 'rb' ); - flock( $handle, LOCK_SH ); - $content = stream_get_contents( $handle ); - flock( $handle, LOCK_UN ); - fclose( $handle ); - $backoffs = json_decode( $content, true ) ? : array(); - } - - return $backoffs; - } - - /** - * Merge the current backoff expiries from persistent storage - * - * @param array $backoffs Map of (job type => backoff expiry timestamp) - */ - private function syncBackoffs( array $backoffs ) { - $section = new ProfileSection( __METHOD__ ); - - $file = wfTempDir() . '/mw-runJobs-backoffs.json'; - $handle = fopen( $file, 'wb+' ); - flock( $handle, LOCK_EX ); - $content = stream_get_contents( $handle ); - $cBackoffs = json_decode( $content, true ) ? : array(); - foreach ( $backoffs as $type => $timestamp ) { - $cBackoffs[$type] = isset( $cBackoffs[$type] ) ? $cBackoffs[$type] : 0; - $cBackoffs[$type] = max( $cBackoffs[$type], $backoffs[$type] ); - } - ftruncate( $handle, 0 ); - fwrite( $handle, json_encode( $backoffs ) ); - flock( $handle, LOCK_UN ); - fclose( $handle ); - } - - /** - * Make sure that this script is not too close to the memory usage limit. - * It is better to die in between jobs than OOM right in the middle of one. - * @throws MWException - */ - private function assertMemoryOK() { - static $maxBytes = null; - if ( $maxBytes === null ) { - $m = array(); - if ( preg_match( '!^(\d+)(k|m|g|)$!i', ini_get( 'memory_limit' ), $m ) ) { - list( , $num, $unit ) = $m; - $conv = array( 'g' => 1073741824, 'm' => 1048576, 'k' => 1024, '' => 1 ); - $maxBytes = $num * $conv[strtolower( $unit )]; - } else { - $maxBytes = 0; - } - } - $usedBytes = memory_get_usage(); - if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) { - throw new MWException( "Detected excessive memory usage ($usedBytes/$maxBytes)." 
); - } + $runner = new JobRunner(); + $runner->setDebugHandler( array( $this, 'debugInternal' ) ); + $runner->run( array( + 'type' => $this->getOption( 'type', false ), + 'maxJobs' => $this->getOption( 'maxjobs', false ), + 'maxTime' => $this->getOption( 'maxtime', false ), + 'throttle' => $this->hasOption( 'nothrottle' ) ? false : true, + ) ); } /** - * Log the job message - * @param string $msg The message to log + * @param string $s */ - private function runJobsLog( $msg ) { - $this->output( wfTimestamp( TS_DB ) . " $msg\n" ); - wfDebugLog( 'runJobs', $msg ); + public function debugInternal( $s ) { + $this->output( $s ); } } -- 2.20.1