From 49eb12efa4a5ccbde90235b3c6ef47f7e9de15a6 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Fri, 15 Apr 2016 14:01:23 -0700 Subject: [PATCH] Create Kafka event relayer Bug: T125138 Change-Id: I9d7705cb164bc975c3a0ddf4a33ac54fe7de931c --- autoload.php | 1 + includes/libs/eventrelayer/EventRelayer.php | 17 ++++- .../libs/eventrelayer/EventRelayerKafka.php | 66 +++++++++++++++++++ includes/libs/objectcache/WANObjectCache.php | 4 ++ 4 files changed, 87 insertions(+), 1 deletion(-) create mode 100644 includes/libs/eventrelayer/EventRelayerKafka.php diff --git a/autoload.php b/autoload.php index 22411cd7e9..e941fc1e5e 100644 --- a/autoload.php +++ b/autoload.php @@ -397,6 +397,7 @@ $wgAutoloadLocalClasses = [ 'ErrorPageError' => __DIR__ . '/includes/exception/ErrorPageError.php', 'EventRelayer' => __DIR__ . '/includes/libs/eventrelayer/EventRelayer.php', 'EventRelayerGroup' => __DIR__ . '/includes/EventRelayerGroup.php', + 'EventRelayerKafka' => __DIR__ . '/includes/libs/eventrelayer/EventRelayerKafka.php', 'EventRelayerNull' => __DIR__ . '/includes/libs/eventrelayer/EventRelayerNull.php', 'Exif' => __DIR__ . '/includes/media/Exif.php', 'ExifBitmapHandler' => __DIR__ . '/includes/media/ExifBitmap.php', diff --git a/includes/libs/eventrelayer/EventRelayer.php b/includes/libs/eventrelayer/EventRelayer.php index b61cae77b7..f28a4c0c26 100644 --- a/includes/libs/eventrelayer/EventRelayer.php +++ b/includes/libs/eventrelayer/EventRelayer.php @@ -18,15 +18,22 @@ * @file * @author Aaron Schulz */ +use Psr\Log\LoggerInterface; +use Psr\Log\LoggerAwareInterface; +use Psr\Log\NullLogger; /** * Base class for reliable event relays */ -abstract class EventRelayer { +abstract class EventRelayer implements LoggerAwareInterface { + /** @var LoggerInterface */ + protected $logger; + /** * @param array $params */ public function __construct( array $params ) { + $this->logger = new NullLogger(); } /** @@ -47,6 +54,14 @@ abstract class EventRelayer { return $this->doNotify( $channel, $events ); } + /** + * Set logger instance. + * @param LoggerInterface $logger + */ + public function setLogger( LoggerInterface $logger ) { + $this->logger = $logger; + } + /** * @param string $channel * @param array $events List of event data maps diff --git a/includes/libs/eventrelayer/EventRelayerKafka.php b/includes/libs/eventrelayer/EventRelayerKafka.php new file mode 100644 index 0000000000..3555a232a8 --- /dev/null +++ b/includes/libs/eventrelayer/EventRelayerKafka.php @@ -0,0 +1,66 @@ + [ 'class' => 'EventRelayerKafka', 'KafkaEventHost' => 'localhost:9092' ], + */ +class EventRelayerKafka extends EventRelayer { + + /** + * Configuration. + * + * @var Config + */ + protected $config; + + /** + * Kafka producer. + * + * @var Produce + */ + protected $producer; + + /** + * Create Kafka producer. + * + * @param Config $config + */ + public function __construct( array $params ) { + $this->config = new HashConfig( $params ); + if ( !$this->config->has( 'KafkaEventHost' ) ) { + throw new InvalidArgumentException( "KafkaEventHost must be configured" ); + } + } + + /** + * Get the producer object from kafka-php. + * @return Produce + */ + protected function getKafkaProducer() { + if ( !$this->producer ) { + $this->producer = Produce::getInstance( null, null, $this->config->get( 'KafkaEventHost' ) ); + } + return $this->producer; + } + + /** + * (non-PHPdoc) + * + * @see EventRelayer::doNotify() + * + */ + protected function doNotify( $channel, array $events ) { + $jsonEvents = array_map( 'json_encode', $events ); + try { + $producer = $this->getKafkaProducer(); + $producer->setMessages( $channel, 0, $jsonEvents ); + $producer->send(); + } catch ( \Kafka\Exception $e ) { + $this->logger->warning( "Sending events failed: $e" ); + return false; + } + return true; + } +} diff --git a/includes/libs/objectcache/WANObjectCache.php b/includes/libs/objectcache/WANObjectCache.php index b212e9779d..dd2e0d5d4b 100644 --- a/includes/libs/objectcache/WANObjectCache.php +++ b/includes/libs/objectcache/WANObjectCache.php @@ -149,6 +149,10 @@ class WANObjectCache implements IExpiringStore, LoggerAwareInterface { $this->setLogger( isset( $params['logger'] ) ? $params['logger'] : new NullLogger() ); } + /** + * Set logger instance. + * @param LoggerInterface $logger + */ public function setLogger( LoggerInterface $logger ) { $this->logger = $logger; } -- 2.20.1