if ( count( $jobs ) ) {
$dbw = $this->getMasterDB();
- $rows = array();
+ $rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated
+ $rowList = array(); // list of jobs for jobs that are are not de-duplicated
+
foreach ( $jobs as $job ) {
- $rows[] = $this->insertFields( $job );
+ $row = $this->insertFields( $job );
+ if ( $job->ignoreDuplicates() ) {
+ $rowSet[$row['job_sha1']] = $row;
+ } else {
+ $rowList[] = $row;
+ }
}
+
$atomic = ( $flags & self::QoS_Atomic );
$key = $this->getEmptinessCacheKey();
$ttl = self::CACHE_TTL;
- $dbw->onTransactionIdle( function() use ( $dbw, $rows, $atomic, $key, $ttl ) {
+ $dbw->onTransactionIdle(
+ function() use ( $dbw, $rowSet, $rowList, $atomic, $key, $ttl
+ ) {
global $wgMemc;
$autoTrx = $dbw->getFlag( DBO_TRX ); // automatic begin() enabled?
$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
}
try {
- foreach ( array_chunk( $rows, 50 ) as $rowBatch ) { // avoid slave lag
+ // Strip out any duplicate jobs that are already in the queue...
+ if ( count( $rowSet ) ) {
+ $res = $dbw->select( 'job', 'job_sha1',
+ array(
+ // No job_type condition since it's part of the job_sha1 hash
+ 'job_sha1' => array_keys( $rowSet ),
+ 'job_token' => '' // unclaimed
+ ),
+ __METHOD__
+ );
+ foreach ( $res as $row ) {
+ wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
+ unset( $rowSet[$row->job_sha1] ); // already enqueued
+ }
+ }
+ // Build the full list of job rows to insert
+ $rows = array_merge( $rowList, array_values( $rowSet ) );
+ // Insert the job rows in chunks to avoid slave lag...
+ foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
$dbw->insert( 'job', $rowBatch, __METHOD__ );
}
} catch ( DBError $e ) {
}
$job = Job::factory( $row->job_cmd, $title,
self::extractBlob( $row->job_params ), $row->job_id );
- // Delete any *other* duplicate jobs in the queue...
- if ( $job->ignoreDuplicates() && strlen( $row->job_sha1 ) ) {
- $dbw->delete( 'job',
- array( 'job_sha1' => $row->job_sha1,
- "job_id != {$dbw->addQuotes( $row->job_id )}" ),
- __METHOD__
- );
- wfIncrStats( 'job-pop', $dbw->affectedRows() );
- }
// Flag this job as an old duplicate based on its "root" job...
if ( $this->isRootJobOldDuplicate( $job ) ) {
$job = DuplicateJob::newFromJob( $job ); // convert to a no-op