From 10d035c35e987aa78b5d38cb3fea9a48c62afa09 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Fri, 14 Dec 2012 10:47:06 -0800 Subject: [PATCH] [JobQueue] Made showJobs.php work for non-DB queues. Change-Id: Idada1e3ca8278898de6e53fdcc5dee4786d5bae8 --- includes/job/JobQueue.php | 40 +++++++++++++++++++- includes/job/JobQueueDB.php | 73 +++++++++++++++++++++++++++++++------ maintenance/showJobs.php | 25 +++++++------ 3 files changed, 114 insertions(+), 24 deletions(-) diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php index ffd5d95f1f..ae80370155 100644 --- a/includes/job/JobQueue.php +++ b/includes/job/JobQueue.php @@ -96,7 +96,7 @@ abstract class JobQueue { } /** - * Quickly check if the queue is empty. + * Quickly check if the queue is empty (has no available jobs). * Queue classes should use caching if they are any slower without memcached. * * @return bool @@ -114,6 +114,44 @@ abstract class JobQueue { */ abstract protected function doIsEmpty(); + /** + * Get the number of available jobs in the queue. + * Queue classes should use caching if they are any slower without memcached. + * + * @return integer + */ + final public function getSize() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetSize(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::getSize() + * @return integer + */ + abstract protected function doGetSize(); + + /** + * Get the number of acquired jobs (these are temporarily out of the queue). + * Queue classes should use caching if they are any slower without memcached. + * + * @return integer + */ + final public function getAcquiredCount() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetAcquiredCount(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::getAcquiredCount() + * @return integer + */ + abstract protected function doGetAcquiredCount(); + /** * Push a batch of jobs into the queue * diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php index 14c1dca257..2917d4898c 100644 --- a/includes/job/JobQueueDB.php +++ b/includes/job/JobQueueDB.php @@ -28,10 +28,11 @@ * @since 1.21 */ class JobQueueDB extends JobQueue { - const CACHE_TTL = 300; // integer; seconds to cache queue information - const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed - const MAX_ATTEMPTS = 3; // integer; number of times to try a job - const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random + const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating + const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date + const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed + const MAX_ATTEMPTS = 3; // integer; number of times to try a job + const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random /** * @see JobQueue::doIsEmpty() @@ -40,7 +41,7 @@ class JobQueueDB extends JobQueue { protected function doIsEmpty() { global $wgMemc; - $key = $this->getEmptinessCacheKey(); + $key = $this->getCacheKey( 'empty' ); $isEmpty = $wgMemc->get( $key ); if ( $isEmpty === 'true' ) { @@ -52,11 +53,59 @@ class JobQueueDB extends JobQueue { $found = $this->getSlaveDB()->selectField( // unclaimed job 'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__ ); - $wgMemc->add( $key, $found ? 'false' : 'true', self::CACHE_TTL ); + $wgMemc->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG ); return !$found; } + /** + * @see JobQueue::doGetSize() + * @return integer + */ + protected function doGetSize() { + global $wgMemc; + + $key = $this->getCacheKey( 'size' ); + + $size = $wgMemc->get( $key ); + if ( is_int( $size ) ) { + return $size; + } + + $dbr = $this->getSlaveDB(); + $size = (int)$dbr->selectField( 'job', 'COUNT(*)', + array( 'job_cmd' => $this->type, 'job_token' => '' ), + __METHOD__ + ); + $wgMemc->set( $key, $size, self::CACHE_TTL_SHORT ); + + return $size; + } + + /** + * @see JobQueue::doGetAcquiredCount() + * @return integer + */ + protected function doGetAcquiredCount() { + global $wgMemc; + + $key = $this->getCacheKey( 'acquiredcount' ); + + $count = $wgMemc->get( $key ); + if ( is_int( $count ) ) { + return $count; + } + + $dbr = $this->getSlaveDB(); + $count = (int)$dbr->selectField( 'job', 'COUNT(*)', + array( 'job_cmd' => $this->type, "job_token !={$dbr->addQuotes('')}" ), + __METHOD__ + ); + $wgMemc->set( $key, $count, self::CACHE_TTL_SHORT ); + + return $count; + } + /** * @see JobQueue::doBatchPush() * @param array $jobs @@ -81,8 +130,8 @@ class JobQueueDB extends JobQueue { } $atomic = ( $flags & self::QoS_Atomic ); - $key = $this->getEmptinessCacheKey(); - $ttl = self::CACHE_TTL; + $key = $this->getCacheKey( 'empty' ); + $ttl = self::CACHE_TTL_LONG; $dbw->onTransactionIdle( function() use ( $dbw, $rowSet, $rowList, $atomic, $key, $ttl @@ -139,7 +188,7 @@ class JobQueueDB extends JobQueue { protected function doPop() { global $wgMemc; - if ( $wgMemc->get( $this->getEmptinessCacheKey() ) === 'true' ) { + if ( $wgMemc->get( $this->getCacheKey( 'empty' ) ) === 'true' ) { return false; // queue is empty } @@ -166,7 +215,7 @@ class JobQueueDB extends JobQueue { } // Check if we found a row to reserve... if ( !$row ) { - $wgMemc->set( $this->getEmptinessCacheKey(), 'true', self::CACHE_TTL ); + $wgMemc->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG ); break; // nothing to do } wfIncrStats( 'job-pop' ); @@ -495,9 +544,9 @@ class JobQueueDB extends JobQueue { /** * @return string */ - private function getEmptinessCacheKey() { + private function getCacheKey( $property ) { list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); - return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'isempty' ); + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property ); } /** diff --git a/maintenance/showJobs.php b/maintenance/showJobs.php index 1dceb7907f..8b49517ff8 100644 --- a/maintenance/showJobs.php +++ b/maintenance/showJobs.php @@ -39,21 +39,24 @@ class ShowJobs extends Maintenance { $this->mDescription = "Show number of jobs waiting in master database"; $this->addOption( 'group', 'Show number of jobs per job type' ); } + public function execute() { - $dbw = wfGetDB( DB_MASTER ); + $group = JobQueueGroup::singleton(); if ( $this->hasOption( 'group' ) ) { - $res = $dbw->select( - 'job', - array( 'job_cmd', 'count(*) as count' ), - array(), - __METHOD__, - array( 'GROUP BY' => 'job_cmd' ) - ); - foreach ( $res as $row ) { - $this->output( $row->job_cmd . ': ' . $row->count . "\n" ); + foreach ( $group->getQueueTypes() as $type ) { + $queue = $group->get( $type ); + $pending = $queue->getSize(); + $claimed = $queue->getAcquiredCount(); + if ( ( $pending + $claimed ) > 0 ) { + $this->output( "{$type}: $pending queued; $claimed acquired\n" ); + } } } else { - $this->output( $dbw->selectField( 'job', 'count(*)', '', __METHOD__ ) . "\n" ); + $count = 0; + foreach ( $group->getQueueTypes() as $type ) { + $count += $group->get( $type )->getSize(); + } + $this->output( "$count\n" ); } } } -- 2.20.1