Merge "Add missing subjectspace pages to watchlist on update."
[lhc/web/wiklou.git] / includes / job / JobQueueDB.php
index 0e68355..79ff4e8 100644 (file)
@@ -37,7 +37,8 @@ class JobQueueDB extends JobQueue {
        /** @var BagOStuff */
        protected $cache;
 
-       protected $cluster = false; // string; name of an external DB cluster
+       /** @var bool|string Name of an external DB cluster. False if not set */
+       protected $cluster = false;
 
        /**
         * Additional parameters include:
@@ -45,7 +46,7 @@ class JobQueueDB extends JobQueue {
         *               If not specified, the primary DB cluster for the wiki will be used.
         *               This can be overridden with a custom cluster so that DB handles will
         *               be retrieved via LBFactory::getExternalLB() and getConnection().
-        * @param $params array
+        * @param array $params
         */
        protected function __construct( array $params ) {
                global $wgMemc;
@@ -79,10 +80,14 @@ class JobQueueDB extends JobQueue {
                        return false;
                }
 
-               list( $dbr, $scope ) = $this->getSlaveDB();
-               $found = $dbr->selectField( // unclaimed job
-                       'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__
-               );
+               $dbr = $this->getSlaveDB();
+               try {
+                       $found = $dbr->selectField( // unclaimed job
+                               'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__
+                       );
+               } catch ( DBError $e ) {
+                       $this->throwDBException( $e );
+               }
                $this->cache->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG );
 
                return !$found;
@@ -90,7 +95,7 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @see JobQueue::doGetSize()
-        * @return integer
+        * @return int
         */
        protected function doGetSize() {
                $key = $this->getCacheKey( 'size' );
@@ -100,11 +105,15 @@ class JobQueueDB extends JobQueue {
                        return $size;
                }
 
-               list( $dbr, $scope ) = $this->getSlaveDB();
-               $size = (int)$dbr->selectField( 'job', 'COUNT(*)',
-                       array( 'job_cmd' => $this->type, 'job_token' => '' ),
-                       __METHOD__
-               );
+               try {
+                       $dbr = $this->getSlaveDB();
+                       $size = (int)$dbr->selectField( 'job', 'COUNT(*)',
+                               array( 'job_cmd' => $this->type, 'job_token' => '' ),
+                               __METHOD__
+                       );
+               } catch ( DBError $e ) {
+                       $this->throwDBException( $e );
+               }
                $this->cache->set( $key, $size, self::CACHE_TTL_SHORT );
 
                return $size;
@@ -112,7 +121,7 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @see JobQueue::doGetAcquiredCount()
-        * @return integer
+        * @return int
         */
        protected function doGetAcquiredCount() {
                if ( $this->claimTTL <= 0 ) {
@@ -126,11 +135,15 @@ class JobQueueDB extends JobQueue {
                        return $count;
                }
 
-               list( $dbr, $scope ) = $this->getSlaveDB();
-               $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
-                       array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ),
-                       __METHOD__
-               );
+               $dbr = $this->getSlaveDB();
+               try {
+                       $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
+                               array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ),
+                               __METHOD__
+                       );
+               } catch ( DBError $e ) {
+                       $this->throwDBException( $e );
+               }
                $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
 
                return $count;
@@ -138,7 +151,7 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @see JobQueue::doGetAbandonedCount()
-        * @return integer
+        * @return int
         * @throws MWException
         */
        protected function doGetAbandonedCount() {
@@ -155,15 +168,19 @@ class JobQueueDB extends JobQueue {
                        return $count;
                }
 
-               list( $dbr, $scope ) = $this->getSlaveDB();
-               $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
-                       array(
-                               'job_cmd' => $this->type,
-                               "job_token != {$dbr->addQuotes( '' )}",
-                               "job_attempts >= " . $dbr->addQuotes( $this->maxTries )
-                       ),
-                       __METHOD__
-               );
+               $dbr = $this->getSlaveDB();
+               try {
+                       $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
+                               array(
+                                       'job_cmd' => $this->type,
+                                       "job_token != {$dbr->addQuotes( '' )}",
+                                       "job_attempts >= " . $dbr->addQuotes( $this->maxTries )
+                               ),
+                               __METHOD__
+                       );
+               } catch ( DBError $e ) {
+                       $this->throwDBException( $e );
+               }
                $wgMemc->set( $key, $count, self::CACHE_TTL_SHORT );
 
                return $count;
