Merge "Fix API output formatting (change lines delimited with * as bold)"
[lhc/web/wiklou.git] / includes / job / JobQueueRedis.php
1 <?php
2 /**
3 * Redis-backed job queue code.
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 * http://www.gnu.org/copyleft/gpl.html
19 *
20 * @file
21 * @author Aaron Schulz
22 */
23
24 /**
25 * Class to handle job queues stored in Redis
26 *
27 * @ingroup JobQueue
28 * @since 1.21
29 */
30 class JobQueueRedis extends JobQueue {
31 /** @var RedisConnectionPool */
32 protected $redisPool;
33
34 protected $server; // string; server address
35
36 const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days)
37 const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
38
/**
 * Set up the queue and obtain the shared Redis connection pool.
 *
 * In addition to whatever JobQueue::__construct() reads, $params must include:
 *   - redisConf : An array of parameters handed to RedisConnectionPool::singleton().
 *                 Its 'server' entry is the address used for this queue: a
 *                 hostname/port combination or the absolute path of a UNIX socket.
 *                 If a hostname is specified but no port, the standard port number
 *                 6379 will be used. Required.
 *
 * @param array $params
 */
public function __construct( array $params ) {
	parent::__construct( $params );
	$redisConf = $params['redisConf'];
	$this->server = $redisConf['server'];
	$this->redisPool = RedisConnectionPool::singleton( $redisConf );
}
52
/**
 * Tell whether the list of unclaimed jobs is empty.
 *
 * Roughly one call in a hundred also triggers internal queue maintenance first.
 *
 * @see JobQueue::doIsEmpty()
 * @return bool
 * @throws MWException
 */
protected function doIsEmpty() {
	if ( mt_rand( 0, 99 ) === 0 ) {
		$this->doInternalMaintenance();
	}

	$conn = $this->getConnection();
	try {
		$unclaimed = $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
		return ( $unclaimed == 0 );
	} catch ( RedisException $e ) {
		// Always throws, so no value is returned past this point
		$this->throwRedisException( $this->server, $conn, $e );
	}
}
70
/**
 * Get the length of the list of unclaimed jobs.
 *
 * Roughly one call in a hundred also triggers internal queue maintenance first.
 *
 * @see JobQueue::doGetSize()
 * @return integer
 * @throws MWException
 */
protected function doGetSize() {
	if ( mt_rand( 0, 99 ) === 0 ) {
		$this->doInternalMaintenance();
	}

	$conn = $this->getConnection();
	try {
		$unclaimedKey = $this->getQueueKey( 'l-unclaimed' );
		return $conn->lSize( $unclaimedKey );
	} catch ( RedisException $e ) {
		// Always throws, so no value is returned past this point
		$this->throwRedisException( $this->server, $conn, $e );
	}
}
88
/**
 * Get the length of the list of claimed jobs.
 *
 * When claims are not tracked ($this->claimTTL is not positive) the answer is
 * always 0. Roughly one call in a hundred also triggers internal queue
 * maintenance first.
 *
 * @see JobQueue::doGetAcquiredCount()
 * @return integer
 * @throws MWException
 */
protected function doGetAcquiredCount() {
	if ( mt_rand( 0, 99 ) === 0 ) {
		$this->doInternalMaintenance();
	}

	$conn = $this->getConnection();
	try {
		return ( $this->claimTTL > 0 )
			? $conn->lSize( $this->getQueueKey( 'l-claimed' ) )
			: 0;
	} catch ( RedisException $e ) {
		// Always throws, so no value is returned past this point
		$this->throwRedisException( $this->server, $conn, $e );
	}
}
110
/**
 * Insert a batch of jobs, skipping those that duplicate a job which is already
 * queued and not currently claimed.
 *
 * @see JobQueue::doBatchPush()
 * @param array $jobs List of Job objects
 * @param $flags Not used by this implementation
 * @return bool False if Redis reported a failed command while inserting
 * @throws MWException
 */
