2 /* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
3 // +---------------------------------------------------------------------------
4 // | SWAN [ $_SWANBR_SLOGAN_$ ]
5 // +---------------------------------------------------------------------------
6 // | Copyright $_SWANBR_COPYRIGHT_$
7 // +---------------------------------------------------------------------------
8 // | Version $_SWANBR_VERSION_$
9 // +---------------------------------------------------------------------------
10 // | Licensed ( $_SWANBR_LICENSED_URL_$ )
11 // +---------------------------------------------------------------------------
12 // | $_SWANBR_WEB_DOMAIN_$
13 // +---------------------------------------------------------------------------
15 namespace Kafka\Protocol;
18 +------------------------------------------------------------------------------
19 * Kafka protocol since Kafka v0.8
20 +------------------------------------------------------------------------------
23 * @version $_SWANBR_VERSION_$
25 * @author $_SWANBR_AUTHOR_$
26 +------------------------------------------------------------------------------
29 class Decoder extends Protocol
32 // {{{ public function produceResponse()
35 * decode produce response
40 public function produceResponse()
43 $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
44 $dataLen = array_shift($dataLen);
46 throw new \Kafka\Exception\Protocol('produce response invalid.');
48 $data = $this->stream->read($dataLen, true);
52 $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
53 $topicCount = array_shift($topicCount);
55 for ($i = 0; $i < $topicCount; $i++) {
56 $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length
57 $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0;
59 $topicName = substr($data, $offset, $topicLen);
61 $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
62 $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
64 $result[$topicName] = array();
65 for ($j = 0; $j < $partitionCount; $j++) {
66 $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
68 $errCode = self::unpack(self::BIT_B16_SIGNED, substr($data, $offset, 2));
70 $partitionOffset = self::unpack(self::BIT_B64, substr($data, $offset, 8));
72 $result[$topicName][$partitionId[1]] = array(
73 'errCode' => $errCode[1],
74 'offset' => $partitionOffset
83 // {{{ public function fetchResponse()
86 * decode fetch response
91 public function fetchResponse()
93 return new \Kafka\Protocol\Fetch\Topic($this->stream);
97 // {{{ public function metadataResponse()
100 * decode metadata response
105 public function metadataResponse()
109 $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
110 $dataLen = array_shift($dataLen);
112 throw new \Kafka\Exception\Protocol('metaData response invalid.');
114 $data = $this->stream->read($dataLen, true);
116 $brokerCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
118 $brokerCount = isset($brokerCount[1]) ? $brokerCount[1] : 0;
119 for ($i = 0; $i < $brokerCount; $i++) {
120 $nodeId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
121 $nodeId = $nodeId[1];
123 $hostNameLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 host name length
124 $hostNameLen = isset($hostNameLen[1]) ? $hostNameLen[1] : 0;
126 $hostName = substr($data, $offset, $hostNameLen);
127 $offset += $hostNameLen;
128 $port = self::unpack(self::BIT_B32, substr($data, $offset, 4));
130 $broker[$nodeId] = array(
136 $topicMetaCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
138 $topicMetaCount = isset($topicMetaCount[1]) ? $topicMetaCount[1] : 0;
139 for ($i = 0; $i < $topicMetaCount; $i++) {
140 $topicErrCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
142 $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2));
144 $topicName = substr($data, $offset, $topicLen[1]);
145 $offset += $topicLen[1];
146 $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
148 $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
149 $topic[$topicName]['errCode'] = $topicErrCode[1];
150 $partitions = array();
151 for ($j = 0; $j < $partitionCount; $j++) {
152 $partitionErrCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
154 $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
155 $partitionId = isset($partitionId[1]) ? $partitionId[1] : 0;
157 $leaderId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
159 $repliasCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
161 $repliasCount = isset($repliasCount[1]) ? $repliasCount[1] : 0;
163 for ($z = 0; $z < $repliasCount; $z++) {
164 $repliaId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
166 $replias[] = $repliaId[1];
168 $isrCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
170 $isrCount = isset($isrCount[1]) ? $isrCount[1] : 0;
172 for ($z = 0; $z < $isrCount; $z++) {
173 $isrId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
178 $partitions[$partitionId] = array(
179 'errCode' => $partitionErrCode[1],
180 'leader' => $leaderId[1],
181 'replicas' => $replias,
185 $topic[$topicName]['partitions'] = $partitions;
189 'brokers' => $broker,
196 // {{{ public function offsetResponse()
199 * decode offset response
204 public function offsetResponse()
207 $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
208 $dataLen = array_shift($dataLen);
210 throw new \Kafka\Exception\Protocol('offset response invalid.');
212 $data = $this->stream->read($dataLen, true);
214 $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
216 $topicCount = array_shift($topicCount);
217 for ($i = 0; $i < $topicCount; $i++) {
218 $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length
219 $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0;
221 $topicName = substr($data, $offset, $topicLen);
222 $offset += $topicLen;
223 $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
224 $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
226 $result[$topicName] = array();
227 for ($j = 0; $j < $partitionCount; $j++) {
228 $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
230 $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
232 $offsetCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
234 $offsetCount = array_shift($offsetCount);
235 $offsetArr = array();
236 for ($z = 0; $z < $offsetCount; $z++) {
237 $offsetArr[] = self::unpack(self::BIT_B64, substr($data, $offset, 8));
240 $result[$topicName][$partitionId[1]] = array(
241 'errCode' => $errCode[1],
242 'offset' => $offsetArr
250 // {{{ public function commitOffsetResponse()
253 * decode commit offset response
258 public function commitOffsetResponse()
261 $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
262 $dataLen = array_shift($dataLen);
264 throw new \Kafka\Exception\Protocol('commit offset response invalid.');
266 $data = $this->stream->read($dataLen, true);
268 $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
270 $topicCount = array_shift($topicCount);
271 for ($i = 0; $i < $topicCount; $i++) {
272 $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length
273 $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0;
275 $topicName = substr($data, $offset, $topicLen);
276 $offset += $topicLen;
277 $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
278 $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
280 $result[$topicName] = array();
281 for ($j = 0; $j < $partitionCount; $j++) {
282 $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
284 $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
286 $result[$topicName][$partitionId[1]] = array(
287 'errCode' => $errCode[1],
295 // {{{ public function fetchOffsetResponse()
298 * decode fetch offset response
303 public function fetchOffsetResponse()
306 $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
307 $dataLen = array_shift($dataLen);
309 throw new \Kafka\Exception\Protocol('fetch offset response invalid.');
311 $data = $this->stream->read($dataLen, true);
313 $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
315 $topicCount = array_shift($topicCount);
316 for ($i = 0; $i < $topicCount; $i++) {
317 $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length
318 $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0;
320 $topicName = substr($data, $offset, $topicLen);
321 $offset += $topicLen;
322 $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
323 $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
325 $result[$topicName] = array();
326 for ($j = 0; $j < $partitionCount; $j++) {
327 $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
329 $partitionOffset = self::unpack(self::BIT_B64, substr($data, $offset, 8));
331 $metaLen = self::unpack(self::BIT_B16, substr($data, $offset, 2));
332 $metaLen = array_shift($metaLen);
336 $metaData = substr($data, $offset, $metaLen);
339 $errCode = self::unpack(self::BIT_B16_SIGNED, substr($data, $offset, 2));
341 $result[$topicName][$partitionId[1]] = array(
342 'offset' => $partitionOffset,
343 'metadata' => $metaData,
344 'errCode' => $errCode[1],
352 // {{{ public static function getError()
357 * @param integer $errCode
362 public static function getError($errCode)
366 $error = 'No error--it worked!';
369 $error = 'An unexpected server error';
372 $error = 'The requested offset is outside the range of offsets maintained by the server for the given topic/partition.';
375 $error = 'This indicates that a message contents does not match its CRC';
378 $error = 'This request is for a topic or partition that does not exist on this broker.';
381 $error = 'The message has a negative size';
384 $error = 'This error is thrown if we are in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes';
387 $error = 'This error is thrown if the client attempts to send messages to a replica that is not the leader for some partition. It indicates that the clients metadata is out of date.';
390 $error = 'This error is thrown if the request exceeds the user-specified time limit in the request.';
393 $error = 'This is not a client facing error and is used only internally by intra-cluster broker communication.';
396 $error = 'The server has a configurable maximum message size to avoid unbounded memory allocation. This error is thrown if the client attempt to produce a message larger than this maximum.';
399 $error = 'Internal error code for broker-to-broker communication.';
402 $error = 'If you specify a string larger than configured maximum for offset metadata';
405 $error = 'The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change for that offsets topic partition).';
408 $error = 'The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has not yet been created.';
411 $error = 'The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is not a coordinator for.';
414 $error = 'Unknown error';