2 /* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
3 // +---------------------------------------------------------------------------
4 // | SWAN [ $_SWANBR_SLOGAN_$ ]
5 // +---------------------------------------------------------------------------
6 // | Copyright $_SWANBR_COPYRIGHT_$
7 // +---------------------------------------------------------------------------
8 // | Version $_SWANBR_VERSION_$
9 // +---------------------------------------------------------------------------
10 // | Licensed ( $_SWANBR_LICENSED_URL_$ )
11 // +---------------------------------------------------------------------------
12 // | $_SWANBR_WEB_DOMAIN_$
13 // +---------------------------------------------------------------------------
15 namespace Kafka\Protocol\Fetch;
17 use \Kafka\Protocol\Decoder;
/**
+------------------------------------------------------------------------------
* Kafka protocol since Kafka v0.8
+------------------------------------------------------------------------------
*
* Forward-only iterator over the messages of one message set read from a
* partition's stream. The set is consumed straight from the stream, so it can
* be traversed only once.
*
* @version $_SWANBR_VERSION_$
* @author $_SWANBR_AUTHOR_$
+------------------------------------------------------------------------------
*/
class MessageSet implements \Iterator
{
    // {{{ members

    /**
     * stream the message set is read from (taken from the partition)
     *
     * @var mixed
     */
    private $stream = null;

    /**
     * total size of the message set in bytes, as announced by its header
     *
     * @var int
     */
    private $messageSetSize = 0;

    /**
     * number of bytes of the message set consumed so far
     *
     * @var int
     */
    private $validByteCount = 0;

    /**
     * offset decoded from the header of the most recently read message
     *
     * Declared explicitly: the class reads and writes it, and undeclared
     * (dynamic) properties are deprecated as of PHP 8.2.
     *
     * @var mixed
     */
    private $offset = 0;

    /**
     * whether the last load produced a message
     *
     * @var bool
     */
    private $valid = false;

    /**
     * partition object
     *
     * @var \Kafka\Protocol\Fetch\Partition
     */
    private $partition = null;

    /**
     * request fetch context
     *
     * @var array
     */
    private $context = array();

    /**
     * message produced by the last successful load
     *
     * @var Message
     */
    private $current = null;

    // }}}
    // {{{ functions
    // {{{ public function __construct()

    /**
     * Binds the set to a partition and reads the message set size header
     * from the partition's stream.
     *
     * @param Partition $partition
     * @param array $context
     * @throws \Kafka\Exception\OutOfRange when the announced size is not positive
     */
    public function __construct(\Kafka\Protocol\Fetch\Partition $partition, $context = array())
    {
        $this->stream = $partition->getStream();
        $this->partition = $partition;
        $this->context = $context;
        $this->messageSetSize = $this->getMessageSetSize();
        \Kafka\Log::log("messageSetSize: {$this->messageSetSize}", LOG_INFO);
    }

    // }}}
    // {{{ public function current()

    /**
     * implements Iterator function
     *
     * @return Message|null the message loaded last, null before the first load
     */
    #[\ReturnTypeWillChange]
    public function current()
    {
        return $this->current;
    }

    // }}}
    // {{{ public function key()

    /**
     * implements Iterator function
     *
     * @return int number of bytes of the set consumed so far
     */
    #[\ReturnTypeWillChange]
    public function key()
    {
        return $this->validByteCount;
    }

    // }}}
    // {{{ public function rewind()

    /**
     * implements Iterator function
     *
     * Does not seek back: it simply loads the next message from the stream.
     *
     * @return void
     */
    #[\ReturnTypeWillChange]
    public function rewind()
    {
        $this->valid = $this->loadNextMessage();
    }

    // }}}
    // {{{ public function valid()

    /**
     * implements Iterator function
     *
     * When the set is exhausted, the last message offset is handed to the
     * partition and the partition-EOF helper is notified. This happens on
     * every call made while the iterator is invalid.
     *
     * @return bool
     */
    #[\ReturnTypeWillChange]
    public function valid()
    {
        if (!$this->valid) {
            $this->partition->setMessageOffset($this->offset);

            // one partition iterator end
            \Kafka\Protocol\Fetch\Helper\Helper::onPartitionEof($this->partition);
        }

        return $this->valid;
    }

    // }}}
    // {{{ public function next()

    /**
     * implements Iterator function
     *
     * @return void
     */
    #[\ReturnTypeWillChange]
    public function next()
    {
        $this->valid = $this->loadNextMessage();
    }

    // }}}
    // {{{ protected function getMessageSetSize()

    /**
     * get message set size
     *
     * Reads the 4-byte size header from the stream and decodes it.
     *
     * @return int
     * @throws \Kafka\Exception\OutOfRange when the decoded size is not positive
     */
    protected function getMessageSetSize()
    {
        // read message size
        $data = $this->stream->read(4, true);
        $data = Decoder::unpack(Decoder::BIT_B32, $data);
        $size = array_shift($data);
        if ($size <= 0) {
            throw new \Kafka\Exception\OutOfRange($size . ' is not a valid message size');
        }

        return $size;
    }

    // }}}
    // {{{ public function loadNextMessage()

    /**
     * Reads the next message of the set from the stream into $current.
     *
     * Each message is preceded by 12 header bytes: an 8-byte offset and a
     * 4-byte message size. If the bytes left in the set cannot hold a full
     * header or a full message body, the remainder is discarded and the set is
     * marked as fully consumed, so later calls never touch the stream again.
     *
     * @return bool true when a message was loaded, false at end of set or on
     *              a \Kafka\Exception raised while reading
     */
    public function loadNextMessage()
    {
        if ($this->validByteCount >= $this->messageSetSize) {
            return false;
        }

        try {
            if ($this->validByteCount + 12 > $this->messageSetSize) {
                // read socket buffer dirty data
                $this->discardRemainder();
                return false;
            }
            $offset = $this->stream->read(8, true);
            $this->offset = Decoder::unpack(Decoder::BIT_B64, $offset);
            $messageSize = $this->stream->read(4, true);
            $messageSize = Decoder::unpack(Decoder::BIT_B32, $messageSize);
            $messageSize = array_shift($messageSize);
            $this->validByteCount += 12;
            if (($this->validByteCount + $messageSize) > $this->messageSetSize) {
                // read socket buffer dirty data
                $this->discardRemainder();
                return false;
            }
            $msg = $this->stream->read($messageSize, true);
            $this->current = new Message($msg);
        } catch (\Kafka\Exception $e) {
            \Kafka\Log::log("already fetch: {$this->validByteCount}, {$e->getMessage()}", LOG_INFO);
            return false;
        }

        $this->validByteCount += $messageSize;

        return true;
    }

    // }}}
    // {{{ private function discardRemainder()

    /**
     * Drops the unread tail of the message set from the stream and records the
     * set as fully consumed.
     *
     * @return void
     */
    private function discardRemainder()
    {
        $remaining = $this->messageSetSize - $this->validByteCount;
        if ($remaining > 0) {
            $this->stream->read($remaining);
        }
        // Without this, a later load would read again and swallow bytes that
        // belong to whatever follows this message set on the stream.
        $this->validByteCount = $this->messageSetSize;
    }

    // }}}
    // {{{ public function messageOffset()

    /**
     * current message offset in producer
     *
     * @return mixed offset decoded for the last message read, 0 before any read
     */
    public function messageOffset()
    {
        return $this->offset;
    }

    // }}}
    // }}}
}