Merge "Slight improvements to FormSpecialPage behavior."
[lhc/web/wiklou.git] / includes / job / JobQueueRedis.php
index 26a9b72..8250d2b 100644 (file)
  *   - z-abandoned  : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs
  *   - z-delayed    : A sorted set of (job ID, UNIX timestamp as score) used for delayed jobs
  *   - h-idBySha1   : A hash of (SHA1 => job ID) for unclaimed jobs used for de-duplication
- *   - h-sha1Byid   : A hash of (job ID => SHA1) for unclaimed jobs used for de-duplication
+ *   - h-sha1ById   : A hash of (job ID => SHA1) for unclaimed jobs used for de-duplication
  *   - h-attempts   : A hash of (job ID => attempt count) used for job claiming/retries
  *   - h-data       : A hash of (job ID => serialized blobs) for job storage
  * A job ID can be in only one of z-delayed, l-unclaimed, z-claimed, and z-abandoned.
  * If an ID appears in any of those lists, it should have a h-data entry for its ID.
  * If a job has a SHA1 de-duplication value and its ID is in l-unclaimed or z-delayed, then
- * there should be no other such jobs with that SHA1. Every h-idBySha1 entry has an h-sha1Byid
- * entry and every h-sha1Byid must refer to an ID that is l-unclaimed. If a job has its
+ * there should be no other such jobs with that SHA1. Every h-idBySha1 entry has an h-sha1ById
+ * entry and every h-sha1ById must refer to an ID that is l-unclaimed. If a job has its
  * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID.
  *
  * Additionally, "rootjob:* keys track "root jobs" used for additional de-duplication.
@@ -216,8 +216,9 @@ class JobQueueRedis extends JobQueue {
                                wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." );
                                return false;
                        }
-                       wfIncrStats( 'job-insert', count( $items ) );
-                       wfIncrStats( 'job-insert-duplicate', count( $items ) - $failed - $pushed );
+                       JobQueue::incrStats( 'job-insert', $this->type, count( $items ) );
+                       JobQueue::incrStats( 'job-insert-duplicate', $this->type,
+                               count( $items ) - $failed - $pushed );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
                }
