Produce monolog messages through kafka+avro
authorErik Bernhardson <ebernhardson@wikimedia.org>
Tue, 4 Aug 2015 18:02:47 +0000 (11:02 -0700)
committerErik Bernhardson <ebernhardson@wikimedia.org>
Mon, 21 Sep 2015 19:45:23 +0000 (12:45 -0700)
This allows a logging channel to be configured to write
directly to kafka. Logs can be serialized either to json
blobs or the more compact apache avro format.

The Kafka handler for monolog needs a list of one of more
kafka servers to query cluster metadata from. This should be
able to use any monolog formatter, although some like
JsonFormatter require you to disable formatBatch as Kafka
protocol would prefer to encode each record independently in
the protocol.  This requires the nmred/kafka-php library,
version >= 1.3.0.

Adds a new formatter which serializes to the apache avro
format. This is a compact binary format which uses pre-
defined schemas. This initial implementation is very simple
and takes the plain schemas as a constructor argument.

Adds a new option to MonologSpi to wrap handlers in a
BufferHandler. This doesn't flush until the request shuts
down and prevents any network requests in the logger from
adding latency to web requests.

Related mediawiki/vendor update: Ibfe4bd2036ae8e998e2973f07bd9a6f057691578

The necessary config is something like:

array(
    'loggers' => array(
        'CirrusSearchRequests' => array(
            'handlers' => array( 'kafka' ),
        ),
    ),
    'handlers' => array(
        'kafka' => array(
            'factory' => '\\MediaWiki\\Logger\\Monolog\\KafkaHandler::factory',
            'args' => array( 'localhost:9092' ),
            'formatter' => 'avro',
            'buffer' => true,
        ),
    ),
    'formatters' => array(
        'avro' => array(
            'class' => '\\MediaWiki\\Logger\\Monolog\\AvroFormatter',
            'args' => array(
                array(
                    'CirrusSearchRequests' => array(
                        'type' => 'record',
                        'name' => 'CirrusSearchRequests'
                        'fields' => array( ... )
                    ),
                ),
            ),
        ),
    ),
)

Bug: T106256
Change-Id: I6ee744b3e5306af0bed70811b558a543eed22840

autoload.php
composer.json
includes/debug/logger/MonologSpi.php
includes/debug/logger/monolog/AvroFormatter.php [new file with mode: 0644]
includes/debug/logger/monolog/BufferHandler.php [new file with mode: 0644]
includes/debug/logger/monolog/KafkaHandler.php [new file with mode: 0644]
includes/utils/AvroValidator.php [new file with mode: 0644]
tests/phpunit/includes/ConsecutiveParametersMatcher.php [new file with mode: 0644]
tests/phpunit/includes/debug/logger/monolog/AvroFormatterTest.php [new file with mode: 0644]
tests/phpunit/includes/debug/logger/monolog/KafkaHandlerTest.php [new file with mode: 0644]
tests/phpunit/includes/utils/AvroValidatorTest.php [new file with mode: 0644]

index 5adfbe5..03728e9 100644 (file)
@@ -149,6 +149,7 @@ $wgAutoloadLocalClasses = array(
        'AutoLoader' => __DIR__ . '/includes/AutoLoader.php',
        'AutoloadGenerator' => __DIR__ . '/includes/utils/AutoloadGenerator.php',
        'Autopromote' => __DIR__ . '/includes/Autopromote.php',
+       'AvroValidator' => __DIR__ . '/includes/utils/AvroValidator.php',
        'BacklinkCache' => __DIR__ . '/includes/cache/BacklinkCache.php',
        'BacklinkJobUtils' => __DIR__ . '/includes/jobqueue/utils/BacklinkJobUtils.php',
        'BackupDumper' => __DIR__ . '/maintenance/backup.inc',
@@ -753,6 +754,9 @@ $wgAutoloadLocalClasses = array(
        'MediaWiki\\Logger\\LegacySpi' => __DIR__ . '/includes/debug/logger/LegacySpi.php',
        'MediaWiki\\Logger\\LoggerFactory' => __DIR__ . '/includes/debug/logger/LoggerFactory.php',
        'MediaWiki\\Logger\\MonologSpi' => __DIR__ . '/includes/debug/logger/MonologSpi.php',
+       'MediaWiki\\Logger\\Monolog\\AvroFormatter' => __DIR__ . '/includes/debug/logger/monolog/AvroFormatter.php',
+       'MediaWiki\\Logger\\Monolog\\BufferHandler' => __DIR__ . '/includes/debug/logger/monolog/BufferHandler.php',
+       'MediaWiki\\Logger\\Monolog\\KafkaHandler' => __DIR__ . '/includes/debug/logger/monolog/KafkaHandler.php',
        'MediaWiki\\Logger\\Monolog\\LegacyFormatter' => __DIR__ . '/includes/debug/logger/monolog/LegacyFormatter.php',
        'MediaWiki\\Logger\\Monolog\\LegacyHandler' => __DIR__ . '/includes/debug/logger/monolog/LegacyHandler.php',
        'MediaWiki\\Logger\\Monolog\\LineFormatter' => __DIR__ . '/includes/debug/logger/monolog/LineFormatter.php',
index 1fe1e50..1ba5827 100644 (file)
                "ext-wikidiff2": "Diff accelerator",
                "ext-apc": "Local data and opcode cache",
                "monolog/monolog": "Flexible debug logging system",
+               "nmred/kafka-php": "Send debug log events to kafka",
                "pear/mail": "Mail sending support",
                "pear/mail_mime": "Mail sending support",
-               "pear/mail_mime-decode": "Mail sending support"
+               "pear/mail_mime-decode": "Mail sending support",
+               "wikimedia/avro": "Binary serialization format used with kafka"
        },
        "autoload": {
                "psr-0": {
index 7b54861..274e18e 100644 (file)
@@ -20,6 +20,7 @@
 
 namespace MediaWiki\Logger;
 
+use MediaWiki\Logger\Monolog\BufferHandler;
 use Monolog\Logger;
 use ObjectFactory;
 
@@ -84,6 +85,7 @@ use ObjectFactory;
  *                   'logstash'
  *               ),
  *               'formatter' => 'logstash',
+ *               'buffer' => true,
  *           ),
  *           'udp2log' => array(
  *               'class' => '\\MediaWiki\\Logger\\Monolog\\LegacyHandler',
@@ -247,6 +249,9 @@ class MonologSpi implements Spi {
                                        $this->getFormatter( $spec['formatter'] )
                                );
                        }
+                       if ( isset( $spec['buffer'] ) && $spec['buffer'] ) {
+                               $handler = new BufferHandler( $handler );
+                       }
                        $this->singletons['handlers'][$name] = $handler;
                }
                return $this->singletons['handlers'][$name];
