consumer = $consumer; }

    /**
     * Called when the end of a partition has been reached.
     *
     * Reads the partition id, topic name and message offset from the given
     * partition, switches the consumer to from-offset mode, and registers
     * the partition on the consumer at the offset following the reported one.
     *
     * NOTE(review): presumably offset + 1 makes the next fetch resume after
     * the last consumed message; confirm against the consumer implementation.
     *
     * @param \Kafka\Protocol\Fetch\Partition $partition
     */
    public function onPartitionEof($partition)
    {
        // Collect everything from the partition before touching the consumer.
        $id          = $partition->key();
        $topic       = $partition->getTopicName();
        $lastOffset  = $partition->getMessageOffset();
        $nextOffset  = $lastOffset + 1;

        $this->consumer->setFromOffset(true);
        $this->consumer->setPartition($topic, $id, $nextOffset);
    }

    /**
     * Called when the end of a stream has been reached. Currently a no-op.
     *
     * @param string $streamKey
     */
    public function onStreamEof($streamKey)
    {
    }

    /**
     * Called when the end of a topic has been reached. Currently a no-op.
     *
     * @param string $topicName
     */
    public function onTopicEof($topicName)
    {
    }
}