) {
unset( $baseConfig[$o] ); // partition queue doesn't care about this
}
- // The class handles all aggregator calls already
- unset( $baseConfig['aggregator'] );
// Get the partition queue objects
foreach ( $partitionMap as $partition => $w ) {
if ( !isset( $params['configByPartition'][$partition] ) ) {
// Try to insert the jobs and update $partitionsTry on any failures.
// Retry to insert any remaning jobs again, ignoring the bad partitions.
$jobsLeft = $jobs;
- // phpcs:ignore Generic.CodeAnalysis.ForLoopWithTestFunctionCall
for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
try {
$partitionRing->getLiveLocationWeights();
* @param HashRing &$partitionRing
* @param int $flags
* @throws JobQueueError
- * @return array List of Job object that could not be inserted
+ * @return IJobSpecification[] List of Job object that could not be inserted
*/
protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
$jobsLeft = [];
$job = false;
}
if ( $job ) {
- $job->metadata['QueuePartition'] = $partition;
+ $job->setMetadata( 'QueuePartition', $partition );
return $job;
} else {
return false;
}
- protected function doAck( Job $job ) {
- if ( !isset( $job->metadata['QueuePartition'] ) ) {
+ protected function doAck( RunnableJob $job ) {
+ $partition = $job->getMetadata( 'QueuePartition' );
+ if ( $partition === null ) {
throw new MWException( "The given job has no defined partition name." );
}
- $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job );
+ $this->partitionQueues[$partition]->ack( $job );
}
- protected function doIsRootJobOldDuplicate( Job $job ) {
+ protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
$signature = $job->getRootJobParams()['rootJobSignature'];
$partition = $this->partitionRing->getLiveLocation( $signature );
try {