From f66559b616f47c35dcfe464fe61835c0cdcd7591 Mon Sep 17 00:00:00 2001 From: Erik Bernhardson Date: Tue, 4 Aug 2015 11:02:47 -0700 Subject: [PATCH] Produce monolog messages through kafka+avro This allows a logging channel to be configured to write directly to kafka. Logs can be serialized either to json blobs or the more compact apache avro format. The Kafka handler for monolog needs a list of one of more kafka servers to query cluster metadata from. This should be able to use any monolog formatter, although some like JsonFormatter require you to disable formatBatch as Kafka protocol would prefer to encode each record independently in the protocol. This requires the nmred/kafka-php library, version >= 1.3.0. Adds a new formatter which serializes to the apache avro format. This is a compact binary format which uses pre- defined schemas. This initial implementation is very simple and takes the plain schemas as a constructor argument. Adds a new option to MonologSpi to wrap handlers in a BufferHandler. This doesn't flush until the request shuts down and prevents any network requests in the logger from adding latency to web requests. Related mediawiki/vendor update: Ibfe4bd2036ae8e998e2973f07bd9a6f057691578 The necessary config is something like: array( 'loggers' => array( 'CirrusSearchRequests' => array( 'handlers' => array( 'kafka' ), ), ), 'handlers' => array( 'kafka' => array( 'factory' => '\\MediaWiki\\Logger\\Monolog\\KafkaHandler::factory', 'args' => array( 'localhost:9092' ), 'formatter' => 'avro', 'buffer' => true, ), ), 'formatters' => array( 'avro' => array( 'class' => '\\MediaWiki\\Logger\\Monolog\\AvroFormatter', 'args' => array( array( 'CirrusSearchRequests' => array( 'type' => 'record', 'name' => 'CirrusSearchRequests' 'fields' => array( ... ) ), ), ), ), ), ) Bug: T106256 Change-Id: I6ee744b3e5306af0bed70811b558a543eed22840 --- autoload.php | 4 + composer.json | 4 +- includes/debug/logger/MonologSpi.php | 5 + .../debug/logger/monolog/AvroFormatter.php | 139 +++++++++++ .../debug/logger/monolog/BufferHandler.php | 47 ++++ .../debug/logger/monolog/KafkaHandler.php | 224 ++++++++++++++++++ includes/utils/AvroValidator.php | 184 ++++++++++++++ .../includes/ConsecutiveParametersMatcher.php | 123 ++++++++++ .../logger/monolog/AvroFormatterTest.php | 64 +++++ .../debug/logger/monolog/KafkaHandlerTest.php | 204 ++++++++++++++++ .../includes/utils/AvroValidatorTest.php | 96 ++++++++ 11 files changed, 1093 insertions(+), 1 deletion(-) create mode 100644 includes/debug/logger/monolog/AvroFormatter.php create mode 100644 includes/debug/logger/monolog/BufferHandler.php create mode 100644 includes/debug/logger/monolog/KafkaHandler.php create mode 100644 includes/utils/AvroValidator.php create mode 100644 tests/phpunit/includes/ConsecutiveParametersMatcher.php create mode 100644 tests/phpunit/includes/debug/logger/monolog/AvroFormatterTest.php create mode 100644 tests/phpunit/includes/debug/logger/monolog/KafkaHandlerTest.php create mode 100644 tests/phpunit/includes/utils/AvroValidatorTest.php diff --git a/autoload.php b/autoload.php index 5adfbe5104..03728e9d2a 100644 --- a/autoload.php +++ b/autoload.php @@ -149,6 +149,7 @@ $wgAutoloadLocalClasses = array( 'AutoLoader' => __DIR__ . '/includes/AutoLoader.php', 'AutoloadGenerator' => __DIR__ . '/includes/utils/AutoloadGenerator.php', 'Autopromote' => __DIR__ . '/includes/Autopromote.php', + 'AvroValidator' => __DIR__ . '/includes/utils/AvroValidator.php', 'BacklinkCache' => __DIR__ . '/includes/cache/BacklinkCache.php', 'BacklinkJobUtils' => __DIR__ . '/includes/jobqueue/utils/BacklinkJobUtils.php', 'BackupDumper' => __DIR__ . '/maintenance/backup.inc', @@ -753,6 +754,9 @@ $wgAutoloadLocalClasses = array( 'MediaWiki\\Logger\\LegacySpi' => __DIR__ . '/includes/debug/logger/LegacySpi.php', 'MediaWiki\\Logger\\LoggerFactory' => __DIR__ . '/includes/debug/logger/LoggerFactory.php', 'MediaWiki\\Logger\\MonologSpi' => __DIR__ . '/includes/debug/logger/MonologSpi.php', + 'MediaWiki\\Logger\\Monolog\\AvroFormatter' => __DIR__ . '/includes/debug/logger/monolog/AvroFormatter.php', + 'MediaWiki\\Logger\\Monolog\\BufferHandler' => __DIR__ . '/includes/debug/logger/monolog/BufferHandler.php', + 'MediaWiki\\Logger\\Monolog\\KafkaHandler' => __DIR__ . '/includes/debug/logger/monolog/KafkaHandler.php', 'MediaWiki\\Logger\\Monolog\\LegacyFormatter' => __DIR__ . '/includes/debug/logger/monolog/LegacyFormatter.php', 'MediaWiki\\Logger\\Monolog\\LegacyHandler' => __DIR__ . '/includes/debug/logger/monolog/LegacyHandler.php', 'MediaWiki\\Logger\\Monolog\\LineFormatter' => __DIR__ . '/includes/debug/logger/monolog/LineFormatter.php', diff --git a/composer.json b/composer.json index 1fe1e5068d..1ba5827927 100644 --- a/composer.json +++ b/composer.json @@ -45,9 +45,11 @@ "ext-wikidiff2": "Diff accelerator", "ext-apc": "Local data and opcode cache", "monolog/monolog": "Flexible debug logging system", + "nmred/kafka-php": "Send debug log events to kafka", "pear/mail": "Mail sending support", "pear/mail_mime": "Mail sending support", - "pear/mail_mime-decode": "Mail sending support" + "pear/mail_mime-decode": "Mail sending support", + "wikimedia/avro": "Binary serialization format used with kafka" }, "autoload": { "psr-0": { diff --git a/includes/debug/logger/MonologSpi.php b/includes/debug/logger/MonologSpi.php index 7b54861127..274e18e1d0 100644 --- a/includes/debug/logger/MonologSpi.php +++ b/includes/debug/logger/MonologSpi.php @@ -20,6 +20,7 @@ namespace MediaWiki\Logger; +use MediaWiki\Logger\Monolog\BufferHandler; use Monolog\Logger; use ObjectFactory; @@ -84,6 +85,7 @@ use ObjectFactory; * 'logstash' * ), * 'formatter' => 'logstash', + * 'buffer' => true, * ), * 'udp2log' => array( * 'class' => '\\MediaWiki\\Logger\\Monolog\\LegacyHandler', @@ -247,6 +249,9 @@ class MonologSpi implements Spi { $this->getFormatter( $spec['formatter'] ) ); } + if ( isset( $spec['buffer'] ) && $spec['buffer'] ) { + $handler = new BufferHandler( $handler ); + } $this->singletons['handlers'][$name] = $handler; } return $this->singletons['handlers'][$name]; diff --git a/includes/debug/logger/monolog/AvroFormatter.php b/includes/debug/logger/monolog/AvroFormatter.php new file mode 100644 index 0000000000..b6adab4a05 --- /dev/null +++ b/includes/debug/logger/monolog/AvroFormatter.php @@ -0,0 +1,139 @@ + + * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation. + */ +class AvroFormatter implements FormatterInterface { + /** + * @var array Map from schema name to schema definition + */ + protected $schemas; + + /** + * @var AvroStringIO + */ + protected $io; + + /** + * @var AvroIOBinaryEncoder + */ + protected $encoder; + + /** + * @var AvroIODatumWriter + */ + protected $writer; + + /** + * @var array $schemas Map from Monolog channel to Avro schema. + * Each schema can be either the JSON string or decoded into PHP + * arrays. + */ + public function __construct( array $schemas ) { + $this->schemas = $schemas; + $this->io = new AvroStringIO( '' ); + $this->encoder = new AvroIOBinaryEncoder( $this->io ); + $this->writer = new AvroIODatumWriter(); + } + + /** + * Formats the record context into a binary string per the + * schema configured for the records channel. + * + * @param array $record + * @return string|null The serialized record, or null if + * the record is not valid for the selected schema. + */ + public function format( array $record ) { + $this->io->truncate(); + $schema = $this->getSchema( $record['channel'] ); + if ( $schema === null ) { + trigger_error( "The schema for channel '{$record['channel']}' is not available" ); + return null; + } + try { + $this->writer->write_data( $schema, $record['context'], $this->encoder ); + } catch ( AvroIOTypeException $e ) { + $errors = AvroValidator::getErrors( $schema, $record['context'] ); + $json = json_encode( $errors ); + trigger_error( "Avro failed to serialize record for {$record['channel']} : {$json}" ); + return null; + } + return $this->io->string(); + } + + /** + * Format a set of records into a list of binary strings + * conforming to the configured schema. + * + * @param array $records + * @return string[] + */ + public function formatBatch( array $records ) { + $result = array(); + foreach ( $records as $record ) { + $message = $this->format( $record ); + if ( $message !== null ) { + $result[] = $message; + } + } + return $result; + } + + /** + * Get the writer for the named channel + * + * @var string $channel Name of the schema to fetch + * @return AvroSchema|null + */ + protected function getSchema( $channel ) { + if ( !isset( $this->schemas[$channel] ) ) { + return null; + } + if ( !$this->schemas[$channel] instanceof AvroSchema ) { + if ( is_string( $this->schemas[$channel] ) ) { + $this->schemas[$channel] = AvroSchema::parse( $this->schemas[$channel] ); + } else { + $this->schemas[$channel] = AvroSchema::real_parse( + $this->schemas[$channel], + null, + new AvroNamedSchemata() + ); + } + } + return $this->schemas[$channel]; + } +} diff --git a/includes/debug/logger/monolog/BufferHandler.php b/includes/debug/logger/monolog/BufferHandler.php new file mode 100644 index 0000000000..3ebd0b1f30 --- /dev/null +++ b/includes/debug/logger/monolog/BufferHandler.php @@ -0,0 +1,47 @@ +initialized) { + DeferredUpdates::addCallableUpdate( array( $this, 'close' ) ); + $this->initialized = true; + } + return parent::handle( $record ); + } +} + diff --git a/includes/debug/logger/monolog/KafkaHandler.php b/includes/debug/logger/monolog/KafkaHandler.php new file mode 100644 index 0000000000..59d7764a95 --- /dev/null +++ b/includes/debug/logger/monolog/KafkaHandler.php @@ -0,0 +1,224 @@ += 1.3.0 + * + * @since 1.26 + * @author Erik Bernhardson + * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation. + */ +class KafkaHandler extends AbstractProcessingHandler { + /** + * @var Produce Sends requests to kafka + */ + protected $produce; + + /** + * @var array Optional handler configuration + */ + protected $options; + + /** + * @var array Map from topic name to partition this request produces to + */ + protected $partitions = array(); + + /** + * @var array defaults for constructor options + */ + private static $defaultOptions = array( + 'alias' => array(), // map from monolog channel to kafka topic + 'swallowExceptions' => false, // swallow exceptions sending records + 'logExceptions' => null, // A PSR3 logger to inform about errors + ); + + /** + * @param Produce $produce Kafka instance to produce through + * @param array $options optional handler configuration + * @param int $level The minimum logging level at which this handler will be triggered + * @param bool $bubble Whether the messages that are handled can bubble up the stack or not + */ + public function __construct( Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true ) { + parent::__construct( $level, $bubble ); + $this->produce = $produce; + $this->options = array_merge( self::$defaultOptions, $options ); + } + + /** + * Constructs the necessary support objects and returns a KafkaHandler + * instance. + * + * @param string[] $kafkaServers + * @param array $options + * @param int $level The minimum logging level at which this handle will be triggered + * @param bool $bubble Whether the messages that are handled can bubble the stack or not + * @return KafkaHandler + */ + public static function factory( $kafkaServers, array $options = array(), $level = Logger::DEBUG, $bubble = true ) { + $metadata = new MetaDataFromKafka( $kafkaServers ); + $produce = new Produce( $metadata ); + if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) { + $options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] ); + } + return new self( $produce, $options, $level, $bubble ); + } + + /** + * {@inheritDoc} + */ + protected function write( array $record ) { + if ( $record['formatted'] !== null ) { + $this->addMessages( $record['channel'], array( $record['formatted'] ) ); + $this->send(); + } + } + + /** + * {@inheritDoc} + */ + public function handleBatch( array $batch ) { + $channels = array(); + foreach ( $batch as $record ) { + if ( $record['level'] < $this->level ) { + continue; + } + $channels[$record['channel']][] = $this->processRecord( $record ); + } + + $formatter = $this->getFormatter(); + foreach ( $channels as $channel => $records ) { + $messages = array(); + foreach ( $records as $idx => $record ) { + $message = $formatter->format( $record ); + if ( $message !== null ) { + $messages[] = $message; + } + } + if ( $messages ) { + $this->addMessages($channel, $messages); + } + } + + $this->send(); + } + + /** + * Send any records in the kafka client internal queue. + */ + protected function send() { + try { + $this->produce->send(); + } catch ( \Kafka\Exception $e ) { + $ignore = $this->warning( + 'Error sending records to kafka: {exception}', + array( 'exception' => $e ) ); + if ( !$ignore ) { + throw $e; + } + } + } + + /** + * @param string $topic Name of topic to get partition for + * @return int|null The random partition to produce to for this request, + * or null if a partition could not be determined. + */ + protected function getRandomPartition( $topic ) { + if ( !array_key_exists( $topic, $this->partitions ) ) { + try { + $partitions = $this->produce->getAvailablePartitions( $topic ); + } catch ( \Kafka\Exception $e ) { + $ignore = $this->warning( + 'Error getting metadata for kafka topic {topic}: {exception}', + array( 'topic' => $topic, 'exception' => $e ) ); + if ( $ignore ) { + return null; + } + throw $e; + } + if ( $partitions ) { + $key = array_rand( $partitions ); + $this->partitions[$topic] = $partitions[$key]; + } else { + $details = $this->produce->getClient()->getTopicDetail( $topic ); + $ignore = $this->warning( + 'No partitions available for kafka topic {topic}', + array( 'topic' => $topic, 'kafka' => $details ) + ); + if ( !$ignore ) { + throw new \RuntimeException( "No partitions available for kafka topic $topic" ); + } + $this->partitions[$topic] = null; + } + } + return $this->partitions[$topic]; + } + + /** + * Adds records for a channel to the Kafka client internal queue. + * + * @param string $channel Name of Monolog channel records belong to + * @param array $records List of records to append + */ + protected function addMessages( $channel, array $records ) { + if ( isset( $this->options['alias'][$channel] ) ) { + $topic = $this->options['alias'][$channel]; + } else { + $topic = "monolog_$channel"; + } + $partition = $this->getRandomPartition( $topic ); + if ( $partition !== null ) { + $this->produce->setMessages( $topic, $partition, $records ); + } + } + + /** + * @param string $message PSR3 compatible message string + * @param array $context PSR3 compatible log context + * @return bool true if caller should ignore warning + */ + protected function warning( $message, array $context = array() ) { + if ( $this->options['logExceptions'] instanceof LoggerInterface ) { + $this->options['logExceptions']->warning( $message, $context ); + } + return $this->options['swallowExceptions']; + } +} diff --git a/includes/utils/AvroValidator.php b/includes/utils/AvroValidator.php new file mode 100644 index 0000000000..4f8e0b177b --- /dev/null +++ b/includes/utils/AvroValidator.php @@ -0,0 +1,184 @@ + + * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation. + */ +class AvroValidator { + /** + * @param AvroSchema $schema The rules to conform to. + * @param mixed $datum The value to validate against $schema. + * @return string|string[] An error or list of errors in the + * provided $datum. When no errors exist the empty array is + * returned. + */ + public static function getErrors( AvroSchema $schema, $datum ) { + switch ( $schema->type) { + case AvroSchema::NULL_TYPE: + if ( !is_null($datum) ) { + return self::wrongType( 'null', $datum ); + } + return array(); + case AvroSchema::BOOLEAN_TYPE: + if ( !is_bool($datum) ) { + return self::wrongType( 'boolean', $datum ); + } + return array(); + case AvroSchema::STRING_TYPE: + case AvroSchema::BYTES_TYPE: + if ( !is_string($datum) ) { + return self::wrongType( 'string', $datum ); + } + return array(); + case AvroSchema::INT_TYPE: + if ( !is_int($datum) ) { + return self::wrongType( 'integer', $datum ); + } + if ( AvroSchema::INT_MIN_VALUE > $datum + || $datum > AvroSchema::INT_MAX_VALUE + ) { + return self::outOfRange( + AvroSchema::INT_MIN_VALUE, + AvroSchema::INT_MAX_VALUE, + $datum + ); + } + return array(); + case AvroSchema::LONG_TYPE: + if ( !is_int($datum) ) { + return self::wrongType( 'integer', $datum ); + } + if ( AvroSchema::LONG_MIN_VALUE > $datum + || $datum > AvroSchema::LONG_MAX_VALUE + ) { + return self::outOfRange( + AvroSchema::LONG_MIN_VALUE, + AvroSchema::LONG_MAX_VALUE, + $datum + ); + } + return array(); + case AvroSchema::FLOAT_TYPE: + case AvroSchema::DOUBLE_TYPE: + if ( !is_float($datum) && !is_int($datum) ) { + return self::wrongType( 'float or integer', $datum ); + } + return array(); + case AvroSchema::ARRAY_SCHEMA: + if (!is_array($datum)) { + return self::wrongType( 'array', $datum ); + } + $errors = array(); + foreach ($datum as $d) { + $result = $this->validate( $schema->items(), $d ); + if ( $result ) { + $errors[] = $result; + } + } + if ( $errors ) { + return $errors; + } + return array(); + case AvroSchema::MAP_SCHEMA: + if (!is_array($datum)) { + return self::wrongType( 'array', $datum ); + } + $errors = array(); + foreach ($datum as $k => $v) { + if ( !is_string($k) ) { + $errors[] = self::wrongType( 'string key', $k ); + } + $result = self::getErrors( $schema->values(), $v ); + if ( $result ) { + $errors[$k] = $result; + } + } + return $errors; + case AvroSchema::UNION_SCHEMA: + $errors = array(); + foreach ($schema->schemas() as $schema) { + $result = self::getErrors( $schema, $datum ); + if ( !$result ) { + return array(); + } + $errors[] = $result; + } + if ( $errors ) { + return array( "Expected any one of these to be true", $errors ); + } + return "No schemas provided to union"; + case AvroSchema::ENUM_SCHEMA: + if ( !in_array( $datum, $schema->symbols() ) ) { + $symbols = implode( ', ', $schema->symbols ); + return "Expected one of $symbols but recieved $datum"; + } + return array(); + case AvroSchema::FIXED_SCHEMA: + if ( !is_string( $datum ) ) { + return self::wrongType( 'string', $datum ); + } + $len = strlen( $datum ); + if ( $len !== $schema->size() ) { + return "Expected string of length {$schema->size()}, " + . "but recieved one of length $len"; + } + return array(); + case AvroSchema::RECORD_SCHEMA: + case AvroSchema::ERROR_SCHEMA: + case AvroSchema::REQUEST_SCHEMA: + if ( !is_array( $datum ) ) { + return self::wrongType( 'array', $datum ); + } + $errors = array(); + foreach ( $schema->fields() as $field ) { + $name = $field->name(); + if ( !array_key_exists( $name, $datum ) ) { + $errors[$name] = 'Missing expected field'; + continue; + } + $result = self::getErrors( $field->type(), $datum[$name] ); + if ( $result ) { + $errors[$name] = $result; + } + } + return $errors; + default: + return "Unknown avro schema type: {$schema->type}"; + } + } + + public static function typeOf( $datum ) { + return is_object( $datum ) ? get_class( $datum ) : gettype( $datum ); + } + + public static function wrongType( $expected, $datum ) { + return "Expected $expected, but recieved " . self::typeOf( $datum ); + } + + public static function outOfRange( $min, $max, $datum ) { + return "Expected value between $min and $max, but recieved $datum"; + } +} diff --git a/tests/phpunit/includes/ConsecutiveParametersMatcher.php b/tests/phpunit/includes/ConsecutiveParametersMatcher.php new file mode 100644 index 0000000000..adf74bb4c3 --- /dev/null +++ b/tests/phpunit/includes/ConsecutiveParametersMatcher.php @@ -0,0 +1,123 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +/** + * Invocation matcher which looks for sets of specific parameters in the invocations. + * + * Checks the parameters of the incoming invocations, the parameter list is + * checked against the defined constraints in $parameters. If the constraint + * is met it will return true in matches(). + * + * It takes a list of match groups and and increases a call index after each invocation. + * So the first invocation uses the first group of constraints, the second the next and so on. + */ +class PHPUnit_Framework_MockObject_Matcher_ConsecutiveParameters extends PHPUnit_Framework_MockObject_Matcher_StatelessInvocation +{ + /** + * @var array + */ + private $_parameterGroups = array(); + + /** + * @var array + */ + private $_invocations = array(); + + /** + * @param array $parameterGroups + */ + public function __construct(array $parameterGroups) + { + foreach ($parameterGroups as $index => $parameters) { + foreach ($parameters as $parameter) { + if (!($parameter instanceof \PHPUnit_Framework_Constraint)) { + $parameter = new \PHPUnit_Framework_Constraint_IsEqual($parameter); + } + $this->_parameterGroups[$index][] = $parameter; + } + } + } + + /** + * @return string + */ + public function toString() + { + $text = 'with consecutive parameters'; + + return $text; + } + + /** + * @param PHPUnit_Framework_MockObject_Invocation $invocation + * @return bool + */ + public function matches(PHPUnit_Framework_MockObject_Invocation $invocation) + { + $this->_invocations[] = $invocation; + $callIndex = count($this->_invocations) - 1; + $this->verifyInvocation($invocation, $callIndex); + + return false; + } + + public function verify() + { + foreach ($this->_invocations as $callIndex => $invocation) { + $this->verifyInvocation($invocation, $callIndex); + } + } + + /** + * Verify a single invocation + * + * @param PHPUnit_Framework_MockObject_Invocation $invocation + * @param int $callIndex + * @throws PHPUnit_Framework_ExpectationFailedException + */ + private function verifyInvocation(PHPUnit_Framework_MockObject_Invocation $invocation, $callIndex) + { + + if (isset($this->_parameterGroups[$callIndex])) { + $parameters = $this->_parameterGroups[$callIndex]; + } else { + // no parameter assertion for this call index + return; + } + + if ($invocation === null) { + throw new PHPUnit_Framework_ExpectationFailedException( + 'Mocked method does not exist.' + ); + } + + if (count($invocation->parameters) < count($parameters)) { + throw new PHPUnit_Framework_ExpectationFailedException( + sprintf( + 'Parameter count for invocation %s is too low.', + $invocation->toString() + ) + ); + } + + foreach ($parameters as $i => $parameter) { + $parameter->evaluate( + $invocation->parameters[$i], + sprintf( + 'Parameter %s for invocation #%d %s does not match expected ' . + 'value.', + $i, + $callIndex, + $invocation->toString() + ) + ); + } + } +} diff --git a/tests/phpunit/includes/debug/logger/monolog/AvroFormatterTest.php b/tests/phpunit/includes/debug/logger/monolog/AvroFormatterTest.php new file mode 100644 index 0000000000..4c6d25e0ca --- /dev/null +++ b/tests/phpunit/includes/debug/logger/monolog/AvroFormatterTest.php @@ -0,0 +1,64 @@ +markTestSkipped( 'Avro is required for the AvroFormatterTest' ); + } + parent::setUp(); + } + + public function testSchemaNotAvailable() { + $formatter = new AvroFormatter( array() ); + $this->setExpectedException( 'PHPUnit_Framework_Error_Notice', "The schema for channel 'marty' is not available" ); + $formatter->format( array( 'channel' => 'marty' ) ); + } + + public function testSchemaNotAvailableReturnValue() { + $formatter = new AvroFormatter( array() ); + $noticeEnabled = PHPUnit_Framework_Error_Notice::$enabled; + // disable conversion of notices + PHPUnit_Framework_Error_Notice::$enabled = false; + // have to keep the user notice from being output + wfSuppressWarnings(); + $res = $formatter->format( array( 'channel' => 'marty' ) ); + wfRestoreWarnings(); + PHPUnit_Framework_Error_Notice::$enabled = $noticeEnabled; + $this->assertNull( $res ); + } + + public function testDoesSomethingWhenSchemaAvailable() { + $formatter = new AvroFormatter( array( 'string' => array( 'type' => 'string' ) ) ); + $res = $formatter->format( array( + 'channel' => 'string', + 'context' => 'better to be', + ) ); + $this->assertNotNull( $res ); + // basically just tell us if avro changes its string encoding + $this->assertEquals( base64_decode( 'GGJldHRlciB0byBiZQ==' ), $res ); + } +} diff --git a/tests/phpunit/includes/debug/logger/monolog/KafkaHandlerTest.php b/tests/phpunit/includes/debug/logger/monolog/KafkaHandlerTest.php new file mode 100644 index 0000000000..272f6e4827 --- /dev/null +++ b/tests/phpunit/includes/debug/logger/monolog/KafkaHandlerTest.php @@ -0,0 +1,204 @@ +markTestSkipped( 'Monolog is required for the KafkaHandlerTest' ); + } + parent::setUp(); + } + + public function topicNamingProvider() { + return array( + array( array(), 'monolog_foo' ), + array( array( 'alias' => array( 'foo' => 'bar' ) ), 'bar' ) + ); + } + + /** + * @dataProvider topicNamingProvider + */ + public function testTopicNaming( $options, $expect ) { + $produce = $this->getMockBuilder( 'Kafka\Produce' ) + ->disableOriginalConstructor() + ->getMock(); + $produce->expects($this->any()) + ->method('getAvailablePartitions') + ->will($this->returnValue( array( 'A' ) ) ); + $produce->expects($this->once()) + ->method( 'setMessages' ) + ->with( $expect, $this->anything(), $this->anything() ); + + $handler = new KafkaHandler( $produce, $options ); + $handler->handle( array( + 'channel' => 'foo', + 'level' => Logger::EMERGENCY, + 'extra' => array(), + ) ); + } + + public function swallowsExceptionsWhenRequested() { + return array( + // defaults to false + array( array(), true ), + // also try false explicitly + array( array( 'swallowExceptions' => false ), true ), + // turn it on + array( array( 'swallowExceptions' => true ), false ), + ); + } + + /** + * @dataProvider swallowsExceptionsWhenRequested + */ + public function testGetAvailablePartitionsException( $options, $expectException ) { + $produce = $this->getMockBuilder( 'Kafka\Produce' ) + ->disableOriginalConstructor() + ->getMock(); + $produce->expects( $this->any() ) + ->method( 'getAvailablePartitions' ) + ->will( $this->throwException( new \Kafka\Exception ) ); + + if ( $expectException ) { + $this->setExpectedException( 'Kafka\Exception' ); + } + + $handler = new KafkaHandler( $produce, $options ); + $handler->handle( array( + 'channel' => 'foo', + 'level' => Logger::EMERGENCY, + 'extra' => array(), + ) ); + + if ( !$expectException ) { + $this->assertTrue( true, 'no exception was thrown' ); + } + } + + /** + * @dataProvider swallowsExceptionsWhenRequested + */ + public function testSendException( $options, $expectException ) { + $produce = $this->getMockBuilder( 'Kafka\Produce' ) + ->disableOriginalConstructor() + ->getMock(); + $produce->expects( $this->any() ) + ->method( 'getAvailablePartitions' ) + ->will( $this->returnValue( array( 'A' ) ) ); + $produce->expects( $this->any() ) + ->method( 'send' ) + ->will( $this->throwException( new \Kafka\Exception ) ); + + if ( $expectException ) { + $this->setExpectedException( 'Kafka\Exception' ); + } + + $handler = new KafkaHandler( $produce, $options ); + $handler->handle( array( + 'channel' => 'foo', + 'level' => Logger::EMERGENCY, + 'extra' => array(), + ) ); + + if ( !$expectException ) { + $this->assertTrue( true, 'no exception was thrown' ); + } + } + + public function testHandlesNullFormatterResult() { + $produce = $this->getMockBuilder( 'Kafka\Produce' ) + ->disableOriginalConstructor() + ->getMock(); + $produce->expects( $this->any() ) + ->method( 'getAvailablePartitions' ) + ->will( $this->returnValue( array( 'A' ) ) ); + $mockMethod = $produce->expects( $this->exactly( 2 ) ) + ->method( 'setMessages' ); + // evil hax + \TestingAccessWrapper::newFromObject( $mockMethod )->matcher->parametersMatcher = + new \PHPUnit_Framework_MockObject_Matcher_ConsecutiveParameters( array( + array( $this->anything(), $this->anything(), array( 'words' ) ), + array( $this->anything(), $this->anything(), array( 'lines' ) ) + ) ); + + $formatter = $this->getMock( 'Monolog\Formatter\FormatterInterface' ); + $formatter->expects( $this->any() ) + ->method( 'format' ) + ->will( $this->onConsecutiveCalls( 'words', null, 'lines' ) ); + + $handler = new KafkaHandler( $produce, array() ); + $handler->setFormatter( $formatter ); + for ( $i = 0; $i < 3; ++$i ) { + $handler->handle( array( + 'channel' => 'foo', + 'level' => Logger::EMERGENCY, + 'extra' => array(), + ) ); + } + } + + + public function testBatchHandlesNullFormatterResult() { + $produce = $this->getMockBuilder( 'Kafka\Produce' ) + ->disableOriginalConstructor() + ->getMock(); + $produce->expects( $this->any() ) + ->method( 'getAvailablePartitions' ) + ->will( $this->returnValue( array( 'A' ) ) ); + $produce->expects( $this->once() ) + ->method( 'setMessages' ) + ->with( $this->anything(), $this->anything(), array( 'words', 'lines' ) ); + + $formatter = $this->getMock( 'Monolog\Formatter\FormatterInterface' ); + $formatter->expects( $this->any() ) + ->method( 'format' ) + ->will( $this->onConsecutiveCalls( 'words', null, 'lines' ) ); + + $handler = new KafkaHandler( $produce, array() ); + $handler->setFormatter( $formatter ); + $handler->handleBatch( array( + array( + 'channel' => 'foo', + 'level' => Logger::EMERGENCY, + 'extra' => array(), + ), + array( + 'channel' => 'foo', + 'level' => Logger::EMERGENCY, + 'extra' => array(), + ), + array( + 'channel' => 'foo', + 'level' => Logger::EMERGENCY, + 'extra' => array(), + ), + ) ); + } +} diff --git a/tests/phpunit/includes/utils/AvroValidatorTest.php b/tests/phpunit/includes/utils/AvroValidatorTest.php new file mode 100644 index 0000000000..52c242c17d --- /dev/null +++ b/tests/phpunit/includes/utils/AvroValidatorTest.php @@ -0,0 +1,96 @@ +markTestSkipped( 'Avro is required to run the AvroValidatorTest' ); + } + parent::setUp(); + } + + public function getErrorsProvider() { + $stringSchema = AvroSchema::parse( json_encode( array( 'type' => 'string' ) ) ); + $recordSchema = AvroSchema::parse( json_encode( array( + 'type' => 'record', + 'name' => 'ut', + 'fields' => array( + array( 'name' => 'id', 'type' => 'int', 'required' => true ), + ), + ) ) ); + $enumSchema = AvroSchema::parse( json_encode( array( + 'type' => 'record', + 'name' => 'ut', + 'fields' => array( + array( 'name' => 'count', 'type' => array( 'int', 'null' ) ), + ), + ) ) ); + + return array( + array( + 'No errors with a simple string serialization', + $stringSchema, 'foobar', array(), + ), + + array( + 'Cannot serialize integer into string', + $stringSchema, 5, 'Expected string, but recieved integer', + ), + + array( + 'Cannot serialize array into string', + $stringSchema, array(), 'Expected string, but recieved array', + ), + + array( + 'allows and ignores extra fields', + $recordSchema, array( 'id' => 4, 'foo' => 'bar' ), array(), + ), + + array( + 'detects missing fields', + $recordSchema, array(), array( 'id' => 'Missing expected field' ), + ), + + array( + 'handles first element in enum', + $enumSchema, array( 'count' => 4 ), array(), + ), + + array( + 'handles second element in enum', + $enumSchema, array( 'count' => null ), array(), + ), + + array( + 'rejects element not in union', + $enumSchema, array( 'count' => 'invalid' ), array( 'count' => array( + 'Expected any one of these to be true', + array( + 'Expected integer, but recieved string', + 'Expected null, but recieved string', + ) + ) ) + ), + ); + } + + /** + * @dataProvider getErrorsProvider + */ + public function testGetErrors( $message, $schema, $datum, $expected ) { + $this->assertEquals( + $expected, + AvroValidator::getErrors( $schema, $datum ), + $message + ); + } +} -- 2.20.1