From dbba2ec623d725cd92c2bb5587d55c8df1dafae3 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Thu, 28 Feb 2013 14:21:28 -0800 Subject: [PATCH] [JobQueue] Factored "root job" de-duplication code into base class. * Moved the root job handling up to the base class and provided some default function implementations. * Also bumped ROOTJOB_TTL from 14 to 28 days. Change-Id: I70bc043bfc039c5d0b009e0b5d39fd2887f46093 --- includes/job/JobQueue.php | 82 +++++++++++++++++++++++++++++++++- includes/job/JobQueueDB.php | 39 ---------------- includes/job/JobQueueRedis.php | 30 +++---------- 3 files changed, 86 insertions(+), 65 deletions(-) diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php index b0dd925885..5ef52b5ef1 100644 --- a/includes/job/JobQueue.php +++ b/includes/job/JobQueue.php @@ -37,6 +37,8 @@ abstract class JobQueue { const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions + const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days) + /** * @param $params array */ @@ -262,6 +264,15 @@ abstract class JobQueue { wfProfileIn( __METHOD__ ); $job = $this->doPop(); wfProfileOut( __METHOD__ ); + + // Flag this job as an old duplicate based on its "root" job... + try { + if ( $job && $this->isRootJobOldDuplicate( $job ) ) { + wfIncrStats( 'job-pop-duplicate' ); + $job = DuplicateJob::newFromJob( $job ); // convert to a no-op + } + } catch ( MWException $e ) {} // don't lose jobs over this + return $job; } @@ -344,7 +355,76 @@ abstract class JobQueue { * @return bool */ protected function doDeduplicateRootJob( Job $job ) { - return true; + global $wgMemc; + + $params = $job->getParams(); + if ( !isset( $params['rootJobSignature'] ) ) { + throw new MWException( "Cannot register root job; missing 'rootJobSignature'." ); + } elseif ( !isset( $params['rootJobTimestamp'] ) ) { + throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." ); + } + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); + // Callers should call batchInsert() and then this function so that if the insert + // fails, the de-duplication registration will be aborted. Since the insert is + // deferred till "transaction idle", do the same here, so that the ordering is + // maintained. Having only the de-duplication registration succeed would cause + // jobs to become no-ops without any actual jobs that made them redundant. + $timestamp = $wgMemc->get( $key ); // current last timestamp of this job + if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { + return true; // a newer version of this root job was enqueued + } + + // Update the timestamp of the last root job started at the location... + return $wgMemc->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); + } + + /** + * Check if the "root" job of a given job has been superseded by a newer one + * + * @param $job Job + * @return bool + * @throws MWException + */ + final protected function isRootJobOldDuplicate( Job $job ) { + if ( $job->getType() !== $this->type ) { + throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + } + wfProfileIn( __METHOD__ ); + $isDuplicate = $this->doIsRootJobOldDuplicate( $job ); + wfProfileOut( __METHOD__ ); + return $isDuplicate; + } + + /** + * @see JobQueue::isRootJobOldDuplicate() + * @param Job $job + * @return bool + */ + protected function doIsRootJobOldDuplicate( Job $job ) { + global $wgMemc; + + $params = $job->getParams(); + if ( !isset( $params['rootJobSignature'] ) ) { + return false; // job has no de-deplication info + } elseif ( !isset( $params['rootJobTimestamp'] ) ) { + trigger_error( "Cannot check root job; missing 'rootJobTimestamp'." ); + return false; + } + + // Get the last time this root job was enqueued + $timestamp = $wgMemc->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) ); + + // Check if a new root job was started at the location after this one's... + return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); + } + + /** + * @param string $signature Hash identifier of the root job + * @return string + */ + protected function getRootJobCacheKey( $signature ) { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature ); } /** diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php index a7a459f5cf..4b22e942ed 100644 --- a/includes/job/JobQueueDB.php +++ b/includes/job/JobQueueDB.php @@ -28,7 +28,6 @@ * @since 1.21 */ class JobQueueDB extends JobQueue { - const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days) const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed @@ -252,11 +251,6 @@ class JobQueueDB extends JobQueue { $job = Job::factory( $row->job_cmd, $title, self::extractBlob( $row->job_params ), $row->job_id ); $job->id = $row->job_id; // XXX: work around broken subclasses - // Flag this job as an old duplicate based on its "root" job... - if ( $this->isRootJobOldDuplicate( $job ) ) { - wfIncrStats( 'job-pop-duplicate' ); - $job = DuplicateJob::newFromJob( $job ); // convert to a no-op - } break; // done } while( true ); @@ -533,30 +527,6 @@ class JobQueueDB extends JobQueue { return true; } - /** - * Check if the "root" job of a given job has been superseded by a newer one - * - * @param $job Job - * @return bool - */ - protected function isRootJobOldDuplicate( Job $job ) { - global $wgMemc; - - $params = $job->getParams(); - if ( !isset( $params['rootJobSignature'] ) ) { - return false; // job has no de-deplication info - } elseif ( !isset( $params['rootJobTimestamp'] ) ) { - trigger_error( "Cannot check root job; missing 'rootJobTimestamp'." ); - return false; - } - - // Get the last time this root job was enqueued - $timestamp = $wgMemc->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) ); - - // Check if a new root job was started at the location after this one's... - return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); - } - /** * @see JobQueue::doWaitForBackups() * @return void @@ -671,15 +641,6 @@ class JobQueueDB extends JobQueue { return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property ); } - /** - * @param string $signature Hash identifier of the root job - * @return string - */ - private function getRootJobCacheKey( $signature ) { - list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); - return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature ); - } - /** * @param $params * @return string diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php index 41855a72ff..cfdb8671b3 100644 --- a/includes/job/JobQueueRedis.php +++ b/includes/job/JobQueueRedis.php @@ -60,7 +60,6 @@ class JobQueueRedis extends JobQueue { protected $server; // string; server address - const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days) const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days) protected $key; // string; key to prefix the queue keys with (used for testing) @@ -274,14 +273,6 @@ LUA; $this->throwRedisException( $this->server, $conn, $e ); } - // Flag this job as an old duplicate based on its "root" job... - try { - if ( $job && $this->isRootJobOldDuplicate( $job ) ) { - wfIncrStats( 'job-pop-duplicate' ); - return DuplicateJob::newFromJob( $job ); // convert to a no-op - } - } catch ( MWException $e ) {} // don't lose jobs over this - return $job; } @@ -407,7 +398,7 @@ LUA; } elseif ( !isset( $params['rootJobTimestamp'] ) ) { throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." ); } - $key = $this->getRootJobKey( $params['rootJobSignature'] ); + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); $conn = $this->getConnection(); try { @@ -423,13 +414,11 @@ LUA; } /** - * Check if the "root" job of a given job has been superseded by a newer one - * - * @param $job Job + * @see JobQueue::doIsRootJobOldDuplicate() + * @param Job $job * @return bool - * @throws MWException */ - protected function isRootJobOldDuplicate( Job $job ) { + protected function doIsRootJobOldDuplicate( Job $job ) { $params = $job->getParams(); if ( !isset( $params['rootJobSignature'] ) ) { return false; // job has no de-deplication info @@ -441,7 +430,7 @@ LUA; $conn = $this->getConnection(); try { // Get the last time this root job was enqueued - $timestamp = $conn->get( $this->getRootJobKey( $params['rootJobSignature'] ) ); + $timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) ); } catch ( RedisException $e ) { $this->throwRedisException( $this->server, $conn, $e ); } @@ -674,15 +663,6 @@ LUA; } } - /** - * @param string $signature Hash identifier of the root job - * @return string - */ - private function getRootJobKey( $signature ) { - list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); - return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature ); - } - /** * @param $key string * @return void -- 2.20.1