protected function doBatchPush( array $jobs, $flags ) {
	if ( count( $jobs ) == 0 ) {
		return true;
	}

	// Build the (uid => job fields map) table; jobs sharing a uid collapse into one entry
	$fieldMaps = array();
	foreach ( $jobs as $job ) {
		$fields = $this->getNewJobFields( $job );
		$fieldMaps[$fields['uid']] = $fields;
	}

	// Only hash-based uids are candidates for de-duplication
	$hashUids = array();
	foreach ( $fieldMaps as $fields ) {
		if ( $this->isHashUid( $fields['uid'] ) ) {
			$hashUids[] = $fields['uid'];
		}
	}

	$conn = $this->getConnection();
	try {
		// Find which of these jobs are duplicates of unclaimed jobs in the queue...
		if ( count( $hashUids ) ) {
			$usesClaims = ( $this->claimTTL > 0 );

			$conn->multi( Redis::PIPELINE );
			foreach ( $hashUids as $uid ) { // does the job data exist?
				$conn->exists( $this->prefixWithQueueKey( 'data', $uid ) );
			}
			if ( $usesClaims ) { // has the job been claimed?
				foreach ( $hashUids as $uid ) {
					$conn->hExists( $this->prefixWithQueueKey( 'h-meta', $uid ), 'ctime' );
				}
			}
			$replies = $conn->exec();

			if ( $usesClaims ) {
				// First half of the replies are the exists() results, second half hExists()
				list( $exists, $claimed ) = array_chunk( $replies, count( $hashUids ) );
			} else {
				$exists = $replies;
				$claimed = array(); // no claim system
			}

			// Drop the duplicates so that their uids are not pushed again...
			foreach ( $hashUids as $idx => $uid ) {
				if ( $exists[$idx] && empty( $claimed[$idx] ) ) {
					unset( $fieldMaps[$uid] );
				}
			}
		}

		// Actually push the non-duplicate jobs into the queue...
		if ( count( $fieldMaps ) ) {
			// lPush() arguments: the list key followed by every uid
			$pushArgs = array_keys( $fieldMaps );
			array_unshift( $pushArgs, $this->getQueueKey( 'l-unclaimed' ) );

			$conn->multi( Redis::MULTI ); // begin (atomic trx)
			$conn->mSet( $this->prefixKeysWithQueueKey( 'data', $fieldMaps ) );
			call_user_func_array( array( $conn, 'lPush' ), $pushArgs );
			$replies = $conn->exec(); // commit (atomic trx)

			if ( in_array( false, $replies, true ) ) {
				wfDebugLog( 'JobQueueRedis', "Could not insert {$this->type} job(s)." );
				return false;
			}
		}

		$inserted = count( $fieldMaps );
		wfIncrStats( 'job-insert', $inserted );
		wfIncrStats( 'job-insert-duplicate', count( $jobs ) - $inserted );
	} catch ( RedisException $e ) {
		$this->throwRedisException( $this->server, $conn, $e );
	}

	return true;
}
184
/**
 * Claim the next runnable job from the queue.
 *
 * Uids are moved from "l-unclaimed" to "l-claimed" one at a time until one of
 * them yields a usable Job or the unclaimed list runs dry. A job whose "root"
 * job has been superseded is returned as a no-op DuplicateJob instead.
 *
 * @see JobQueue::doPop()
 * @return Job|bool False if no job could be claimed
 * @throws MWException
 */
