From ec3da97659f035380281ab4b22a89954c6a7d7a9 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Wed, 6 May 2015 20:06:01 -0700 Subject: [PATCH] Removed executeReadyPeriodicTasks() method * Moved all these hacks to JobQueueDB, which is the only queue that should need this (for stock installs). Newer queues should always have the queue store manage stuff like this, not MediaWiki. * This also avoids expensive object construction that does nothing when non-DB queues are used. Change-Id: Id718cda25750be73044a049b39958cca55aa3172 --- includes/jobqueue/JobQueue.php | 29 ------------ includes/jobqueue/JobQueueDB.php | 22 ++++----- includes/jobqueue/JobQueueFederated.php | 12 ----- includes/jobqueue/JobQueueGroup.php | 63 ------------------------- includes/jobqueue/JobRunner.php | 11 +---- 5 files changed, 12 insertions(+), 125 deletions(-) diff --git a/includes/jobqueue/JobQueue.php b/includes/jobqueue/JobQueue.php index 7df85ff0f4..fd4234d507 100644 --- a/includes/jobqueue/JobQueue.php +++ b/includes/jobqueue/JobQueue.php @@ -548,35 +548,6 @@ abstract class JobQueue { protected function doWaitForBackups() { } - /** - * Return a map of task names to task definition maps. - * A "task" is a fast periodic queue maintenance action. - * Mutually exclusive tasks must implement their own locking in the callback. - * - * Each task value is an associative array with: - * - name : the name of the task - * - callback : a PHP callable that performs the task - * - period : the period in seconds corresponding to the task frequency - * - * @return array - */ - final public function getPeriodicTasks() { - $tasks = $this->doGetPeriodicTasks(); - foreach ( $tasks as $name => &$def ) { - $def['name'] = $name; - } - - return $tasks; - } - - /** - * @see JobQueue::getPeriodicTasks() - * @return array - */ - protected function doGetPeriodicTasks() { - return array(); - } - /** * Clear any process and persistent caches * diff --git a/includes/jobqueue/JobQueueDB.php b/includes/jobqueue/JobQueueDB.php index b1b650ba81..49de4b4031 100644 --- a/includes/jobqueue/JobQueueDB.php +++ b/includes/jobqueue/JobQueueDB.php @@ -301,6 +301,12 @@ class JobQueueDB extends JobQueue { $job->metadata['id'] = $row->job_id; break; // done } while ( true ); + + if ( !$job || mt_rand( 0, 9 ) == 0 ) { + // Handled jobs that need to be recycled/deleted; + // any recycled jobs will be picked up next attempt + $this->recycleAndDeleteStaleJobs(); + } } catch ( DBError $e ) { $this->throwDBException( $e ); } @@ -535,18 +541,6 @@ class JobQueueDB extends JobQueue { wfWaitForSlaves( false, $this->wiki, $this->cluster ?: false ); } - /** - * @return array - */ - protected function doGetPeriodicTasks() { - return array( - 'recycleAndDeleteStaleJobs' => array( - 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ), - 'period' => ceil( $this->claimTTL / 2 ) - ) - ); - } - /** * @return void */ @@ -589,6 +583,10 @@ class JobQueueDB extends JobQueue { protected function doGetSiblingQueuesWithJobs( array $types ) { $dbr = $this->getSlaveDB(); + // @note: this does not check whether the jobs are claimed or not. + // This is useful so JobQueueGroup::pop() also sees queues that only + // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue + // failed jobs so that they can be popped again for that edge case. $res = $dbr->select( 'job', 'DISTINCT job_cmd', array( 'job_cmd' => $types ), __METHOD__ ); diff --git a/includes/jobqueue/JobQueueFederated.php b/includes/jobqueue/JobQueueFederated.php index 178ce8a7da..b6bb3010ce 100644 --- a/includes/jobqueue/JobQueueFederated.php +++ b/includes/jobqueue/JobQueueFederated.php @@ -373,18 +373,6 @@ class JobQueueFederated extends JobQueue { $this->throwErrorIfAllPartitionsDown( $failed ); } - protected function doGetPeriodicTasks() { - $tasks = array(); - /** @var JobQueue $queue */ - foreach ( $this->partitionQueues as $partition => $queue ) { - foreach ( $queue->getPeriodicTasks() as $task => $def ) { - $tasks["{$partition}:{$task}"] = $def; - } - } - - return $tasks; - } - protected function doFlushCaches() { static $types = array( 'empty', diff --git a/includes/jobqueue/JobQueueGroup.php b/includes/jobqueue/JobQueueGroup.php index ebd547a0e5..fdf7b876cc 100644 --- a/includes/jobqueue/JobQueueGroup.php +++ b/includes/jobqueue/JobQueueGroup.php @@ -341,69 +341,6 @@ class JobQueueGroup { return $this->coalescedQueues; } - /** - * Execute any due periodic queue maintenance tasks for all queues. - * - * A task is "due" if the time ellapsed since the last run is greater than - * the defined run period. Concurrent calls to this function will cause tasks - * to be attempted twice, so they may need their own methods of mutual exclusion. - * - * @return int Number of tasks run - */ - public function executeReadyPeriodicTasks() { - global $wgMemc; - - list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); - $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' ); - $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp) - - $count = 0; - $tasksRun = array(); // (queue => task => UNIX timestamp) - foreach ( $this->getQueueTypes() as $type ) { - $queue = $this->get( $type ); - foreach ( $queue->getPeriodicTasks() as $task => $definition ) { - if ( $definition['period'] <= 0 ) { - continue; // disabled - } elseif ( !isset( $lastRuns[$type][$task] ) - || $lastRuns[$type][$task] < ( time() - $definition['period'] ) - ) { - try { - if ( call_user_func( $definition['callback'] ) !== null ) { - $tasksRun[$type][$task] = time(); - ++$count; - } - } catch ( JobQueueError $e ) { - MWExceptionHandler::logException( $e ); - } - } - } - } - - if ( $count === 0 ) { - return $count; // nothing to update - } - - $wgMemc->merge( $key, function ( $cache, $key, $lastRuns ) use ( $tasksRun ) { - if ( is_array( $lastRuns ) ) { - foreach ( $tasksRun as $type => $tasks ) { - foreach ( $tasks as $task => $timestamp ) { - if ( !isset( $lastRuns[$type][$task] ) - || $timestamp > $lastRuns[$type][$task] - ) { - $lastRuns[$type][$task] = $timestamp; - } - } - } - } else { - $lastRuns = $tasksRun; - } - - return $lastRuns; - } ); - - return $count; - } - /** * @param string $name * @return mixed diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php index 0948092e67..5c68d90555 100644 --- a/includes/jobqueue/JobRunner.php +++ b/includes/jobqueue/JobRunner.php @@ -103,15 +103,6 @@ class JobRunner implements LoggerAwareInterface { return $response; } - $group = JobQueueGroup::singleton(); - // Handle any required periodic queue maintenance - $count = $group->executeReadyPeriodicTasks(); - if ( $count > 0 ) { - $msg = "Executed $count periodic queue task(s)."; - $this->logger->debug( $msg ); - $this->debugCallback( $msg ); - } - // Bail out if in read-only mode if ( wfReadOnly() ) { $response['reached'] = 'read-only'; @@ -132,6 +123,8 @@ class JobRunner implements LoggerAwareInterface { return $response; } + $group = JobQueueGroup::singleton(); + // Flush any pending DB writes for sanity wfGetLBFactory()->commitMasterChanges(); -- 2.20.1