dépôts
/
lhc
/
web
/
wiklou.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Move most User::clearAllNotifications() logic to WatchedItemStore
[lhc/web/wiklou.git]
/
includes
/
jobqueue
/
JobQueueRedis.php
diff --git
a/includes/jobqueue/JobQueueRedis.php
b/includes/jobqueue/JobQueueRedis.php
index
990248a
..
7dad014
100644
(file)
--- a/
includes/jobqueue/JobQueueRedis.php
+++ b/
includes/jobqueue/JobQueueRedis.php
@@
-18,8
+18,8
@@
* http://www.gnu.org/copyleft/gpl.html
*
* @file
* http://www.gnu.org/copyleft/gpl.html
*
* @file
- * @author Aaron Schulz
*/
*/
+use Psr\Log\LoggerInterface;
/**
* Class to handle job queues stored in Redis
/**
* Class to handle job queues stored in Redis
@@
-66,16
+66,15
@@
class JobQueueRedis extends JobQueue {
/** @var RedisConnectionPool */
protected $redisPool;
class JobQueueRedis extends JobQueue {
/** @var RedisConnectionPool */
protected $redisPool;
+ /** @var LoggerInterface */
+ protected $logger;
/** @var string Server address */
protected $server;
/** @var string Compression method to use */
protected $compression;
/** @var string Server address */
protected $server;
/** @var string Compression method to use */
protected $compression;
- const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
-
- /** @var string Key to prefix the queue keys with (used for testing) */
- protected $key;
+ const MAX_PUSH_SIZE = 25; // avoid tying up the server
/**
* @param array $params Possible keys:
/**
* @param array $params Possible keys:
@@
-101,6
+100,7
@@
class JobQueueRedis extends JobQueue {
"Non-daemonized mode is no longer supported. Please install the " .
"mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." );
}
"Non-daemonized mode is no longer supported. Please install the " .
"mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." );
}
+ $this->logger = \MediaWiki\Logger\LoggerFactory::getInstance( 'redis' );
}
protected function supportedOrders() {
}
protected function supportedOrders() {
@@
-213,7
+213,7
@@
class JobQueueRedis extends JobQueue {
if ( $flags & self::QOS_ATOMIC ) {
$batches = [ $items ]; // all or nothing
} else {
if ( $flags & self::QOS_ATOMIC ) {
$batches = [ $items ]; // all or nothing
} else {
- $batches = array_chunk( $items,
100 ); // avoid tying up the server
+ $batches = array_chunk( $items,
self::MAX_PUSH_SIZE );
}
$failed = 0;
$pushed = 0;
}
$failed = 0;
$pushed = 0;
@@
-255,6
+255,7
@@
class JobQueueRedis extends JobQueue {
$args[] = (string)$this->serialize( $item );
}
static $script =
$args[] = (string)$this->serialize( $item );
}
static $script =
+ /** @lang Lua */
<<<LUA
local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData, kQwJobs = unpack(KEYS)
-- First argument is the queue ID
<<<LUA
local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData, kQwJobs = unpack(KEYS)
-- First argument is the queue ID
@@
-344,6
+345,7
@@
LUA;
*/
protected function popAndAcquireBlob( RedisConnRef $conn ) {
static $script =
*/
protected function popAndAcquireBlob( RedisConnRef $conn ) {
static $script =
+ /** @lang Lua */
<<<LUA
local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
local rTime = unpack(ARGV)
<<<LUA
local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
local rTime = unpack(ARGV)
@@
-391,6
+393,7
@@
LUA;
$conn = $this->getConnection();
try {
static $script =
$conn = $this->getConnection();
try {
static $script =
+ /** @lang Lua */
<<<LUA
local kClaimed, kAttempts, kData = unpack(KEYS)
local id = unpack(ARGV)
<<<LUA
local kClaimed, kAttempts, kData = unpack(KEYS)
local id = unpack(ARGV)
@@
-750,7
+753,7
@@
LUA;
* @throws JobQueueConnectionError
*/
protected function getConnection() {
* @throws JobQueueConnectionError
*/
protected function getConnection() {
- $conn = $this->redisPool->getConnection( $this->server );
+ $conn = $this->redisPool->getConnection( $this->server
, $this->logger
);
if ( !$conn ) {
throw new JobQueueConnectionError(
"Unable to connect to redis server {$this->server}." );
if ( !$conn ) {
throw new JobQueueConnectionError(
"Unable to connect to redis server {$this->server}." );
@@
-791,9
+794,9
@@
LUA;
private function getGlobalKey( $name ) {
$parts = [ 'global', 'jobqueue', $name ];
foreach ( $parts as $part ) {
private function getGlobalKey( $name ) {
$parts = [ 'global', 'jobqueue', $name ];
foreach ( $parts as $part ) {
- if ( !preg_match( '/[a-zA-Z0-9_-]+/', $part ) ) {
- throw new InvalidArgumentException( "Key part characters are out of range." );
- }
+
if ( !preg_match( '/[a-zA-Z0-9_-]+/', $part ) ) {
+
throw new InvalidArgumentException( "Key part characters are out of range." );
+
}
}
return implode( ':', $parts );
}
return implode( ':', $parts );