@@ -232,7 +233,7 @@ class JobQueueRedis extends JobQueue {
         * @throws RedisException
         */
        protected function pushBlobs( RedisConnRef $conn, array $items ) {
-               $args = array(); // ([id, sha1, blob [, id, sha1, blob ... ] ] )
+               $args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
                foreach ( $items as $item ) {
                        $args[] = (string)$item['uuid'];
                        $args[] = (string)$item['sha1'];
@@ -263,7 +264,7 @@ class JobQueueRedis extends JobQueue {
                end
                return pushed
 LUA;
-               return $this->redisEval( $conn, $script,
+               return $conn->luaEval( $script,
                        array_merge(
                                array(
                                        $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
@@ -308,7 +309,7 @@ LUA;
                                        break; // no jobs; nothing to do
                                }
 
-                               wfIncrStats( 'job-pop' );
+                               JobQueue::incrStats( 'job-pop', $this->type );
                                $item = unserialize( $blob );
                                if ( $item === false ) {
                                        wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
@@ -346,7 +347,7 @@ LUA;
                -- Return the job data
                return item
 LUA;
-               return $this->redisEval( $conn, $script,
+               return $conn->luaEval( $script,
                        array(
                                $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
                                $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
@@ -377,7 +378,7 @@ LUA;
                redis.call('hIncrBy',KEYS[5],id,1)
                return redis.call('hGet',KEYS[6],id)
 LUA;
-               return $this->redisEval( $conn, $script,
+               return $conn->luaEval( $script,
                        array(
                                $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
                                $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
@@ -398,13 +399,12 @@ LUA;
         * @throws MWException
         */
        protected function doAck( Job $job ) {
+               if ( !isset( $job->metadata['uuid'] ) ) {
+                       throw new MWException( "Job of type '{$job->getType()}' has no UUID." );
+               }
                if ( $this->claimTTL > 0 ) {
                        $conn = $this->getConnection();
                        try {
-                               // Get the exact field map this Job came from, regardless of whether
-                               // the job was transformed into a DuplicateJob or anything of the sort.
-                               $item = $job->metadata['sourceFields'];
-
                                static $script =
 <<<LUA
                                -- Unmark the job as claimed
@@ -413,12 +413,12 @@ LUA;
                                -- Delete the job data itself
                                return redis.call('hDel',KEYS[3],ARGV[1])
 LUA;
-                               $res = $this->redisEval( $conn, $script,
+                               $res = $conn->luaEval( $script,
                                        array(
                                                $this->getQueueKey( 'z-claimed' ), # KEYS[1]
                                                $this->getQueueKey( 'h-attempts' ), # KEYS[2]
                                                $this->getQueueKey( 'h-data' ), # KEYS[3]
-                                               $item['uuid'] # ARGV[1]
+                                               $job->metadata['uuid'] # ARGV[1]
                                        ),
                                        3 # number of first argument(s) that are keys
                                );
@@ -467,13 +467,10 @@ LUA;
         * @return bool
         */
        protected function doIsRootJobOldDuplicate( Job $job ) {
-               $params = $job->getParams();
-               if ( !isset( $params['rootJobSignature'] ) ) {
+               if ( !$job->hasRootJobParams() ) {
                        return false; // job has no de-deplication info
-               } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
-                       wfDebugLog( 'JobQueueRedis', "Cannot check root job; missing 'rootJobTimestamp'." );
-                       return false;
                }
+               $params = $job->getRootJobParams();
 
                $conn = $this->getConnection();
                try {
@@ -487,15 +484,32 @@ LUA;
                return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
        }
 
+       /**
+        * @see JobQueue::doDelete()
+        * @return bool
+        */
+       protected function doDelete() {
+               static $props = array( 'l-unclaimed', 'z-claimed', 'z-abandoned',
+                       'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' );
+
+               $conn = $this->getConnection();
+               try {
+                       $keys = array();
+                       foreach ( $props as $prop ) {
+                               $keys[] = $this->getQueueKey( $prop );
+                       }
+                       $res = ( $conn->delete( $keys ) !== false );
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+       }
+
        /**
         * @see JobQueue::getAllQueuedJobs()
         * @return Iterator
         */
        public function getAllQueuedJobs() {
                $conn = $this->getConnection();
-               if ( !$conn ) {
-                       throw new MWException( "Unable to connect to redis server." );
-               }
                try {
                        $that = $this;
                        return new MappedIterator(
@@ -515,9 +529,6 @@ LUA;
         */
        public function getAllDelayedJobs() {
                $conn = $this->getConnection();
-               if ( !$conn ) {
-                       throw new MWException( "Unable to connect to redis server." );
-               }
                try {
                        $that = $this;
                        return new MappedIterator( // delayed jobs
@@ -547,7 +558,7 @@ LUA;
                        }
                        $title = Title::makeTitle( $item['namespace'], $item['title'] );
                        $job = Job::factory( $item['type'], $title, $item['params'] );
-                       $job->metadata['sourceFields'] = $item;
+                       $job->metadata['uuid'] = $item['uuid'];
                        return $job;
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
@@ -576,7 +587,7 @@ LUA;
                        end
                        return #ids
 LUA;
-                       $count += (int)$this->redisEval( $conn, $script,
+                       $count += (int)$conn->luaEval( $script,
                                array(
                                        $this->getQueueKey( 'z-delayed' ), // KEYS[1]
                                        $this->getQueueKey( 'l-unclaimed' ), // KEYS[2]
@@ -642,7 +653,7 @@ LUA;
                        end
                        return {released,abandoned,pruned}
 LUA;
-                       $res = $this->redisEval( $conn, $script,
+                       $res = $conn->luaEval( $script,
                                array(
                                        $this->getQueueKey( 'z-claimed' ), # KEYS[1]
                                        $this->getQueueKey( 'h-attempts' ), # KEYS[2]
@@ -658,7 +669,8 @@ LUA;
                        if ( $res ) {
                                list( $released, $abandoned, $pruned ) = $res;
                                $count += $released + $pruned;
-                               wfIncrStats( 'job-recycle', count( $released ) );
+                               JobQueue::incrStats( 'job-recycle', $this->type, $released );
+                               JobQueue::incrStats( 'job-abandon', $this->type, $abandoned );
                        }
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
@@ -675,45 +687,18 @@ LUA;
                if ( $this->claimTTL > 0 ) {
                        $tasks['recycleAndDeleteStaleJobs'] = array(
                                'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
-                               'period'   => ceil( $this->claimTTL / 2 )
+                               'period' => ceil( $this->claimTTL / 2 )
                        );
                }
                if ( $this->checkDelay ) {
                        $tasks['releaseReadyDelayedJobs'] = array(
                                'callback' => array( $this, 'releaseReadyDelayedJobs' ),
-                               'period'   => 300 // 5 minutes
+                               'period' => 300 // 5 minutes
                        );
                }
                return $tasks;
        }
 
-       /**
-        * @param RedisConnRef $conn
-        * @param string $script
-        * @param array $params
-        * @param integer $numKeys
-        * @return mixed
-        */
-       protected function redisEval( RedisConnRef $conn, $script, array $params, $numKeys ) {
-               $sha1 = sha1( $script ); // 40 char hex
-
-               // Try to run the server-side cached copy of the script
-               $conn->clearLastError();
-               $res = $conn->evalSha( $sha1, $params, $numKeys );
-               // If the script is not in cache, use eval() to retry and cache it
-               if ( $conn->getLastError() && $conn->script( 'exists', $sha1 ) === array( 0 ) ) {
-                       $conn->clearLastError();
-                       $res = $conn->eval( $script, $params, $numKeys );
-                       wfDebugLog( 'JobQueueRedis', "Used eval() for Lua script $sha1." );
-               }
-
-               if ( $conn->getLastError() ) { // script bug?
-                       wfDebugLog( 'JobQueueRedis', "Lua script error: " . $conn->getLastError() );
-               }
-
-               return $res;
-       }
-
        /**
         * @param $job Job
         * @return array
@@ -744,7 +729,7 @@ LUA;
                $title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] );
                if ( $title ) {
                        $job = Job::factory( $fields['type'], $title, $fields['params'] );
-                       $job->metadata['sourceFields'] = $fields;
+                       $job->metadata['uuid'] = $fields['uuid'];
                        return $job;
                }
                return false;