$helper) {
            if (method_exists($helper, 'onStreamEof')) {
                $helper->onStreamEof($streamKey);
            }
        }
    }

    // }}}
    // {{{ public static function onTopicEof()

    /**
     * Forward a topic EOF notification to the helpers.
     *
     * Every entry in self::$helpers that defines an onTopicEof() method has
     * that method called with the topic name; entries without the method are
     * skipped. Returns immediately when self::$helpers is empty.
     *
     * @param string $topicName topic name handed to each helper
     * @static
     * @access public
     * @return void
     */
    public static function onTopicEof($topicName)
    {
        if (empty(self::$helpers)) {
            return;
        }

        foreach (self::$helpers as $helper) {
            if (method_exists($helper, 'onTopicEof')) {
                // Invoke the same hook that was just checked for: a helper
                // that only implements onTopicEof() has no onStreamEof() to
                // call, and the topic name is not a stream key.
                $helper->onTopicEof($topicName);
            }
        }
    }

    // }}}
    // {{{ public static function onPartitionEof()

    /**
     * Forward a partition EOF notification to the helpers.
     *
     * Every entry in self::$helpers that defines an onPartitionEof() method
     * has that method called with the partition; entries without the method
     * are skipped. Returns immediately when self::$helpers is empty.
     *
     * @param \Kafka\Protocol\Fetch\Partition $partition partition handed to each helper
     * @static
     * @access public
     * @return void
     */
    public static function onPartitionEof($partition)
    {
        if (empty(self::$helpers)) {
            return;
        }

        foreach (self::$helpers as $helper) {
            if (method_exists($helper, 'onPartitionEof')) {
                $helper->onPartitionEof($partition);
            }
        }
    }

    // }}}
    // }}}
}