[JobQueue] Factored "root job" de-duplication code into base class.
authorAaron Schulz <aschulz@wikimedia.org>
Thu, 28 Feb 2013 22:21:28 +0000 (14:21 -0800)
committerAaron Schulz <aschulz@wikimedia.org>
Wed, 20 Mar 2013 04:05:33 +0000 (21:05 -0700)
* Moved the root job handling up to the base class and provided
  some default function implementations.
* Also bumped ROOTJOB_TTL from 14 to 28 days.

Change-Id: I70bc043bfc039c5d0b009e0b5d39fd2887f46093

includes/job/JobQueue.php
includes/job/JobQueueDB.php
includes/job/JobQueueRedis.php

index b0dd925..5ef52b5 100644 (file)
@@ -37,6 +37,8 @@ abstract class JobQueue {
 
        const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions
 
+       const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
+
        /**
         * @param $params array
         */
@@ -262,6 +264,15 @@ abstract class JobQueue {
                wfProfileIn( __METHOD__ );
                $job = $this->doPop();
                wfProfileOut( __METHOD__ );
+
+               // Flag this job as an old duplicate based on its "root" job...
+               try {
+                       if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
+                               wfIncrStats( 'job-pop-duplicate' );
+                               $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
+                       }
+               } catch ( MWException $e ) {} // don't lose jobs over this
+
                return $job;
        }
 
@@ -344,7 +355,76 @@ abstract class JobQueue {
         * @return bool
         */
        protected function doDeduplicateRootJob( Job $job ) {
-               return true;
+               global $wgMemc;
+
+               $params = $job->getParams();
+               if ( !isset( $params['rootJobSignature'] ) ) {
+                       throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
+               } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
+                       throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
+               }
+               $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
+               // Callers should call batchInsert() and then this function so that if the insert
+               // fails, the de-duplication registration will be aborted. Since the insert is
+               // deferred till "transaction idle", do the same here, so that the ordering is
+               // maintained. Having only the de-duplication registration succeed would cause
+               // jobs to become no-ops without any actual jobs that made them redundant.
+               $timestamp = $wgMemc->get( $key ); // current last timestamp of this job
+               if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
+                       return true; // a newer version of this root job was enqueued
+               }
+
+               // Update the timestamp of the last root job started at the location...
+               return $wgMemc->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
+       }
+
+       /**
+        * Check if the "root" job of a given job has been superseded by a newer one
+        *
+        * @param $job Job
+        * @return bool
+        * @throws MWException
+        */
+       final protected function isRootJobOldDuplicate( Job $job ) {
+               if ( $job->getType() !== $this->type ) {
+                       throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+               }
+               wfProfileIn( __METHOD__ );
+               $isDuplicate = $this->doIsRootJobOldDuplicate( $job );
+               wfProfileOut( __METHOD__ );
+               return $isDuplicate;
+       }
+
+       /**
+        * @see JobQueue::isRootJobOldDuplicate()
+        * @param Job $job
+        * @return bool
+        */
+       protected function doIsRootJobOldDuplicate( Job $job ) {
+               global $wgMemc;
+
+               $params = $job->getParams();
+               if ( !isset( $params['rootJobSignature'] ) ) {
+                       return false; // job has no de-deplication info
+               } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
+                       trigger_error( "Cannot check root job; missing 'rootJobTimestamp'." );
+                       return false;
+               }
+
+               // Get the last time this root job was enqueued
+               $timestamp = $wgMemc->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
+
+               // Check if a new root job was started at the location after this one's...
+               return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
+       }
+
+       /**
+        * @param string $signature Hash identifier of the root job
+        * @return string
+        */
+       protected function getRootJobCacheKey( $signature ) {
+               list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+               return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
        }
 
        /**
index a7a459f..4b22e94 100644 (file)
@@ -28,7 +28,6 @@
  * @since 1.21
  */
 class JobQueueDB extends JobQueue {
-       const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days)
        const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
        const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
        const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
