X-Git-Url: https://scripts.mit.edu/gitweb/autoinstallsdev/mediawiki.git/blobdiff_plain/19e297c21b10b1b8a3acad5e73fc71dcb35db44a..6932310fd58ebef145fa01eb76edf7150284d8ea:/includes/libs/eventrelayer/EventRelayerKafka.php diff --git a/includes/libs/eventrelayer/EventRelayerKafka.php b/includes/libs/eventrelayer/EventRelayerKafka.php new file mode 100644 index 00000000..999eb439 --- /dev/null +++ b/includes/libs/eventrelayer/EventRelayerKafka.php @@ -0,0 +1,62 @@ + [ 'class' => 'EventRelayerKafka', 'KafkaEventHost' => 'localhost:9092' ], + */ +class EventRelayerKafka extends EventRelayer { + /** + * Configuration. + * + * @var Config + */ + protected $config; + + /** + * Kafka producer. + * + * @var Produce + */ + protected $producer; + + /** + * Create Kafka producer. + * + * @param array $params + */ + public function __construct( array $params ) { + parent::__construct( $params ); + + $this->config = new HashConfig( $params ); + if ( !$this->config->has( 'KafkaEventHost' ) ) { + throw new InvalidArgumentException( "KafkaEventHost must be configured" ); + } + } + + /** + * Get the producer object from kafka-php. + * @return Produce + */ + protected function getKafkaProducer() { + if ( !$this->producer ) { + $this->producer = Produce::getInstance( + null, null, $this->config->get( 'KafkaEventHost' ) ); + } + return $this->producer; + } + + protected function doNotify( $channel, array $events ) { + $jsonEvents = array_map( 'json_encode', $events ); + try { + $producer = $this->getKafkaProducer(); + $producer->setMessages( $channel, 0, $jsonEvents ); + $producer->send(); + } catch ( \Kafka\Exception $e ) { + $this->logger->warning( "Sending events failed: $e" ); + return false; + } + return true; + } +}