Move most User::clearAllNotifications() logic to WatchedItemStore
[lhc/web/wiklou.git] / includes / jobqueue / JobQueueRedis.php
index 990248a..7dad014 100644 (file)
@@ -18,8 +18,8 @@
  * http://www.gnu.org/copyleft/gpl.html
  *
  * @file
- * @author Aaron Schulz
  */
+use Psr\Log\LoggerInterface;
 
 /**
  * Class to handle job queues stored in Redis
 class JobQueueRedis extends JobQueue {
        /** @var RedisConnectionPool */
        protected $redisPool;
+       /** @var LoggerInterface */
+       protected $logger;
 
        /** @var string Server address */
        protected $server;
        /** @var string Compression method to use */
        protected $compression;
 
-       const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
-
-       /** @var string Key to prefix the queue keys with (used for testing) */
-       protected $key;
+       const MAX_PUSH_SIZE = 25; // avoid tying up the server
 
        /**
         * @param array $params Possible keys:
@@ -101,6 +100,7 @@ class JobQueueRedis extends JobQueue {
                                "Non-daemonized mode is no longer supported. Please install the " .
                                "mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." );
                }
+               $this->logger = \MediaWiki\Logger\LoggerFactory::getInstance( 'redis' );
        }
 
        protected function supportedOrders() {
@@ -213,7 +213,7 @@ class JobQueueRedis extends JobQueue {
                        if ( $flags & self::QOS_ATOMIC ) {
                                $batches = [ $items ]; // all or nothing
                        } else {
-                               $batches = array_chunk( $items, 100 ); // avoid tying up the server
+                               $batches = array_chunk( $items, self::MAX_PUSH_SIZE );
                        }
                        $failed = 0;
                        $pushed = 0;
@@ -255,6 +255,7 @@ class JobQueueRedis extends JobQueue {
                        $args[] = (string)$this->serialize( $item );
                }
                static $script =
+               /** @lang Lua */
 <<<LUA
                local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData, kQwJobs = unpack(KEYS)
                -- First argument is the queue ID
@@ -344,6 +345,7 @@ LUA;
         */
        protected function popAndAcquireBlob( RedisConnRef $conn ) {
                static $script =
+               /** @lang Lua */
 <<<LUA
                local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
                local rTime = unpack(ARGV)
@@ -391,6 +393,7 @@ LUA;
                $conn = $this->getConnection();
                try {
                        static $script =
+                       /** @lang Lua */
 <<<LUA
                        local kClaimed, kAttempts, kData = unpack(KEYS)
                        local id = unpack(ARGV)
@@ -750,7 +753,7 @@ LUA;
         * @throws JobQueueConnectionError
         */
        protected function getConnection() {
-               $conn = $this->redisPool->getConnection( $this->server );
+               $conn = $this->redisPool->getConnection( $this->server, $this->logger );
                if ( !$conn ) {
                        throw new JobQueueConnectionError(
                                "Unable to connect to redis server {$this->server}." );
@@ -791,9 +794,9 @@ LUA;
        private function getGlobalKey( $name ) {
                $parts = [ 'global', 'jobqueue', $name ];
                foreach ( $parts as $part ) {
-                   if ( !preg_match( '/[a-zA-Z0-9_-]+/', $part ) ) {
-                       throw new InvalidArgumentException( "Key part characters are out of range." );
-                   }
+                       if ( !preg_match( '/[a-zA-Z0-9_-]+/', $part ) ) {
+                               throw new InvalidArgumentException( "Key part characters are out of range." );
+                       }
                }
 
                return implode( ':', $parts );