From e761c8847ac7b5e192a038df5a2d06abeeb8e9a5 Mon Sep 17 00:00:00 2001 From: Erik Bernhardson Date: Thu, 2 Jun 2016 17:44:43 -0700 Subject: [PATCH] kafka: Implement ack handling By default the kafka implementation we use doesn't require any kind of acknowledgment, it just throws messages into the wind and lets them sit where they may. Implement an option for KafkaHandler to specify the number of acks (number of replicas that must record the message) and some error handling to throw exceptions as necessary when there is a problem. Bug: T135159 Change-Id: I859dc791072db407f908b2f36be0d6704f1a6256 --- .../debug/logger/monolog/KafkaHandler.php | 37 ++++++++++++++++++- .../debug/logger/monolog/KafkaHandlerTest.php | 12 ++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/includes/debug/logger/monolog/KafkaHandler.php b/includes/debug/logger/monolog/KafkaHandler.php index 0fd3b086af..432a9e1490 100644 --- a/includes/debug/logger/monolog/KafkaHandler.php +++ b/includes/debug/logger/monolog/KafkaHandler.php @@ -22,6 +22,7 @@ namespace MediaWiki\Logger\Monolog; use Kafka\MetaDataFromKafka; use Kafka\Produce; +use Kafka\Protocol\Decoder; use MediaWiki\Logger\LoggerFactory; use Monolog\Handler\AbstractProcessingHandler; use Monolog\Logger; @@ -68,6 +69,7 @@ class KafkaHandler extends AbstractProcessingHandler { 'alias' => [], // map from monolog channel to kafka topic 'swallowExceptions' => false, // swallow exceptions sending records 'logExceptions' => null, // A PSR3 logger to inform about errors + 'requireAck' => 0, ]; /** @@ -118,6 +120,10 @@ class KafkaHandler extends AbstractProcessingHandler { $options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] ); } + if ( isset( $options['requireAck'] ) ) { + $produce->setRequireAck( $options['requireAck'] ); + } + return new self( $produce, $options, $level, $bubble ); } @@ -165,13 +171,42 @@ class KafkaHandler extends AbstractProcessingHandler { */ protected function send() { try { - $this->produce->send(); + $response = $this->produce->send(); } catch ( \Kafka\Exception $e ) { $ignore = $this->warning( 'Error sending records to kafka: {exception}', [ 'exception' => $e ] ); if ( !$ignore ) { throw $e; + } else { + return; + } + } + + if ( is_bool( $response ) ) { + return; + } + + $errors = []; + foreach ( $response as $topicName => $partitionResponse ) { + foreach ( $partitionResponse as $partition => $info ) { + if ( $info['errCode'] === 0 ) { + // no error + continue; + } + $errors[] = sprintf( + 'Error producing to %s (errno %d): %s', + $topicName, + $info['errCode'], + Decoder::getError( $info['errCode'] ) + ); + } + } + + if ( $errors ) { + $error = implode( "\n", $errors ); + if ( !$this->warning( $error ) ) { + throw new \RuntimeException( $error ); } } } diff --git a/tests/phpunit/includes/debug/logger/monolog/KafkaHandlerTest.php b/tests/phpunit/includes/debug/logger/monolog/KafkaHandlerTest.php index e29d2071c5..cf97071ed6 100644 --- a/tests/phpunit/includes/debug/logger/monolog/KafkaHandlerTest.php +++ b/tests/phpunit/includes/debug/logger/monolog/KafkaHandlerTest.php @@ -58,6 +58,9 @@ class KafkaHandlerTest extends MediaWikiTestCase { $produce->expects( $this->once() ) ->method( 'setMessages' ) ->with( $expect, $this->anything(), $this->anything() ); + $produce->expects( $this->any() ) + ->method( 'send' ) + ->will( $this->returnValue( true ) ); $handler = new KafkaHandler( $produce, $options ); $handler->handle( [ @@ -89,6 +92,9 @@ class KafkaHandlerTest extends MediaWikiTestCase { $produce->expects( $this->any() ) ->method( 'getAvailablePartitions' ) ->will( $this->throwException( new \Kafka\Exception ) ); + $produce->expects( $this->any() ) + ->method( 'send' ) + ->will( $this->returnValue( true ) ); if ( $expectException ) { $this->setExpectedException( 'Kafka\Exception' ); @@ -147,6 +153,9 @@ class KafkaHandlerTest extends MediaWikiTestCase { ->will( $this->returnValue( [ 'A' ] ) ); $mockMethod = $produce->expects( $this->exactly( 2 ) ) ->method( 'setMessages' ); + $produce->expects( $this->any() ) + ->method( 'send' ) + ->will( $this->returnValue( true ) ); // evil hax \TestingAccessWrapper::newFromObject( $mockMethod )->matcher->parametersMatcher = new \PHPUnit_Framework_MockObject_Matcher_ConsecutiveParameters( [ @@ -181,6 +190,9 @@ class KafkaHandlerTest extends MediaWikiTestCase { $produce->expects( $this->once() ) ->method( 'setMessages' ) ->with( $this->anything(), $this->anything(), [ 'words', 'lines' ] ); + $produce->expects( $this->any() ) + ->method( 'send' ) + ->will( $this->returnValue( true ) ); $formatter = $this->getMock( 'Monolog\Formatter\FormatterInterface' ); $formatter->expects( $this->any() ) -- 2.20.1