From 450e9c3258f5c61d110dd0ea7f709a4ef6f8389d Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Tue, 18 Jun 2013 20:09:10 -0700 Subject: [PATCH] jobqueue: cleaned up JobQueue exception handling * Added JobQueueError exceptions. * Periodic tasks that fail are logged and skipped. * JobQueueFederated properly fails over now. Change-Id: I9d9f0dae548a9dde693a7cd25c255a8bfbf37899 --- includes/AutoLoader.php | 2 + includes/job/JobQueue.php | 37 ++-- includes/job/JobQueueDB.php | 331 +++++++++++++++++------------ includes/job/JobQueueFederated.php | 51 ++++- includes/job/JobQueueGroup.php | 10 +- includes/job/JobQueueRedis.php | 4 +- 6 files changed, 267 insertions(+), 168 deletions(-) diff --git a/includes/AutoLoader.php b/includes/AutoLoader.php index dc9acf3211..42d7d8869a 100644 --- a/includes/AutoLoader.php +++ b/includes/AutoLoader.php @@ -665,6 +665,8 @@ $wgAutoloadLocalClasses = array( 'JobQueueAggregatorMemc' => 'includes/job/aggregator/JobQueueAggregatorMemc.php', 'JobQueueAggregatorRedis' => 'includes/job/aggregator/JobQueueAggregatorRedis.php', 'JobQueueDB' => 'includes/job/JobQueueDB.php', + 'JobQueueConnectionError' => 'includes/job/JobQueue.php', + 'JobQueueError' => 'includes/job/JobQueue.php', 'JobQueueGroup' => 'includes/job/JobQueueGroup.php', 'JobQueueFederated' => 'includes/job/JobQueueFederated.php', 'JobQueueRedis' => 'includes/job/JobQueueRedis.php', diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php index aa47432ff3..3e94b13bba 100644 --- a/includes/job/JobQueue.php +++ b/includes/job/JobQueue.php @@ -168,7 +168,7 @@ abstract class JobQueue { * not distinguishable from the race condition between isEmpty() and pop(). * * @return bool - * @throws MWException + * @throws JobQueueError */ final public function isEmpty() { wfProfileIn( __METHOD__ ); @@ -190,7 +190,7 @@ abstract class JobQueue { * If caching is used, this number might be out of date for a minute. * * @return integer - * @throws MWException + * @throws JobQueueError */ final public function getSize() { wfProfileIn( __METHOD__ ); @@ -212,7 +212,7 @@ abstract class JobQueue { * If caching is used, this number might be out of date for a minute. * * @return integer - * @throws MWException + * @throws JobQueueError */ final public function getAcquiredCount() { wfProfileIn( __METHOD__ ); @@ -234,7 +234,7 @@ abstract class JobQueue { * If caching is used, this number might be out of date for a minute. * * @return integer - * @throws MWException + * @throws JobQueueError * @since 1.22 */ final public function getDelayedCount() { @@ -259,7 +259,7 @@ abstract class JobQueue { * If caching is used, this number might be out of date for a minute. * * @return integer - * @throws MWException + * @throws JobQueueError */ final public function getAbandonedCount() { wfProfileIn( __METHOD__ ); @@ -284,7 +284,7 @@ abstract class JobQueue { * @param $jobs Job|Array * @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC) * @return bool Returns false on failure - * @throws MWException + * @throws JobQueueError */ final public function push( $jobs, $flags = 0 ) { return $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags ); @@ -298,7 +298,7 @@ abstract class JobQueue { * @param array $jobs List of Jobs * @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC) * @return bool Returns false on failure - * @throws MWException + * @throws JobQueueError */ final public function batchPush( array $jobs, $flags = 0 ) { if ( !count( $jobs ) ) { @@ -333,7 +333,7 @@ abstract class JobQueue { * Outside callers should use JobQueueGroup::pop() instead of this function. * * @return Job|bool Returns false if there are no jobs - * @throws MWException + * @throws JobQueueError */ final public function pop() { global $wgJobClasses; @@ -374,7 +374,7 @@ abstract class JobQueue { * * @param $job Job * @return bool - * @throws MWException + * @throws JobQueueError */ final public function ack( Job $job ) { if ( $job->getType() !== $this->type ) { @@ -421,7 +421,7 @@ abstract class JobQueue { * * @param $job Job * @return bool - * @throws MWException + * @throws JobQueueError */ final public function deduplicateRootJob( Job $job ) { if ( $job->getType() !== $this->type ) { @@ -466,7 +466,7 @@ abstract class JobQueue { * * @param $job Job * @return bool - * @throws MWException + * @throws JobQueueError */ final protected function isRootJobOldDuplicate( Job $job ) { if ( $job->getType() !== $this->type ) { @@ -511,7 +511,7 @@ abstract class JobQueue { * Deleted all unclaimed and delayed jobs from the queue * * @return bool Success - * @throws MWException + * @throws JobQueueError * @since 1.22 */ final public function delete() { @@ -535,7 +535,7 @@ abstract class JobQueue { * This does nothing for certain queue classes. * * @return void - * @throws MWException + * @throws JobQueueError */ final public function waitForBackups() { wfProfileIn( __METHOD__ ); @@ -600,7 +600,7 @@ abstract class JobQueue { * Note: results may be stale if the queue is concurrently modified. * * @return Iterator - * @throws MWException + * @throws JobQueueError */ abstract public function getAllQueuedJobs(); @@ -609,7 +609,7 @@ abstract class JobQueue { * Note: results may be stale if the queue is concurrently modified. * * @return Iterator - * @throws MWException + * @throws JobQueueError * @since 1.22 */ public function getAllDelayedJobs() { @@ -640,3 +640,10 @@ abstract class JobQueue { throw new MWException( "Queue namespacing not supported for this queue type." ); } } + +/** + * @ingroup JobQueue + * @since 1.22 + */ +class JobQueueError extends MWException {} +class JobQueueConnectionError extends JobQueueError {} diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php index 56da4f3d8d..3fa06556cd 100644 --- a/includes/job/JobQueueDB.php +++ b/includes/job/JobQueueDB.php @@ -80,9 +80,13 @@ class JobQueueDB extends JobQueue { } list( $dbr, $scope ) = $this->getSlaveDB(); - $found = $dbr->selectField( // unclaimed job - 'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__ - ); + try { + $found = $dbr->selectField( // unclaimed job + 'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } $this->cache->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG ); return !$found; @@ -100,11 +104,15 @@ class JobQueueDB extends JobQueue { return $size; } - list( $dbr, $scope ) = $this->getSlaveDB(); - $size = (int)$dbr->selectField( 'job', 'COUNT(*)', - array( 'job_cmd' => $this->type, 'job_token' => '' ), - __METHOD__ - ); + try { + list( $dbr, $scope ) = $this->getSlaveDB(); + $size = (int)$dbr->selectField( 'job', 'COUNT(*)', + array( 'job_cmd' => $this->type, 'job_token' => '' ), + __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } $this->cache->set( $key, $size, self::CACHE_TTL_SHORT ); return $size; @@ -127,10 +135,14 @@ class JobQueueDB extends JobQueue { } list( $dbr, $scope ) = $this->getSlaveDB(); - $count = (int)$dbr->selectField( 'job', 'COUNT(*)', - array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ), - __METHOD__ - ); + try { + $count = (int)$dbr->selectField( 'job', 'COUNT(*)', + array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ), + __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } $this->cache->set( $key, $count, self::CACHE_TTL_SHORT ); return $count; @@ -156,14 +168,18 @@ class JobQueueDB extends JobQueue { } list( $dbr, $scope ) = $this->getSlaveDB(); - $count = (int)$dbr->selectField( 'job', 'COUNT(*)', - array( - 'job_cmd' => $this->type, - "job_token != {$dbr->addQuotes( '' )}", - "job_attempts >= " . $dbr->addQuotes( $this->maxTries ) - ), - __METHOD__ - ); + try { + $count = (int)$dbr->selectField( 'job', 'COUNT(*)', + array( + 'job_cmd' => $this->type, + "job_token != {$dbr->addQuotes( '' )}", + "job_attempts >= " . $dbr->addQuotes( $this->maxTries ) + ), + __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } $wgMemc->set( $key, $count, self::CACHE_TTL_SHORT ); return $count; @@ -269,43 +285,47 @@ class JobQueueDB extends JobQueue { } list( $dbw, $scope ) = $this->getMasterDB(); - $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction - $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting - $dbw->clearFlag( DBO_TRX ); // make each query its own transaction - $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) { - $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting - } ); - - $uuid = wfRandomString( 32 ); // pop attempt - $job = false; // job popped off - do { // retry when our row is invalid or deleted as a duplicate - // Try to reserve a row in the DB... - if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) { - $row = $this->claimOldest( $uuid ); - } else { // random first - $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs - $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand - $row = $this->claimRandom( $uuid, $rand, $gte ); - } - // Check if we found a row to reserve... - if ( !$row ) { - $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG ); - break; // nothing to do - } - JobQueue::incrStats( 'job-pop', $this->type ); - // Get the job object from the row... - $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title ); - if ( !$title ) { - $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); - wfDebug( "Row has invalid title '{$row->job_title}'." ); - continue; // try again - } - $job = Job::factory( $row->job_cmd, $title, - self::extractBlob( $row->job_params ), $row->job_id ); - $job->metadata['id'] = $row->job_id; - $job->id = $row->job_id; // XXX: work around broken subclasses - break; // done - } while ( true ); + try { + $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction + $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting + $dbw->clearFlag( DBO_TRX ); // make each query its own transaction + $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) { + $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting + } ); + + $uuid = wfRandomString( 32 ); // pop attempt + $job = false; // job popped off + do { // retry when our row is invalid or deleted as a duplicate + // Try to reserve a row in the DB... + if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) { + $row = $this->claimOldest( $uuid ); + } else { // random first + $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs + $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand + $row = $this->claimRandom( $uuid, $rand, $gte ); + } + // Check if we found a row to reserve... + if ( !$row ) { + $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG ); + break; // nothing to do + } + JobQueue::incrStats( 'job-pop', $this->type ); + // Get the job object from the row... + $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title ); + if ( !$title ) { + $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); + wfDebug( "Row has invalid title '{$row->job_title}'." ); + continue; // try again + } + $job = Job::factory( $row->job_cmd, $title, + self::extractBlob( $row->job_params ), $row->job_id ); + $job->metadata['id'] = $row->job_id; + $job->id = $row->job_id; // XXX: work around broken subclasses + break; // done + } while ( true ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } return $job; } @@ -461,16 +481,20 @@ class JobQueueDB extends JobQueue { } list( $dbw, $scope ) = $this->getMasterDB(); - $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction - $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting - $dbw->clearFlag( DBO_TRX ); // make each query its own transaction - $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) { - $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting - } ); - - // Delete a row with a single DELETE without holding row locks over RTTs... - $dbw->delete( 'job', - array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ ); + try { + $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction + $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting + $dbw->clearFlag( DBO_TRX ); // make each query its own transaction + $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) { + $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting + } ); + + // Delete a row with a single DELETE without holding row locks over RTTs... + $dbw->delete( 'job', + array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } return true; } @@ -516,7 +540,11 @@ class JobQueueDB extends JobQueue { protected function doDelete() { list( $dbw, $scope ) = $this->getMasterDB(); - $dbw->delete( 'job', array( 'job_cmd' => $this->type ) ); + try { + $dbw->delete( 'job', array( 'job_cmd' => $this->type ) ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } return true; } @@ -555,20 +583,25 @@ class JobQueueDB extends JobQueue { */ public function getAllQueuedJobs() { list( $dbr, $scope ) = $this->getSlaveDB(); - return new MappedIterator( - $dbr->select( 'job', '*', array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ), - function( $row ) use ( $scope ) { - $job = Job::factory( - $row->job_cmd, - Title::makeTitle( $row->job_namespace, $row->job_title ), - strlen( $row->job_params ) ? unserialize( $row->job_params ) : false, - $row->job_id - ); - $job->metadata['id'] = $row->job_id; - $job->id = $row->job_id; // XXX: work around broken subclasses - return $job; - } - ); + try { + return new MappedIterator( + $dbr->select( 'job', '*', + array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ), + function( $row ) use ( $scope ) { + $job = Job::factory( + $row->job_cmd, + Title::makeTitle( $row->job_namespace, $row->job_title ), + strlen( $row->job_params ) ? unserialize( $row->job_params ) : false, + $row->job_id + ); + $job->metadata['id'] = $row->job_id; + $job->id = $row->job_id; // XXX: work around broken subclasses + return $job; + } + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } } /** @@ -578,76 +611,80 @@ class JobQueueDB extends JobQueue { */ public function recycleAndDeleteStaleJobs() { $now = time(); - list( $dbw, $scope ) = $this->getMasterDB(); $count = 0; // affected rows + list( $dbw, $scope ) = $this->getMasterDB(); - if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) { - return $count; // already in progress - } + try { + if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) { + return $count; // already in progress + } - // Remove claims on jobs acquired for too long if enabled... - if ( $this->claimTTL > 0 ) { - $claimCutoff = $dbw->timestamp( $now - $this->claimTTL ); - // Get the IDs of jobs that have be claimed but not finished after too long. - // These jobs can be recycled into the queue by expiring the claim. Selecting - // the IDs first means that the UPDATE can be done by primary key (less deadlocks). - $res = $dbw->select( 'job', 'job_id', - array( - 'job_cmd' => $this->type, - "job_token != {$dbw->addQuotes( '' )}", // was acquired - "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale - "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left - __METHOD__ + // Remove claims on jobs acquired for too long if enabled... + if ( $this->claimTTL > 0 ) { + $claimCutoff = $dbw->timestamp( $now - $this->claimTTL ); + // Get the IDs of jobs that have be claimed but not finished after too long. + // These jobs can be recycled into the queue by expiring the claim. Selecting + // the IDs first means that the UPDATE can be done by primary key (less deadlocks). + $res = $dbw->select( 'job', 'job_id', + array( + 'job_cmd' => $this->type, + "job_token != {$dbw->addQuotes( '' )}", // was acquired + "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale + "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left + __METHOD__ + ); + $ids = array_map( + function( $o ) { + return $o->job_id; + }, iterator_to_array( $res ) + ); + if ( count( $ids ) ) { + // Reset job_token for these jobs so that other runners will pick them up. + // Set the timestamp to the current time, as it is useful to now that the job + // was already tried before (the timestamp becomes the "released" time). + $dbw->update( 'job', + array( + 'job_token' => '', + 'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release + array( + 'job_id' => $ids ), + __METHOD__ + ); + $count += $dbw->affectedRows(); + JobQueue::incrStats( 'job-recycle', $this->type, $dbw->affectedRows() ); + $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG ); + } + } + + // Just destroy any stale jobs... + $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE ); + $conds = array( + 'job_cmd' => $this->type, + "job_token != {$dbw->addQuotes( '' )}", // was acquired + "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale ); + if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times... + $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}"; + } + // Get the IDs of jobs that are considered stale and should be removed. Selecting + // the IDs first means that the UPDATE can be done by primary key (less deadlocks). + $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ ); $ids = array_map( function( $o ) { return $o->job_id; }, iterator_to_array( $res ) ); if ( count( $ids ) ) { - // Reset job_token for these jobs so that other runners will pick them up. - // Set the timestamp to the current time, as it is useful to now that the job - // was already tried before (the timestamp becomes the "released" time). - $dbw->update( 'job', - array( - 'job_token' => '', - 'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release - array( - 'job_id' => $ids ), - __METHOD__ - ); + $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ ); $count += $dbw->affectedRows(); - JobQueue::incrStats( 'job-recycle', $this->type, $dbw->affectedRows() ); - $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG ); + JobQueue::incrStats( 'job-abandon', $this->type, $dbw->affectedRows() ); } - } - // Just destroy any stale jobs... - $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE ); - $conds = array( - 'job_cmd' => $this->type, - "job_token != {$dbw->addQuotes( '' )}", // was acquired - "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale - ); - if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times... - $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}"; - } - // Get the IDs of jobs that are considered stale and should be removed. Selecting - // the IDs first means that the UPDATE can be done by primary key (less deadlocks). - $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ ); - $ids = array_map( - function( $o ) { - return $o->job_id; - }, iterator_to_array( $res ) - ); - if ( count( $ids ) ) { - $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ ); - $count += $dbw->affectedRows(); - JobQueue::incrStats( 'job-abandon', $this->type, $dbw->affectedRows() ); + $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); } - $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ ); - return $count; } @@ -655,14 +692,22 @@ class JobQueueDB extends JobQueue { * @return Array (DatabaseBase, ScopedCallback) */ protected function getSlaveDB() { - return $this->getDB( DB_SLAVE ); + try { + return $this->getDB( DB_SLAVE ); + } catch ( DBConnectionError $e ) { + throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() ); + } } /** * @return Array (DatabaseBase, ScopedCallback) */ protected function getMasterDB() { - return $this->getDB( DB_MASTER ); + try { + return $this->getDB( DB_MASTER ); + } catch ( DBConnectionError $e ) { + throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() ); + } } /** @@ -737,4 +782,12 @@ class JobQueueDB extends JobQueue { return false; } } + + /** + * @param DBError $e + * @throws JobQueueError + */ + protected function throwDBException( DBError $e ) { + throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() ); + } } diff --git a/includes/job/JobQueueFederated.php b/includes/job/JobQueueFederated.php index 19de8bb529..35b80ca6a7 100644 --- a/includes/job/JobQueueFederated.php +++ b/includes/job/JobQueueFederated.php @@ -138,9 +138,13 @@ class JobQueueFederated extends JobQueue { } foreach ( $this->partitionQueues as $queue ) { - if ( !$queue->doIsEmpty() ) { - $this->cache->add( $key, 'false', self::CACHE_TTL_LONG ); - return false; + try { + if ( !$queue->doIsEmpty() ) { + $this->cache->add( $key, 'false', self::CACHE_TTL_LONG ); + return false; + } + } catch ( JobQueueError $e ) { + wfDebugLog( 'exception', $e->getLogMessage() ); } } @@ -179,7 +183,11 @@ class JobQueueFederated extends JobQueue { $count = 0; foreach ( $this->partitionQueues as $queue ) { - $count += $queue->$method(); + try { + $count += $queue->$method(); + } catch ( JobQueueError $e ) { + wfDebugLog( 'exception', $e->getLogMessage() ); + } } $this->cache->set( $key, $count, self::CACHE_TTL_SHORT ); @@ -244,7 +252,13 @@ class JobQueueFederated extends JobQueue { // Insert the de-duplicated jobs into the queues... foreach ( $uJobsByPartition as $partition => $jobBatch ) { $queue = $this->partitionQueues[$partition]; - if ( $queue->doBatchPush( $jobBatch, $flags ) ) { + try { + $ok = $queue->doBatchPush( $jobBatch, $flags ); + } catch ( JobQueueError $e ) { + $ok = false; + wfDebugLog( 'exception', $e->getLogMessage() ); + } + if ( $ok ) { $key = $this->getCacheKey( 'empty' ); $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG ); } else { @@ -259,7 +273,13 @@ class JobQueueFederated extends JobQueue { $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted } else { $queue = $this->partitionQueues[$partition]; - if ( $queue->doBatchPush( $jobBatch, $flags ) ) { + try { + $ok = $queue->doBatchPush( $jobBatch, $flags ); + } catch ( JobQueueError $e ) { + $ok = false; + wfDebugLog( 'exception', $e->getLogMessage() ); + } + if ( $ok ) { $key = $this->getCacheKey( 'empty' ); $this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG ); } else { @@ -288,7 +308,12 @@ class JobQueueFederated extends JobQueue { break; // all partitions at 0 weight } $queue = $this->partitionQueues[$partition]; - $job = $queue->pop(); + try { + $job = $queue->pop(); + } catch ( JobQueueError $e ) { + $job = false; + wfDebugLog( 'exception', $e->getLogMessage() ); + } if ( $job ) { $job->metadata['QueuePartition'] = $partition; return $job; @@ -336,13 +361,21 @@ class JobQueueFederated extends JobQueue { protected function doDelete() { foreach ( $this->partitionQueues as $queue ) { - $queue->doDelete(); + try { + $queue->doDelete(); + } catch ( JobQueueError $e ) { + wfDebugLog( 'exception', $e->getLogMessage() ); + } } } protected function doWaitForBackups() { foreach ( $this->partitionQueues as $queue ) { - $queue->waitForBackups(); + try { + $queue->waitForBackups(); + } catch ( JobQueueError $e ) { + wfDebugLog( 'exception', $e->getLogMessage() ); + } } } diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php index 85f99b7167..e483e05797 100644 --- a/includes/job/JobQueueGroup.php +++ b/includes/job/JobQueueGroup.php @@ -310,9 +310,13 @@ class JobQueueGroup { } elseif ( !isset( $lastRuns[$type][$task] ) || $lastRuns[$type][$task] < ( time() - $definition['period'] ) ) { - if ( call_user_func( $definition['callback'] ) !== null ) { - $tasksRun[$type][$task] = time(); - ++$count; + try { + if ( call_user_func( $definition['callback'] ) !== null ) { + $tasksRun[$type][$task] = time(); + ++$count; + } + } catch ( JobQueueError $e ) { + wfDebugLog( 'exception', $e->getLogMessage() ); } } } diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php index 939fa42c17..57189a50c0 100644 --- a/includes/job/JobQueueRedis.php +++ b/includes/job/JobQueueRedis.php @@ -786,7 +786,7 @@ LUA; protected function getConnection() { $conn = $this->redisPool->getConnection( $this->server ); if ( !$conn ) { - throw new MWException( "Unable to connect to redis server." ); + throw new JobQueueConnectionError( "Unable to connect to redis server." ); } return $conn; } @@ -799,7 +799,7 @@ LUA; */ protected function throwRedisException( $server, RedisConnRef $conn, $e ) { $this->redisPool->handleException( $server, $conn, $e ); - throw new MWException( "Redis server error: {$e->getMessage()}\n" ); + throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" ); } /** -- 2.20.1