client = $client;
    }

    // }}}
    // {{{ public function setGroup()

    /**
     * Store the consumer group on this helper.
     *
     * The stored value is later handed to \Kafka\Offset by onPartitionEof().
     *
     * @access public
     * @param mixed $group consumer group, kept exactly as given
     * @return void
     */
    public function setGroup($group)
    {
        $this->group = $group;
    }

    // }}}
    // {{{ public function onStreamEof()

    /**
     * Stream end-of-data hook; this helper does nothing here.
     *
     * @access public
     * @param string $streamKey key of the stream that reached EOF (unused)
     * @return void
     */
    public function onStreamEof($streamKey)
    {
    }

    // }}}
    // {{{ public function onTopicEof()

    /**
     * Topic end-of-data hook; this helper does nothing here.
     *
     * @access public
     * @param string $topicName name of the topic that reached EOF (unused)
     * @return void
     */
    public function onTopicEof($topicName)
    {
    }

    // }}}
    // {{{ public function onPartitionEof()

    /**
     * Partition end-of-data hook.
     *
     * Reads the partition id, topic name and message offset from the given
     * partition, builds a \Kafka\Offset for the stored client/group and that
     * topic/partition, and sets its offset to the message offset plus one.
     *
     * NOTE(review): the "+ 1" presumably records the next offset to consume
     * rather than the last one seen -- confirm against \Kafka\Offset::setOffset().
     *
     * @access public
     * @param \Kafka\Protocol\Fetch\Partition $partition partition that reached EOF
     * @return void
     */
    public function onPartitionEof($partition)
    {
        // Query the partition in the same order as before: id, topic, offset.
        $id         = $partition->key();
        $topic      = $partition->getTopicName();
        $lastOffset = $partition->getMessageOffset();

        $offsetStore = new \Kafka\Offset($this->client, $this->group, $topic, $id);
        $offsetStore->setOffset($lastOffset + 1);
    }

    // }}}
    // }}}
}