* @return void
*/
protected function doWaitForBackups() {}
+
+ /**
+ * Return a map of task names to task definition maps.
+ * A "task" is a fast periodic queue maintenance action.
+ * Mutually exclusive tasks must implement their own locking in the callback.
+ *
+ * Each task value is an associative array with:
+ * - name : the name of the task
+ * - callback : a PHP callable that performs the task
+ * - period : the period in seconds corresponding to the task frequency
+ *
+ * @return Array
+ */
+ final public function getPeriodicTasks() {
+ $tasks = $this->doGetPeriodicTasks();
+ foreach ( $tasks as $name => &$def ) {
+ $def['name'] = $name;
+ }
+ return $tasks;
+ }
+
+ /**
+ * @see JobQueue::getPeriodicTasks()
+ * @return Array
+ */
+ protected function doGetPeriodicTasks() {
+ return array();
+ }
}
* @since 1.21
*/
class JobQueueDB extends JobQueue {
+ const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days)
const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
$uuid = wfRandomString( 32 ); // pop attempt
$job = false; // job popped off
- // Occasionally recycle jobs back into the queue that have been claimed too long
- if ( mt_rand( 0, 99 ) == 0 ) {
- $this->recycleStaleJobs();
- }
do { // retry when our row is invalid or deleted as a duplicate
// Try to reserve a row in the DB...
if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) {
*
* @return integer Number of jobs recycled/deleted
*/
- protected function recycleStaleJobs() {
+ public function recycleAndDeleteStaleJobs() {
global $wgMemc;
- $now = time();
+ $now = time();
list( $dbw, $scope ) = $this->getMasterDB();
$count = 0; // affected rows
}
// Update the timestamp of the last root job started at the location...
- return $wgMemc->set( $key, $params['rootJobTimestamp'], 14*86400 ); // 2 weeks
+ return $wgMemc->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
} );
return true;
wfWaitForSlaves();
}
+ /**
+ * @return Array
+ */
+ protected function doGetPeriodicTasks() {
+ return array(
+ 'recycleAndDeleteStaleJobs' => array(
+ 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
+ 'period' => ceil( $this->claimTTL / 2 )
+ )
+ );
+ }
+
/**
* @return Array (DatabaseBase, ScopedCallback)
*/
}
return $types;
}
+
+ /**
+ * Execute any due periodic queue maintenance tasks for all queues.
+ *
+ * A task is "due" if the time ellapsed since the last run is greater than
+ * the defined run period. Concurrent calls to this function will cause tasks
+ * to be attempted twice, so they may need their own methods of mutual exclusion.
+ *
+ * @return integer Number of tasks run
+ */
+ public function executeReadyPeriodicTasks() {
+ global $wgMemc;
+
+ list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+ $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' );
+ $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp)
+
+ $count = 0;
+ $tasksRun = array(); // (queue => task => UNIX timestamp)
+ foreach ( $this->getQueueTypes() as $type ) {
+ $queue = $this->get( $type );
+ foreach ( $queue->getPeriodicTasks() as $task => $definition ) {
+ if ( $definition['period'] <= 0 ) {
+ continue; // disabled
+ } elseif ( !isset( $lastRuns[$type][$task] )
+ || $lastRuns[$type][$task] < ( time() - $definition['period'] ) )
+ {
+ if ( call_user_func( $definition['callback'] ) !== null ) {
+ $tasksRun[$type][$task] = time();
+ ++$count;
+ }
+ }
+ }
+ }
+
+ $wgMemc->merge( $key, function( $cache, $key, $lastRuns ) use ( $tasksRun ) {
+ if ( is_array( $lastRuns ) ) {
+ foreach ( $tasksRun as $type => $tasks ) {
+ foreach ( $tasks as $task => $timestamp ) {
+ if ( !isset( $lastRuns[$type][$task] )
+ || $timestamp > $lastRuns[$type][$task] )
+ {
+ $lastRuns[$type][$task] = $timestamp;
+ }
+ }
+ }
+ } else {
+ $lastRuns = $tasksRun;
+ }
+ return $lastRuns;
+ } );
+
+ return $count;
+ }
}
$types = JobQueueGroup::singleton()->getDefaultQueueTypes();
}
+ // Handle any required periodic queue maintenance
+ $this->executeReadyPeriodicTasks();
+
$memcKey = 'jobqueue:dbs:v3';
$pendingDbInfo = $wgMemc->get( $memcKey );
return; // no DBs with jobs or cache is both empty and locked
}
+ $type = $this->getOption( 'type', false );
$pendingDBs = $pendingDbInfo['pendingDBs']; // convenience
do {
$again = false;
$pendingDBs = array(); // (job type => (db list))
foreach ( $wgLocalDatabases as $db ) {
- $types = JobQueueGroup::singleton( $db )->getQueuesWithJobs();
- foreach ( $types as $type ) {
+ foreach ( JobQueueGroup::singleton( $db )->getQueuesWithJobs() as $type ) {
$pendingDBs[$type][] = $db;
}
}
return $pendingDBs;
}
+
+ /**
+ * Do all ready periodic jobs for all databases every 5 minutes (and .1% of the time)
+ * @return integer
+ */
+ private function executeReadyPeriodicTasks() {
+ global $wgLocalDatabases, $wgMemc;
+
+ $count = 0;
+ $memcKey = 'jobqueue:periodic:lasttime';
+ $timestamp = (int)$wgMemc->get( $memcKey ); // UNIX timestamp or 0
+ if ( ( time() - $timestamp ) > 300 || mt_rand( 0, 999 ) == 0 ) { // 5 minutes
+ if ( $wgMemc->add( "$memcKey:rebuild", 1, 1800 ) ) { // lock
+ foreach ( $wgLocalDatabases as $db ) {
+ $count += JobQueueGroup::singleton( $db )->executeReadyPeriodicTasks();
+ }
+ $wgMemc->set( $memcKey, time() );
+ $wgMemc->delete( "$memcKey:rebuild" ); // unlock
+ }
+ }
+
+ return $count;
+ }
}
$maintClass = "nextJobDb";
$n = 0;
$group = JobQueueGroup::singleton();
+ // Handle any required periodic queue maintenance
+ $count = $group->executeReadyPeriodicTasks();
+ if ( $count > 0 ) {
+ $this->runJobsLog( "Executed $count periodic queue task(s)." );
+ }
+
do {
$job = ( $type === false )
? $group->pop( JobQueueGroup::TYPE_DEFAULT, JobQueueGroup::USE_CACHE )