protected function doPop() {
	$job = false;

	// Roughly one call in a hundred also performs queue maintenance
	if ( mt_rand( 0, 99 ) == 0 ) {
		$this->doInternalMaintenance();
	}

	$conn = $this->getConnection();
	try {
		do {
			// Atomically pop an item off the queue and onto the "claimed" list
			$uid = $conn->rpoplpush(
				$this->getQueueKey( 'l-unclaimed' ),
				$this->getQueueKey( 'l-claimed' )
			);
			if ( $uid === false ) {
				break; // no jobs; nothing to do
			}

			wfIncrStats( 'job-pop' );
			$conn->multi( Redis::PIPELINE );
			$conn->get( $this->prefixWithQueueKey( 'data', $uid ) );
			if ( $this->claimTTL > 0 ) {
				// Set the claim timestamp metadata. If this step fails, then
				// the timestamp will be assumed to be the current timestamp by
				// recycleAndDeleteStaleJobs() as of the next time that it runs.
				// If two runners claim duplicate jobs, one will abort here since
				// hSetNx() only succeeds for the first of them.
				$conn->hSetNx( $this->prefixWithQueueKey( 'h-meta', $uid ), 'ctime', time() );
			} else {
				// If this fails, the message key will be deleted in cleanupClaimedJobs().
				// If two runners claim duplicate jobs, a runner that finds the job
				// data already deleted will abort below.
				$conn->delete(
					$this->prefixWithQueueKey( 'h-meta', $uid ),
					$this->prefixWithQueueKey( 'data', $uid ) );
			}
			// $item is the get() reply; $ok is the hSetNx()/delete() reply
			list( $item, $ok ) = $conn->exec();
			if ( $item === false || ( $this->claimTTL && !$ok ) ) {
				wfDebug( "Could not find or delete job $uid; probably was a duplicate." );
				continue; // job was probably a duplicate
			}

			// If $item is invalid, recycleAndDeleteStaleJobs() will cleanup as needed
			$job = $this->getJobFromFields( $item ); // may be false
		} while ( !$job ); // job may be false if invalid
	} catch ( RedisException $e ) {
		$this->throwRedisException( $this->server, $conn, $e );
	}

	// Flag this job as an old duplicate based on its "root" job...
	try {
		if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
			wfIncrStats( 'job-pop-duplicate' );
			return DuplicateJob::newFromJob( $job ); // convert to a no-op
		}
	} catch ( MWException $e ) {
		// Don't lose the job over this: it is still returned as-is below,
		// but leave a trace of why the duplicate check was skipped.
		wfDebugLog( 'JobQueueRedis',
			"Could not check root job of {$this->type} job: " . $e->getMessage() );
	}

	return $job;
}
248
/**
 * Acknowledge that a claimed job has finished, removing it from the "claimed"
 * list and deleting its data and claim metadata.
 *
 * This is a no-op that returns true when claims are not tracked
 * ($this->claimTTL is not positive).
 *
 * @see JobQueue::doAck()
 * @param Job $job A job obtained from this queue; it must carry the
 *   'sourceFields' metadata attached by getJobFromFields()
 * @return bool False if Redis reported a failed command
 * @throws MWException If the job has no source uid or on Redis errors
 */
protected function doAck( Job $job ) {
	if ( $this->claimTTL > 0 ) {
		// Get the exact uid this Job came from, regardless of whether the job
		// was transformed into a DuplicateJob or anything of the sort. Without
		// it there is nothing meaningful to remove, so fail loudly instead of
		// issuing commands against an empty uid and reporting success.
		if ( !isset( $job->metadata['sourceFields']['uid'] ) ) {
			throw new MWException(
				"Cannot acknowledge job of type '{$job->getType()}'; missing source uid." );
		}
		$uid = $job->metadata['sourceFields']['uid'];

		$conn = $this->getConnection();
		try {
			$conn->multi( Redis::MULTI ); // begin (atomic trx)
			// Remove the first instance of this job scanning right-to-left.
			// This is O(N) in the worst case, but is likely to be much faster since
			// jobs are pushed to the left and we are starting from the right, where
			// the longest running jobs are likely to be. These should be the first
			// jobs to be acknowledged assuming that job run times are roughly equal.
			$conn->lRem( $this->getQueueKey( 'l-claimed' ), $uid, -1 );
			// Delete the job data and its claim metadata
			$conn->delete(
				$this->prefixWithQueueKey( 'h-meta', $uid ),
				$this->prefixWithQueueKey( 'data', $uid ) );
			$res = $conn->exec(); // commit (atomic trx)

			if ( in_array( false, $res, true ) ) {
				wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
				return false;
			}
		} catch ( RedisException $e ) {
			$this->throwRedisException( $this->server, $conn, $e );
		}
	}
	return true;
}
286
/**
 * Record the timestamp of a "root" job so that older jobs sharing its
 * signature can later be recognised as superseded.
 *
 * The stored timestamp is left alone when it is already at least as recent as
 * the one carried by $job; otherwise it is overwritten and kept for
 * self::ROOTJOB_TTL seconds.
 *
 * @see JobQueue::doDeduplicateRootJob()
 * @param Job $job
 * @return bool
 * @throws MWException
 */
