2 /* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
3 // +---------------------------------------------------------------------------
4 // | SWAN [ $_SWANBR_SLOGAN_$ ]
5 // +---------------------------------------------------------------------------
6 // | Copyright $_SWANBR_COPYRIGHT_$
7 // +---------------------------------------------------------------------------
8 // | Version $_SWANBR_VERSION_$
9 // +---------------------------------------------------------------------------
10 // | Licensed ( $_SWANBR_LICENSED_URL_$ )
11 // +---------------------------------------------------------------------------
12 // | $_SWANBR_WEB_DOMAIN_$
13 // +---------------------------------------------------------------------------
15 namespace Kafka\Protocol\Fetch;
17 use \Kafka\Protocol\Decoder;
20 +------------------------------------------------------------------------------
21 * Kafka protocol since Kafka v0.8
22 +------------------------------------------------------------------------------
25 * @version $_SWANBR_VERSION_$
27 * @author $_SWANBR_AUTHOR_$
28 +------------------------------------------------------------------------------
44 * This is a version id used to allow backwards compatible evolution of the
45 * message binary format.
53 * The lowest 2 bits contain the compression codec used for the message. The
54 * other bits should be set to 0.
59 private $attribute = 0;
79 // {{{ public function __construct()
84 * @param string(raw) $msg
// Decodes one raw message string into its fields, in this order:
// crc (4 bytes), magic (1 byte), attribute (1 byte), key length (4 bytes),
// optional key, value length (4 bytes), value.
// NOTE(review): the statements that initialise and advance $offset between
// reads are not visible in this excerpt; confirm that each read below is
// preceded by the matching offset increment.
87 public function __construct($msg)
// Decoder::unpack() returns an array here; array_shift() takes its first element.
90 $crc = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4));
92 $this->crc = array_shift($crc);
// Single-byte magic field (documented above as the message format version id).
93 $magic = Decoder::unpack(Decoder::BIT_B8, substr($msg, $offset, 1));
94 $this->magic = array_shift($magic);
// Single-byte attribute field (documented above: the lowest 2 bits hold the compression codec).
96 $attr = Decoder::unpack(Decoder::BIT_B8, substr($msg, $offset, 1));
97 $this->attribute = array_shift($attr);
// 4-byte key length, read before the key bytes themselves.
99 $keyLen = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4));
100 $keyLen = array_shift($keyLen);
// The key is only read when the length is positive and not 0xFFFFFFFF.
// NOTE(review): 0xFFFFFFFF is presumably -1 unpacked as unsigned, i.e. a
// null key in the Kafka wire format; confirm against Decoder::unpack().
102 if ($keyLen > 0 && $keyLen != 0xFFFFFFFF) {
103 $this->key = substr($msg, $offset, $keyLen);
// 4-byte length of the value payload, followed by the payload itself.
106 $messageSize = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4));
107 $messageSize = array_shift($messageSize);
// NOTE(review): unlike the key, no 0xFFFFFFFF guard is visible for the value
// length in this excerpt; verify how a null value is handled.
110 $this->value = substr($msg, $offset, $messageSize);
115 // {{{ public function getMessage()
121 * @return string (raw)
123 public function getMessage()
129 // {{{ public function getMessageKey()
135 * @return string (raw)
137 public function getMessageKey()
143 // {{{ public function __toString()
149 * @return string (raw)
151 public function __toString()