Merge "Revision: mark getRaw*() methods as deprecated"
[lhc/web/wiklou.git] / includes / jobqueue / JobQueueRedis.php
index 3519eac..abfdc8c 100644 (file)
@@ -91,6 +91,7 @@ class JobQueueRedis extends JobQueue {
                $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none';
                $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
                $this->daemonized = !empty( $params['daemonized'] );
+               $this->checkDelay = true; // always enabled
        }
 
        protected function supportedOrders() {
@@ -134,9 +135,6 @@ class JobQueueRedis extends JobQueue {
         * @throws JobQueueError
         */
        protected function doGetAcquiredCount() {
-               if ( $this->claimTTL <= 0 ) {
-                       return 0; // no acknowledgements
-               }
                $conn = $this->getConnection();
                try {
                        $conn->multi( Redis::PIPELINE );
@@ -155,9 +153,6 @@ class JobQueueRedis extends JobQueue {
         * @throws JobQueueError
         */
        protected function doGetDelayedCount() {
-               if ( !$this->checkDelay ) {
-                       return 0; // no delayed jobs
-               }
                $conn = $this->getConnection();
                try {
                        return $conn->zSize( $this->getQueueKey( 'z-delayed' ) );
@@ -172,9 +167,6 @@ class JobQueueRedis extends JobQueue {
         * @throws JobQueueError
         */
        protected function doGetAbandonedCount() {
-               if ( $this->claimTTL <= 0 ) {
-                       return 0; // no acknowledgements
-               }
                $conn = $this->getConnection();
                try {
                        return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
@@ -301,22 +293,18 @@ LUA;
 
                // Push ready delayed jobs into the queue every 10 jobs to spread the load.
                // This is also done as a periodic task, but we don't want too much done at once.
-               if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) {
+               if ( !$this->daemonized && mt_rand( 0, 9 ) == 0 ) {
                        $this->recyclePruneAndUndelayJobs();
                }
 
                $conn = $this->getConnection();
                try {
                        do {
-                               if ( $this->claimTTL > 0 ) {
-                                       // Keep the claimed job list down for high-traffic queues
-                                       if ( mt_rand( 0, 99 ) == 0 ) {
-                                               $this->recyclePruneAndUndelayJobs();
-                                       }
-                                       $blob = $this->popAndAcquireBlob( $conn );
-                               } else {
-                                       $blob = $this->popAndDeleteBlob( $conn );
+                               // Keep the claimed job list down for high-traffic queues
+                               if ( !$this->daemonized && mt_rand( 0, 99 ) == 0 ) {
+                                       $this->recyclePruneAndUndelayJobs();
                                }
+                               $blob = $this->popAndAcquireBlob( $conn );
                                if ( !is_string( $blob ) ) {
                                        break; // no jobs; nothing to do
                                }
@@ -338,39 +326,6 @@ LUA;
                return $job;
        }
 
-       /**
-        * @param RedisConnRef $conn
-        * @return array Serialized string or false
-        * @throws RedisException
-        */
-       protected function popAndDeleteBlob( RedisConnRef $conn ) {
-               static $script =
-<<<LUA
-               local kUnclaimed, kSha1ById, kIdBySha1, kData = unpack(KEYS)
-               -- Pop an item off the queue
-               local id = redis.call('rpop',kUnclaimed)
-               if not id then return false end
-               -- Get the job data and remove it
-               local item = redis.call('hGet',kData,id)
-               redis.call('hDel',kData,id)
-               -- Allow new duplicates of this job
-               local sha1 = redis.call('hGet',kSha1ById,id)
-               if sha1 then redis.call('hDel',kIdBySha1,sha1) end
-               redis.call('hDel',kSha1ById,id)
-               -- Return the job data
-               return item
-LUA;
-               return $conn->luaEval( $script,
-                       array(
-                               $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
-                               $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
-                               $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
-                               $this->getQueueKey( 'h-data' ), # KEYS[4]
-                       ),
-                       4 # number of first argument(s) that are keys
-               );
-       }
-
        /**
         * @param RedisConnRef $conn
         * @return array Serialized string or false
@@ -416,36 +371,35 @@ LUA;
                if ( !isset( $job->metadata['uuid'] ) ) {
                        throw new MWException( "Job of type '{$job->getType()}' has no UUID." );
                }
-               if ( $this->claimTTL > 0 ) {
-                       $conn = $this->getConnection();
-                       try {
-                               static $script =
+
+               $conn = $this->getConnection();
+               try {
+                       static $script =
 <<<LUA
-                               local kClaimed, kAttempts, kData = unpack(KEYS)
-                               -- Unmark the job as claimed
-                               redis.call('zRem',kClaimed,ARGV[1])
-                               redis.call('hDel',kAttempts,ARGV[1])
-                               -- Delete the job data itself
-                               return redis.call('hDel',kData,ARGV[1])
+                       local kClaimed, kAttempts, kData = unpack(KEYS)
+                       -- Unmark the job as claimed
+                       redis.call('zRem',kClaimed,ARGV[1])
+                       redis.call('hDel',kAttempts,ARGV[1])
+                       -- Delete the job data itself
+                       return redis.call('hDel',kData,ARGV[1])
 LUA;
-                               $res = $conn->luaEval( $script,
-                                       array(
-                                               $this->getQueueKey( 'z-claimed' ), # KEYS[1]
-                                               $this->getQueueKey( 'h-attempts' ), # KEYS[2]
-                                               $this->getQueueKey( 'h-data' ), # KEYS[3]
-                                               $job->metadata['uuid'] # ARGV[1]
-                                       ),
-                                       3 # number of first argument(s) that are keys
-                               );
-
-                               if ( !$res ) {
-                                       wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
-
-                                       return false;
-                               }
-                       } catch ( RedisException $e ) {
-                               $this->throwRedisException( $conn, $e );
+                       $res = $conn->luaEval( $script,
+                               array(
+                                       $this->getQueueKey( 'z-claimed' ), # KEYS[1]
+                                       $this->getQueueKey( 'h-attempts' ), # KEYS[2]
+                                       $this->getQueueKey( 'h-data' ), # KEYS[3]
+                                       $job->metadata['uuid'] # ARGV[1]
+                               ),
+                               3 # number of first argument(s) that are keys
+                       );
+
+                       if ( !$res ) {
+                               wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
+
+                               return false;
                        }
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $conn, $e );
                }
 
                return true;
@@ -658,7 +612,6 @@ LUA;
                                if attempts < ARGV[3] then
                                        -- Claim expired and retries left: re-enqueue the job
                                        redis.call('lPush',kUnclaimed,id)
-                                       redis.call('hIncrBy',kAttempts,id,1)
                                        released = released + 1
                                else
                                        -- Claim expired and no retries left: mark the job as dead
@@ -723,12 +676,9 @@ LUA;
                if ( $this->daemonized ) {
                        return array(); // managed in the runner loop
                }
-               $periods = array( 3600 ); // standard cleanup (useful on config change)
+               $periods = array( 300 ); // 5 min; delayed/stale jobs
                if ( $this->claimTTL > 0 ) {
-                       $periods[] = ceil( $this->claimTTL / 2 ); // avoid bad timing
-               }
-               if ( $this->checkDelay ) {
-                       $periods[] = 300; // 5 minutes
+                       $periods[] = ceil( $this->claimTTL / 2 ); // halved to avoid bad timing
                }
                $period = min( $periods );
                $period = max( $period, 30 ); // sanity