*/
class JobQueueDB extends JobQueue {
const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
- const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
const MAX_OFFSET = 255; // integer; maximum number of rows to skip
* @return bool
*/
protected function doIsEmpty() {
- $key = $this->getCacheKey( 'empty' );
-
- $isEmpty = $this->cache->get( $key );
- if ( $isEmpty === 'true' ) {
- return true;
- } elseif ( $isEmpty === 'false' ) {
- return false;
- }
-
$dbr = $this->getSlaveDB();
try {
$found = $dbr->selectField( // unclaimed job
} catch ( DBError $e ) {
$this->throwDBException( $e );
}
- $this->cache->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG );
return !$found;
}
foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
$dbw->insert( 'job', $rowBatch, $method );
}
- JobQueue::incrStats( 'job-insert', $this->type, count( $rows ), $this->wiki );
+ JobQueue::incrStats( 'job-insert', $this->type, count( $rows ) );
JobQueue::incrStats(
'job-insert-duplicate',
$this->type,
- count( $rowSet ) + count( $rowList ) - count( $rows ),
- $this->wiki
+ count( $rowSet ) + count( $rowList ) - count( $rows )
);
} catch ( DBError $e ) {
if ( $flags & self::QOS_ATOMIC ) {
$dbw->commit( $method );
}
- $this->cache->set( $this->getCacheKey( 'empty' ), 'false', JobQueueDB::CACHE_TTL_LONG );
-
return;
}
* @return Job|bool
*/
protected function doPop() {
- if ( $this->cache->get( $this->getCacheKey( 'empty' ) ) === 'true' ) {
- return false; // queue is empty
- }
-
$dbw = $this->getMasterDB();
try {
$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
}
// Check if we found a row to reserve...
if ( !$row ) {
- $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
break; // nothing to do
}
- JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki );
+ JobQueue::incrStats( 'job-pop', $this->type );
// Get the job object from the row...
$title = Title::makeTitle( $row->job_namespace, $row->job_title );
$job = Job::factory( $row->job_cmd, $title,
$job->metadata['id'] = $row->job_id;
break; // done
} while ( true );
+
+ if ( !$job || mt_rand( 0, 9 ) == 0 ) {
+ // Handled jobs that need to be recycled/deleted;
+ // any recycled jobs will be picked up next attempt
+ $this->recycleAndDeleteStaleJobs();
+ }
} catch ( DBError $e ) {
$this->throwDBException( $e );
}
// Delete a row with a single DELETE without holding row locks over RTTs...
$dbw->delete( 'job',
array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ );
+
+ JobQueue::incrStats( 'job-ack', $this->type );
} catch ( DBError $e ) {
$this->throwDBException( $e );
}
/**
* @see JobQueue::doDeduplicateRootJob()
- * @param Job $job
+ * @param IJobSpecification $job
* @throws MWException
* @return bool
*/
- protected function doDeduplicateRootJob( Job $job ) {
+ protected function doDeduplicateRootJob( IJobSpecification $job ) {
$params = $job->getParams();
if ( !isset( $params['rootJobSignature'] ) ) {
throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
wfWaitForSlaves( false, $this->wiki, $this->cluster ?: false );
}
- /**
- * @return array
- */
- protected function doGetPeriodicTasks() {
- return array(
- 'recycleAndDeleteStaleJobs' => array(
- 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
- 'period' => ceil( $this->claimTTL / 2 )
- )
- );
- }
-
/**
* @return void
*/
protected function doFlushCaches() {
- foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) {
+ foreach ( array( 'size', 'acquiredcount' ) as $type ) {
$this->cache->delete( $this->getCacheKey( $type ) );
}
}
protected function doGetSiblingQueuesWithJobs( array $types ) {
$dbr = $this->getSlaveDB();
+ // @note: this does not check whether the jobs are claimed or not.
+ // This is useful so JobQueueGroup::pop() also sees queues that only
+ // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
+ // failed jobs so that they can be popped again for that edge case.
$res = $dbr->select( 'job', 'DISTINCT job_cmd',
array( 'job_cmd' => $types ), __METHOD__ );
);
$affected = $dbw->affectedRows();
$count += $affected;
- JobQueue::incrStats( 'job-recycle', $this->type, $affected, $this->wiki );
- // The tasks recycled jobs or release delayed jobs into the queue
- $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
+ JobQueue::incrStats( 'job-recycle', $this->type, $affected );
$this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
}
}
$dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ );
$affected = $dbw->affectedRows();
$count += $affected;
- JobQueue::incrStats( 'job-abandon', $this->type, $affected, $this->wiki );
+ JobQueue::incrStats( 'job-abandon', $this->type, $affected );
}
$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );