Merge "kafka: Implement ack handling"
authorjenkins-bot <jenkins-bot@gerrit.wikimedia.org>
Wed, 29 Jun 2016 12:18:45 +0000 (12:18 +0000)
committerGerrit Code Review <gerrit@wikimedia.org>
Wed, 29 Jun 2016 12:18:45 +0000 (12:18 +0000)
includes/debug/logger/monolog/KafkaHandler.php
tests/phpunit/includes/debug/logger/monolog/KafkaHandlerTest.php

index 0fd3b08..432a9e1 100644 (file)
@@ -22,6 +22,7 @@ namespace MediaWiki\Logger\Monolog;
 
 use Kafka\MetaDataFromKafka;
 use Kafka\Produce;
+use Kafka\Protocol\Decoder;
 use MediaWiki\Logger\LoggerFactory;
 use Monolog\Handler\AbstractProcessingHandler;
 use Monolog\Logger;
@@ -68,6 +69,7 @@ class KafkaHandler extends AbstractProcessingHandler {
                'alias' => [], // map from monolog channel to kafka topic
                'swallowExceptions' => false, // swallow exceptions sending records
                'logExceptions' => null, // A PSR3 logger to inform about errors
+               'requireAck' => 0,
        ];
 
        /**
@@ -118,6 +120,10 @@ class KafkaHandler extends AbstractProcessingHandler {
                        $options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
                }
 
+               if ( isset( $options['requireAck'] ) ) {
+                       $produce->setRequireAck( $options['requireAck'] );
+               }
+
                return new self( $produce, $options, $level, $bubble );
        }
 
@@ -165,13 +171,42 @@ class KafkaHandler extends AbstractProcessingHandler {
         */
        protected function send() {
                try {
-                       $this->produce->send();
+                       $response = $this->produce->send();
                } catch ( \Kafka\Exception $e ) {
                        $ignore = $this->warning(
                                'Error sending records to kafka: {exception}',
                                [ 'exception' => $e ] );
                        if ( !$ignore ) {
                                throw $e;
+                       } else {
+                               return;
+                       }
+               }
+
+               if ( is_bool( $response ) ) {
+                       return;
+               }
+
+               $errors = [];
+               foreach ( $response as $topicName => $partitionResponse ) {
+                       foreach ( $partitionResponse as $partition => $info ) {
+                               if ( $info['errCode'] === 0 ) {
+                                       // no error
+                                       continue;
+                               }
+                               $errors[] = sprintf(
+                                       'Error producing to %s (errno %d): %s',
+                                       $topicName,
+                                       $info['errCode'],
+                                       Decoder::getError( $info['errCode'] )
+                               );
+                       }
+               }
+
+               if ( $errors ) {
+                       $error = implode( "\n", $errors );
+                       if ( !$this->warning( $error ) ) {
+                               throw new \RuntimeException( $error );
                        }
                }
        }
index 68ce640..d6249bb 100644 (file)
@@ -55,6 +55,9 @@ class KafkaHandlerTest extends MediaWikiTestCase {
                $produce->expects( $this->once() )
                        ->method( 'setMessages' )
                        ->with( $expect, $this->anything(), $this->anything() );
+               $produce->expects( $this->any() )
+                       ->method( 'send' )
+                       ->will( $this->returnValue( true ) );
 
                $handler = new KafkaHandler( $produce, $options );
                $handler->handle( [
@@ -86,6 +89,9 @@ class KafkaHandlerTest extends MediaWikiTestCase {
                $produce->expects( $this->any() )
                        ->method( 'getAvailablePartitions' )
                        ->will( $this->throwException( new \Kafka\Exception ) );
+               $produce->expects( $this->any() )
+                       ->method( 'send' )
+                       ->will( $this->returnValue( true ) );
 
                if ( $expectException ) {
                        $this->setExpectedException( 'Kafka\Exception' );
@@ -144,6 +150,9 @@ class KafkaHandlerTest extends MediaWikiTestCase {
                        ->will( $this->returnValue( [ 'A' ] ) );
                $mockMethod = $produce->expects( $this->exactly( 2 ) )
                        ->method( 'setMessages' );
+               $produce->expects( $this->any() )
+                       ->method( 'send' )
+                       ->will( $this->returnValue( true ) );
                // evil hax
                \TestingAccessWrapper::newFromObject( $mockMethod )->matcher->parametersMatcher =
                        new \PHPUnit_Framework_MockObject_Matcher_ConsecutiveParameters( [
@@ -178,6 +187,9 @@ class KafkaHandlerTest extends MediaWikiTestCase {
                $produce->expects( $this->once() )
                        ->method( 'setMessages' )
                        ->with( $this->anything(), $this->anything(), [ 'words', 'lines' ] );
+               $produce->expects( $this->any() )
+                       ->method( 'send' )
+                       ->will( $this->returnValue( true ) );
 
                $formatter = $this->getMock( 'Monolog\Formatter\FormatterInterface' );
                $formatter->expects( $this->any() )