Automatically deduplicate root jobs on insertion
[lhc/web/wiklou.git] / includes / jobqueue / JobQueueDB.php
index b1b650b..e094850 100644 (file)
@@ -301,6 +301,12 @@ class JobQueueDB extends JobQueue {
                                $job->metadata['id'] = $row->job_id;
                                break; // done
                        } while ( true );
+
+                       if ( !$job || mt_rand( 0, 9 ) == 0 ) {
+                               // Handled jobs that need to be recycled/deleted;
+                               // any recycled jobs will be picked up next attempt
+                               $this->recycleAndDeleteStaleJobs();
+                       }
                } catch ( DBError $e ) {
                        $this->throwDBException( $e );
                }
@@ -471,6 +477,8 @@ class JobQueueDB extends JobQueue {
                        // Delete a row with a single DELETE without holding row locks over RTTs...
                        $dbw->delete( 'job',
                                array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ );
+
+                       JobQueue::incrStats( 'job-ack', $this->type );
                } catch ( DBError $e ) {
                        $this->throwDBException( $e );
                }
@@ -480,11 +488,11 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @see JobQueue::doDeduplicateRootJob()
-        * @param Job $job
+        * @param IJobSpecification $job
         * @throws MWException
         * @return bool
         */
-       protected function doDeduplicateRootJob( Job $job ) {
+       protected function doDeduplicateRootJob( IJobSpecification $job ) {
                $params = $job->getParams();
                if ( !isset( $params['rootJobSignature'] ) ) {
                        throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
@@ -535,18 +543,6 @@ class JobQueueDB extends JobQueue {
                wfWaitForSlaves( false, $this->wiki, $this->cluster ?: false );
        }
 
-       /**
-        * @return array
-        */
-       protected function doGetPeriodicTasks() {
-               return array(
-                       'recycleAndDeleteStaleJobs' => array(
-                               'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
-                               'period' => ceil( $this->claimTTL / 2 )
-                       )
-               );
-       }
-
        /**
         * @return void
         */
@@ -589,6 +585,10 @@ class JobQueueDB extends JobQueue {
 
        protected function doGetSiblingQueuesWithJobs( array $types ) {
                $dbr = $this->getSlaveDB();
+               // @note: this does not check whether the jobs are claimed or not.
+               // This is useful so JobQueueGroup::pop() also sees queues that only
+               // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
+               // failed jobs so that they can be popped again for that edge case.
                $res = $dbr->select( 'job', 'DISTINCT job_cmd',
                        array( 'job_cmd' => $types ), __METHOD__ );