* @copyright © 2015 Erik Bernhardson and Wikimedia Foundation.
*/
class AvroFormatter implements FormatterInterface {
+ /**
+ * @var Magic byte to encode schema revision id.
+ */
+ const MAGIC = 0x0;
/**
* @var array Map from schema name to schema definition
*/
public function format( array $record ) {
$this->io->truncate();
$schema = $this->getSchema( $record['channel'] );
+ $revId = $this->getSchemaRevisionId( $record['channel'] );
if ( $schema === null ) {
trigger_error( "The schema for channel '{$record['channel']}' is not available" );
return null;
trigger_error( "Avro failed to serialize record for {$record['channel']} : {$json}" );
return null;
}
+ if ( $revId !== null ) {
+ return chr( self::MAGIC ) . $this->encode_long( $revId ) . $this->io->string();
+ }
+ // @todo: remove backward compat code and do not send messages without rev id.
return $this->io->string();
}
if ( !isset( $this->schemas[$channel] ) ) {
return null;
}
- if ( !$this->schemas[$channel] instanceof AvroSchema ) {
- if ( is_string( $this->schemas[$channel] ) ) {
- $this->schemas[$channel] = AvroSchema::parse( $this->schemas[$channel] );
+ $schemaDetails = &$this->schemas[$channel];
+ $schema = null;
+ if ( isset( $schemaDetails['revision'] ) && isset( $schemaDetails['schema'] ) ) {
+ $schema = &$schemaDetails['schema'];
+ } else {
+ // @todo: Remove backward compat code
+ $schema = &$schemaDetails;
+ }
+
+ if ( !$schema instanceof AvroSchema ) {
+ if ( is_string( $schema ) ) {
+ $schema = AvroSchema::parse( $schema );
} else {
- $this->schemas[$channel] = AvroSchema::real_parse(
+ $schema = AvroSchema::real_parse(
$this->schemas[$channel],
null,
new AvroNamedSchemata()
);
}
}
- return $this->schemas[$channel];
+ return $schema;
+ }
+
+ /**
+ * Get the writer for the named channel
+ *
+ * @var string $channel Name of the schema
+ * @return int|null
+ */
+ public function getSchemaRevisionId( $channel ) {
+ // @todo: remove backward compat code
+ if ( isset( $this->schemas[$channel] )
+ && is_array( $this->schemas[$channel] )
+ && isset( $this->schemas[$channel]['revision'] ) ) {
+ return (int) $this->schemas[$channel]['revision'];
+ }
+ return null;
+ }
+
+
+ /**
+ * convert an integer to a 64bits big endian long (Java compatible)
+ * NOTE: certainly only compatible with PHP 64bits
+ * @param int $id
+ * @return string the binary representation of $id
+ */
+ private function encode_long( $id ) {
+ $high = ( $id & 0xffffffff00000000 ) >> 32;
+ $low = $id & 0x00000000ffffffff;
+ return pack( 'NN', $high, $low );
}
}