Moved job running via $wgJobRunRate to a special API
authorAaron Schulz <aschulz@wikimedia.org>
Wed, 12 Feb 2014 22:43:14 +0000 (14:43 -0800)
committerOri.livneh <ori@wikimedia.org>
Thu, 27 Feb 2014 19:12:04 +0000 (19:12 +0000)
* A background internal HTTP request used to load the API.
  This handles job execution and related periodic tasks.
  It avoids blocking the main request or breaking it on fatals.
* This method avoids problems that shelling could have:
** Mismatched apache/cli PHP config or versions.
** Had to start a new process and could not use opcode cache.
** safe_mode and open_basedir restrictions required a fallback.
** Some wiki farms may not respect the --wiki parameter.
** wgMaxShellMemory applies to spawned PHP processes.
** Spawning processes is more prone to DOS due to a lack
   of proper limiting/pooling.

bug: 60208
bug: 60210
bug: 60698
bug: 60844
Change-Id: I78b0e709301ac4a0c7b7ed337d6969d7546674bf

RELEASE-NOTES-1.23
includes/AutoLoader.php
includes/Wiki.php
includes/api/ApiMain.php
includes/api/ApiRunJobs.php [new file with mode: 0644]

index 4f29097..5310ee4 100644 (file)
@@ -9,6 +9,9 @@ MediaWiki 1.23 is an alpha-quality branch and is not recommended for use in
 production.
 
 === Configuration changes in 1.23 ===
+* When $wgJobRunRate is higher that zero, jobs are now executed via an
+  asynchronous HTTP request to a MediaWiki entry point. This may require
+  increasing the number of server worker threads.
 * $wgDebugLogGroups values may be set to an associative array with a
   'destination' key specifying the log destination. The array may also contain
   a 'sample' key with a positive integer value N indicating that the log group
