protected function doWaitForBackups() {
}
- /**
- * Return a map of task names to task definition maps.
- * A "task" is a fast periodic queue maintenance action.
- * Mutually exclusive tasks must implement their own locking in the callback.
- *
- * Each task value is an associative array with:
- * - name : the name of the task
- * - callback : a PHP callable that performs the task
- * - period : the period in seconds corresponding to the task frequency
- *
- * @return array
- */
- final public function getPeriodicTasks() {
- $tasks = $this->doGetPeriodicTasks();
- foreach ( $tasks as $name => &$def ) {
- $def['name'] = $name;
- }
-
- return $tasks;
- }
-
- /**
- * @see JobQueue::getPeriodicTasks()
- * @return array
- */
- protected function doGetPeriodicTasks() {
- return array();
- }
-
/**
* Clear any process and persistent caches
*
$job->metadata['id'] = $row->job_id;
break; // done
} while ( true );
+
+ if ( !$job || mt_rand( 0, 9 ) == 0 ) {
+ // Handled jobs that need to be recycled/deleted;
+ // any recycled jobs will be picked up next attempt
+ $this->recycleAndDeleteStaleJobs();
+ }
} catch ( DBError $e ) {
$this->throwDBException( $e );
}
wfWaitForSlaves( false, $this->wiki, $this->cluster ?: false );
}
- /**
- * @return array
- */
- protected function doGetPeriodicTasks() {
- return array(
- 'recycleAndDeleteStaleJobs' => array(
- 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
- 'period' => ceil( $this->claimTTL / 2 )
- )
- );
- }
-
/**
* @return void
*/
protected function doGetSiblingQueuesWithJobs( array $types ) {
$dbr = $this->getSlaveDB();
+ // @note: this does not check whether the jobs are claimed or not.
+ // This is useful so JobQueueGroup::pop() also sees queues that only
+ // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
+ // failed jobs so that they can be popped again for that edge case.
$res = $dbr->select( 'job', 'DISTINCT job_cmd',
array( 'job_cmd' => $types ), __METHOD__ );
$this->throwErrorIfAllPartitionsDown( $failed );
}
- protected function doGetPeriodicTasks() {
- $tasks = array();
- /** @var JobQueue $queue */
- foreach ( $this->partitionQueues as $partition => $queue ) {
- foreach ( $queue->getPeriodicTasks() as $task => $def ) {
- $tasks["{$partition}:{$task}"] = $def;
- }
- }
-
- return $tasks;
- }
-
protected function doFlushCaches() {
static $types = array(
'empty',
return $this->coalescedQueues;
}
- /**
- * Execute any due periodic queue maintenance tasks for all queues.
- *
- * A task is "due" if the time ellapsed since the last run is greater than
- * the defined run period. Concurrent calls to this function will cause tasks
- * to be attempted twice, so they may need their own methods of mutual exclusion.
- *
- * @return int Number of tasks run
- */
- public function executeReadyPeriodicTasks() {
- global $wgMemc;
-
- list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
- $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' );
- $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp)
-
- $count = 0;
- $tasksRun = array(); // (queue => task => UNIX timestamp)
- foreach ( $this->getQueueTypes() as $type ) {
- $queue = $this->get( $type );
- foreach ( $queue->getPeriodicTasks() as $task => $definition ) {
- if ( $definition['period'] <= 0 ) {
- continue; // disabled
- } elseif ( !isset( $lastRuns[$type][$task] )
- || $lastRuns[$type][$task] < ( time() - $definition['period'] )
- ) {
- try {
- if ( call_user_func( $definition['callback'] ) !== null ) {
- $tasksRun[$type][$task] = time();
- ++$count;
- }
- } catch ( JobQueueError $e ) {
- MWExceptionHandler::logException( $e );
- }
- }
- }
- }
-
- if ( $count === 0 ) {
- return $count; // nothing to update
- }
-
- $wgMemc->merge( $key, function ( $cache, $key, $lastRuns ) use ( $tasksRun ) {
- if ( is_array( $lastRuns ) ) {
- foreach ( $tasksRun as $type => $tasks ) {
- foreach ( $tasks as $task => $timestamp ) {
- if ( !isset( $lastRuns[$type][$task] )
- || $timestamp > $lastRuns[$type][$task]
- ) {
- $lastRuns[$type][$task] = $timestamp;
- }
- }
- }
- } else {
- $lastRuns = $tasksRun;
- }
-
- return $lastRuns;
- } );
-
- return $count;
- }
-
/**
* @param string $name
* @return mixed
return $response;
}
- $group = JobQueueGroup::singleton();
- // Handle any required periodic queue maintenance
- $count = $group->executeReadyPeriodicTasks();
- if ( $count > 0 ) {
- $msg = "Executed $count periodic queue task(s).";
- $this->logger->debug( $msg );
- $this->debugCallback( $msg );
- }
-
// Bail out if in read-only mode
if ( wfReadOnly() ) {
$response['reached'] = 'read-only';
return $response;
}
+ $group = JobQueueGroup::singleton();
+
// Flush any pending DB writes for sanity
wfGetLBFactory()->commitMasterChanges();