X-Git-Url: https://git.cyclocoop.org/%242?a=blobdiff_plain;f=includes%2Fjobqueue%2FJobQueue.php;h=e52f29529c6b761f82b7817779e5f6965d343edf;hb=b120fa48410839773877ae4434e031203f0c1ecf;hp=064400268e524724794438a7882393fb636b8228;hpb=a44ae41d5806992c7b5524b8098022856f03f016;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/jobqueue/JobQueue.php b/includes/jobqueue/JobQueue.php index 064400268e..e52f29529c 100644 --- a/includes/jobqueue/JobQueue.php +++ b/includes/jobqueue/JobQueue.php @@ -44,8 +44,8 @@ abstract class JobQueue { /** @var StatsdDataFactoryInterface */ protected $stats; - /** @var BagOStuff */ - protected $dupCache; + /** @var WANObjectCache */ + protected $wanCache; const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions @@ -53,6 +53,14 @@ abstract class JobQueue { /** * @param array $params + * - type : A job type + * - domain : A DB domain ID + * - wanCache : An instance of WANObjectCache to use for caching [default: none] + * - stats : An instance of StatsdDataFactoryInterface [default: none] + * - claimTTL : Seconds a job can be claimed for exclusive execution [default: forever] + * - maxTries : Total times a job can be tried, assuming claims expire [default: 3] + * - order : Queue order, one of ("fifo", "timestamp", "random") [default: variable] + * - readOnlyReason : Mark the queue as read-only with this reason [default: false] * @throws JobQueueError */ protected function __construct( array $params ) { @@ -70,7 +78,7 @@ abstract class JobQueue { } $this->readOnlyReason = $params['readOnlyReason'] ?? false; $this->stats = $params['stats'] ?? new NullStatsdDataFactory(); - $this->dupCache = $params['stash'] ?? new EmptyBagOStuff(); + $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty(); } /** @@ -361,7 +369,7 @@ abstract class JobQueue { * Outside callers should use JobQueueGroup::pop() instead of this function. * * @throws JobQueueError - * @return Job|bool Returns false if there are no jobs + * @return RunnableJob|bool Returns false if there are no jobs */ final public function pop() { $this->assertNotReadOnly(); @@ -383,7 +391,7 @@ abstract class JobQueue { /** * @see JobQueue::pop() - * @return Job|bool + * @return RunnableJob|bool */ abstract protected function doPop(); @@ -393,11 +401,11 @@ abstract class JobQueue { * This does nothing for certain queue classes or if "claimTTL" is not set. * Outside callers should use JobQueueGroup::ack() instead of this function. * - * @param Job $job + * @param RunnableJob $job * @return void * @throws JobQueueError */ - final public function ack( Job $job ) { + final public function ack( RunnableJob $job ) { $this->assertNotReadOnly(); if ( $job->getType() !== $this->type ) { throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." ); @@ -408,9 +416,9 @@ abstract class JobQueue { /** * @see JobQueue::ack() - * @param Job $job + * @param RunnableJob $job */ - abstract protected function doAck( Job $job ); + abstract protected function doAck( RunnableJob $job ); /** * Register the "root job" of a given job into the queue for de-duplication. @@ -459,56 +467,58 @@ abstract class JobQueue { * @return bool */ protected function doDeduplicateRootJob( IJobSpecification $job ) { - if ( !$job->hasRootJobParams() ) { + $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null; + if ( !$params ) { throw new JobQueueError( "Cannot register root job; missing parameters." ); } - $params = $job->getRootJobParams(); $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); - // Callers should call JobQueueGroup::push() before this method so that if the insert - // fails, the de-duplication registration will be aborted. Since the insert is - // deferred till "transaction idle", do the same here, so that the ordering is - // maintained. Having only the de-duplication registration succeed would cause - // jobs to become no-ops without any actual jobs that made them redundant. - $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job - if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { + // Callers should call JobQueueGroup::push() before this method so that if the + // insert fails, the de-duplication registration will be aborted. Having only the + // de-duplication registration succeed would cause jobs to become no-ops without + // any actual jobs that made them redundant. + $timestamp = $this->wanCache->get( $key ); // last known timestamp of such a root job + if ( $timestamp !== false && $timestamp >= $params['rootJobTimestamp'] ) { return true; // a newer version of this root job was enqueued } // Update the timestamp of the last root job started at the location... - return $this->dupCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); + return $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); } /** * Check if the "root" job of a given job has been superseded by a newer one * - * @param Job $job + * @param IJobSpecification $job * @throws JobQueueError * @return bool */ - final protected function isRootJobOldDuplicate( Job $job ) { + final protected function isRootJobOldDuplicate( IJobSpecification $job ) { if ( $job->getType() !== $this->type ) { throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." ); } - $isDuplicate = $this->doIsRootJobOldDuplicate( $job ); - return $isDuplicate; + return $this->doIsRootJobOldDuplicate( $job ); } /** * @see JobQueue::isRootJobOldDuplicate() - * @param Job $job + * @param IJobSpecification $job * @return bool */ - protected function doIsRootJobOldDuplicate( Job $job ) { - if ( !$job->hasRootJobParams() ) { + protected function doIsRootJobOldDuplicate( IJobSpecification $job ) { + $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null; + if ( !$params ) { return false; // job has no de-deplication info } - $params = $job->getRootJobParams(); $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); // Get the last time this root job was enqueued - $timestamp = $this->dupCache->get( $key ); + $timestamp = $this->wanCache->get( $key ); + if ( $timestamp === false || $params['rootJobTimestamp'] > $timestamp ) { + // Update the timestamp of the last known root job started at the location... + $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); + } // Check if a new root job was started at the location after this one's... return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); @@ -519,7 +529,7 @@ abstract class JobQueue { * @return string */ protected function getRootJobCacheKey( $signature ) { - return $this->dupCache->makeGlobalKey( + return $this->wanCache->makeGlobalKey( 'jobqueue', $this->domain, $this->type, @@ -686,6 +696,16 @@ abstract class JobQueue { return null; // not supported } + /** + * @param string $command + * @param array $params + * @return Job + */ + protected function factoryJob( $command, $params ) { + // @TODO: dependency inject this as a callback + return Job::factory( $command, $params ); + } + /** * @throws JobQueueReadOnlyError */