From fbf34d84abc4a6fa43818f097e5db2d52f8f8fe6 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Wed, 13 Feb 2013 13:25:37 -0800 Subject: [PATCH] [Upload] Moved async upload stuff to the job queue. * (bug 44080) Also carry-over the IP and HTTP header info. * This adds a RequestContext::importScopedSession() function. Change-Id: Ie9c0a4d78fb719569c8149b9cc8a5430f0ac5673 --- includes/AutoLoader.php | 2 + includes/DefaultSettings.php | 17 ++++- includes/WebRequest.php | 27 ++++++- includes/api/ApiUpload.php | 61 +++++++-------- includes/context/RequestContext.php | 57 ++++++++++++++ .../jobs/AssembleUploadChunksJob.php} | 72 +++++++++--------- .../jobs/PublishStashedFileJob.php} | 75 ++++++++++--------- 7 files changed, 197 insertions(+), 114 deletions(-) rename includes/{upload/AssembleUploadChunks.php => job/jobs/AssembleUploadChunksJob.php} (59%) rename includes/{upload/PublishStashedFile.php => job/jobs/PublishStashedFileJob.php} (63%) diff --git a/includes/AutoLoader.php b/includes/AutoLoader.php index 23cf411770..14147497dc 100644 --- a/includes/AutoLoader.php +++ b/includes/AutoLoader.php @@ -675,6 +675,8 @@ $wgAutoloadLocalClasses = array( 'RefreshLinksJob' => 'includes/job/jobs/RefreshLinksJob.php', 'RefreshLinksJob2' => 'includes/job/jobs/RefreshLinksJob.php', 'UploadFromUrlJob' => 'includes/job/jobs/UploadFromUrlJob.php', + 'AssembleUploadChunksJob' => 'includes/job/jobs/AssembleUploadChunksJob.php', + 'PublishStashedFileJob' => 'includes/job/jobs/PublishStashedFileJob.php', # includes/json 'FormatJson' => 'includes/json/FormatJson.php', diff --git a/includes/DefaultSettings.php b/includes/DefaultSettings.php index fc8168df83..39ab14883c 100644 --- a/includes/DefaultSettings.php +++ b/includes/DefaultSettings.php @@ -311,6 +311,13 @@ $wgUploadStashMaxAge = 6 * 3600; // 6 hours /** Allows to move images and other media files */ $wgAllowImageMoving = true; +/** + * Enable deferred upload tasks that use the job queue. + * Only enable this if job runners are set up for both the + * 'AssembleUploadChunks' and 'PublishStashedFile' job types. + */ +$wgEnableAsyncUploads = false; + /** * These are additional characters that should be replaced with '-' in filenames */ @@ -5513,6 +5520,8 @@ $wgJobClasses = array( 'enotifNotify' => 'EnotifNotifyJob', 'fixDoubleRedirect' => 'DoubleRedirectJob', 'uploadFromUrl' => 'UploadFromUrlJob', + 'AssembleUploadChunks' => 'AssembleUploadChunksJob', + 'PublishStashedFile' => 'PublishStashedFileJob', 'null' => 'NullJob' ); @@ -5526,7 +5535,7 @@ $wgJobClasses = array( * - Jobs that you want to run on specialized machines ( like transcoding, or a particular * machine on your cluster has 'outside' web access you could restrict uploadFromUrl ) */ -$wgJobTypesExcludedFromDefaultQueue = array(); +$wgJobTypesExcludedFromDefaultQueue = array( 'AssembleUploadChunks', 'PublishStashedFile' ); /** * Map of job types to configuration arrays. @@ -6200,7 +6209,7 @@ $wgMaxShellTime = 180; $wgMaxShellWallClockTime = 180; /** - * Under Linux: a cgroup directory used to constrain memory usage of shell + * Under Linux: a cgroup directory used to constrain memory usage of shell * commands. The directory must be writable by the user which runs MediaWiki. * * If specified, this is used instead of ulimit, which is inaccurate, and @@ -6208,7 +6217,7 @@ $wgMaxShellWallClockTime = 180; * them segfault or deadlock. * * A wrapper script will create a cgroup for each shell command that runs, as - * a subgroup of the specified cgroup. If the memory limit is exceeded, the + * a subgroup of the specified cgroup. If the memory limit is exceeded, the * kernel will send a SIGKILL signal to a process in the subgroup. * * @par Example: @@ -6218,7 +6227,7 @@ $wgMaxShellWallClockTime = 180; * echo '$wgShellCgroup = "/sys/fs/cgroup/memory/mediawiki/job";' >> LocalSettings.php * @endcode * - * The reliability of cgroup cleanup can be improved by installing a + * The reliability of cgroup cleanup can be improved by installing a * notify_on_release script in the root cgroup, see e.g. * https://gerrit.wikimedia.org/r/#/c/40784 */ diff --git a/includes/WebRequest.php b/includes/WebRequest.php index cd43ffb980..50da049d3f 100644 --- a/includes/WebRequest.php +++ b/includes/WebRequest.php @@ -1124,6 +1124,30 @@ HTML; $this->ip = $ip; return $ip; } + + /** + * @param string $ip + * @return void + * @since 1.21 + */ + public function setIP( $ip ) { + $this->ip = $ip; + } + + /** + * Export the resolved user IP, HTTP headers, and session ID. + * The result will be reasonably sized to allow for serialization. + * + * @return Array + * @since 1.21 + */ + public function exportUserSession() { + return array( + 'ip' => $this->getIP(), + 'headers' => $this->getAllHeaders(), + 'sessionId' => session_id() + ); + } } /** @@ -1263,8 +1287,9 @@ class FauxRequest extends WebRequest { throw new MWException( "FauxRequest() got bogus data" ); } $this->wasPosted = $wasPosted; - if( $session ) + if( $session ) { $this->session = $session; + } } /** diff --git a/includes/api/ApiUpload.php b/includes/api/ApiUpload.php index a0da765b8d..2a58119edd 100644 --- a/includes/api/ApiUpload.php +++ b/includes/api/ApiUpload.php @@ -37,6 +37,8 @@ class ApiUpload extends ApiBase { protected $mParams; public function execute() { + global $wgEnableAsyncUploads; + // Check whether upload is enabled if ( !UploadBase::isEnabled() ) { $this->dieUsageMsg( 'uploaddisabled' ); @@ -47,9 +49,8 @@ class ApiUpload extends ApiBase { // Parameter handling $this->mParams = $this->extractRequestParams(); $request = $this->getMain()->getRequest(); - // Check if async mode is actually supported - $this->mParams['async'] = ( $this->mParams['async'] && !wfIsWindows() ); - $this->mParams['async'] = false; // XXX: disabled per bug 44080 + // Check if async mode is actually supported (jobs done in cli mode) + $this->mParams['async'] = ( $this->mParams['async'] && $wgEnableAsyncUploads ); // Add the uploaded file to the params array $this->mParams['file'] = $request->getFileName( 'file' ); $this->mParams['chunk'] = $request->getFileName( 'chunk' ); @@ -205,8 +206,8 @@ class ApiUpload extends ApiBase { } // Check we added the last chunk: - if( $this->mParams['offset'] + $chunkSize == $this->mParams['filesize'] ) { - if ( $this->mParams['async'] && !wfIsWindows() ) { + if ( $this->mParams['offset'] + $chunkSize == $this->mParams['filesize'] ) { + if ( $this->mParams['async'] ) { $progress = UploadBase::getSessionStatus( $this->mParams['filekey'] ); if ( $progress && $progress['result'] === 'Poll' ) { $this->dieUsage( "Chunk assembly already in progress.", 'stashfailed' ); @@ -216,22 +217,16 @@ class ApiUpload extends ApiBase { array( 'result' => 'Poll', 'stage' => 'queued', 'status' => Status::newGood() ) ); - $retVal = 1; - $cmd = wfShellWikiCmd( - "$IP/includes/upload/AssembleUploadChunks.php", + $ok = JobQueueGroup::singleton()->push( new AssembleUploadChunksJob( + Title::makeTitle( NS_FILE, $this->mParams['filekey'] ), array( - '--wiki', wfWikiID(), - '--filename', $this->mParams['filename'], - '--filekey', $this->mParams['filekey'], - '--userid', $this->getUser()->getId(), - '--sessionid', session_id(), - '--quiet' + 'filename' => $this->mParams['filename'], + 'filekey' => $this->mParams['filekey'], + 'session' => $this->getRequest()->exportUserSession(), + 'userid' => $this->getUser()->getId() ) - ) . " < " . wfGetNull() . " > " . wfGetNull() . " 2>&1 &"; - // Start a process in the background. Enforce the time limits via PHP - // since ulimit4.sh seems to often not work for this particular usage. - wfShellExec( $cmd, $retVal, array(), array( 'time' => 0, 'memory' => 0 ) ); - if ( $retVal == 0 ) { + ) ); + if ( $ok ) { $result['result'] = 'Poll'; } else { UploadBase::setSessionStatus( $this->mParams['filekey'], false ); @@ -596,25 +591,19 @@ class ApiUpload extends ApiBase { $this->mParams['filekey'], array( 'result' => 'Poll', 'stage' => 'queued', 'status' => Status::newGood() ) ); - $retVal = 1; - $cmd = wfShellWikiCmd( - "$IP/includes/upload/PublishStashedFile.php", + $ok = JobQueueGroup::singleton()->push( new PublishStashedFileJob( + Title::makeTitle( NS_FILE, $this->mParams['filename'] ), array( - '--wiki', wfWikiID(), - '--filename', $this->mParams['filename'], - '--filekey', $this->mParams['filekey'], - '--userid', $this->getUser()->getId(), - '--comment', $this->mParams['comment'], - '--text', $this->mParams['text'], - '--watch', $watch, - '--sessionid', session_id(), - '--quiet' + 'filename' => $this->mParams['filename'], + 'filekey' => $this->mParams['filekey'], + 'comment' => $this->mParams['comment'], + 'text' => $this->mParams['text'], + 'watch' => $watch, + 'session' => $this->getRequest()->exportUserSession(), + 'userid' => $this->getUser()->getId() ) - ) . " < " . wfGetNull() . " > " . wfGetNull() . " 2>&1 &"; - // Start a process in the background. Enforce the time limits via PHP - // since ulimit4.sh seems to often not work for this particular usage. - wfShellExec( $cmd, $retVal, array(), array( 'time' => 0, 'memory' => 0 ) ); - if ( $retVal == 0 ) { + ) ); + if ( $ok ) { $result['result'] = 'Poll'; } else { UploadBase::setSessionStatus( $this->mParams['filekey'], false ); diff --git a/includes/context/RequestContext.php b/includes/context/RequestContext.php index 09cb409f65..60c8cd324c 100644 --- a/includes/context/RequestContext.php +++ b/includes/context/RequestContext.php @@ -392,6 +392,63 @@ class RequestContext implements IContextSource { return $instance; } + /** + * Import the resolved user IP, HTTP headers, and session ID. + * This sets the current session and sets $wgUser and $wgRequest. + * Once the return value falls out of scope, the old context is restored. + * This function can only be called within CLI mode scripts. + * + * This will setup the session from the given ID. This is useful when + * background scripts inherit some context when acting on behalf of a user. + * + * $param array $params Result of WebRequest::exportUserSession() + * @return ScopedCallback + * @throws MWException + * @since 1.21 + */ + public static function importScopedSession( array $params ) { + if ( PHP_SAPI !== 'cli' ) { + // Don't send random private cookie headers to other random users + throw new MWException( "Sessions can only be imported in cli mode." ); + } + + $importSessionFunction = function( array $params ) { + global $wgRequest, $wgUser; + + // Write and close any current session + session_write_close(); // persist + session_id( '' ); // detach + $_SESSION = array(); // clear in-memory array + // Load the new session from the session ID + if ( strlen( $params['sessionId'] ) ) { + wfSetupSession( $params['sessionId'] ); // sets $_SESSION + } + // Build the new WebRequest object + $request = new FauxRequest( array(), false, $_SESSION ); + $request->setIP( $params['ip'] ); + foreach ( $params['headers'] as $name => $value ) { + $request->setHeader( $name, $value ); + } + + $context = RequestContext::getMain(); + // Set the current context to use the new WebRequest + $context->setRequest( $request ); + $wgRequest = $context->getRequest(); // b/c + // Set the current user based on the new session and WebRequest + $context->setUser( User::newFromSession( $request ) ); // uses $_SESSION + $wgUser = $context->getUser(); // b/c + }; + + // Stash the old session and load in the new one + $oldParams = self::getMain()->getRequest()->exportUserSession(); + $importSessionFunction( $params ); + + // Set callback to save and close the new session and reload the old one + return new ScopedCallback( function() use ( $importSessionFunction, $oldParams ) { + $importSessionFunction( $oldParams ); + } ); + } + /** * Create a new extraneous context. The context is filled with information * external to the current session. diff --git a/includes/upload/AssembleUploadChunks.php b/includes/job/jobs/AssembleUploadChunksJob.php similarity index 59% rename from includes/upload/AssembleUploadChunks.php rename to includes/job/jobs/AssembleUploadChunksJob.php index 54ef840055..840a3985f8 100644 --- a/includes/upload/AssembleUploadChunks.php +++ b/includes/job/jobs/AssembleUploadChunksJob.php @@ -18,65 +18,58 @@ * http://www.gnu.org/copyleft/gpl.html * * @file - * @ingroup Maintenance + * @ingroup Upload */ -require_once( __DIR__ . '/../../maintenance/Maintenance.php' ); -set_time_limit( 3600 ); // 1 hour /** * Assemble the segments of a chunked upload. * - * @ingroup Maintenance + * @ingroup Upload */ -class AssembleUploadChunks extends Maintenance { - public function __construct() { - parent::__construct(); - $this->mDescription = "Re-assemble the segments of a chunked upload into a single file"; - $this->addOption( 'filename', "Desired file name", true, true ); - $this->addOption( 'filekey', "Upload stash file key", true, true ); - $this->addOption( 'userid', "Upload owner user ID", true, true ); - $this->addOption( 'sessionid', "Upload owner session ID", true, true ); +class AssembleUploadChunksJob extends Job { + public function __construct( $title, $params, $id = 0 ) { + parent::__construct( 'AssembleUploadChunks', $title, $params, $id ); + $this->removeDuplicates = true; } - public function execute() { - $e = null; - wfDebug( "Started assembly for file {$this->getOption( 'filename' )}\n" ); - wfSetupSession( $this->getOption( 'sessionid' ) ); + public function run() { + $scope = RequestContext::importScopedSession( $this->params['session'] ); + $context = RequestContext::getMain(); try { - $user = User::newFromId( $this->getOption( 'userid' ) ); - if ( !$user ) { - throw new MWException( "No user with ID " . $this->getOption( 'userid' ) . "." ); + $user = $context->getUser(); + if ( !$user->isLoggedIn() || $user->getId() != $this->params['userid'] ) { + $this->setLastError( "Could not load the author user from session." ); + return true; // no retries } UploadBase::setSessionStatus( - $this->getOption( 'filekey' ), + $this->params['filekey'], array( 'result' => 'Poll', 'stage' => 'assembling', 'status' => Status::newGood() ) ); $upload = new UploadFromChunks( $user ); $upload->continueChunks( - $this->getOption( 'filename' ), - $this->getOption( 'filekey' ), - // @TODO: set User? - RequestContext::getMain()->getRequest() // dummy request + $this->params['filename'], + $this->params['filekey'], + $context->getRequest() ); // Combine all of the chunks into a local file and upload that to a new stash file $status = $upload->concatenateChunks(); if ( !$status->isGood() ) { UploadBase::setSessionStatus( - $this->getOption( 'filekey' ), + $this->params['filekey'], array( 'result' => 'Failure', 'stage' => 'assembling', 'status' => $status ) ); - session_write_close(); - $this->error( $status->getWikiText() . "\n", 1 ); // die + $this->setLastError( $status->getWikiText() ); + return true; // no retries } // We have a new filekey for the fully concatenated file $newFileKey = $upload->getLocalFile()->getFileKey(); // Remove the old stash file row and first chunk file - $upload->stash->removeFileNoAuth( $this->getOption( 'filekey' ) ); + $upload->stash->removeFileNoAuth( $this->params['filekey'] ); // Build the image info array while we have the local reference handy $apiMain = new ApiMain(); // dummy object (XXX) @@ -87,7 +80,7 @@ class AssembleUploadChunks extends Maintenance { // Cache the info so the user doesn't have to wait forever to get the final info UploadBase::setSessionStatus( - $this->getOption( 'filekey' ), + $this->params['filekey'], array( 'result' => 'Success', 'stage' => 'assembling', @@ -98,21 +91,26 @@ class AssembleUploadChunks extends Maintenance { ); } catch ( MWException $e ) { UploadBase::setSessionStatus( - $this->getOption( 'filekey' ), + $this->params['filekey'], array( 'result' => 'Failure', 'stage' => 'assembling', 'status' => Status::newFatal( 'api-error-stashfailed' ) ) ); + $this->setLastError( get_class( $e ) . ": " . $e->getText() ); } - session_write_close(); - if ( $e ) { - throw $e; + return true; // returns true on success and erro (no retries) + } + + /** + * @return Array + */ + public function getDeduplicationInfo() { + $info = parent::getDeduplicationInfo(); + if ( is_array( $info['params'] ) ) { + $info['params'] = array( 'filekey' => $info['params']['filekey'] ); } - wfDebug( "Finished assembly for file {$this->getOption( 'filename' )}\n" ); + return $info; } } - -$maintClass = "AssembleUploadChunks"; -require_once( RUN_MAINTENANCE_IF_MAIN ); diff --git a/includes/upload/PublishStashedFile.php b/includes/job/jobs/PublishStashedFileJob.php similarity index 63% rename from includes/upload/PublishStashedFile.php rename to includes/job/jobs/PublishStashedFileJob.php index 8198dea84f..594d30493b 100644 --- a/includes/upload/PublishStashedFile.php +++ b/includes/job/jobs/PublishStashedFileJob.php @@ -18,39 +18,32 @@ * http://www.gnu.org/copyleft/gpl.html * * @file - * @ingroup Maintenance + * @ingroup Upload */ -require_once( __DIR__ . '/../../maintenance/Maintenance.php' ); -set_time_limit( 3600 ); // 1 hour /** * Upload a file from the upload stash into the local file repo. * - * @ingroup Maintenance + * @ingroup Upload */ -class PublishStashedFile extends Maintenance { - public function __construct() { - parent::__construct(); - $this->mDescription = "Upload stashed file into the local file repo"; - $this->addOption( 'filename', "Desired file name", true, true ); - $this->addOption( 'filekey', "Upload stash file key", true, true ); - $this->addOption( 'userid', "Upload owner user ID", true, true ); - $this->addOption( 'comment', "Upload comment", true, true ); - $this->addOption( 'text', "Upload description", true, true ); - $this->addOption( 'watch', "Whether the uploader should watch the page", true, true ); - $this->addOption( 'sessionid', "Upload owner session ID", true, true ); +class PublishStashedFileJob extends Job { + public function __construct( $title, $params, $id = 0 ) { + parent::__construct( 'PublishStashedFile', $title, $params, $id ); + $this->removeDuplicates = true; } - public function execute() { - wfSetupSession( $this->getOption( 'sessionid' ) ); + public function run() { + $scope = RequestContext::importScopedSession( $this->params['session'] ); + $context = RequestContext::getMain(); try { - $user = User::newFromId( $this->getOption( 'userid' ) ); - if ( !$user ) { - throw new MWException( "No user with ID " . $this->getOption( 'userid' ) . "." ); + $user = $context->getUser(); + if ( !$user->isLoggedIn() || $user->getId() != $this->params['userid'] ) { + $this->setLastError( "Could not load the author user from session." ); + return true; // no retries } UploadBase::setSessionStatus( - $this->getOption( 'filekey' ), + $this->params['filekey'], array( 'result' => 'Poll', 'stage' => 'publish', 'status' => Status::newGood() ) ); @@ -59,7 +52,7 @@ class PublishStashedFile extends Maintenance { // checks and anything else to the stash stage (which includes concatenation and // the local file is thus already there). That way, instead of GET+PUT, there could // just be a COPY operation from the stash to the public zone. - $upload->initialize( $this->getOption( 'filekey' ), $this->getOption( 'filename' ) ); + $upload->initialize( $this->params['filekey'], $this->params['filename'] ); // Check if the local file checks out (this is generally a no-op) $verification = $upload->verifyUpload(); @@ -67,25 +60,27 @@ class PublishStashedFile extends Maintenance { $status = Status::newFatal( 'verification-error' ); $status->value = array( 'verification' => $verification ); UploadBase::setSessionStatus( - $this->getOption( 'filekey' ), + $this->params['filekey'], array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status ) ); - $this->error( "Could not verify upload.\n", 1 ); // die + $this->setLastError( "Could not verify upload." ); + return true; // no retries } // Upload the stashed file to a permanent location $status = $upload->performUpload( - $this->getOption( 'comment' ), - $this->getOption( 'text' ), - $this->getOption( 'watch' ), + $this->params['comment'], + $this->params['text'], + $this->params['watch'], $user ); if ( !$status->isGood() ) { UploadBase::setSessionStatus( - $this->getOption( 'filekey' ), + $this->params['filekey'], array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status ) ); - $this->error( $status->getWikiText() . "\n", 1 ); // die + $this->setLastError( $status->getWikiText() ); + return true; // no retries } // Build the image info array while we have the local reference handy @@ -97,7 +92,7 @@ class PublishStashedFile extends Maintenance { // Cache the info so the user doesn't have to wait forever to get the final info UploadBase::setSessionStatus( - $this->getOption( 'filekey' ), + $this->params['filekey'], array( 'result' => 'Success', 'stage' => 'publish', @@ -108,18 +103,26 @@ class PublishStashedFile extends Maintenance { ); } catch ( MWException $e ) { UploadBase::setSessionStatus( - $this->getOption( 'filekey' ), + $this->params['filekey'], array( 'result' => 'Failure', 'stage' => 'publish', 'status' => Status::newFatal( 'api-error-publishfailed' ) ) ); - throw $e; + $this->setLastError( get_class( $e ) . ": " . $e->getText() ); } - session_write_close(); + return true; // returns true on success and erro (no retries) } -} -$maintClass = "PublishStashedFile"; -require_once( RUN_MAINTENANCE_IF_MAIN ); + /** + * @return Array + */ + public function getDeduplicationInfo() { + $info = parent::getDeduplicationInfo(); + if ( is_array( $info['params'] ) ) { + $info['params'] = array( 'filekey' => $info['params']['filekey'] ); + } + return $info; + } +} -- 2.20.1