@@ -177,12 +194,12 @@ class JobQueueDB extends JobQueue {
         * @return bool
         */
        protected function doBatchPush( array $jobs, $flags ) {
-               list( $dbw, $scope ) = $this->getMasterDB();
+               $dbw = $this->getMasterDB();
 
                $that = $this;
                $method = __METHOD__;
                $dbw->onTransactionIdle(
-                       function() use ( $dbw, $that, $jobs, $flags, $method, $scope ) {
+                       function () use ( $dbw, $that, $jobs, $flags, $method ) {
                                $that->doBatchPushInternal( $dbw, $jobs, $flags, $method );
                        }
                );
@@ -193,14 +210,14 @@ class JobQueueDB extends JobQueue {
        /**
         * This function should *not* be called outside of JobQueueDB
         *
-        * @param DatabaseBase $dbw
+        * @param IDatabase $dbw
         * @param array $jobs
         * @param int $flags
         * @param string $method
-        * @return boolean
-        * @throws type
+        * @throws DBError
+        * @return bool
         */
-       public function doBatchPushInternal( DatabaseBase $dbw, array $jobs, $flags, $method ) {
+       public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
                if ( !count( $jobs ) ) {
                        return true;
                }
@@ -231,7 +248,7 @@ class JobQueueDB extends JobQueue {
                                        $method
                                );
                                foreach ( $res as $row ) {
-                                       wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
+                                       wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
                                        unset( $rowSet[$row->job_sha1] ); // already enqueued
                                }
                        }
@@ -242,8 +259,11 @@ class JobQueueDB extends JobQueue {
                                $dbw->insert( 'job', $rowBatch, $method );
                        }
                        JobQueue::incrStats( 'job-insert', $this->type, count( $rows ) );
-                       JobQueue::incrStats( 'job-insert-duplicate', $this->type,
-                               count( $rowSet ) + count( $rowList ) - count( $rows ) );
+                       JobQueue::incrStats(
+                               'job-insert-duplicate',
+                               $this->type,
+                               count( $rowSet ) + count( $rowList ) - count( $rows )
+                       );
                } catch ( DBError $e ) {
                        if ( $flags & self::QOS_ATOMIC ) {
                                $dbw->rollback( $method );
@@ -268,44 +288,47 @@ class JobQueueDB extends JobQueue {
                        return false; // queue is empty
                }
 
-               list( $dbw, $scope ) = $this->getMasterDB();
-               $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
-               $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
-               $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
-               $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) {
-                       $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
-               } );
-
-               $uuid = wfRandomString( 32 ); // pop attempt
-               $job = false; // job popped off
-               do { // retry when our row is invalid or deleted as a duplicate
-                       // Try to reserve a row in the DB...
-                       if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) {
-                               $row = $this->claimOldest( $uuid );
-                       } else { // random first
-                               $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
-                               $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
-                               $row = $this->claimRandom( $uuid, $rand, $gte );
-                       }
-                       // Check if we found a row to reserve...
-                       if ( !$row ) {
-                               $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
-                               break; // nothing to do
-                       }
-                       JobQueue::incrStats( 'job-pop', $this->type );
-                       // Get the job object from the row...
-                       $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
-                       if ( !$title ) {
-                               $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
-                               wfDebug( "Row has invalid title '{$row->job_title}'." );
-                               continue; // try again
-                       }
-                       $job = Job::factory( $row->job_cmd, $title,
-                               self::extractBlob( $row->job_params ), $row->job_id );
-                       $job->metadata['id'] = $row->job_id;
-                       $job->id = $row->job_id; // XXX: work around broken subclasses
-                       break; // done
-               } while ( true );
+               $dbw = $this->getMasterDB();
+               try {
+                       $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
+                       $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
+                       $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
+                       $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
+                               $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
+                       } );
+
+                       $uuid = wfRandomString( 32 ); // pop attempt
+                       $job = false; // job popped off
+                       do { // retry when our row is invalid or deleted as a duplicate
+                               // Try to reserve a row in the DB...
+                               if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) {
+                                       $row = $this->claimOldest( $uuid );
+                               } else { // random first
+                                       $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
+                                       $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
+                                       $row = $this->claimRandom( $uuid, $rand, $gte );
+                               }
+                               // Check if we found a row to reserve...
+                               if ( !$row ) {
+                                       $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
+                                       break; // nothing to do
+                               }
+                               JobQueue::incrStats( 'job-pop', $this->type );
+                               // Get the job object from the row...
+                               $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
+                               if ( !$title ) {
+                                       $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
+                                       wfDebug( "Row has invalid title '{$row->job_title}'." );
+                                       continue; // try again
+                               }
+                               $job = Job::factory( $row->job_cmd, $title,
+                                       self::extractBlob( $row->job_params ), $row->job_id );
+                               $job->metadata['id'] = $row->job_id;
+                               break; // done
+                       } while ( true );
+               } catch ( DBError $e ) {
+                       $this->throwDBException( $e );
+               }
 
                return $job;
        }
