From 0ca0e1296c2a2e80f913ab891aa1e31781b87102 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Fri, 11 Jan 2013 16:13:29 -0800 Subject: [PATCH] [JobQueue] Improved job recycle rate for small queues. * Make the recycling a bit more periodic rather than based on how often pop() gets called essentially. This works better if a queue does not have jobs inserted very often. Change-Id: I64fbc8afbb1cf096717ba4bfc6fe7b7715abdb72 --- includes/job/JobQueue.php | 28 ++++++++++++++++++ includes/job/JobQueueDB.php | 23 ++++++++++----- includes/job/JobQueueGroup.php | 54 ++++++++++++++++++++++++++++++++++ maintenance/nextJobDB.php | 30 +++++++++++++++++-- maintenance/runJobs.php | 6 ++++ 5 files changed, 132 insertions(+), 9 deletions(-) diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php index 92beb2c61b..4e0acd24dd 100644 --- a/includes/job/JobQueue.php +++ b/includes/job/JobQueue.php @@ -324,4 +324,32 @@ abstract class JobQueue { * @return void */ protected function doWaitForBackups() {} + + /** + * Return a map of task names to task definition maps. + * A "task" is a fast periodic queue maintenance action. + * Mutually exclusive tasks must implement their own locking in the callback. + * + * Each task value is an associative array with: + * - name : the name of the task + * - callback : a PHP callable that performs the task + * - period : the period in seconds corresponding to the task frequency + * + * @return Array + */ + final public function getPeriodicTasks() { + $tasks = $this->doGetPeriodicTasks(); + foreach ( $tasks as $name => &$def ) { + $def['name'] = $name; + } + return $tasks; + } + + /** + * @see JobQueue::getPeriodicTasks() + * @return Array + */ + protected function doGetPeriodicTasks() { + return array(); + } } diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php index 51b35fd694..4df5c0755a 100644 --- a/includes/job/JobQueueDB.php +++ b/includes/job/JobQueueDB.php @@ -28,6 +28,7 @@ * @since 1.21 */ class JobQueueDB extends JobQueue { + const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days) const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed @@ -215,10 +216,6 @@ class JobQueueDB extends JobQueue { $uuid = wfRandomString( 32 ); // pop attempt $job = false; // job popped off - // Occasionally recycle jobs back into the queue that have been claimed too long - if ( mt_rand( 0, 99 ) == 0 ) { - $this->recycleStaleJobs(); - } do { // retry when our row is invalid or deleted as a duplicate // Try to reserve a row in the DB... if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) { @@ -401,10 +398,10 @@ class JobQueueDB extends JobQueue { * * @return integer Number of jobs recycled/deleted */ - protected function recycleStaleJobs() { + public function recycleAndDeleteStaleJobs() { global $wgMemc; - $now = time(); + $now = time(); list( $dbw, $scope ) = $this->getMasterDB(); $count = 0; // affected rows @@ -519,7 +516,7 @@ class JobQueueDB extends JobQueue { } // Update the timestamp of the last root job started at the location... - return $wgMemc->set( $key, $params['rootJobTimestamp'], 14*86400 ); // 2 weeks + return $wgMemc->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); } ); return true; @@ -557,6 +554,18 @@ class JobQueueDB extends JobQueue { wfWaitForSlaves(); } + /** + * @return Array + */ + protected function doGetPeriodicTasks() { + return array( + 'recycleAndDeleteStaleJobs' => array( + 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ), + 'period' => ceil( $this->claimTTL / 2 ) + ) + ); + } + /** * @return Array (DatabaseBase, ScopedCallback) */ diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php index cf0215b8bf..6d9d590418 100644 --- a/includes/job/JobQueueGroup.php +++ b/includes/job/JobQueueGroup.php @@ -228,4 +228,58 @@ class JobQueueGroup { } return $types; } + + /** + * Execute any due periodic queue maintenance tasks for all queues. + * + * A task is "due" if the time ellapsed since the last run is greater than + * the defined run period. Concurrent calls to this function will cause tasks + * to be attempted twice, so they may need their own methods of mutual exclusion. + * + * @return integer Number of tasks run + */ + public function executeReadyPeriodicTasks() { + global $wgMemc; + + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' ); + $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp) + + $count = 0; + $tasksRun = array(); // (queue => task => UNIX timestamp) + foreach ( $this->getQueueTypes() as $type ) { + $queue = $this->get( $type ); + foreach ( $queue->getPeriodicTasks() as $task => $definition ) { + if ( $definition['period'] <= 0 ) { + continue; // disabled + } elseif ( !isset( $lastRuns[$type][$task] ) + || $lastRuns[$type][$task] < ( time() - $definition['period'] ) ) + { + if ( call_user_func( $definition['callback'] ) !== null ) { + $tasksRun[$type][$task] = time(); + ++$count; + } + } + } + } + + $wgMemc->merge( $key, function( $cache, $key, $lastRuns ) use ( $tasksRun ) { + if ( is_array( $lastRuns ) ) { + foreach ( $tasksRun as $type => $tasks ) { + foreach ( $tasks as $task => $timestamp ) { + if ( !isset( $lastRuns[$type][$task] ) + || $timestamp > $lastRuns[$type][$task] ) + { + $lastRuns[$type][$task] = $timestamp; + } + } + } + } else { + $lastRuns = $tasksRun; + } + return $lastRuns; + } ); + + return $count; + } } diff --git a/maintenance/nextJobDB.php b/maintenance/nextJobDB.php index 032d6f9dfb..6606375dfa 100644 --- a/maintenance/nextJobDB.php +++ b/maintenance/nextJobDB.php @@ -48,6 +48,9 @@ class nextJobDB extends Maintenance { $types = JobQueueGroup::singleton()->getDefaultQueueTypes(); } + // Handle any required periodic queue maintenance + $this->executeReadyPeriodicTasks(); + $memcKey = 'jobqueue:dbs:v3'; $pendingDbInfo = $wgMemc->get( $memcKey ); @@ -78,6 +81,7 @@ class nextJobDB extends Maintenance { return; // no DBs with jobs or cache is both empty and locked } + $type = $this->getOption( 'type', false ); $pendingDBs = $pendingDbInfo['pendingDBs']; // convenience do { $again = false; @@ -156,14 +160,36 @@ class nextJobDB extends Maintenance { $pendingDBs = array(); // (job type => (db list)) foreach ( $wgLocalDatabases as $db ) { - $types = JobQueueGroup::singleton( $db )->getQueuesWithJobs(); - foreach ( $types as $type ) { + foreach ( JobQueueGroup::singleton( $db )->getQueuesWithJobs() as $type ) { $pendingDBs[$type][] = $db; } } return $pendingDBs; } + + /** + * Do all ready periodic jobs for all databases every 5 minutes (and .1% of the time) + * @return integer + */ + private function executeReadyPeriodicTasks() { + global $wgLocalDatabases, $wgMemc; + + $count = 0; + $memcKey = 'jobqueue:periodic:lasttime'; + $timestamp = (int)$wgMemc->get( $memcKey ); // UNIX timestamp or 0 + if ( ( time() - $timestamp ) > 300 || mt_rand( 0, 999 ) == 0 ) { // 5 minutes + if ( $wgMemc->add( "$memcKey:rebuild", 1, 1800 ) ) { // lock + foreach ( $wgLocalDatabases as $db ) { + $count += JobQueueGroup::singleton( $db )->executeReadyPeriodicTasks(); + } + $wgMemc->set( $memcKey, time() ); + $wgMemc->delete( "$memcKey:rebuild" ); // unlock + } + } + + return $count; + } } $maintClass = "nextJobDb"; diff --git a/maintenance/runJobs.php b/maintenance/runJobs.php index 0cf02174b7..a78acd5216 100644 --- a/maintenance/runJobs.php +++ b/maintenance/runJobs.php @@ -76,6 +76,12 @@ class RunJobs extends Maintenance { $n = 0; $group = JobQueueGroup::singleton(); + // Handle any required periodic queue maintenance + $count = $group->executeReadyPeriodicTasks(); + if ( $count > 0 ) { + $this->runJobsLog( "Executed $count periodic queue task(s)." ); + } + do { $job = ( $type === false ) ? $group->pop( JobQueueGroup::TYPE_DEFAULT, JobQueueGroup::USE_CACHE ) -- 2.20.1