Fixed JobQueueAggregatorRedis to handle empty collections
authorAaron Schulz <aschulz@wikimedia.org>
Tue, 15 Jul 2014 20:43:35 +0000 (13:43 -0700)
committerOri.livneh <ori@wikimedia.org>
Tue, 22 Jul 2014 15:12:02 +0000 (15:12 +0000)
* Previously it would keep polling if the hash was empty since
  empty and non-existing collections are the same in Redis.
* Also turned off the pointless PHP serialization.
* Bumped the cache version key; this change should be deployed
  in all wikis at once, along with updating the runners.

Change-Id: I33dc55bc579bf678f14a5383e9e2c3c35231d599

includes/jobqueue/aggregator/JobQueueAggregatorRedis.php

index df9ae39..e8e8e30 100644 (file)
@@ -50,6 +50,7 @@ class JobQueueAggregatorRedis extends JobQueueAggregator {
                $this->servers = isset( $params['redisServers'] )
                        ? $params['redisServers']
                        : array( $params['redisServer'] ); // b/c
+               $params['redisConfig']['serializer'] = 'none';
                $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
        }
 
@@ -94,18 +95,16 @@ class JobQueueAggregatorRedis extends JobQueueAggregator {
                        return array();
                }
                try {
-                       $conn->multi( Redis::PIPELINE );
-                       $conn->exists( $this->getReadyQueueKey() );
-                       $conn->hGetAll( $this->getReadyQueueKey() );
-                       list( $exists, $map ) = $conn->exec();
+                       $map = $conn->hGetAll( $this->getReadyQueueKey() );
 
-                       if ( $exists ) { // cache hit
+                       if ( is_array( $map ) && isset( $map['_epoch'] ) ) {
+                               unset( $map['_epoch'] ); // ignore
                                $pendingDBs = array(); // (type => list of wikis)
                                foreach ( $map as $key => $time ) {
                                        list( $type, $wiki ) = $this->dencQueueName( $key );
                                        $pendingDBs[$type][] = $wiki;
                                }
-                       } else { // cache miss
+                       } else {
                                // Avoid duplicated effort
                                $rand = wfRandomString( 32 );
                                $conn->multi( Redis::MULTI );
@@ -120,7 +119,7 @@ class JobQueueAggregatorRedis extends JobQueueAggregator {
 
                                $conn->multi( Redis::PIPELINE );
                                $now = time();
-                               $map = array();
+                               $map = array( '_epoch' => time() ); // dummy key for empty Redis collections
                                foreach ( $pendingDBs as $type => $wikis ) {
                                        $conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' );
                                        foreach ( $wikis as $wiki ) {
@@ -189,14 +188,14 @@ class JobQueueAggregatorRedis extends JobQueueAggregator {
         * @return string
         */
        private function getReadyQueueKey() {
-               return "jobqueue:aggregator:h-ready-queues:v1"; // global
+               return "jobqueue:aggregator:h-ready-queues:v2"; // global
        }
 
        /**
         * @return string
         */
        private function getQueueTypesKey() {
-               return "jobqueue:aggregator:h-queue-types:v1"; // global
+               return "jobqueue:aggregator:h-queue-types:v2"; // global
        }
 
        /**