From 266e46a5cace071d065e2a3243e489a000d5b370 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Tue, 14 May 2013 19:29:22 -0700 Subject: [PATCH] [JobQueue] Added a function to delete all jobs from a queue. Change-Id: Ibad122148bdd2f7baf528929e15bae803fccfeea --- includes/job/JobQueue.php | 22 +++++++++++++++++++ includes/job/JobQueueDB.php | 11 ++++++++++ includes/job/JobQueueFederated.php | 6 +++++ includes/job/JobQueueRedis.php | 20 +++++++++++++++++ .../includes/jobqueue/JobQueueTest.php | 16 +++++++++----- 5 files changed, 69 insertions(+), 6 deletions(-) diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php index 08d4f3927c..3295c24917 100644 --- a/includes/job/JobQueue.php +++ b/includes/job/JobQueue.php @@ -507,6 +507,28 @@ abstract class JobQueue { return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature ); } + /** + * Deleted all unclaimed and delayed jobs from the queue + * + * @return bool Success + * @throws MWException + * @since 1.22 + */ + final public function delete() { + wfProfileIn( __METHOD__ ); + $res = $this->doDelete(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::delete() + * @return bool Success + */ + protected function doDelete() { + throw new MWException( "This method is not implemented." ); + } + /** * Wait for any slaves or backup servers to catch up. * diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php index 55872d1245..0e6835528a 100644 --- a/includes/job/JobQueueDB.php +++ b/includes/job/JobQueueDB.php @@ -509,6 +509,17 @@ class JobQueueDB extends JobQueue { return true; } + /** + * @see JobQueue::doDelete() + * @return bool + */ + protected function doDelete() { + list( $dbw, $scope ) = $this->getMasterDB(); + + $dbw->delete( 'job', array( 'job_cmd' => $this->type ) ); + return true; + } + /** * @see JobQueue::doWaitForBackups() * @return void diff --git a/includes/job/JobQueueFederated.php b/includes/job/JobQueueFederated.php index b517d559b4..db5b6862ad 100644 --- a/includes/job/JobQueueFederated.php +++ b/includes/job/JobQueueFederated.php @@ -298,6 +298,12 @@ class JobQueueFederated extends JobQueue { return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job ); } + protected function doDelete() { + foreach ( $this->partitionQueues as $queue ) { + $queue->doDelete(); + } + } + protected function doWaitForBackups() { foreach ( $this->partitionQueues as $queue ) { $queue->waitForBackups(); diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php index 249ba27724..8250d2b09b 100644 --- a/includes/job/JobQueueRedis.php +++ b/includes/job/JobQueueRedis.php @@ -484,6 +484,26 @@ LUA; return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); } + /** + * @see JobQueue::doDelete() + * @return bool + */ + protected function doDelete() { + static $props = array( 'l-unclaimed', 'z-claimed', 'z-abandoned', + 'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' ); + + $conn = $this->getConnection(); + try { + $keys = array(); + foreach ( $props as $prop ) { + $keys[] = $this->getQueueKey( $prop ); + } + $res = ( $conn->delete( $keys ) !== false ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + /** * @see JobQueue::getAllQueuedJobs() * @return Iterator diff --git a/tests/phpunit/includes/jobqueue/JobQueueTest.php b/tests/phpunit/includes/jobqueue/JobQueueTest.php index 7b3d0eadc4..699015314f 100644 --- a/tests/phpunit/includes/jobqueue/JobQueueTest.php +++ b/tests/phpunit/includes/jobqueue/JobQueueTest.php @@ -62,12 +62,7 @@ class JobQueueTest extends MediaWikiTestCase { ) as $q ) { if ( $this->$q ) { - do { - $job = $this->$q->pop(); - if ( $job ) { - $this->$q->ack( $job ); - } - } while ( $job ); + $this->$q->delete(); } $this->$q = null; } @@ -149,6 +144,15 @@ class JobQueueTest extends MediaWikiTestCase { $queue->flushCaches(); $this->assertEquals( 0, $queue->getAcquiredCount(), "Active job count ($desc)" ); + + $this->assertTrue( $queue->batchPush( array( $this->newJob(), $this->newJob() ) ), + "Push worked ($desc)" ); + $this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" ); + + $queue->delete(); + $queue->flushCaches(); + $this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" ); + $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" ); } /** -- 2.20.1