}
/**
- * Quickly check if the queue is empty.
+ * Quickly check if the queue is empty (has no available jobs).
* Queue classes should use caching if they are any slower without memcached.
*
* @return bool
*/
abstract protected function doIsEmpty();
+ /**
+ * Get the number of available jobs in the queue.
+ * Queue classes should use caching if they are any slower without memcached.
+ *
+ * @return integer
+ */
+ final public function getSize() {
+ wfProfileIn( __METHOD__ );
+ $res = $this->doGetSize();
+ wfProfileOut( __METHOD__ );
+ return $res;
+ }
+
+ /**
+ * @see JobQueue::getSize()
+ * @return integer
+ */
+ abstract protected function doGetSize();
+
+ /**
+ * Get the number of acquired jobs (these are temporarily out of the queue).
+ * Queue classes should use caching if they are any slower without memcached.
+ *
+ * @return integer
+ */
+ final public function getAcquiredCount() {
+ wfProfileIn( __METHOD__ );
+ $res = $this->doGetAcquiredCount();
+ wfProfileOut( __METHOD__ );
+ return $res;
+ }
+
+ /**
+ * @see JobQueue::getAcquiredCount()
+ * @return integer
+ */
+ abstract protected function doGetAcquiredCount();
+
/**
* Push a batch of jobs into the queue
*
* @since 1.21
*/
class JobQueueDB extends JobQueue {
- const CACHE_TTL = 300; // integer; seconds to cache queue information
- const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
- const MAX_ATTEMPTS = 3; // integer; number of times to try a job
- const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
+ const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
+ const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
+ const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
+ const MAX_ATTEMPTS = 3; // integer; number of times to try a job
+ const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
/**
* @see JobQueue::doIsEmpty()
protected function doIsEmpty() {
global $wgMemc;
- $key = $this->getEmptinessCacheKey();
+ $key = $this->getCacheKey( 'empty' );
$isEmpty = $wgMemc->get( $key );
if ( $isEmpty === 'true' ) {
$found = $this->getSlaveDB()->selectField( // unclaimed job
'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__
);
- $wgMemc->add( $key, $found ? 'false' : 'true', self::CACHE_TTL );
+ $wgMemc->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG );
return !$found;
}
+ /**
+ * @see JobQueue::doGetSize()
+ * @return integer
+ */
+ protected function doGetSize() {
+ global $wgMemc;
+
+ $key = $this->getCacheKey( 'size' );
+
+ $size = $wgMemc->get( $key );
+ if ( is_int( $size ) ) {
+ return $size;
+ }
+
+ $dbr = $this->getSlaveDB();
+ $size = (int)$dbr->selectField( 'job', 'COUNT(*)',
+ array( 'job_cmd' => $this->type, 'job_token' => '' ),
+ __METHOD__
+ );
+ $wgMemc->set( $key, $size, self::CACHE_TTL_SHORT );
+
+ return $size;
+ }
+
+ /**
+ * @see JobQueue::doGetAcquiredCount()
+ * @return integer
+ */
+ protected function doGetAcquiredCount() {
+ global $wgMemc;
+
+ $key = $this->getCacheKey( 'acquiredcount' );
+
+ $count = $wgMemc->get( $key );
+ if ( is_int( $count ) ) {
+ return $count;
+ }
+
+ $dbr = $this->getSlaveDB();
+ $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
+ array( 'job_cmd' => $this->type, "job_token !={$dbr->addQuotes('')}" ),
+ __METHOD__
+ );
+ $wgMemc->set( $key, $count, self::CACHE_TTL_SHORT );
+
+ return $count;
+ }
+
/**
* @see JobQueue::doBatchPush()
* @param array $jobs
}
$atomic = ( $flags & self::QoS_Atomic );
- $key = $this->getEmptinessCacheKey();
- $ttl = self::CACHE_TTL;
+ $key = $this->getCacheKey( 'empty' );
+ $ttl = self::CACHE_TTL_LONG;
$dbw->onTransactionIdle(
function() use ( $dbw, $rowSet, $rowList, $atomic, $key, $ttl
protected function doPop() {
global $wgMemc;
- if ( $wgMemc->get( $this->getEmptinessCacheKey() ) === 'true' ) {
+ if ( $wgMemc->get( $this->getCacheKey( 'empty' ) ) === 'true' ) {
return false; // queue is empty
}
}
// Check if we found a row to reserve...
if ( !$row ) {
- $wgMemc->set( $this->getEmptinessCacheKey(), 'true', self::CACHE_TTL );
+ $wgMemc->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
break; // nothing to do
}
wfIncrStats( 'job-pop' );
/**
* @return string
*/
- private function getEmptinessCacheKey() {
+ private function getCacheKey( $property ) {
list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
- return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'isempty' );
+ return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
}
/**
$this->mDescription = "Show number of jobs waiting in master database";
$this->addOption( 'group', 'Show number of jobs per job type' );
}
+
public function execute() {
- $dbw = wfGetDB( DB_MASTER );
+ $group = JobQueueGroup::singleton();
if ( $this->hasOption( 'group' ) ) {
- $res = $dbw->select(
- 'job',
- array( 'job_cmd', 'count(*) as count' ),
- array(),
- __METHOD__,
- array( 'GROUP BY' => 'job_cmd' )
- );
- foreach ( $res as $row ) {
- $this->output( $row->job_cmd . ': ' . $row->count . "\n" );
+ foreach ( $group->getQueueTypes() as $type ) {
+ $queue = $group->get( $type );
+ $pending = $queue->getSize();
+ $claimed = $queue->getAcquiredCount();
+ if ( ( $pending + $claimed ) > 0 ) {
+ $this->output( "{$type}: $pending queued; $claimed acquired\n" );
+ }
}
} else {
- $this->output( $dbw->selectField( 'job', 'count(*)', '', __METHOD__ ) . "\n" );
+ $count = 0;
+ foreach ( $group->getQueueTypes() as $type ) {
+ $count += $group->get( $type )->getSize();
+ }
+ $this->output( "$count\n" );
}
}
}