Remove JobQueue::setTestingPrefix() hack
[lhc/web/wiklou.git] / includes / jobqueue / JobQueueRedis.php
index 29c8068..3e7bdcc 100644 (file)
@@ -26,8 +26,9 @@
  *
  * This is a faster and less resource-intensive job queue than JobQueueDB.
  * All data for a queue using this class is placed into one redis server.
+ * The mediawiki/services/jobrunner background service must be set up and running.
  *
- * There are eight main redis keys used to track jobs:
+ * There are eight main redis keys (per queue) used to track jobs:
  *   - l-unclaimed  : A list of job IDs used for ready unclaimed jobs
  *   - z-claimed    : A sorted set of (job ID, UNIX timestamp as score) used for job retries
  *   - z-abandoned  : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs
  * entry and every h-sha1ById must refer to an ID that is l-unclaimed. If a job has its
  * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID.
  *
+ * The following keys are used to track queue states:
+ *   - s-queuesWithJobs : A set of all queues with non-abandoned jobs
+ *
+ * The background service takes care of undelaying, recycling, and pruning jobs as well as
+ * removing s-queuesWithJobs entries as queues empty.
+ *
  * Additionally, "rootjob:* keys track "root jobs" used for additional de-duplication.
  * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run.
  * All the keys are prefixed with the relevant wiki ID information.
@@ -218,14 +225,15 @@ class JobQueueRedis extends JobQueue {
                                        $failed += count( $itemBatch );
                                }
                        }
-                       if ( $failed > 0 ) {
-                               wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." );
-
-                               throw new RedisException( "Could not insert {$failed} {$this->type} job(s)." );
-                       }
                        JobQueue::incrStats( 'inserts', $this->type, count( $items ) );
+                       JobQueue::incrStats( 'inserts_actual', $this->type, $pushed );
                        JobQueue::incrStats( 'dupe_inserts', $this->type,
                                count( $items ) - $failed - $pushed );
+                       if ( $failed > 0 ) {
+                               $err = "Could not insert {$failed} {$this->type} job(s).";
+                               wfDebugLog( 'JobQueueRedis', $err );
+                               throw new RedisException( $err );
+                       }
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $conn, $e );
                }
@@ -238,7 +246,8 @@ class JobQueueRedis extends JobQueue {
         * @throws RedisException
         */
        protected function pushBlobs( RedisConnRef $conn, array $items ) {
-               $args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
+               $args = array( $this->encodeQueueName() );
+               // Next args come in 4s ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
                foreach ( $items as $item ) {
                        $args[] = (string)$item['uuid'];
                        $args[] = (string)$item['sha1'];
@@ -247,10 +256,17 @@ class JobQueueRedis extends JobQueue {
                }
                static $script =
 <<<LUA
-               local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData = unpack(KEYS)
-               if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end
+               local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData, kQwJobs = unpack(KEYS)
+               -- First argument is the queue ID
+               local queueId = ARGV[1]
+               -- Next arguments all come in 4s (one per job)
+               local variadicArgCount = #ARGV - 1
+               if variadicArgCount % 4 ~= 0 then
+                       return redis.error_reply('Unmatched arguments')
+               end
+               -- Insert each job into this queue as needed
                local pushed = 0
-               for i = 1,#ARGV,4 do
+               for i = 2,#ARGV,4 do
                        local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
                        if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then
                                if 1*rtimestamp > 0 then
@@ -268,6 +284,8 @@ class JobQueueRedis extends JobQueue {
                                pushed = pushed + 1
                        end
                end
+               -- Mark this queue as having jobs
+               redis.call('sAdd',kQwJobs,queueId)
                return pushed
 LUA;
                return $conn->luaEval( $script,
@@ -278,10 +296,11 @@ LUA;
                                        $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
                                        $this->getQueueKey( 'z-delayed' ), # KEYS[4]
                                        $this->getQueueKey( 'h-data' ), # KEYS[5]
+                                       $this->getGlobalKey( 's-queuesWithJobs' ), # KEYS[6]
                                ),
                                $args
                        ),
-                       5 # number of first argument(s) that are keys
+                       6 # number of first argument(s) that are keys
                );
        }
 
@@ -301,7 +320,7 @@ LUA;
                                        break; // no jobs; nothing to do
                                }
 
