Automatically deduplicate root jobs on insertion
[lhc/web/wiklou.git] / includes / jobqueue / JobQueueDB.php
index 320b1b1..e094850 100644 (file)
@@ -29,7 +29,6 @@
  */
 class JobQueueDB extends JobQueue {
        const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
-       const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
        const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
        const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
        const MAX_OFFSET = 255; // integer; maximum number of rows to skip
@@ -71,15 +70,6 @@ class JobQueueDB extends JobQueue {
         * @return bool
         */
        protected function doIsEmpty() {
-               $key = $this->getCacheKey( 'empty' );
-
-               $isEmpty = $this->cache->get( $key );
-               if ( $isEmpty === 'true' ) {
-                       return true;
-               } elseif ( $isEmpty === 'false' ) {
-                       return false;
-               }
-
                $dbr = $this->getSlaveDB();
                try {
                        $found = $dbr->selectField( // unclaimed job
@@ -88,7 +78,6 @@ class JobQueueDB extends JobQueue {
                } catch ( DBError $e ) {
                        $this->throwDBException( $e );
                }
-               $this->cache->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG );
 
                return !$found;
        }
@@ -272,8 +261,6 @@ class JobQueueDB extends JobQueue {
                        $dbw->commit( $method );
                }
 
-               $this->cache->set( $this->getCacheKey( 'empty' ), 'false', JobQueueDB::CACHE_TTL_LONG );
-
                return;
        }
 
@@ -282,10 +269,6 @@ class JobQueueDB extends JobQueue {
         * @return Job|bool
         */
        protected function doPop() {
-               if ( $this->cache->get( $this->getCacheKey( 'empty' ) ) === 'true' ) {
-                       return false; // queue is empty
-               }
-
                $dbw = $this->getMasterDB();
                try {
                        $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
@@ -308,7 +291,6 @@ class JobQueueDB extends JobQueue {
                                }
                                // Check if we found a row to reserve...
                                if ( !$row ) {
-                                       $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
                                        break; // nothing to do
                                }
                                JobQueue::incrStats( 'job-pop', $this->type );
@@ -319,6 +301,12 @@ class JobQueueDB extends JobQueue {
                                $job->metadata['id'] = $row->job_id;
                                break; // done
                        } while ( true );
+
+                       if ( !$job || mt_rand( 0, 9 ) == 0 ) {
+                               // Handled jobs that need to be recycled/deleted;
+                               // any recycled jobs will be picked up next attempt
+                               $this->recycleAndDeleteStaleJobs();
+                       }
                } catch ( DBError $e ) {
                        $this->throwDBException( $e );
                }
@@ -489,6 +477,8 @@ class JobQueueDB extends JobQueue {
                        // Delete a row with a single DELETE without holding row locks over RTTs...
                        $dbw->delete( 'job',
                                array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ );
+
+                       JobQueue::incrStats( 'job-ack', $this->type );
                } catch ( DBError $e ) {
                        $this->throwDBException( $e );
                }
@@ -498,11 +488,11 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @see JobQueue::doDeduplicateRootJob()
-        * @param Job $job
+        * @param IJobSpecification $job
         * @throws MWException
         * @return bool
         */
-       protected function doDeduplicateRootJob( Job $job ) {
+       protected function doDeduplicateRootJob( IJobSpecification $job ) {
                $params = $job->getParams();
                if ( !isset( $params['rootJobSignature'] ) ) {
                        throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
@@ -553,23 +543,11 @@ class JobQueueDB extends JobQueue {
                wfWaitForSlaves( false, $this->wiki, $this->cluster ?: false );
        }
 
-       /**
-        * @return array
-        */
-       protected function doGetPeriodicTasks() {
-               return array(
-                       'recycleAndDeleteStaleJobs' => array(
-                               'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
-                               'period' => ceil( $this->claimTTL / 2 )
-                       )
-               );
-       }
-
        /**
         * @return void
         */
        protected function doFlushCaches() {
-               foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) {
+               foreach ( array( 'size', 'acquiredcount' ) as $type ) {
                        $this->cache->delete( $this->getCacheKey( $type ) );
                }
        }
@@ -607,6 +585,10 @@ class JobQueueDB extends JobQueue {
 
        protected function doGetSiblingQueuesWithJobs( array $types ) {
                $dbr = $this->getSlaveDB();
+               // @note: this does not check whether the jobs are claimed or not.
+               // This is useful so JobQueueGroup::pop() also sees queues that only
+               // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
+               // failed jobs so that they can be popped again for that edge case.
                $res = $dbr->select( 'job', 'DISTINCT job_cmd',
                        array( 'job_cmd' => $types ), __METHOD__ );
 
@@ -680,8 +662,6 @@ class JobQueueDB extends JobQueue {
                                        $affected = $dbw->affectedRows();
                                        $count += $affected;
                                        JobQueue::incrStats( 'job-recycle', $this->type, $affected );
-                                       // The tasks recycled jobs or release delayed jobs into the queue
-                                       $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
                                        $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
                                }
                        }