]> scripts.mit.edu Git - autoinstalls/mediawiki.git/blob - vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php
MediaWiki 1.30.2-scripts2
[autoinstalls/mediawiki.git] / vendor / nmred / kafka-php / src / Kafka / Protocol / Fetch / MessageSet.php
1 <?php
2 /* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
3 // +---------------------------------------------------------------------------
4 // | SWAN [ $_SWANBR_SLOGAN_$ ]
5 // +---------------------------------------------------------------------------
6 // | Copyright $_SWANBR_COPYRIGHT_$
7 // +---------------------------------------------------------------------------
8 // | Version  $_SWANBR_VERSION_$
9 // +---------------------------------------------------------------------------
10 // | Licensed ( $_SWANBR_LICENSED_URL_$ )
11 // +---------------------------------------------------------------------------
12 // | $_SWANBR_WEB_DOMAIN_$
13 // +---------------------------------------------------------------------------
14
15 namespace Kafka\Protocol\Fetch;
16
17 use \Kafka\Protocol\Decoder;
18
19 /**
20 +------------------------------------------------------------------------------
21 * Kafka protocol since Kafka v0.8
22 +------------------------------------------------------------------------------
23 *
24 * @package
25 * @version $_SWANBR_VERSION_$
26 * @copyright Copyleft
27 * @author $_SWANBR_AUTHOR_$
28 +------------------------------------------------------------------------------
29 */
30
31 class MessageSet implements \Iterator
32 {
33     // {{{ members
34
35     /**
36      * kafka socket object
37      *
38      * @var mixed
39      * @access private
40      */
41     private $stream = null;
42
43     /**
44      * messageSet size
45      *
46      * @var float
47      * @access private
48      */
49     private $messageSetSize = 0;
50
51     /**
52      * validByteCount
53      *
54      * @var float
55      * @access private
56      */
57     private $validByteCount = 0;
58
59     /**
60      * messageSet offset
61      *
62      * @var float
63      * @access private
64      */
65     private $offset = 0;
66
67     /**
68      * valid
69      *
70      * @var mixed
71      * @access private
72      */
73     private $valid = false;
74
75     /**
76      * partition object
77      *
78      * @var \Kafka\Protocol\Fetch\Partition
79      * @access private
80      */
81     private $partition = null;
82
83     /**
84      * request fetch context
85      *
86      * @var array
87      */
88     private $context = array();
89
90     /**
91      * @var Message
92      */
93     private $current = null;
94
95     // }}}
96     // {{{ functions
97     // {{{ public function __construct()
98
99     /**
100      * __construct
101      *
102      * @param Partition $partition
103      * @param array $context
104      * @access public
105      */
106     public function __construct(\Kafka\Protocol\Fetch\Partition $partition, $context = array())
107     {
108         $this->stream = $partition->getStream();
109         $this->partition = $partition;
110         $this->context   = $context;
111         $this->messageSetSize = $this->getMessageSetSize();
112         \Kafka\Log::log("messageSetSize: {$this->messageSetSize}", LOG_INFO);
113     }
114
115     // }}}
116     // {{{ public function current()
117
118     /**
119      * current
120      *
121      * @access public
122      * @return Message
123      */
124     public function current()
125     {
126         return $this->current;
127     }
128
129     // }}}
130     // {{{ public function key()
131
132     /**
133      * key
134      *
135      * @access public
136      * @return float
137      */
138     public function key()
139     {
140         return $this->validByteCount;
141     }
142
143     // }}}
144     // {{{ public function rewind()
145
146     /**
147      * implements Iterator function
148      *
149      * @access public
150      * @return integer
151      */
152     public function rewind()
153     {
154         $this->valid = $this->loadNextMessage();
155     }
156
157     // }}}
158     // {{{ public function valid()
159
160     /**
161      * implements Iterator function
162      *
163      * @access public
164      * @return integer
165      */
166     public function valid()
167     {
168         if (!$this->valid) {
169             $this->partition->setMessageOffset($this->offset);
170
171             // one partition iterator end
172             \Kafka\Protocol\Fetch\Helper\Helper::onPartitionEof($this->partition);
173         }
174
175         return $this->valid;
176     }
177
178     // }}}
179     // {{{ public function next()
180
181     /**
182      * implements Iterator function
183      *
184      * @access public
185      * @return void
186      */
187     public function next()
188     {
189         $this->valid = $this->loadNextMessage();
190     }
191
192     // }}}
193     // {{{ protected function getMessageSetSize()
194
195     /**
196      * get message set size
197      *
198      * @access protected
199      * @return integer
200      */
201     protected function getMessageSetSize()
202     {
203         // read message size
204         $data = $this->stream->read(4, true);
205         $data = Decoder::unpack(Decoder::BIT_B32, $data);
206         $size = array_shift($data);
207         if ($size <= 0) {
208             throw new \Kafka\Exception\OutOfRange($size . ' is not a valid message size');
209         }
210
211         return $size;
212     }
213
214     // }}}
215     // {{{ public function loadNextMessage()
216
217     /**
218      * load next message
219      *
220      * @access public
221      * @return bool
222      */
223     public function loadNextMessage()
224     {
225         if ($this->validByteCount >= $this->messageSetSize) {
226             return false;
227         }
228
229         try {
230             if ($this->validByteCount + 12 > $this->messageSetSize) {
231                 // read socket buffer dirty data
232                 $this->stream->read($this->messageSetSize - $this->validByteCount);
233                 return false;
234             }
235             $offset = $this->stream->read(8, true);
236             $this->offset  = \Kafka\Protocol\Decoder::unpack(Decoder::BIT_B64, $offset);
237             $messageSize = $this->stream->read(4, true);
238             $messageSize = Decoder::unpack(Decoder::BIT_B32, $messageSize);
239             $messageSize = array_shift($messageSize);
240             $this->validByteCount += 12;
241             if (($this->validByteCount + $messageSize) > $this->messageSetSize) {
242                 // read socket buffer dirty data
243                 $this->stream->read($this->messageSetSize - $this->validByteCount);
244                 return false;
245             }
246             $msg  = $this->stream->read($messageSize, true);
247             $this->current = new Message($msg);
248         } catch (\Kafka\Exception $e) {
249             \Kafka\Log::log("already fetch: {$this->validByteCount}, {$e->getMessage()}", LOG_INFO);
250             return false;
251         }
252
253         $this->validByteCount += $messageSize;
254
255         return true;
256     }
257
258     // }}}
259     // {{{ public function messageOffset()
260
261     /**
262      * current message offset in producer
263      *
264      * @return float
265      */
266     public function messageOffset()
267     {
268         return $this->offset;
269     }
270
271     // }}}
272     // }}}
273 }