3 * Job queue task base code.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 * http://www.gnu.org/copyleft/gpl.html
21 * @defgroup JobQueue JobQueue
25 * Class to both describe a background job and handle jobs.
26 * To push jobs onto queues, use JobQueueGroup::singleton()->push();
30 abstract class Job
implements RunnableJob
{
34 /** @var array Array of job parameters */
37 /** @var array Additional queue metadata */
38 public $metadata = [];
43 /** @var bool Expensive jobs may set this to true */
44 protected $removeDuplicates = false;
46 /** @var string Text for error that occurred last */
49 /** @var callable[] */
50 protected $teardownCallbacks = [];
52 /** @var int Bitfield of JOB_* class constants */
53 protected $executionFlags = 0;
55 /** @var int Job must not be wrapped in the usual explicit LBFactory transaction round */
56 const JOB_NO_EXPLICIT_TRX_ROUND
= 1;
/**
 * Create the appropriate object to handle a specific job
 *
 * The handler registered for $command in $wgJobClasses may be a callable, which is
 * invoked with ( $title, $params ), or a class name. Classes that are a subtype of
 * GenericParameterJob are constructed with ( $params ) only; any other class is
 * constructed with ( $title, $params ).
 *
 * @param string $command Job command
 * @param array|Title $params Job parameters
 * @throws InvalidArgumentException If $command has no registered handler, or if the
 *   handler did not yield a Job instance
 * @return Job
 */
public static function factory( $command, $params = [] ) {
	global $wgJobClasses;

	if ( $params instanceof Title ) {
		// Backwards compatibility for old signature ($command, $title, $params)
		$title = $params;
		$params = func_num_args() >= 3 ? func_get_arg( 2 ) : [];
	} elseif ( isset( $params['namespace'] ) && isset( $params['title'] ) ) {
		// Handle job classes that take title as constructor parameter.
		// If a newer class like GenericParameterJob uses these parameters,
		// then this happens in Job::__construct instead.
		$title = Title::makeTitle( $params['namespace'], $params['title'] );
	} else {
		// Default title for job classes not implementing GenericParameterJob.
		// This must be a valid title because it is not directly passed to
		// our Job constructor, but rather to its subclasses, which may expect
		// to be able to use it.
		$title = Title::makeTitle( NS_SPECIAL, 'Blankpage' );
	}

	if ( isset( $wgJobClasses[$command] ) ) {
		$handler = $wgJobClasses[$command];

		if ( is_callable( $handler ) ) {
			$job = call_user_func( $handler, $title, $params );
		} elseif ( class_exists( $handler ) ) {
			if ( is_subclass_of( $handler, GenericParameterJob::class ) ) {
				$job = new $handler( $params );
			} else {
				$job = new $handler( $title, $params );
			}
		} else {
			// Neither callable nor a known class: fall through to the "bad spec" error
			$job = null;
		}

		if ( $job instanceof Job ) {
			// Make sure the job reports the command it was requested under
			$job->command = $command;

			return $job;
		}

		throw new InvalidArgumentException( "Could not instantiate job '$command': bad spec!" );
	}

	throw new InvalidArgumentException( "Invalid job command '{$command}'" );
}
/**
 * @param string $command
 * @param array|Title|null $params Job parameters. Passing a Title here selects the
 *   old ( $command, $title, $params ) signature, where the parameters come third.
 * @throws InvalidArgumentException If the resolved parameters are not an array
 */
public function __construct( $command, $params = null ) {
	// Newer jobs may choose to not have a top-level title (e.g. GenericParameterJob)
	$title = null;
	if ( $params instanceof Title ) {
		// Backwards compatibility for old signature ($command, $title, $params)
		$title = $params;
		$params = ( func_num_args() >= 3 ) ? func_get_arg( 2 ) : [];
	}

	if ( !is_array( $params ) ) {
		throw new InvalidArgumentException( '$params must be an array' );
	}

	$hasTitleParam = isset( $params['namespace'] ) || isset( $params['title'] );
	if ( $title && !$hasTitleParam ) {
		// When constructing this class for submitting to the queue,
		// normalise the $title arg of old job classes as part of $params.
		$params['namespace'] = $title->getNamespace();
		$params['title'] = $title->getDBKey();
	}

	$this->command = $command;
	// Array union: a caller-supplied 'requestId' takes precedence over the current one
	$this->params = $params + [ 'requestId' => WebRequest::getRequestId() ];

	if ( $this->title === null ) {
		// Set this field for access via getTitle().
		if ( isset( $params['namespace'] ) && isset( $params['title'] ) ) {
			$this->title = Title::makeTitle( $params['namespace'], $params['title'] );
		} else {
			// GenericParameterJob classes without namespace/title params
			// should not use getTitle(). Set an invalid title as placeholder.
			$this->title = Title::makeTitle( NS_SPECIAL, '' );
		}
	}
}
/**
 * Check whether every bit of $flag is set in this job's execution flags.
 *
 * @param int $flag JOB_* class constant
 * @return bool
 */
public function hasExecutionFlag( $flag ) {
	$matchedBits = $this->executionFlags & $flag;

	return $matchedBits === $flag;
}
/**
 * @return string The command name stored on this job
 */
public function getType() {
	$type = $this->command;

	return $type;
}
174 final public function getTitle() {
/**
 * @return array The job parameters stored on this job
 */
public function getParams() {
	$params = $this->params;

	return $params;
}
/**
 * Read one metadata field, or the whole metadata map when no field is named.
 *
 * @param string|null $field Metadata field or null to get all the metadata
 * @return mixed|null Value; null if missing
 */
public function getMetadata( $field = null ) {
	return ( $field === null )
		? $this->metadata
		: ( $this->metadata[$field] ?? null );
}
/**
 * Set a metadata field; a null value removes the field instead.
 *
 * @param string $field Key name to set the value for
 * @param mixed $value The value to set the field for
 * @return mixed|null The prior field value; null if missing
 */
public function setMetadata( $field, $value ) {
	$previous = $this->getMetadata( $field );

	if ( $value !== null ) {
		$this->metadata[$field] = $value;
	} else {
		unset( $this->metadata[$field] );
	}

	return $previous;
}
/**
 * Read the 'jobReleaseTimestamp' parameter, converted via wfTimestampOrNull( TS_UNIX, ... ).
 *
 * @return int|null UNIX timestamp to delay running this job until, otherwise null
 */
public function getReleaseTimestamp() {
	if ( !isset( $this->params['jobReleaseTimestamp'] ) ) {
		return null;
	}

	return wfTimestampOrNull( TS_UNIX, $this->params['jobReleaseTimestamp'] );
}
/**
 * Read the 'timestamp' metadata field, converted via wfTimestampOrNull( TS_UNIX, ... ).
 *
 * @return int|null UNIX timestamp of when the job was queued, or null
 */
public function getQueuedTimestamp() {
	if ( !isset( $this->metadata['timestamp'] ) ) {
		return null;
	}

	return wfTimestampOrNull( TS_UNIX, $this->metadata['timestamp'] );
}
/**
 * @return string|null Id of the request that created this job. Follows
 *  jobs recursively, allowing to track the id of the request that started a
 *  job when jobs insert jobs which insert other jobs.
 */
public function getRequestId() {
	return isset( $this->params['requestId'] )
		? $this->params['requestId']
		: null;
}
/**
 * Prefer the release timestamp; fall back to the queued timestamp when the
 * release timestamp is empty.
 *
 * @return int|null UNIX timestamp of when the job was runnable, or null
 */
public function getReadyTimestamp() {
	$releaseTime = $this->getReleaseTimestamp();
	if ( $releaseTime ) {
		return $releaseTime;
	}

	return $this->getQueuedTimestamp();
}
/**
 * Whether the queue should reject insertion of this job if a duplicate exists
 *
 * This can be used to avoid duplicated effort or combined with delayed jobs to
 * coalesce updates into larger batches. Claimed jobs are never treated as
 * duplicates of new jobs, and some queues may allow a few duplicates due to
 * network partitions and fail-over. Thus, additional locking is needed to
 * enforce mutual exclusion if this is really needed.
 *
 * @return bool The value of the removeDuplicates property
 */
public function ignoreDuplicates() {
	$rejectDuplicates = $this->removeDuplicates;

	return $rejectDuplicates;
}
269 * @return bool Whether this job can be retried on failure by job runners
272 public function allowRetries() {
277 * @return int Number of actual "work items" handled in this job
278 * @see $wgJobBackoffThrottling
281 public function workItemCount() {
/**
 * Subclasses may need to override this to make duplication detection work.
 * The resulting map conveys everything that makes the job unique. This is
 * only checked if ignoreDuplicates() returns true, meaning that duplicate
 * jobs are supposed to be ignored.
 *
 * @return array Map of key/values ('type' and 'params')
 */
public function getDeduplicationInfo() {
	$type = $this->getType();
	$params = $this->getParams();

	if ( is_array( $params ) ) {
		// Parameters that must not make otherwise identical jobs look different:
		// - rootJobSignature/rootJobTimestamp: different "root" jobs
		// - jobReleaseTimestamp: different delay times
		// - requestId: different originating requests
		$ignoredKeys = [
			'rootJobSignature',
			'rootJobTimestamp',
			'jobReleaseTimestamp',
			'requestId',
		];
		foreach ( $ignoredKeys as $ignoredKey ) {
			unset( $params[$ignoredKey] );
		}
		// Queues pack and hash this array, so normalize the order
		ksort( $params );
	}

	return [
		'type' => $type,
		'params' => $params
	];
}
/**
 * Get "root job" parameters for a task
 *
 * This is used to no-op redundant jobs, including child jobs of jobs,
 * as long as the children inherit the root job parameters. When a job
 * with root job parameters and "rootJobIsSelf" set is pushed, the
 * deduplicateRootJob() method is automatically called on it. If the
 * root job is only virtual and not actually pushed (e.g. the sub-jobs
 * are inserted directly), then call deduplicateRootJob() directly.
 *
 * @see JobQueue::deduplicateRootJob()
 *
 * @param string $key A key that identifies the task
 * @return array Map of:
 *   - rootJobIsSelf    : true
 *   - rootJobSignature : hash (e.g. SHA1) that identifies the task
 *   - rootJobTimestamp : TS_MW timestamp of this instance of the task
 */
public static function newRootJobParams( $key ) {
	$signature = sha1( $key );
	$timestamp = wfTimestampNow();

	return [
		'rootJobIsSelf' => true,
		'rootJobSignature' => $signature,
		'rootJobTimestamp' => $timestamp
	];
}
/**
 * Get the root job signature and timestamp parameters of this job.
 *
 * @see JobQueue::deduplicateRootJob()
 * @return array Map with 'rootJobSignature' and 'rootJobTimestamp'; each is null
 *   when the corresponding parameter is not set
 */
public function getRootJobParams() {
	$rootParams = [];
	foreach ( [ 'rootJobSignature', 'rootJobTimestamp' ] as $name ) {
		$rootParams[$name] = $this->params[$name] ?? null;
	}

	return $rootParams;
}
/**
 * @see JobQueue::deduplicateRootJob()
 * @return bool Whether both 'rootJobSignature' and 'rootJobTimestamp' are set
 */
public function hasRootJobParams() {
	// isset() with several arguments is true only if every one of them is set
	return isset(
		$this->params['rootJobSignature'],
		$this->params['rootJobTimestamp']
	);
}
/**
 * @see JobQueue::deduplicateRootJob()
 * @return bool Whether this job is a root job
 */
public function isRootJob() {
	if ( !$this->hasRootJobParams() ) {
		return false;
	}

	return !empty( $this->params['rootJobIsSelf'] );
}
/**
 * Register a callback to be invoked by teardown().
 *
 * @param callable $callback A function with one parameter, the success status, which will be
 *   false if the job failed or it succeeded but the DB changes could not be committed or
 *   any deferred updates threw an exception. (This parameter was added in 1.28.)
 */
protected function addTeardownCallback( $callback ) {
	// Appended in registration order; teardown() iterates in that same order
	$this->teardownCallbacks[] = $callback;
}
/**
 * Do any final cleanup after run(), deferred updates, and all DB commits happen
 *
 * Invokes every registered teardown callback, passing it $status.
 *
 * @param bool $status Whether the job, its deferred updates, and DB commit all succeeded
 */
public function teardown( $status ) {
	foreach ( $this->teardownCallbacks as $teardownCallback ) {
		call_user_func( $teardownCallback, $status );
	}
}
/**
 * Build a one-line description of this job.
 *
 * The result is the command, followed (when present) by the prefixed DB key of the
 * title, the space-separated "key=value" parameters, and the comma-separated scalar
 * metadata in parentheses. Large values are abbreviated:
 *  - array items whose JSON form fails to encode or exceeds 512 characters
 *    become "<type>(...)"
 *  - arrays with more than 10 items become "array(<count>)"
 *  - objects without __toString() become "object(<class>)"
 *  - flattened values longer than 1024 characters become "string(<length>)"
 *
 * @return string
 */
public function toString() {
	$paramString = '';
	if ( $this->params ) {
		foreach ( $this->params as $key => $value ) {
			if ( $paramString !== '' ) {
				$paramString .= ' ';
			}

			if ( is_array( $value ) ) {
				$filteredValue = [];
				foreach ( $value as $k => $v ) {
					$json = FormatJson::encode( $v );
					if ( $json === false || mb_strlen( $json ) > 512 ) {
						$filteredValue[$k] = gettype( $v ) . '(...)';
					} else {
						$filteredValue[$k] = $v;
					}
				}
				if ( count( $filteredValue ) <= 10 ) {
					$value = FormatJson::encode( $filteredValue );
				} else {
					$value = "array(" . count( $value ) . ")";
				}
			} elseif ( is_object( $value ) && !method_exists( $value, '__toString' ) ) {
				$value = "object(" . get_class( $value ) . ")";
			}

			// Measure the string form: $value itself may be null, a number, a bool or
			// a stringable object, and mb_strlen() only accepts strings (passing null
			// is deprecated as of PHP 8.1).
			$flatValue = (string)$value;
			$flatLength = mb_strlen( $flatValue );
			if ( $flatLength > 1024 ) {
				$flatValue = "string(" . $flatLength . ")";
			}

			$paramString .= "$key={$flatValue}";
		}
	}

	$metaString = '';
	foreach ( $this->metadata as $key => $value ) {
		// Only scalar metadata is shown; cast first so mb_strlen() gets a string
		if ( is_scalar( $value ) && mb_strlen( (string)$value ) < 1024 ) {
			$metaString .= ( $metaString ? ",$key=$value" : "$key=$value" );
		}
	}

	$s = $this->command;
	if ( is_object( $this->title ) ) {
		$s .= " {$this->title->getPrefixedDBkey()}";
	}
	if ( $paramString !== '' ) {
		$s .= " $paramString";
	}
	if ( $metaString !== '' ) {
		$s .= " ($metaString)";
	}

	return $s;
}
/**
 * Record the text of the error that occurred last.
 *
 * @param string $error
 */
protected function setLastError( $error ) {
	$message = $error;
	$this->error = $message;
}
455 public function getLastError() {