2 /* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
3 // +---------------------------------------------------------------------------
4 // | SWAN [ $_SWANBR_SLOGAN_$ ]
5 // +---------------------------------------------------------------------------
6 // | Copyright $_SWANBR_COPYRIGHT_$
7 // +---------------------------------------------------------------------------
8 // | Version $_SWANBR_VERSION_$
9 // +---------------------------------------------------------------------------
10 // | Licensed ( $_SWANBR_LICENSED_URL_$ )
11 // +---------------------------------------------------------------------------
12 // | $_SWANBR_WEB_DOMAIN_$
13 // +---------------------------------------------------------------------------
15 namespace Kafka\Protocol\Fetch\Helper;
18 +------------------------------------------------------------------------------
19 * Kafka protocol since Kafka v0.8
20 +------------------------------------------------------------------------------
23 * @version $_SWANBR_VERSION_$
25 * @author $_SWANBR_AUTHOR_$
26 +------------------------------------------------------------------------------
36 private static $helpers = array();
// {{{ public static function registerHelper()

/**
 * Register a fetch helper under the given key.
 *
 * When no helper instance is passed, the key is appended to the
 * `\Kafka\Protocol\Fetch\Helper\` namespace prefix and the resulting class
 * is instantiated without constructor arguments. Registering under a key
 * that is already in use replaces the previous entry.
 *
 * @param string $key registry key; also the short class name when $helper is omitted
 * @param \Kafka\Protocol\Fetch\Helper\HelperAbstract $helper
 * @static
 * @access public
 * @return void
 * @throws \Kafka\Exception when the class derived from $key does not exist,
 *                          or the helper is not a HelperAbstract instance
 */
public static function registerHelper($key, $helper = null)
{
    if ($helper === null) {
        // No instance supplied: resolve the helper class from its short name.
        $helperClass = '\\Kafka\\Protocol\\Fetch\\Helper\\' . $key;
        if (!class_exists($helperClass)) {
            throw new \Kafka\Exception('helper is not exists.');
        }
        $helper = new $helperClass();
    }

    // Reject anything that does not extend the common helper base class.
    if (!($helper instanceof \Kafka\Protocol\Fetch\Helper\HelperAbstract)) {
        throw new \Kafka\Exception('this helper not instance of `\Kafka\Protocol\Fetch\Helper\HelperAbstract`');
    }

    self::$helpers[$key] = $helper;
}

// }}}
// {{{ public static function unRegisterHelper()

/**
 * Remove the helper stored under the given key.
 *
 * Does nothing when no helper is registered under that key.
 *
 * @param string $key registry key used when the helper was registered
 * @static
 * @access public
 * @return void
 */
public static function unRegisterHelper($key)
{
    if (!isset(self::$helpers[$key])) {
        return;
    }

    unset(self::$helpers[$key]);
}

// }}}
// {{{ public static function onStreamEof()

/**
 * Notify every registered helper that a stream reached its end.
 *
 * Helpers that do not define an `onStreamEof` method are skipped.
 *
 * @param string $streamKey
 * @static
 * @access public
 * @return false|null false when no helper is registered (assumed, see the
 *                    review note in the body); otherwise nothing is returned
 */
public static function onStreamEof($streamKey)
{
    if (!self::$helpers) {
        // NOTE(review): the guard's body is not visible in the reviewed
        // excerpt; `return false` is assumed - confirm against the full file.
        return false;
    }

    foreach (self::$helpers as $helper) {
        if (!method_exists($helper, 'onStreamEof')) {
            continue;
        }
        $helper->onStreamEof($streamKey);
    }
}

// }}}
// {{{ public static function onTopicEof()

/**
 * Notify every registered helper that a topic reached its end.
 *
 * Helpers that do not define an `onTopicEof` method are skipped. The hook
 * that is invoked is `onTopicEof` itself, i.e. the same method whose
 * existence is checked; dispatching to `onStreamEof` here would hand a
 * topic name to the stream hook and fail outright on helpers that only
 * implement `onTopicEof`.
 *
 * @param string $topicName
 * @static
 * @access public
 * @return false|null false when no helper is registered (assumed, see the
 *                    review note in the body); otherwise nothing is returned
 */
public static function onTopicEof($topicName)
{
    if (empty(self::$helpers)) {
        // NOTE(review): the guard's body is not visible in the reviewed
        // excerpt; `return false` is assumed - confirm against the full file.
        return false;
    }

    foreach (self::$helpers as $helper) {
        if (method_exists($helper, 'onTopicEof')) {
            $helper->onTopicEof($topicName);
        }
    }
}

// }}}
// {{{ public static function onPartitionEof()

/**
 * on partition eof call
 *
 * Notifies every registered helper that a partition reached its end.
 * Helpers that do not define an `onPartitionEof` method are skipped.
 *
 * @param \Kafka\Protocol\Fetch\Partition $partition
 * @static
 * @access public
 * @return false|null false when no helper is registered (assumed, see the
 *                    review note in the body); otherwise nothing is returned
 */
public static function onPartitionEof($partition)
{
    if (!self::$helpers) {
        // NOTE(review): the guard's body is not visible in the reviewed
        // excerpt; `return false` is assumed - confirm against the full file.
        return false;
    }

    foreach (self::$helpers as $helper) {
        if (!method_exists($helper, 'onPartitionEof')) {
            continue;
        }
        $helper->onPartitionEof($partition);
    }
}

// }}}