* Also added a small MappedIterator class for convenience.
Change-Id: I7dac1001a8e048cb524f7fbfee50a20a32e598ba
'MagicWord' => 'includes/MagicWord.php',
'MagicWordArray' => 'includes/MagicWord.php',
'MailAddress' => 'includes/UserMailer.php',
+ 'MappedIterator' => 'includes/MappedIterator.php',
'MediaWiki' => 'includes/Wiki.php',
'MediaWiki_I18N' => 'includes/SkinTemplate.php',
'Message' => 'includes/Message.php',
--- /dev/null
+<?php
+/**
+ * Convenience class for generating iterators from iterators.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Convenience class for generating iterators from iterators.
+ *
+ * @since 1.21
+ */
+class MappedIterator implements Iterator {
+ /** @var Iterator */
+ protected $baseIterator;
+ /** @var Closure */
+ protected $vCallback;
+
+ /**
+ * Build an new iterator from a base iterator by having the former wrap the
+ * later, returning the result of "value" callback for each current() invocation.
+ * The callback takes the result of current() on the base iterator as an argument.
+ * The keys of the base iterator are reused verbatim.
+ *
+ * @param Iterator|Array $iter
+ * @param Closure $callback
+ * @throws MWException
+ */
+ public function __construct( $iter, Closure $vCallback ) {
+ if ( is_array( $iter ) ) {
+ $this->baseIterator = new ArrayIterator( $iter );
+ } elseif ( $iter instanceof Iterator ) {
+ $this->baseIterator = $iter;
+ } else {
+ throw new MWException( "Invalid base iterator provided." );
+ }
+ $this->vCallback = $vCallback;
+ }
+
+ /**
+ * @return void
+ */
+ public function rewind() {
+ $this->baseIterator->rewind();
+ }
+
+ /**
+ * @return Mixed|null Returns null if out of range
+ */
+ public function current() {
+ if ( !$this->baseIterator->valid() ) {
+ return null; // out of range
+ }
+ return call_user_func_array( $this->vCallback, array( $this->baseIterator->current() ) );
+ }
+
+ /**
+ * @return Mixed|null Returns null if out of range
+ */
+ public function key() {
+ if ( !$this->baseIterator->valid() ) {
+ return null; // out of range
+ }
+ return $this->baseIterator->key();
+ }
+
+ /**
+ * @return Mixed|null Returns null if out of range
+ */
+ public function next() {
+ $this->baseIterator->next();
+ return $this->current(); // make sure callback is applied
+ }
+
+ /**
+ * @return bool
+ */
+ public function valid() {
+ return $this->baseIterator->valid();
+ }
+}
*/
protected function doFlushCaches() {}
+ /**
+ * Get an iterator to traverse over all of the jobs in this queue.
+ * This does not include jobs that are current acquired. In general,
+ * this should only be called on a queue that is no longer being popped.
+ *
+ * @return Iterator|Traversable|Array
+ * @throws MWException
+ */
+ abstract public function getAllQueuedJobs();
+
/**
* Namespace the queue with a key to isolate it for testing
*
}
}
+ /**
+ * @see JobQueue::getAllQueuedJobs()
+ * @return Iterator
+ */
+ public function getAllQueuedJobs() {
+ list( $dbr, $scope ) = $this->getSlaveDB();
+ return new MappedIterator(
+ $dbr->select( 'job', '*', array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
+ function( $row ) use ( $scope ) {
+ $job = Job::factory(
+ $row->job_cmd,
+ Title::makeTitle( $row->job_namespace, $row->job_title ),
+ strlen( $row->job_params ) ? unserialize( $row->job_params ) : false,
+ $row->job_id
+ );
+ $job->id = $row->job_id; // XXX: work around broken subclasses
+ return $job;
+ }
+ );
+ }
+
/**
* @return Array (DatabaseBase, ScopedCallback)
*/
return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
}
+ /**
+ * @see JobQueue::getAllQueuedJobs()
+ * @return Iterator
+ */
+ public function getAllQueuedJobs() {
+ $conn = $this->getConnection();
+ if ( !$conn ) {
+ throw new MWException( "Unable to connect to redis server." );
+ }
+ try {
+ $that = $this;
+ return new MappedIterator(
+ $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ),
+ function( $uid ) use ( $that, $conn ) {
+ return $that->getJobFromUidInternal( $uid, $conn );
+ }
+ );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $this->server, $conn, $e );
+ }
+ }
+
+ /**
+ * This function should not be called outside RedisJobQueue
+ *
+ * @param $uid string
+ * @param $conn RedisConnRef
+ * @return Job
+ * @throws MWException
+ */
+ public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
+ try {
+ $fields = $conn->get( $this->prefixWithQueueKey( 'data', $uid ) );
+ if ( !is_array( $fields ) ) { // wtf?
+ $conn->delete( $this->prefixWithQueueKey( 'data', $uid ) );
+ throw new MWException( "Could not find job with UID '$uid'." );
+ }
+ $title = Title::makeTitle( $fields['namespace'], $fields['title'] );
+ $job = Job::factory( $fields['type'], $title, $fields['params'] );
+ $job->metadata['sourceFields'] = $fields;
+ return $job;
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $this->server, $conn, $e );
+ }
+ }
+
/**
* Recycle or destroy any jobs that have been claimed for too long
*