$this->io->truncate();
$schema = $this->getSchema( $record['channel'] );
$revId = $this->getSchemaRevisionId( $record['channel'] );
- if ( $schema === null ) {
+ if ( $schema === null || $revId === null ) {
trigger_error( "The schema for channel '{$record['channel']}' is not available" );
return null;
}
trigger_error( "Avro failed to serialize record for {$record['channel']} : {$json}" );
return null;
}
- if ( $revId !== null ) {
- return chr( self::MAGIC ) . $this->encode_long( $revId ) . $this->io->string();
- }
- // @todo: remove backward compat code and do not send messages without rev id.
- return $this->io->string();
+ return chr( self::MAGIC ) . $this->encodeLong( $revId ) . $this->io->string();
}
/**
* @return string[]
*/
public function formatBatch( array $records ) {
- $result = array();
+ $result = [];
foreach ( $records as $record ) {
$message = $this->format( $record );
if ( $message !== null ) {
if ( !isset( $this->schemas[$channel] ) ) {
return null;
}
- $schemaDetails = &$this->schemas[$channel];
- $schema = null;
- if ( isset( $schemaDetails['revision'] ) && isset( $schemaDetails['schema'] ) ) {
- $schema = &$schemaDetails['schema'];
- } else {
- // @todo: Remove backward compat code
- $schema = &$schemaDetails;
+ if ( !isset( $this->schemas[$channel]['revision'], $this->schemas[$channel]['schema'] ) ) {
+ return null;
}
- if ( !$schema instanceof AvroSchema ) {
+ if ( !$this->schemas[$channel]['schema'] instanceof AvroSchema ) {
+ $schema = $this->schemas[$channel]['schema'];
if ( is_string( $schema ) ) {
- $schema = AvroSchema::parse( $schema );
+ $this->schemas[$channel]['schema'] = AvroSchema::parse( $schema );
} else {
- $schema = AvroSchema::real_parse(
- $this->schemas[$channel],
+ $this->schemas[$channel]['schema'] = AvroSchema::real_parse(
+ $schema,
null,
new AvroNamedSchemata()
);
}
}
- return $schema;
+ return $this->schemas[$channel]['schema'];
}
/**
* @return int|null
*/
public function getSchemaRevisionId( $channel ) {
- // @todo: remove backward compat code
- if ( isset( $this->schemas[$channel] )
- && is_array( $this->schemas[$channel] )
- && isset( $this->schemas[$channel]['revision'] ) ) {
- return (int) $this->schemas[$channel]['revision'];
+ if ( isset( $this->schemas[$channel]['revision'] ) ) {
+ return (int)$this->schemas[$channel]['revision'];
}
return null;
}
* @param int $id
* @return string the binary representation of $id
*/
- private function encode_long( $id ) {
+ private function encodeLong( $id ) {
$high = ( $id & 0xffffffff00000000 ) >> 32;
$low = $id & 0x00000000ffffffff;
return pack( 'NN', $high, $low );