From 6fe2f48df70e63cc0af1e2c100d2a5b10a6c6f71 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Wed, 11 Nov 2015 16:10:29 -0800 Subject: [PATCH] Add per-partition JobQueueRedis aggregation * Track queues with non-abandoned jobs per partition server. The s-queuesWithJobs key can easily be queried to see which queues need to have periodic tasks run (or for debugging). * This is a requirement for the redis jobchron service to be able to avoid hitting N=(no. types X no. wikis) queues for periodic tasks when only a tiny fraction of those actually have any jobs. For WMF, there are over 30K queues, most of them empty, so doing that can help lower redis-server CPU (or at least make jobchron more responsive). * This also allows for jobchron to manage the aggregator by taking the per-server aggregator sets and merging them. This scales much better as there are only a modest number of these daemons (18 for WMF) but vastly more web threads pushing jobs. This cuts down on the connections to the active aggregator server (the one with the hash table). * Use Lua unpack() more for stylistic consistency. Change-Id: I1549f0edc78cc4004dd887b475dec4c0ebd306c6 --- includes/jobqueue/JobQueueRedis.php | 104 ++++++++++++++++-- .../includes/jobqueue/JobQueueTest.php | 19 ++++ 2 files changed, 111 insertions(+), 12 deletions(-) diff --git a/includes/jobqueue/JobQueueRedis.php b/includes/jobqueue/JobQueueRedis.php index 78d2a360f9..9ce9bf94c3 100644 --- a/includes/jobqueue/JobQueueRedis.php +++ b/includes/jobqueue/JobQueueRedis.php @@ -26,8 +26,9 @@ * * This is a faster and less resource-intensive job queue than JobQueueDB. * All data for a queue using this class is placed into one redis server. + * The mediawiki/services/jobrunner background service must be set up and running.
* - * There are eight main redis keys used to track jobs: + * There are eight main redis keys (per queue) used to track jobs: * - l-unclaimed : A list of job IDs used for ready unclaimed jobs * - z-claimed : A sorted set of (job ID, UNIX timestamp as score) used for job retries * - z-abandoned : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs @@ -43,6 +44,12 @@ * entry and every h-sha1ById must refer to an ID that is l-unclaimed. If a job has its * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID. * + * The following keys are used to track queue states: + * - s-queuesWithJobs : A set of all queues with non-abandoned jobs + * + * The background service takes care of undelaying, recycling, and pruning jobs as well as + * removing s-queuesWithJobs entries as queues empty. + * * Additionally, "rootjob:* keys track "root jobs" used for additional de-duplication. * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run. * All the keys are prefixed with the relevant wiki ID information. @@ -239,7 +246,8 @@ class JobQueueRedis extends JobQueue { * @throws RedisException */ protected function pushBlobs( RedisConnRef $conn, array $items ) { - $args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] ) + $args = array( $this->encodeQueueName() ); + // Next args come in 4s ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... 
] ] ) foreach ( $items as $item ) { $args[] = (string)$item['uuid']; $args[] = (string)$item['sha1']; @@ -248,10 +256,17 @@ class JobQueueRedis extends JobQueue { } static $script = << 0 then @@ -269,6 +284,8 @@ class JobQueueRedis extends JobQueue { pushed = pushed + 1 end end + -- Mark this queue as having jobs + redis.call('sAdd',kQwJobs,queueId) return pushed LUA; return $conn->luaEval( $script, @@ -279,10 +296,11 @@ LUA; $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] $this->getQueueKey( 'z-delayed' ), # KEYS[4] $this->getQueueKey( 'h-data' ), # KEYS[5] + $this->getGlobalKey( 's-queuesWithJobs' ), # KEYS[6] ), $args ), - 5 # number of first argument(s) that are keys + 6 # number of first argument(s) that are keys ); } @@ -328,15 +346,18 @@ LUA; static $script = <<luaEval( $script, array( @@ -472,7 +494,10 @@ LUA; $keys[] = $this->getQueueKey( $prop ); } - return ( $conn->delete( $keys ) !== false ); + $ok = ( $conn->delete( $keys ) !== false ); + $conn->sRem( $this->getGlobalKey( 's-queuesWithJobs' ), $this->encodeQueueName() ); + + return $ok; } catch ( RedisException $e ) { $this->throwRedisException( $conn, $e ); } @@ -623,6 +648,27 @@ LUA; } } + /** + * @return array List of (wiki,type) tuples for queues with non-abandoned jobs + * @throws JobQueueConnectionError + * @throws JobQueueError + */ + public function getServerQueuesWithJobs() { + $queues = array(); + + $conn = $this->getConnection(); + try { + $set = $conn->sMembers( $this->getGlobalKey( 's-queuesWithJobs' ) ); + foreach ( $set as $queue ) { + $queues[] = $this->decodeQueueName( $queue ); + } + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + + return $queues; + } + /** * @param IJobSpecification $job * @return array @@ -719,6 +765,40 @@ LUA; throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" ); } + /** + * @return string JSON + */ + private function encodeQueueName() { + return json_encode( array( $this->type, $this->wiki ) ); + } + + /** + * 
@param string $name JSON + * @return array (type, wiki) + */ + private function decodeQueueName( $name ) { + return json_decode( $name ); + } + + /** + * @param string $name + * @return string + */ + private function getGlobalKey( $name ) { + $parts = array( 'global', 'jobqueue', $name ); + if ( strlen( $this->key ) ) { // namespaced queue (for testing) + $parts[] = $this->key; + } + + foreach ( $parts as $part ) { + if ( !preg_match( '/^[a-zA-Z0-9_.-]+\z/', $part ) ) { + throw new InvalidArgumentException( "Key part characters are out of range." ); + } + } + + return implode( ':', $parts ); + } + /** * @param string $prop * @param string|null $type diff --git a/tests/phpunit/includes/jobqueue/JobQueueTest.php b/tests/phpunit/includes/jobqueue/JobQueueTest.php index 3cb1af60d5..bb74e95aa2 100644 --- a/tests/phpunit/includes/jobqueue/JobQueueTest.php +++ b/tests/phpunit/includes/jobqueue/JobQueueTest.php @@ -332,6 +332,25 @@ class JobQueueTest extends MediaWikiTestCase { $this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" ); } + /** + * @covers JobQueue + */ + public function testQueueAggregateTable() { + $queue = $this->queueFifo; + if ( !$queue || !method_exists( $queue, 'getServerQueuesWithJobs' ) ) { + $this->markTestSkipped(); + } + + $this->assertArrayEquals( array(), $queue->getServerQueuesWithJobs() ); + + $queue->push( $this->newJob( 0 ) ); + + $this->assertArrayEquals( + array( array( $queue->getType(), $queue->getWiki() ) ), + $queue->getServerQueuesWithJobs() + ); + } + public static function provider_queueLists() { return array( array( 'queueRand', false, 'Random queue without ack()' ), -- 2.20.1