* Cleaned up 'server' option to not fragment the pool.
Also made it actually match the documentation.
* Made it use doGetPeriodicTasks() for job recycling.
* Made it so that other job queue classes can be tested.
* Renamed "redisConf" => "redisConfig".
* Tweaked comments about the "random" order option.
Change-Id: I7823d90010e6bc9d581435c3be92830c5ba68480
$wgMaxShellWallClockTime = 180;
/**
- * Under Linux: a cgroup directory used to constrain memory usage of shell
+ * Under Linux: a cgroup directory used to constrain memory usage of shell
* commands. The directory must be writable by the user which runs MediaWiki.
*
* If specified, this is used instead of ulimit, which is inaccurate, and
* them segfault or deadlock.
*
* A wrapper script will create a cgroup for each shell command that runs, as
- * a subgroup of the specified cgroup. If the memory limit is exceeded, the
+ * a subgroup of the specified cgroup. If the memory limit is exceeded, the
* kernel will send a SIGKILL signal to a process in the subgroup.
*
* @par Example:
* echo '$wgShellCgroup = "/sys/fs/cgroup/memory/mediawiki/job";' >> LocalSettings.php
* @endcode
*
- * The reliability of cgroup cleanup can be improved by installing a
+ * The reliability of cgroup cleanup can be improved by installing a
* notify_on_release script in the root cgroup, see e.g.
* https://gerrit.wikimedia.org/r/#/c/40784
*/
* by timestamp, allowing for some jobs to be popped off out of order.
* If "random" is used, pop() will pick jobs in random order. This might be
* useful for improving concurrency depending on the queue storage medium.
+ * Note that "random" really means "don't care", so it may actually be FIFO
+ * or only weakly random (e.g. pop() takes one of the first X jobs randomly).
* - claimTTL : If supported, the queue will recycle jobs that have been popped
* but not acknowledged as completed after this many seconds. Recycling
* of jobs simple means re-inserting them into the queue. Jobs can be
* @return void
*/
protected function doFlushCaches() {}
+
+ /**
+ * Namespace the queue with a key to isolate it for testing
+ *
+ * @param $key string
+ * @return void
+ * @throws MWException
+ */
+ public function setTestingPrefix( $key ) {
+ throw new MWException( "Queue namespacing not support for this queue type." );
+ }
}
const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days)
const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
+ protected $key; // string; key to prefix the queue keys with (used for testing)
+
/**
* @params include:
- * - redisConf : An array of parameters to RedisConnectionPool::__construct().
- * - server : A hostname/port combination or the absolute path of a UNIX socket.
- * If a hostname is specified but no port, the standard port number
- * 6379 will be used. Required.
+ * - redisConfig : An array of parameters to RedisConnectionPool::__construct().
+ * - redisServer : A hostname/port combination or the absolute path of a UNIX socket.
+ * If a hostname is specified but no port, the standard port number
+ * 6379 will be used. Required.
* @param array $params
*/
public function __construct( array $params ) {
parent::__construct( $params );
- $this->server = $params['redisConf']['server'];
- $this->redisPool = RedisConnectionPool::singleton( $params['redisConf'] );
+ $this->server = $params['redisServer'];
+ $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
}
/**
* @throws MWException
*/
protected function doIsEmpty() {
- if ( mt_rand( 0, 99 ) == 0 ) {
- $this->doInternalMaintenance();
- }
-
$conn = $this->getConnection();
try {
return ( $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ) == 0 );
* @throws MWException
*/
protected function doGetSize() {
- if ( mt_rand( 0, 99 ) == 0 ) {
- $this->doInternalMaintenance();
- }
-
$conn = $this->getConnection();
try {
return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
* @throws MWException
*/
protected function doGetAcquiredCount() {
- if ( mt_rand( 0, 99 ) == 0 ) {
- $this->doInternalMaintenance();
+ if ( $this->claimTTL <= 0 ) {
+ return 0; // no acknowledgements
}
-
$conn = $this->getConnection();
try {
- if ( $this->claimTTL > 0 ) {
- return $conn->lSize( $this->getQueueKey( 'l-claimed' ) );
- } else {
- return 0;
- }
+ return $conn->lSize( $this->getQueueKey( 'l-claimed' ) );
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );
}
protected function doPop() {
$job = false;
- if ( mt_rand( 0, 99 ) == 0 ) {
- $this->doInternalMaintenance();
+ if ( $this->claimTTL <= 0 && mt_rand( 0, 99 ) == 0 ) {
+ $this->cleanupClaimedJobs(); // prune jobs and IDs from the "garbage" list
}
$conn = $this->getConnection();
return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
}
- /**
- * Do any job recycling or queue cleanup as needed
- *
- * @return void
- * @return integer Number of jobs recycled/deleted
- * @throws MWException
- */
- protected function doInternalMaintenance() {
- return ( $this->claimTTL > 0 ) ?
- $this->recycleAndDeleteStaleJobs() : $this->cleanupClaimedJobs();
- }
-
/**
* Recycle or destroy any jobs that have been claimed for too long
*
* @return integer Number of jobs recycled/deleted
* @throws MWException
*/
- protected function recycleAndDeleteStaleJobs() {
+ public function recycleAndDeleteStaleJobs() {
+ if ( $this->claimTTL <= 0 ) { // sanity
+ throw new MWException( "Cannot recycle jobs since acknowledgements are disabled." );
+ }
$count = 0;
// For each job item that can be retried, we need to add it back to the
// main queue and remove it from the list of currenty claimed job items.
return $count;
}
+ /**
+ * @return Array
+ */
+ protected function doGetPeriodicTasks() {
+ if ( $this->claimTTL > 0 ) {
+ return array(
+ 'recycleAndDeleteStaleJobs' => array(
+ 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
+ 'period' => ceil( $this->claimTTL / 2 )
+ )
+ );
+ } else {
+ return array();
+ }
+ }
+
/**
* @param $job Job
* @return array
*/
private function getQueueKey( $prop ) {
list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
- return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop );
+ if ( strlen( $this->key ) ) { // namespaced queue (for testing)
+ return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $this->key, $prop );
+ } else {
+ return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop );
+ }
}
/**
}
return $res;
}
+
+ /**
+ * @param $key string
+ * @return void
+ */
+ public function setTestingPrefix( $key ) {
+ $this->key = $key;
+ }
}
'file=' => false,
'use-filebackend=' => false,
'use-bagostuff=' => false,
+ 'use-jobqueue=' => false,
'keep-uploads' => false,
'use-normal-tables' => false,
'reuse-db' => false,
* @group Database
*/
class JobQueueTest extends MediaWikiTestCase {
+ protected $key;
+ protected $queueRand, $queueRandTTL, $queueFifo, $queueFifoTTL;
protected $old = array();
function __construct( $name = null, array $data = array(), $dataName = '' ) {
}
protected function setUp() {
- global $wgMemc;
+ global $wgMemc, $wgJobTypeConf;
parent::setUp();
$this->old['wgMemc'] = $wgMemc;
$wgMemc = new HashBagOStuff();
- $this->queueRand = JobQueue::factory( array( 'class' => 'JobQueueDB',
- 'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'random' ) );
- $this->queueRandTTL = JobQueue::factory( array( 'class' => 'JobQueueDB',
- 'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'random', 'claimTTL' => 10 ) );
- $this->queueFifo = JobQueue::factory( array( 'class' => 'JobQueueDB',
- 'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'fifo' ) );
- $this->queueFifoTTL = JobQueue::factory( array( 'class' => 'JobQueueDB',
- 'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'fifo', 'claimTTL' => 10 ) );
+ if ( $this->getCliArg( 'use-jobqueue=' ) ) {
+ $name = $this->getCliArg( 'use-jobqueue=' );
+ if ( !isset( $wgJobTypeConf[$name] ) ) {
+ throw new MWException( "No \$wgJobTypeConf entry for '$name'." );
+ }
+ $baseConfig = $wgJobTypeConf[$name];
+ } else {
+ $baseConfig = array( 'class' => 'JobQueueDB' );
+ }
+ $baseConfig['type'] = 'null';
+ $baseConfig['wiki'] = wfWikiID();
+ $this->queueRand = JobQueue::factory(
+ array( 'order' => 'random', 'claimTTL' => 0 ) + $baseConfig );
+ $this->queueRandTTL = JobQueue::factory(
+ array( 'order' => 'random', 'claimTTL' => 10 ) + $baseConfig );
+ $this->queueFifo = JobQueue::factory(
+ array( 'order' => 'fifo', 'claimTTL' => 0 ) + $baseConfig );
+ $this->queueFifoTTL = JobQueue::factory(
+ array( 'order' => 'fifo', 'claimTTL' => 10 ) + $baseConfig );
+ if ( $baseConfig['class'] !== 'JobQueueDB' ) { // DB namespace with prefix or temp tables
+ foreach ( array( 'queueRand', 'queueRandTTL', 'queueFifo', 'queueFifoTTL' ) as $q ) {
+ $this->$q->setTestingPrefix( 'unittests-' . wfRandomString( 32 ) );
+ }
+ }
}
protected function tearDown() {
$queue->ack( $job );
}
- $this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );
+ $this->assertFalse( $queue->pop(), "Queue is not empty ($desc)" );
$queue->flushCaches();
$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );