From b48dcf28d6d5a881582cb5d5569f8d4b006a2f60 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Fri, 29 Mar 2019 21:41:34 -0700 Subject: [PATCH] jobqueue: dependency inject more objects into JobQueue Also moved some WikiMap/$wgJobClasses checks to JobQueueGroup::pop which is the method callers are supposed to use. Change-Id: I2ab82d8adc4ae1f54697d2935afa2053539cf2db --- includes/jobqueue/JobQueue.php | 34 +++++++------------ includes/jobqueue/JobQueueDB.php | 15 ++++---- includes/jobqueue/JobQueueGroup.php | 15 ++++++++ includes/jobqueue/JobQueueMemory.php | 11 +++--- includes/jobqueue/JobQueueRedis.php | 10 +++--- .../includes/jobqueue/JobQueueTest.php | 2 ++ 6 files changed, 50 insertions(+), 37 deletions(-) diff --git a/includes/jobqueue/JobQueue.php b/includes/jobqueue/JobQueue.php index 8cfed3b149..064400268e 100644 --- a/includes/jobqueue/JobQueue.php +++ b/includes/jobqueue/JobQueue.php @@ -20,7 +20,7 @@ * @file * @defgroup JobQueue JobQueue */ -use MediaWiki\MediaWikiServices; +use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface; /** * Class to handle enqueueing and running of background jobs @@ -41,6 +41,8 @@ abstract class JobQueue { protected $maxTries; /** @var string|bool Read only rationale (or false if r/w) */ protected $readOnlyReason; + /** @var StatsdDataFactoryInterface */ + protected $stats; /** @var BagOStuff */ protected $dupCache; @@ -66,8 +68,9 @@ abstract class JobQueue { if ( !in_array( $this->order, $this->supportedOrders() ) ) { throw new JobQueueError( __CLASS__ . " does not support '{$this->order}' order." ); } - $this->dupCache = wfGetCache( CACHE_ANYTHING ); $this->readOnlyReason = $params['readOnlyReason'] ?? false; + $this->stats = $params['stats'] ?? new NullStatsdDataFactory(); + $this->dupCache = $params['stash'] ?? new EmptyBagOStuff(); } /** @@ -91,6 +94,8 @@ abstract class JobQueue { * of jobs simply means re-inserting them into the queue. Jobs can be * attempted up to three times before being discarded. * - readOnlyReason : Set this to a string to make the queue read-only. + * - stash : A BagOStuff instance that can be used for root job deduplication + * - stats : A StatsdDataFactoryInterface [optional] * * Queue classes should throw an exception if they do not support the options given. * @@ -112,7 +117,7 @@ abstract class JobQueue { } /** - * @return string Wiki ID + * @return string Database domain ID */ final public function getDomain() { return $this->domain; @@ -359,23 +364,14 @@ abstract class JobQueue { * @return Job|bool Returns false if there are no jobs */ final public function pop() { - global $wgJobClasses; - $this->assertNotReadOnly(); - if ( !WikiMap::isCurrentWikiDbDomain( $this->domain ) ) { - throw new JobQueueError( - "Cannot pop '{$this->type}' job off foreign '{$this->domain}' wiki queue." ); - } elseif ( !isset( $wgJobClasses[$this->type] ) ) { - // Do not pop jobs if there is no class for the queue type - throw new JobQueueError( "Unrecognized job type '{$this->type}'." ); - } $job = $this->doPop(); // Flag this job as an old duplicate based on its "root" job... try { if ( $job && $this->isRootJobOldDuplicate( $job ) ) { - self::incrStats( 'dupe_pops', $this->type ); + $this->incrStats( 'dupe_pops', $this->type ); $job = DuplicateJob::newFromJob( $job ); // convert to a no-op } } catch ( Exception $e ) { @@ -480,7 +476,7 @@ abstract class JobQueue { } // Update the timestamp of the last root job started at the location... - return $this->dupCache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); + return $this->dupCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); } /** @@ -707,12 +703,8 @@ abstract class JobQueue { * @param int $delta * @since 1.22 */ - public static function incrStats( $key, $type, $delta = 1 ) { - static $stats; - if ( !$stats ) { - $stats = MediaWikiServices::getInstance()->getStatsdDataFactory(); - } - $stats->updateCount( "jobqueue.{$key}.all", $delta ); - $stats->updateCount( "jobqueue.{$key}.{$type}", $delta ); + protected function incrStats( $key, $type, $delta = 1 ) { + $this->stats->updateCount( "jobqueue.{$key}.all", $delta ); + $this->stats->updateCount( "jobqueue.{$key}.{$type}", $delta ); } } diff --git a/includes/jobqueue/JobQueueDB.php b/includes/jobqueue/JobQueueDB.php index 7aecfe9dd1..c2772a6381 100644 --- a/includes/jobqueue/JobQueueDB.php +++ b/includes/jobqueue/JobQueueDB.php @@ -55,6 +55,7 @@ class JobQueueDB extends JobQueue { * If not specified, the primary DB cluster for the wiki will be used. * This can be overridden with a custom cluster so that DB handles will * be retrieved via LBFactory::getExternalLB() and getConnection(). + * - wanCache : An instance of WANObjectCache to use for caching. * @param array $params */ protected function __construct( array $params ) { @@ -66,7 +67,7 @@ class JobQueueDB extends JobQueue { $this->cluster = $params['cluster']; } - $this->cache = MediaWikiServices::getInstance()->getMainWANObjectCache(); + $this->cache = $params['wanCache'] ?? WANObjectCache::newEmpty(); } protected function supportedOrders() { @@ -275,8 +276,8 @@ class JobQueueDB extends JobQueue { foreach ( array_chunk( $rows, 50 ) as $rowBatch ) { $dbw->insert( 'job', $rowBatch, $method ); } - JobQueue::incrStats( 'inserts', $this->type, count( $rows ) ); - JobQueue::incrStats( 'dupe_inserts', $this->type, + $this->incrStats( 'inserts', $this->type, count( $rows ) ); + $this->incrStats( 'dupe_inserts', $this->type, count( $rowSet ) + count( $rowList ) - count( $rows ) ); } catch ( DBError $e ) { @@ -312,7 +313,7 @@ class JobQueueDB extends JobQueue { if ( !$row ) { break; // nothing to do } - JobQueue::incrStats( 'pops', $this->type ); + $this->incrStats( 'pops', $this->type ); // Get the job object from the row... $title = Title::makeTitle( $row->job_namespace, $row->job_title ); $job = Job::factory( $row->job_cmd, $title, @@ -500,7 +501,7 @@ class JobQueueDB extends JobQueue { __METHOD__ ); - JobQueue::incrStats( 'acks', $this->type ); + $this->incrStats( 'acks', $this->type ); } catch ( DBError $e ) { $this->throwDBException( $e ); } @@ -727,7 +728,7 @@ class JobQueueDB extends JobQueue { ); $affected = $dbw->affectedRows(); $count += $affected; - JobQueue::incrStats( 'recycles', $this->type, $affected ); + $this->incrStats( 'recycles', $this->type, $affected ); } } @@ -753,7 +754,7 @@ class JobQueueDB extends JobQueue { $dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ ); $affected = $dbw->affectedRows(); $count += $affected; - JobQueue::incrStats( 'abandons', $this->type, $affected ); + $this->incrStats( 'abandons', $this->type, $affected ); } $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ ); diff --git a/includes/jobqueue/JobQueueGroup.php b/includes/jobqueue/JobQueueGroup.php index b9c4157574..4bac304d13 100644 --- a/includes/jobqueue/JobQueueGroup.php +++ b/includes/jobqueue/JobQueueGroup.php @@ -118,6 +118,11 @@ class JobQueueGroup { $conf['readOnlyReason'] = $this->readOnlyReason; } + $services = MediaWikiServices::getInstance(); + $conf['stats'] = $services->getStatsdDataFactory(); + $conf['wanCache'] = $services->getMainWANObjectCache(); + $conf['stash'] = $services->getMainObjectStash(); + return JobQueue::factory( $conf ); } @@ -232,8 +237,18 @@ class JobQueueGroup { * @return Job|bool Returns false on failure */ public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) { + global $wgJobClasses; + $job = false; + if ( !WikiMap::isCurrentWikiDbDomain( $this->domain ) ) { + throw new JobQueueError( + "Cannot pop '{$qtype}' job off foreign '{$this->domain}' wiki queue." ); + } elseif ( is_string( $qtype ) && !isset( $wgJobClasses[$qtype] ) ) { + // Do not pop jobs if there is no class for the queue type + throw new JobQueueError( "Unrecognized job type '$qtype'." ); + } + if ( is_string( $qtype ) ) { // specific job type if ( !in_array( $qtype, $blacklist ) ) { $job = $this->get( $qtype )->pop(); diff --git a/includes/jobqueue/JobQueueMemory.php b/includes/jobqueue/JobQueueMemory.php index 6c45e9676a..b6c40054a8 100644 --- a/includes/jobqueue/JobQueueMemory.php +++ b/includes/jobqueue/JobQueueMemory.php @@ -32,6 +32,12 @@ class JobQueueMemory extends JobQueue { /** @var array[] */ protected static $data = []; + public function __construct( array $params ) { + parent::__construct( $params ); + + $this->dupCache = new HashBagOStuff(); + } + /** * @see JobQueue::doBatchPush * @@ -43,10 +49,7 @@ class JobQueueMemory extends JobQueue { foreach ( $jobs as $job ) { if ( $job->ignoreDuplicates() ) { - $sha1 = Wikimedia\base_convert( - sha1( serialize( $job->getDeduplicationInfo() ) ), - 16, 36, 31 - ); + $sha1 = sha1( serialize( $job->getDeduplicationInfo() ) ); if ( !isset( $unclaimed[$sha1] ) ) { $unclaimed[$sha1] = $job; } diff --git a/includes/jobqueue/JobQueueRedis.php b/includes/jobqueue/JobQueueRedis.php index 4d07a09a71..98a5491501 100644 --- a/includes/jobqueue/JobQueueRedis.php +++ b/includes/jobqueue/JobQueueRedis.php @@ -225,9 +225,9 @@ class JobQueueRedis extends JobQueue { $failed += count( $itemBatch ); } } - JobQueue::incrStats( 'inserts', $this->type, count( $items ) ); - JobQueue::incrStats( 'inserts_actual', $this->type, $pushed ); - JobQueue::incrStats( 'dupe_inserts', $this->type, + $this->incrStats( 'inserts', $this->type, count( $items ) ); + $this->incrStats( 'inserts_actual', $this->type, $pushed ); + $this->incrStats( 'dupe_inserts', $this->type, count( $items ) - $failed - $pushed ); if ( $failed > 0 ) { $err = "Could not insert {$failed} {$this->type} job(s)."; @@ -321,7 +321,7 @@ LUA; break; // no jobs; nothing to do } - JobQueue::incrStats( 'pops', $this->type ); + $this->incrStats( 'pops', $this->type ); $item = $this->unserialize( $blob ); if ( $item === false ) { wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." ); @@ -424,7 +424,7 @@ LUA; return false; } - JobQueue::incrStats( 'acks', $this->type ); + $this->incrStats( 'acks', $this->type ); } catch ( RedisException $e ) { $this->throwRedisException( $conn, $e ); } diff --git a/tests/phpunit/includes/jobqueue/JobQueueTest.php b/tests/phpunit/includes/jobqueue/JobQueueTest.php index d38c6c7093..81a80b66d5 100644 --- a/tests/phpunit/includes/jobqueue/JobQueueTest.php +++ b/tests/phpunit/includes/jobqueue/JobQueueTest.php @@ -32,6 +32,8 @@ class JobQueueTest extends MediaWikiTestCase { } $baseConfig['type'] = 'null'; $baseConfig['domain'] = WikiMap::getCurrentWikiDbDomain()->getId(); + $baseConfig['stash'] = new HashBagOStuff(); + $baseConfig['wanCache'] = new WANObjectCache( [ 'cache' => new HashBagOStuff() ] ); $variants = [ 'queueRand' => [ 'order' => 'random', 'claimTTL' => 0 ], 'queueRandTTL' => [ 'order' => 'random', 'claimTTL' => 10 ], -- 2.20.1