]> scripts.mit.edu Git - autoinstalls/mediawiki.git/blob - vendor/nmred/kafka-php/src/Kafka/Protocol/Protocol.php
MediaWiki 1.30.2-scripts
[autoinstalls/mediawiki.git] / vendor / nmred / kafka-php / src / Kafka / Protocol / Protocol.php
1 <?php
2 /* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
3 // +---------------------------------------------------------------------------
4 // | SWAN [ $_SWANBR_SLOGAN_$ ]
5 // +---------------------------------------------------------------------------
6 // | Copyright $_SWANBR_COPYRIGHT_$
7 // +---------------------------------------------------------------------------
8 // | Version  $_SWANBR_VERSION_$
9 // +---------------------------------------------------------------------------
10 // | Licensed ( $_SWANBR_LICENSED_URL_$ )
11 // +---------------------------------------------------------------------------
12 // | $_SWANBR_WEB_DOMAIN_$
13 // +---------------------------------------------------------------------------
14
15 namespace Kafka\Protocol;
16
17 /**
18 +------------------------------------------------------------------------------
19 * Kafka protocol since Kafka v0.8
20 +------------------------------------------------------------------------------
21 *
22 * @package
23 * @version $_SWANBR_VERSION_$
24 * @copyright Copyleft
25 * @author $_SWANBR_AUTHOR_$
26 +------------------------------------------------------------------------------
27 */
28
29 abstract class Protocol
30 {
31     // {{{ consts
32
33     /**
34      *  Kafka server protocol version
35      */
36     const API_VERSION = 0;
37
38     /**
39      * use encode message, This is a version id used to allow backwards
40      * compatible evolution of the message binary format.
41      */
42     const MESSAGE_MAGIC = 0;
43
44     /**
45      * message no compression
46      */
47     const COMPRESSION_NONE = 0;
48
49     /**
50      * Message using gzip compression
51      */
52     const COMPRESSION_GZIP = 1;
53
54     /**
55      * Message using Snappy compression
56      */
57     const COMPRESSION_SNAPPY = 2;
58
59     /**
60      *  pack int32 type
61      */
62     const PACK_INT32 = 0;
63
64     /**
65      * pack int16 type
66      */
67     const PACK_INT16 = 1;
68
69     /**
70      * protocol request code
71      */
72     const PRODUCE_REQUEST = 0;
73     const FETCH_REQUEST   = 1;
74     const OFFSET_REQUEST  = 2;
75     const METADATA_REQUEST      = 3;
76     const OFFSET_COMMIT_REQUEST = 8;
77     const OFFSET_FETCH_REQUEST  = 9;
78     const CONSUMER_METADATA_REQUEST = 10;
79
80     // unpack/pack bit
81     const BIT_B64 = 'N2';
82     const BIT_B32 = 'N';
83     const BIT_B16 = 'n';
84     const BIT_B16_SIGNED = 's';
85     const BIT_B8  = 'C';
86
87     // }}}
88     // {{{ members
89
90     /**
91      * stream
92      *
93      * @var mixed
94      * @access protected
95      */
96     protected $stream = null;
97
98     /**
99      * isBigEndianSystem
100      *
101      * gets set to true if the computer this code is running is little endian,
102      * gets set to false if the computer this code is running on is big endian.
103      *
104      * @var null|bool
105      * @access private
106      */
107     private static $isLittleEndianSystem = null;
108
109     // }}}
110     // {{{ functions
111     // {{{ public function __construct()
112
113     /**
114      * __construct
115      *
116      * @param \Kafka\Socket $stream
117      * @access public
118      */
119     public function __construct(\Kafka\Socket $stream)
120     {
121         $this->stream = $stream;
122     }
123
124     // }}}
125     // {{{ public static function Khex2bin()
126
127     /**
128      * hex to bin
129      *
130      * @param string $string
131      * @static
132      * @access protected
133      * @return string (raw)
134      */
135     public static function Khex2bin($string)
136     {
137         if (function_exists('\hex2bin')) {
138             return \hex2bin($string);
139         } else {
140             $bin = '';
141             $len = strlen($string);
142             for ($i = 0; $i < $len; $i += 2) {
143                 $bin .= pack('H*', substr($string, $i, 2));
144             }
145
146             return $bin;
147         }
148     }
149
150     // }}}
151     // {{{ public static function unpack()
152
153     /**
154      * Unpack a bit integer as big endian long
155      *
156      * @static
157      * @access public
158      * @param $type
159      * @param $bytes
160      * @return int
161      */
162     public static function unpack($type, $bytes)
163     {
164         self::checkLen($type, $bytes);
165         if ($type == self::BIT_B64) {
166             $set = unpack($type, $bytes);
167             $original = ($set[1] & 0xFFFFFFFF) << 32 | ($set[2] & 0xFFFFFFFF);
168             return $original;
169         } elseif ($type == self::BIT_B16_SIGNED) {
170             // According to PHP docs: 's' = signed short (always 16 bit, machine byte order)
171             // So lets unpack it..
172             $set = unpack($type, $bytes);
173
174             // But if our system is little endian
175             if (self::isSystemLittleEndian()) {
176                 // We need to flip the endianess because coming from kafka it is big endian
177                 $set = self::convertSignedShortFromLittleEndianToBigEndian($set);
178             }
179             return $set;
180         } else {
181             return unpack($type, $bytes);
182         }
183     }
184
185     // }}}
186     // {{{ public static function pack()
187
188     /**
189      * pack a bit integer as big endian long
190      *
191      * @static
192      * @access public
193      * @param $type
194      * @param $data
195      * @return int
196      */
197     public static function pack($type, $data)
198     {
199         if ($type == self::BIT_B64) {
200             if ($data == -1) { // -1L
201                 $data = self::Khex2bin('ffffffffffffffff');
202             } elseif ($data == -2) { // -2L
203                 $data = self::Khex2bin('fffffffffffffffe');
204             } else {
205                 $left  = 0xffffffff00000000;
206                 $right = 0x00000000ffffffff;
207
208                 $l = ($data & $left) >> 32;
209                 $r = $data & $right;
210                 $data = pack($type, $l, $r);
211             }
212         } else {
213             $data = pack($type, $data);
214         }
215
216         return $data;
217     }
218
219     // }}}
220     // {{{ protected static function checkLen()
221
222     /**
223      * check unpack bit is valid
224      *
225      * @param string $type
226      * @param string(raw) $bytes
227      * @static
228      * @access protected
229      * @return void
230      */
231     protected static function checkLen($type, $bytes)
232     {
233         $len = 0;
234         switch($type) {
235             case self::BIT_B64:
236                 $len = 8;
237                 break;
238             case self::BIT_B32:
239                 $len = 4;
240                 break;
241             case self::BIT_B16:
242                 $len = 2;
243                 break;
244             case self::BIT_B16_SIGNED:
245                 $len = 2;
246                 break;
247             case self::BIT_B8:
248                 $len = 1;
249                 break;
250         }
251
252         if (strlen($bytes) != $len) {
253             throw new \Kafka\Exception\Protocol('unpack failed. string(raw) length is ' . strlen($bytes) . ' , TO ' . $type);
254         }
255     }
256
257     // }}}
258     // {{{ public static function isSystemLittleEndian()
259
260     /**
261      * Determines if the computer currently running this code is big endian or little endian.
262      *
263      * @access public
264      * @return bool - false if big endian, true if little endian
265      */
266     public static function isSystemLittleEndian()
267     {
268         // If we don't know if our system is big endian or not yet...
269         if (is_null(self::$isLittleEndianSystem)) {
270             // Lets find out
271             list ($endiantest) = array_values(unpack('L1L', pack('V', 1)));
272             if ($endiantest != 1) {
273                 // This is a big endian system
274                 self::$isLittleEndianSystem = false;
275             } else {
276                 // This is a little endian system
277                 self::$isLittleEndianSystem = true;
278             }
279         }
280
281         return self::$isLittleEndianSystem;
282     }
283
284     // }}}
285     // {{{ public static function convertSignedShortFromLittleEndianToBigEndian()
286
287     /**
288      * Converts a signed short (16 bits) from little endian to big endian.
289      *
290      * @param int[] $bits
291      * @access public
292      * @return array
293      */
294     public static function convertSignedShortFromLittleEndianToBigEndian($bits)
295     {
296         foreach ($bits as $index => $bit) {
297
298             // get LSB
299             $lsb = $bit & 0xff;
300
301             // get MSB
302             $msb = $bit >> 8 & 0xff;
303
304             // swap bytes
305             $bit = $lsb <<8 | $msb;
306
307             if ($bit >= 32768) {
308                 $bit -= 65536;
309             }
310             $bits[$index] = $bit;
311         }
312         return $bits;
313     }
314
315     // }}}
316     // }}}
317 }