Merge "Made root job de-duplication work without cache setup"
[lhc/web/wiklou.git] / includes / job / JobQueueRedis.php
index 1f5b761..378e175 100644 (file)
@@ -501,7 +501,7 @@ LUA;
                        foreach ( $props as $prop ) {
                                $keys[] = $this->getQueueKey( $prop );
                        }
-                       $res = ( $conn->delete( $keys ) !== false );
+                       return ( $conn->delete( $keys ) !== false );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
                }
@@ -519,7 +519,8 @@ LUA;
                                $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ),
                                function( $uid ) use ( $that, $conn ) {
                                        return $that->getJobFromUidInternal( $uid, $conn );
-                               }
+                               },
+                               array( 'accept' => function ( $job ) { return is_object( $job ); } )
                        );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
@@ -538,23 +539,57 @@ LUA;
                                $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ),
                                function( $uid ) use ( $that, $conn ) {
                                        return $that->getJobFromUidInternal( $uid, $conn );
-                               }
+                               },
+                               array( 'accept' => function ( $job ) { return is_object( $job ); } )
                        );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
                }
        }
 
+       public function getCoalesceLocationInternal() {
+               return "RedisServer:" . $this->server;
+       }
+
+       protected function doGetSiblingQueuesWithJobs( array $types ) {
+               return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) );
+       }
+
+       protected function doGetSiblingQueueSizes( array $types ) {
+               $sizes = array(); // (type => size)
+               $types = array_values( $types ); // reindex
+               try {
+                       $conn = $this->getConnection();
+                       $conn->multi( Redis::PIPELINE );
+                       foreach ( $types as $type ) {
+                               $conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) );
+                       }
+                       $res = $conn->exec();
+                       if ( is_array( $res ) ) {
+                               foreach ( $res as $i => $size ) {
+                                       $sizes[$types[$i]] = $size;
+                               }
+                       }
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+               return $sizes;
+       }
+
        /**
         * This function should not be called outside JobQueueRedis
         *
         * @param $uid string
         * @param $conn RedisConnRef
-        * @return Job
+        * @return Job|bool Returns false if the job does not exist
         * @throws MWException
         */
        public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
                try {
+                       $data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid );
+                       if ( $data === false ) {
+                               return false; // not found
+                       }
                        $item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) );
                        if ( !is_array( $item ) ) { // this shouldn't happen
                                throw new MWException( "Could not find job with ID '$uid'." );
@@ -780,7 +815,7 @@ LUA;
        protected function getConnection() {
                $conn = $this->redisPool->getConnection( $this->server );
                if ( !$conn ) {
-                       throw new MWException( "Unable to connect to redis server." );
+                       throw new JobQueueConnectionError( "Unable to connect to redis server." );
                }
                return $conn;
        }
@@ -793,19 +828,21 @@ LUA;
         */
        protected function throwRedisException( $server, RedisConnRef $conn, $e ) {
                $this->redisPool->handleException( $server, $conn, $e );
-               throw new MWException( "Redis server error: {$e->getMessage()}\n" );
+               throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
        }
 
        /**
         * @param $prop string
+        * @param $type string|null
         * @return string
         */
-       private function getQueueKey( $prop ) {
+       private function getQueueKey( $prop, $type = null ) {
+               $type = is_string( $type ) ? $type : $this->type;
                list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
                if ( strlen( $this->key ) ) { // namespaced queue (for testing)
-                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $this->key, $prop );
+                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop );
                } else {
-                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop );
+                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop );
                }
        }