@@ -252,11 +251,6 @@ class JobQueueDB extends JobQueue {
                        $job = Job::factory( $row->job_cmd, $title,
                                self::extractBlob( $row->job_params ), $row->job_id );
                        $job->id = $row->job_id; // XXX: work around broken subclasses
-                       // Flag this job as an old duplicate based on its "root" job...
-                       if ( $this->isRootJobOldDuplicate( $job ) ) {
-                               wfIncrStats( 'job-pop-duplicate' );
-                               $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
-                       }
                        break; // done
                } while( true );
 
@@ -533,30 +527,6 @@ class JobQueueDB extends JobQueue {
                return true;
        }
 
-       /**
-        * Check if the "root" job of a given job has been superseded by a newer one
-        *
-        * @param $job Job
-        * @return bool
-        */
-       protected function isRootJobOldDuplicate( Job $job ) {
-               global $wgMemc;
-
-               $params = $job->getParams();
-               if ( !isset( $params['rootJobSignature'] ) ) {
-                       return false; // job has no de-deplication info
-               } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
-                       trigger_error( "Cannot check root job; missing 'rootJobTimestamp'." );
-                       return false;
-               }
-
-               // Get the last time this root job was enqueued
-               $timestamp = $wgMemc->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
-
-               // Check if a new root job was started at the location after this one's...
-               return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
-       }
-
        /**
         * @see JobQueue::doWaitForBackups()
         * @return void
@@ -671,15 +641,6 @@ class JobQueueDB extends JobQueue {
                return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
        }
 
-       /**
-        * @param string $signature Hash identifier of the root job
-        * @return string
-        */
-       private function getRootJobCacheKey( $signature ) {
-               list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
-               return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
-       }
-
        /**
         * @param $params
         * @return string
index 41855a7..cfdb867 100644 (file)
@@ -60,7 +60,6 @@ class JobQueueRedis extends JobQueue {
 
        protected $server; // string; server address
 
-       const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days)
        const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
 
        protected $key; // string; key to prefix the queue keys with (used for testing)
@@ -274,14 +273,6 @@ LUA;
                        $this->throwRedisException( $this->server, $conn, $e );
                }
 
-               // Flag this job as an old duplicate based on its "root" job...
-               try {
-                       if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
-                               wfIncrStats( 'job-pop-duplicate' );
-                               return DuplicateJob::newFromJob( $job ); // convert to a no-op
-                       }
-               } catch ( MWException $e ) {} // don't lose jobs over this
-
                return $job;
        }
 
@@ -407,7 +398,7 @@ LUA;
                } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
                        throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
                }
-               $key = $this->getRootJobKey( $params['rootJobSignature'] );
+               $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
 
                $conn = $this->getConnection();
                try {
@@ -423,13 +414,11 @@ LUA;
        }
 
        /**
-        * Check if the "root" job of a given job has been superseded by a newer one
-        *
-        * @param $job Job
+        * @see JobQueue::doIsRootJobOldDuplicate()
+        * @param Job $job
         * @return bool
-        * @throws MWException
         */
-       protected function isRootJobOldDuplicate( Job $job ) {
+       protected function doIsRootJobOldDuplicate( Job $job ) {
                $params = $job->getParams();
                if ( !isset( $params['rootJobSignature'] ) ) {
                        return false; // job has no de-deplication info
@@ -441,7 +430,7 @@ LUA;
                $conn = $this->getConnection();
                try {
                        // Get the last time this root job was enqueued
-                       $timestamp = $conn->get( $this->getRootJobKey( $params['rootJobSignature'] ) );
+                       $timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
                }
@@ -674,15 +663,6 @@ LUA;
                }
        }
 
-       /**
-        * @param string $signature Hash identifier of the root job
-        * @return string
-        */
-       private function getRootJobKey( $signature ) {
-               list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
-               return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
-       }
-
        /**
         * @param $key string
         * @return void