* Also added a JobQueueGroup::getQueueSizes() function.
This function is now used by the SiteStats to make is
meaningful for when queues are not in the DB. This number
is already exposed via the API.
bug: 45072
bug: 50635
bug: 9518
Change-Id: I75c16ffa14c963e7f8fb7cb390e6cc4cde0a5804
static function jobs() {
if ( !isset( self::$jobs ) ) {
$dbr = wfGetDB( DB_SLAVE );
- self::$jobs = $dbr->estimateRowCount( 'job' );
+ self::$jobs = array_sum( JobQueueGroup::singleton()->getQueueSizes() );
/* Zero rows still do single row read for row that doesn't exist, but people are annoyed by that */
if ( self::$jobs == 1 ) {
self::$jobs = 0;
return new ArrayIterator( array() ); // not implemented
}
+ /**
+ * Do not use this function outside of JobQueue/JobQueueGroup
+ *
+ * @return string
+ * @since 1.22
+ */
+ public function getCoalesceLocationInternal() {
+ return null;
+ }
+
+ /**
+ * Check whether each of the given queues are empty.
+ * This is used for batching checks for queues stored at the same place.
+ *
+ * @param array $types List of queues types
+ * @return array|null (list of non-empty queue types) or null if unsupported
+ * @throws MWException
+ * @since 1.22
+ */
+ final public function getSiblingQueuesWithJobs( array $types ) {
+ $section = new ProfileSection( __METHOD__ );
+ return $this->doGetSiblingQueuesWithJobs( $types );
+ }
+
+ /**
+ * @see JobQueue::getSiblingQueuesWithJobs()
+ * @param array $types List of queues types
+ * @return array|null (list of queue types) or null if unsupported
+ */
+ protected function doGetSiblingQueuesWithJobs( array $types ) {
+ return null; // not supported
+ }
+
+ /**
+ * Check the size of each of the given queues.
+ * For queues not served by the same store as this one, 0 is returned.
+ * This is used for batching checks for queues stored at the same place.
+ *
+ * @param array $types List of queues types
+ * @return array|null (job type => whether queue is empty) or null if unsupported
+ * @throws MWException
+ * @since 1.22
+ */
+ final public function getSiblingQueueSizes( array $types ) {
+ $section = new ProfileSection( __METHOD__ );
+ return $this->doGetSiblingQueueSizes( $types );
+ }
+
+ /**
+ * @see JobQueue::getSiblingQueuesSize()
+ * @param array $types List of queues types
+ * @return array|null (list of queue types) or null if unsupported
+ */
+ protected function doGetSiblingQueueSizes( array $types ) {
+ return null; // not supported
+ }
+
/**
* Call wfIncrStats() for the queue overall and for the queue type
*
}
}
+ public function getCoalesceLocationInternal() {
+ return $this->cluster ? "DBCluster:{$this->cluster}" : "LBFactory:{$this->wiki}";
+ }
+
+ protected function doGetSiblingQueuesWithJobs( array $types ) {
+ list( $dbr, $scope ) = $this->getSlaveDB();
+ $res = $dbr->select( 'job', 'DISTINCT job_cmd',
+ array( 'job_cmd' => $types ), __METHOD__ );
+
+ $types = array();
+ foreach ( $res as $row ) {
+ $types[] = $row->job_cmd;
+ }
+ return $types;
+ }
+
+ protected function doGetSiblingQueueSizes( array $types ) {
+ list( $dbr, $scope ) = $this->getSlaveDB();
+ $res = $dbr->select( 'job', array( 'job_cmd', 'COUNT(*) AS count' ),
+ array( 'job_cmd' => $types ), __METHOD__, array( 'GROUP BY' => 'job_cmd' ) );
+
+ $sizes = array();
+ foreach ( $res as $row ) {
+ $sizes[$row->job_cmd] = (int)$row->count;
+ }
+ return $sizes;
+ }
+
/**
* Recycle or destroy any jobs that have been claimed for too long
*
return $iterator;
}
+ public function getCoalesceLocationInternal() {
+ return "JobQueueFederated:wiki:" . $this->wiki;
+ }
+
+ protected function doGetSiblingQueuesWithJobs( array $types ) {
+ $result = array();
+ foreach ( $this->partitionQueues as $queue ) {
+ $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
+ if ( is_array( $nonEmpty ) ) {
+ $result = array_merge( $result, $nonEmpty );
+ } else {
+ return null; // not supported on all partitions; bail
+ }
+ }
+ return array_values( array_unique( $result ) );
+ }
+
+ protected function doGetSiblingQueueSizes( array $types ) {
+ $result = array();
+ foreach ( $this->partitionQueues as $queue ) {
+ $sizes = $queue->doGetSiblingQueueSizes( $types );
+ if ( is_array( $sizes ) ) {
+ foreach ( $sizes as $type => $size ) {
+ $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size;
+ }
+ } else {
+ return null; // not supported on all partitions; bail
+ }
+ }
+ return $result;
+ }
+
public function setTestingPrefix( $key ) {
foreach ( $this->partitionQueues as $queue ) {
$queue->setTestingPrefix( $key );
protected $wiki; // string; wiki ID
+ /** @var array Map of (bucket => (queue => JobQueue, types => list of types) */
+ protected $coalescedQueues;
+
const TYPE_DEFAULT = 1; // integer; jobs popped by default
const TYPE_ANY = 2; // integer; any job
*/
public function getQueuesWithJobs() {
$types = array();
- foreach ( $this->getQueueTypes() as $type ) {
- if ( !$this->get( $type )->isEmpty() ) {
- $types[] = $type;
+ foreach ( $this->getCoalescedQueues() as $info ) {
+ $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
+ if ( is_array( $nonEmpty ) ) { // batching features supported
+ $types = array_merge( $types, $nonEmpty );
+ } else { // we have to go through the queues in the bucket one-by-one
+ foreach ( $info['types'] as $type ) {
+ if ( !$this->get( $type )->isEmpty() ) {
+ $types[] = $type;
+ }
+ }
}
}
return $types;
}
+ /**
+ * Get the size of the queus for a list of job types
+ *
+ * @return Array Map of (job type => size)
+ */
+ public function getQueueSizes() {
+ $sizeMap = array();
+ foreach ( $this->getCoalescedQueues() as $info ) {
+ $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
+ if ( is_array( $sizes ) ) { // batching features supported
+ $sizeMap = $sizeMap + $sizes;
+ } else { // we have to go through the queues in the bucket one-by-one
+ foreach ( $info['types'] as $type ) {
+ $sizeMap[$type] = $this->get( $type )->getSize();
+ }
+ }
+ }
+ return $sizeMap;
+ }
+
+ /**
+ * @return array
+ */
+ protected function getCoalescedQueues() {
+ global $wgJobTypeConf;
+
+ if ( $this->coalescedQueues === null ) {
+ $this->coalescedQueues = array();
+ foreach ( $wgJobTypeConf as $type => $conf ) {
+ $queue = JobQueue::factory(
+ array( 'wiki' => $this->wiki, 'type' => 'null' ) + $conf );
+ $loc = $queue->getCoalesceLocationInternal();
+ if ( !isset( $this->coalescedQueues[$loc] ) ) {
+ $this->coalescedQueues[$loc]['queue'] = $queue;
+ $this->coalescedQueues[$loc]['types'] = array();
+ }
+ if ( $type === 'default' ) {
+ $this->coalescedQueues[$loc]['types'] = array_merge(
+ $this->coalescedQueues[$loc]['types'],
+ array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
+ );
+ } else {
+ $this->coalescedQueues[$loc]['types'][] = $type;
+ }
+ }
+ }
+
+ return $this->coalescedQueues;
+ }
+
/**
* Check if jobs should not be popped of a queue right now.
* This is only used for performance, such as to avoid spamming
foreach ( $props as $prop ) {
$keys[] = $this->getQueueKey( $prop );
}
- $res = ( $conn->delete( $keys ) !== false );
+ return ( $conn->delete( $keys ) !== false );
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );
}
}
}
+ public function getCoalesceLocationInternal() {
+ return "RedisServer:" . $this->server;
+ }
+
+ protected function doGetSiblingQueuesWithJobs( array $types ) {
+ return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) );
+ }
+
+ protected function doGetSiblingQueueSizes( array $types ) {
+ $sizes = array(); // (type => size)
+ $types = array_values( $types ); // reindex
+ try {
+ $conn = $this->getConnection();
+ $conn->multi( Redis::PIPELINE );
+ foreach ( $types as $type ) {
+ $conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) );
+ }
+ $res = $conn->exec();
+ if ( is_array( $res ) ) {
+ foreach ( $res as $i => $size ) {
+ $sizes[$types[$i]] = $size;
+ }
+ }
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $this->server, $conn, $e );
+ }
+ return $sizes;
+ }
+
/**
* This function should not be called outside JobQueueRedis
*
/**
* @param $prop string
+ * @param $type string|null
* @return string
*/
- private function getQueueKey( $prop ) {
+ private function getQueueKey( $prop, $type = null ) {
+ $type = is_string( $type ) ? $type : $this->type;
list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
if ( strlen( $this->key ) ) { // namespaced queue (for testing)
- return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $this->key, $prop );
+ return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop );
} else {
- return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop );
+ return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop );
}
}