From 2ea4d7ae8c19f718e872888db151ccc5e28c9ca4 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Wed, 12 Feb 2014 14:43:14 -0800 Subject: [PATCH] Moved job running via $wgJobRunRate to a special API * A background internal HTTP request used to load the API. This handles job execution and related periodic tasks. It avoids blocking the main request or breaking it on fatals. * This method avoids problems that shelling could have: ** Mismatched apache/cli PHP config or versions. ** Had to start a new process and could not use opcode cache. ** safe_mode and open_basedir restrictions required a fallback. ** Some wiki farms may not respect the --wiki parameter. ** wgMaxShellMemory applies to spawned PHP processes. ** Spawning processes is more prone to DOS due to a lack of proper limiting/pooling. bug: 60208 bug: 60210 bug: 60698 bug: 60844 Change-Id: I78b0e709301ac4a0c7b7ed337d6969d7546674bf --- RELEASE-NOTES-1.23 | 3 + includes/AutoLoader.php | 1 + includes/Wiki.php | 91 +++++++++----------- includes/api/ApiMain.php | 1 + includes/api/ApiRunJobs.php | 166 ++++++++++++++++++++++++++++++++++++ 5 files changed, 213 insertions(+), 49 deletions(-) create mode 100644 includes/api/ApiRunJobs.php diff --git a/RELEASE-NOTES-1.23 b/RELEASE-NOTES-1.23 index 4f29097e19..5310ee43ab 100644 --- a/RELEASE-NOTES-1.23 +++ b/RELEASE-NOTES-1.23 @@ -9,6 +9,9 @@ MediaWiki 1.23 is an alpha-quality branch and is not recommended for use in production. === Configuration changes in 1.23 === +* When $wgJobRunRate is higher than zero, jobs are now executed via an + asynchronous HTTP request to a MediaWiki entry point. This may require + increasing the number of server worker threads. * $wgDebugLogGroups values may be set to an associative array with a 'destination' key specifying the log destination. 
The array may also contain a 'sample' key with a positive integer value N indicating that the log group diff --git a/includes/AutoLoader.php b/includes/AutoLoader.php index 359fd8b230..c511b4b2b1 100644 --- a/includes/AutoLoader.php +++ b/includes/AutoLoader.php @@ -358,6 +358,7 @@ $wgAutoloadLocalClasses = array( 'ApiRevisionDelete' => 'includes/api/ApiRevisionDelete.php', 'ApiRollback' => 'includes/api/ApiRollback.php', 'ApiRsd' => 'includes/api/ApiRsd.php', + 'ApiRunJobs' => 'includes/api/ApiRunJobs.php', 'ApiSetNotificationTimestamp' => 'includes/api/ApiSetNotificationTimestamp.php', 'ApiTokens' => 'includes/api/ApiTokens.php', 'ApiUnblock' => 'includes/api/ApiUnblock.php', diff --git a/includes/Wiki.php b/includes/Wiki.php index 99857d9638..fb46d80d43 100644 --- a/includes/Wiki.php +++ b/includes/Wiki.php @@ -448,6 +448,7 @@ class MediaWiki { if ( function_exists( 'fastcgi_finish_request' ) ) { fastcgi_finish_request(); } + $this->triggerJobs(); $this->restInPeace(); } catch ( Exception $e ) { MWExceptionHandler::handle( $e ); @@ -583,6 +584,7 @@ class MediaWiki { wfProfileOut( 'main-try-filecache' ); } + // Actually do the work of the request and build up any output $this->performRequest(); // Now commit any transactions, so that unreported errors after @@ -602,9 +604,6 @@ class MediaWiki { // Do any deferred jobs DeferredUpdates::doUpdates( 'commit' ); - // Execute a job from the queue - $this->doJobs(); - // Log profiling data, e.g. in the database or UDP wfLogProfilingData(); @@ -617,15 +616,19 @@ class MediaWiki { } /** - * Do a job from the job queue + * Potentially open a socket and send an HTTP request back to the server + * to run a specified number of jobs. This registers a callback to clean up + * the socket once it's done. 
*/ - private function doJobs() { - global $wgJobRunRate, $wgPhpCli, $IP; + protected function triggerJobs() { + global $wgJobRunRate, $wgServer, $wgScriptPath, $wgScriptExtension; if ( $wgJobRunRate <= 0 || wfReadOnly() ) { return; } + $section = new ProfileSection( __METHOD__ ); + if ( $wgJobRunRate < 1 ) { $max = mt_getrandmax(); if ( mt_rand( 0, $max ) > $max * $wgJobRunRate ) { @@ -636,51 +639,41 @@ class MediaWiki { $n = intval( $wgJobRunRate ); } - if ( !wfShellExecDisabled() && is_executable( $wgPhpCli ) ) { - // Start a background process to run some of the jobs - wfProfileIn( __METHOD__ . '-exec' ); - $retVal = 1; - $cmd = wfShellWikiCmd( "$IP/maintenance/runJobs.php", - array( '--wiki', wfWikiID(), '--maxjobs', $n ) ); - $cmd .= " >" . wfGetNull() . " 2>&1"; // don't hang PHP on pipes - if ( wfIsWindows() ) { - // Using START makes this async and also works around a bug where using - // wfShellExec() with a quoted script name causes a filename syntax error. - $cmd = "START /B \"bg\" $cmd"; - } else { - $cmd = "$cmd &"; - } - wfShellExec( $cmd, $retVal ); - wfProfileOut( __METHOD__ . '-exec' ); + $query = array( 'action' => 'runjobs', + 'tasks' => 'jobs', 'maxjobs' => $n, 'sigexpiry' => time() + 5 ); + $query['signature'] = ApiRunJobs::getQuerySignature( $query ); + + $errno = $errstr = null; + $info = wfParseUrl( $wgServer ); + $sock = fsockopen( + $info['host'], + isset( $info['port'] ) ? $info['port'] : 80, + $errno, + $errstr + ); + if ( !$sock ) { + wfDebugLog( 'runJobs', "Failed to start cron API (socket error $errno): $errstr\n" ); + return; + } + + $url = wfAppendQuery( "{$wgScriptPath}/api{$wgScriptExtension}", $query ); + $req = "POST $url HTTP/1.1\r\nHost: {$info['host']}\r\nConnection: Close\r\n\r\n"; + + wfDebugLog( 'runJobs', "Running $n job(s) via '$url'\n" ); + // Send a cron API request to be performed in the background. + // Give up if this takes too long to send (which should be rare). 
+ stream_set_timeout( $sock, 1 ); + $bytes = fwrite( $sock, $req ); + if ( $bytes !== strlen( $req ) ) { + wfDebugLog( 'runJobs', "Failed to start cron API (socket write error)\n" ); } else { - try { - // Fallback to running the jobs here while the user waits - $group = JobQueueGroup::singleton(); - do { - $job = $group->pop( JobQueueGroup::USE_CACHE ); // job from any queue - if ( $job ) { - $output = $job->toString() . "\n"; - $t = - microtime( true ); - wfProfileIn( __METHOD__ . '-' . get_class( $job ) ); - $success = $job->run(); - wfProfileOut( __METHOD__ . '-' . get_class( $job ) ); - $group->ack( $job ); // done - $t += microtime( true ); - $t = round( $t * 1000 ); - if ( $success === false ) { - $output .= "Error: " . $job->getLastError() . ", Time: $t ms\n"; - } else { - $output .= "Success, Time: $t ms\n"; - } - wfDebugLog( 'jobqueue', $output ); - } - } while ( --$n && $job ); - } catch ( MWException $e ) { - // We don't want exceptions thrown during job execution to - // be reported to the user since the output is already sent. - // Instead we just log them. - MWExceptionHandler::logException( $e ); + // Do not wait for the response (the script should handle client aborts). + // Make sure that we don't close before that script reaches ignore_user_abort(). 
+ $status = fgets( $sock ); + if ( !preg_match( '#^HTTP/\d\.\d 204 #', $status ) ) { + wfDebugLog( 'runJobs', "Failed to start cron API: received '$status'\n" ); } } + fclose( $sock ); } } diff --git a/includes/api/ApiMain.php b/includes/api/ApiMain.php index 2684f51356..0939deaefa 100644 --- a/includes/api/ApiMain.php +++ b/includes/api/ApiMain.php @@ -67,6 +67,7 @@ class ApiMain extends ApiBase { 'purge' => 'ApiPurge', 'setnotificationtimestamp' => 'ApiSetNotificationTimestamp', 'rollback' => 'ApiRollback', + 'runjobs' => 'ApiRunJobs', 'delete' => 'ApiDelete', 'undelete' => 'ApiUndelete', 'protect' => 'ApiProtect', diff --git a/includes/api/ApiRunJobs.php b/includes/api/ApiRunJobs.php new file mode 100644 index 0000000000..425c0a3d5a --- /dev/null +++ b/includes/api/ApiRunJobs.php @@ -0,0 +1,166 @@ +dieUsage( 'Wiki is in read-only mode', 'read_only', 400 ); + } + + $params = $this->extractRequestParams(); + $squery = $this->getRequest()->getValues(); + unset( $squery['signature'] ); + $cSig = self::getQuerySignature( $squery ); + $rSig = $params['signature']; + + // Time-insensitive signature verification + if ( strlen( $rSig ) !== strlen( $cSig ) ) { + $verified = false; + } else { + $result = 0; + for ( $i = 0; $i < strlen( $cSig ); $i++ ) { + $result |= ord( $cSig{$i} ) ^ ord( $rSig{$i} ); + } + $verified = ( $result == 0 ); + } + + if ( !$verified || $params['sigexpiry'] < time() ) { + $this->dieUsage( 'Invalid or stale signature provided', 'bad_signature', 401 ); + } + + // Client will usually disconnect before checking the response, + // but it needs to know when it is safe to disconnect. Until this + // reaches ignore_user_abort(), it is not safe as the jobs won't run. + ignore_user_abort( true ); // jobs may take a bit of time + header( "HTTP/1.0 204 No Content" ); + ob_flush(); + flush(); + // Once the client receives this response, it can disconnect + + // Do all of the specified tasks... 
+ if ( in_array( 'jobs', $params['tasks'] ) ) { + $this->executeJobs( $params ); + } + } + + /** + * @param array $query + * @return string + */ + public static function getQuerySignature( array $query ) { + global $wgSecretKey; + + ksort( $query ); // stable order + return hash_hmac( 'sha1', wfArrayToCgi( $query ), $wgSecretKey ); + } + + /** + * Run jobs from the job queue + * + * @param array $params Request parameters + * @return void + */ + protected function executeJobs( array $params ) { + $n = $params['maxjobs']; // number of jobs to run + if ( $n < 1 ) { + return; + } + try { + // Fallback to running the jobs here while the user waits + $group = JobQueueGroup::singleton(); + do { + $job = $group->pop( JobQueueGroup::USE_CACHE ); // job from any queue + if ( $job ) { + $output = $job->toString() . "\n"; + $t = - microtime( true ); + wfProfileIn( __METHOD__ . '-' . get_class( $job ) ); + $success = $job->run(); + wfProfileOut( __METHOD__ . '-' . get_class( $job ) ); + $group->ack( $job ); // done + $t += microtime( true ); + $t = round( $t * 1000 ); + if ( $success === false ) { + $output .= "Error: " . $job->getLastError() . ", Time: $t ms\n"; + } else { + $output .= "Success, Time: $t ms\n"; + } + wfDebugLog( 'jobqueue', $output ); + } + } while ( --$n && $job ); + } catch ( MWException $e ) { + // We don't want exceptions thrown during job execution to + // be reported to the user since the output is already sent. + // Instead we just log them. 
+ MWExceptionHandler::logException( $e ); + } + } + + public function mustBePosted() { + return true; + } + + public function getAllowedParams() { + return array( + 'tasks' => array( + ApiBase::PARAM_ISMULTI => true, + ApiBase::PARAM_TYPE => array( 'jobs' ) + ), + 'maxjobs' => array( + ApiBase::PARAM_TYPE => 'integer', + ApiBase::PARAM_DFLT => 0 + ), + 'signature' => array( + ApiBase::PROP_TYPE => 'string', + ), + 'sigexpiry' => array( + ApiBase::PARAM_TYPE => 'integer', + ApiBase::PARAM_DFLT => 0 // ~epoch + ), + ); + } + + public function getParamDescription() { + return array( + 'tasks' => 'List of task types to perform', + 'maxjobs' => 'Maximum number of jobs to run', + 'signature' => 'HMAC Signature that signs the request', + 'sigexpiry' => 'HMAC signature expiry as a UNIX timestamp' + ); + } + + public function getDescription() { + return 'Perform periodic tasks or run jobs from the queue'; + } + + public function getExamples() { + return array( + 'api.php?action=runjobs&tasks=jobs&maxjobs=3' => 'Run up to 3 jobs from the queue', + ); + } +} -- 2.20.1