X-Git-Url: http://git.cyclocoop.org/?a=blobdiff_plain;f=includes%2Fjobqueue%2Faggregator%2FJobQueueAggregatorRedis.php;h=6ae883712e25f41eaef2c266eb3a10f3bcefe1d8;hb=531ed101ccd14dc7e2cf2858a67b2523ef6a79ff;hp=9b4e7e90e2f6891e1980cb4e308972a145f57e9c;hpb=3cab263105feb5a64f996ab1b74802ba87dac8db;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php b/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php index 9b4e7e90e2..6ae883712e 100644 --- a/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php +++ b/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php @@ -20,10 +20,13 @@ * @file * @author Aaron Schulz */ +use Psr\Log\LoggerInterface; /** * Class to handle tracking information about all queues using PhpRedis * + * The mediawiki/services/jobrunner background service must be set up and running. + * * @ingroup JobQueue * @ingroup Redis * @since 1.21 @@ -31,10 +34,10 @@ class JobQueueAggregatorRedis extends JobQueueAggregator { /** @var RedisConnectionPool */ protected $redisPool; + /** @var LoggerInterface */ + protected $logger; /** @var array List of Redis server addresses */ protected $servers; - /** @var bool */ - protected $registeredQueue = false; /** * @param array $params Possible keys: @@ -49,118 +52,51 @@ class JobQueueAggregatorRedis extends JobQueueAggregator { parent::__construct( $params ); $this->servers = isset( $params['redisServers'] ) ? $params['redisServers'] - : array( $params['redisServer'] ); // b/c + : [ $params['redisServer'] ]; // b/c $params['redisConfig']['serializer'] = 'none'; $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); + $this->logger = \MediaWiki\Logger\LoggerFactory::getInstance( 'redis' ); } protected function doNotifyQueueEmpty( $wiki, $type ) { - $conn = $this->getConnection(); - if ( !$conn ) { - return false; - } - try { - $conn->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) ); - - return true; - } catch ( RedisException $e ) { - $this->handleException( $conn, $e ); - - return false; - } + return true; // managed by the service } protected function doNotifyQueueNonEmpty( $wiki, $type ) { - $conn = $this->getConnection(); - if ( !$conn ) { - return false; - } - try { - $conn->multi( Redis::PIPELINE ); - if ( !$this->registeredQueue ) { - // Make sure the queue is registered as existing - $conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' ); - $conn->sAdd( $this->getWikiSetKey(), $wiki ); - } - $conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() ); - $conn->exec(); - - $this->registeredQueue = true; - - return true; - } catch ( RedisException $e ) { - $this->handleException( $conn, $e ); - - return false; - } + return true; // managed by the service } protected function doGetAllReadyWikiQueues() { $conn = $this->getConnection(); if ( !$conn ) { - return array(); + return []; } try { $map = $conn->hGetAll( $this->getReadyQueueKey() ); if ( is_array( $map ) && isset( $map['_epoch'] ) ) { unset( $map['_epoch'] ); // ignore - $pendingDBs = array(); // (type => list of wikis) + $pendingDBs = []; // (type => list of wikis) foreach ( $map as $key => $time ) { - list( $type, $wiki ) = $this->dencQueueName( $key ); + list( $type, $wiki ) = $this->decodeQueueName( $key ); $pendingDBs[$type][] = $wiki; } } else { - // Avoid duplicated effort - $rand = wfRandomString( 32 ); - $conn->multi( Redis::MULTI ); - $conn->setex( "{$rand}:lock", 3600, 1 ); - $conn->renamenx( "{$rand}:lock", $this->getReadyQueueKey() . ":lock" ); - if ( $conn->exec() !== array( true, true ) ) { // lock - $conn->delete( "{$rand}:lock" ); - return array(); // already in progress - } - - $pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis) - - $conn->multi( Redis::PIPELINE ); - $now = time(); - $map = array( '_epoch' => time() ); // dummy key for empty Redis collections - foreach ( $pendingDBs as $type => $wikis ) { - $conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' ); - foreach ( $wikis as $wiki ) { - $map[$this->encQueueName( $type, $wiki )] = $now; - } - } - $conn->hMSet( $this->getReadyQueueKey(), $map ); - $conn->exec(); - - $conn->delete( $this->getReadyQueueKey() . ":lock" ); // unlock + throw new UnexpectedValueException( + "No queue listing found; make sure redisJobChronService is running." + ); } return $pendingDBs; } catch ( RedisException $e ) { - $this->handleException( $conn, $e ); + $this->redisPool->handleError( $conn, $e ); - return array(); + return []; } } protected function doPurge() { - $conn = $this->getConnection(); - if ( !$conn ) { - return false; - } - try { - $conn->delete( $this->getReadyQueueKey() ); - // leave key at getQueueTypesKey() alone - } catch ( RedisException $e ) { - $this->handleException( $conn, $e ); - - return false; - } - - return true; + return true; // fully and only refreshed by the service } /** @@ -172,7 +108,7 @@ class JobQueueAggregatorRedis extends JobQueueAggregator { protected function getConnection() { $conn = false; foreach ( $this->servers as $server ) { - $conn = $this->redisPool->getConnection( $server ); + $conn = $this->redisPool->getConnection( $server, $this->logger ); if ( $conn ) { break; } @@ -181,15 +117,6 @@ class JobQueueAggregatorRedis extends JobQueueAggregator { return $conn; } - /** - * @param RedisConnRef $conn - * @param RedisException $e - * @return void - */ - protected function handleException( RedisConnRef $conn, $e ) { - $this->redisPool->handleError( $conn, $e ); - } - /** * @return string */ @@ -197,36 +124,13 @@ class JobQueueAggregatorRedis extends JobQueueAggregator { return "jobqueue:aggregator:h-ready-queues:v2"; // global } - /** - * @return string - */ - private function getQueueTypesKey() { - return "jobqueue:aggregator:h-queue-types:v2"; // global - } - - /** - * @return string - */ - private function getWikiSetKey() { - return "jobqueue:aggregator:s-wikis:v2"; // global - } - - /** - * @param string $type - * @param string $wiki - * @return string - */ - private function encQueueName( $type, $wiki ) { - return rawurlencode( $type ) . '/' . rawurlencode( $wiki ); - } - /** * @param string $name * @return string */ - private function dencQueueName( $name ) { + private function decodeQueueName( $name ) { list( $type, $wiki ) = explode( '/', $name, 2 ); - return array( rawurldecode( $type ), rawurldecode( $wiki ) ); + return [ rawurldecode( $type ), rawurldecode( $wiki ) ]; } }