3 use MediaWiki\MediaWikiServices
;
// Tests for JobQueue, run against several queue variants that setUp() creates.
3 class JobQueueTest
extends MediaWikiTestCase
{
// Queue fixtures; setUp() assigns $this->$q for every key of its variant list.
// NOTE(review): setUp() also assigns queueTimestamp and queueTimestampTTL,
// which are not declared here - confirm whether that is intentional.
12 protected $queueRand, $queueRandTTL, $queueFifo, $queueFifoTTL;
// Passes $name, $data and $dataName unchanged to the parent constructor,
// then appends 'job' to $this->tablesUsed.
// NOTE(review): no visibility keyword is given, so PHP treats this as public.
14 function __construct( $name = null, array $data = [], $dataName = '' ) {
15 parent
::__construct( $name, $data, $dataName );
17 $this->tablesUsed
[] = 'job';
// Builds a base queue configuration and creates one JobQueue per variant,
// storing each in the property named by the variant key.
20 protected function setUp() {
21 global $wgJobTypeConf;
// When the 'use-jobqueue' CLI argument is set, its value names the
// $wgJobTypeConf entry to use as the base configuration.
24 if ( $this->getCliArg( 'use-jobqueue' ) ) {
25 $name = $this->getCliArg( 'use-jobqueue' );
// An unknown entry name is a hard error.
26 if ( !isset( $wgJobTypeConf[$name] ) ) {
27 throw new MWException( "No \$wgJobTypeConf entry for '$name'." );
29 $baseConfig = $wgJobTypeConf[$name];
// Default base configuration using JobQueueDBSingle.
// NOTE(review): presumably the else-branch of the CLI argument check above;
// the branch keyword is not visible here - confirm.
31 $baseConfig = [ 'class' => JobQueueDBSingle
::class ];
// Settings shared by every variant: job type 'null', the current wiki's DB
// domain ID, and in-process HashBagOStuff instances for stash and WAN cache.
33 $baseConfig['type'] = 'null';
34 $baseConfig['domain'] = WikiMap
::getCurrentWikiDbDomain()->getId();
35 $baseConfig['stash'] = new HashBagOStuff();
36 $baseConfig['wanCache'] = new WANObjectCache( [ 'cache' => new HashBagOStuff() ] );
// Variant table: property name => settings. Each order (random, timestamp,
// fifo) appears once with claimTTL 0 and once with claimTTL 10.
38 'queueRand' => [ 'order' => 'random', 'claimTTL' => 0 ],
39 'queueRandTTL' => [ 'order' => 'random', 'claimTTL' => 10 ],
40 'queueTimestamp' => [ 'order' => 'timestamp', 'claimTTL' => 0 ],
41 'queueTimestampTTL' => [ 'order' => 'timestamp', 'claimTTL' => 10 ],
42 'queueFifo' => [ 'order' => 'fifo', 'claimTTL' => 0 ],
43 'queueFifoTTL' => [ 'order' => 'fifo', 'claimTTL' => 10 ],
45 foreach ( $variants as $q => $settings ) {
// Array union: keys in $settings take precedence over the same keys in
// $baseConfig.
47 $this->$q = JobQueue
::factory( $settings +
$baseConfig );
// NOTE(review): the handling of a caught MWException is not visible here;
// the test methods skip when a queue property is unset - confirm.
48 } catch ( MWException
$e ) {
50 // @todo What if it was another error?
// Per-test cleanup. Only the list of the six queue property names is visible.
// NOTE(review): presumably each named queue is cleaned up or unset in a loop
// that is not visible here - confirm against the full method.
55 protected function tearDown() {
59 'queueRand', 'queueRandTTL', 'queueTimestamp', 'queueTimestampTTL',
60 'queueFifo', 'queueFifoTTL'
// Checks the wiki identity reported by a queue variant.
// $queue arrives as a property name and is resolved to the queue object.
71 * @dataProvider provider_queueLists
72 * @covers JobQueue::getWiki
74 public function testGetWiki( $queue, $recycles, $desc ) {
75 $queue = $this->$queue;
// NOTE(review): presumably guarded by a check that the queue was created;
// the guard is not visible here.
77 $this->markTestSkipped( $desc );
// getWiki() must equal wfWikiID().
79 $this->assertEquals( wfWikiID(), $queue->getWiki(), "Proper wiki ID ($desc)" );
// Second assertion, expecting the current wiki DB domain ID; its
// actual-value argument is not visible here.
81 WikiMap
::getCurrentWikiDbDomain()->getId(),
83 "Proper wiki ID ($desc)" );
// Checks that each queue variant reports the job type 'null', which is the
// type setUp() puts in the base configuration.
87 * @dataProvider provider_queueLists
88 * @covers JobQueue::getType
90 public function testGetType( $queue, $recycles, $desc ) {
// Resolve the property name supplied by the data provider to the queue object.
91 $queue = $this->$queue;
// NOTE(review): presumably guarded by a check that the queue was created;
// the guard is not visible here.
93 $this->markTestSkipped( $desc );
95 $this->assertEquals( 'null', $queue->getType(), "Proper job type ($desc)" );
// Walks a queue through push, batchPush, pop and ack, checking isEmpty(),
// getSize() and getAcquiredCount() after each step.
99 * @dataProvider provider_queueLists
102 public function testBasicOperations( $queue, $recycles, $desc ) {
103 $queue = $this->$queue;
// NOTE(review): presumably guarded by a check that the queue was created;
// the guard is not visible here.
105 $this->markTestSkipped( $desc );
// Starting state: nothing queued, nothing acquired.
108 $this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
// flushCaches() is called before each group of count assertions.
110 $queue->flushCaches();
111 $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
112 $this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" );
// push() and batchPush() are both expected to return null.
114 $this->assertNull( $queue->push( $this->newJob() ), "Push worked ($desc)" );
115 $this->assertNull( $queue->batchPush( [ $this->newJob() ] ), "Push worked ($desc)" );
117 $this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );
// Two jobs queued, none acquired; the iterator must also yield two.
119 $queue->flushCaches();
120 $this->assertEquals( 2, $queue->getSize(), "Queue size is correct ($desc)" );
121 $this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );
122 $jobs = iterator_to_array( $queue->getAllQueuedJobs() );
123 $this->assertEquals( 2, count( $jobs ), "Queue iterator size is correct ($desc)" );
// First pop: one job remains queued.
125 $job1 = $queue->pop();
126 $this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );
128 $queue->flushCaches();
129 $this->assertEquals( 1, $queue->getSize(), "Queue size is correct ($desc)" );
131 $queue->flushCaches();
// NOTE(review): the non-zero acquired counts here and below presumably apply
// only when $recycles is true; the conditional is not visible here.
133 $this->assertEquals( 1, $queue->getAcquiredCount(), "Active job count ($desc)" );
// Second pop: the queue is drained.
136 $job2 = $queue->pop();
137 $this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
138 $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
140 $queue->flushCaches();
142 $this->assertEquals( 2, $queue->getAcquiredCount(), "Active job count ($desc)" );
// Acknowledge the first job.
145 $queue->ack( $job1 );
147 $queue->flushCaches();
149 $this->assertEquals( 1, $queue->getAcquiredCount(), "Active job count ($desc)" );
// Acknowledge the second job; nothing should remain acquired.
152 $queue->ack( $job2 );
154 $queue->flushCaches();
155 $this->assertEquals( 0, $queue->getAcquiredCount(), "Active job count ($desc)" );
// Push two more jobs in a single batch.
157 $this->assertNull( $queue->batchPush( [ $this->newJob(), $this->newJob() ] ),
158 "Push worked ($desc)" );
159 $this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );
// NOTE(review): the queue is expected to be empty again here, so a
// clearing step presumably precedes this flush; it is not visible here.
162 $queue->flushCaches();
163 $this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
164 $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
// Checks that pushing several identical jobs built by newDedupedJob() leaves
// a single queued job, both on the first batch and on a repeated batch.
168 * @dataProvider provider_queueLists
171 public function testBasicDeduplication( $queue, $recycles, $desc ) {
172 $queue = $this->$queue;
// NOTE(review): presumably guarded by a check that the queue was created;
// the guard is not visible here.
174 $this->markTestSkipped( $desc );
// Starting state: nothing queued, nothing acquired.
177 $this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
179 $queue->flushCaches();
180 $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
181 $this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" );
// First batch of three identical jobs (the enclosing call is not visible here).
185 [ $this->newDedupedJob(), $this->newDedupedJob(), $this->newDedupedJob() ]
187 "Push worked ($desc)" );
189 $this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );
// Only one of the three should be counted.
191 $queue->flushCaches();
192 $this->assertEquals( 1, $queue->getSize(), "Queue size is correct ($desc)" );
193 $this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );
// Second batch of the same three jobs; the size must stay at one.
197 [ $this->newDedupedJob(), $this->newDedupedJob(), $this->newDedupedJob() ]
199 "Push worked ($desc)"
202 $this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );
204 $queue->flushCaches();
205 $this->assertEquals( 1, $queue->getSize(), "Queue size is correct ($desc)" );
206 $this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );
// Popping the single job empties the queue.
208 $job1 = $queue->pop();
209 $this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
211 $queue->flushCaches();
212 $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
// NOTE(review): an acquired count of 1 presumably applies only when
// $recycles is true; the conditional is not visible here.
214 $this->assertEquals( 1, $queue->getAcquiredCount(), "Active job count ($desc)" );
// After ack() nothing should remain acquired.
217 $queue->ack( $job1 );
219 $queue->flushCaches();
220 $this->assertEquals( 0, $queue->getAcquiredCount(), "Active job count ($desc)" );
// Pushes the same job object built by newDedupedJob() twice and then checks
// the type of a job held in $j.
// NOTE(review): the pop/ack steps between the two pushes and the assignment
// of $j are not visible here - confirm against the full method.
224 * @dataProvider provider_queueLists
227 public function testDeduplicationWhileClaimed( $queue, $recycles, $desc ) {
228 $queue = $this->$queue;
// NOTE(review): presumably guarded by a check that the queue was created;
// the guard is not visible here.
230 $this->markTestSkipped( $desc );
233 $job = $this->newDedupedJob();
234 $queue->push( $job );
236 // De-duplication does not apply to already-claimed jobs
238 $queue->push( $job );
242 // Make sure ack() of the twin did not delete the sibling data
243 $this->assertType( NullJob
::class, $j );
// Exercises root-job de-duplication: pushes five jobs under each of two root
// signatures, calls deduplicateRootJob() for each signature, then counts the
// popped jobs and how many of them are DuplicateJob instances.
247 * @dataProvider provider_queueLists
250 public function testRootDeduplication( $queue, $recycles, $desc ) {
251 $queue = $this->$queue;
// NOTE(review): presumably guarded by a check that the queue was created;
// the guard is not visible here.
253 $this->markTestSkipped( $desc );
// Starting state: nothing queued, nothing acquired.
256 $this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
258 $queue->flushCaches();
259 $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
260 $this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" );
// First root signature, keyed by a random 32-character ID.
262 $id = wfRandomString( 32 );
263 $root1 = Job
::newRootJobParams( "nulljobspam:$id" ); // task ID/timestamp
264 for ( $i = 0; $i < 5; ++
$i ) {
265 $this->assertNull( $queue->push( $this->newJob( 0, $root1 ) ), "Push worked ($desc)" );
267 $queue->deduplicateRootJob( $this->newJob( 0, $root1 ) );
// NOTE(review): the initialisation of $root2 and the increment of $root2_ts
// are not visible here; the assertion below requires the two timestamps to differ.
270 # Add a second to UNIX epoch and format back to TS_MW
271 $root2_ts = strtotime( $root2['rootJobTimestamp'] );
273 $root2['rootJobTimestamp'] = wfTimestamp( TS_MW
, $root2_ts );
275 $this->assertNotEquals( $root1['rootJobTimestamp'], $root2['rootJobTimestamp'],
276 "Root job signatures have different timestamps." );
// Five more jobs under the second root signature.
277 for ( $i = 0; $i < 5; ++
$i ) {
278 $this->assertNull( $queue->push( $this->newJob( 0, $root2 ) ), "Push worked ($desc)" );
280 $queue->deduplicateRootJob( $this->newJob( 0, $root2 ) );
282 $this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );
// All ten pushed jobs are still counted in the queue size.
284 $queue->flushCaches();
285 $this->assertEquals( 10, $queue->getSize(), "Queue size is correct ($desc)" );
286 $this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );
// NOTE(review): the loop that collects $jobs and increments $dupcount is
// only partly visible here.
291 $job = $queue->pop();
296 if ( $job instanceof DuplicateJob
) {
// Ten jobs popped in total, five of them as DuplicateJob.
301 $this->assertEquals( 10, count( $jobs ), "Correct number of jobs popped ($desc)" );
302 $this->assertEquals( 5, $dupcount, "Correct number of duplicate jobs popped ($desc)" );
// Checks FIFO ordering: pushes ten jobs carrying 'i' = 0..9 and expects
// pop() to return them with the same 'i' sequence.
306 * @dataProvider provider_fifoQueueLists
309 public function testJobOrder( $queue, $recycles, $desc ) {
310 $queue = $this->$queue;
// NOTE(review): presumably guarded by a check that the queue was created;
// the guard is not visible here.
312 $this->markTestSkipped( $desc );
// Starting state: nothing queued, nothing acquired.
315 $this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
317 $queue->flushCaches();
318 $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
319 $this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" );
// Enqueue jobs whose 'i' parameter records the insertion order.
321 for ( $i = 0; $i < 10; ++
$i ) {
322 $this->assertNull( $queue->push( $this->newJob( $i ) ), "Push worked ($desc)" );
// Each pop must return a Job whose 'i' matches the loop index.
325 for ( $i = 0; $i < 10; ++
$i ) {
326 $job = $queue->pop();
327 $this->assertTrue( $job instanceof Job
, "Jobs popped from queue ($desc)" );
328 $params = $job->getParams();
329 $this->assertEquals( $i, $params['i'], "Job popped from queue is FIFO ($desc)" );
// An extra pop() is expected to return false.
// NOTE(review): the failure message says "not empty", which reads as the
// opposite of what is asserted - confirm the intended wording.
333 $this->assertFalse( $queue->pop(), "Queue is not empty ($desc)" );
335 $queue->flushCaches();
336 $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
337 $this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );
// Checks that the [type, wiki] pair of the queueFifo queue is absent from
// getServerQueuesWithJobs() before a push and present after one.
343 public function testQueueAggregateTable() {
344 $queue = $this->queueFifo
;
// Skip when the queue is unset or does not have getServerQueuesWithJobs().
345 if ( !$queue ||
!method_exists( $queue, 'getServerQueuesWithJobs' ) ) {
346 $this->markTestSkipped();
// Before any push the pair must not be listed.
349 $this->assertNotContains(
350 [ $queue->getType(), $queue->getWiki() ],
351 $queue->getServerQueuesWithJobs(),
352 "Null queue not in listing"
355 $queue->push( $this->newJob( 0 ) );
// After one push the pair must be listed.
357 $this->assertContains(
358 [ $queue->getType(), $queue->getWiki() ],
359 $queue->getServerQueuesWithJobs(),
360 "Null queue in listing"
// Data provider covering all six queue variants. Each row is
// [ queue property name, $recycles flag, description ], matching the
// ( $queue, $recycles, $desc ) signature of the tests that use it.
// The flag is true for the *TTL variants and false for the others.
364 public static function provider_queueLists() {
366 [ 'queueRand', false, 'Random queue without ack()' ],
367 [ 'queueRandTTL', true, 'Random queue with ack()' ],
368 [ 'queueTimestamp', false, 'Time ordered queue without ack()' ],
369 [ 'queueTimestampTTL', true, 'Time ordered queue with ack()' ],
370 [ 'queueFifo', false, 'FIFO ordered queue without ack()' ],
371 [ 'queueFifoTTL', true, 'FIFO ordered queue with ack()' ]
// Data provider restricted to the two FIFO variants. Each row is
// [ queue property name, $recycles flag, description ].
375 public static function provider_fifoQueueLists() {
377 [ 'queueFifo', false, 'Ordered queue without ack()' ],
378 [ 'queueFifoTTL', true, 'Ordered queue with ack()' ]
// Builds a NullJob on the main page title with 'removeDuplicates' => 0.
// $i is stored under the 'i' parameter. $rootJob is merged in with array
// union, so the four explicit keys win over same-named keys in $rootJob.
// NOTE(review): no visibility keyword is given, so PHP treats this as public.
382 function newJob( $i = 0, $rootJob = [] ) {
383 return new NullJob( Title
::newMainPage(),
384 [ 'lives' => 0, 'usleep' => 0, 'removeDuplicates' => 0, 'i' => $i ] +
$rootJob );
// Builds a NullJob on the main page title with 'removeDuplicates' => 1.
// $i is stored under the 'i' parameter. $rootJob is merged in with array
// union, so the four explicit keys win over same-named keys in $rootJob.
// NOTE(review): no visibility keyword is given, so PHP treats this as public.
387 function newDedupedJob( $i = 0, $rootJob = [] ) {
388 return new NullJob( Title
::newMainPage(),
389 [ 'lives' => 0, 'usleep' => 0, 'removeDuplicates' => 1, 'i' => $i ] +
$rootJob );
// JobQueueDB subclass that replaces getDB() so connections come from the
// main DB load balancer; setUp() uses it as the default queue class.
393 class JobQueueDBSingle
extends JobQueueDB
{
// Returns the load balancer connection for $index, passing an empty group
// list and this queue's $this->domain.
394 protected function getDB( $index ) {
395 $lb = MediaWikiServices
::getInstance()->getDBLoadBalancer();
396 // Override to not use CONN_TRX_AUTOCOMMIT so that we see the same temporary `job` table
397 return $lb->getConnection( $index, [], $this->domain
);