protected function doDeduplicateRootJob( Job $job ) {
	$params = $job->getParams();
	if ( !isset( $params['rootJobSignature'] ) ) {
		throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
	}
	if ( !isset( $params['rootJobTimestamp'] ) ) {
		throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
	}
	$rootKey = $this->getRootJobKey( $params['rootJobSignature'] );

	$conn = $this->getConnection();
	try {
		$stored = $conn->get( $rootKey ); // current last timestamp of this job
		$alreadyCurrent = ( $stored && $stored >= $params['rootJobTimestamp'] );
		if ( $alreadyCurrent ) {
			return true; // a newer version of this root job was enqueued
		}
		// Update the timestamp of the last root job started at the location...
		return $conn->set( $rootKey, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks
	} catch ( RedisException $e ) {
		$this->throwRedisException( $this->server, $conn, $e );
	}
}
314
/**
 * Check if the "root" job of a given job has been superseded by a newer one
 *
 * Jobs without a root job signature or timestamp are never treated as
 * duplicates.
 *
 * @param $job Job
 * @return bool
 * @throws MWException
 */
protected function isRootJobOldDuplicate( Job $job ) {
	$params = $job->getParams();
	if ( !isset( $params['rootJobSignature'] ) ) {
		return false; // job has no de-duplication info
	}
	if ( !isset( $params['rootJobTimestamp'] ) ) {
		wfDebugLog( 'JobQueueRedis', "Cannot check root job; missing 'rootJobTimestamp'." );
		return false;
	}

	$rootKey = $this->getRootJobKey( $params['rootJobSignature'] );
	$conn = $this->getConnection();
	try {
		// Get the last time this root job was enqueued
		$lastEnqueued = $conn->get( $rootKey );
	} catch ( RedisException $e ) {
		$this->throwRedisException( $this->server, $conn, $e );
	}

	// Check if a new root job was started at the location after this one's...
	return ( $lastEnqueued && $lastEnqueued > $params['rootJobTimestamp'] );
}
342
/**
 * Do any job recycling or queue cleanup as needed
 *
 * Queues that track claims ($this->claimTTL is positive) recycle or delete
 * stale claimed jobs; all other queues purge their claimed jobs.
 *
 * @return integer Number of jobs recycled/deleted
 * @throws MWException
 */
protected function doInternalMaintenance() {
	if ( $this->claimTTL > 0 ) {
		return $this->recycleAndDeleteStaleJobs();
	}
	return $this->cleanupClaimedJobs();
}
354
/**
 * Recycle or destroy any jobs that have been claimed for too long
 *
 * Every uid on the "l-claimed" list is compared against its claim metadata hash:
 *   - claims older than $this->claimTTL are pushed back onto "l-unclaimed" while
 *     the recorded attempt count is still below self::MAX_ATTEMPTS;
 *   - claims that are out of attempts and older than self::MAX_AGE_PRUNE are
 *     removed along with their job data and metadata;
 *   - claims with no timestamp at all are stamped with "rctime" so that they
 *     can be aged on a later run.
 *
 * @return integer Number of jobs recycled/deleted
 * @throws MWException
 */
protected function recycleAndDeleteStaleJobs() {
	$count = 0;
	// For each job item that can be retried, we need to add it back to the
	// main queue and remove it from the list of currently claimed job items.
	$conn = $this->getConnection();
	try {
		// Avoid duplicate insertions of items to be re-enqueued
		// NOTE(review): expire() is queued unconditionally, so it also runs when
		// setnx() fails and refreshes the TTL of a lock held by another process.
		$conn->multi( Redis::MULTI );
		$conn->setnx( $this->getQueueKey( 'lock' ), 1 );
		$conn->expire( $this->getQueueKey( 'lock' ), 3600 );
		if ( $conn->exec() !== array( true, true ) ) { // lock
			return $count; // already in progress
		}

		$now = time();
		$claimCutoff = $now - $this->claimTTL;
		$pruneCutoff = $now - self::MAX_AGE_PRUNE;

		// Get the list of all claimed jobs
		$claimedUids = $conn->lRange( $this->getQueueKey( 'l-claimed' ), 0, -1 );
		if ( !is_array( $claimedUids ) ) {
			$claimedUids = array();
		}

		// Get the claim metadata for all claimed jobs, in the same order as the uids.
		// The "h-meta" keys are hashes (see the hSetNx()/hDel()/hIncrBy() calls), so
		// they must be read with hGetAll(); mGet() only returns string values and
		// would report every one of them as missing.
		$metadata = array();
		if ( count( $claimedUids ) ) {
			$conn->multi( Redis::PIPELINE );
			foreach ( $claimedUids as $uid ) {
				$conn->hGetAll( $this->prefixWithQueueKey( 'h-meta', $uid ) );
			}
			$metadata = $conn->exec();
		}

		$uidsPush = array(); // items IDs to move to the "unclaimed" queue
		$uidsRemove = array(); // item IDs to remove from "claimed" queue
		foreach ( $claimedUids as $i => $uid ) { // all claimed items
			$info = ( isset( $metadata[$i] ) && is_array( $metadata[$i] ) )
				? $metadata[$i]
				: array();
			if ( isset( $info['ctime'] ) || isset( $info['rctime'] ) ) {
				// Prefer "ctime" (set by pop()) over "rctime" (set by this function)
				$ctime = isset( $info['ctime'] ) ? $info['ctime'] : $info['rctime'];
				// Claimed job claimed for too long?
				if ( $ctime < $claimCutoff ) {
					// Get the number of failed attempts
					$attempts = isset( $info['attempts'] ) ? $info['attempts'] : 0;
					// NOTE(review): MAX_ATTEMPTS is not declared in this class;
					// presumably inherited from JobQueue - confirm.
					if ( $attempts < self::MAX_ATTEMPTS ) {
						$uidsPush[] = $uid; // retry it
					} elseif ( $ctime < $pruneCutoff ) {
						$uidsRemove[] = $uid; // just remove it
					}
				}
			} else {
				// If pop() failed to set the claim timestamp, set it to the current time.
				// Since that function sets this non-atomically *after* moving the job to
				// the "claimed" queue, it may be the case that it just didn't set it yet.
				$conn->hSet( $this->prefixWithQueueKey( 'h-meta', $uid ), 'rctime', $now );
			}
		}

		$conn->multi( Redis::MULTI ); // begin (atomic trx)
		if ( count( $uidsPush ) ) { // move from "l-claimed" to "l-unclaimed"
			call_user_func_array(
				array( $conn, 'lPush' ),
				array_merge( array( $this->getQueueKey( 'l-unclaimed' ) ), $uidsPush )
			);
			foreach ( $uidsPush as $uid ) {
				$conn->lRem( $this->getQueueKey( 'l-claimed' ), $uid, -1 );
				// Clear the claim timestamps and count this as one more attempt
				$conn->hDel( $this->prefixWithQueueKey( 'h-meta', $uid ), 'ctime', 'rctime' );
				$conn->hIncrBy( $this->prefixWithQueueKey( 'h-meta', $uid ), 'attempts', 1 );
			}
		}
		foreach ( $uidsRemove as $uid ) { // remove from "l-claimed"
			$conn->lRem( $this->getQueueKey( 'l-claimed' ), $uid, -1 );
			$conn->delete( // delete job data and metadata
				$this->prefixWithQueueKey( 'h-meta', $uid ),
				$this->prefixWithQueueKey( 'data', $uid ) );
		}
		$res = $conn->exec(); // commit (atomic trx)

		if ( in_array( false, $res, true ) ) {
			wfDebugLog( 'JobQueueRedis', "Could not recycle {$this->type} job(s)." );
		} else {
			$count += ( count( $uidsPush ) + count( $uidsRemove ) );
			wfIncrStats( 'job-recycle', count( $uidsPush ) );
		}

		$conn->delete( $this->getQueueKey( 'lock' ) ); // unlock
	} catch ( RedisException $e ) {
		$this->throwRedisException( $this->server, $conn, $e );
	}

	return $count;
}
443
/**
 * Destroy any jobs that have been claimed
 *
 * Deletes the data and metadata keys of every uid currently on the "l-claimed"
 * list and trims those uids off the list. A "lock" key keeps concurrent runs
 * from doing the same work.
 *
 * @return integer Number of jobs deleted
 * @throws MWException
 */
protected function cleanupClaimedJobs() {
	$purged = 0;
	$lockKey = $this->getQueueKey( 'lock' );
	$claimedKey = $this->getQueueKey( 'l-claimed' );

	// Make sure the message for claimed jobs was deleted
	// and remove the claimed job IDs from the "claimed" list.
	$conn = $this->getConnection();
	try {
		// Avoid races and duplicate effort
		$conn->multi( Redis::MULTI );
		$conn->setnx( $lockKey, 1 );
		$conn->expire( $lockKey, 3600 );
		$lockReplies = $conn->exec();
		if ( $lockReplies !== array( true, true ) ) { // lock
			return $purged; // already in progress
		}

		// Get the list of all claimed jobs
		$claimedUids = $conn->lRange( $claimedKey, 0, -1 );
		$total = count( $claimedUids );
		if ( $total ) {
			$staleKeys = array_merge(
				$this->prefixValuesWithQueueKey( 'h-meta', $claimedUids ),
				$this->prefixValuesWithQueueKey( 'data', $claimedUids )
			);
			// Delete the message keys and delist the corresponding ids.
			// Since the only other changes to "l-claimed" are left pushes, we can just strip
			// off the elements read here using a right trim based on the number of ids read.
			$conn->multi( Redis::MULTI ); // begin (atomic trx)
			$conn->lTrim( $claimedKey, 0, -$total - 1 );
			$conn->delete( $staleKeys );
			$replies = $conn->exec(); // commit (atomic trx)

			if ( in_array( false, $replies, true ) ) {
				wfDebugLog( 'JobQueueRedis', "Could not purge {$this->type} job(s)." );
			} else {
				$purged += $total;
			}
		}
		$conn->delete( $lockKey ); // unlock
	} catch ( RedisException $e ) {
		$this->throwRedisException( $this->server, $conn, $e );
	}

	return $purged;
}
490
/**
 * Build the field map that is stored in Redis for a job.
 *
 * Jobs that ignore duplicates get a 31 character uid derived from the SHA-1 of
 * their de-duplication info; all other jobs get a random 32 character uid.
 *
 * @param $job Job
 * @return array
 */
protected function getNewJobFields( Job $job ) {
	$title = $job->getTitle();

	if ( $job->ignoreDuplicates() ) {
		$uid = wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 );
	} else {
		$uid = wfRandomString( 32 );
	}

	return array(
		// Fields that describe the nature of the job
		'type' => $job->getType(),
		'namespace' => $title->getNamespace(),
		'title' => $title->getDBkey(),
		'params' => $job->getParams(),
		// Additional metadata
		'uid' => $uid,
		'timestamp' => time() // UNIX timestamp
	);
}
509
/**
 * Rebuild a Job object from a stored field map.
 *
 * The field map itself is attached to the job as 'sourceFields' metadata.
 *
 * @param $fields array
 * @return Job|bool False if no valid title can be made from the fields
 */
