From f0ed13eec5b58820a4598c4f9ce193273d4eda42 Mon Sep 17 00:00:00 2001 From: dcausse Date: Fri, 20 Nov 2015 18:09:56 +0100 Subject: [PATCH] Supports schema revision id in avro binary formatter Avro formatter now supports a revision defined in schema configuration: $wmgMonologAvroSchemas = array( 'CirrusSearchRequestSet' => array( 'schema' => file_get_contents( __DIR__ . '/schema.avsc' ), 'revision' => 11144802, ), ); The formatter still supports old style configuration: $wmgMonologAvroSchemas = array( 'CirrusSearchRequestSet' => file_get_contents( __DIR__ . '/schema.avsc' ), ); Change-Id: Icc0f92be23305e77a69b92fee4d9f9de2edda81e --- .../debug/logger/monolog/AvroFormatter.php | 57 +++++++++++++++++-- 1 file changed, 52 insertions(+), 5 deletions(-) diff --git a/includes/debug/logger/monolog/AvroFormatter.php b/includes/debug/logger/monolog/AvroFormatter.php index 019d0288d7..4a39be8b16 100644 --- a/includes/debug/logger/monolog/AvroFormatter.php +++ b/includes/debug/logger/monolog/AvroFormatter.php @@ -37,6 +37,10 @@ use Monolog\Formatter\FormatterInterface; * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation. */ class AvroFormatter implements FormatterInterface { + /** + * @var Magic byte to encode schema revision id. + */ + const MAGIC = 0x0; /** * @var array Map from schema name to schema definition */ @@ -80,6 +84,7 @@ class AvroFormatter implements FormatterInterface { public function format( array $record ) { $this->io->truncate(); $schema = $this->getSchema( $record['channel'] ); + $revId = $this->getSchemaRevisionId( $record['channel'] ); if ( $schema === null ) { trigger_error( "The schema for channel '{$record['channel']}' is not available" ); return null; @@ -92,6 +97,10 @@ class AvroFormatter implements FormatterInterface { trigger_error( "Avro failed to serialize record for {$record['channel']} : {$json}" ); return null; } + if ( $revId !== null ) { + return chr( self::MAGIC ) . $this->encode_long( $revId ) . $this->io->string(); + } + // @todo: remove backward compat code and do not send messages without rev id. return $this->io->string(); } @@ -123,17 +132,55 @@ class AvroFormatter implements FormatterInterface { if ( !isset( $this->schemas[$channel] ) ) { return null; } - if ( !$this->schemas[$channel] instanceof AvroSchema ) { - if ( is_string( $this->schemas[$channel] ) ) { - $this->schemas[$channel] = AvroSchema::parse( $this->schemas[$channel] ); + $schemaDetails = &$this->schemas[$channel]; + $schema = null; + if ( isset( $schemaDetails['revision'] ) && isset( $schemaDetails['schema'] ) ) { + $schema = &$schemaDetails['schema']; + } else { + // @todo: Remove backward compat code + $schema = &$schemaDetails; + } + + if ( !$schema instanceof AvroSchema ) { + if ( is_string( $schema ) ) { + $schema = AvroSchema::parse( $schema ); } else { - $this->schemas[$channel] = AvroSchema::real_parse( + $schema = AvroSchema::real_parse( $this->schemas[$channel], null, new AvroNamedSchemata() ); } } - return $this->schemas[$channel]; + return $schema; + } + + /** + * Get the writer for the named channel + * + * @var string $channel Name of the schema + * @return int|null + */ + public function getSchemaRevisionId( $channel ) { + // @todo: remove backward compat code + if ( isset( $this->schemas[$channel] ) + && is_array( $this->schemas[$channel] ) + && isset( $this->schemas[$channel]['revision'] ) ) { + return (int) $this->schemas[$channel]['revision']; + } + return null; + } + + + /** + * convert an integer to a 64bits big endian long (Java compatible) + * NOTE: certainly only compatible with PHP 64bits + * @param int $id + * @return string the binary representation of $id + */ + private function encode_long( $id ) { + $high = ( $id & 0xffffffff00000000 ) >> 32; + $low = $id & 0x00000000ffffffff; + return pack( 'NN', $high, $low ); } } -- 2.20.1