Merge "Removed executeReadyPeriodicTasks() method"
authorjenkins-bot <jenkins-bot@gerrit.wikimedia.org>
Tue, 12 May 2015 20:03:02 +0000 (20:03 +0000)
committerGerrit Code Review <gerrit@wikimedia.org>
Tue, 12 May 2015 20:03:02 +0000 (20:03 +0000)
1  2 
includes/jobqueue/JobQueueDB.php
includes/jobqueue/JobQueueFederated.php
includes/jobqueue/JobRunner.php

@@@ -301,6 -301,12 +301,12 @@@ class JobQueueDB extends JobQueue 
                                $job->metadata['id'] = $row->job_id;
                                break; // done
                        } while ( true );
+                       if ( !$job || mt_rand( 0, 9 ) == 0 ) {
+                               // Handled jobs that need to be recycled/deleted;
+                               // any recycled jobs will be picked up next attempt
+                               $this->recycleAndDeleteStaleJobs();
+                       }
                } catch ( DBError $e ) {
                        $this->throwDBException( $e );
                }
                        // Delete a row with a single DELETE without holding row locks over RTTs...
                        $dbw->delete( 'job',
                                array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ );
 +
 +                      JobQueue::incrStats( 'job-ack', $this->type );
                } catch ( DBError $e ) {
                        $this->throwDBException( $e );
                }
                wfWaitForSlaves( false, $this->wiki, $this->cluster ?: false );
        }
  
-       /**
-        * @return array
-        */
-       protected function doGetPeriodicTasks() {
-               return array(
-                       'recycleAndDeleteStaleJobs' => array(
-                               'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
-                               'period' => ceil( $this->claimTTL / 2 )
-                       )
-               );
-       }
        /**
         * @return void
         */
  
        protected function doGetSiblingQueuesWithJobs( array $types ) {
                $dbr = $this->getSlaveDB();
+               // @note: this does not check whether the jobs are claimed or not.
+               // This is useful so JobQueueGroup::pop() also sees queues that only
+               // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
+               // failed jobs so that they can be popped again for that edge case.
                $res = $dbr->select( 'job', 'DISTINCT job_cmd',
                        array( 'job_cmd' => $types ), __METHOD__ );
  
@@@ -373,19 -373,15 +373,7 @@@ class JobQueueFederated extends JobQueu
                $this->throwErrorIfAllPartitionsDown( $failed );
        }
  
-       protected function doGetPeriodicTasks() {
-               $tasks = array();
-               /** @var JobQueue $queue */
-               foreach ( $this->partitionQueues as $partition => $queue ) {
-                       foreach ( $queue->getPeriodicTasks() as $task => $def ) {
-                               $tasks["{$partition}:{$task}"] = $def;
-                       }
-               }
-               return $tasks;
-       }
        protected function doFlushCaches() {
 -              static $types = array(
 -                      'empty',
 -                      'size',
 -                      'acquiredcount',
 -                      'delayedcount',
 -                      'abandonedcount'
 -              );
 -
                /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
                        $queue->doFlushCaches();
@@@ -103,25 -103,14 +103,16 @@@ class JobRunner implements LoggerAwareI
                        return $response;
                }
  
-               $group = JobQueueGroup::singleton();
-               // Handle any required periodic queue maintenance
-               $count = $group->executeReadyPeriodicTasks();
-               if ( $count > 0 ) {
-                       $msg = "Executed $count periodic queue task(s).";
-                       $this->logger->debug( $msg );
-                       $this->debugCallback( $msg );
-               }
                // Bail out if in read-only mode
                if ( wfReadOnly() ) {
                        $response['reached'] = 'read-only';
                        return $response;
                }
  
 +              $profiler = Profiler::instance();
 +
                // Catch huge single updates that lead to slave lag
 -              $trxProfiler = Profiler::instance()->getTransactionProfiler();
 +              $trxProfiler = $profiler->getTransactionProfiler();
                $trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) );
                $trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ );
  
                        return $response;
                }
  
+               $group = JobQueueGroup::singleton();
+               
                // Flush any pending DB writes for sanity
                wfGetLBFactory()->commitMasterChanges();
  
                                }
  
                                $msg = $job->toString() . " STARTING";
 -                              $this->logger->info( $msg );
 +                              $this->logger->debug( $msg );
                                $this->debugCallback( $msg );
  
                                // Run the job...
 +                              $psection = $profiler->scopedProfileIn( __METHOD__ . '-' . $jType );
                                $jobStartTime = microtime( true );
                                try {
                                        ++$jobsRun;
                                }
                                $timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
                                $timeMsTotal += $timeMs;
 +                              $profiler->scopedProfileOut( $psection );
  
                                // Mark the job as done on success or when the job cannot be retried
                                if ( $status !== false || !$job->allowRetries() ) {