index 359fd8b..c511b4b 100644 (file)
@@ -358,6 +358,7 @@ $wgAutoloadLocalClasses = array(
        'ApiRevisionDelete' => 'includes/api/ApiRevisionDelete.php',
        'ApiRollback' => 'includes/api/ApiRollback.php',
        'ApiRsd' => 'includes/api/ApiRsd.php',
+       'ApiRunJobs' => 'includes/api/ApiRunJobs.php',
        'ApiSetNotificationTimestamp' => 'includes/api/ApiSetNotificationTimestamp.php',
        'ApiTokens' => 'includes/api/ApiTokens.php',
        'ApiUnblock' => 'includes/api/ApiUnblock.php',
index 99857d9..fb46d80 100644 (file)
@@ -448,6 +448,7 @@ class MediaWiki {
                        if ( function_exists( 'fastcgi_finish_request' ) ) {
                                fastcgi_finish_request();
                        }
+                       $this->triggerJobs();
                        $this->restInPeace();
                } catch ( Exception $e ) {
                        MWExceptionHandler::handle( $e );
@@ -583,6 +584,7 @@ class MediaWiki {
                        wfProfileOut( 'main-try-filecache' );
                }
 
+               // Actually do the work of the request and build up any output
                $this->performRequest();
 
                // Now commit any transactions, so that unreported errors after
@@ -602,9 +604,6 @@ class MediaWiki {
                // Do any deferred jobs
                DeferredUpdates::doUpdates( 'commit' );
 
-               // Execute a job from the queue
-               $this->doJobs();
-
                // Log profiling data, e.g. in the database or UDP
                wfLogProfilingData();
 
@@ -617,15 +616,19 @@ class MediaWiki {
        }
 
        /**
-        * Do a job from the job queue
+        * Potentially open a socket and sent an HTTP request back to the server
+        * to run a specified number of jobs. This registers a callback to cleanup
+        * the socket once it's done.
         */
-       private function doJobs() {
-               global $wgJobRunRate, $wgPhpCli, $IP;
+       protected function triggerJobs() {
+               global $wgJobRunRate, $wgServer, $wgScriptPath, $wgScriptExtension;
 
                if ( $wgJobRunRate <= 0 || wfReadOnly() ) {
                        return;
                }
 
+               $section = new ProfileSection( __METHOD__ );
+
                if ( $wgJobRunRate < 1 ) {
                        $max = mt_getrandmax();
                        if ( mt_rand( 0, $max ) > $max * $wgJobRunRate ) {
@@ -636,51 +639,41 @@ class MediaWiki {
                        $n = intval( $wgJobRunRate );
                }
 
-               if ( !wfShellExecDisabled() && is_executable( $wgPhpCli ) ) {
-                       // Start a background process to run some of the jobs
-                       wfProfileIn( __METHOD__ . '-exec' );
-                       $retVal = 1;
-                       $cmd = wfShellWikiCmd( "$IP/maintenance/runJobs.php",
-                               array( '--wiki', wfWikiID(), '--maxjobs', $n ) );
-                       $cmd .= " >" . wfGetNull() . " 2>&1"; // don't hang PHP on pipes
-                       if ( wfIsWindows() ) {
-                               // Using START makes this async and also works around a bug where using
-                               // wfShellExec() with a quoted script name causes a filename syntax error.
-                               $cmd = "START /B \"bg\" $cmd";
-                       } else {
-                               $cmd = "$cmd &";
-                       }
-                       wfShellExec( $cmd, $retVal );
-                       wfProfileOut( __METHOD__ . '-exec' );
+               $query = array( 'action' => 'runjobs',
+                       'tasks' => 'jobs', 'maxjobs' => $n, 'sigexpiry' => time() + 5 );
+               $query['signature'] = ApiRunJobs::getQuerySignature( $query );
+
+               $errno = $errstr = null;
+               $info = wfParseUrl( $wgServer );
+               $sock = fsockopen(
+                       $info['host'],
+                       isset( $info['port'] ) ? $info['port'] : 80,
+                       $errno,
+                       $errstr
+               );
+               if ( !$sock ) {
+                       wfDebugLog( 'runJobs', "Failed to start cron API (socket error $errno): $errstr\n" );
+                       return;
+               }
+
+               $url = wfAppendQuery( "{$wgScriptPath}/api{$wgScriptExtension}", $query );
+               $req = "POST $url HTTP/1.1\r\nHost: {$info['host']}\r\nConnection: Close\r\n\r\n";
+
+               wfDebugLog( 'runJobs', "Running $n job(s) via '$url'\n" );
+               // Send a cron API request to be performed in the background.
+               // Give up if this takes to long to send (which should be rare).
+               stream_set_timeout( $sock, 1 );
+               $bytes = fwrite( $sock, $req );
+               if ( $bytes !== strlen( $req ) ) {
+                       wfDebugLog( 'runJobs', "Failed to start cron API (socket write error)\n" );
                } else {
-                       try {
-                               // Fallback to running the jobs here while the user waits
-                               $group = JobQueueGroup::singleton();
-                               do {
-                                       $job = $group->pop( JobQueueGroup::USE_CACHE ); // job from any queue
-                                       if ( $job ) {
-                                               $output = $job->toString() . "\n";
-                                               $t = - microtime( true );
-                                               wfProfileIn( __METHOD__ . '-' . get_class( $job ) );
-                                               $success = $job->run();
-                                               wfProfileOut( __METHOD__ . '-' . get_class( $job ) );
-                                               $group->ack( $job ); // done
-                                               $t += microtime( true );
-                                               $t = round( $t * 1000 );
-                                               if ( $success === false ) {
-                                                       $output .= "Error: " . $job->getLastError() . ", Time: $t ms\n";
-                                               } else {
-                                                       $output .= "Success, Time: $t ms\n";
-                                               }
-                                               wfDebugLog( 'jobqueue', $output );
-                                       }
-                               } while ( --$n && $job );
-                       } catch ( MWException $e ) {
-                               // We don't want exceptions thrown during job execution to
-                               // be reported to the user since the output is already sent.
-                               // Instead we just log them.
-                               MWExceptionHandler::logException( $e );
+                       // Do not wait for the response (the script should handle client aborts).
+                       // Make sure that we don't close before that script reaches ignore_user_abort().
+                       $status = fgets( $sock );
+                       if ( !preg_match( '#^HTTP/\d\.\d 204 #', $status ) ) {
+                               wfDebugLog( 'runJobs', "Failed to start cron API: received '$status'\n" );
                        }
                }
+               fclose( $sock );
        }
 }
index 2684f51..0939dea 100644 (file)
@@ -67,6 +67,7 @@ class ApiMain extends ApiBase {
                'purge' => 'ApiPurge',
                'setnotificationtimestamp' => 'ApiSetNotificationTimestamp',
                'rollback' => 'ApiRollback',
+               'runjobs' => 'ApiRunJobs',
                'delete' => 'ApiDelete',
                'undelete' => 'ApiUndelete',
                'protect' => 'ApiProtect',
diff --git a/includes/api/ApiRunJobs.php b/includes/api/ApiRunJobs.php
new file mode 100644 (file)
index 0000000..425c0a3
--- /dev/null
@@ -0,0 +1,166 @@
+<?php
+/**
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * This is a simple class to handle action=runjobs and is only used internally
+ *
+ * @note: this does not requre "write mode" nor tokens due to the signature check
+ *
+ * @ingroup API
+ */
+class ApiRunJobs extends ApiBase {
+       public function execute() {
+               if ( wfReadOnly() ) {
+                       $this->dieUsage( 'Wiki is in read-only mode', 'read_only', 400 );
+               }
+
+               $params = $this->extractRequestParams();
+               $squery = $this->getRequest()->getValues();
+               unset( $squery['signature'] );
+               $cSig = self::getQuerySignature( $squery );
+               $rSig = $params['signature'];
+
+               // Time-insensitive signature verification
+               if ( strlen( $rSig ) !== strlen( $cSig ) ) {
+                       $verified = false;
+               } else {
+                       $result = 0;
+                       for ( $i = 0; $i < strlen( $cSig ); $i++ ) {
+                               $result |= ord( $cSig{$i} ) ^ ord( $rSig{$i} );
+                       }
+                       $verified = ( $result == 0 );
+               }
+
+               if ( !$verified || $params['sigexpiry'] < time() ) {
+                       $this->dieUsage( 'Invalid or stale signature provided', 'bad_signature', 401 );
+               }
+
+               // Client will usually disconnect before checking the response,
+               // but it needs to know when it is safe to disconnect. Until this
+               // reaches ignore_user_abort(), it is not safe as the jobs won't run.
+               ignore_user_abort( true ); // jobs may take a bit of time
+               header( "HTTP/1.0 204 No Content" );
+               ob_flush();
+        flush();
+               // Once the client receives this response, it can disconnect
+
+               // Do all of the specified tasks...
+               if ( in_array( 'jobs', $params['tasks'] ) ) {
+                       $this->executeJobs( $params );
+               }
+       }
+
+       /**
+        * @param array $query
+        * @return string
+        */
+       public static function getQuerySignature( array $query ) {
+               global $wgSecretKey;
+
+               ksort( $query ); // stable order
+               return hash_hmac( 'sha1', wfArrayToCgi( $query ), $wgSecretKey );
+       }
+
+       /**
+        * Run jobs from the job queue
+        *
+        * @param array $params Request parameters
+        * @return void
+        */
+       protected function executeJobs( array $params ) {
+               $n = $params['maxjobs']; // number of jobs to run
+               if ( $n < 1 ) {
+                       return;
+               }
+               try {
+                       // Fallback to running the jobs here while the user waits
+                       $group = JobQueueGroup::singleton();
+                       do {
+                               $job = $group->pop( JobQueueGroup::USE_CACHE ); // job from any queue
+                               if ( $job ) {
+                                       $output = $job->toString() . "\n";
+                                       $t = - microtime( true );
+                                       wfProfileIn( __METHOD__ . '-' . get_class( $job ) );
+                                       $success = $job->run();
+                                       wfProfileOut( __METHOD__ . '-' . get_class( $job ) );
+                                       $group->ack( $job ); // done
+                                       $t += microtime( true );
+                                       $t = round( $t * 1000 );
+                                       if ( $success === false ) {
+                                               $output .= "Error: " . $job->getLastError() . ", Time: $t ms\n";
+                                       } else {
+                                               $output .= "Success, Time: $t ms\n";
+                                       }
+                                       wfDebugLog( 'jobqueue', $output );
+                               }
+                       } while ( --$n && $job );
+               } catch ( MWException $e ) {
+                       // We don't want exceptions thrown during job execution to
+                       // be reported to the user since the output is already sent.
+                       // Instead we just log them.
+                       MWExceptionHandler::logException( $e );
+               }
+       }
+
+       public function mustBePosted() {
+               return true;
+       }
+
+       public function getAllowedParams() {
+               return array(
+                       'tasks' => array(
+                               ApiBase::PARAM_ISMULTI => true,
+                               ApiBase::PARAM_TYPE => array( 'jobs' )
+                       ),
+                       'maxjobs' => array(
+                               ApiBase::PARAM_TYPE => 'integer',
+                               ApiBase::PARAM_DFLT => 0
+                       ),
+                       'signature' =>  array(
+                               ApiBase::PROP_TYPE => 'string',
+                       ),
+                       'sigexpiry' => array(
+                               ApiBase::PARAM_TYPE => 'integer',
+                               ApiBase::PARAM_DFLT => 0 // ~epoch
+                       ),
+               );
+       }
+
+       public function getParamDescription() {
+               return array(
+                       'tasks' => 'List of task types to perform',
+                       'maxjobs' => 'Maximum number of jobs to run',
+                       'signature' => 'HMAC Signature that signs the request',
+                       'sigexpiry' => 'HMAC signature expiry as a UNIX timestamp'
+               );
+       }
+
+       public function getDescription() {
+               return 'Perform periodic tasks or run jobs from the queue';
+       }
+
+       public function getExamples() {
+               return array(
+                       'api.php?action=runjobs&tasks=jobs&maxjobs=3' => 'Run up to 3 jobs from the queue',
+               );
+       }
+}