@@ -316,10 +339,10 @@ class JobQueueDB extends JobQueue {
         * @param string $uuid 32 char hex string
         * @param $rand integer Random unsigned integer (31 bits)
         * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
-        * @return Row|false
+        * @return stdClass|bool Row|false
         */
        protected function claimRandom( $uuid, $rand, $gte ) {
-               list( $dbw, $scope ) = $this->getMasterDB();
+               $dbw = $this->getMasterDB();
                // Check cache to see if the queue has <= OFFSET items
                $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
 
@@ -366,6 +389,7 @@ class JobQueueDB extends JobQueue {
                                        continue; // use job_random
                                }
                        }
+
                        if ( $row ) { // claim the job
                                $dbw->update( 'job', // update by PK
                                        array(
@@ -392,10 +416,10 @@ class JobQueueDB extends JobQueue {
         * Reserve a row with a single UPDATE without holding row locks over RTTs...
         *
         * @param string $uuid 32 char hex string
-        * @return Row|false
+        * @return stdClass|bool Row|false
         */
        protected function claimOldest( $uuid ) {
-               list( $dbw, $scope ) = $this->getMasterDB();
+               $dbw = $this->getMasterDB();
 
                $row = false; // the row acquired
                do {
@@ -460,17 +484,21 @@ class JobQueueDB extends JobQueue {
                        throw new MWException( "Job of type '{$job->getType()}' has no ID." );
                }
 
-               list( $dbw, $scope ) = $this->getMasterDB();
-               $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
-               $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
-               $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
-               $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) {
-                       $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
-               } );
-
-               // Delete a row with a single DELETE without holding row locks over RTTs...
-               $dbw->delete( 'job',
-                       array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ );
+               $dbw = $this->getMasterDB();
+               try {
+                       $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
+                       $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
+                       $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
+                       $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
+                               $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
+                       } );
+
+                       // Delete a row with a single DELETE without holding row locks over RTTs...
+                       $dbw->delete( 'job',
+                               array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ );
+               } catch ( DBError $e ) {
+                       $this->throwDBException( $e );
+               }
 
                return true;
        }
@@ -494,9 +522,9 @@ class JobQueueDB extends JobQueue {
                // deferred till "transaction idle", do the same here, so that the ordering is
                // maintained. Having only the de-duplication registration succeed would cause
                // jobs to become no-ops without any actual jobs that made them redundant.
-               list( $dbw, $scope ) = $this->getMasterDB();
-               $cache = $this->cache;
-               $dbw->onTransactionIdle( function() use ( $cache, $params, $key, $scope ) {
+               $dbw = $this->getMasterDB();
+               $cache = $this->dupCache;
+               $dbw->onTransactionIdle( function () use ( $cache, $params, $key, $dbw ) {
                        $timestamp = $cache->get( $key ); // current last timestamp of this job
                        if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
                                return true; // a newer version of this root job was enqueued
@@ -514,9 +542,13 @@ class JobQueueDB extends JobQueue {
         * @return bool
         */
        protected function doDelete() {
-               list( $dbw, $scope ) = $this->getMasterDB();
+               $dbw = $this->getMasterDB();
+               try {
+                       $dbw->delete( 'job', array( 'job_cmd' => $this->type ) );
+               } catch ( DBError $e ) {
+                       $this->throwDBException( $e );
+               }
 
-               $dbw->delete( 'job', array( 'job_cmd' => $this->type ) );
                return true;
        }
 
@@ -529,7 +561,7 @@ class JobQueueDB extends JobQueue {
        }
 
        /**
-        * @return Array
+        * @return array
         */
        protected function doGetPeriodicTasks() {
                return array(
@@ -554,163 +586,210 @@ class JobQueueDB extends JobQueue {
         * @return Iterator
         */
        public function getAllQueuedJobs() {
-               list( $dbr, $scope ) = $this->getSlaveDB();
-               return new MappedIterator(
-                       $dbr->select( 'job', '*', array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
-                       function( $row ) use ( $scope ) {
-                               $job = Job::factory(
-                                       $row->job_cmd,
-                                       Title::makeTitle( $row->job_namespace, $row->job_title ),
-                                       strlen( $row->job_params ) ? unserialize( $row->job_params ) : false,
-                                       $row->job_id
-                               );
-                               $job->metadata['id'] = $row->job_id;
-                               $job->id = $row->job_id; // XXX: work around broken subclasses
-                               return $job;
-                       }
-               );
+               $dbr = $this->getSlaveDB();
+               try {
+                       return new MappedIterator(
+                               $dbr->select( 'job', '*',
+                                       array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
+                               function ( $row ) use ( $dbr ) {
+                                       $job = Job::factory(
+                                               $row->job_cmd,
+                                               Title::makeTitle( $row->job_namespace, $row->job_title ),
+                                               strlen( $row->job_params ) ? unserialize( $row->job_params ) : false
+                                       );
+                                       $job->metadata['id'] = $row->job_id;
+                                       return $job;
+                               }
+                       );
+               } catch ( DBError $e ) {
+                       $this->throwDBException( $e );
+               }
+       }
+
+       public function getCoalesceLocationInternal() {
+               return $this->cluster
+                       ? "DBCluster:{$this->cluster}:{$this->wiki}"
+                       : "LBFactory:{$this->wiki}";
+       }
+
+       protected function doGetSiblingQueuesWithJobs( array $types ) {
+               $dbr = $this->getSlaveDB();
+               $res = $dbr->select( 'job', 'DISTINCT job_cmd',
+                       array( 'job_cmd' => $types ), __METHOD__ );
+
+               $types = array();
+               foreach ( $res as $row ) {
+                       $types[] = $row->job_cmd;
+               }
+
+               return $types;
+       }
+
+       protected function doGetSiblingQueueSizes( array $types ) {
+               $dbr = $this->getSlaveDB();
+               $res = $dbr->select( 'job', array( 'job_cmd', 'COUNT(*) AS count' ),
+                       array( 'job_cmd' => $types ), __METHOD__, array( 'GROUP BY' => 'job_cmd' ) );
+
+               $sizes = array();
+               foreach ( $res as $row ) {
+                       $sizes[$row->job_cmd] = (int)$row->count;
+               }
+
+               return $sizes;
        }
 
        /**
         * Recycle or destroy any jobs that have been claimed for too long
         *
-        * @return integer Number of jobs recycled/deleted
+        * @return int Number of jobs recycled/deleted
         */
        public function recycleAndDeleteStaleJobs() {
                $now = time();
-               list( $dbw, $scope ) = $this->getMasterDB();
                $count = 0; // affected rows
+               $dbw = $this->getMasterDB();
 
-               if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
-                       return $count; // already in progress
-               }
+               try {
+                       if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
+                               return $count; // already in progress
+                       }
 
-               // Remove claims on jobs acquired for too long if enabled...
-               if ( $this->claimTTL > 0 ) {
-                       $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
-                       // Get the IDs of jobs that have be claimed but not finished after too long.
-                       // These jobs can be recycled into the queue by expiring the claim. Selecting
-                       // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
-                       $res = $dbw->select( 'job', 'job_id',
-                               array(
-                                       'job_cmd' => $this->type,
-                                       "job_token != {$dbw->addQuotes( '' )}", // was acquired
-                                       "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
-                                       "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left
-                               __METHOD__
+                       // Remove claims on jobs acquired for too long if enabled...
+                       if ( $this->claimTTL > 0 ) {
+                               $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
+                               // Get the IDs of jobs that have be claimed but not finished after too long.
+                               // These jobs can be recycled into the queue by expiring the claim. Selecting
+                               // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
+                               $res = $dbw->select( 'job', 'job_id',
+                                       array(
+                                               'job_cmd' => $this->type,
+                                               "job_token != {$dbw->addQuotes( '' )}", // was acquired
+                                               "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
+                                               "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left
+                                       __METHOD__
+                               );
+                               $ids = array_map(
+                                       function ( $o ) {
+                                               return $o->job_id;
+                                       }, iterator_to_array( $res )
+                               );
+                               if ( count( $ids ) ) {
+                                       // Reset job_token for these jobs so that other runners will pick them up.
+                                       // Set the timestamp to the current time, as it is useful to now that the job
+                                       // was already tried before (the timestamp becomes the "released" time).
+                                       $dbw->update( 'job',
+                                               array(
+                                                       'job_token' => '',
+                                                       'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release
+                                               array(
+                                                       'job_id' => $ids ),
+                                               __METHOD__
+                                       );
+                                       $count += $dbw->affectedRows();
+                                       JobQueue::incrStats( 'job-recycle', $this->type, $dbw->affectedRows() );
+                                       $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
+                               }
+                       }
+
+                       // Just destroy any stale jobs...
+                       $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
+                       $conds = array(
+                               'job_cmd' => $this->type,
+                               "job_token != {$dbw->addQuotes( '' )}", // was acquired
+                               "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
                        );
+                       if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
+                               $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
+                       }
+                       // Get the IDs of jobs that are considered stale and should be removed. Selecting
+                       // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
+                       $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
                        $ids = array_map(
-                               function( $o ) {
+                               function ( $o ) {
                                        return $o->job_id;
                                }, iterator_to_array( $res )
                        );
                        if ( count( $ids ) ) {
-                               // Reset job_token for these jobs so that other runners will pick them up.
-                               // Set the timestamp to the current time, as it is useful to now that the job
-                               // was already tried before (the timestamp becomes the "released" time).
-                               $dbw->update( 'job',
-                                       array(
-                                               'job_token' => '',
-                                               'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release
-                                       array(
-                                               'job_id' => $ids ),
-                                       __METHOD__
-                               );
+                               $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ );
                                $count += $dbw->affectedRows();
-                               JobQueue::incrStats( 'job-recycle', $this->type, $dbw->affectedRows() );
-                               $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
+                               JobQueue::incrStats( 'job-abandon', $this->type, $dbw->affectedRows() );
                        }
-               }
 
-               // Just destroy any stale jobs...
-               $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
-               $conds = array(
-                       'job_cmd' => $this->type,
-                       "job_token != {$dbw->addQuotes( '' )}", // was acquired
-                       "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
-               );
-               if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
-                       $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
-               }
-               // Get the IDs of jobs that are considered stale and should be removed. Selecting
-               // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
-               $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
-               $ids = array_map(
-                       function( $o ) {
-                               return $o->job_id;
-                       }, iterator_to_array( $res )
-               );
-               if ( count( $ids ) ) {
-                       $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ );
-                       $count += $dbw->affectedRows();
-                       JobQueue::incrStats( 'job-abandon', $this->type, $dbw->affectedRows() );
+                       $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
+               } catch ( DBError $e ) {
+                       $this->throwDBException( $e );
                }
 
-               $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
-
                return $count;
        }
 
        /**
-        * @return Array (DatabaseBase, ScopedCallback)
+        * @param Job $job
+        * @return array
+        */
+       protected function insertFields( Job $job ) {
+               $dbw = $this->getMasterDB();
+
+               return array(
+                       // Fields that describe the nature of the job
+                       'job_cmd' => $job->getType(),
+                       'job_namespace' => $job->getTitle()->getNamespace(),
+                       'job_title' => $job->getTitle()->getDBkey(),
+                       'job_params' => self::makeBlob( $job->getParams() ),
+                       // Additional job metadata
+                       'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
+                       'job_timestamp' => $dbw->timestamp(),
+                       'job_sha1' => wfBaseConvert(
+                               sha1( serialize( $job->getDeduplicationInfo() ) ),
+                               16, 36, 31
+                       ),
+                       'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
+               );
+       }
+
+       /**
+        * @throws JobQueueConnectionError
+        * @return DBConnRef
         */
        protected function getSlaveDB() {
-               return $this->getDB( DB_SLAVE );
+               try {
+                       return $this->getDB( DB_SLAVE );
+               } catch ( DBConnectionError $e ) {
+                       throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
+               }
        }
 
        /**
-        * @return Array (DatabaseBase, ScopedCallback)
+        * @throws JobQueueConnectionError
+        * @return DBConnRef
         */
        protected function getMasterDB() {
-               return $this->getDB( DB_MASTER );
+               try {
+                       return $this->getDB( DB_MASTER );
+               } catch ( DBConnectionError $e ) {
+                       throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
+               }
        }
 
        /**
         * @param $index integer (DB_SLAVE/DB_MASTER)
-        * @return Array (DatabaseBase, ScopedCallback)
+        * @return DBConnRef
         */
        protected function getDB( $index ) {
                $lb = ( $this->cluster !== false )
                        ? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki )
                        : wfGetLB( $this->wiki );
-               $conn = $lb->getConnection( $index, array(), $this->wiki );
-               return array(
-                       $conn,
-                       new ScopedCallback( function() use ( $lb, $conn ) {
-                               $lb->reuseConnection( $conn );
-                       } )
-               );
-       }
 
-       /**
-        * @param $job Job
-        * @return array
-        */
-       protected function insertFields( Job $job ) {
-               list( $dbw, $scope ) = $this->getMasterDB();
-               return array(
-                       // Fields that describe the nature of the job
-                       'job_cmd'       => $job->getType(),
-                       'job_namespace' => $job->getTitle()->getNamespace(),
-                       'job_title'     => $job->getTitle()->getDBkey(),
-                       'job_params'    => self::makeBlob( $job->getParams() ),
-                       // Additional job metadata
-                       'job_id'        => $dbw->nextSequenceValue( 'job_job_id_seq' ),
-                       'job_timestamp' => $dbw->timestamp(),
-                       'job_sha1'      => wfBaseConvert(
-                               sha1( serialize( $job->getDeduplicationInfo() ) ),
-                               16, 36, 31
-                       ),
-                       'job_random'    => mt_rand( 0, self::MAX_JOB_RANDOM )
-               );
+               return $lb->getConnectionRef( $index, array(), $this->wiki );
        }
 
        /**
+        * @param $property
         * @return string
         */
        private function getCacheKey( $property ) {
                list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
                $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
+
                return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property );
        }
 
@@ -737,4 +816,12 @@ class JobQueueDB extends JobQueue {
                        return false;
                }
        }
+
+       /**
+        * @param DBError $e
+        * @throws JobQueueError
+        */
+       protected function throwDBException( DBError $e ) {
+               throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
+       }
 }