protected function getJobFromFields( array $fields ) {
	$title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] );
	if ( !$title ) {
		return false;
	}

	$job = Job::factory( $fields['type'], $title, $fields['params'] );
	$job->metadata['sourceFields'] = $fields;
	return $job;
}
523
/**
 * Tell hash-based uids apart from random ones by their length: the former are
 * 31 characters long.
 *
 * @param $uid string Job UID
 * @return bool Whether $uid is a SHA-1 hash based identifier for de-duplication
 */
protected function isHashUid( $uid ) {
	$length = strlen( $uid );
	return ( $length == 31 );
}
531
/**
 * Get a connection to the server that handles all sub-queues for this queue
 *
 * @return RedisConnRef Connection handle obtained from the connection pool
 * @throws MWException If the pool could not provide a connection
 */
protected function getConnection() {
	$connection = $this->redisPool->getConnection( $this->server );
	if ( $connection ) {
		return $connection;
	}
	throw new MWException( "Unable to connect to redis server." );
}
545
/**
 * Report a Redis failure to the connection pool, then rethrow it as an
 * MWException. This method never returns normally.
 *
 * @param $server string
 * @param $conn RedisConnRef
 * @param $e RedisException
 * @throws MWException
 */
protected function throwRedisException( $server, RedisConnRef $conn, $e ) {
	$this->redisPool->handleException( $server, $conn, $e );
	$message = "Redis server error: {$e->getMessage()}\n";
	throw new MWException( $message );
}
556
/**
 * Build the Redis key of one property of this queue, scoped by the queue's
 * wiki and job type.
 *
 * @param $prop string
 * @return string
 */
