[JobQueue] Allow using gzip on larger jobs in JobQueueRedis.
authorAaron Schulz <aschulz@wikimedia.org>
Wed, 1 May 2013 03:00:20 +0000 (20:00 -0700)
committerGerrit Code Review <gerrit@wikimedia.org>
Sun, 26 May 2013 08:52:56 +0000 (08:52 +0000)
Change-Id: I74ebdcfa0d3f2bcdc429394f7873fa25b7eb0f58

includes/job/JobQueueRedis.php

index a3f847f..1f5b761 100644 (file)
@@ -61,6 +61,7 @@ class JobQueueRedis extends JobQueue {
        protected $redisPool;
 
        protected $server; // string; server address
+       protected $compression; // string; compression method to use
 
        const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
 
@@ -73,12 +74,14 @@ class JobQueueRedis extends JobQueue {
         *   - redisServer : A hostname/port combination or the absolute path of a UNIX socket.
         *                   If a hostname is specified but no port, the standard port number
         *                   6379 will be used. Required.
+        *   - compression : The type of compression to use; one of (none,gzip).
         * @param array $params
         */
        public function __construct( array $params ) {
                parent::__construct( $params );
                $params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua
                $this->server = $params['redisServer'];
+               $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none';
                $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
        }
 
@@ -238,7 +241,7 @@ class JobQueueRedis extends JobQueue {
                        $args[] = (string)$item['uuid'];
                        $args[] = (string)$item['sha1'];
                        $args[] = (string)$item['rtimestamp'];
-                       $args[] = (string)serialize( $item );
+                       $args[] = (string)$this->serialize( $item );
                }
                static $script =
 <<<LUA
@@ -310,7 +313,7 @@ LUA;
                                }
 
                                JobQueue::incrStats( 'job-pop', $this->type );
-                               $item = unserialize( $blob );
+                               $item = $this->unserialize( $blob );
                                if ( $item === false ) {
                                        wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
                                        continue;
@@ -552,7 +555,7 @@ LUA;
         */
        public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
                try {
-                       $item = unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) );
+                       $item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) );
                        if ( !is_array( $item ) ) { // this shouldn't happen
                                throw new MWException( "Could not find job with ID '$uid'." );
                        }
@@ -735,6 +738,39 @@ LUA;
                return false;
        }
 
+       /**
+        * @param array $fields
+        * @return string Serialized and possibly compressed version of $fields
+        */
+       protected function serialize( array $fields ) {
+               $blob = serialize( $fields );
+               if ( $this->compression === 'gzip'
+                       && strlen( $blob ) >= 1024 && function_exists( 'gzdeflate' ) )
+               {
+                       $object = (object)array( 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' );
+                       $blobz = serialize( $object );
+                       return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob;
+               } else {
+                       return $blob;
+               }
+       }
+
+       /**
+        * @param string $blob
+        * @return array|bool Unserialized version of $blob or false
+        */
+       protected function unserialize( $blob ) {
+               $fields = unserialize( $blob );
+               if ( is_object( $fields ) ) {
+                       if ( $fields->enc === 'gzip' && function_exists( 'gzinflate' ) ) {
+                               $fields = unserialize( gzinflate( $fields->blob ) );
+                       } else {
+                               $fields = false;
+                       }
+               }
+               return is_array( $fields ) ? $fields : false;
+       }
+
        /**
         * Get a connection to the server that handles all sub-queues for this queue
         *