From: Erik Bernhardson Date: Fri, 3 Jun 2016 00:44:43 +0000 (-0700) Subject: kafka: Implement ack handling X-Git-Tag: 1.31.0-rc.0~6501^2 X-Git-Url: http://git.cyclocoop.org/fichier?a=commitdiff_plain;h=e761c8847ac7b5e192a038df5a2d06abeeb8e9a5;p=lhc%2Fweb%2Fwiklou.git kafka: Implement ack handling By default the kafka implementation we use doesn't require any kind of acknowledgment, it just throws messages into the wind and lets them sit where they may. Implement an option for KafkaHandler to specify the number of acks (number of replicas that must record the message) and some error handling to throw exceptions as necessary when there is a problem. Bug: T135159 Change-Id: I859dc791072db407f908b2f36be0d6704f1a6256 --- diff --git a/includes/debug/logger/monolog/KafkaHandler.php b/includes/debug/logger/monolog/KafkaHandler.php index 0fd3b086af..432a9e1490 100644 --- a/includes/debug/logger/monolog/KafkaHandler.php +++ b/includes/debug/logger/monolog/KafkaHandler.php @@ -22,6 +22,7 @@ namespace MediaWiki\Logger\Monolog; use Kafka\MetaDataFromKafka; use Kafka\Produce; +use Kafka\Protocol\Decoder; use MediaWiki\Logger\LoggerFactory; use Monolog\Handler\AbstractProcessingHandler; use Monolog\Logger; @@ -68,6 +69,7 @@ class KafkaHandler extends AbstractProcessingHandler { 'alias' => [], // map from monolog channel to kafka topic 'swallowExceptions' => false, // swallow exceptions sending records 'logExceptions' => null, // A PSR3 logger to inform about errors + 'requireAck' => 0, ]; /** @@ -118,6 +120,10 @@ class KafkaHandler extends AbstractProcessingHandler { $options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] ); } + if ( isset( $options['requireAck'] ) ) { + $produce->setRequireAck( $options['requireAck'] ); + } + return new self( $produce, $options, $level, $bubble ); } @@ -165,13 +171,42 @@ class KafkaHandler extends AbstractProcessingHandler { */ protected function send() { try { - $this->produce->send(); + $response = $this->produce->send(); } catch ( \Kafka\Exception $e ) { $ignore = $this->warning( 'Error sending records to kafka: {exception}', [ 'exception' => $e ] ); if ( !$ignore ) { throw $e; + } else { + return; + } + } + + if ( is_bool( $response ) ) { + return; + } + + $errors = []; + foreach ( $response as $topicName => $partitionResponse ) { + foreach ( $partitionResponse as $partition => $info ) { + if ( $info['errCode'] === 0 ) { + // no error + continue; + } + $errors[] = sprintf( + 'Error producing to %s (errno %d): %s', + $topicName, + $info['errCode'], + Decoder::getError( $info['errCode'] ) + ); + } + } + + if ( $errors ) { + $error = implode( "\n", $errors ); + if ( !$this->warning( $error ) ) { + throw new \RuntimeException( $error ); } } } diff --git a/tests/phpunit/includes/debug/logger/monolog/KafkaHandlerTest.php b/tests/phpunit/includes/debug/logger/monolog/KafkaHandlerTest.php index e29d2071c5..cf97071ed6 100644 --- a/tests/phpunit/includes/debug/logger/monolog/KafkaHandlerTest.php +++ b/tests/phpunit/includes/debug/logger/monolog/KafkaHandlerTest.php @@ -58,6 +58,9 @@ class KafkaHandlerTest extends MediaWikiTestCase { $produce->expects( $this->once() ) ->method( 'setMessages' ) ->with( $expect, $this->anything(), $this->anything() ); + $produce->expects( $this->any() ) + ->method( 'send' ) + ->will( $this->returnValue( true ) ); $handler = new KafkaHandler( $produce, $options ); $handler->handle( [ @@ -89,6 +92,9 @@ class KafkaHandlerTest extends MediaWikiTestCase { $produce->expects( $this->any() ) ->method( 'getAvailablePartitions' ) ->will( $this->throwException( new \Kafka\Exception ) ); + $produce->expects( $this->any() ) + ->method( 'send' ) + ->will( $this->returnValue( true ) ); if ( $expectException ) { $this->setExpectedException( 'Kafka\Exception' ); @@ -147,6 +153,9 @@ class KafkaHandlerTest extends MediaWikiTestCase { ->will( $this->returnValue( [ 'A' ] ) ); $mockMethod = $produce->expects( $this->exactly( 2 ) ) ->method( 'setMessages' ); + $produce->expects( $this->any() ) + ->method( 'send' ) + ->will( $this->returnValue( true ) ); // evil hax \TestingAccessWrapper::newFromObject( $mockMethod )->matcher->parametersMatcher = new \PHPUnit_Framework_MockObject_Matcher_ConsecutiveParameters( [ @@ -181,6 +190,9 @@ class KafkaHandlerTest extends MediaWikiTestCase { $produce->expects( $this->once() ) ->method( 'setMessages' ) ->with( $this->anything(), $this->anything(), [ 'words', 'lines' ] ); + $produce->expects( $this->any() ) + ->method( 'send' ) + ->will( $this->returnValue( true ) ); $formatter = $this->getMock( 'Monolog\Formatter\FormatterInterface' ); $formatter->expects( $this->any() )