From cb344a05ad0f3bf19e74bf773dd472fd9661c205 Mon Sep 17 00:00:00 2001
From: Aaron Schulz
Date: Tue, 16 Apr 2013 10:43:37 -0700
Subject: [PATCH] [JobQueue] Added per-type stat counter calls for better
 graphs.

* Also refactored doBatchPush() for the DB job queue to reduce the
  number of PHP closure work-arounds. I'd rather not add a $type
  variable to the bloated 'use' clause.

Change-Id: I265effc061558d96048dcc331ac2338a8b95aff5
---
 includes/job/JobQueue.php      |  15 +++-
 includes/job/JobQueueDB.php    | 136 ++++++++++++++++++---------
 includes/job/JobQueueRedis.php |   9 ++-
 3 files changed, 94 insertions(+), 66 deletions(-)

diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php
index 17a1338f67..5e596dc5a5 100644
--- a/includes/job/JobQueue.php
+++ b/includes/job/JobQueue.php
@@ -338,7 +338,7 @@ abstract class JobQueue {
 		// Flag this job as an old duplicate based on its "root" job...
 		try {
 			if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
-				wfIncrStats( 'job-pop-duplicate' );
+				JobQueue::incrStats( 'job-pop-duplicate', $this->type );
 				$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
 			}
 		} catch ( MWException $e ) {} // don't lose jobs over this
@@ -580,6 +580,19 @@ abstract class JobQueue {
 		return new ArrayIterator( array() ); // not implemented
 	}
 
+	/**
+	 * Call wfIncrStats() for the queue overall and for the queue type
+	 *
+	 * @param string $key Event type
+	 * @param string $type Job type
+	 * @param integer $delta
+	 * @since 1.22
+	 */
+	public static function incrStats( $key, $type, $delta = 1 ) {
+		wfIncrStats( $key, $delta );
+		wfIncrStats( "{$key}-{$type}", $delta );
+	}
+
 	/**
 	 * Namespace the queue with a key to isolate it for testing
 	 *
diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php
index b0778acefd..bab4830e14 100644
--- a/includes/job/JobQueueDB.php
+++ b/includes/job/JobQueueDB.php
@@ -177,71 +177,85 @@ class JobQueueDB extends JobQueue {
 	 * @return bool
 	 */
 	protected function doBatchPush( array $jobs, $flags ) {
-		if ( count( $jobs ) ) {
-			list( $dbw, $scope ) = $this->getMasterDB();
-
-			$rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated
-			$rowList = array(); // list of jobs for jobs that are are not de-duplicated
-
-			foreach ( $jobs as $job ) {
-				$row = $this->insertFields( $job );
-				if ( $job->ignoreDuplicates() ) {
-					$rowSet[$row['job_sha1']] = $row;
-				} else {
-					$rowList[] = $row;
-				}
+		list( $dbw, $scope ) = $this->getMasterDB();
+
+		$that = $this;
+		$method = __METHOD__;
+		$dbw->onTransactionIdle(
+			function() use ( $dbw, $that, $jobs, $flags, $method, $scope ) {
+				$that->doBatchPushInternal( $dbw, $jobs, $flags, $method );
 			}
+		);
 
-			$key = $this->getCacheKey( 'empty' );
-			$atomic = ( $flags & self::QOS_ATOMIC );
-			$cache = $this->cache;
-			$method = __METHOD__;
+		return true;
+	}
 
-			$dbw->onTransactionIdle(
-				function() use ( $dbw, $cache, $rowSet, $rowList, $atomic, $key, $method, $scope
-			) {
-					if ( $atomic ) {
-						$dbw->begin( $method ); // wrap all the job additions in one transaction
-					}
-					try {
-						// Strip out any duplicate jobs that are already in the queue...
-						if ( count( $rowSet ) ) {
-							$res = $dbw->select( 'job', 'job_sha1',
-								array(
-									// No job_type condition since it's part of the job_sha1 hash
-									'job_sha1' => array_keys( $rowSet ),
-									'job_token' => '' // unclaimed
-								),
-								$method
-							);
-							foreach ( $res as $row ) {
-								wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
-								unset( $rowSet[$row->job_sha1] ); // already enqueued
-							}
-						}
-						// Build the full list of job rows to insert
-						$rows = array_merge( $rowList, array_values( $rowSet ) );
-						// Insert the job rows in chunks to avoid slave lag...
-						foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
-							$dbw->insert( 'job', $rowBatch, $method );
-						}
-						wfIncrStats( 'job-insert', count( $rows ) );
-						wfIncrStats( 'job-insert-duplicate',
-							count( $rowSet ) + count( $rowList ) - count( $rows ) );
-					} catch ( DBError $e ) {
-						if ( $atomic ) {
-							$dbw->rollback( $method );
-						}
-						throw $e;
-					}
-					if ( $atomic ) {
-						$dbw->commit( $method );
-					}
-
+	/**
+	 * This function should *not* be called outside of JobQueueDB
+	 *
+	 * @param DatabaseBase $dbw
+	 * @param array $jobs
+	 * @param int $flags
+	 * @param string $method
+	 * @return bool
+	 * @throws DBError
+	 */
+	public function doBatchPushInternal( DatabaseBase $dbw, array $jobs, $flags, $method ) {
+		if ( !count( $jobs ) ) {
+			return true;
+		}
 
-			$cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
-			} );
-		}
+		$rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated
+		$rowList = array(); // list of jobs that are not de-duplicated
+		foreach ( $jobs as $job ) {
+			$row = $this->insertFields( $job );
+			if ( $job->ignoreDuplicates() ) {
+				$rowSet[$row['job_sha1']] = $row;
+			} else {
+				$rowList[] = $row;
+			}
+		}
+
+		if ( $flags & self::QOS_ATOMIC ) {
+			$dbw->begin( $method ); // wrap all the job additions in one transaction
+		}
+		try {
+			// Strip out any duplicate jobs that are already in the queue...
+			if ( count( $rowSet ) ) {
+				$res = $dbw->select( 'job', 'job_sha1',
+					array(
+						// No job_type condition since it's part of the job_sha1 hash
+						'job_sha1' => array_keys( $rowSet ),
+						'job_token' => '' // unclaimed
+					),
+					$method
+				);
+				foreach ( $res as $row ) {
+					wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
+					unset( $rowSet[$row->job_sha1] ); // already enqueued
+				}
+			}
+			// Build the full list of job rows to insert
+			$rows = array_merge( $rowList, array_values( $rowSet ) );
+			// Insert the job rows in chunks to avoid slave lag...
+			foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
+				$dbw->insert( 'job', $rowBatch, $method );
+			}
+			JobQueue::incrStats( 'job-insert', $this->type, count( $rows ) );
+			JobQueue::incrStats( 'job-insert-duplicate', $this->type,
+				count( $rowSet ) + count( $rowList ) - count( $rows ) );
+		} catch ( DBError $e ) {
+			if ( $flags & self::QOS_ATOMIC ) {
+				$dbw->rollback( $method );
+			}
+			throw $e;
+		}
+		if ( $flags & self::QOS_ATOMIC ) {
+			$dbw->commit( $method );
+		}
+
+		$this->cache->set( $this->getCacheKey( 'empty' ), 'false', JobQueueDB::CACHE_TTL_LONG );
+
 		return true;
 	}
 
@@ -273,7 +287,7 @@ class JobQueueDB extends JobQueue {
 				$this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
 				break; // nothing to do
 			}
-			wfIncrStats( 'job-pop' );
+			JobQueue::incrStats( 'job-pop', $this->type );
 			// Get the job object from the row...
 			$title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
 			if ( !$title ) {
@@ -576,7 +590,7 @@ class JobQueueDB extends JobQueue {
 				__METHOD__
 			);
 			$count += $dbw->affectedRows();
-			wfIncrStats( 'job-recycle', $dbw->affectedRows() );
+			JobQueue::incrStats( 'job-recycle', $this->type, $dbw->affectedRows() );
 			$this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
 		}
 	}
diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php
index 6947f73134..3483f78c66 100644
--- a/includes/job/JobQueueRedis.php
+++ b/includes/job/JobQueueRedis.php
@@ -216,8 +216,9 @@ class JobQueueRedis extends JobQueue {
 				wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." );
 				return false;
 			}
-			wfIncrStats( 'job-insert', count( $items ) );
-			wfIncrStats( 'job-insert-duplicate', count( $items ) - $failed - $pushed );
+			JobQueue::incrStats( 'job-insert', $this->type, count( $items ) );
+			JobQueue::incrStats( 'job-insert-duplicate', $this->type,
+				count( $items ) - $failed - $pushed );
 		} catch ( RedisException $e ) {
 			$this->throwRedisException( $this->server, $conn, $e );
 		}
@@ -308,7 +309,7 @@ LUA;
 				break; // no jobs; nothing to do
 			}
 
-			wfIncrStats( 'job-pop' );
+			JobQueue::incrStats( 'job-pop', $this->type );
 			$item = unserialize( $blob );
 			if ( $item === false ) {
 				wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
@@ -658,7 +659,7 @@ LUA;
 			if ( $res ) {
 				list( $released, $abandoned, $pruned ) = $res;
 				$count += $released + $pruned;
-				wfIncrStats( 'job-recycle', count( $released ) );
+				JobQueue::incrStats( 'job-recycle', $this->type, count( $released ) );
 			}
 		} catch ( RedisException $e ) {
 			$this->throwRedisException( $this->server, $conn, $e );
-- 
2.20.1
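
A minimal usage sketch of the new helper (illustrative only: 'refreshLinks'
is just an example job type here, and wfIncrStats() is the stats wrapper the
patch already calls):

	// JobQueue::incrStats( $key, $type, $delta ) bumps two counters per
	// call: the aggregate key and a per-type key. For a 'refreshLinks'
	// job, the first call below increments both 'job-pop' and
	// 'job-pop-refreshLinks' by 1, which is what enables per-type graphs.
	JobQueue::incrStats( 'job-pop', 'refreshLinks' );
	JobQueue::incrStats( 'job-insert', 'refreshLinks', 5 ); // both keys +5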