X-Git-Url: https://git.cyclocoop.org/?a=blobdiff_plain;f=includes%2Fjob%2FJobQueueRedis.php;h=e0641b577472c54a90f65d3f59dedd36e9a1bfcd;hb=996771f7202fb412bfef3799df78f2f9abe4f5d8;hp=212871e2633eb986798e43840e874c55189a94c1;hpb=37150b8d9b0476f28d48c70600422fabdfaf1e7e;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php index 212871e263..e0641b5774 100644 --- a/includes/job/JobQueueRedis.php +++ b/includes/job/JobQueueRedis.php @@ -300,7 +300,7 @@ LUA; // Push ready delayed jobs into the queue every 10 jobs to spread the load. // This is also done as a periodic task, but we don't want too much done at once. if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) { - $this->releaseReadyDelayedJobs(); + $this->recyclePruneAndUndelayJobs(); } $conn = $this->getConnection(); @@ -309,7 +309,7 @@ LUA; if ( $this->claimTTL > 0 ) { // Keep the claimed job list down for high-traffic queues if ( mt_rand( 0, 99 ) == 0 ) { - $this->recycleAndDeleteStaleJobs(); + $this->recyclePruneAndUndelayJobs(); } $blob = $this->popAndAcquireBlob( $conn ); } else { @@ -326,7 +326,7 @@ LUA; continue; } - // If $item is invalid, recycleAndDeleteStaleJobs() will cleanup as needed + // If $item is invalid, recyclePruneAndUndelayJobs() will cleanup as needed $job = $this->getJobFromFields( $item ); // may be false } while ( !$job ); // job may be false if invalid } catch ( RedisException $e ) { @@ -627,54 +627,14 @@ LUA; } } - /** - * Release any ready delayed jobs into the queue - * - * @return int Number of jobs released - * @throws JobQueueError - */ - public function releaseReadyDelayedJobs() { - $count = 0; - - $conn = $this->getConnection(); - try { - static $script = -<<luaEval( $script, - array( - $this->getQueueKey( 'z-delayed' ), // KEYS[1] - $this->getQueueKey( 'l-unclaimed' ), // KEYS[2] - time() // ARGV[1]; max "delay until" UNIX timestamp - ), - 2 # first two arguments are keys - ); - } catch ( RedisException $e ) { - $this->throwRedisException( $this->server, $conn, $e ); - } - - return $count; - } - /** * Recycle or destroy any jobs that have been claimed for too long + * and release any ready delayed jobs into the queue * - * @return int Number of jobs recycled/deleted + * @return int Number of jobs recycled/deleted/undelayed * @throws MWException|JobQueueError */ - public function recycleAndDeleteStaleJobs() { - if ( $this->claimTTL <= 0 ) { // sanity - throw new MWException( "Cannot recycle jobs since acknowledgements are disabled." ); - } + public function recyclePruneAndUndelayJobs() { $count = 0; // For each job item that can be retried, we need to add it back to the // main queue and remove it from the list of currenty claimed job items. @@ -685,8 +645,8 @@ LUA; $now = time(); static $script = <<luaEval( $script, array( @@ -724,15 +692,17 @@ LUA; $this->getQueueKey( 'l-unclaimed' ), # KEYS[3] $this->getQueueKey( 'h-data' ), # KEYS[4] $this->getQueueKey( 'z-abandoned' ), # KEYS[5] + $this->getQueueKey( 'z-delayed' ), # KEYS[6] $now - $this->claimTTL, # ARGV[1] $now - self::MAX_AGE_PRUNE, # ARGV[2] - $this->maxTries # ARGV[3] + $this->maxTries, # ARGV[3] + $now # ARGV[4] ), - 5 # number of first argument(s) that are keys + 6 # number of first argument(s) that are keys ); if ( $res ) { - list( $released, $abandoned, $pruned ) = $res; - $count += $released + $pruned; + list( $released, $abandoned, $pruned, $undelayed ) = $res; + $count += $released + $pruned + $undelayed; JobQueue::incrStats( 'job-recycle', $this->type, $released ); JobQueue::incrStats( 'job-abandon', $this->type, $abandoned ); } @@ -747,21 +717,19 @@ LUA; * @return array */ protected function doGetPeriodicTasks() { - $tasks = array(); + $periods = array( 3600 ); // standard cleanup (useful on config change) if ( $this->claimTTL > 0 ) { - $tasks['recycleAndDeleteStaleJobs'] = array( - 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ), - 'period' => ceil( $this->claimTTL / 2 ) - ); + $periods[] = ceil( $this->claimTTL / 2 ); // avoid bad timing } if ( $this->checkDelay ) { - $tasks['releaseReadyDelayedJobs'] = array( - 'callback' => array( $this, 'releaseReadyDelayedJobs' ), - 'period' => 300 // 5 minutes - ); + $periods[] = 300; // 5 minutes } - - return $tasks; + return array( + 'recyclePruneAndUndelayJobs' => array( + 'callback' => array( $this, 'recyclePruneAndUndelayJobs' ), + 'period' => max( min( $periods ), 30 ) // sanity + ) + ); } /**