[JobQueue] Cleanups for JobQueueRedis.
authorAaron Schulz <aschulz@wikimedia.org>
Fri, 8 Feb 2013 18:52:54 +0000 (10:52 -0800)
committerGerrit Code Review <gerrit@wikimedia.org>
Thu, 21 Feb 2013 05:38:01 +0000 (05:38 +0000)
* Cleaned up 'server' option to not fragment the pool.
  Also made it actually match the documentation.
* Made it use doGetPeriodicTasks() for job recycling.
* Made it so that other job queue classes can be tested.
* Renamed "redisConf" => "redisConfig".
* Tweaked comments about the "random" order option.

Change-Id: I7823d90010e6bc9d581435c3be92830c5ba68480

includes/DefaultSettings.php
includes/job/JobQueue.php
includes/job/JobQueueRedis.php
tests/phpunit/MediaWikiPHPUnitCommand.php
tests/phpunit/includes/jobqueue/JobQueueTest.php

index 1652031..5d07212 100644 (file)
@@ -6213,7 +6213,7 @@ $wgMaxShellTime = 180;
 $wgMaxShellWallClockTime = 180;
 
 /**
- * Under Linux: a cgroup directory used to constrain memory usage of shell 
+ * Under Linux: a cgroup directory used to constrain memory usage of shell
  * commands. The directory must be writable by the user which runs MediaWiki.
  *
  * If specified, this is used instead of ulimit, which is inaccurate, and
@@ -6221,7 +6221,7 @@ $wgMaxShellWallClockTime = 180;
  * them segfault or deadlock.
  *
  * A wrapper script will create a cgroup for each shell command that runs, as
- * a subgroup of the specified cgroup. If the memory limit is exceeded, the 
+ * a subgroup of the specified cgroup. If the memory limit is exceeded, the
  * kernel will send a SIGKILL signal to a process in the subgroup.
  *
  * @par Example:
@@ -6231,7 +6231,7 @@ $wgMaxShellWallClockTime = 180;
  *    echo '$wgShellCgroup = "/sys/fs/cgroup/memory/mediawiki/job";' >> LocalSettings.php
  * @endcode
  *
- * The reliability of cgroup cleanup can be improved by installing a 
+ * The reliability of cgroup cleanup can be improved by installing a
  * notify_on_release script in the root cgroup, see e.g.
  * https://gerrit.wikimedia.org/r/#/c/40784
  */
