From f826b15bd23487902a86228af1a622890d39e27b Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Fri, 23 Jan 2015 13:00:49 -0800 Subject: [PATCH] Only support "daemonized" mode for redis job queues to avoid code duplication Change-Id: I83bf61734fc48feea8a2f93adfe70f7d3a066b9e --- RELEASE-NOTES-1.25 | 3 + includes/jobqueue/JobQueueRedis.php | 123 ++-------------------------- 2 files changed, 10 insertions(+), 116 deletions(-) diff --git a/RELEASE-NOTES-1.25 b/RELEASE-NOTES-1.25 index 3b7eef339f..3f24db013a 100644 --- a/RELEASE-NOTES-1.25 +++ b/RELEASE-NOTES-1.25 @@ -42,6 +42,9 @@ production. background with white fallback color, rather than just white background. * MediaWikiBagOStuff class removed, make sure any object cache config uses SqlBagOStuff instead. +* The 'daemonized' flag must be set to true in $wgJobTypeConf for any redis + job queues. This means that mediawiki/services/jobrunner service has to + be installed and running for any such queues to work. === New features in 1.25 === * (T64861) Updated plural rules to CLDR 26. Includes incompatible changes diff --git a/includes/jobqueue/JobQueueRedis.php b/includes/jobqueue/JobQueueRedis.php index 9368fbf734..243fec9070 100644 --- a/includes/jobqueue/JobQueueRedis.php +++ b/includes/jobqueue/JobQueueRedis.php @@ -64,8 +64,6 @@ class JobQueueRedis extends JobQueue { protected $server; /** @var string Compression method to use */ protected $compression; - /** @var bool */ - protected $daemonized; const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days) @@ -90,7 +88,11 @@ class JobQueueRedis extends JobQueue { $this->server = $params['redisServer']; $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none'; $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); - $this->daemonized = !empty( $params['daemonized'] ); + if ( empty( $params['daemonized'] ) ) { + throw new Exception( + "Non-daemonized mode is no longer supported. Please install the " . + "mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." ); + } $this->checkDelay = true; // always enabled } @@ -291,19 +293,9 @@ LUA; protected function doPop() { $job = false; - // Push ready delayed jobs into the queue every 10 jobs to spread the load. - // This is also done as a periodic task, but we don't want too much done at once. - if ( !$this->daemonized && mt_rand( 0, 9 ) == 0 ) { - $this->recyclePruneAndUndelayJobs(); - } - $conn = $this->getConnection(); try { do { - // Keep the claimed job list down for high-traffic queues - if ( !$this->daemonized && mt_rand( 0, 99 ) == 0 ) { - $this->recyclePruneAndUndelayJobs(); - } $blob = $this->popAndAcquireBlob( $conn ); if ( !is_string( $blob ) ) { break; // no jobs; nothing to do @@ -316,7 +308,7 @@ LUA; continue; } - // If $item is invalid, recyclePruneAndUndelayJobs() will cleanup as needed + // If $item is invalid, the runner loop recyling will cleanup as needed $job = $this->getJobFromFields( $item ); // may be false } while ( !$job ); // job may be false if invalid } catch ( RedisException $e ) { @@ -583,112 +575,11 @@ LUA; } } - /** - * Recycle or destroy any jobs that have been claimed for too long - * and release any ready delayed jobs into the queue - * - * @return int Number of jobs recycled/deleted/undelayed - * @throws MWException|JobQueueError - */ - public function recyclePruneAndUndelayJobs() { - $count = 0; - // For each job item that can be retried, we need to add it back to the - // main queue and remove it from the list of currenty claimed job items. - // For those that cannot, they are marked as dead and kept around for - // investigation and manual job restoration but are eventually deleted. - $conn = $this->getConnection(); - try { - $now = time(); - static $script = -<<luaEval( $script, - array( - $this->getQueueKey( 'z-claimed' ), # KEYS[1] - $this->getQueueKey( 'h-attempts' ), # KEYS[2] - $this->getQueueKey( 'l-unclaimed' ), # KEYS[3] - $this->getQueueKey( 'h-data' ), # KEYS[4] - $this->getQueueKey( 'z-abandoned' ), # KEYS[5] - $this->getQueueKey( 'z-delayed' ), # KEYS[6] - $now - $this->claimTTL, # ARGV[1] - $now - self::MAX_AGE_PRUNE, # ARGV[2] - $this->maxTries, # ARGV[3] - $now # ARGV[4] - ), - 6 # number of first argument(s) that are keys - ); - if ( $res ) { - list( $released, $abandoned, $pruned, $undelayed ) = $res; - $count += $released + $pruned + $undelayed; - JobQueue::incrStats( 'job-recycle', $this->type, $released, $this->wiki ); - JobQueue::incrStats( 'job-abandon', $this->type, $abandoned, $this->wiki ); - JobQueue::incrStats( 'job-undelay', $this->type, $undelayed, $this->wiki ); - } - } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); - } - - return $count; - } - /** * @return array */ protected function doGetPeriodicTasks() { - if ( $this->daemonized ) { - return array(); // managed in the runner loop - } - $periods = array( 300 ); // 5 min; delayed/stale jobs - if ( $this->claimTTL > 0 ) { - $periods[] = ceil( $this->claimTTL / 2 ); // halved to avoid bad timing - } - $period = min( $periods ); - $period = max( $period, 30 ); // sanity - - return array( - 'recyclePruneAndUndelayJobs' => array( - 'callback' => array( $this, 'recyclePruneAndUndelayJobs' ), - 'period' => $period, - ) - ); + return array(); // managed in the runner loop } /** -- 2.20.1