use Kafka\MetaDataFromKafka;
use Kafka\Produce;
+use Kafka\Protocol\Decoder;
use MediaWiki\Logger\LoggerFactory;
use Monolog\Handler\AbstractProcessingHandler;
use Monolog\Logger;
'alias' => [], // map from monolog channel to kafka topic
'swallowExceptions' => false, // swallow exceptions sending records
'logExceptions' => null, // A PSR3 logger to inform about errors
+ 'requireAck' => 0,
];
/**
$options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
}
+ if ( isset( $options['requireAck'] ) ) {
+ $produce->setRequireAck( $options['requireAck'] );
+ }
+
return new self( $produce, $options, $level, $bubble );
}
*/
protected function send() {
try {
- $this->produce->send();
+ $response = $this->produce->send();
} catch ( \Kafka\Exception $e ) {
$ignore = $this->warning(
'Error sending records to kafka: {exception}',
[ 'exception' => $e ] );
if ( !$ignore ) {
throw $e;
+ } else {
+ return;
+ }
+ }
+
+ if ( is_bool( $response ) ) {
+ return;
+ }
+
+ $errors = [];
+ foreach ( $response as $topicName => $partitionResponse ) {
+ foreach ( $partitionResponse as $partition => $info ) {
+ if ( $info['errCode'] === 0 ) {
+ // no error
+ continue;
+ }
+ $errors[] = sprintf(
+ 'Error producing to %s (errno %d): %s',
+ $topicName,
+ $info['errCode'],
+ Decoder::getError( $info['errCode'] )
+ );
+ }
+ }
+
+ if ( $errors ) {
+ $error = implode( "\n", $errors );
+ if ( !$this->warning( $error ) ) {
+ throw new \RuntimeException( $error );
}
}
}
$produce->expects( $this->once() )
->method( 'setMessages' )
->with( $expect, $this->anything(), $this->anything() );
+ $produce->expects( $this->any() )
+ ->method( 'send' )
+ ->will( $this->returnValue( true ) );
$handler = new KafkaHandler( $produce, $options );
$handler->handle( [
$produce->expects( $this->any() )
->method( 'getAvailablePartitions' )
->will( $this->throwException( new \Kafka\Exception ) );
+ $produce->expects( $this->any() )
+ ->method( 'send' )
+ ->will( $this->returnValue( true ) );
if ( $expectException ) {
$this->setExpectedException( 'Kafka\Exception' );
->will( $this->returnValue( [ 'A' ] ) );
$mockMethod = $produce->expects( $this->exactly( 2 ) )
->method( 'setMessages' );
+ $produce->expects( $this->any() )
+ ->method( 'send' )
+ ->will( $this->returnValue( true ) );
// evil hax
\TestingAccessWrapper::newFromObject( $mockMethod )->matcher->parametersMatcher =
new \PHPUnit_Framework_MockObject_Matcher_ConsecutiveParameters( [
$produce->expects( $this->once() )
->method( 'setMessages' )
->with( $this->anything(), $this->anything(), [ 'words', 'lines' ] );
+ $produce->expects( $this->any() )
+ ->method( 'send' )
+ ->will( $this->returnValue( true ) );
$formatter = $this->getMock( 'Monolog\Formatter\FormatterInterface' );
$formatter->expects( $this->any() )