) {
unset( $baseConfig[$o] ); // partition queue doesn't care about this
}
- // The class handles all aggregator calls already
- unset( $baseConfig['aggregator'] );
// Get the partition queue objects
foreach ( $partitionMap as $partition => $w ) {
if ( !isset( $params['configByPartition'][$partition] ) ) {
* @param HashRing &$partitionRing
* @param int $flags
* @throws JobQueueError
- * @return array List of Job object that could not be inserted
+ * @return IJobSpecification[] List of Job object that could not be inserted
*/
protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
$jobsLeft = [];
$job = false;
}
if ( $job ) {
- $job->metadata['QueuePartition'] = $partition;
+ $job->setMetadata( 'QueuePartition', $partition );
return $job;
} else {
return false;
}
- protected function doAck( Job $job ) {
- if ( !isset( $job->metadata['QueuePartition'] ) ) {
+ protected function doAck( RunnableJob $job ) {
+ $partition = $job->getMetadata( 'QueuePartition' );
+ if ( $partition === null ) {
throw new MWException( "The given job has no defined partition name." );
}
- $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job );
+ $this->partitionQueues[$partition]->ack( $job );
}
- protected function doIsRootJobOldDuplicate( Job $job ) {
+ protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
$signature = $job->getRootJobParams()['rootJobSignature'];
$partition = $this->partitionRing->getLiveLocation( $signature );
try {