]> scripts.mit.edu Git - autoinstallsdev/mediawiki.git/blobdiff - vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php
MediaWiki 1.30.2
[autoinstallsdev/mediawiki.git] / vendor / nmred / kafka-php / src / Kafka / Protocol / Fetch / Helper / Consumer.php
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php
new file mode 100644 (file)
index 0000000..49560b9
--- /dev/null
@@ -0,0 +1,50 @@
+<?php
+namespace Kafka\Protocol\Fetch\Helper;
+/**
+ * Description of Consumer
+ *
+ * @author daniel
+ */
+class Consumer extends HelperAbstract
+{
+    protected $consumer;
+
+    protected $offsetStrategy;
+
+    /**
+     * Consumer constructor.
+     * @param \Kafka\Consumer $consumer
+     */
+    public function __construct(\Kafka\Consumer $consumer)
+    {
+        $this->consumer = $consumer;
+    }
+
+    /**
+     * @param \Kafka\Protocol\Fetch\Partition $partition
+     */
+    public function onPartitionEof($partition)
+    {
+        $partitionId = $partition->key();
+        $topicName = $partition->getTopicName();
+        $offset    = $partition->getMessageOffset();
+        $this->consumer->setFromOffset(true);
+        $this->consumer->setPartition($topicName, $partitionId, ($offset +1));
+    }
+
+    /**
+     * @param string $streamKey
+     */
+    public function onStreamEof($streamKey)
+    {
+
+    }
+
+    /**
+     * @param string $topicName
+     */
+    public function onTopicEof($topicName)
+    {
+
+    }
+}