Merge "Removed executeReadyPeriodicTasks() method"
authorjenkins-bot <jenkins-bot@gerrit.wikimedia.org>
Tue, 12 May 2015 20:03:02 +0000 (20:03 +0000)
committerGerrit Code Review <gerrit@wikimedia.org>
Tue, 12 May 2015 20:03:02 +0000 (20:03 +0000)
includes/jobqueue/JobQueue.php
includes/jobqueue/JobQueueDB.php
includes/jobqueue/JobQueueFederated.php
includes/jobqueue/JobQueueGroup.php
includes/jobqueue/JobRunner.php

index 7df85ff..fd4234d 100644 (file)
@@ -548,35 +548,6 @@ abstract class JobQueue {
        protected function doWaitForBackups() {
        }
 
-       /**
-        * Return a map of task names to task definition maps.
-        * A "task" is a fast periodic queue maintenance action.
-        * Mutually exclusive tasks must implement their own locking in the callback.
-        *
-        * Each task value is an associative array with:
-        *   - name     : the name of the task
-        *   - callback : a PHP callable that performs the task
-        *   - period   : the period in seconds corresponding to the task frequency
-        *
-        * @return array
-        */
-       final public function getPeriodicTasks() {
-               $tasks = $this->doGetPeriodicTasks();
-               foreach ( $tasks as $name => &$def ) {
-                       $def['name'] = $name;
-               }
-
-               return $tasks;
-       }
-
-       /**
-        * @see JobQueue::getPeriodicTasks()
-        * @return array
-        */
-       protected function doGetPeriodicTasks() {
-               return array();
-       }
-
        /**
         * Clear any process and persistent caches
         *
index 000fa4f..491092a 100644 (file)
@@ -301,6 +301,12 @@ class JobQueueDB extends JobQueue {
                                $job->metadata['id'] = $row->job_id;
                                break; // done
                        } while ( true );
+
+                       if ( !$job || mt_rand( 0, 9 ) == 0 ) {
+                               // Handled jobs that need to be recycled/deleted;
+                               // any recycled jobs will be picked up next attempt
+                               $this->recycleAndDeleteStaleJobs();
+                       }
                } catch ( DBError $e ) {
                        $this->throwDBException( $e );
                }
@@ -537,18 +543,6 @@ class JobQueueDB extends JobQueue {
                wfWaitForSlaves( false, $this->wiki, $this->cluster ?: false );
        }
 
-       /**
-        * @return array
-        */
-       protected function doGetPeriodicTasks() {
-               return array(
-                       'recycleAndDeleteStaleJobs' => array(
-                               'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
-                               'period' => ceil( $this->claimTTL / 2 )
-                       )
-               );
-       }
-
        /**
         * @return void
         */
@@ -591,6 +585,10 @@ class JobQueueDB extends JobQueue {
 
        protected function doGetSiblingQueuesWithJobs( array $types ) {
                $dbr = $this->getSlaveDB();
+               // @note: this does not check whether the jobs are claimed or not.
+               // This is useful so JobQueueGroup::pop() also sees queues that only
+               // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
+               // failed jobs so that they can be popped again for that edge case.
                $res = $dbr->select( 'job', 'DISTINCT job_cmd',
                        array( 'job_cmd' => $types ), __METHOD__ );
 
index b86819e..a35ab84 100644 (file)
@@ -373,18 +373,6 @@ class JobQueueFederated extends JobQueue {
                $this->throwErrorIfAllPartitionsDown( $failed );
        }
 
-       protected function doGetPeriodicTasks() {
-               $tasks = array();
-               /** @var JobQueue $queue */
-               foreach ( $this->partitionQueues as $partition => $queue ) {
-                       foreach ( $queue->getPeriodicTasks() as $task => $def ) {
-                               $tasks["{$partition}:{$task}"] = $def;
-                       }
-               }
-
-               return $tasks;
-       }
-
        protected function doFlushCaches() {
                /** @var JobQueue $queue */
                foreach ( $this->partitionQueues as $queue ) {
index ebd547a..fdf7b87 100644 (file)
@@ -341,69 +341,6 @@ class JobQueueGroup {
                return $this->coalescedQueues;
        }
 
-       /**
-        * Execute any due periodic queue maintenance tasks for all queues.
-        *
-        * A task is "due" if the time ellapsed since the last run is greater than
-        * the defined run period. Concurrent calls to this function will cause tasks
-        * to be attempted twice, so they may need their own methods of mutual exclusion.
-        *
-        * @return int Number of tasks run
-        */
-       public function executeReadyPeriodicTasks() {
-               global $wgMemc;
-
-               list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
-               $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' );
-               $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp)
-
-               $count = 0;
-               $tasksRun = array(); // (queue => task => UNIX timestamp)
-               foreach ( $this->getQueueTypes() as $type ) {
-                       $queue = $this->get( $type );
-                       foreach ( $queue->getPeriodicTasks() as $task => $definition ) {
-                               if ( $definition['period'] <= 0 ) {
-                                       continue; // disabled
-                               } elseif ( !isset( $lastRuns[$type][$task] )
-                                       || $lastRuns[$type][$task] < ( time() - $definition['period'] )
-                               ) {
-                                       try {
-                                               if ( call_user_func( $definition['callback'] ) !== null ) {
-                                                       $tasksRun[$type][$task] = time();
-                                                       ++$count;
-                                               }
-                                       } catch ( JobQueueError $e ) {
-                                               MWExceptionHandler::logException( $e );
-                                       }
-                               }
-                       }
-               }
-
-               if ( $count === 0 ) {
-                       return $count; // nothing to update
-               }
-
-               $wgMemc->merge( $key, function ( $cache, $key, $lastRuns ) use ( $tasksRun ) {
-                       if ( is_array( $lastRuns ) ) {
-                               foreach ( $tasksRun as $type => $tasks ) {
-                                       foreach ( $tasks as $task => $timestamp ) {
-                                               if ( !isset( $lastRuns[$type][$task] )
-                                                       || $timestamp > $lastRuns[$type][$task]
-                                               ) {
-                                                       $lastRuns[$type][$task] = $timestamp;
-                                               }
-                                       }
-                               }
-                       } else {
-                               $lastRuns = $tasksRun;
-                       }
-
-                       return $lastRuns;
-               } );
-
-               return $count;
-       }
-
        /**
         * @param string $name
         * @return mixed
index e84c17b..6bf33aa 100644 (file)
@@ -103,15 +103,6 @@ class JobRunner implements LoggerAwareInterface {
                        return $response;
                }
 
-               $group = JobQueueGroup::singleton();
-               // Handle any required periodic queue maintenance
-               $count = $group->executeReadyPeriodicTasks();
-               if ( $count > 0 ) {
-                       $msg = "Executed $count periodic queue task(s).";
-                       $this->logger->debug( $msg );
-                       $this->debugCallback( $msg );
-               }
-
                // Bail out if in read-only mode
                if ( wfReadOnly() ) {
                        $response['reached'] = 'read-only';
@@ -134,6 +125,8 @@ class JobRunner implements LoggerAwareInterface {
                        return $response;
                }
 
+               $group = JobQueueGroup::singleton();
+               
                // Flush any pending DB writes for sanity
                wfGetLBFactory()->commitMasterChanges();