5 * Event relayer for Apache Kafka.
6 * Configuring for WANCache:
7 * 'relayerConfig' => [ 'class' => 'EventRelayerKafka', 'KafkaEventHost' => 'localhost:9092' ],
9 class EventRelayerKafka
extends EventRelayer
{
26 * Create Kafka producer.
28 * @param Config $config
30 public function __construct( array $params ) {
31 $this->config
= new HashConfig( $params );
32 if ( !$this->config
->has( 'KafkaEventHost' ) ) {
33 throw new InvalidArgumentException( "KafkaEventHost must be configured" );
38 * Get the producer object from kafka-php.
41 protected function getKafkaProducer() {
42 if ( !$this->producer
) {
43 $this->producer
= Produce
::getInstance( null, null, $this->config
->get( 'KafkaEventHost' ) );
45 return $this->producer
;
51 * @see EventRelayer::doNotify()
54 protected function doNotify( $channel, array $events ) {
55 $jsonEvents = array_map( 'json_encode', $events );
57 $producer = $this->getKafkaProducer();
58 $producer->setMessages( $channel, 0, $jsonEvents );
60 } catch ( \Kafka\Exception
$e ) {
61 $this->logger
->warning( "Sending events failed: $e" );