X-Git-Url: https://git.cyclocoop.org/?a=blobdiff_plain;f=maintenance%2FrunJobs.php;h=40605cebaed736cc0df12a56452eb4f782033d30;hb=b1e4006b440185c7e8304c03946bb0155b0edc83;hp=e56640f93ea1f2b9a3865318b1ad5188da0fcce3;hpb=806dcb725770a585f4b09f0a24ba2178a74561b5;p=lhc%2Fweb%2Fwiklou.git diff --git a/maintenance/runJobs.php b/maintenance/runJobs.php index e56640f93e..40605cebae 100644 --- a/maintenance/runJobs.php +++ b/maintenance/runJobs.php @@ -37,6 +37,7 @@ class RunJobs extends Maintenance { $this->addOption( 'type', 'Type of job to run', false, true ); $this->addOption( 'procs', 'Number of processes to use', false, true ); $this->addOption( 'nothrottle', 'Ignore job throttling configuration', false, false ); + $this->addOption( 'result', 'Set to JSON to print only a JSON response', false, true ); } public function memoryLimit() { @@ -65,214 +66,28 @@ class RunJobs extends Maintenance { } } - $type = $this->getOption( 'type', false ); - $maxJobs = $this->getOption( 'maxjobs', false ); - $maxTime = $this->getOption( 'maxtime', false ); - $noThrottle = $this->hasOption( 'nothrottle' ); - $startTime = time(); + $json = ( $this->getOption( 'result' ) === 'json' ); - $group = JobQueueGroup::singleton(); - // Handle any required periodic queue maintenance - $count = $group->executeReadyPeriodicTasks(); - if ( $count > 0 ) { - $this->runJobsLog( "Executed $count periodic queue task(s)." ); + $runner = new JobRunner(); + if ( !$json ) { + $runner->setDebugHandler( array( $this, 'debugInternal' ) ); } - - $backoffs = $this->loadBackoffs(); // map of (type => UNIX expiry) - $startingBackoffs = $backoffs; // avoid unnecessary writes - $backoffExpireFunc = function ( $t ) { - return $t > time(); - }; - - $jobsRun = 0; // counter - $flags = JobQueueGroup::USE_CACHE; - $lastTime = microtime( true ); // time since last slave check - do { - $backoffs = array_filter( $backoffs, $backoffExpireFunc ); - $blacklist = $noThrottle ? array() : array_keys( $backoffs ); - if ( $type === false ) { - $job = $group->pop( JobQueueGroup::TYPE_DEFAULT, $flags, $blacklist ); - } elseif ( in_array( $type, $blacklist ) ) { - $job = false; // requested queue in backoff state - } else { - $job = $group->pop( $type ); // job from a single queue - } - if ( $job ) { // found a job - ++$jobsRun; - $this->runJobsLog( $job->toString() . " STARTING" ); - - // Run the job... - wfProfileIn( __METHOD__ . '-' . get_class( $job ) ); - $t = microtime( true ); - try { - $status = $job->run(); - $error = $job->getLastError(); - } catch ( MWException $e ) { - MWExceptionHandler::rollbackMasterChangesAndLog( $e ); - $status = false; - $error = get_class( $e ) . ': ' . $e->getMessage(); - $e->report(); // write error to STDERR and the log - } - $timeMs = intval( ( microtime( true ) - $t ) * 1000 ); - wfProfileOut( __METHOD__ . '-' . get_class( $job ) ); - - // Mark the job as done on success or when the job cannot be retried - if ( $status !== false || !$job->allowRetries() ) { - $group->ack( $job ); // done - } - - if ( $status === false ) { - $this->runJobsLog( $job->toString() . " t=$timeMs error={$error}" ); - } else { - $this->runJobsLog( $job->toString() . " t=$timeMs good" ); - } - - // Back off of certain jobs for a while (for throttling and for errors) - $ttw = $this->getBackoffTimeToWait( $job ); - if ( $status === false && mt_rand( 0, 49 ) == 0 ) { - $ttw = max( $ttw, 30 ); - } - if ( $ttw > 0 ) { - $jType = $job->getType(); - $backoffs[$jType] = isset( $backoffs[$jType] ) ? $backoffs[$jType] : 0; - $backoffs[$jType] = max( $backoffs[$jType], time() + $ttw ); - } - - // Break out if we hit the job count or wall time limits... - if ( $maxJobs && $jobsRun >= $maxJobs ) { - break; - } elseif ( $maxTime && ( time() - $startTime ) > $maxTime ) { - break; - } - - // Don't let any of the main DB slaves get backed up - $timePassed = microtime( true ) - $lastTime; - if ( $timePassed >= 5 || $timePassed < 0 ) { - wfWaitForSlaves( $lastTime ); - $lastTime = microtime( true ); - } - // Don't let any queue slaves/backups fall behind - if ( $jobsRun > 0 && ( $jobsRun % 100 ) == 0 ) { - $group->waitForBackups(); - } - - // Bail if near-OOM instead of in a job - $this->assertMemoryOK(); - } - } while ( $job ); // stop when there are no jobs - // Sync the persistent backoffs for the next runJobs.php pass - $backoffs = array_filter( $backoffs, $backoffExpireFunc ); - if ( $backoffs !== $startingBackoffs ) { - $this->syncBackoffs( $backoffs ); - } - } - - /** - * @param Job $job - * @return int Seconds for this runner to avoid doing more jobs of this type - * @see $wgJobBackoffThrottling - */ - private function getBackoffTimeToWait( Job $job ) { - global $wgJobBackoffThrottling; - - if ( !isset( $wgJobBackoffThrottling[$job->getType()] ) || - $job instanceof DuplicateJob // no work was done - ) { - return 0; // not throttled - } - - $itemsPerSecond = $wgJobBackoffThrottling[$job->getType()]; - if ( $itemsPerSecond <= 0 ) { - return 0; // not throttled - } - - $seconds = 0; - if ( $job->workItemCount() > 0 ) { - $exactSeconds = $job->workItemCount() / $itemsPerSecond; - // use randomized rounding - $seconds = floor( $exactSeconds ); - $remainder = $exactSeconds - $seconds; - $seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 1 : 0; - } - - return (int)$seconds; - } - - /** - * Get the previous backoff expiries from persistent storage - * - * @return array Map of (job type => backoff expiry timestamp) - */ - private function loadBackoffs() { - $section = new ProfileSection( __METHOD__ ); - - $backoffs = array(); - $file = wfTempDir() . '/mw-runJobs-backoffs.json'; - if ( is_file( $file ) ) { - $handle = fopen( $file, 'rb' ); - flock( $handle, LOCK_SH ); - $content = stream_get_contents( $handle ); - flock( $handle, LOCK_UN ); - fclose( $handle ); - $backoffs = json_decode( $content, true ) ? : array(); - } - - return $backoffs; - } - - /** - * Merge the current backoff expiries from persistent storage - * - * @param array $backoffs Map of (job type => backoff expiry timestamp) - */ - private function syncBackoffs( array $backoffs ) { - $section = new ProfileSection( __METHOD__ ); - - $file = wfTempDir() . '/mw-runJobs-backoffs.json'; - $handle = fopen( $file, 'wb+' ); - flock( $handle, LOCK_EX ); - $content = stream_get_contents( $handle ); - $cBackoffs = json_decode( $content, true ) ? : array(); - foreach ( $backoffs as $type => $timestamp ) { - $cBackoffs[$type] = isset( $cBackoffs[$type] ) ? $cBackoffs[$type] : 0; - $cBackoffs[$type] = max( $cBackoffs[$type], $backoffs[$type] ); - } - ftruncate( $handle, 0 ); - fwrite( $handle, json_encode( $backoffs ) ); - flock( $handle, LOCK_UN ); - fclose( $handle ); - } - - /** - * Make sure that this script is not too close to the memory usage limit. - * It is better to die in between jobs than OOM right in the middle of one. - * @throws MWException - */ - private function assertMemoryOK() { - static $maxBytes = null; - if ( $maxBytes === null ) { - $m = array(); - if ( preg_match( '!^(\d+)(k|m|g|)$!i', ini_get( 'memory_limit' ), $m ) ) { - list( , $num, $unit ) = $m; - $conv = array( 'g' => 1073741824, 'm' => 1048576, 'k' => 1024, '' => 1 ); - $maxBytes = $num * $conv[strtolower( $unit )]; - } else { - $maxBytes = 0; - } - } - $usedBytes = memory_get_usage(); - if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) { - throw new MWException( "Detected excessive memory usage ($usedBytes/$maxBytes)." ); + $response = $runner->run( array( + 'type' => $this->getOption( 'type', false ), + 'maxJobs' => $this->getOption( 'maxjobs', false ), + 'maxTime' => $this->getOption( 'maxtime', false ), + 'throttle' => $this->hasOption( 'nothrottle' ) ? false : true, + ) ); + if ( $json ) { + $this->output( FormatJson::encode( $response, true ) ); } } /** - * Log the job message - * @param string $msg The message to log + * @param string $s */ - private function runJobsLog( $msg ) { - $this->output( wfTimestamp( TS_DB ) . " $msg\n" ); - wfDebugLog( 'runJobs', $msg ); + public function debugInternal( $s ) { + $this->output( $s ); } }