[JobQueue] Use SELECT for deduplication rather than more expensive DELETEs.
authorAaron Schulz <aschulz@wikimedia.org>
Thu, 29 Nov 2012 00:09:46 +0000 (16:09 -0800)
committerAaron Schulz <aschulz@wikimedia.org>
Mon, 3 Dec 2012 17:51:39 +0000 (09:51 -0800)
Change-Id: Iff5dbc0ca6dc7032596d1b4ffddb3cc1848b9e10

includes/job/JobQueueDB.php

index 80bc6d5..67c5083 100644 (file)
@@ -64,15 +64,25 @@ class JobQueueDB extends JobQueue {
                if ( count( $jobs ) ) {
                        $dbw = $this->getMasterDB();
 
-                       $rows = array();
+                       $rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated
+                       $rowList = array(); // list of jobs for jobs that are are not de-duplicated
+
                        foreach ( $jobs as $job ) {
-                               $rows[] = $this->insertFields( $job );
+                               $row = $this->insertFields( $job );
+                               if ( $job->ignoreDuplicates() ) {
+                                       $rowSet[$row['job_sha1']] = $row;
+                               } else {
+                                       $rowList[] = $row;
+                               }
                        }
+
                        $atomic = ( $flags & self::QoS_Atomic );
                        $key    = $this->getEmptinessCacheKey();
                        $ttl    = self::CACHE_TTL;
 
-                       $dbw->onTransactionIdle( function() use ( $dbw, $rows, $atomic, $key, $ttl ) {
+                       $dbw->onTransactionIdle(
+                               function() use ( $dbw, $rowSet, $rowList, $atomic, $key, $ttl
+                       ) {
                                global $wgMemc;
 
                                $autoTrx = $dbw->getFlag( DBO_TRX ); // automatic begin() enabled?
@@ -82,7 +92,25 @@ class JobQueueDB extends JobQueue {
                                        $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
                                }
                                try {
-                                       foreach ( array_chunk( $rows, 50 ) as $rowBatch ) { // avoid slave lag
+                                       // Strip out any duplicate jobs that are already in the queue...
+                                       if ( count( $rowSet ) ) {
+                                               $res = $dbw->select( 'job', 'job_sha1',
+                                                       array(
+                                                               // No job_type condition since it's part of the job_sha1 hash
+                                                               'job_sha1'  => array_keys( $rowSet ),
+                                                               'job_token' => '' // unclaimed
+                                                       ),
+                                                       __METHOD__
+                                               );
+                                               foreach ( $res as $row ) {
+                                                       wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
+                                                       unset( $rowSet[$row->job_sha1] ); // already enqueued
+                                               }
+                                       }
+                                       // Build the full list of job rows to insert
+                                       $rows = array_merge( $rowList, array_values( $rowSet ) );
+                                       // Insert the job rows in chunks to avoid slave lag...
+                                       foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
                                                $dbw->insert( 'job', $rowBatch, __METHOD__ );
                                        }
                                } catch ( DBError $e ) {
@@ -156,15 +184,6 @@ class JobQueueDB extends JobQueue {
                                }
                                $job = Job::factory( $row->job_cmd, $title,
                                        self::extractBlob( $row->job_params ), $row->job_id );
-                               // Delete any *other* duplicate jobs in the queue...
-                               if ( $job->ignoreDuplicates() && strlen( $row->job_sha1 ) ) {
-                                       $dbw->delete( 'job',
-                                               array( 'job_sha1' => $row->job_sha1,
-                                                       "job_id != {$dbw->addQuotes( $row->job_id )}" ),
-                                               __METHOD__
-                                       );
-                                       wfIncrStats( 'job-pop', $dbw->affectedRows() );
-                               }
                                // Flag this job as an old duplicate based on its "root" job...
                                if ( $this->isRootJobOldDuplicate( $job ) ) {
                                        $job = DuplicateJob::newFromJob( $job ); // convert to a no-op