From 2fbce9d970a4f5f6ea3da482f61f3083c82df42f Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Wed, 28 Nov 2012 16:09:46 -0800 Subject: [PATCH] [JobQueue] Use SELECT for deduplication rather than more expensive DELETEs. Change-Id: Iff5dbc0ca6dc7032596d1b4ffddb3cc1848b9e10 --- includes/job/JobQueueDB.php | 45 ++++++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php index 80bc6d5898..67c5083608 100644 --- a/includes/job/JobQueueDB.php +++ b/includes/job/JobQueueDB.php @@ -64,15 +64,25 @@ class JobQueueDB extends JobQueue { if ( count( $jobs ) ) { $dbw = $this->getMasterDB(); - $rows = array(); + $rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated + $rowList = array(); // list of jobs for jobs that are are not de-duplicated + foreach ( $jobs as $job ) { - $rows[] = $this->insertFields( $job ); + $row = $this->insertFields( $job ); + if ( $job->ignoreDuplicates() ) { + $rowSet[$row['job_sha1']] = $row; + } else { + $rowList[] = $row; + } } + $atomic = ( $flags & self::QoS_Atomic ); $key = $this->getEmptinessCacheKey(); $ttl = self::CACHE_TTL; - $dbw->onTransactionIdle( function() use ( $dbw, $rows, $atomic, $key, $ttl ) { + $dbw->onTransactionIdle( + function() use ( $dbw, $rowSet, $rowList, $atomic, $key, $ttl + ) { global $wgMemc; $autoTrx = $dbw->getFlag( DBO_TRX ); // automatic begin() enabled? @@ -82,7 +92,25 @@ class JobQueueDB extends JobQueue { $dbw->clearFlag( DBO_TRX ); // make each query its own transaction } try { - foreach ( array_chunk( $rows, 50 ) as $rowBatch ) { // avoid slave lag + // Strip out any duplicate jobs that are already in the queue... + if ( count( $rowSet ) ) { + $res = $dbw->select( 'job', 'job_sha1', + array( + // No job_type condition since it's part of the job_sha1 hash + 'job_sha1' => array_keys( $rowSet ), + 'job_token' => '' // unclaimed + ), + __METHOD__ + ); + foreach ( $res as $row ) { + wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." ); + unset( $rowSet[$row->job_sha1] ); // already enqueued + } + } + // Build the full list of job rows to insert + $rows = array_merge( $rowList, array_values( $rowSet ) ); + // Insert the job rows in chunks to avoid slave lag... + foreach ( array_chunk( $rows, 50 ) as $rowBatch ) { $dbw->insert( 'job', $rowBatch, __METHOD__ ); } } catch ( DBError $e ) { @@ -156,15 +184,6 @@ class JobQueueDB extends JobQueue { } $job = Job::factory( $row->job_cmd, $title, self::extractBlob( $row->job_params ), $row->job_id ); - // Delete any *other* duplicate jobs in the queue... - if ( $job->ignoreDuplicates() && strlen( $row->job_sha1 ) ) { - $dbw->delete( 'job', - array( 'job_sha1' => $row->job_sha1, - "job_id != {$dbw->addQuotes( $row->job_id )}" ), - __METHOD__ - ); - wfIncrStats( 'job-pop', $dbw->affectedRows() ); - } // Flag this job as an old duplicate based on its "root" job... if ( $this->isRootJobOldDuplicate( $job ) ) { $job = DuplicateJob::newFromJob( $job ); // convert to a no-op -- 2.20.1