*/
$wgJobBackoffThrottling = array();
+/**
+ * Make job runners commit changes for slave-lag prone jobs one job at a time.
+ * This is useful if there are many job workers that race on slave lag checks.
+ * If set, jobs taking this many seconds of DB write time have serialized commits.
+ *
+ * Note that affected jobs may have worse lock contention. Also, if they affect
+ * several DBs at once they may have a smaller chance of being atomic due to the
+ * possibility of connection loss while queueing up to commit. Affected jobs may
+ * also fail due to the commit lock acquisition timeout.
+ *
+ * @var float|bool
+ * @since 1.26
+ */
+$wgJobSerialCommitThreshold = false;
+
/**
* Map of job types to configuration arrays.
* This determines which queue class and storage system is used for each job type.
}
/**
- * Check to see if a named lock is available. This is non-blocking.
+ * Check to see if a named lock is available (non-blocking)
*
* @param string $lockName Name of lock to poll
* @param string $method Name of method calling us
/**
* Acquire a named lock
*
- * Abstracted from Filestore::lock() so child classes can implement for
- * their own needs.
+ * Named locks are not related to transactions
*
* @param string $lockName Name of lock to aquire
* @param string $method Name of method calling us
}
/**
- * Release a lock.
+ * Release a lock
+ *
+ * Named locks are not related to transactions
*
* @param string $lockName Name of lock to release
* @param string $method Name of method calling us
return true;
}
+ /**
+ * Check to see if a named lock used by lock() use blocking queues
+ *
+ * @return bool
+ * @since 1.26
+ */
+ public function namedLocksEnqueue() {
+ return false;
+ }
+
/**
* Lock specific tables
*
/** @var callable|null Debug output handler */
protected $debug;
+ /**
+ * @var LoggerInterface $logger
+ */
+ protected $logger;
+
/**
* @param callable $debug Optional debug output handler
*/
$this->debug = $debug;
}
- /**
- * @var LoggerInterface $logger
- */
- protected $logger;
-
/**
* @param LoggerInterface $logger
+ * @return void
*/
public function setLogger( LoggerInterface $logger ) {
$this->logger = $logger;
++$jobsRun;
$status = $job->run();
$error = $job->getLastError();
- wfGetLBFactory()->commitMasterChanges();
+ $this->commitMasterChanges( $job );
} catch ( Exception $e ) {
MWExceptionHandler::rollbackMasterChangesAndLog( $e );
$status = false;
* @return array Map of (job type => backoff expiry timestamp)
*/
private function loadBackoffs( array $backoffs, $mode = 'wait' ) {
-
$file = wfTempDir() . '/mw-runJobs-backoffs.json';
if ( is_file( $file ) ) {
$noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
* @return array The new backoffs account for $backoffs and the latest file data
*/
private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) {
-
if ( !$deltas ) {
return $this->loadBackoffs( $backoffs, $mode );
}
call_user_func_array( $this->debug, array( wfTimestamp( TS_DB ) . " $msg\n" ) );
}
}
+
+ /**
+ * Commit any DB master changes from a job on all load balancers
+ *
+ * @param Job $job
+ * @throws DBError
+ */
+ private function commitMasterChanges( Job $job ) {
+ global $wgJobSerialCommitThreshold;
+
+ $lb = wfGetLB( wfWikiID() );
+ if ( $wgJobSerialCommitThreshold !== false ) {
+ // Generally, there is one master connection to the local DB
+ $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
+ } else {
+ $dbwSerial = false;
+ }
+
+ if ( !$dbwSerial
+ || !$dbwSerial->namedLocksEnqueue()
+ || $dbwSerial->pendingWriteQueryDuration() < $wgJobSerialCommitThreshold
+ ) {
+ // Writes are all to foreign DBs, named locks don't form queues,
+ // or $wgJobSerialCommitThreshold is not reached; commit changes now
+ wfGetLBFactory()->commitMasterChanges();
+ return;
+ }
+
+ $ms = intval( 1000 * $dbwSerial->pendingWriteQueryDuration() );
+ $msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of writes]";
+ $this->logger->info( $msg );
+ $this->debugCallback( $msg );
+
+ // Wait for an exclusive lock to commit
+ if ( !$dbwSerial->lock( 'jobrunner-serial-commit', __METHOD__, 30 ) ) {
+ // This will trigger a rollback in the main loop
+ throw new DBError( $dbwSerial, "Timed out waiting on commit queue." );
+ }
+ // Wait for the generic slave to catch up
+ $pos = $lb->getMasterPos();
+ if ( $pos ) {
+ $lb->waitForOne( $pos );
+ }
+
+ // Re-ping all masters with transactions. This throws DBError if some
+ // connection died while waiting on locks/slaves, triggering a rollback.
+ wfGetLBFactory()->forEachLB( function( LoadBalancer $lb ) {
+ $lb->forEachOpenConnection( function( DatabaseBase $conn ) {
+ if ( $conn->writesOrCallbacksPending() ) {
+ $conn->query( "SELECT 1" );
+ }
+ } );
+ } );
+
+ // Actually commit the DB master changes
+ wfGetLBFactory()->commitMasterChanges();
+
+ // Release the lock
+ $dbwSerial->unlock( 'jobrunner-serial-commit', __METHOD__ );
+ }
}