X-Git-Url: https://scripts.mit.edu/gitweb/autoinstallsdev/mediawiki.git/blobdiff_plain/19e297c21b10b1b8a3acad5e73fc71dcb35db44a..6932310fd58ebef145fa01eb76edf7150284d8ea:/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php new file mode 100644 index 00000000..9fadb568 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php @@ -0,0 +1,119 @@ +client = $client; + } + + // }}} + // {{{ public function setGroup() + + /** + * set consumer group + * + * @access public + * @param $group + */ + public function setGroup($group) + { + $this->group = $group; + } + + // }}} + // {{{ public function onStreamEof() + + /** + * on stream eof call + * + * @param string $streamKey + * @access public + * @return void + */ + public function onStreamEof($streamKey) + { + } + + // }}} + // {{{ public function onTopicEof() + + /** + * on topic eof call + * + * @param string $topicName + * @access public + * @return void + */ + public function onTopicEof($topicName) + { + } + + // }}} + // {{{ public function onPartitionEof() + + /** + * on partition eof call + * + * @param \Kafka\Protocol\Fetch\Partition $partition + * @access public + * @return void + */ + public function onPartitionEof($partition) + { + $partitionId = $partition->key(); + $topicName = $partition->getTopicName(); + $offset = $partition->getMessageOffset(); + $offsetObject = new \Kafka\Offset($this->client, $this->group, $topicName, $partitionId); + $offsetObject->setOffset($offset + 1); + } + + // }}} + // }}} +}