'job', '1', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__
);
} catch ( DBError $e ) {
- $this->throwDBException( $e );
+ throw $this->getDBException( $e );
}
return !$found;
__METHOD__
);
} catch ( DBError $e ) {
- $this->throwDBException( $e );
+ throw $this->getDBException( $e );
}
$this->cache->set( $key, $size, self::CACHE_TTL_SHORT );
__METHOD__
);
} catch ( DBError $e ) {
- $this->throwDBException( $e );
+ throw $this->getDBException( $e );
}
$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
__METHOD__
);
} catch ( DBError $e ) {
- $this->throwDBException( $e );
+ throw $this->getDBException( $e );
}
$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
count( $rowSet ) + count( $rowList ) - count( $rows )
);
} catch ( DBError $e ) {
- $this->throwDBException( $e );
+ throw $this->getDBException( $e );
}
if ( $flags & self::QOS_ATOMIC ) {
$dbw->endAtomic( $method );
$this->incrStats( 'pops', $this->type );
// Get the job object from the row...
- $params = self::extractBlob( $row->job_params );
- $params = is_array( $params ) ? $params : []; // sanity
- $params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
- $job = $this->factoryJob( $row->job_cmd, $params );
- $job->setMetadata( 'id', $row->job_id );
- $job->setMetadata( 'timestamp', $row->job_timestamp );
+ $job = $this->jobFromRow( $row );
break; // done
} while ( true );
$this->recycleAndDeleteStaleJobs();
}
} catch ( DBError $e ) {
- $this->throwDBException( $e );
+ throw $this->getDBException( $e );
}
return $job;
// Check cache to see if the queue has <= OFFSET items
$tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
- $row = false; // the row acquired
$invertedDirection = false; // whether one job_random direction was already scanned
// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
$this->incrStats( 'acks', $this->type );
} catch ( DBError $e ) {
- $this->throwDBException( $e );
+ throw $this->getDBException( $e );
}
}
try {
$dbw->delete( 'job', [ 'job_cmd' => $this->type ] );
} catch ( DBError $e ) {
- $this->throwDBException( $e );
+ throw $this->getDBException( $e );
}
return true;
return new MappedIterator(
$dbr->select( 'job', self::selectFields(), $conds ),
function ( $row ) {
- $params = strlen( $row->job_params ) ? unserialize( $row->job_params ) : [];
- $params = is_array( $params ) ? $params : []; // sanity
- $params += [
- 'namespace' => $row->job_namespace,
- 'title' => $row->job_title
- ];
-
- $job = $this->factoryJob( $row->job_cmd, $params );
- $job->setMetadata( 'id', $row->job_id );
- $job->setMetadata( 'timestamp', $row->job_timestamp );
-
- return $job;
+ return $this->jobFromRow( $row );
}
);
} catch ( DBError $e ) {
- $this->throwDBException( $e );
+ throw $this->getDBException( $e );
}
}
$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
} catch ( DBError $e ) {
- $this->throwDBException( $e );
+ throw $this->getDBException( $e );
}
return $count;
}
/**
- * @param string $blob
- * @return bool|mixed
+ * @param stdClass $row
+ * @return RunnableJob|null
*/
- protected static function extractBlob( $blob ) {
- if ( (string)$blob !== '' ) {
- return unserialize( $blob );
- } else {
- return false;
+ protected function jobFromRow( $row ) {
+ $params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : [];
+ if ( !is_array( $params ) ) { // this shouldn't happen
+ throw new UnexpectedValueException(
+ "Could not unserialize job with ID '{$row->job_id}'." );
}
+
+ $params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
+ $job = $this->factoryJob( $row->job_cmd, $params );
+ $job->setMetadata( 'id', $row->job_id );
+ $job->setMetadata( 'timestamp', $row->job_timestamp );
+
+ return $job;
}
/**
* @param DBError $e
- * @throws JobQueueError
+ * @return JobQueueError
*/
- protected function throwDBException( DBError $e ) {
- throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
+ protected function getDBException( DBError $e ) {
+ return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
}
/**