private function getQueueKey( $prop ) {
	$wikiParts = wfSplitWikiID( $this->wiki );
	list( $dbName, $tablePrefix ) = $wikiParts;
	return wfForeignMemcKey( $dbName, $tablePrefix, 'jobqueue', $this->type, $prop );
}
565
/**
 * Build the Redis key that holds the last timestamp of a root job, scoped by
 * the queue's wiki and job type.
 *
 * @param string $signature Hash identifier of the root job
 * @return string
 */
private function getRootJobKey( $signature ) {
	$wikiParts = wfSplitWikiID( $this->wiki );
	list( $dbName, $tablePrefix ) = $wikiParts;
	return wfForeignMemcKey(
		$dbName, $tablePrefix, 'jobqueue', $this->type, 'rootjob', $signature
	);
}
574
/**
 * Prepend the queue key of a property, followed by a colon, to a string.
 *
 * @param $prop string
 * @param $string string
 * @return string
 */
private function prefixWithQueueKey( $prop, $string ) {
	$queueKey = $this->getQueueKey( $prop );
	return "{$queueKey}:{$string}";
}
583
/**
 * Prefix every value of a list with the queue key of a property.
 * The result is a freshly indexed list; the keys of $items are not kept.
 *
 * @param $prop string
 * @param $items array
 * @return Array
 */
private function prefixValuesWithQueueKey( $prop, array $items ) {
	// The prefix is the same for every item, so build it once
	$prefix = $this->getQueueKey( $prop ) . ':';

	$prefixed = array();
	foreach ( $items as $value ) {
		$prefixed[] = $prefix . $value;
	}
	return $prefixed;
}
596
/**
 * Prefix every key of a map with the queue key of a property.
 * The values are carried over unchanged.
 *
 * @param $prop string
 * @param $items array
 * @return Array
 */
private function prefixKeysWithQueueKey( $prop, array $items ) {
	// The prefix is the same for every key, so build it once
	$prefix = $this->getQueueKey( $prop ) . ':';

	$prefixed = array();
	foreach ( $items as $name => $value ) {
		$prefixed[$prefix . $name] = $value;
	}
	return $prefixed;
}
609 }