2 /* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
3 // +---------------------------------------------------------------------------
4 // | SWAN [ $_SWANBR_SLOGAN_$ ]
5 // +---------------------------------------------------------------------------
6 // | Copyright $_SWANBR_COPYRIGHT_$
7 // +---------------------------------------------------------------------------
8 // | Version $_SWANBR_VERSION_$
9 // +---------------------------------------------------------------------------
10 // | Licensed ( $_SWANBR_LICENSED_URL_$ )
11 // +---------------------------------------------------------------------------
12 // | $_SWANBR_WEB_DOMAIN_$
13 // +---------------------------------------------------------------------------
15 namespace Kafka\Protocol;
18 +------------------------------------------------------------------------------
19 * Kafka protocol since Kafka v0.8
20 +------------------------------------------------------------------------------
23 * @version $_SWANBR_VERSION_$
25 * @author $_SWANBR_AUTHOR_$
26 +------------------------------------------------------------------------------
29 abstract class Protocol
34 * Kafka server protocol version
36 const API_VERSION = 0;
39 * use encode message, This is a version id used to allow backwards
40 * compatible evolution of the message binary format.
42 const MESSAGE_MAGIC = 0;
45 * message no compression
47 const COMPRESSION_NONE = 0;
50 * Message using gzip compression
52 const COMPRESSION_GZIP = 1;
55 * Message using Snappy compression
57 const COMPRESSION_SNAPPY = 2;
70 * protocol request code
72 const PRODUCE_REQUEST = 0;
73 const FETCH_REQUEST = 1;
74 const OFFSET_REQUEST = 2;
75 const METADATA_REQUEST = 3;
76 const OFFSET_COMMIT_REQUEST = 8;
77 const OFFSET_FETCH_REQUEST = 9;
78 const CONSUMER_METADATA_REQUEST = 10;
84 const BIT_B16_SIGNED = 's';
96 protected $stream = null;
101 * gets set to true if the computer this code is running is little endian,
102 * gets set to false if the computer this code is running on is big endian.
107 private static $isLittleEndianSystem = null;
111 // {{{ public function __construct()
116 * @param \Kafka\Socket $stream
119 public function __construct(\Kafka\Socket $stream)
121 $this->stream = $stream;
125 // {{{ public static function Khex2bin()
130 * @param string $string
133 * @return string (raw)
135 public static function Khex2bin($string)
137 if (function_exists('\hex2bin')) {
138 return \hex2bin($string);
141 $len = strlen($string);
142 for ($i = 0; $i < $len; $i += 2) {
143 $bin .= pack('H*', substr($string, $i, 2));
151 // {{{ public static function unpack()
154 * Unpack a bit integer as big endian long
162 public static function unpack($type, $bytes)
164 self::checkLen($type, $bytes);
165 if ($type == self::BIT_B64) {
166 $set = unpack($type, $bytes);
167 $original = ($set[1] & 0xFFFFFFFF) << 32 | ($set[2] & 0xFFFFFFFF);
169 } elseif ($type == self::BIT_B16_SIGNED) {
170 // According to PHP docs: 's' = signed short (always 16 bit, machine byte order)
171 // So lets unpack it..
172 $set = unpack($type, $bytes);
174 // But if our system is little endian
175 if (self::isSystemLittleEndian()) {
176 // We need to flip the endianess because coming from kafka it is big endian
177 $set = self::convertSignedShortFromLittleEndianToBigEndian($set);
181 return unpack($type, $bytes);
186 // {{{ public static function pack()
189 * pack a bit integer as big endian long
197 public static function pack($type, $data)
199 if ($type == self::BIT_B64) {
200 if ($data == -1) { // -1L
201 $data = self::Khex2bin('ffffffffffffffff');
202 } elseif ($data == -2) { // -2L
203 $data = self::Khex2bin('fffffffffffffffe');
205 $left = 0xffffffff00000000;
206 $right = 0x00000000ffffffff;
208 $l = ($data & $left) >> 32;
210 $data = pack($type, $l, $r);
213 $data = pack($type, $data);
220 // {{{ protected static function checkLen()
223 * check unpack bit is valid
225 * @param string $type
226 * @param string(raw) $bytes
231 protected static function checkLen($type, $bytes)
244 case self::BIT_B16_SIGNED:
252 if (strlen($bytes) != $len) {
253 throw new \Kafka\Exception\Protocol('unpack failed. string(raw) length is ' . strlen($bytes) . ' , TO ' . $type);
258 // {{{ public static function isSystemLittleEndian()
261 * Determines if the computer currently running this code is big endian or little endian.
264 * @return bool - false if big endian, true if little endian
266 public static function isSystemLittleEndian()
268 // If we don't know if our system is big endian or not yet...
269 if (is_null(self::$isLittleEndianSystem)) {
271 list ($endiantest) = array_values(unpack('L1L', pack('V', 1)));
272 if ($endiantest != 1) {
273 // This is a big endian system
274 self::$isLittleEndianSystem = false;
276 // This is a little endian system
277 self::$isLittleEndianSystem = true;
281 return self::$isLittleEndianSystem;
285 // {{{ public static function convertSignedShortFromLittleEndianToBigEndian()
288 * Converts a signed short (16 bits) from little endian to big endian.
294 public static function convertSignedShortFromLittleEndianToBigEndian($bits)
296 foreach ($bits as $index => $bit) {
302 $msb = $bit >> 8 & 0xff;
305 $bit = $lsb <<8 | $msb;
310 $bits[$index] = $bit;