$job->metadata['id'] = $row->job_id;
break; // done
} while ( true );
+
+ if ( !$job || mt_rand( 0, 9 ) == 0 ) {
+ // Handled jobs that need to be recycled/deleted;
+ // any recycled jobs will be picked up next attempt
+ $this->recycleAndDeleteStaleJobs();
+ }
} catch ( DBError $e ) {
$this->throwDBException( $e );
}
// Delete a row with a single DELETE without holding row locks over RTTs...
$dbw->delete( 'job',
array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ );
+
+ JobQueue::incrStats( 'job-ack', $this->type );
} catch ( DBError $e ) {
$this->throwDBException( $e );
}
wfWaitForSlaves( false, $this->wiki, $this->cluster ?: false );
}
- /**
- * @return array
- */
- protected function doGetPeriodicTasks() {
- return array(
- 'recycleAndDeleteStaleJobs' => array(
- 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
- 'period' => ceil( $this->claimTTL / 2 )
- )
- );
- }
-
/**
* @return void
*/
protected function doGetSiblingQueuesWithJobs( array $types ) {
$dbr = $this->getSlaveDB();
+ // @note: this does not check whether the jobs are claimed or not.
+ // This is useful so JobQueueGroup::pop() also sees queues that only
+ // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
+ // failed jobs so that they can be popped again for that edge case.
$res = $dbr->select( 'job', 'DISTINCT job_cmd',
array( 'job_cmd' => $types ), __METHOD__ );
$this->throwErrorIfAllPartitionsDown( $failed );
}
- protected function doGetPeriodicTasks() {
- $tasks = array();
- /** @var JobQueue $queue */
- foreach ( $this->partitionQueues as $partition => $queue ) {
- foreach ( $queue->getPeriodicTasks() as $task => $def ) {
- $tasks["{$partition}:{$task}"] = $def;
- }
- }
-
- return $tasks;
- }
-
protected function doFlushCaches() {
- static $types = array(
- 'empty',
- 'size',
- 'acquiredcount',
- 'delayedcount',
- 'abandonedcount'
- );
-
/** @var JobQueue $queue */
foreach ( $this->partitionQueues as $queue ) {
$queue->doFlushCaches();
return $response;
}
- $group = JobQueueGroup::singleton();
- // Handle any required periodic queue maintenance
- $count = $group->executeReadyPeriodicTasks();
- if ( $count > 0 ) {
- $msg = "Executed $count periodic queue task(s).";
- $this->logger->debug( $msg );
- $this->debugCallback( $msg );
- }
-
// Bail out if in read-only mode
if ( wfReadOnly() ) {
$response['reached'] = 'read-only';
return $response;
}
+ $profiler = Profiler::instance();
+
// Catch huge single updates that lead to slave lag
- $trxProfiler = Profiler::instance()->getTransactionProfiler();
+ $trxProfiler = $profiler->getTransactionProfiler();
$trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) );
$trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ );
return $response;
}
+ $group = JobQueueGroup::singleton();
+
// Flush any pending DB writes for sanity
wfGetLBFactory()->commitMasterChanges();
}
$msg = $job->toString() . " STARTING";
- $this->logger->info( $msg );
+ $this->logger->debug( $msg );
$this->debugCallback( $msg );
// Run the job...
+ $psection = $profiler->scopedProfileIn( __METHOD__ . '-' . $jType );
$jobStartTime = microtime( true );
try {
++$jobsRun;
}
$timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
$timeMsTotal += $timeMs;
+ $profiler->scopedProfileOut( $psection );
// Mark the job as done on success or when the job cannot be retried
if ( $status !== false || !$job->allowRetries() ) {