-                               JobQueue::incrStats( 'job-pop', $this->type );
+                               JobQueue::incrStats( 'pops', $this->type );
                                $item = $this->unserialize( $blob );
                                if ( $item === false ) {
                                        wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
@@ -327,15 +346,18 @@ LUA;
                static $script =
 <<<LUA
                local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
+               local rTime = unpack(ARGV)
                -- Pop an item off the queue
                local id = redis.call('rPop',kUnclaimed)
-               if not id then return false end
+               if not id then
+                       return false
+               end
                -- Allow new duplicates of this job
                local sha1 = redis.call('hGet',kSha1ById,id)
                if sha1 then redis.call('hDel',kIdBySha1,sha1) end
                redis.call('hDel',kSha1ById,id)
                -- Mark the jobs as claimed and return it
-               redis.call('zAdd',kClaimed,ARGV[1],id)
+               redis.call('zAdd',kClaimed,rTime,id)
                redis.call('hIncrBy',kAttempts,id,1)
                return redis.call('hGet',kData,id)
 LUA;
@@ -371,11 +393,12 @@ LUA;
                        static $script =
 <<<LUA
                        local kClaimed, kAttempts, kData = unpack(KEYS)
+                       local uuid = unpack(ARGV)
                        -- Unmark the job as claimed
-                       redis.call('zRem',kClaimed,ARGV[1])
-                       redis.call('hDel',kAttempts,ARGV[1])
+                       redis.call('zRem',kClaimed,uuid)
+                       redis.call('hDel',kAttempts,uuid)
                        -- Delete the job data itself
-                       return redis.call('hDel',kData,ARGV[1])
+                       return redis.call('hDel',kData,uuid)
 LUA;
                        $res = $conn->luaEval( $script,
                                array(
@@ -393,7 +416,7 @@ LUA;
                                return false;
                        }
 
-                       JobQueue::incrStats( 'job-ack', $this->type );
+                       JobQueue::incrStats( 'acks', $this->type );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $conn, $e );
                }
@@ -471,7 +494,10 @@ LUA;
                                $keys[] = $this->getQueueKey( $prop );
                        }
 
-                       return ( $conn->delete( $keys ) !== false );
+                       $ok = ( $conn->delete( $keys ) !== false );
+                       $conn->sRem( $this->getGlobalKey( 's-queuesWithJobs' ), $this->encodeQueueName() );
+
+                       return $ok;
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $conn, $e );
                }
@@ -622,6 +648,27 @@ LUA;
                }
        }
 
+       /**
+        * @return array List of (wiki,type) tuples for queues with non-abandoned jobs
+        * @throws JobQueueConnectionError
+        * @throws JobQueueError
+        */
+       public function getServerQueuesWithJobs() {
+               $queues = array();
+
+               $conn = $this->getConnection();
+               try {
+                       $set = $conn->sMembers( $this->getGlobalKey( 's-queuesWithJobs' ) );
+                       foreach ( $set as $queue ) {
+                               $queues[] = $this->decodeQueueName( $queue );
+                       }
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $conn, $e );
+               }
+
+               return $queues;
+       }
+
        /**
         * @param IJobSpecification $job
         * @return array
@@ -638,7 +685,7 @@ LUA;
                        // Additional job metadata
                        'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
                        'sha1' => $job->ignoreDuplicates()
-                               ? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
+                               ? Wikimedia\base_convert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
                                : '',
                        'timestamp' => time() // UNIX timestamp
                );
@@ -718,6 +765,36 @@ LUA;
                throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
        }
 
+       /**
+        * @return string JSON
+        */
+       private function encodeQueueName() {
+               return json_encode( array( $this->type, $this->wiki ) );
+       }
+
+       /**
+        * @param string $name JSON
+        * @return array (type, wiki)
+        */
+       private function decodeQueueName( $name ) {
+               return json_decode( $name );
+       }
+
+       /**
+        * @param string $name
+        * @return string
+        */
+       private function getGlobalKey( $name ) {
+               $parts = array( 'global', 'jobqueue', $name );
+               foreach ( $parts as $part ) {
+                   if ( !preg_match( '/[a-zA-Z0-9_-]+/', $part ) ) {
+                       throw new InvalidArgumentException( "Key part characters are out of range." );
+                   }
+               }
+
+               return implode( ':', $parts );
+       }
+
        /**
         * @param string $prop
         * @param string|null $type
@@ -726,18 +803,7 @@ LUA;
        private function getQueueKey( $prop, $type = null ) {
                $type = is_string( $type ) ? $type : $this->type;
                list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
-               if ( strlen( $this->key ) ) { // namespaced queue (for testing)
-                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop );
-               } else {
-                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop );
-               }
-       }
 
-       /**
-        * @param string $key
-        * @return void
-        */
-       public function setTestingPrefix( $key ) {
-               $this->key = $key;
+               return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop );
        }
 }