return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
}
+ /**
+ * Deleted all unclaimed and delayed jobs from the queue
+ *
+ * @return bool Success
+ * @throws MWException
+ * @since 1.22
+ */
+ final public function delete() {
+ wfProfileIn( __METHOD__ );
+ $res = $this->doDelete();
+ wfProfileOut( __METHOD__ );
+ return $res;
+ }
+
+ /**
+ * @see JobQueue::delete()
+ * @return bool Success
+ */
+ protected function doDelete() {
+ throw new MWException( "This method is not implemented." );
+ }
+
/**
* Wait for any slaves or backup servers to catch up.
*
return true;
}
+ /**
+ * @see JobQueue::doDelete()
+ * @return bool
+ */
+ protected function doDelete() {
+ list( $dbw, $scope ) = $this->getMasterDB();
+
+ $dbw->delete( 'job', array( 'job_cmd' => $this->type ) );
+ return true;
+ }
+
/**
* @see JobQueue::doWaitForBackups()
* @return void
return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job );
}
+ protected function doDelete() {
+ foreach ( $this->partitionQueues as $queue ) {
+ $queue->doDelete();
+ }
+ }
+
protected function doWaitForBackups() {
foreach ( $this->partitionQueues as $queue ) {
$queue->waitForBackups();
return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
}
+ /**
+ * @see JobQueue::doDelete()
+ * @return bool
+ */
+ protected function doDelete() {
+ static $props = array( 'l-unclaimed', 'z-claimed', 'z-abandoned',
+ 'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' );
+
+ $conn = $this->getConnection();
+ try {
+ $keys = array();
+ foreach ( $props as $prop ) {
+ $keys[] = $this->getQueueKey( $prop );
+ }
+ $res = ( $conn->delete( $keys ) !== false );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $this->server, $conn, $e );
+ }
+ }
+
/**
* @see JobQueue::getAllQueuedJobs()
* @return Iterator
) as $q
) {
if ( $this->$q ) {
- do {
- $job = $this->$q->pop();
- if ( $job ) {
- $this->$q->ack( $job );
- }
- } while ( $job );
+ $this->$q->delete();
}
$this->$q = null;
}
$queue->flushCaches();
$this->assertEquals( 0, $queue->getAcquiredCount(), "Active job count ($desc)" );
+
+ $this->assertTrue( $queue->batchPush( array( $this->newJob(), $this->newJob() ) ),
+ "Push worked ($desc)" );
+ $this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );
+
+ $queue->delete();
+ $queue->flushCaches();
+ $this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
+ $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
}
/**