[JobQueue] Fixed numbers given to redis queue stat calls.
[lhc/web/wiklou.git] / includes / job / JobQueueRedis.php
index 1b22c8d..f14f9fa 100644 (file)
@@ -216,8 +216,9 @@ class JobQueueRedis extends JobQueue {
                                wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." );
                                return false;
                        }
-                       wfIncrStats( 'job-insert', count( $items ) );
-                       wfIncrStats( 'job-insert-duplicate', count( $items ) - $failed - $pushed );
+                       JobQueue::incrStats( 'job-insert', $this->type, count( $items ) );
+                       JobQueue::incrStats( 'job-insert-duplicate', $this->type,
+                               count( $items ) - $failed - $pushed );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
                }
@@ -232,7 +233,7 @@ class JobQueueRedis extends JobQueue {
         * @throws RedisException
         */
        protected function pushBlobs( RedisConnRef $conn, array $items ) {
-               $args = array(); // ([id, sha1, blob [, id, sha1, blob ... ] ] )
+               $args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
                foreach ( $items as $item ) {
                        $args[] = (string)$item['uuid'];
                        $args[] = (string)$item['sha1'];
@@ -263,7 +264,7 @@ class JobQueueRedis extends JobQueue {
                end
                return pushed
 LUA;
-               return $this->redisEval( $conn, $script,
+               return $conn->luaEval( $script,
                        array_merge(
                                array(
                                        $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
@@ -308,7 +309,7 @@ LUA;
                                        break; // no jobs; nothing to do
                                }
 
-                               wfIncrStats( 'job-pop' );
+                               JobQueue::incrStats( 'job-pop', $this->type );
                                $item = unserialize( $blob );
                                if ( $item === false ) {
                                        wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
@@ -346,7 +347,7 @@ LUA;
                -- Return the job data
                return item
 LUA;
-               return $this->redisEval( $conn, $script,
+               return $conn->luaEval( $script,
                        array(
                                $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
                                $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
@@ -377,7 +378,7 @@ LUA;
                redis.call('hIncrBy',KEYS[5],id,1)
                return redis.call('hGet',KEYS[6],id)
 LUA;
-               return $this->redisEval( $conn, $script,
+               return $conn->luaEval( $script,
                        array(
                                $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
                                $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
@@ -398,13 +399,12 @@ LUA;
         * @throws MWException
         */
        protected function doAck( Job $job ) {
+               if ( !isset( $job->metadata['uuid'] ) ) {
+                       throw new MWException( "Job of type '{$job->getType()}' has no UUID." );
+               }
                if ( $this->claimTTL > 0 ) {
                        $conn = $this->getConnection();
                        try {
-                               // Get the exact field map this Job came from, regardless of whether
-                               // the job was transformed into a DuplicateJob or anything of the sort.
-                               $item = $job->metadata['sourceFields'];
-
                                static $script =
 <<<LUA
                                -- Unmark the job as claimed
@@ -413,12 +413,12 @@ LUA;
                                -- Delete the job data itself
                                return redis.call('hDel',KEYS[3],ARGV[1])
 LUA;
-                               $res = $this->redisEval( $conn, $script,
+                               $res = $conn->luaEval( $script,
                                        array(
                                                $this->getQueueKey( 'z-claimed' ), # KEYS[1]
                                                $this->getQueueKey( 'h-attempts' ), # KEYS[2]
                                                $this->getQueueKey( 'h-data' ), # KEYS[3]
-                                               $item['uuid'] # ARGV[1]
+                                               $job->metadata['uuid'] # ARGV[1]
                                        ),
                                        3 # number of first argument(s) that are keys
                                );
@@ -467,13 +467,10 @@ LUA;
         * @return bool
         */
        protected function doIsRootJobOldDuplicate( Job $job ) {
-               $params = $job->getParams();
-               if ( !isset( $params['rootJobSignature'] ) ) {
+               if ( !$job->hasRootJobParams() ) {
                        return false; // job has no de-deplication info
-               } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
-                       wfDebugLog( 'JobQueueRedis', "Cannot check root job; missing 'rootJobTimestamp'." );
-                       return false;
                }
+               $params = $job->getRootJobParams();
 
                $conn = $this->getConnection();
                try {
@@ -493,9 +490,6 @@ LUA;
         */
        public function getAllQueuedJobs() {
                $conn = $this->getConnection();
-               if ( !$conn ) {
-                       throw new MWException( "Unable to connect to redis server." );
-               }
                try {
                        $that = $this;
                        return new MappedIterator(
@@ -515,9 +509,6 @@ LUA;
         */
        public function getAllDelayedJobs() {
                $conn = $this->getConnection();
-               if ( !$conn ) {
-                       throw new MWException( "Unable to connect to redis server." );
-               }
                try {
                        $that = $this;
                        return new MappedIterator( // delayed jobs
@@ -547,7 +538,7 @@ LUA;
                        }
                        $title = Title::makeTitle( $item['namespace'], $item['title'] );
                        $job = Job::factory( $item['type'], $title, $item['params'] );
-                       $job->metadata['sourceFields'] = $item;
+                       $job->metadata['uuid'] = $item['uuid'];
                        return $job;
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
@@ -576,7 +567,7 @@ LUA;
                        end
                        return #ids
 LUA;
-                       $count += (int)$this->redisEval( $conn, $script,
+                       $count += (int)$conn->luaEval( $script,
                                array(
                                        $this->getQueueKey( 'z-delayed' ), // KEYS[1]
                                        $this->getQueueKey( 'l-unclaimed' ), // KEYS[2]
@@ -642,7 +633,7 @@ LUA;
                        end
                        return {released,abandoned,pruned}
 LUA;
-                       $res = $this->redisEval( $conn, $script,
+                       $res = $conn->luaEval( $script,
                                array(
                                        $this->getQueueKey( 'z-claimed' ), # KEYS[1]
                                        $this->getQueueKey( 'h-attempts' ), # KEYS[2]
@@ -658,7 +649,8 @@ LUA;
                        if ( $res ) {
                                list( $released, $abandoned, $pruned ) = $res;
                                $count += $released + $pruned;
-                               wfIncrStats( 'job-recycle', count( $released ) );
+                               JobQueue::incrStats( 'job-recycle', $this->type, $released );
+                               JobQueue::incrStats( 'job-abandon', $this->type, $abandoned );
                        }
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
@@ -687,33 +679,6 @@ LUA;
                return $tasks;
        }
 
-       /**
-        * @param RedisConnRef $conn
-        * @param string $script
-        * @param array $params
-        * @param integer $numKeys
-        * @return mixed
-        */
-       protected function redisEval( RedisConnRef $conn, $script, array $params, $numKeys ) {
-               $sha1 = sha1( $script ); // 40 char hex
-
-               // Try to run the server-side cached copy of the script
-               $conn->clearLastError();
-               $res = $conn->evalSha( $sha1, $params, $numKeys );
-               // If the script is not in cache, use eval() to retry and cache it
-               if ( $conn->getLastError() && $conn->script( 'exists', $sha1 ) === array( 0 ) ) {
-                       $conn->clearLastError();
-                       $res = $conn->eval( $script, $params, $numKeys );
-                       wfDebugLog( 'JobQueueRedis', "Used eval() for Lua script $sha1." );
-               }
-
-               if ( $conn->getLastError() ) { // script bug?
-                       wfDebugLog( 'JobQueueRedis', "Lua script error: " . $conn->getLastError() );
-               }
-
-               return $res;
-       }
-
        /**
         * @param $job Job
         * @return array
@@ -744,7 +709,7 @@ LUA;
                $title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] );
                if ( $title ) {
                        $job = Job::factory( $fields['type'], $title, $fields['params'] );
-                       $job->metadata['sourceFields'] = $fields;
+                       $job->metadata['uuid'] = $fields['uuid'];
                        return $job;
                }
                return false;