From 9cc7e6a39ab2c5d2beedd8d3ecbcddf41e06c6b6 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Fri, 8 Feb 2013 10:52:54 -0800 Subject: [PATCH] [JobQueue] Cleanups for JobQueueRedis. * Cleaned up 'server' option to not fragment the pool. Also made it actually match the documentation. * Made it use doGetPeriodicTasks() for job recycling. * Made it so that other job queue classes can be tested. * Renamed "redisConf" => "redisConfig". * Tweaked comments about the "random" order option. Change-Id: I7823d90010e6bc9d581435c3be92830c5ba68480 --- includes/DefaultSettings.php | 6 +- includes/job/JobQueue.php | 13 +++ includes/job/JobQueueRedis.php | 84 ++++++++++--------- tests/phpunit/MediaWikiPHPUnitCommand.php | 1 + .../includes/jobqueue/JobQueueTest.php | 38 ++++++--- 5 files changed, 91 insertions(+), 51 deletions(-) diff --git a/includes/DefaultSettings.php b/includes/DefaultSettings.php index 1652031660..5d07212452 100644 --- a/includes/DefaultSettings.php +++ b/includes/DefaultSettings.php @@ -6213,7 +6213,7 @@ $wgMaxShellTime = 180; $wgMaxShellWallClockTime = 180; /** - * Under Linux: a cgroup directory used to constrain memory usage of shell + * Under Linux: a cgroup directory used to constrain memory usage of shell * commands. The directory must be writable by the user which runs MediaWiki. * * If specified, this is used instead of ulimit, which is inaccurate, and @@ -6221,7 +6221,7 @@ $wgMaxShellWallClockTime = 180; * them segfault or deadlock. * * A wrapper script will create a cgroup for each shell command that runs, as - * a subgroup of the specified cgroup. If the memory limit is exceeded, the + * a subgroup of the specified cgroup. If the memory limit is exceeded, the * kernel will send a SIGKILL signal to a process in the subgroup. * * @par Example: @@ -6231,7 +6231,7 @@ $wgMaxShellWallClockTime = 180; * echo '$wgShellCgroup = "/sys/fs/cgroup/memory/mediawiki/job";' >> LocalSettings.php * @endcode * - * The reliability of cgroup cleanup can be improved by installing a + * The reliability of cgroup cleanup can be improved by installing a * notify_on_release script in the root cgroup, see e.g. * https://gerrit.wikimedia.org/r/#/c/40784 */ diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php index acc0c49200..67fe180e01 100644 --- a/includes/job/JobQueue.php +++ b/includes/job/JobQueue.php @@ -62,6 +62,8 @@ abstract class JobQueue { * by timestamp, allowing for some jobs to be popped off out of order. * If "random" is used, pop() will pick jobs in random order. This might be * useful for improving concurrency depending on the queue storage medium. + * Note that "random" really means "don't care", so it may actually be FIFO + * or only weakly random (e.g. pop() takes one of the first X jobs randomly). * - claimTTL : If supported, the queue will recycle jobs that have been popped * but not acknowledged as completed after this many seconds. Recycling * of jobs simple means re-inserting them into the queue. Jobs can be @@ -371,4 +373,15 @@ abstract class JobQueue { * @return void */ protected function doFlushCaches() {} + + /** + * Namespace the queue with a key to isolate it for testing + * + * @param $key string + * @return void + * @throws MWException + */ + public function setTestingPrefix( $key ) { + throw new MWException( "Queue namespacing not support for this queue type." ); + } } diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php index 3e7a47c2df..07a7410dda 100644 --- a/includes/job/JobQueueRedis.php +++ b/includes/job/JobQueueRedis.php @@ -36,18 +36,20 @@ class JobQueueRedis extends JobQueue { const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days) const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days) + protected $key; // string; key to prefix the queue keys with (used for testing) + /** * @params include: - * - redisConf : An array of parameters to RedisConnectionPool::__construct(). - * - server : A hostname/port combination or the absolute path of a UNIX socket. - * If a hostname is specified but no port, the standard port number - * 6379 will be used. Required. + * - redisConfig : An array of parameters to RedisConnectionPool::__construct(). + * - redisServer : A hostname/port combination or the absolute path of a UNIX socket. + * If a hostname is specified but no port, the standard port number + * 6379 will be used. Required. * @param array $params */ public function __construct( array $params ) { parent::__construct( $params ); - $this->server = $params['redisConf']['server']; - $this->redisPool = RedisConnectionPool::singleton( $params['redisConf'] ); + $this->server = $params['redisServer']; + $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); } /** @@ -56,10 +58,6 @@ class JobQueueRedis extends JobQueue { * @throws MWException */ protected function doIsEmpty() { - if ( mt_rand( 0, 99 ) == 0 ) { - $this->doInternalMaintenance(); - } - $conn = $this->getConnection(); try { return ( $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ) == 0 ); @@ -74,10 +72,6 @@ class JobQueueRedis extends JobQueue { * @throws MWException */ protected function doGetSize() { - if ( mt_rand( 0, 99 ) == 0 ) { - $this->doInternalMaintenance(); - } - $conn = $this->getConnection(); try { return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ); @@ -92,17 +86,12 @@ class JobQueueRedis extends JobQueue { * @throws MWException */ protected function doGetAcquiredCount() { - if ( mt_rand( 0, 99 ) == 0 ) { - $this->doInternalMaintenance(); + if ( $this->claimTTL <= 0 ) { + return 0; // no acknowledgements } - $conn = $this->getConnection(); try { - if ( $this->claimTTL > 0 ) { - return $conn->lSize( $this->getQueueKey( 'l-claimed' ) ); - } else { - return 0; - } + return $conn->lSize( $this->getQueueKey( 'l-claimed' ) ); } catch ( RedisException $e ) { $this->throwRedisException( $this->server, $conn, $e ); } @@ -190,8 +179,8 @@ class JobQueueRedis extends JobQueue { protected function doPop() { $job = false; - if ( mt_rand( 0, 99 ) == 0 ) { - $this->doInternalMaintenance(); + if ( $this->claimTTL <= 0 && mt_rand( 0, 99 ) == 0 ) { + $this->cleanupClaimedJobs(); // prune jobs and IDs from the "garbage" list } $conn = $this->getConnection(); @@ -340,25 +329,16 @@ class JobQueueRedis extends JobQueue { return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); } - /** - * Do any job recycling or queue cleanup as needed - * - * @return void - * @return integer Number of jobs recycled/deleted - * @throws MWException - */ - protected function doInternalMaintenance() { - return ( $this->claimTTL > 0 ) ? - $this->recycleAndDeleteStaleJobs() : $this->cleanupClaimedJobs(); - } - /** * Recycle or destroy any jobs that have been claimed for too long * * @return integer Number of jobs recycled/deleted * @throws MWException */ - protected function recycleAndDeleteStaleJobs() { + public function recycleAndDeleteStaleJobs() { + if ( $this->claimTTL <= 0 ) { // sanity + throw new MWException( "Cannot recycle jobs since acknowledgements are disabled." ); + } $count = 0; // For each job item that can be retried, we need to add it back to the // main queue and remove it from the list of currenty claimed job items. @@ -488,6 +468,22 @@ class JobQueueRedis extends JobQueue { return $count; } + /** + * @return Array + */ + protected function doGetPeriodicTasks() { + if ( $this->claimTTL > 0 ) { + return array( + 'recycleAndDeleteStaleJobs' => array( + 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ), + 'period' => ceil( $this->claimTTL / 2 ) + ) + ); + } else { + return array(); + } + } + /** * @param $job Job * @return array @@ -560,7 +556,11 @@ class JobQueueRedis extends JobQueue { */ private function getQueueKey( $prop ) { list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); - return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop ); + if ( strlen( $this->key ) ) { // namespaced queue (for testing) + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $this->key, $prop ); + } else { + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop ); + } } /** @@ -606,4 +606,12 @@ class JobQueueRedis extends JobQueue { } return $res; } + + /** + * @param $key string + * @return void + */ + public function setTestingPrefix( $key ) { + $this->key = $key; + } } diff --git a/tests/phpunit/MediaWikiPHPUnitCommand.php b/tests/phpunit/MediaWikiPHPUnitCommand.php index c139f0b166..12c2e0035c 100644 --- a/tests/phpunit/MediaWikiPHPUnitCommand.php +++ b/tests/phpunit/MediaWikiPHPUnitCommand.php @@ -7,6 +7,7 @@ class MediaWikiPHPUnitCommand extends PHPUnit_TextUI_Command { 'file=' => false, 'use-filebackend=' => false, 'use-bagostuff=' => false, + 'use-jobqueue=' => false, 'keep-uploads' => false, 'use-normal-tables' => false, 'reuse-db' => false, diff --git a/tests/phpunit/includes/jobqueue/JobQueueTest.php b/tests/phpunit/includes/jobqueue/JobQueueTest.php index 367a8b35d0..453cec3118 100644 --- a/tests/phpunit/includes/jobqueue/JobQueueTest.php +++ b/tests/phpunit/includes/jobqueue/JobQueueTest.php @@ -6,6 +6,8 @@ * @group Database */ class JobQueueTest extends MediaWikiTestCase { + protected $key; + protected $queueRand, $queueRandTTL, $queueFifo, $queueFifoTTL; protected $old = array(); function __construct( $name = null, array $data = array(), $dataName = '' ) { @@ -15,18 +17,34 @@ class JobQueueTest extends MediaWikiTestCase { } protected function setUp() { - global $wgMemc; + global $wgMemc, $wgJobTypeConf; parent::setUp(); $this->old['wgMemc'] = $wgMemc; $wgMemc = new HashBagOStuff(); - $this->queueRand = JobQueue::factory( array( 'class' => 'JobQueueDB', - 'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'random' ) ); - $this->queueRandTTL = JobQueue::factory( array( 'class' => 'JobQueueDB', - 'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'random', 'claimTTL' => 10 ) ); - $this->queueFifo = JobQueue::factory( array( 'class' => 'JobQueueDB', - 'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'fifo' ) ); - $this->queueFifoTTL = JobQueue::factory( array( 'class' => 'JobQueueDB', - 'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'fifo', 'claimTTL' => 10 ) ); + if ( $this->getCliArg( 'use-jobqueue=' ) ) { + $name = $this->getCliArg( 'use-jobqueue=' ); + if ( !isset( $wgJobTypeConf[$name] ) ) { + throw new MWException( "No \$wgJobTypeConf entry for '$name'." ); + } + $baseConfig = $wgJobTypeConf[$name]; + } else { + $baseConfig = array( 'class' => 'JobQueueDB' ); + } + $baseConfig['type'] = 'null'; + $baseConfig['wiki'] = wfWikiID(); + $this->queueRand = JobQueue::factory( + array( 'order' => 'random', 'claimTTL' => 0 ) + $baseConfig ); + $this->queueRandTTL = JobQueue::factory( + array( 'order' => 'random', 'claimTTL' => 10 ) + $baseConfig ); + $this->queueFifo = JobQueue::factory( + array( 'order' => 'fifo', 'claimTTL' => 0 ) + $baseConfig ); + $this->queueFifoTTL = JobQueue::factory( + array( 'order' => 'fifo', 'claimTTL' => 10 ) + $baseConfig ); + if ( $baseConfig['class'] !== 'JobQueueDB' ) { // DB namespace with prefix or temp tables + foreach ( array( 'queueRand', 'queueRandTTL', 'queueFifo', 'queueFifoTTL' ) as $q ) { + $this->$q->setTestingPrefix( 'unittests-' . wfRandomString( 32 ) ); + } + } } protected function tearDown() { @@ -239,7 +257,7 @@ class JobQueueTest extends MediaWikiTestCase { $queue->ack( $job ); } - $this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" ); + $this->assertFalse( $queue->pop(), "Queue is not empty ($desc)" ); $queue->flushCaches(); $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" ); -- 2.20.1