]> scripts.mit.edu Git - autoinstallsdev/mediawiki.git/blob - vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php
MediaWiki 1.30.2
[autoinstallsdev/mediawiki.git] / vendor / nmred / kafka-php / src / Kafka / Protocol / Decoder.php
1 <?php
2 /* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
3 // +---------------------------------------------------------------------------
4 // | SWAN [ $_SWANBR_SLOGAN_$ ]
5 // +---------------------------------------------------------------------------
6 // | Copyright $_SWANBR_COPYRIGHT_$
7 // +---------------------------------------------------------------------------
8 // | Version  $_SWANBR_VERSION_$
9 // +---------------------------------------------------------------------------
10 // | Licensed ( $_SWANBR_LICENSED_URL_$ )
11 // +---------------------------------------------------------------------------
12 // | $_SWANBR_WEB_DOMAIN_$
13 // +---------------------------------------------------------------------------
14
15 namespace Kafka\Protocol;
16
17 /**
18 +------------------------------------------------------------------------------
19 * Kafka protocol since Kafka v0.8
20 +------------------------------------------------------------------------------
21 *
22 * @package
23 * @version $_SWANBR_VERSION_$
24 * @copyright Copyleft
25 * @author $_SWANBR_AUTHOR_$
26 +------------------------------------------------------------------------------
27 */
28
29 class Decoder extends Protocol
30 {
31     // {{{ functions
32     // {{{ public function produceResponse()
33
34     /**
35      * decode produce response
36      *
37      * @access public
38      * @return array
39      */
40     public function produceResponse()
41     {
42         $result = array();
43         $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
44         $dataLen = array_shift($dataLen);
45         if (!$dataLen) {
46             throw new \Kafka\Exception\Protocol('produce response invalid.');
47         }
48         $data = $this->stream->read($dataLen, true);
49
50         // parse data struct
51         $offset = 4;
52         $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
53         $topicCount = array_shift($topicCount);
54         $offset += 4;
55         for ($i = 0; $i < $topicCount; $i++) {
56             $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length
57             $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0;
58             $offset += 2;
59             $topicName = substr($data, $offset, $topicLen);
60             $offset += $topicLen;
61             $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
62             $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
63             $offset += 4;
64             $result[$topicName] = array();
65             for ($j = 0; $j < $partitionCount; $j++) {
66                 $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
67                 $offset += 4;
68                 $errCode = self::unpack(self::BIT_B16_SIGNED, substr($data, $offset, 2));
69                 $offset += 2;
70                 $partitionOffset = self::unpack(self::BIT_B64, substr($data, $offset, 8));
71                 $offset += 8;
72                 $result[$topicName][$partitionId[1]] = array(
73                     'errCode' => $errCode[1],
74                     'offset'  => $partitionOffset
75                 );
76             }
77         }
78
79         return $result;
80     }
81
82     // }}}
83     // {{{ public function fetchResponse()
84
85     /**
86      * decode fetch response
87      *
88      * @access public
89      * @return \Iterator
90      */
91     public function fetchResponse()
92     {
93         return new \Kafka\Protocol\Fetch\Topic($this->stream);
94     }
95
96     // }}}
97     // {{{ public function metadataResponse()
98
99     /**
100      * decode metadata response
101      *
102      * @access public
103      * @return array
104      */
105     public function metadataResponse()
106     {
107         $broker = array();
108         $topic = array();
109         $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
110         $dataLen = array_shift($dataLen);
111         if (!$dataLen) {
112             throw new \Kafka\Exception\Protocol('metaData response invalid.');
113         }
114         $data = $this->stream->read($dataLen, true);
115         $offset = 4;
116         $brokerCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
117         $offset += 4;
118         $brokerCount = isset($brokerCount[1]) ? $brokerCount[1] : 0;
119         for ($i = 0; $i < $brokerCount; $i++) {
120             $nodeId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
121             $nodeId = $nodeId[1];
122             $offset += 4;
123             $hostNameLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 host name length
124             $hostNameLen = isset($hostNameLen[1]) ? $hostNameLen[1] : 0;
125             $offset += 2;
126             $hostName = substr($data, $offset, $hostNameLen);
127             $offset += $hostNameLen;
128             $port = self::unpack(self::BIT_B32, substr($data, $offset, 4));
129             $offset += 4;
130             $broker[$nodeId] = array(
131                 'host' => $hostName,
132                 'port' => $port[1],
133             );
134         }
135
136         $topicMetaCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
137         $offset += 4;
138         $topicMetaCount = isset($topicMetaCount[1]) ? $topicMetaCount[1] : 0;
139         for ($i = 0; $i < $topicMetaCount; $i++) {
140             $topicErrCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
141             $offset += 2;
142             $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2));
143             $offset += 2;
144             $topicName = substr($data, $offset, $topicLen[1]);
145             $offset += $topicLen[1];
146             $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
147             $offset += 4;
148             $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
149             $topic[$topicName]['errCode'] = $topicErrCode[1];
150             $partitions = array();
151             for ($j = 0; $j < $partitionCount; $j++) {
152                 $partitionErrCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
153                 $offset += 2;
154                 $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
155                 $partitionId = isset($partitionId[1]) ? $partitionId[1] : 0;
156                 $offset += 4;
157                 $leaderId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
158                 $offset += 4;
159                 $repliasCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
160                 $offset += 4;
161                 $repliasCount = isset($repliasCount[1]) ? $repliasCount[1] : 0;
162                 $replias = array();
163                 for ($z = 0; $z < $repliasCount; $z++) {
164                     $repliaId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
165                     $offset += 4;
166                     $replias[] = $repliaId[1];
167                 }
168                 $isrCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
169                 $offset += 4;
170                 $isrCount = isset($isrCount[1]) ? $isrCount[1] : 0;
171                 $isrs = array();
172                 for ($z = 0; $z < $isrCount; $z++) {
173                     $isrId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
174                     $offset += 4;
175                     $isrs[] = $isrId[1];
176                 }
177
178                 $partitions[$partitionId] = array(
179                     'errCode'  => $partitionErrCode[1],
180                     'leader'   => $leaderId[1],
181                     'replicas' => $replias,
182                     'isr'      => $isrs,
183                 );
184             }
185             $topic[$topicName]['partitions'] = $partitions;
186         }
187
188         $result = array(
189             'brokers' => $broker,
190             'topics'  => $topic,
191         );
192         return $result;
193     }
194
195     // }}}
196     // {{{ public function offsetResponse()
197
198     /**
199      * decode offset response
200      *
201      * @access public
202      * @return array
203      */
204     public function offsetResponse()
205     {
206         $result = array();
207         $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
208         $dataLen = array_shift($dataLen);
209         if (!$dataLen) {
210             throw new \Kafka\Exception\Protocol('offset response invalid.');
211         }
212         $data = $this->stream->read($dataLen, true);
213         $offset = 4;
214         $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
215         $offset += 4;
216         $topicCount = array_shift($topicCount);
217         for ($i = 0; $i < $topicCount; $i++) {
218             $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length
219             $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0;
220             $offset += 2;
221             $topicName = substr($data, $offset, $topicLen);
222             $offset += $topicLen;
223             $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
224             $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
225             $offset += 4;
226             $result[$topicName] = array();
227             for ($j = 0; $j < $partitionCount; $j++) {
228                 $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
229                 $offset += 4;
230                 $errCode     = self::unpack(self::BIT_B16, substr($data, $offset, 2));
231                 $offset += 2;
232                 $offsetCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
233                 $offset += 4;
234                 $offsetCount = array_shift($offsetCount);
235                 $offsetArr = array();
236                 for ($z = 0; $z < $offsetCount; $z++) {
237                     $offsetArr[] = self::unpack(self::BIT_B64, substr($data, $offset, 8));
238                     $offset += 8;
239                 }
240                 $result[$topicName][$partitionId[1]] = array(
241                     'errCode' => $errCode[1],
242                     'offset'  => $offsetArr
243                 );
244             }
245         }
246         return $result;
247     }
248
249     // }}}
250     // {{{ public function commitOffsetResponse()
251
252     /**
253      * decode commit offset response
254      *
255      * @access public
256      * @return array
257      */
258     public function commitOffsetResponse()
259     {
260         $result = array();
261         $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
262         $dataLen = array_shift($dataLen);
263         if (!$dataLen) {
264             throw new \Kafka\Exception\Protocol('commit offset response invalid.');
265         }
266         $data = $this->stream->read($dataLen, true);
267         $offset = 4;
268         $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
269         $offset += 4;
270         $topicCount = array_shift($topicCount);
271         for ($i = 0; $i < $topicCount; $i++) {
272             $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length
273             $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0;
274             $offset += 2;
275             $topicName = substr($data, $offset, $topicLen);
276             $offset += $topicLen;
277             $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
278             $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
279             $offset += 4;
280             $result[$topicName] = array();
281             for ($j = 0; $j < $partitionCount; $j++) {
282                 $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
283                 $offset += 4;
284                 $errCode     = self::unpack(self::BIT_B16, substr($data, $offset, 2));
285                 $offset += 2;
286                 $result[$topicName][$partitionId[1]] = array(
287                     'errCode' => $errCode[1],
288                 );
289             }
290         }
291         return $result;
292     }
293
294     // }}}
295     // {{{ public function fetchOffsetResponse()
296
297     /**
298      * decode fetch offset response
299      *
300      * @access public
301      * @return array
302      */
303     public function fetchOffsetResponse()
304     {
305         $result = array();
306         $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
307         $dataLen = array_shift($dataLen);
308         if (!$dataLen) {
309             throw new \Kafka\Exception\Protocol('fetch offset response invalid.');
310         }
311         $data = $this->stream->read($dataLen, true);
312         $offset = 4;
313         $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
314         $offset += 4;
315         $topicCount = array_shift($topicCount);
316         for ($i = 0; $i < $topicCount; $i++) {
317             $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length
318             $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0;
319             $offset += 2;
320             $topicName = substr($data, $offset, $topicLen);
321             $offset += $topicLen;
322             $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
323             $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
324             $offset += 4;
325             $result[$topicName] = array();
326             for ($j = 0; $j < $partitionCount; $j++) {
327                 $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
328                 $offset += 4;
329                 $partitionOffset = self::unpack(self::BIT_B64, substr($data, $offset, 8));
330                 $offset += 8;
331                 $metaLen = self::unpack(self::BIT_B16, substr($data, $offset, 2));
332                 $metaLen = array_shift($metaLen);
333                 $offset += 2;
334                 $metaData = '';
335                 if ($metaLen) {
336                     $metaData = substr($data, $offset, $metaLen);
337                     $offset += $metaLen;
338                 }
339                 $errCode = self::unpack(self::BIT_B16_SIGNED, substr($data, $offset, 2));
340                 $offset += 2;
341                 $result[$topicName][$partitionId[1]] = array(
342                     'offset'   => $partitionOffset,
343                     'metadata' => $metaData,
344                     'errCode'  => $errCode[1],
345                 );
346             }
347         }
348         return $result;
349     }
350
351     // }}}
352     // {{{ public static function getError()
353
354     /**
355      * get error
356      *
357      * @param integer $errCode
358      * @static
359      * @access public
360      * @return string
361      */
362     public static function getError($errCode)
363     {
364         switch($errCode) {
365             case 0:
366                 $error = 'No error--it worked!';
367                 break;
368             case -1:
369                 $error = 'An unexpected server error';
370                 break;
371             case 1:
372                 $error = 'The requested offset is outside the range of offsets maintained by the server for the given topic/partition.';
373                 break;
374             case 2:
375                 $error = 'This indicates that a message contents does not match its CRC';
376                 break;
377             case 3:
378                 $error = 'This request is for a topic or partition that does not exist on this broker.';
379                 break;
380             case 4:
381                 $error = 'The message has a negative size';
382                 break;
383             case 5:
384                 $error = 'This error is thrown if we are in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes';
385                 break;
386             case 6:
387                 $error = 'This error is thrown if the client attempts to send messages to a replica that is not the leader for some partition. It indicates that the clients metadata is out of date.';
388                 break;
389             case 7:
390                 $error = 'This error is thrown if the request exceeds the user-specified time limit in the request.';
391                 break;
392             case 8:
393                 $error = 'This is not a client facing error and is used only internally by intra-cluster broker communication.';
394                 break;
395             case 10:
396                 $error = 'The server has a configurable maximum message size to avoid unbounded memory allocation. This error is thrown if the client attempt to produce a message larger than this maximum.';
397                 break;
398             case 11:
399                 $error = 'Internal error code for broker-to-broker communication.';
400                 break;
401             case 12:
402                 $error = 'If you specify a string larger than configured maximum for offset metadata';
403                 break;
404             case 14:
405                 $error = 'The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change for that offsets topic partition).';
406                 break;
407             case 15:
408                 $error = 'The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has not yet been created.';
409                 break;
410             case 16:
411                 $error = 'The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is not a coordinator for.';
412                 break;
413             default:
414                 $error = 'Unknown error';
415         }
416
417         return $error;
418     }
419
420     // }}}
421     // }}}
422 }