* A background internal HTTP request used to load the API.
This handles job execution and related periodic tasks.
It avoids blocking the main request or breaking it on fatals.
* This method avoids problems that shelling could have:
** Mismatched apache/cli PHP config or versions.
** Had to start a new process and could not use opcode cache.
** safe_mode and open_basedir restrictions required a fallback.
** Some wiki farms may not respect the --wiki parameter.
** wgMaxShellMemory applies to spawned PHP processes.
** Spawning processes is more prone to DOS due to a lack
of proper limiting/pooling.
bug: 60208
bug: 60210
bug: 60698
bug: 60844
Change-Id: I78b0e709301ac4a0c7b7ed337d6969d7546674bf
production.
=== Configuration changes in 1.23 ===
+* When $wgJobRunRate is higher that zero, jobs are now executed via an
+ asynchronous HTTP request to a MediaWiki entry point. This may require
+ increasing the number of server worker threads.
* $wgDebugLogGroups values may be set to an associative array with a
'destination' key specifying the log destination. The array may also contain
a 'sample' key with a positive integer value N indicating that the log group
'ApiRevisionDelete' => 'includes/api/ApiRevisionDelete.php',
'ApiRollback' => 'includes/api/ApiRollback.php',
'ApiRsd' => 'includes/api/ApiRsd.php',
+ 'ApiRunJobs' => 'includes/api/ApiRunJobs.php',
'ApiSetNotificationTimestamp' => 'includes/api/ApiSetNotificationTimestamp.php',
'ApiTokens' => 'includes/api/ApiTokens.php',
'ApiUnblock' => 'includes/api/ApiUnblock.php',
if ( function_exists( 'fastcgi_finish_request' ) ) {
fastcgi_finish_request();
}
+ $this->triggerJobs();
$this->restInPeace();
} catch ( Exception $e ) {
MWExceptionHandler::handle( $e );
wfProfileOut( 'main-try-filecache' );
}
+ // Actually do the work of the request and build up any output
$this->performRequest();
// Now commit any transactions, so that unreported errors after
// Do any deferred jobs
DeferredUpdates::doUpdates( 'commit' );
- // Execute a job from the queue
- $this->doJobs();
-
// Log profiling data, e.g. in the database or UDP
wfLogProfilingData();
}
/**
- * Do a job from the job queue
+ * Potentially open a socket and sent an HTTP request back to the server
+ * to run a specified number of jobs. This registers a callback to cleanup
+ * the socket once it's done.
*/
- private function doJobs() {
- global $wgJobRunRate, $wgPhpCli, $IP;
+ protected function triggerJobs() {
+ global $wgJobRunRate, $wgServer, $wgScriptPath, $wgScriptExtension;
if ( $wgJobRunRate <= 0 || wfReadOnly() ) {
return;
}
+ $section = new ProfileSection( __METHOD__ );
+
if ( $wgJobRunRate < 1 ) {
$max = mt_getrandmax();
if ( mt_rand( 0, $max ) > $max * $wgJobRunRate ) {
$n = intval( $wgJobRunRate );
}
- if ( !wfShellExecDisabled() && is_executable( $wgPhpCli ) ) {
- // Start a background process to run some of the jobs
- wfProfileIn( __METHOD__ . '-exec' );
- $retVal = 1;
- $cmd = wfShellWikiCmd( "$IP/maintenance/runJobs.php",
- array( '--wiki', wfWikiID(), '--maxjobs', $n ) );
- $cmd .= " >" . wfGetNull() . " 2>&1"; // don't hang PHP on pipes
- if ( wfIsWindows() ) {
- // Using START makes this async and also works around a bug where using
- // wfShellExec() with a quoted script name causes a filename syntax error.
- $cmd = "START /B \"bg\" $cmd";
- } else {
- $cmd = "$cmd &";
- }
- wfShellExec( $cmd, $retVal );
- wfProfileOut( __METHOD__ . '-exec' );
+ $query = array( 'action' => 'runjobs',
+ 'tasks' => 'jobs', 'maxjobs' => $n, 'sigexpiry' => time() + 5 );
+ $query['signature'] = ApiRunJobs::getQuerySignature( $query );
+
+ $errno = $errstr = null;
+ $info = wfParseUrl( $wgServer );
+ $sock = fsockopen(
+ $info['host'],
+ isset( $info['port'] ) ? $info['port'] : 80,
+ $errno,
+ $errstr
+ );
+ if ( !$sock ) {
+ wfDebugLog( 'runJobs', "Failed to start cron API (socket error $errno): $errstr\n" );
+ return;
+ }
+
+ $url = wfAppendQuery( "{$wgScriptPath}/api{$wgScriptExtension}", $query );
+ $req = "POST $url HTTP/1.1\r\nHost: {$info['host']}\r\nConnection: Close\r\n\r\n";
+
+ wfDebugLog( 'runJobs', "Running $n job(s) via '$url'\n" );
+ // Send a cron API request to be performed in the background.
+ // Give up if this takes to long to send (which should be rare).
+ stream_set_timeout( $sock, 1 );
+ $bytes = fwrite( $sock, $req );
+ if ( $bytes !== strlen( $req ) ) {
+ wfDebugLog( 'runJobs', "Failed to start cron API (socket write error)\n" );
} else {
- try {
- // Fallback to running the jobs here while the user waits
- $group = JobQueueGroup::singleton();
- do {
- $job = $group->pop( JobQueueGroup::USE_CACHE ); // job from any queue
- if ( $job ) {
- $output = $job->toString() . "\n";
- $t = - microtime( true );
- wfProfileIn( __METHOD__ . '-' . get_class( $job ) );
- $success = $job->run();
- wfProfileOut( __METHOD__ . '-' . get_class( $job ) );
- $group->ack( $job ); // done
- $t += microtime( true );
- $t = round( $t * 1000 );
- if ( $success === false ) {
- $output .= "Error: " . $job->getLastError() . ", Time: $t ms\n";
- } else {
- $output .= "Success, Time: $t ms\n";
- }
- wfDebugLog( 'jobqueue', $output );
- }
- } while ( --$n && $job );
- } catch ( MWException $e ) {
- // We don't want exceptions thrown during job execution to
- // be reported to the user since the output is already sent.
- // Instead we just log them.
- MWExceptionHandler::logException( $e );
+ // Do not wait for the response (the script should handle client aborts).
+ // Make sure that we don't close before that script reaches ignore_user_abort().
+ $status = fgets( $sock );
+ if ( !preg_match( '#^HTTP/\d\.\d 204 #', $status ) ) {
+ wfDebugLog( 'runJobs', "Failed to start cron API: received '$status'\n" );
}
}
+ fclose( $sock );
}
}
'purge' => 'ApiPurge',
'setnotificationtimestamp' => 'ApiSetNotificationTimestamp',
'rollback' => 'ApiRollback',
+ 'runjobs' => 'ApiRunJobs',
'delete' => 'ApiDelete',
'undelete' => 'ApiUndelete',
'protect' => 'ApiProtect',
--- /dev/null
+<?php
+/**
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * This is a simple class to handle action=runjobs and is only used internally
+ *
+ * @note: this does not requre "write mode" nor tokens due to the signature check
+ *
+ * @ingroup API
+ */
+class ApiRunJobs extends ApiBase {
+ public function execute() {
+ if ( wfReadOnly() ) {
+ $this->dieUsage( 'Wiki is in read-only mode', 'read_only', 400 );
+ }
+
+ $params = $this->extractRequestParams();
+ $squery = $this->getRequest()->getValues();
+ unset( $squery['signature'] );
+ $cSig = self::getQuerySignature( $squery );
+ $rSig = $params['signature'];
+
+ // Time-insensitive signature verification
+ if ( strlen( $rSig ) !== strlen( $cSig ) ) {
+ $verified = false;
+ } else {
+ $result = 0;
+ for ( $i = 0; $i < strlen( $cSig ); $i++ ) {
+ $result |= ord( $cSig{$i} ) ^ ord( $rSig{$i} );
+ }
+ $verified = ( $result == 0 );
+ }
+
+ if ( !$verified || $params['sigexpiry'] < time() ) {
+ $this->dieUsage( 'Invalid or stale signature provided', 'bad_signature', 401 );
+ }
+
+ // Client will usually disconnect before checking the response,
+ // but it needs to know when it is safe to disconnect. Until this
+ // reaches ignore_user_abort(), it is not safe as the jobs won't run.
+ ignore_user_abort( true ); // jobs may take a bit of time
+ header( "HTTP/1.0 204 No Content" );
+ ob_flush();
+ flush();
+ // Once the client receives this response, it can disconnect
+
+ // Do all of the specified tasks...
+ if ( in_array( 'jobs', $params['tasks'] ) ) {
+ $this->executeJobs( $params );
+ }
+ }
+
+ /**
+ * @param array $query
+ * @return string
+ */
+ public static function getQuerySignature( array $query ) {
+ global $wgSecretKey;
+
+ ksort( $query ); // stable order
+ return hash_hmac( 'sha1', wfArrayToCgi( $query ), $wgSecretKey );
+ }
+
+ /**
+ * Run jobs from the job queue
+ *
+ * @param array $params Request parameters
+ * @return void
+ */
+ protected function executeJobs( array $params ) {
+ $n = $params['maxjobs']; // number of jobs to run
+ if ( $n < 1 ) {
+ return;
+ }
+ try {
+ // Fallback to running the jobs here while the user waits
+ $group = JobQueueGroup::singleton();
+ do {
+ $job = $group->pop( JobQueueGroup::USE_CACHE ); // job from any queue
+ if ( $job ) {
+ $output = $job->toString() . "\n";
+ $t = - microtime( true );
+ wfProfileIn( __METHOD__ . '-' . get_class( $job ) );
+ $success = $job->run();
+ wfProfileOut( __METHOD__ . '-' . get_class( $job ) );
+ $group->ack( $job ); // done
+ $t += microtime( true );
+ $t = round( $t * 1000 );
+ if ( $success === false ) {
+ $output .= "Error: " . $job->getLastError() . ", Time: $t ms\n";
+ } else {
+ $output .= "Success, Time: $t ms\n";
+ }
+ wfDebugLog( 'jobqueue', $output );
+ }
+ } while ( --$n && $job );
+ } catch ( MWException $e ) {
+ // We don't want exceptions thrown during job execution to
+ // be reported to the user since the output is already sent.
+ // Instead we just log them.
+ MWExceptionHandler::logException( $e );
+ }
+ }
+
+ public function mustBePosted() {
+ return true;
+ }
+
+ public function getAllowedParams() {
+ return array(
+ 'tasks' => array(
+ ApiBase::PARAM_ISMULTI => true,
+ ApiBase::PARAM_TYPE => array( 'jobs' )
+ ),
+ 'maxjobs' => array(
+ ApiBase::PARAM_TYPE => 'integer',
+ ApiBase::PARAM_DFLT => 0
+ ),
+ 'signature' => array(
+ ApiBase::PROP_TYPE => 'string',
+ ),
+ 'sigexpiry' => array(
+ ApiBase::PARAM_TYPE => 'integer',
+ ApiBase::PARAM_DFLT => 0 // ~epoch
+ ),
+ );
+ }
+
+ public function getParamDescription() {
+ return array(
+ 'tasks' => 'List of task types to perform',
+ 'maxjobs' => 'Maximum number of jobs to run',
+ 'signature' => 'HMAC Signature that signs the request',
+ 'sigexpiry' => 'HMAC signature expiry as a UNIX timestamp'
+ );
+ }
+
+ public function getDescription() {
+ return 'Perform periodic tasks or run jobs from the queue';
+ }
+
+ public function getExamples() {
+ return array(
+ 'api.php?action=runjobs&tasks=jobs&maxjobs=3' => 'Run up to 3 jobs from the queue',
+ );
+ }
+}