[JobQueue] Added a function to delete all jobs from a queue.
authorAaron Schulz <aschulz@wikimedia.org>
Wed, 15 May 2013 02:29:22 +0000 (19:29 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Wed, 15 May 2013 17:59:18 +0000 (10:59 -0700)
Change-Id: Ibad122148bdd2f7baf528929e15bae803fccfeea

includes/job/JobQueue.php
includes/job/JobQueueDB.php
includes/job/JobQueueFederated.php
includes/job/JobQueueRedis.php
tests/phpunit/includes/jobqueue/JobQueueTest.php

index 08d4f39..3295c24 100644 (file)
@@ -507,6 +507,28 @@ abstract class JobQueue {
                return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
        }
 
+       /**
+        * Deleted all unclaimed and delayed jobs from the queue
+        *
+        * @return bool Success
+        * @throws MWException
+        * @since 1.22
+        */
+       final public function delete() {
+               wfProfileIn( __METHOD__ );
+               $res = $this->doDelete();
+               wfProfileOut( __METHOD__ );
+               return $res;
+       }
+
+       /**
+        * @see JobQueue::delete()
+        * @return bool Success
+        */
+       protected function doDelete() {
+               throw new MWException( "This method is not implemented." );
+       }
+
        /**
         * Wait for any slaves or backup servers to catch up.
         *
index 55872d1..0e68355 100644 (file)
@@ -509,6 +509,17 @@ class JobQueueDB extends JobQueue {
                return true;
        }
 
+       /**
+        * @see JobQueue::doDelete()
+        * @return bool
+        */
+       protected function doDelete() {
+               list( $dbw, $scope ) = $this->getMasterDB();
+
+               $dbw->delete( 'job', array( 'job_cmd' => $this->type ) );
+               return true;
+       }
+
        /**
         * @see JobQueue::doWaitForBackups()
         * @return void
index b517d55..db5b686 100644 (file)
@@ -298,6 +298,12 @@ class JobQueueFederated extends JobQueue {
                return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job );
        }
 
+       protected function doDelete() {
+               foreach ( $this->partitionQueues as $queue ) {
+                       $queue->doDelete();
+               }
+       }
+
        protected function doWaitForBackups() {
                foreach ( $this->partitionQueues as $queue ) {
                        $queue->waitForBackups();
index 249ba27..8250d2b 100644 (file)
@@ -484,6 +484,26 @@ LUA;
                return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
        }
 
+       /**
+        * @see JobQueue::doDelete()
+        * @return bool
+        */
+       protected function doDelete() {
+               static $props = array( 'l-unclaimed', 'z-claimed', 'z-abandoned',
+                       'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' );
+
+               $conn = $this->getConnection();
+               try {
+                       $keys = array();
+                       foreach ( $props as $prop ) {
+                               $keys[] = $this->getQueueKey( $prop );
+                       }
+                       $res = ( $conn->delete( $keys ) !== false );
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+       }
+
        /**
         * @see JobQueue::getAllQueuedJobs()
         * @return Iterator
index 7b3d0ea..6990153 100644 (file)
@@ -62,12 +62,7 @@ class JobQueueTest extends MediaWikiTestCase {
                        ) as $q
                ) {
                        if ( $this->$q ) {
-                               do {
-                                       $job = $this->$q->pop();
-                                       if ( $job ) {
-                                               $this->$q->ack( $job );
-                                       }
-                               } while ( $job );
+                               $this->$q->delete();
                        }
                        $this->$q = null;
                }
@@ -149,6 +144,15 @@ class JobQueueTest extends MediaWikiTestCase {
 
                $queue->flushCaches();
                $this->assertEquals( 0, $queue->getAcquiredCount(), "Active job count ($desc)" );
+
+               $this->assertTrue( $queue->batchPush( array( $this->newJob(), $this->newJob() ) ),
+                       "Push worked ($desc)" );
+               $this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );
+
+               $queue->delete();
+               $queue->flushCaches();
+               $this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
+               $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
        }
 
        /**