X-Git-Url: http://git.cyclocoop.org/?a=blobdiff_plain;f=includes%2Fjobqueue%2FJobQueueRedis.php;h=abfdc8c74e37a7376a22a7d6cacc18b22871f93c;hb=5ad132533be45f999cec3ea52d959f3504fda83c;hp=3519eac8ebcb4ddef77c825847dff1607e5dce75;hpb=4c413f2ae7020a0a5a87aa0814b67c2c18d09f35;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/jobqueue/JobQueueRedis.php b/includes/jobqueue/JobQueueRedis.php index 3519eac8eb..abfdc8c74e 100644 --- a/includes/jobqueue/JobQueueRedis.php +++ b/includes/jobqueue/JobQueueRedis.php @@ -91,6 +91,7 @@ class JobQueueRedis extends JobQueue { $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none'; $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); $this->daemonized = !empty( $params['daemonized'] ); + $this->checkDelay = true; // always enabled } protected function supportedOrders() { @@ -134,9 +135,6 @@ class JobQueueRedis extends JobQueue { * @throws JobQueueError */ protected function doGetAcquiredCount() { - if ( $this->claimTTL <= 0 ) { - return 0; // no acknowledgements - } $conn = $this->getConnection(); try { $conn->multi( Redis::PIPELINE ); @@ -155,9 +153,6 @@ class JobQueueRedis extends JobQueue { * @throws JobQueueError */ protected function doGetDelayedCount() { - if ( !$this->checkDelay ) { - return 0; // no delayed jobs - } $conn = $this->getConnection(); try { return $conn->zSize( $this->getQueueKey( 'z-delayed' ) ); @@ -172,9 +167,6 @@ class JobQueueRedis extends JobQueue { * @throws JobQueueError */ protected function doGetAbandonedCount() { - if ( $this->claimTTL <= 0 ) { - return 0; // no acknowledgements - } $conn = $this->getConnection(); try { return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) ); @@ -301,22 +293,18 @@ LUA; // Push ready delayed jobs into the queue every 10 jobs to spread the load. // This is also done as a periodic task, but we don't want too much done at once. - if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) { + if ( !$this->daemonized && mt_rand( 0, 9 ) == 0 ) { $this->recyclePruneAndUndelayJobs(); } $conn = $this->getConnection(); try { do { - if ( $this->claimTTL > 0 ) { - // Keep the claimed job list down for high-traffic queues - if ( mt_rand( 0, 99 ) == 0 ) { - $this->recyclePruneAndUndelayJobs(); - } - $blob = $this->popAndAcquireBlob( $conn ); - } else { - $blob = $this->popAndDeleteBlob( $conn ); + // Keep the claimed job list down for high-traffic queues + if ( !$this->daemonized && mt_rand( 0, 99 ) == 0 ) { + $this->recyclePruneAndUndelayJobs(); } + $blob = $this->popAndAcquireBlob( $conn ); if ( !is_string( $blob ) ) { break; // no jobs; nothing to do } @@ -338,39 +326,6 @@ LUA; return $job; } - /** - * @param RedisConnRef $conn - * @return array Serialized string or false - * @throws RedisException - */ - protected function popAndDeleteBlob( RedisConnRef $conn ) { - static $script = -<<luaEval( $script, - array( - $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] - $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] - $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] - $this->getQueueKey( 'h-data' ), # KEYS[4] - ), - 4 # number of first argument(s) that are keys - ); - } - /** * @param RedisConnRef $conn * @return array Serialized string or false @@ -416,36 +371,35 @@ LUA; if ( !isset( $job->metadata['uuid'] ) ) { throw new MWException( "Job of type '{$job->getType()}' has no UUID." ); } - if ( $this->claimTTL > 0 ) { - $conn = $this->getConnection(); - try { - static $script = + + $conn = $this->getConnection(); + try { + static $script = <<luaEval( $script, - array( - $this->getQueueKey( 'z-claimed' ), # KEYS[1] - $this->getQueueKey( 'h-attempts' ), # KEYS[2] - $this->getQueueKey( 'h-data' ), # KEYS[3] - $job->metadata['uuid'] # ARGV[1] - ), - 3 # number of first argument(s) that are keys - ); - - if ( !$res ) { - wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." ); - - return false; - } - } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); + $res = $conn->luaEval( $script, + array( + $this->getQueueKey( 'z-claimed' ), # KEYS[1] + $this->getQueueKey( 'h-attempts' ), # KEYS[2] + $this->getQueueKey( 'h-data' ), # KEYS[3] + $job->metadata['uuid'] # ARGV[1] + ), + 3 # number of first argument(s) that are keys + ); + + if ( !$res ) { + wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." ); + + return false; } + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); } return true; @@ -658,7 +612,6 @@ LUA; if attempts < ARGV[3] then -- Claim expired and retries left: re-enqueue the job redis.call('lPush',kUnclaimed,id) - redis.call('hIncrBy',kAttempts,id,1) released = released + 1 else -- Claim expired and no retries left: mark the job as dead @@ -723,12 +676,9 @@ LUA; if ( $this->daemonized ) { return array(); // managed in the runner loop } - $periods = array( 3600 ); // standard cleanup (useful on config change) + $periods = array( 300 ); // 5 min; delayed/stale jobs if ( $this->claimTTL > 0 ) { - $periods[] = ceil( $this->claimTTL / 2 ); // avoid bad timing - } - if ( $this->checkDelay ) { - $periods[] = 300; // 5 minutes + $periods[] = ceil( $this->claimTTL / 2 ); // halved to avoid bad timing } $period = min( $periods ); $period = max( $period, 30 ); // sanity