use MediaWiki\MediaWikiServices;
use MediaWiki\Logger\LoggerFactory;
-use Liuggio\StatsdClient\Factory\StatsdDataFactory;
+use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerInterface;
use Wikimedia\ScopedCallback;
}
// Bail out if there is too much DB lag.
// This check should not block as we want to try other wiki queues.
- list( , $maxLag ) = $lbFactory->getMainLB( wfWikiID() )->getMaxLag();
+ list( , $maxLag ) = $lbFactory->getMainLB()->getMaxLag();
if ( $maxLag >= self::MAX_ALLOWED_LAG ) {
$response['reached'] = 'replica-lag-limit';
return $response;
}
/**
- * @param Job $job
+ * @param RunnableJob $job
* @param LBFactory $lbFactory
- * @param StatsdDataFactory $stats
+ * @param StatsdDataFactoryInterface $stats
* @param float $popTime
* @return array Map of status/error/timeMs
*/
- private function executeJob( Job $job, LBFactory $lbFactory, $stats, $popTime ) {
+ private function executeJob( RunnableJob $job, LBFactory $lbFactory, $stats, $popTime ) {
$jType = $job->getType();
$msg = $job->toString() . " STARTING";
$this->logger->debug( $msg, [
] );
$this->debugCallback( $msg );
+ // Clear out title cache data from prior snapshots
+ // (e.g. from before JobRunner was invoked in this process)
+ MediaWikiServices::getInstance()->getLinkCache()->clear();
+
// Run the job...
$rssStart = $this->getMaxRssKb();
$jobStartTime = microtime( true );
try {
$fnameTrxOwner = get_class( $job ) . '::run'; // give run() outer scope
- if ( !$job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) {
- $lbFactory->beginMasterChanges( $fnameTrxOwner );
+ // Flush any pending changes left over from an implicit transaction round
+ if ( $job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) {
+ $lbFactory->commitMasterChanges( $fnameTrxOwner ); // new implicit round
+ } else {
+ $lbFactory->beginMasterChanges( $fnameTrxOwner ); // new explicit round
}
+ // Clear any stale REPEATABLE-READ snapshots from replica DB connections
+ $lbFactory->flushReplicaSnapshots( $fnameTrxOwner );
$status = $job->run();
$error = $job->getLastError();
+ // Commit all pending changes from this job
$this->commitMasterChanges( $lbFactory, $job, $fnameTrxOwner );
- // Important: this must be the last deferred update added (T100085, T154425)
- DeferredUpdates::addCallableUpdate( [ JobQueueGroup::class, 'pushLazyJobs' ] );
// Run any deferred update tasks; doUpdates() manages transactions itself
DeferredUpdates::doUpdates();
} catch ( Exception $e ) {
MWExceptionHandler::logException( $e );
}
- // Commit all outstanding connections that are in a transaction
- // to get a fresh repeatable read snapshot on every connection.
- // Note that jobs are still responsible for handling replica DB lag.
- $lbFactory->flushReplicaSnapshots( __METHOD__ );
- // Clear out title cache data from prior snapshots
- MediaWikiServices::getInstance()->getLinkCache()->clear();
$timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
$rssEnd = $this->getMaxRssKb();
}
/**
- * @param Job $job
+ * @param RunnableJob $job
* @return int Seconds for this runner to avoid doing more jobs of this type
* @see $wgJobBackoffThrottling
*/
- private function getBackoffTimeToWait( Job $job ) {
+ private function getBackoffTimeToWait( RunnableJob $job ) {
$throttling = $this->config->get( 'JobBackoffThrottling' );
if ( !isset( $throttling[$job->getType()] ) || $job instanceof DuplicateJob ) {
* $wgJobSerialCommitThreshold for more.
*
* @param LBFactory $lbFactory
- * @param Job $job
+ * @param RunnableJob $job
* @param string $fnameTrxOwner
* @throws DBError
*/
- private function commitMasterChanges( LBFactory $lbFactory, Job $job, $fnameTrxOwner ) {
+ private function commitMasterChanges( LBFactory $lbFactory, RunnableJob $job, $fnameTrxOwner ) {
$syncThreshold = $this->config->get( 'JobSerialCommitThreshold' );
$time = false;
- $lb = $lbFactory->getMainLB( wfWikiID() );
- if ( $syncThreshold !== false && $lb->getServerCount() > 1 ) {
+ $lb = $lbFactory->getMainLB();
+ if ( $syncThreshold !== false && $lb->hasStreamingReplicaServers() ) {
// Generally, there is one master connection to the local DB
$dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
// We need natively blocking fast locks
$this->debugCallback( $msg );
// Wait for an exclusive lock to commit
- if ( !$dbwSerial->lock( 'jobrunner-serial-commit', __METHOD__, 30 ) ) {
+ if ( !$dbwSerial->lock( 'jobrunner-serial-commit', $fnameTrxOwner, 30 ) ) {
// This will trigger a rollback in the main loop
throw new DBError( $dbwSerial, "Timed out waiting on commit queue." );
}
- $unlocker = new ScopedCallback( function () use ( $dbwSerial ) {
- $dbwSerial->unlock( 'jobrunner-serial-commit', __METHOD__ );
+ $unlocker = new ScopedCallback( function () use ( $dbwSerial, $fnameTrxOwner ) {
+ $dbwSerial->unlock( 'jobrunner-serial-commit', $fnameTrxOwner );
} );
// Wait for the replica DBs to catch up