X-Git-Url: https://git.cyclocoop.org/%242?a=blobdiff_plain;f=includes%2Fjobqueue%2FJobQueue.php;h=e52f29529c6b761f82b7817779e5f6965d343edf;hb=b120fa48410839773877ae4434e031203f0c1ecf;hp=f5ed7b91cb8ba253c8a4385598a25cd73d9dfad6;hpb=7ecbff2360a07755f771c648ed1e27ee3dca8ad5;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/jobqueue/JobQueue.php b/includes/jobqueue/JobQueue.php index f5ed7b91cb..e52f29529c 100644 --- a/includes/jobqueue/JobQueue.php +++ b/includes/jobqueue/JobQueue.php @@ -44,8 +44,8 @@ abstract class JobQueue { /** @var StatsdDataFactoryInterface */ protected $stats; - /** @var BagOStuff */ - protected $dupCache; + /** @var WANObjectCache */ + protected $wanCache; const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions @@ -53,6 +53,14 @@ abstract class JobQueue { /** * @param array $params + * - type : A job type + * - domain : A DB domain ID + * - wanCache : An instance of WANObjectCache to use for caching [default: none] + * - stats : An instance of StatsdDataFactoryInterface [default: none] + * - claimTTL : Seconds a job can be claimed for exclusive execution [default: forever] + * - maxTries : Total times a job can be tried, assuming claims expire [default: 3] + * - order : Queue order, one of ("fifo", "timestamp", "random") [default: variable] + * - readOnlyReason : Mark the queue as read-only with this reason [default: false] * @throws JobQueueError */ protected function __construct( array $params ) { @@ -70,7 +78,7 @@ abstract class JobQueue { } $this->readOnlyReason = $params['readOnlyReason'] ?? false; $this->stats = $params['stats'] ?? new NullStatsdDataFactory(); - $this->dupCache = $params['stash'] ?? new EmptyBagOStuff(); + $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty(); } /** @@ -459,24 +467,23 @@ abstract class JobQueue { * @return bool */ protected function doDeduplicateRootJob( IJobSpecification $job ) { - if ( !$job->hasRootJobParams() ) { + $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null; + if ( !$params ) { throw new JobQueueError( "Cannot register root job; missing parameters." ); } - $params = $job->getRootJobParams(); $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); - // Callers should call JobQueueGroup::push() before this method so that if the insert - // fails, the de-duplication registration will be aborted. Since the insert is - // deferred till "transaction idle", do the same here, so that the ordering is - // maintained. Having only the de-duplication registration succeed would cause - // jobs to become no-ops without any actual jobs that made them redundant. - $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job - if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { + // Callers should call JobQueueGroup::push() before this method so that if the + // insert fails, the de-duplication registration will be aborted. Having only the + // de-duplication registration succeed would cause jobs to become no-ops without + // any actual jobs that made them redundant. + $timestamp = $this->wanCache->get( $key ); // last known timestamp of such a root job + if ( $timestamp !== false && $timestamp >= $params['rootJobTimestamp'] ) { return true; // a newer version of this root job was enqueued } // Update the timestamp of the last root job started at the location... - return $this->dupCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); + return $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); } /** @@ -490,9 +497,8 @@ abstract class JobQueue { if ( $job->getType() !== $this->type ) { throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." ); } - $isDuplicate = $this->doIsRootJobOldDuplicate( $job ); - return $isDuplicate; + return $this->doIsRootJobOldDuplicate( $job ); } /** @@ -501,14 +507,18 @@ abstract class JobQueue { * @return bool */ protected function doIsRootJobOldDuplicate( IJobSpecification $job ) { - if ( !$job->hasRootJobParams() ) { + $params = $job->hasRootJobParams() ? $job->getRootJobParams() : null; + if ( !$params ) { return false; // job has no de-deplication info } - $params = $job->getRootJobParams(); $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); // Get the last time this root job was enqueued - $timestamp = $this->dupCache->get( $key ); + $timestamp = $this->wanCache->get( $key ); + if ( $timestamp === false || $params['rootJobTimestamp'] > $timestamp ) { + // Update the timestamp of the last known root job started at the location... + $this->wanCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); + } // Check if a new root job was started at the location after this one's... return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); @@ -519,7 +529,7 @@ abstract class JobQueue { * @return string */ protected function getRootJobCacheKey( $signature ) { - return $this->dupCache->makeGlobalKey( + return $this->wanCache->makeGlobalKey( 'jobqueue', $this->domain, $this->type,