[JobQueue] Added JobQueue::getAllQueuedJobs() function.
authorAaron Schulz <aschulz@wikimedia.org>
Thu, 21 Feb 2013 01:19:38 +0000 (17:19 -0800)
committerGerrit Code Review <gerrit@wikimedia.org>
Fri, 8 Mar 2013 06:34:31 +0000 (06:34 +0000)
* Also added a small MappedIterator class for convenience.

Change-Id: I7dac1001a8e048cb524f7fbfee50a20a32e598ba

includes/AutoLoader.php
includes/MappedIterator.php [new file with mode: 0644]
includes/job/JobQueue.php
includes/job/JobQueueDB.php
includes/job/JobQueueRedis.php

index 789538e..3b5fbac 100644 (file)
@@ -168,6 +168,7 @@ $wgAutoloadLocalClasses = array(
        'MagicWord' => 'includes/MagicWord.php',
        'MagicWordArray' => 'includes/MagicWord.php',
        'MailAddress' => 'includes/UserMailer.php',
+       'MappedIterator' => 'includes/MappedIterator.php',
        'MediaWiki' => 'includes/Wiki.php',
        'MediaWiki_I18N' => 'includes/SkinTemplate.php',
        'Message' => 'includes/Message.php',
diff --git a/includes/MappedIterator.php b/includes/MappedIterator.php
new file mode 100644 (file)
index 0000000..243557e
--- /dev/null
@@ -0,0 +1,97 @@
+<?php
+/**
+ * Convenience class for generating iterators from iterators.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Convenience class for generating iterators from iterators.
+ *
+ * @since 1.21
+ */
+class MappedIterator implements Iterator {
+       /** @var Iterator */
+       protected $baseIterator;
+       /** @var Closure */
+       protected $vCallback;
+
+       /**
+        * Build an new iterator from a base iterator by having the former wrap the
+        * later, returning the result of "value" callback for each current() invocation.
+        * The callback takes the result of current() on the base iterator as an argument.
+        * The keys of the base iterator are reused verbatim.
+        *
+        * @param Iterator|Array $iter
+        * @param Closure $callback
+        * @throws MWException
+        */
+    public function __construct( $iter, Closure $vCallback ) {
+               if ( is_array( $iter ) ) {
+                       $this->baseIterator = new ArrayIterator( $iter );
+               } elseif ( $iter instanceof Iterator ) {
+                       $this->baseIterator = $iter;
+               } else {
+                       throw new MWException( "Invalid base iterator provided." );
+               }
+               $this->vCallback = $vCallback;
+    }
+
+       /**
+        * @return void
+        */
+       public function rewind() {
+               $this->baseIterator->rewind();
+       }
+
+       /**
+        * @return Mixed|null Returns null if out of range
+        */
+       public function current() {
+               if ( !$this->baseIterator->valid() ) {
+                       return null; // out of range
+               }
+               return call_user_func_array( $this->vCallback, array( $this->baseIterator->current() ) );
+       }
+
+       /**
+        * @return Mixed|null Returns null if out of range
+        */
+       public function key() {
+               if ( !$this->baseIterator->valid() ) {
+                       return null; // out of range
+               }
+               return $this->baseIterator->key();
+       }
+
+       /**
+        * @return Mixed|null Returns null if out of range
+        */
+       public function next() {
+               $this->baseIterator->next();
+               return $this->current(); // make sure callback is applied
+       }
+
+       /**
+        * @return bool
+        */
+       public function valid() {
+               return $this->baseIterator->valid();
+       }
+}
index b7bbfe6..8ade0d5 100644 (file)
@@ -378,6 +378,16 @@ abstract class JobQueue {
         */
        protected function doFlushCaches() {}
 
+       /**
+        * Get an iterator to traverse over all of the jobs in this queue.
+        * This does not include jobs that are current acquired. In general,
+        * this should only be called on a queue that is no longer being popped.
+        *
+        * @return Iterator|Traversable|Array
+        * @throws MWException
+        */
+       abstract public function getAllQueuedJobs();
+
        /**
         * Namespace the queue with a key to isolate it for testing
         *
index fee1aaf..ada0ac4 100644 (file)
@@ -580,6 +580,27 @@ class JobQueueDB extends JobQueue {
                }
        }
 
+       /**
+        * @see JobQueue::getAllQueuedJobs()
+        * @return Iterator
+        */
+       public function getAllQueuedJobs() {
+               list( $dbr, $scope ) = $this->getSlaveDB();
+               return new MappedIterator(
+                       $dbr->select( 'job', '*', array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
+                       function( $row ) use ( $scope ) {
+                               $job = Job::factory(
+                                       $row->job_cmd,
+                                       Title::makeTitle( $row->job_namespace, $row->job_title ),
+                                       strlen( $row->job_params ) ? unserialize( $row->job_params ) : false,
+                                       $row->job_id
+                               );
+                               $job->id = $row->job_id; // XXX: work around broken subclasses
+                               return $job;
+                       }
+               );
+       }
+
        /**
         * @return Array (DatabaseBase, ScopedCallback)
         */
index a6b5bb3..7dc9900 100644 (file)
@@ -329,6 +329,52 @@ class JobQueueRedis extends JobQueue {
                return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
        }
 
+       /**
+        * @see JobQueue::getAllQueuedJobs()
+        * @return Iterator
+        */
+       public function getAllQueuedJobs() {
+               $conn = $this->getConnection();
+               if ( !$conn ) {
+                       throw new MWException( "Unable to connect to redis server." );
+               }
+               try {
+                       $that = $this;
+                       return new MappedIterator(
+                               $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ),
+                               function( $uid ) use ( $that, $conn ) {
+                                       return $that->getJobFromUidInternal( $uid, $conn );
+                               }
+                       );
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+       }
+
+       /**
+        * This function should not be called outside RedisJobQueue
+        *
+        * @param $uid string
+        * @param $conn RedisConnRef
+        * @return Job
+        * @throws MWException
+        */
+       public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
+               try {
+                       $fields = $conn->get( $this->prefixWithQueueKey( 'data', $uid ) );
+                       if ( !is_array( $fields ) ) { // wtf?
+                               $conn->delete( $this->prefixWithQueueKey( 'data', $uid ) );
+                               throw new MWException( "Could not find job with UID '$uid'." );
+                       }
+                       $title = Title::makeTitle( $fields['namespace'], $fields['title'] );
+                       $job = Job::factory( $fields['type'], $title, $fields['params'] );
+                       $job->metadata['sourceFields'] = $fields;
+                       return $job;
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+       }
+
        /**
         * Recycle or destroy any jobs that have been claimed for too long
         *