index acc0c49..67fe180 100644 (file)
@@ -62,6 +62,8 @@ abstract class JobQueue {
         *                by timestamp, allowing for some jobs to be popped off out of order.
         *                If "random" is used, pop() will pick jobs in random order. This might be
         *                useful for improving concurrency depending on the queue storage medium.
+        *                Note that "random" really means "don't care", so it may actually be FIFO
+        *                or only weakly random (e.g. pop() takes one of the first X jobs randomly).
         *   - claimTTL : If supported, the queue will recycle jobs that have been popped
         *                but not acknowledged as completed after this many seconds. Recycling
         *                of jobs simple means re-inserting them into the queue. Jobs can be
@@ -371,4 +373,15 @@ abstract class JobQueue {
         * @return void
         */
        protected function doFlushCaches() {}
+
+       /**
+        * Namespace the queue with a key to isolate it for testing
+        *
+        * @param $key string
+        * @return void
+        * @throws MWException
+        */
+       public function setTestingPrefix( $key ) {
+               throw new MWException( "Queue namespacing not support for this queue type." );
+       }
 }
index 3e7a47c..07a7410 100644 (file)
@@ -36,18 +36,20 @@ class JobQueueRedis extends JobQueue {
        const ROOTJOB_TTL   = 1209600; // integer; seconds to remember root jobs (14 days)
        const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
 
+       protected $key; // string; key to prefix the queue keys with (used for testing)
+
        /**
         * @params include:
-        *   - redisConf : An array of parameters to RedisConnectionPool::__construct().
-        *   - server    : A hostname/port combination or the absolute path of a UNIX socket.
-        *                 If a hostname is specified but no port, the standard port number
-        *                 6379 will be used. Required.
+        *   - redisConfig : An array of parameters to RedisConnectionPool::__construct().
+        *   - redisServer : A hostname/port combination or the absolute path of a UNIX socket.
+        *                   If a hostname is specified but no port, the standard port number
+        *                   6379 will be used. Required.
         * @param array $params
         */
        public function __construct( array $params ) {
                parent::__construct( $params );
-               $this->server = $params['redisConf']['server'];
-               $this->redisPool = RedisConnectionPool::singleton( $params['redisConf'] );
+               $this->server = $params['redisServer'];
+               $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
        }
 
        /**
@@ -56,10 +58,6 @@ class JobQueueRedis extends JobQueue {
         * @throws MWException
         */
        protected function doIsEmpty() {
-               if ( mt_rand( 0, 99 ) == 0 ) {
-                       $this->doInternalMaintenance();
-               }
-
                $conn = $this->getConnection();
                try {
                        return ( $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ) == 0 );
@@ -74,10 +72,6 @@ class JobQueueRedis extends JobQueue {
         * @throws MWException
         */
        protected function doGetSize() {
-               if ( mt_rand( 0, 99 ) == 0 ) {
-                       $this->doInternalMaintenance();
-               }
-
                $conn = $this->getConnection();
                try {
                        return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
@@ -92,17 +86,12 @@ class JobQueueRedis extends JobQueue {
         * @throws MWException
         */
        protected function doGetAcquiredCount() {
-               if ( mt_rand( 0, 99 ) == 0 ) {
-                       $this->doInternalMaintenance();
+               if ( $this->claimTTL <= 0 ) {
+                       return 0; // no acknowledgements
                }
-
                $conn = $this->getConnection();
                try {
-                       if ( $this->claimTTL > 0 ) {
-                               return $conn->lSize( $this->getQueueKey( 'l-claimed' ) );
-                       } else {
-                               return 0;
-                       }
+                       return $conn->lSize( $this->getQueueKey( 'l-claimed' ) );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
                }
@@ -190,8 +179,8 @@ class JobQueueRedis extends JobQueue {
        protected function doPop() {
                $job = false;
 
-               if ( mt_rand( 0, 99 ) == 0 ) {
-                       $this->doInternalMaintenance();
+               if ( $this->claimTTL <= 0 && mt_rand( 0, 99 ) == 0 ) {
+                       $this->cleanupClaimedJobs(); // prune jobs and IDs from the "garbage" list
                }
 
                $conn = $this->getConnection();
@@ -340,25 +329,16 @@ class JobQueueRedis extends JobQueue {
                return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
        }
 
-       /**
-        * Do any job recycling or queue cleanup as needed
-        *
-        * @return void
-        * @return integer Number of jobs recycled/deleted
-        * @throws MWException
-        */
-       protected function doInternalMaintenance() {
-               return ( $this->claimTTL > 0 ) ?
-                       $this->recycleAndDeleteStaleJobs() : $this->cleanupClaimedJobs();
-       }
-
        /**
         * Recycle or destroy any jobs that have been claimed for too long
         *
         * @return integer Number of jobs recycled/deleted
         * @throws MWException
         */
-       protected function recycleAndDeleteStaleJobs() {
+       public function recycleAndDeleteStaleJobs() {
+               if ( $this->claimTTL <= 0 ) { // sanity
+                       throw new MWException( "Cannot recycle jobs since acknowledgements are disabled." );
+               }
                $count = 0;
                // For each job item that can be retried, we need to add it back to the
                // main queue and remove it from the list of currenty claimed job items.
@@ -488,6 +468,22 @@ class JobQueueRedis extends JobQueue {
                return $count;
        }
 
+       /**
+        * @return Array
+        */
+       protected function doGetPeriodicTasks() {
+               if ( $this->claimTTL > 0 ) {
+                       return array(
+                               'recycleAndDeleteStaleJobs' => array(
+                                       'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
+                                       'period'   => ceil( $this->claimTTL / 2 )
+                               )
+                       );
+               } else {
+                       return array();
+               }
+       }
+
        /**
         * @param $job Job
         * @return array
@@ -560,7 +556,11 @@ class JobQueueRedis extends JobQueue {
         */
        private function getQueueKey( $prop ) {
                list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
-               return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop );
+               if ( strlen( $this->key ) ) { // namespaced queue (for testing)
+                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $this->key, $prop );
+               } else {
+                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop );
+               }
        }
 
        /**
@@ -606,4 +606,12 @@ class JobQueueRedis extends JobQueue {
                }
                return $res;
        }
+
+       /**
+        * @param $key string
+        * @return void
+        */
+       public function setTestingPrefix( $key ) {
+               $this->key = $key;
+       }
 }
index c139f0b..12c2e00 100644 (file)
@@ -7,6 +7,7 @@ class MediaWikiPHPUnitCommand extends PHPUnit_TextUI_Command {
                'file=' => false,
                'use-filebackend=' => false,
                'use-bagostuff=' => false,
+               'use-jobqueue=' => false,
                'keep-uploads' => false,
                'use-normal-tables' => false,
                'reuse-db' => false,
index 367a8b3..453cec3 100644 (file)
@@ -6,6 +6,8 @@
  * @group Database
  */
 class JobQueueTest extends MediaWikiTestCase {
+       protected $key;
+       protected $queueRand, $queueRandTTL, $queueFifo, $queueFifoTTL;
        protected $old = array();
 
        function  __construct( $name = null, array $data = array(), $dataName = '' ) {
@@ -15,18 +17,34 @@ class JobQueueTest extends MediaWikiTestCase {
        }
 
        protected function setUp() {
-               global $wgMemc;
+               global $wgMemc, $wgJobTypeConf;
                parent::setUp();
                $this->old['wgMemc'] = $wgMemc;
                $wgMemc = new HashBagOStuff();
-               $this->queueRand = JobQueue::factory( array( 'class' => 'JobQueueDB',
-                       'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'random' ) );
-               $this->queueRandTTL = JobQueue::factory( array( 'class' => 'JobQueueDB',
-                       'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'random', 'claimTTL' => 10 ) );
-               $this->queueFifo = JobQueue::factory( array( 'class' => 'JobQueueDB',
-                       'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'fifo' ) );
-               $this->queueFifoTTL = JobQueue::factory( array( 'class' => 'JobQueueDB',
-                       'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'fifo', 'claimTTL' => 10 ) );
+               if ( $this->getCliArg( 'use-jobqueue=' ) ) {
+                       $name = $this->getCliArg( 'use-jobqueue=' );
+                       if ( !isset( $wgJobTypeConf[$name] ) ) {
+                               throw new MWException( "No \$wgJobTypeConf entry for '$name'." );
+                       }
+                       $baseConfig = $wgJobTypeConf[$name];
+               } else {
+                       $baseConfig = array( 'class' => 'JobQueueDB' );
+               }
+               $baseConfig['type'] = 'null';
+               $baseConfig['wiki'] = wfWikiID();
+               $this->queueRand = JobQueue::factory(
+                       array( 'order' => 'random', 'claimTTL' => 0 ) + $baseConfig );
+               $this->queueRandTTL = JobQueue::factory(
+                       array( 'order' => 'random', 'claimTTL' => 10 ) + $baseConfig );
+               $this->queueFifo = JobQueue::factory(
+                       array( 'order' => 'fifo', 'claimTTL' => 0 ) + $baseConfig );
+               $this->queueFifoTTL = JobQueue::factory(
+                       array( 'order' => 'fifo', 'claimTTL' => 10 ) + $baseConfig );
+               if ( $baseConfig['class'] !== 'JobQueueDB' ) { // DB namespace with prefix or temp tables
+                       foreach ( array( 'queueRand', 'queueRandTTL', 'queueFifo', 'queueFifoTTL' ) as $q ) {
+                               $this->$q->setTestingPrefix( 'unittests-' . wfRandomString( 32 ) );
+                       }
+               }
        }
 
        protected function tearDown() {
@@ -239,7 +257,7 @@ class JobQueueTest extends MediaWikiTestCase {
                        $queue->ack( $job );
                }
 
-               $this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );
+               $this->assertFalse( $queue->pop(), "Queue is not empty ($desc)" );
 
                $queue->flushCaches();
                $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );