Add per-partition JobQueueRedis aggregation
authorAaron Schulz <aschulz@wikimedia.org>
Thu, 12 Nov 2015 00:10:29 +0000 (16:10 -0800)
committerOri.livneh <ori@wikimedia.org>
Thu, 10 Dec 2015 23:59:59 +0000 (23:59 +0000)
* Track queues with non-abandoned jobs per partition server.
  The s-queuesWithJobs key can easily be queried to see which
  queues need to have periodic tasks run (or for debugging).
* This is requirement for the redis jobchron service to be able to
  avoid hitting N=(no. types X no. wikis) queues for periodic tasks
  when only a tiny fraction of those actually have any jobs. For WMF,
  there are over 30K queues, most of them empty, so doing that can help
  lower redis-server CPU (or at least make jobchron more responsive).
* This also allows for jobchron to manage the aggregator by taking the
  per-server aggregator sets and merging them. This scales much better
  as there are only a modest number of these daemons (18 for WMF) but
  vastly more web thread pushing jobs. This cuts down on the connections
  to the active aggregator server (the one with the hash table).
* Use Lua unpack() more for stylistic consistency.

Change-Id: I1549f0edc78cc4004dd887b475dec4c0ebd306c6

includes/jobqueue/JobQueueRedis.php
tests/phpunit/includes/jobqueue/JobQueueTest.php

index 78d2a36..9ce9bf9 100644 (file)
@@ -26,8 +26,9 @@
  *
  * This is a faster and less resource-intensive job queue than JobQueueDB.
  * All data for a queue using this class is placed into one redis server.
+ * The mediawiki/services/jobrunner background service must be set up and running.
  *
- * There are eight main redis keys used to track jobs:
+ * There are eight main redis keys (per queue) used to track jobs:
  *   - l-unclaimed  : A list of job IDs used for ready unclaimed jobs
  *   - z-claimed    : A sorted set of (job ID, UNIX timestamp as score) used for job retries
  *   - z-abandoned  : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs
  * entry and every h-sha1ById must refer to an ID that is l-unclaimed. If a job has its
  * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID.
  *
+ * The following keys are used to track queue states:
+ *   - s-queuesWithJobs : A set of all queues with non-abandoned jobs
+ *
+ * The background service takes care of undelaying, recycling, and pruning jobs as well as
+ * removing s-queuesWithJobs entries as queues empty.
+ *
  * Additionally, "rootjob:* keys track "root jobs" used for additional de-duplication.
  * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run.
  * All the keys are prefixed with the relevant wiki ID information.
@@ -239,7 +246,8 @@ class JobQueueRedis extends JobQueue {
         * @throws RedisException
         */
        protected function pushBlobs( RedisConnRef $conn, array $items ) {
-               $args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
+               $args = array( $this->encodeQueueName() );
+               // Next args come in 4s ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
                foreach ( $items as $item ) {
                        $args[] = (string)$item['uuid'];
                        $args[] = (string)$item['sha1'];
@@ -248,10 +256,17 @@ class JobQueueRedis extends JobQueue {
                }
                static $script =
 <<<LUA
-               local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData = unpack(KEYS)
-               if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end
+               local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData, kQwJobs = unpack(KEYS)
+               -- First argument is the queue ID
+               local queueId = ARGV[1]
+               -- Next arguments all come in 4s (one per job)
+               local variadicArgCount = #ARGV - 1
+               if variadicArgCount % 4 ~= 0 then
+                       return redis.error_reply('Unmatched arguments')
+               end
+               -- Insert each job into this queue as needed
                local pushed = 0
-               for i = 1,#ARGV,4 do
+               for i = 2,#ARGV,4 do
                        local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
                        if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then
                                if 1*rtimestamp > 0 then
@@ -269,6 +284,8 @@ class JobQueueRedis extends JobQueue {
                                pushed = pushed + 1
                        end
                end
+               -- Mark this queue as having jobs
+               redis.call('sAdd',kQwJobs,queueId)
                return pushed
 LUA;
                return $conn->luaEval( $script,
@@ -279,10 +296,11 @@ LUA;
                                        $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
                                        $this->getQueueKey( 'z-delayed' ), # KEYS[4]
                                        $this->getQueueKey( 'h-data' ), # KEYS[5]
+                                       $this->getGlobalKey( 's-queuesWithJobs' ), # KEYS[6]
                                ),
                                $args
                        ),
-                       5 # number of first argument(s) that are keys
+                       6 # number of first argument(s) that are keys
                );
        }
 
@@ -328,15 +346,18 @@ LUA;
                static $script =
 <<<LUA
                local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
+               local rTime = unpack(ARGV)
                -- Pop an item off the queue
                local id = redis.call('rPop',kUnclaimed)
-               if not id then return false end
+               if not id then
+                       return false
+               end
                -- Allow new duplicates of this job
                local sha1 = redis.call('hGet',kSha1ById,id)
                if sha1 then redis.call('hDel',kIdBySha1,sha1) end
                redis.call('hDel',kSha1ById,id)
                -- Mark the jobs as claimed and return it
-               redis.call('zAdd',kClaimed,ARGV[1],id)
+               redis.call('zAdd',kClaimed,rTime,id)
                redis.call('hIncrBy',kAttempts,id,1)
                return redis.call('hGet',kData,id)
 LUA;
@@ -372,11 +393,12 @@ LUA;
                        static $script =
 <<<LUA
                        local kClaimed, kAttempts, kData = unpack(KEYS)
+                       local uuid = unpack(ARGV)
                        -- Unmark the job as claimed
-                       redis.call('zRem',kClaimed,ARGV[1])
-                       redis.call('hDel',kAttempts,ARGV[1])
+                       redis.call('zRem',kClaimed,uuid)
+                       redis.call('hDel',kAttempts,uuid)
                        -- Delete the job data itself
-                       return redis.call('hDel',kData,ARGV[1])
+                       return redis.call('hDel',kData,uuid)
 LUA;
                        $res = $conn->luaEval( $script,
                                array(
@@ -472,7 +494,10 @@ LUA;
                                $keys[] = $this->getQueueKey( $prop );
                        }
 
-                       return ( $conn->delete( $keys ) !== false );
+                       $ok = ( $conn->delete( $keys ) !== false );
+                       $conn->sRem( $this->getGlobalKey( 's-queuesWithJobs' ), $this->encodeQueueName() );
+
+                       return $ok;
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $conn, $e );
                }
@@ -623,6 +648,27 @@ LUA;
                }
        }
 
+       /**
+        * @return array List of (wiki,type) tuples for queues with non-abandoned jobs
+        * @throws JobQueueConnectionError
+        * @throws JobQueueError
+        */
+       public function getServerQueuesWithJobs() {
+               $queues = array();
+
+               $conn = $this->getConnection();
+               try {
+                       $set = $conn->sMembers( $this->getGlobalKey( 's-queuesWithJobs' ) );
+                       foreach ( $set as $queue ) {
+                               $queues[] = $this->decodeQueueName( $queue );
+                       }
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $conn, $e );
+               }
+
+               return $queues;
+       }
+
        /**
         * @param IJobSpecification $job
         * @return array
@@ -719,6 +765,40 @@ LUA;
                throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
        }
 
+       /**
+        * @return string JSON
+        */
+       private function encodeQueueName() {
+               return json_encode( array( $this->type, $this->wiki ) );
+       }
+
+       /**
+        * @param string $name JSON
+        * @return array (type, wiki)
+        */
+       private function decodeQueueName( $name ) {
+               return json_decode( $name );
+       }
+
+       /**
+        * @param string $name
+        * @return string
+        */
+       private function getGlobalKey( $name ) {
+               $parts = array( 'global', 'jobqueue', $name );
+               if ( strlen( $this->key ) ) { // namespaced queue (for testing)
+                       $parts[] = $this->key;
+               }
+
+               foreach ( $parts as $part ) {
+                   if ( !preg_match( '/[a-zA-Z0-9_-.]+/', $part ) ) {
+                       throw new InvalidArgumentException( "Key part characters are out of range." );
+                   }
+               }
+
+               return implode( ':', $parts );
+       }
+
        /**
         * @param string $prop
         * @param string|null $type
index 3cb1af6..bb74e95 100644 (file)
@@ -332,6 +332,25 @@ class JobQueueTest extends MediaWikiTestCase {
                $this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );
        }
 
+       /**
+        * @covers JobQueue
+        */
+       public function testQueueAggregateTable() {
+               $queue = $this->queueFifo;
+               if ( !$queue || !method_exists( $queue, 'getServerQueuesWithJobs' ) ) {
+                       $this->markTestSkipped();
+               }
+
+               $this->assertArrayEquals( array(), $queue->getServerQueuesWithJobs() );
+
+               $queue->push( $this->newJob( 0 ) );
+
+               $this->assertArrayEquals(
+                       array( array( $queue->getType(), $queue->getWiki() ) ),
+                       $queue->getServerQueuesWithJobs()
+               );
+       }
+
        public static function provider_queueLists() {
                return array(
                        array( 'queueRand', false, 'Random queue without ack()' ),