From: Aaron Schulz Date: Thu, 21 Feb 2013 01:19:38 +0000 (-0800) Subject: [JobQueue] Added JobQueue::getAllQueuedJobs() function. X-Git-Tag: 1.31.0-rc.0~20427 X-Git-Url: http://git.cyclocoop.org/%22%20.%20generer_url_ecrire%28%22sites_tous%22%29%20.%20%22?a=commitdiff_plain;h=5cfcb3053d92925ddbf91167368e3ce08aacd16a;p=lhc%2Fweb%2Fwiklou.git [JobQueue] Added JobQueue::getAllQueuedJobs() function. * Also added a small MappedIterator class for convenience. Change-Id: I7dac1001a8e048cb524f7fbfee50a20a32e598ba --- diff --git a/includes/AutoLoader.php b/includes/AutoLoader.php index 789538e8f4..3b5fbac7a3 100644 --- a/includes/AutoLoader.php +++ b/includes/AutoLoader.php @@ -168,6 +168,7 @@ $wgAutoloadLocalClasses = array( 'MagicWord' => 'includes/MagicWord.php', 'MagicWordArray' => 'includes/MagicWord.php', 'MailAddress' => 'includes/UserMailer.php', + 'MappedIterator' => 'includes/MappedIterator.php', 'MediaWiki' => 'includes/Wiki.php', 'MediaWiki_I18N' => 'includes/SkinTemplate.php', 'Message' => 'includes/Message.php', diff --git a/includes/MappedIterator.php b/includes/MappedIterator.php new file mode 100644 index 0000000000..243557ec57 --- /dev/null +++ b/includes/MappedIterator.php @@ -0,0 +1,97 @@ +baseIterator = new ArrayIterator( $iter ); + } elseif ( $iter instanceof Iterator ) { + $this->baseIterator = $iter; + } else { + throw new MWException( "Invalid base iterator provided." ); + } + $this->vCallback = $vCallback; + } + + /** + * @return void + */ + public function rewind() { + $this->baseIterator->rewind(); + } + + /** + * @return Mixed|null Returns null if out of range + */ + public function current() { + if ( !$this->baseIterator->valid() ) { + return null; // out of range + } + return call_user_func_array( $this->vCallback, array( $this->baseIterator->current() ) ); + } + + /** + * @return Mixed|null Returns null if out of range + */ + public function key() { + if ( !$this->baseIterator->valid() ) { + return null; // out of range + } + return $this->baseIterator->key(); + } + + /** + * @return Mixed|null Returns null if out of range + */ + public function next() { + $this->baseIterator->next(); + return $this->current(); // make sure callback is applied + } + + /** + * @return bool + */ + public function valid() { + return $this->baseIterator->valid(); + } +} diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php index b7bbfe6170..8ade0d564a 100644 --- a/includes/job/JobQueue.php +++ b/includes/job/JobQueue.php @@ -378,6 +378,16 @@ abstract class JobQueue { */ protected function doFlushCaches() {} + /** + * Get an iterator to traverse over all of the jobs in this queue. + * This does not include jobs that are current acquired. In general, + * this should only be called on a queue that is no longer being popped. + * + * @return Iterator|Traversable|Array + * @throws MWException + */ + abstract public function getAllQueuedJobs(); + /** * Namespace the queue with a key to isolate it for testing * diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php index fee1aaf2a6..ada0ac4a40 100644 --- a/includes/job/JobQueueDB.php +++ b/includes/job/JobQueueDB.php @@ -580,6 +580,27 @@ class JobQueueDB extends JobQueue { } } + /** + * @see JobQueue::getAllQueuedJobs() + * @return Iterator + */ + public function getAllQueuedJobs() { + list( $dbr, $scope ) = $this->getSlaveDB(); + return new MappedIterator( + $dbr->select( 'job', '*', array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ), + function( $row ) use ( $scope ) { + $job = Job::factory( + $row->job_cmd, + Title::makeTitle( $row->job_namespace, $row->job_title ), + strlen( $row->job_params ) ? unserialize( $row->job_params ) : false, + $row->job_id + ); + $job->id = $row->job_id; // XXX: work around broken subclasses + return $job; + } + ); + } + /** * @return Array (DatabaseBase, ScopedCallback) */ diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php index a6b5bb321f..7dc9900dba 100644 --- a/includes/job/JobQueueRedis.php +++ b/includes/job/JobQueueRedis.php @@ -329,6 +329,52 @@ class JobQueueRedis extends JobQueue { return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); } + /** + * @see JobQueue::getAllQueuedJobs() + * @return Iterator + */ + public function getAllQueuedJobs() { + $conn = $this->getConnection(); + if ( !$conn ) { + throw new MWException( "Unable to connect to redis server." ); + } + try { + $that = $this; + return new MappedIterator( + $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ), + function( $uid ) use ( $that, $conn ) { + return $that->getJobFromUidInternal( $uid, $conn ); + } + ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * This function should not be called outside RedisJobQueue + * + * @param $uid string + * @param $conn RedisConnRef + * @return Job + * @throws MWException + */ + public function getJobFromUidInternal( $uid, RedisConnRef $conn ) { + try { + $fields = $conn->get( $this->prefixWithQueueKey( 'data', $uid ) ); + if ( !is_array( $fields ) ) { // wtf? + $conn->delete( $this->prefixWithQueueKey( 'data', $uid ) ); + throw new MWException( "Could not find job with UID '$uid'." ); + } + $title = Title::makeTitle( $fields['namespace'], $fields['title'] ); + $job = Job::factory( $fields['type'], $title, $fields['params'] ); + $job->metadata['sourceFields'] = $fields; + return $job; + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + /** * Recycle or destroy any jobs that have been claimed for too long *