From 167d7149e4728f7df569a7b2dd887d7ca879f3b3 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Thu, 4 Jul 2013 00:05:19 -0700 Subject: [PATCH] jobqueue: improved performance of JobQueueGroup::getQueuesWithJobs() * Also added a JobQueueGroup::getQueueSizes() function. This function is now used by the SiteStats to make is meaningful for when queues are not in the DB. This number is already exposed via the API. bug: 45072 bug: 50635 bug: 9518 Change-Id: I75c16ffa14c963e7f8fb7cb390e6cc4cde0a5804 --- includes/SiteStats.php | 2 +- includes/job/JobQueue.php | 57 ++++++++++++++++++++++++++ includes/job/JobQueueDB.php | 28 +++++++++++++ includes/job/JobQueueFederated.php | 32 +++++++++++++++ includes/job/JobQueueGroup.php | 66 ++++++++++++++++++++++++++++-- includes/job/JobQueueRedis.php | 39 ++++++++++++++++-- 6 files changed, 216 insertions(+), 8 deletions(-) diff --git a/includes/SiteStats.php b/includes/SiteStats.php index 6e2359ae54..355993c6ec 100644 --- a/includes/SiteStats.php +++ b/includes/SiteStats.php @@ -189,7 +189,7 @@ class SiteStats { static function jobs() { if ( !isset( self::$jobs ) ) { $dbr = wfGetDB( DB_SLAVE ); - self::$jobs = $dbr->estimateRowCount( 'job' ); + self::$jobs = array_sum( JobQueueGroup::singleton()->getQueueSizes() ); /* Zero rows still do single row read for row that doesn't exist, but people are annoyed by that */ if ( self::$jobs == 1 ) { self::$jobs = 0; diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php index 3e94b13bba..81e7b7340e 100644 --- a/includes/job/JobQueue.php +++ b/includes/job/JobQueue.php @@ -616,6 +616,63 @@ abstract class JobQueue { return new ArrayIterator( array() ); // not implemented } + /** + * Do not use this function outside of JobQueue/JobQueueGroup + * + * @return string + * @since 1.22 + */ + public function getCoalesceLocationInternal() { + return null; + } + + /** + * Check whether each of the given queues are empty. + * This is used for batching checks for queues stored at the same place. + * + * @param array $types List of queues types + * @return array|null (list of non-empty queue types) or null if unsupported + * @throws MWException + * @since 1.22 + */ + final public function getSiblingQueuesWithJobs( array $types ) { + $section = new ProfileSection( __METHOD__ ); + return $this->doGetSiblingQueuesWithJobs( $types ); + } + + /** + * @see JobQueue::getSiblingQueuesWithJobs() + * @param array $types List of queues types + * @return array|null (list of queue types) or null if unsupported + */ + protected function doGetSiblingQueuesWithJobs( array $types ) { + return null; // not supported + } + + /** + * Check the size of each of the given queues. + * For queues not served by the same store as this one, 0 is returned. + * This is used for batching checks for queues stored at the same place. + * + * @param array $types List of queues types + * @return array|null (job type => whether queue is empty) or null if unsupported + * @throws MWException + * @since 1.22 + */ + final public function getSiblingQueueSizes( array $types ) { + $section = new ProfileSection( __METHOD__ ); + return $this->doGetSiblingQueueSizes( $types ); + } + + /** + * @see JobQueue::getSiblingQueuesSize() + * @param array $types List of queues types + * @return array|null (list of queue types) or null if unsupported + */ + protected function doGetSiblingQueueSizes( array $types ) { + return null; // not supported + } + /** * Call wfIncrStats() for the queue overall and for the queue type * diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php index 3fa06556cd..2052fc10d9 100644 --- a/includes/job/JobQueueDB.php +++ b/includes/job/JobQueueDB.php @@ -604,6 +604,34 @@ class JobQueueDB extends JobQueue { } } + public function getCoalesceLocationInternal() { + return $this->cluster ? "DBCluster:{$this->cluster}" : "LBFactory:{$this->wiki}"; + } + + protected function doGetSiblingQueuesWithJobs( array $types ) { + list( $dbr, $scope ) = $this->getSlaveDB(); + $res = $dbr->select( 'job', 'DISTINCT job_cmd', + array( 'job_cmd' => $types ), __METHOD__ ); + + $types = array(); + foreach ( $res as $row ) { + $types[] = $row->job_cmd; + } + return $types; + } + + protected function doGetSiblingQueueSizes( array $types ) { + list( $dbr, $scope ) = $this->getSlaveDB(); + $res = $dbr->select( 'job', array( 'job_cmd', 'COUNT(*) AS count' ), + array( 'job_cmd' => $types ), __METHOD__, array( 'GROUP BY' => 'job_cmd' ) ); + + $sizes = array(); + foreach ( $res as $row ) { + $sizes[$row->job_cmd] = (int)$row->count; + } + return $sizes; + } + /** * Recycle or destroy any jobs that have been claimed for too long * diff --git a/includes/job/JobQueueFederated.php b/includes/job/JobQueueFederated.php index 35b80ca6a7..d788c98b42 100644 --- a/includes/job/JobQueueFederated.php +++ b/includes/job/JobQueueFederated.php @@ -421,6 +421,38 @@ class JobQueueFederated extends JobQueue { return $iterator; } + public function getCoalesceLocationInternal() { + return "JobQueueFederated:wiki:" . $this->wiki; + } + + protected function doGetSiblingQueuesWithJobs( array $types ) { + $result = array(); + foreach ( $this->partitionQueues as $queue ) { + $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types ); + if ( is_array( $nonEmpty ) ) { + $result = array_merge( $result, $nonEmpty ); + } else { + return null; // not supported on all partitions; bail + } + } + return array_values( array_unique( $result ) ); + } + + protected function doGetSiblingQueueSizes( array $types ) { + $result = array(); + foreach ( $this->partitionQueues as $queue ) { + $sizes = $queue->doGetSiblingQueueSizes( $types ); + if ( is_array( $sizes ) ) { + foreach ( $sizes as $type => $size ) { + $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size; + } + } else { + return null; // not supported on all partitions; bail + } + } + return $result; + } + public function setTestingPrefix( $key ) { foreach ( $this->partitionQueues as $queue ) { $queue->setTestingPrefix( $key ); diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php index e483e05797..a20ea4af5d 100644 --- a/includes/job/JobQueueGroup.php +++ b/includes/job/JobQueueGroup.php @@ -36,6 +36,9 @@ class JobQueueGroup { protected $wiki; // string; wiki ID + /** @var array Map of (bucket => (queue => JobQueue, types => list of types) */ + protected $coalescedQueues; + const TYPE_DEFAULT = 1; // integer; jobs popped by default const TYPE_ANY = 2; // integer; any job @@ -254,14 +257,71 @@ class JobQueueGroup { */ public function getQueuesWithJobs() { $types = array(); - foreach ( $this->getQueueTypes() as $type ) { - if ( !$this->get( $type )->isEmpty() ) { - $types[] = $type; + foreach ( $this->getCoalescedQueues() as $info ) { + $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() ); + if ( is_array( $nonEmpty ) ) { // batching features supported + $types = array_merge( $types, $nonEmpty ); + } else { // we have to go through the queues in the bucket one-by-one + foreach ( $info['types'] as $type ) { + if ( !$this->get( $type )->isEmpty() ) { + $types[] = $type; + } + } } } return $types; } + /** + * Get the size of the queus for a list of job types + * + * @return Array Map of (job type => size) + */ + public function getQueueSizes() { + $sizeMap = array(); + foreach ( $this->getCoalescedQueues() as $info ) { + $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() ); + if ( is_array( $sizes ) ) { // batching features supported + $sizeMap = $sizeMap + $sizes; + } else { // we have to go through the queues in the bucket one-by-one + foreach ( $info['types'] as $type ) { + $sizeMap[$type] = $this->get( $type )->getSize(); + } + } + } + return $sizeMap; + } + + /** + * @return array + */ + protected function getCoalescedQueues() { + global $wgJobTypeConf; + + if ( $this->coalescedQueues === null ) { + $this->coalescedQueues = array(); + foreach ( $wgJobTypeConf as $type => $conf ) { + $queue = JobQueue::factory( + array( 'wiki' => $this->wiki, 'type' => 'null' ) + $conf ); + $loc = $queue->getCoalesceLocationInternal(); + if ( !isset( $this->coalescedQueues[$loc] ) ) { + $this->coalescedQueues[$loc]['queue'] = $queue; + $this->coalescedQueues[$loc]['types'] = array(); + } + if ( $type === 'default' ) { + $this->coalescedQueues[$loc]['types'] = array_merge( + $this->coalescedQueues[$loc]['types'], + array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) ) + ); + } else { + $this->coalescedQueues[$loc]['types'][] = $type; + } + } + } + + return $this->coalescedQueues; + } + /** * Check if jobs should not be popped of a queue right now. * This is only used for performance, such as to avoid spamming diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php index 57189a50c0..378e17555d 100644 --- a/includes/job/JobQueueRedis.php +++ b/includes/job/JobQueueRedis.php @@ -501,7 +501,7 @@ LUA; foreach ( $props as $prop ) { $keys[] = $this->getQueueKey( $prop ); } - $res = ( $conn->delete( $keys ) !== false ); + return ( $conn->delete( $keys ) !== false ); } catch ( RedisException $e ) { $this->throwRedisException( $this->server, $conn, $e ); } @@ -547,6 +547,35 @@ LUA; } } + public function getCoalesceLocationInternal() { + return "RedisServer:" . $this->server; + } + + protected function doGetSiblingQueuesWithJobs( array $types ) { + return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) ); + } + + protected function doGetSiblingQueueSizes( array $types ) { + $sizes = array(); // (type => size) + $types = array_values( $types ); // reindex + try { + $conn = $this->getConnection(); + $conn->multi( Redis::PIPELINE ); + foreach ( $types as $type ) { + $conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) ); + } + $res = $conn->exec(); + if ( is_array( $res ) ) { + foreach ( $res as $i => $size ) { + $sizes[$types[$i]] = $size; + } + } + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + return $sizes; + } + /** * This function should not be called outside JobQueueRedis * @@ -804,14 +833,16 @@ LUA; /** * @param $prop string + * @param $type string|null * @return string */ - private function getQueueKey( $prop ) { + private function getQueueKey( $prop, $type = null ) { + $type = is_string( $type ) ? $type : $this->type; list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); if ( strlen( $this->key ) ) { // namespaced queue (for testing) - return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $this->key, $prop ); + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop ); } else { - return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop ); + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop ); } } -- 2.20.1