<?php
/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
// +---------------------------------------------------------------------------
// | SWAN [ $_SWANBR_SLOGAN_$ ]
// +---------------------------------------------------------------------------
// | Copyright $_SWANBR_COPYRIGHT_$
// +---------------------------------------------------------------------------
// | Version $_SWANBR_VERSION_$
// +---------------------------------------------------------------------------
// | Licensed ( $_SWANBR_LICENSED_URL_$ )
// +---------------------------------------------------------------------------
// | $_SWANBR_WEB_DOMAIN_$
// +---------------------------------------------------------------------------
namespace Kafka\Protocol;
/**
+------------------------------------------------------------------------------
* Kafka protocol encoder since Kafka v0.8
+------------------------------------------------------------------------------
*
* @version $_SWANBR_VERSION_$
* @author  $_SWANBR_AUTHOR_$
+------------------------------------------------------------------------------
*/
class Encoder extends Protocol
{
32 // {{{ public function produceRequest()
37 * @param array $payloads
38 * @param int $compression
42 public function produceRequest($payloads, $compression = self::COMPRESSION_NONE)
44 if (!isset($payloads['data'])) {
45 throw new \Kafka\Exception\Protocol('given procude data invalid. `data` is undefined.');
48 if (!isset($payloads['required_ack'])) {
49 // default server will not send any response
50 // (this is the only case where the server will not reply to a request)
51 $payloads['required_ack'] = 0;
54 if (!isset($payloads['timeout'])) {
55 $payloads['timeout'] = 100; // default timeout 100ms
58 $header = self::requestHeader('kafka-php', 0, self::PRODUCE_REQUEST);
59 $data = self::pack(self::BIT_B16, $payloads['required_ack']);
60 $data .= self::pack(self::BIT_B32, $payloads['timeout']);
61 $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeProcudeTopic'), $compression);
62 $data = self::encodeString($header . $data, self::PACK_INT32);
64 return $this->stream->write($data);
68 // {{{ public function metadataRequest()
71 * build metadata request protocol
73 * @param array $topics
77 public function metadataRequest($topics)
79 if (!is_array($topics)) {
80 $topics = array($topics);
83 foreach ($topics as $topic) {
84 if (!is_string($topic)) {
85 throw new \Kafka\Exception\Protocol('request metadata topic array have invalid value. ');
89 $header = self::requestHeader('kafka-php', 0, self::METADATA_REQUEST);
90 $data = self::encodeArray($topics, array(__CLASS__, 'encodeString'), self::PACK_INT16);
91 $data = self::encodeString($header . $data, self::PACK_INT32);
93 return $this->stream->write($data);
97 // {{{ public function fetchRequest()
100 * build fetch request
102 * @param array $payloads
106 public function fetchRequest($payloads)
108 if (!isset($payloads['data'])) {
109 throw new \Kafka\Exception\Protocol('given fetch kafka data invalid. `data` is undefined.');
112 if (!isset($payloads['replica_id'])) {
113 $payloads['replica_id'] = -1;
116 if (!isset($payloads['max_wait_time'])) {
117 $payloads['max_wait_time'] = 100; // default timeout 100ms
120 if (!isset($payloads['min_bytes'])) {
121 $payloads['min_bytes'] = 64 * 1024; // 64k
124 $header = self::requestHeader('kafka-php', 0, self::FETCH_REQUEST);
125 $data = self::pack(self::BIT_B32, $payloads['replica_id']);
126 $data .= self::pack(self::BIT_B32, $payloads['max_wait_time']);
127 $data .= self::pack(self::BIT_B32, $payloads['min_bytes']);
128 $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeFetchTopic'));
129 $data = self::encodeString($header . $data, self::PACK_INT32);
131 return $this->stream->write($data);
135 // {{{ public function offsetRequest()
138 * build offset request
140 * @param array $payloads
144 public function offsetRequest($payloads)
146 if (!isset($payloads['data'])) {
147 throw new \Kafka\Exception\Protocol('given offset data invalid. `data` is undefined.');
150 if (!isset($payloads['replica_id'])) {
151 $payloads['replica_id'] = -1;
154 $header = self::requestHeader('kafka-php', 0, self::OFFSET_REQUEST);
155 $data = self::pack(self::BIT_B32, $payloads['replica_id']);
156 $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeOffsetTopic'));
157 $data = self::encodeString($header . $data, self::PACK_INT32);
159 return $this->stream->write($data);
163 // {{{ public function commitOffsetRequest()
166 * build consumer commit offset request
168 * @param array $payloads
172 public function commitOffsetRequest($payloads)
174 if (!isset($payloads['data'])) {
175 throw new \Kafka\Exception\Protocol('given commit offset data invalid. `data` is undefined.');
178 if (!isset($payloads['group_id'])) {
179 throw new \Kafka\Exception\Protocol('given commit offset data invalid. `group_id` is undefined.');
182 $header = self::requestHeader('kafka-php', 0, self::OFFSET_COMMIT_REQUEST);
183 $data = self::encodeString($payloads['group_id'], self::PACK_INT16);
184 $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeCommitOffset'));
185 $data = self::encodeString($header . $data, self::PACK_INT32);
187 return $this->stream->write($data);
191 // {{{ public function fetchOffsetRequest()
194 * build consumer fetch offset request
196 * @param array $payloads
200 public function fetchOffsetRequest($payloads)
202 if (!isset($payloads['data'])) {
203 throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `data` is undefined.');
206 if (!isset($payloads['group_id'])) {
207 throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `group_id` is undefined.');
210 $header = self::requestHeader('kafka-php', 0, self::OFFSET_FETCH_REQUEST);
211 $data = self::encodeString($payloads['group_id'], self::PACK_INT16);
212 $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeFetchOffset'));
213 $data = self::encodeString($header . $data, self::PACK_INT32);
215 return $this->stream->write($data);
219 // {{{ public static function encodeString()
222 * encode pack string type
224 * @param string $string
225 * @param int $bytes self::PACK_INT32: int32 big endian order. self::PACK_INT16: int16 big endian order.
226 * @param int $compression
231 public static function encodeString($string, $bytes, $compression = self::COMPRESSION_NONE)
233 $packLen = ($bytes == self::PACK_INT32) ? self::BIT_B32 : self::BIT_B16;
234 switch ($compression) {
235 case self::COMPRESSION_NONE:
237 case self::COMPRESSION_GZIP:
238 $string = gzencode($string);
240 case self::COMPRESSION_SNAPPY:
241 throw new \Kafka\Exception\NotSupported('SNAPPY compression not yet implemented');
243 throw new \Kafka\Exception\NotSupported('Unknown compression flag: ' . $compression);
245 return self::pack($packLen, strlen($string)) . $string;
249 // {{{ public static function encodeArray()
254 * @param array $array
255 * @param Callable $func
256 * @param null $options
261 public static function encodeArray(array $array, $func, $options = null)
263 if (!is_callable($func, false)) {
264 throw new \Kafka\Exception\Protocol('Encode array failed, given function is not callable.');
267 $arrayCount = count($array);
270 foreach ($array as $value) {
271 if (!is_null($options)) {
272 $body .= call_user_func($func, $value, $options);
274 $body .= call_user_func($func, $value);
278 return self::pack(self::BIT_B32, $arrayCount) . $body;
282 // {{{ public static function encodeMessageSet()
286 * N.B., MessageSets are not preceded by an int32 like other array elements
289 * @param array $messages
290 * @param int $compression
295 public static function encodeMessageSet($messages, $compression = self::COMPRESSION_NONE)
297 if (!is_array($messages)) {
298 $messages = array($messages);
302 foreach ($messages as $message) {
303 $tmpMessage = self::_encodeMessage($message, $compression);
305 // int64 -- message offset Message
306 $data .= self::pack(self::BIT_B64, 0) . self::encodeString($tmpMessage, self::PACK_INT32);
312 // {{{ public static function requestHeader()
317 * @param string $clientId
318 * @param integer $correlationId
319 * @param integer $apiKey
324 public static function requestHeader($clientId, $correlationId, $apiKey)
326 // int16 -- apiKey int16 -- apiVersion int32 correlationId
327 $binData = self::pack(self::BIT_B16, $apiKey);
328 $binData .= self::pack(self::BIT_B16, self::API_VERSION);
329 $binData .= self::pack(self::BIT_B32, $correlationId);
332 $binData .= self::encodeString($clientId, self::PACK_INT16);
338 // {{{ protected static function _encodeMessage()
341 * encode signal message
343 * @param string $message
344 * @param int $compression
349 protected static function _encodeMessage($message, $compression = self::COMPRESSION_NONE)
351 // int8 -- magic int8 -- attribute
352 $data = self::pack(self::BIT_B8, self::MESSAGE_MAGIC);
353 $data .= self::pack(self::BIT_B8, $compression);
356 $data .= self::encodeString('', self::PACK_INT32);
359 $data .= self::encodeString($message, self::PACK_INT32, $compression);
363 // int32 -- crc code string data
364 $message = self::pack(self::BIT_B32, $crc) . $data;
370 // {{{ protected static function _encodeProcudePartion()
376 * @param $compression
378 * @internal param $partions
382 protected static function _encodeProcudePartion($values, $compression)
384 if (!isset($values['partition_id'])) {
385 throw new \Kafka\Exception\Protocol('given produce data invalid. `partition_id` is undefined.');
388 if (!isset($values['messages']) || empty($values['messages'])) {
389 throw new \Kafka\Exception\Protocol('given produce data invalid. `messages` is undefined.');
392 $data = self::pack(self::BIT_B32, $values['partition_id']);
393 $data .= self::encodeString(self::encodeMessageSet($values['messages'], $compression), self::PACK_INT32);
399 // {{{ protected static function _encodeProcudeTopic()
402 * encode signal topic
405 * @param $compression
407 * @internal param $partions
411 protected static function _encodeProcudeTopic($values, $compression)
413 if (!isset($values['topic_name'])) {
414 throw new \Kafka\Exception\Protocol('given produce data invalid. `topic_name` is undefined.');
417 if (!isset($values['partitions']) || empty($values['partitions'])) {
418 throw new \Kafka\Exception\Protocol('given produce data invalid. `partitions` is undefined.');
421 $topic = self::encodeString($values['topic_name'], self::PACK_INT16);
422 $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeProcudePartion'), $compression);
424 return $topic . $partitions;
428 // {{{ protected static function _encodeFetchPartion()
438 protected static function _encodeFetchPartion($values)
440 if (!isset($values['partition_id'])) {
441 throw new \Kafka\Exception\Protocol('given fetch data invalid. `partition_id` is undefined.');
444 if (!isset($values['offset'])) {
445 $values['offset'] = 0;
448 if (!isset($values['max_bytes'])) {
449 $values['max_bytes'] = 100 * 1024 * 1024;
452 $data = self::pack(self::BIT_B32, $values['partition_id']);
453 $data .= self::pack(self::BIT_B64, $values['offset']);
454 $data .= self::pack(self::BIT_B32, $values['max_bytes']);
460 // {{{ protected static function _encodeFetchTopic()
463 * encode signal topic
470 protected static function _encodeFetchTopic($values)
472 if (!isset($values['topic_name'])) {
473 throw new \Kafka\Exception\Protocol('given fetch data invalid. `topic_name` is undefined.');
476 if (!isset($values['partitions']) || empty($values['partitions'])) {
477 throw new \Kafka\Exception\Protocol('given fetch data invalid. `partitions` is undefined.');
480 $topic = self::encodeString($values['topic_name'], self::PACK_INT16);
481 $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeFetchPartion'));
483 return $topic . $partitions;
487 // {{{ protected static function _encodeOffsetPartion()
497 protected static function _encodeOffsetPartion($values)
499 if (!isset($values['partition_id'])) {
500 throw new \Kafka\Exception\Protocol('given offset data invalid. `partition_id` is undefined.');
503 if (!isset($values['time'])) {
504 $values['time'] = -1; // -1
507 if (!isset($values['max_offset'])) {
508 $values['max_offset'] = 100000;
511 $data = self::pack(self::BIT_B32, $values['partition_id']);
512 $data .= self::pack(self::BIT_B64, $values['time']);
513 $data .= self::pack(self::BIT_B32, $values['max_offset']);
519 // {{{ protected static function _encodeOffsetTopic()
522 * encode signal topic
529 protected static function _encodeOffsetTopic($values)
531 if (!isset($values['topic_name'])) {
532 throw new \Kafka\Exception\Protocol('given offset data invalid. `topic_name` is undefined.');
535 if (!isset($values['partitions']) || empty($values['partitions'])) {
536 throw new \Kafka\Exception\Protocol('given offset data invalid. `partitions` is undefined.');
539 $topic = self::encodeString($values['topic_name'], self::PACK_INT16);
540 $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeOffsetPartion'));
542 return $topic . $partitions;
546 // {{{ protected static function _encodeCommitOffsetPartion()
556 protected static function _encodeCommitOffsetPartion($values)
558 if (!isset($values['partition_id'])) {
559 throw new \Kafka\Exception\Protocol('given commit offset data invalid. `partition_id` is undefined.');
562 if (!isset($values['offset'])) {
563 throw new \Kafka\Exception\Protocol('given commit offset data invalid. `offset` is undefined.');
566 if (!isset($values['time'])) {
567 $values['time'] = -1;
570 if (!isset($values['metadata'])) {
571 $values['metadata'] = 'm';
574 $data = self::pack(self::BIT_B32, $values['partition_id']);
575 $data .= self::pack(self::BIT_B64, $values['offset']);
576 $data .= self::pack(self::BIT_B64, $values['time']);
577 $data .= self::encodeString($values['metadata'], self::PACK_INT16);
583 // {{{ protected static function _encodeCommitOffset()
586 * encode signal topic
593 protected static function _encodeCommitOffset($values)
595 if (!isset($values['topic_name'])) {
596 throw new \Kafka\Exception\Protocol('given commit offset data invalid. `topic_name` is undefined.');
599 if (!isset($values['partitions']) || empty($values['partitions'])) {
600 throw new \Kafka\Exception\Protocol('given commit offset data invalid. `partitions` is undefined.');
603 $topic = self::encodeString($values['topic_name'], self::PACK_INT16);
604 $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeCommitOffsetPartion'));
606 return $topic . $partitions;
610 // {{{ protected static function _encodeFetchOffsetPartion()
620 protected static function _encodeFetchOffsetPartion($values)
622 if (!isset($values['partition_id'])) {
623 throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `partition_id` is undefined.');
626 $data = self::pack(self::BIT_B32, $values['partition_id']);
632 // {{{ protected static function _encodeFetchOffset()
635 * encode signal topic
642 protected static function _encodeFetchOffset($values)
644 if (!isset($values['topic_name'])) {
645 throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `topic_name` is undefined.');
648 if (!isset($values['partitions']) || empty($values['partitions'])) {
649 throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `partitions` is undefined.');
652 $topic = self::encodeString($values['topic_name'], self::PACK_INT16);
653 $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeFetchOffsetPartion'));
655 return $topic . $partitions;