$this->io->truncate();
$schema = $this->getSchema( $record['channel'] );
$revId = $this->getSchemaRevisionId( $record['channel'] );
- if ( $schema === null ) {
+ if ( $schema === null || $revId === null ) {
trigger_error( "The schema for channel '{$record['channel']}' is not available" );
return null;
}
trigger_error( "Avro failed to serialize record for {$record['channel']} : {$json}" );
return null;
}
- if ( $revId !== null ) {
- return chr( self::MAGIC ) . $this->encode_long( $revId ) . $this->io->string();
- }
- // @todo: remove backward compat code and do not send messages without rev id.
- return $this->io->string();
+ return chr( self::MAGIC ) . $this->encodeLong( $revId ) . $this->io->string();
}
/**
if ( !isset( $this->schemas[$channel] ) ) {
return null;
}
- $schemaDetails = &$this->schemas[$channel];
- $schema = null;
- if ( isset( $schemaDetails['revision'] ) && isset( $schemaDetails['schema'] ) ) {
- $schema = &$schemaDetails['schema'];
- } else {
- // @todo: Remove backward compat code
- $schema = &$schemaDetails;
+ if ( !isset( $this->schemas[$channel]['revision'], $this->schemas[$channel]['schema'] ) ) {
+ return null;
}
- if ( !$schema instanceof AvroSchema ) {
+ if ( !$this->schemas[$channel]['schema'] instanceof AvroSchema ) {
+ $schema = $this->schemas[$channel]['schema'];
if ( is_string( $schema ) ) {
- $schema = AvroSchema::parse( $schema );
+ $this->schemas[$channel]['schema'] = AvroSchema::parse( $schema );
} else {
- $schema = AvroSchema::real_parse(
- $this->schemas[$channel],
+ $this->schemas[$channel]['schema'] = AvroSchema::real_parse(
+ $schema,
null,
new AvroNamedSchemata()
);
}
}
- return $schema;
+ return $this->schemas[$channel]['schema'];
}
/**
* @return int|null
*/
public function getSchemaRevisionId( $channel ) {
- // @todo: remove backward compat code
- if ( isset( $this->schemas[$channel] )
- && is_array( $this->schemas[$channel] )
- && isset( $this->schemas[$channel]['revision'] ) ) {
+ if ( isset( $this->schemas[$channel]['revision'] ) ) {
return (int) $this->schemas[$channel]['revision'];
}
return null;
}
-
/**
* convert an integer to a 64bits big endian long (Java compatible)
* NOTE: certainly only compatible with PHP 64bits
* @param int $id
* @return string the binary representation of $id
*/
- private function encode_long( $id ) {
+ private function encodeLong( $id ) {
$high = ( $id & 0xffffffff00000000 ) >> 32;
$low = $id & 0x00000000ffffffff;
return pack( 'NN', $high, $low );
}
public function testDoesSomethingWhenSchemaAvailable() {
- $formatter = new AvroFormatter( array( 'string' => array( 'type' => 'string' ) ) );
+ $formatter = new AvroFormatter( array(
+ 'string' => array(
+ 'schema' => array( 'type' => 'string' ),
+ 'revision' => 1010101,
+ )
+ ) );
$res = $formatter->format( array(
'channel' => 'string',
'context' => 'better to be',
) );
$this->assertNotNull( $res );
- // basically just tell us if avro changes its string encoding
- $this->assertEquals( base64_decode( 'GGJldHRlciB0byBiZQ==' ), $res );
+ // basically just tell us if avro changes its string encoding, or if
+ // we completely fail to generate a log message.
+ $this->assertEquals( 'AAAAAAAAD2m1GGJldHRlciB0byBiZQ==', base64_encode( $res ) );
}
}