* @since 1.21
*/
class JobQueueDB extends JobQueue {
- const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days)
+ const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days)
const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
- const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
- const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
- const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
- const MAX_OFFSET = 255; // integer; maximum number of rows to skip
+ const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
+ const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
+ const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
+ const MAX_OFFSET = 255; // integer; maximum number of rows to skip
protected $cluster = false; // string; name of an external DB cluster
$this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false;
}
+ protected function supportedOrders() {
+ return array( 'random', 'timestamp', 'fifo' );
+ }
+
+ protected function optimalOrder() {
+ return 'random';
+ }
+
/**
* @see JobQueue::doIsEmpty()
* @return bool
protected function doGetAcquiredCount() {
global $wgMemc;
+ if ( $this->claimTTL <= 0 ) {
+ return 0; // no acknowledgements
+ }
+
$key = $this->getCacheKey( 'acquiredcount' );
$count = $wgMemc->get( $key );
}
}
+ $key = $this->getCacheKey( 'empty' );
$atomic = ( $flags & self::QoS_Atomic );
- $key = $this->getCacheKey( 'empty' );
- $ttl = self::CACHE_TTL_LONG;
$dbw->onTransactionIdle(
- function() use ( $dbw, $rowSet, $rowList, $atomic, $key, $ttl, $scope
+ function() use ( $dbw, $rowSet, $rowList, $atomic, $key, $scope
) {
global $wgMemc;
$dbw->commit( __METHOD__ );
}
- $wgMemc->set( $key, 'false', $ttl ); // queue is not empty
+ $wgMemc->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
} );
}
$row = $this->claimOldest( $uuid );
} else { // random first
$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
- $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
- $row = $this->claimRandom( $uuid, $rand, $gte );
+ $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
+ $row = $this->claimRandom( $uuid, $rand, $gte );
}
// Check if we found a row to reserve...
if ( !$row ) {
/**
* Reserve a row with a single UPDATE without holding row locks over RTTs...
*
- * @param $uuid string 32 char hex string
+ * @param string $uuid 32 char hex string
* @param $rand integer Random unsigned integer (31 bits)
- * @param $gte bool Search for job_random >= $random (otherwise job_random <= $random)
+ * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
* @return Row|false
*/
protected function claimRandom( $uuid, $rand, $gte ) {
// For small queues, using OFFSET will overshoot and return no rows more often.
// Instead, this uses job_random to pick a row (possibly checking both directions).
$ineq = $gte ? '>=' : '<=';
- $dir = $gte ? 'ASC' : 'DESC';
- $row = $dbw->selectRow( 'job', '*', // find a random job
+ $dir = $gte ? 'ASC' : 'DESC';
+ $row = $dbw->selectRow( 'job', '*', // find a random job
array(
'job_cmd' => $this->type,
'job_token' => '', // unclaimed
/**
* Reserve a row with a single UPDATE without holding row locks over RTTs...
*
- * @param $uuid string 32 char hex string
+ * @param string $uuid 32 char hex string
* @return Row|false
*/
protected function claimOldest( $uuid ) {
'job_cmd' => $this->type,
"job_token != {$dbw->addQuotes( '' )}", // was acquired
"job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
- "job_attempts < {$dbw->addQuotes( self::MAX_ATTEMPTS )}" ), // retries left
+ "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left
__METHOD__
);
$ids = array_map( function( $o ) { return $o->job_id; }, iterator_to_array( $res ) );
"job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
);
if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
- $conds[] = "job_attempts >= {$dbw->addQuotes( self::MAX_ATTEMPTS )}";
+ $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
}
// Get the IDs of jobs that are considered stale and should be removed. Selecting
// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
);
}
+ /**
+ * @return void
+ */
+ protected function doFlushCaches() {
+ global $wgMemc;
+
+ foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) {
+ $wgMemc->delete( $this->getCacheKey( $type ) );
+ }
+ }
+
+ /**
+ * @see JobQueue::getAllQueuedJobs()
+ * @return Iterator
+ */
+ public function getAllQueuedJobs() {
+ list( $dbr, $scope ) = $this->getSlaveDB();
+ return new MappedIterator(
+ $dbr->select( 'job', '*', array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
+ function( $row ) use ( $scope ) {
+ $job = Job::factory(
+ $row->job_cmd,
+ Title::makeTitle( $row->job_namespace, $row->job_title ),
+ strlen( $row->job_params ) ? unserialize( $row->job_params ) : false,
+ $row->job_id
+ );
+ $job->id = $row->job_id; // XXX: work around broken subclasses
+ return $job;
+ }
+ );
+ }
+
/**
* @return Array (DatabaseBase, ScopedCallback)
*/