diff --git a/includes/debug/logger/monolog/AvroFormatter.php b/includes/debug/logger/monolog/AvroFormatter.php
new file mode 100644 (file)
index 0000000..b6adab4
--- /dev/null
@@ -0,0 +1,139 @@
+<?php
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+namespace MediaWiki\Logger\Monolog;
+
+use AvroIODatumWriter;
+use AvroIOBinaryEncoder;
+use AvroIOTypeException;
+use AvroNamedSchemata;
+use AvroSchema;
+use AvroStringIO;
+use AvroValidator;
+use Monolog\Formatter\FormatterInterface;
+
+/**
+ * Log message formatter that uses the apache Avro format.
+ *
+ * @since 1.26
+ * @author Erik Bernhardson <ebernhardson@wikimedia.org>
+ * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation.
+ */
+class AvroFormatter implements FormatterInterface {
+       /**
+        * @var array Map from schema name to schema definition
+        */
+       protected $schemas;
+
+       /**
+        * @var AvroStringIO
+        */
+       protected $io;
+
+       /**
+        * @var AvroIOBinaryEncoder
+        */
+       protected $encoder;
+
+       /**
+        * @var AvroIODatumWriter
+        */
+       protected $writer;
+
+       /**
+        * @var array $schemas Map from Monolog channel to Avro schema.
+        *  Each schema can be either the JSON string or decoded into PHP
+        *  arrays.
+        */
+       public function __construct( array $schemas ) {
+               $this->schemas = $schemas;
+               $this->io = new AvroStringIO( '' );
+               $this->encoder = new AvroIOBinaryEncoder( $this->io );
+               $this->writer = new AvroIODatumWriter();
+       }
+
+       /**
+        * Formats the record context into a binary string per the
+        * schema configured for the records channel.
+        *
+        * @param array $record
+        * @return string|null The serialized record, or null if
+        *  the record is not valid for the selected schema.
+        */
+       public function format( array $record ) {
+               $this->io->truncate();
+               $schema = $this->getSchema( $record['channel'] );
+               if ( $schema === null ) {
+                       trigger_error( "The schema for channel '{$record['channel']}' is not available" );
+                       return null;
+               }
+               try {
+                       $this->writer->write_data( $schema, $record['context'], $this->encoder );
+               } catch ( AvroIOTypeException $e ) {
+                       $errors = AvroValidator::getErrors( $schema, $record['context'] );
+                       $json = json_encode( $errors );
+                       trigger_error( "Avro failed to serialize record for {$record['channel']} : {$json}" );
+                       return null;
+               }
+               return $this->io->string();
+       }
+
+       /**
+        * Format a set of records into a list of binary strings
+        * conforming to the configured schema.
+        *
+        * @param array $records
+        * @return string[]
+        */
+       public function formatBatch( array $records ) {
+               $result = array();
+               foreach ( $records as $record ) {
+                       $message = $this->format( $record );
+                       if ( $message !== null ) {
+                               $result[] = $message;
+                       }
+               }
+               return $result;
+       }
+
+       /**
+        * Get the writer for the named channel
+        *
+        * @var string $channel Name of the schema to fetch
+        * @return AvroSchema|null
+        */
+       protected function getSchema( $channel ) {
+               if ( !isset( $this->schemas[$channel] ) ) {
+                       return null;
+               }
+               if ( !$this->schemas[$channel] instanceof AvroSchema ) {
+                       if ( is_string( $this->schemas[$channel] ) ) {
+                               $this->schemas[$channel] = AvroSchema::parse( $this->schemas[$channel] );
+                       } else {
+                               $this->schemas[$channel] = AvroSchema::real_parse(
+                                       $this->schemas[$channel],
+                                       null,
+                                       new AvroNamedSchemata()
+                               );
+                       }
+               }
+               return $this->schemas[$channel];
+       }
+}
diff --git a/includes/debug/logger/monolog/BufferHandler.php b/includes/debug/logger/monolog/BufferHandler.php
new file mode 100644 (file)
index 0000000..3ebd0b1
--- /dev/null
@@ -0,0 +1,47 @@
+<?php
+/**
+ * Helper class for the index.php entry point.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+namespace MediaWiki\Logger\Monolog;
+
+use DeferredUpdates;
+use Monolog\Handler\BufferHandler as BaseBufferHandler;
+
+/**
+ * Updates the Monolog BufferHandler to use DeferredUpdates rather
+ * than register_shutdown_function. On supported platforms this will
+ * use register_postsend_function or fastcgi_finish_request() to delay
+ * until after the request has shutdown and we are no longer delaying
+ * the web request.
+ */
+class BufferHandler extends BaseBufferHandler {
+       /**
+        * {@inheritDoc}
+        */
+       public function handle( array $record ) {
+               if (!$this->initialized) {
+                       DeferredUpdates::addCallableUpdate( array( $this, 'close' ) );
+                       $this->initialized = true;
+               }
+               return parent::handle( $record );
+       }
+}
+
diff --git a/includes/debug/logger/monolog/KafkaHandler.php b/includes/debug/logger/monolog/KafkaHandler.php
new file mode 100644 (file)
index 0000000..59d7764
--- /dev/null
@@ -0,0 +1,224 @@
+<?php
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+namespace MediaWiki\Logger\Monolog;
+
+use Kafka\MetaDataFromKafka;
+use Kafka\Produce;
+use MediaWiki\Logger\LoggerFactory;
+use Monolog\Handler\AbstractProcessingHandler;
+use Monolog\Logger;
+use Psr\Log\LoggerInterface;
+
+/**
+ * Log handler sends log events to a kafka server.
+ *
+ * Constructor options array arguments:
+ * * alias: map from monolog channel to kafka topic name. When no
+ *       alias exists the topic "monolog_$channel" will be used.
+ * * swallowExceptions: Swallow exceptions that occur while talking to
+ *    kafka. Defaults to false.
+ * * logExceptions: Log exceptions talking to kafka here. Either null,
+ *    the name of a channel to log to, or an object implementing
+ *    FormatterInterface. Defaults to null.
+ *
+ * Requires the nmred/kafka-php library, version >= 1.3.0
+ *
+ * @since 1.26
+ * @author Erik Bernhardson <ebernhardson@wikimedia.org>
+ * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation.
+ */
+class KafkaHandler extends AbstractProcessingHandler {
+       /**
+        * @var Produce Sends requests to kafka
+        */
+       protected $produce;
+
+       /**
+        * @var array Optional handler configuration
+        */
+       protected $options;
+
+       /**
+        * @var array Map from topic name to partition this request produces to
+        */
+       protected $partitions = array();
+
+       /**
+        * @var array defaults for constructor options
+        */
+       private static $defaultOptions = array(
+               'alias' => array(), // map from monolog channel to kafka topic
+               'swallowExceptions' => false, // swallow exceptions sending records
+               'logExceptions' => null, // A PSR3 logger to inform about errors
+       );
+
+       /**
+        * @param Produce $produce Kafka instance to produce through
+        * @param array $options optional handler configuration
+        * @param int $level The minimum logging level at which this handler will be triggered
+        * @param bool $bubble Whether the messages that are handled can bubble up the stack or not
+        */
+       public function __construct( Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true ) {
+               parent::__construct( $level, $bubble );
+               $this->produce = $produce;
+               $this->options = array_merge( self::$defaultOptions, $options );
+       }
+
+       /**
+        * Constructs the necessary support objects and returns a KafkaHandler
+        * instance.
+        *
+        * @param string[] $kafkaServers
+        * @param array $options
+        * @param int $level The minimum logging level at which this handle will be triggered
+        * @param bool $bubble Whether the messages that are handled can bubble the stack or not
+        * @return KafkaHandler
+        */
+       public static function factory( $kafkaServers, array $options = array(), $level = Logger::DEBUG, $bubble = true ) {
+               $metadata = new MetaDataFromKafka( $kafkaServers );
+               $produce = new Produce( $metadata );
+               if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) {
+                       $options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
+               }
+               return new self( $produce, $options, $level, $bubble );
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       protected function write( array $record ) {
+               if ( $record['formatted'] !== null ) {
+                       $this->addMessages( $record['channel'], array( $record['formatted'] ) );
+                       $this->send();
+               }
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       public function handleBatch( array $batch ) {
+               $channels = array();
+               foreach ( $batch as $record ) {
+                       if ( $record['level'] < $this->level ) {
+                               continue;
+                       }
+                       $channels[$record['channel']][] = $this->processRecord( $record );
+               }
+
+               $formatter = $this->getFormatter();
+               foreach ( $channels as $channel => $records ) {
+                       $messages = array();
+                       foreach ( $records as $idx => $record ) {
+                               $message = $formatter->format( $record );
+                               if ( $message !== null ) {
+                                       $messages[] = $message;
+                               }
+                       }
+                       if ( $messages ) {
+                               $this->addMessages($channel, $messages);
+                       }
+               }
+
+               $this->send();
+       }
+
+       /**
+        * Send any records in the kafka client internal queue.
+        */
+       protected function send() {
+               try {
+                       $this->produce->send();
+               } catch ( \Kafka\Exception $e ) {
+                       $ignore = $this->warning(
+                               'Error sending records to kafka: {exception}',
+                               array( 'exception' => $e ) );
+                       if ( !$ignore ) {
+                               throw $e;
+                       }
+               }
+       }
+
+       /**
+        * @param string $topic Name of topic to get partition for
+        * @return int|null The random partition to produce to for this request,
+        *  or null if a partition could not be determined.
+        */
+       protected function getRandomPartition( $topic ) {
+               if ( !array_key_exists( $topic, $this->partitions ) ) {
+                       try {
+                               $partitions = $this->produce->getAvailablePartitions( $topic );
+                       } catch ( \Kafka\Exception $e ) {
+                               $ignore = $this->warning(
+                                       'Error getting metadata for kafka topic {topic}: {exception}',
+                                       array( 'topic' => $topic, 'exception' => $e ) );
+                               if ( $ignore ) {
+                                       return null;
+                               }
+                               throw $e;
+                       }
+                       if ( $partitions ) {
+                               $key = array_rand( $partitions );
+                               $this->partitions[$topic] = $partitions[$key];
+                       } else {
+                               $details = $this->produce->getClient()->getTopicDetail( $topic );
+                               $ignore = $this->warning(
+                                       'No partitions available for kafka topic {topic}',
+                                       array( 'topic' => $topic, 'kafka' => $details )
+                               );
+                               if ( !$ignore ) {
+                                       throw new \RuntimeException( "No partitions available for kafka topic $topic" );
+                               }
+                               $this->partitions[$topic] = null;
+                       }
+               }
+               return $this->partitions[$topic];
+       }
+
+       /**
+        * Adds records for a channel to the Kafka client internal queue.
+        *
+        * @param string $channel Name of Monolog channel records belong to
+        * @param array $records List of records to append
+        */
+       protected function addMessages( $channel, array $records ) {
+               if ( isset( $this->options['alias'][$channel] ) ) {
+                       $topic = $this->options['alias'][$channel];
+               } else {
+                       $topic = "monolog_$channel";
+               }
+               $partition = $this->getRandomPartition( $topic );
+               if ( $partition !== null ) {
+                       $this->produce->setMessages( $topic, $partition, $records );
+               }
+       }
+
+       /**
+        * @param string $message PSR3 compatible message string
+        * @param array $context PSR3 compatible log context
+        * @return bool true if caller should ignore warning
+        */
+       protected function warning( $message, array $context = array() ) {
+               if ( $this->options['logExceptions'] instanceof LoggerInterface ) {
+                       $this->options['logExceptions']->warning( $message, $context );
+               }
+               return $this->options['swallowExceptions'];
+       }
+}
diff --git a/includes/utils/AvroValidator.php b/includes/utils/AvroValidator.php
new file mode 100644 (file)
index 0000000..4f8e0b1
--- /dev/null
@@ -0,0 +1,184 @@
+<?php
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+/**
+ * Generate error strings for data that doesn't match the specified
+ * Avro schema. This is very similar to AvroSchema::is_valid_datum(),
+ * but returns error messages instead of a boolean.
+ *
+ * @since 1.26
+ * @author Erik Bernhardson <ebernhardson@wikimedia.org>
+ * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation.
+ */
+class AvroValidator {
+       /**
+        * @param AvroSchema $schema The rules to conform to.
+        * @param mixed $datum The value to validate against $schema.
+        * @return string|string[] An error or list of errors in the
+        *  provided $datum. When no errors exist the empty array is
+        *  returned.
+        */
+       public static function getErrors( AvroSchema $schema, $datum ) {
+               switch ( $schema->type) {
+               case AvroSchema::NULL_TYPE:
+                       if ( !is_null($datum) ) {
+                               return self::wrongType( 'null', $datum );
+                       }
+                       return array();
+               case AvroSchema::BOOLEAN_TYPE:
+                       if ( !is_bool($datum) ) {
+                               return self::wrongType( 'boolean', $datum );
+                       }
+                       return array();
+               case AvroSchema::STRING_TYPE:
+               case AvroSchema::BYTES_TYPE:
+                       if ( !is_string($datum) ) {
+                               return self::wrongType( 'string', $datum );
+                       }
+                       return array();
+               case AvroSchema::INT_TYPE:
+                       if ( !is_int($datum) ) {
+                               return self::wrongType( 'integer', $datum );
+                       }
+                       if ( AvroSchema::INT_MIN_VALUE > $datum
+                               || $datum > AvroSchema::INT_MAX_VALUE
+                       ) {
+                               return self::outOfRange(
+                                       AvroSchema::INT_MIN_VALUE,
+                                       AvroSchema::INT_MAX_VALUE,
+                                       $datum
+                               );
+                       }
+                       return array();
+               case AvroSchema::LONG_TYPE:
+                       if ( !is_int($datum) ) {
+                               return self::wrongType( 'integer', $datum );
+                       }
+                       if ( AvroSchema::LONG_MIN_VALUE > $datum
+                               || $datum > AvroSchema::LONG_MAX_VALUE
+                       ) {
+                               return self::outOfRange(
+                                       AvroSchema::LONG_MIN_VALUE,
+                                       AvroSchema::LONG_MAX_VALUE,
+                                       $datum
+                               );
+                       }
+                       return array();
+               case AvroSchema::FLOAT_TYPE:
+               case AvroSchema::DOUBLE_TYPE:
+                       if ( !is_float($datum) && !is_int($datum) ) {
+                               return self::wrongType( 'float or integer', $datum );
+                       }
+                       return array();
+               case AvroSchema::ARRAY_SCHEMA:
+                       if (!is_array($datum)) {
+                               return self::wrongType( 'array', $datum );
+                       }
+                       $errors = array();
+                       foreach ($datum as $d) {
+                               $result = $this->validate( $schema->items(), $d );
+                               if ( $result ) {
+                                       $errors[] = $result;
+                               }
+                       }
+                       if ( $errors ) {
+                               return $errors;
+                       }
+                       return array();
+               case AvroSchema::MAP_SCHEMA:
+                       if (!is_array($datum)) {
+                               return self::wrongType( 'array', $datum );
+                       }
+                       $errors = array();
+                       foreach ($datum as $k => $v) {
+                       if ( !is_string($k) ) {
+                                       $errors[] = self::wrongType( 'string key', $k );
+                               }
+                               $result = self::getErrors( $schema->values(), $v );
+                               if ( $result ) {
+                                       $errors[$k] = $result;
+                               }
+                       }
+                       return $errors;
+               case AvroSchema::UNION_SCHEMA:
+                       $errors = array();
+                       foreach ($schema->schemas() as $schema) {
+                               $result = self::getErrors( $schema, $datum );
+                               if ( !$result ) {
+                                       return array();
+                               }
+                               $errors[] = $result;
+                       }
+                       if ( $errors ) {
+                               return array( "Expected any one of these to be true", $errors );
+                       }
+                       return "No schemas provided to union";
+               case AvroSchema::ENUM_SCHEMA:
+                       if ( !in_array( $datum, $schema->symbols() ) ) {
+                               $symbols = implode( ', ', $schema->symbols );
+                               return "Expected one of $symbols but recieved $datum";
+                       }
+                       return array();
+               case AvroSchema::FIXED_SCHEMA:
+                       if ( !is_string( $datum ) ) {
+                               return self::wrongType( 'string', $datum );
+                       }
+                       $len = strlen( $datum );
+                       if ( $len !== $schema->size() ) {
+                               return "Expected string of length {$schema->size()}, "
+                                       . "but recieved one of length $len";
+                       }
+                       return array();
+               case AvroSchema::RECORD_SCHEMA:
+               case AvroSchema::ERROR_SCHEMA:
+               case AvroSchema::REQUEST_SCHEMA:
+                       if ( !is_array( $datum ) ) {
+                               return self::wrongType( 'array', $datum );
+                       }
+                       $errors = array();
+                       foreach ( $schema->fields() as $field ) {
+                               $name = $field->name();
+                               if ( !array_key_exists( $name, $datum ) ) {
+                                       $errors[$name] = 'Missing expected field';
+                                       continue;
+                               }
+                               $result = self::getErrors( $field->type(), $datum[$name] );
+                               if ( $result ) {
+                                       $errors[$name] = $result;
+                               }
+                       }
+                       return $errors;
+               default:
+                       return "Unknown avro schema type: {$schema->type}";
+               }
+       }
+
+       public static function typeOf( $datum ) {
+               return is_object( $datum ) ? get_class( $datum ) : gettype( $datum );
+       }
+
+       public static function wrongType( $expected, $datum ) {
+               return "Expected $expected, but recieved " . self::typeOf( $datum );
+       }
+
+       public static function outOfRange( $min, $max, $datum ) {
+               return "Expected value between $min and $max, but recieved $datum";
+       }
+}
diff --git a/tests/phpunit/includes/ConsecutiveParametersMatcher.php b/tests/phpunit/includes/ConsecutiveParametersMatcher.php
new file mode 100644 (file)
index 0000000..adf74bb
--- /dev/null
@@ -0,0 +1,123 @@
+<?php
+/*
+ * This file is part of the PHPUnit_MockObject package.
+ *
+ * (c) Sebastian Bergmann <sebastian@phpunit.de>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+/**
+ * Invocation matcher which looks for sets of specific parameters in the invocations.
+ *
+ * Checks the parameters of the incoming invocations, the parameter list is
+ * checked against the defined constraints in $parameters. If the constraint
+ * is met it will return true in matches().
+ *
+ * It takes a list of match groups and and increases a call index after each invocation.
+ * So the first invocation uses the first group of constraints, the second the next and so on.
+ */
+class PHPUnit_Framework_MockObject_Matcher_ConsecutiveParameters extends PHPUnit_Framework_MockObject_Matcher_StatelessInvocation
+{
+    /**
+     * @var array
+     */
+    private $_parameterGroups = array();
+
+    /**
+     * @var array
+     */
+    private $_invocations = array();
+
+    /**
+     * @param array $parameterGroups
+     */
+    public function __construct(array $parameterGroups)
+    {
+        foreach ($parameterGroups as $index => $parameters) {
+            foreach ($parameters as $parameter) {
+                if (!($parameter instanceof \PHPUnit_Framework_Constraint)) {
+                    $parameter = new \PHPUnit_Framework_Constraint_IsEqual($parameter);
+                }
+                $this->_parameterGroups[$index][] = $parameter;
+            }
+        }
+    }
+
+    /**
+     * @return string
+     */
+    public function toString()
+    {
+        $text = 'with consecutive parameters';
+
+        return $text;
+    }
+
+    /**
+     * @param  PHPUnit_Framework_MockObject_Invocation $invocation
+     * @return bool
+     */
+    public function matches(PHPUnit_Framework_MockObject_Invocation $invocation)
+    {
+        $this->_invocations[] = $invocation;
+        $callIndex            = count($this->_invocations) - 1;
+        $this->verifyInvocation($invocation, $callIndex);
+
+        return false;
+    }
+
+    public function verify()
+    {
+        foreach ($this->_invocations as $callIndex => $invocation) {
+            $this->verifyInvocation($invocation, $callIndex);
+        }
+    }
+
+    /**
+     * Verify a single invocation
+     *
+     * @param  PHPUnit_Framework_MockObject_Invocation      $invocation
+     * @param  int                                          $callIndex
+     * @throws PHPUnit_Framework_ExpectationFailedException
+     */
+    private function verifyInvocation(PHPUnit_Framework_MockObject_Invocation $invocation, $callIndex)
+    {
+
+        if (isset($this->_parameterGroups[$callIndex])) {
+            $parameters = $this->_parameterGroups[$callIndex];
+        } else {
+          // no parameter assertion for this call index
+            return;
+        }
+
+        if ($invocation === null) {
+            throw new PHPUnit_Framework_ExpectationFailedException(
+                'Mocked method does not exist.'
+            );
+        }
+
+        if (count($invocation->parameters) < count($parameters)) {
+            throw new PHPUnit_Framework_ExpectationFailedException(
+                sprintf(
+                    'Parameter count for invocation %s is too low.',
+                    $invocation->toString()
+                )
+            );
+        }
+
+        foreach ($parameters as $i => $parameter) {
+            $parameter->evaluate(
+                $invocation->parameters[$i],
+                sprintf(
+                    'Parameter %s for invocation #%d %s does not match expected ' .
+                    'value.',
+                    $i,
+                    $callIndex,
+                    $invocation->toString()
+                )
+            );
+        }
+    }
+}
diff --git a/tests/phpunit/includes/debug/logger/monolog/AvroFormatterTest.php b/tests/phpunit/includes/debug/logger/monolog/AvroFormatterTest.php
new file mode 100644 (file)
index 0000000..4c6d25e
--- /dev/null
@@ -0,0 +1,64 @@
+<?php
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+namespace MediaWiki\Logger\Monolog;
+
+use MediaWikiTestCase;
+use PHPUnit_Framework_Error_Notice;
+
+class AvroFormatterTest extends MediaWikiTestCase {
+
+       public function setUo() {
+               if ( !class_exists( 'AvroStringIO' ) ) {
+                       $this->markTestSkipped( 'Avro is required for the AvroFormatterTest' );
+               }
+               parent::setUp();
+       }
+
+       public function testSchemaNotAvailable() {
+               $formatter = new AvroFormatter( array() );      
+               $this->setExpectedException( 'PHPUnit_Framework_Error_Notice', "The schema for channel 'marty' is not available" );
+               $formatter->format( array( 'channel' => 'marty' ) );
+       }
+
+       public function testSchemaNotAvailableReturnValue() {
+               $formatter = new AvroFormatter( array() );      
+               $noticeEnabled = PHPUnit_Framework_Error_Notice::$enabled;
+               // disable conversion of notices 
+               PHPUnit_Framework_Error_Notice::$enabled = false;
+               // have to keep the user notice from being output
+               wfSuppressWarnings();
+               $res = $formatter->format( array( 'channel' => 'marty' ) );
+               wfRestoreWarnings();
+               PHPUnit_Framework_Error_Notice::$enabled = $noticeEnabled;
+               $this->assertNull( $res );
+       }
+
+       public function testDoesSomethingWhenSchemaAvailable() {
+               $formatter = new AvroFormatter( array( 'string' => array( 'type' => 'string' ) ) );
+               $res = $formatter->format( array(
+                       'channel' => 'string',
+                       'context' => 'better to be',
+               ) );
+               $this->assertNotNull( $res );
+               // basically just tell us if avro changes its string encoding
+               $this->assertEquals( base64_decode( 'GGJldHRlciB0byBiZQ==' ), $res );
+       }
+}
diff --git a/tests/phpunit/includes/debug/logger/monolog/KafkaHandlerTest.php b/tests/phpunit/includes/debug/logger/monolog/KafkaHandlerTest.php
new file mode 100644 (file)
index 0000000..272f6e4
--- /dev/null
@@ -0,0 +1,204 @@
+<?php
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+namespace MediaWiki\Logger\Monolog;
+
+use MediaWikiTestCase;
+use Monolog\Logger;
+
+// not available in the version of phpunit mw uses, so copied into repo
+require_once __DIR__ . '/../../../ConsecutiveParametersMatcher.php';
+
+class KafkaHandlerTest extends MediaWikiTestCase {
+
+       public function setUo() {
+               if ( !class_exists( 'Monolog\Handler\AbstractProcessingHandler' ) ) {
+                       $this->markTestSkipped( 'Monolog is required for the KafkaHandlerTest' );
+               }
+               parent::setUp();
+       }
+
+       public function topicNamingProvider() {
+               return array(
+                       array( array(), 'monolog_foo' ),
+                       array( array( 'alias' => array( 'foo' => 'bar' ) ), 'bar' )
+               );
+       }
+
+       /**
+        * @dataProvider topicNamingProvider
+        */
+       public function testTopicNaming( $options, $expect ) {
+               $produce = $this->getMockBuilder( 'Kafka\Produce' )
+                       ->disableOriginalConstructor()
+                       ->getMock();
+               $produce->expects($this->any())
+                       ->method('getAvailablePartitions')
+                       ->will($this->returnValue( array( 'A' ) ) );
+               $produce->expects($this->once())
+                       ->method( 'setMessages' )
+                       ->with( $expect, $this->anything(), $this->anything() );
+
+               $handler = new KafkaHandler( $produce, $options );
+               $handler->handle( array(
+                       'channel' => 'foo',
+                       'level' => Logger::EMERGENCY,
+                       'extra' => array(),
+               ) );
+       }
+
+       public function swallowsExceptionsWhenRequested() {
+               return array(
+                       // defaults to false
+                       array( array(), true ),
+                       // also try false explicitly
+                       array( array( 'swallowExceptions' => false ), true ),
+                       // turn it on
+                       array( array( 'swallowExceptions' => true ), false ),
+               );
+       }
+
+       /**
+        * @dataProvider swallowsExceptionsWhenRequested
+        */
+       public function testGetAvailablePartitionsException( $options, $expectException ) {
+               $produce = $this->getMockBuilder( 'Kafka\Produce' )
+                       ->disableOriginalConstructor()
+                       ->getMock();
+               $produce->expects( $this->any() )
+                       ->method( 'getAvailablePartitions' )
+                       ->will( $this->throwException( new \Kafka\Exception ) );
+
+               if ( $expectException ) {
+                       $this->setExpectedException( 'Kafka\Exception' );
+               }
+
+               $handler = new KafkaHandler( $produce, $options );
+               $handler->handle( array(
+                       'channel' => 'foo',
+                       'level' => Logger::EMERGENCY,
+                       'extra' => array(),
+               ) );
+
+               if ( !$expectException ) {
+                       $this->assertTrue( true, 'no exception was thrown' );
+               }
+       }
+
+       /**
+        * @dataProvider swallowsExceptionsWhenRequested
+        */
+       public function testSendException( $options, $expectException ) {
+               $produce = $this->getMockBuilder( 'Kafka\Produce' )
+                       ->disableOriginalConstructor()
+                       ->getMock();
+               $produce->expects( $this->any() )
+                       ->method( 'getAvailablePartitions' )
+                       ->will( $this->returnValue( array( 'A' ) ) );
+               $produce->expects( $this->any() )
+                       ->method( 'send' )
+                       ->will( $this->throwException( new \Kafka\Exception ) );
+
+               if ( $expectException ) {
+                       $this->setExpectedException( 'Kafka\Exception' );
+               }
+
+               $handler = new KafkaHandler( $produce, $options );
+               $handler->handle( array(
+                       'channel' => 'foo',
+                       'level' => Logger::EMERGENCY,
+                       'extra' => array(),
+               ) );
+
+               if ( !$expectException ) {
+                       $this->assertTrue( true, 'no exception was thrown' );
+               }
+       }
+
+       public function testHandlesNullFormatterResult() {
+               $produce = $this->getMockBuilder( 'Kafka\Produce' )
+                       ->disableOriginalConstructor()
+                       ->getMock();
+               $produce->expects( $this->any() )
+                       ->method( 'getAvailablePartitions' )
+                       ->will( $this->returnValue( array( 'A' ) ) );
+               $mockMethod = $produce->expects( $this->exactly( 2 ) )
+                       ->method( 'setMessages' );
+               // evil hax
+               \TestingAccessWrapper::newFromObject( $mockMethod )->matcher->parametersMatcher =
+                       new \PHPUnit_Framework_MockObject_Matcher_ConsecutiveParameters( array(
+                               array( $this->anything(), $this->anything(), array( 'words' ) ),
+                               array( $this->anything(), $this->anything(), array( 'lines' ) )
+                       ) );
+
+               $formatter = $this->getMock( 'Monolog\Formatter\FormatterInterface' );
+               $formatter->expects( $this->any() )
+                       ->method( 'format' )
+                       ->will( $this->onConsecutiveCalls( 'words', null, 'lines' ) );
+
+               $handler = new KafkaHandler( $produce, array() );
+               $handler->setFormatter( $formatter );
+               for ( $i = 0; $i < 3; ++$i ) {
+                       $handler->handle( array(
+                               'channel' => 'foo',
+                               'level' => Logger::EMERGENCY,
+                               'extra' => array(),
+                       ) );
+               }
+       }
+
+
+       public function testBatchHandlesNullFormatterResult() {
+               $produce = $this->getMockBuilder( 'Kafka\Produce' )
+                       ->disableOriginalConstructor()
+                       ->getMock();
+               $produce->expects( $this->any() )
+                       ->method( 'getAvailablePartitions' )
+                       ->will( $this->returnValue( array( 'A' ) ) );
+               $produce->expects( $this->once() )
+                       ->method( 'setMessages' )
+                       ->with( $this->anything(), $this->anything(), array( 'words', 'lines' ) );
+
+               $formatter = $this->getMock( 'Monolog\Formatter\FormatterInterface' );
+               $formatter->expects( $this->any() )
+                       ->method( 'format' )
+                       ->will( $this->onConsecutiveCalls( 'words', null, 'lines' ) );
+
+               $handler = new KafkaHandler( $produce, array() );
+               $handler->setFormatter( $formatter );
+               $handler->handleBatch( array(
+                       array(
+                               'channel' => 'foo',
+                               'level' => Logger::EMERGENCY,
+                               'extra' => array(),
+                       ),
+                       array(
+                               'channel' => 'foo',
+                               'level' => Logger::EMERGENCY,
+                               'extra' => array(),
+                       ),
+                       array(
+                               'channel' => 'foo',
+                               'level' => Logger::EMERGENCY,
+                               'extra' => array(),
+                       ),
+               ) );
+       }
+}
diff --git a/tests/phpunit/includes/utils/AvroValidatorTest.php b/tests/phpunit/includes/utils/AvroValidatorTest.php
new file mode 100644 (file)
index 0000000..52c242c
--- /dev/null
@@ -0,0 +1,96 @@
+<?php
+/**
+ * Tests for IP validity functions.
+ *
+ * Ported from /t/inc/IP.t by avar.
+ *
+ * @group IP
+ * @todo Test methods in this call should be split into a method and a
+ * dataprovider.
+ */
+
+class AvroValidatorTest extends PHPUnit_Framework_TestCase {
+       public function setUp() {
+               if ( !class_exists( 'AvroSchema' ) ) {
+                       $this->markTestSkipped( 'Avro is required to run the AvroValidatorTest' );
+               }
+               parent::setUp();
+       }
+
+       public function getErrorsProvider() {
+               $stringSchema = AvroSchema::parse( json_encode( array( 'type' => 'string' ) ) );
+               $recordSchema = AvroSchema::parse( json_encode( array(
+                       'type' => 'record',
+                       'name' => 'ut',
+                       'fields' => array(
+                               array( 'name' => 'id', 'type' => 'int', 'required' => true ),
+                       ),
+               ) ) );
+               $enumSchema = AvroSchema::parse( json_encode( array(
+                       'type' => 'record',
+                       'name' => 'ut',
+                       'fields' => array(
+                               array( 'name' => 'count', 'type' => array( 'int', 'null' ) ),
+                       ),
+               ) ) );
+
+               return array(
+                       array(
+                               'No errors with a simple string serialization',
+                               $stringSchema, 'foobar', array(),
+                       ),
+
+                       array(
+                               'Cannot serialize integer into string',
+                               $stringSchema, 5, 'Expected string, but recieved integer',
+                       ),
+
+                       array(
+                               'Cannot serialize array into string',
+                               $stringSchema, array(), 'Expected string, but recieved array',
+                       ),
+
+                       array(
+                               'allows and ignores extra fields',
+                               $recordSchema, array( 'id' => 4, 'foo' => 'bar' ), array(),
+                       ),
+
+                       array(
+                               'detects missing fields',
+                               $recordSchema, array(), array( 'id' => 'Missing expected field' ),
+                       ),
+
+                       array(
+                               'handles first element in enum',
+                               $enumSchema, array( 'count' => 4 ), array(),
+                       ),
+
+                       array(
+                               'handles second element in enum',
+                               $enumSchema, array( 'count' => null ), array(),
+                       ),
+
+                       array(
+                               'rejects element not in union',
+                               $enumSchema, array( 'count' => 'invalid' ), array( 'count' => array(
+                                       'Expected any one of these to be true',
+                                       array(
+                                               'Expected integer, but recieved string',
+                                               'Expected null, but recieved string',
+                                       )
+                               ) )
+                       ),
+               );
+       }
+
+       /**
+        * @dataProvider getErrorsProvider
+        */
+       public function testGetErrors( $message, $schema, $datum, $expected ) {
+               $this->assertEquals(
+                       $expected,
+                       AvroValidator::getErrors( $schema, $datum ),
+                       $message
+               );
+       }
+}