X-Git-Url: https://git.cyclocoop.org/%242?a=blobdiff_plain;f=includes%2Fjob%2FJobQueueDB.php;h=4d0f294bc0478a7ab344d10c26f4418105b88dae;hb=2df4cb35478f9946b3538a8b0ddc86b2dfad2124;hp=cf6f1b97cf02b6bbd4ef41372896fc9138a034af;hpb=d249317054ce04cdcc863887f0848e326b7ee251;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php index cf6f1b97cf..4d0f294bc0 100644 --- a/includes/job/JobQueueDB.php +++ b/includes/job/JobQueueDB.php @@ -49,15 +49,19 @@ class JobQueueDB extends JobQueue { return false; } - $found = $this->getSlaveDB()->selectField( - 'job', '1', array( 'job_cmd' => $this->type ), __METHOD__ + $found = $this->getSlaveDB()->selectField( // unclaimed job + 'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__ ); $wgMemc->add( $key, $found ? 'false' : 'true', self::CACHE_TTL ); + return (bool)$found; } /** * @see JobQueue::doBatchPush() + * @param array $jobs + * @param $flags + * @throws DBError|Exception * @return bool */ protected function doBatchPush( array $jobs, $flags ) { @@ -174,6 +178,7 @@ class JobQueueDB extends JobQueue { } $job = Job::factory( $row->job_cmd, $title, self::extractBlob( $row->job_params ), $row->job_id ); + $job->id = $row->job_id; // XXX: work around broken subclasses // Flag this job as an old duplicate based on its "root" job... if ( $this->isRootJobOldDuplicate( $job ) ) { $job = DuplicateJob::newFromJob( $job ); // convert to a no-op @@ -304,26 +309,42 @@ class JobQueueDB extends JobQueue { $dbw = $this->getMasterDB(); $count = 0; // affected rows - if ( $this->claimTTL > 0 ) { // re-try stale jobs... + if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) { + return $count; // already in progress + } + + // Remove claims on jobs acquired for too long if enabled... + if ( $this->claimTTL > 0 ) { $claimCutoff = $dbw->timestamp( $now - $this->claimTTL ); - // Reset job_token for these jobs so that other runners will pick them up. - // Set the timestamp to the current time, as it is useful to now that the job - // was already tried before. - $dbw->update( 'job', - array( - 'job_token' => '', - 'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release + // Get the IDs of jobs that have be claimed but not finished after too long. + // These jobs can be recycled into the queue by expiring the claim. Selecting + // the IDs first means that the UPDATE can be done by primary key (less deadlocks). + $res = $dbw->select( 'job', 'job_id', array( 'job_cmd' => $this->type, "job_token != {$dbw->addQuotes( '' )}", // was acquired "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale - "job_attempts < {$dbw->addQuotes( self::MAX_ATTEMPTS )}" ), + "job_attempts < {$dbw->addQuotes( self::MAX_ATTEMPTS )}" ), // retries left __METHOD__ ); - $count += $dbw->affectedRows(); + $ids = array_map( function( $o ) { return $o->job_id; }, iterator_to_array( $res ) ); + if ( count( $ids ) ) { + // Reset job_token for these jobs so that other runners will pick them up. + // Set the timestamp to the current time, as it is useful to now that the job + // was already tried before (the timestamp becomes the "released" time). + $dbw->update( 'job', + array( + 'job_token' => '', + 'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release + array( + 'job_id' => $ids ), + __METHOD__ + ); + $count += $dbw->affectedRows(); + } } - // Just destroy stale jobs... + // Just destroy any stale jobs... $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE ); $conds = array( 'job_cmd' => $this->type, @@ -333,28 +354,45 @@ class JobQueueDB extends JobQueue { if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times... $conds[] = "job_attempts >= {$dbw->addQuotes( self::MAX_ATTEMPTS )}"; } - $dbw->delete( 'job', $conds, __METHOD__ ); - $count += $dbw->affectedRows(); + // Get the IDs of jobs that are considered stale and should be removed. Selecting + // the IDs first means that the UPDATE can be done by primary key (less deadlocks). + $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ ); + $ids = array_map( function( $o ) { return $o->job_id; }, iterator_to_array( $res ) ); + if ( count( $ids ) ) { + $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ ); + $count += $dbw->affectedRows(); + } + + $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ ); return $count; } /** * @see JobQueue::doAck() + * @param Job $job + * @throws MWException * @return Job|bool */ protected function doAck( Job $job ) { + if ( !$job->getId() ) { + throw new MWException( "Job of type '{$job->getType()}' has no ID." ); + } + $dbw = $this->getMasterDB(); $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction // Delete a row with a single DELETE without holding row locks over RTTs... - $dbw->delete( 'job', array( 'job_cmd' => $this->type, 'job_id' => $job->getId() ) ); + $dbw->delete( 'job', + array( 'job_cmd' => $this->type, 'job_id' => $job->getId() ), __METHOD__ ); return true; } /** * @see JobQueue::doDeduplicateRootJob() + * @param Job $job + * @throws MWException * @return bool */ protected function doDeduplicateRootJob( Job $job ) {