const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions
+ const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
+
/**
* @param $params array
*/
wfProfileIn( __METHOD__ );
$job = $this->doPop();
wfProfileOut( __METHOD__ );
+
+ // Flag this job as an old duplicate based on its "root" job...
+ try {
+ if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
+ wfIncrStats( 'job-pop-duplicate' );
+ $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
+ }
+ } catch ( MWException $e ) {} // don't lose jobs over this
+
return $job;
}
* @return bool
*/
protected function doDeduplicateRootJob( Job $job ) {
- return true;
+ global $wgMemc;
+
+ $params = $job->getParams();
+ if ( !isset( $params['rootJobSignature'] ) ) {
+ throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
+ } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
+ throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
+ }
+ $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
+ // Callers should call batchInsert() and then this function so that if the insert
+ // fails, the de-duplication registration will be aborted. Since the insert is
+ // deferred till "transaction idle", do the same here, so that the ordering is
+ // maintained. Having only the de-duplication registration succeed would cause
+ // jobs to become no-ops without any actual jobs that made them redundant.
+ $timestamp = $wgMemc->get( $key ); // current last timestamp of this job
+ if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
+ return true; // a newer version of this root job was enqueued
+ }
+
+ // Update the timestamp of the last root job started at the location...
+ return $wgMemc->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
+ }
+
+ /**
+ * Check if the "root" job of a given job has been superseded by a newer one
+ *
+ * @param $job Job
+ * @return bool
+ * @throws MWException
+ */
+ final protected function isRootJobOldDuplicate( Job $job ) {
+ if ( $job->getType() !== $this->type ) {
+ throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+ }
+ wfProfileIn( __METHOD__ );
+ $isDuplicate = $this->doIsRootJobOldDuplicate( $job );
+ wfProfileOut( __METHOD__ );
+ return $isDuplicate;
+ }
+
+ /**
+ * @see JobQueue::isRootJobOldDuplicate()
+ * @param Job $job
+ * @return bool
+ */
+ protected function doIsRootJobOldDuplicate( Job $job ) {
+ global $wgMemc;
+
+ $params = $job->getParams();
+ if ( !isset( $params['rootJobSignature'] ) ) {
+ return false; // job has no de-deplication info
+ } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
+ trigger_error( "Cannot check root job; missing 'rootJobTimestamp'." );
+ return false;
+ }
+
+ // Get the last time this root job was enqueued
+ $timestamp = $wgMemc->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
+
+ // Check if a new root job was started at the location after this one's...
+ return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
+ }
+
+ /**
+ * @param string $signature Hash identifier of the root job
+ * @return string
+ */
+ protected function getRootJobCacheKey( $signature ) {
+ list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+ return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
}
/**
* @since 1.21
*/
class JobQueueDB extends JobQueue {
- const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days)
const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
$job = Job::factory( $row->job_cmd, $title,
self::extractBlob( $row->job_params ), $row->job_id );
$job->id = $row->job_id; // XXX: work around broken subclasses
- // Flag this job as an old duplicate based on its "root" job...
- if ( $this->isRootJobOldDuplicate( $job ) ) {
- wfIncrStats( 'job-pop-duplicate' );
- $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
- }
break; // done
} while( true );
return true;
}
- /**
- * Check if the "root" job of a given job has been superseded by a newer one
- *
- * @param $job Job
- * @return bool
- */
- protected function isRootJobOldDuplicate( Job $job ) {
- global $wgMemc;
-
- $params = $job->getParams();
- if ( !isset( $params['rootJobSignature'] ) ) {
- return false; // job has no de-deplication info
- } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
- trigger_error( "Cannot check root job; missing 'rootJobTimestamp'." );
- return false;
- }
-
- // Get the last time this root job was enqueued
- $timestamp = $wgMemc->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
-
- // Check if a new root job was started at the location after this one's...
- return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
- }
-
/**
* @see JobQueue::doWaitForBackups()
* @return void
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
}
- /**
- * @param string $signature Hash identifier of the root job
- * @return string
- */
- private function getRootJobCacheKey( $signature ) {
- list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
- return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
- }
-
/**
* @param $params
* @return string
protected $server; // string; server address
- const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days)
const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
protected $key; // string; key to prefix the queue keys with (used for testing)
$this->throwRedisException( $this->server, $conn, $e );
}
- // Flag this job as an old duplicate based on its "root" job...
- try {
- if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
- wfIncrStats( 'job-pop-duplicate' );
- return DuplicateJob::newFromJob( $job ); // convert to a no-op
- }
- } catch ( MWException $e ) {} // don't lose jobs over this
-
return $job;
}
} elseif ( !isset( $params['rootJobTimestamp'] ) ) {
throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
}
- $key = $this->getRootJobKey( $params['rootJobSignature'] );
+ $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
$conn = $this->getConnection();
try {
}
/**
- * Check if the "root" job of a given job has been superseded by a newer one
- *
- * @param $job Job
+ * @see JobQueue::doIsRootJobOldDuplicate()
+ * @param Job $job
* @return bool
- * @throws MWException
*/
- protected function isRootJobOldDuplicate( Job $job ) {
+ protected function doIsRootJobOldDuplicate( Job $job ) {
$params = $job->getParams();
if ( !isset( $params['rootJobSignature'] ) ) {
return false; // job has no de-deplication info
$conn = $this->getConnection();
try {
// Get the last time this root job was enqueued
- $timestamp = $conn->get( $this->getRootJobKey( $params['rootJobSignature'] ) );
+ $timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );
}
}
}
- /**
- * @param string $signature Hash identifier of the root job
- * @return string
- */
- private function getRootJobKey( $signature ) {
- list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
- return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
- }
-
/**
* @param $key string
* @return void