]> scripts.mit.edu Git - autoinstalls/mediawiki.git/blob - vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php
MediaWiki 1.30.2-scripts2
[autoinstalls/mediawiki.git] / vendor / nmred / kafka-php / src / Kafka / Protocol / Encoder.php
1 <?php
2 /* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
3 // +---------------------------------------------------------------------------
4 // | SWAN [ $_SWANBR_SLOGAN_$ ]
5 // +---------------------------------------------------------------------------
6 // | Copyright $_SWANBR_COPYRIGHT_$
7 // +---------------------------------------------------------------------------
8 // | Version  $_SWANBR_VERSION_$
9 // +---------------------------------------------------------------------------
10 // | Licensed ( $_SWANBR_LICENSED_URL_$ )
11 // +---------------------------------------------------------------------------
12 // | $_SWANBR_WEB_DOMAIN_$
13 // +---------------------------------------------------------------------------
14
15 namespace Kafka\Protocol;
16
17 /**
18 +------------------------------------------------------------------------------
19 * Kafka protocol since Kafka v0.8
20 +------------------------------------------------------------------------------
21 *
22 * @package
23 * @version $_SWANBR_VERSION_$
24 * @copyright Copyleft
25 * @author $_SWANBR_AUTHOR_$
26 +------------------------------------------------------------------------------
27 */
28
29 class Encoder extends Protocol
30 {
31     // {{{ functions
32     // {{{ public function produceRequest()
33
34     /**
35      * produce request
36      *
37      * @param array $payloads
38      * @param int $compression
39      * @return int
40      * @access public
41      */
42     public function produceRequest($payloads, $compression = self::COMPRESSION_NONE)
43     {
44         if (!isset($payloads['data'])) {
45             throw new \Kafka\Exception\Protocol('given procude data invalid. `data` is undefined.');
46         }
47
48         if (!isset($payloads['required_ack'])) {
49             // default server will not send any response
50             // (this is the only case where the server will not reply to a request)
51             $payloads['required_ack'] = 0;
52         }
53
54         if (!isset($payloads['timeout'])) {
55             $payloads['timeout'] = 100; // default timeout 100ms
56         }
57
58         $header = self::requestHeader('kafka-php', 0, self::PRODUCE_REQUEST);
59         $data   = self::pack(self::BIT_B16, $payloads['required_ack']);
60         $data  .= self::pack(self::BIT_B32, $payloads['timeout']);
61         $data  .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeProcudeTopic'), $compression);
62         $data   = self::encodeString($header . $data, self::PACK_INT32);
63
64         return $this->stream->write($data);
65     }
66
67     // }}}
68     // {{{ public function metadataRequest()
69
70     /**
71      * build metadata request protocol
72      *
73      * @param array $topics
74      * @access public
75      * @return string
76      */
77     public function metadataRequest($topics)
78     {
79         if (!is_array($topics)) {
80             $topics = array($topics);
81         }
82
83         foreach ($topics as $topic) {
84             if (!is_string($topic)) {
85                 throw new \Kafka\Exception\Protocol('request metadata topic array have invalid value. ');
86             }
87         }
88
89         $header = self::requestHeader('kafka-php', 0, self::METADATA_REQUEST);
90         $data   = self::encodeArray($topics, array(__CLASS__, 'encodeString'), self::PACK_INT16);
91         $data   = self::encodeString($header . $data, self::PACK_INT32);
92
93         return $this->stream->write($data);
94     }
95
96     // }}}
97     // {{{ public function fetchRequest()
98
99     /**
100      * build fetch request
101      *
102      * @param array $payloads
103      * @access public
104      * @return string
105      */
106     public function fetchRequest($payloads)
107     {
108         if (!isset($payloads['data'])) {
109             throw new \Kafka\Exception\Protocol('given fetch kafka data invalid. `data` is undefined.');
110         }
111
112         if (!isset($payloads['replica_id'])) {
113             $payloads['replica_id'] = -1;
114         }
115
116         if (!isset($payloads['max_wait_time'])) {
117             $payloads['max_wait_time'] = 100; // default timeout 100ms
118         }
119
120         if (!isset($payloads['min_bytes'])) {
121             $payloads['min_bytes'] = 64 * 1024; // 64k
122         }
123
124         $header = self::requestHeader('kafka-php', 0, self::FETCH_REQUEST);
125         $data   = self::pack(self::BIT_B32, $payloads['replica_id']);
126         $data  .= self::pack(self::BIT_B32, $payloads['max_wait_time']);
127         $data  .= self::pack(self::BIT_B32, $payloads['min_bytes']);
128         $data  .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeFetchTopic'));
129         $data   = self::encodeString($header . $data, self::PACK_INT32);
130
131         return $this->stream->write($data);
132     }
133
134     // }}}
135     // {{{ public function offsetRequest()
136
137     /**
138      * build offset request
139      *
140      * @param array $payloads
141      * @access public
142      * @return string
143      */
144     public function offsetRequest($payloads)
145     {
146         if (!isset($payloads['data'])) {
147             throw new \Kafka\Exception\Protocol('given offset data invalid. `data` is undefined.');
148         }
149
150         if (!isset($payloads['replica_id'])) {
151             $payloads['replica_id'] = -1;
152         }
153
154         $header = self::requestHeader('kafka-php', 0, self::OFFSET_REQUEST);
155         $data   = self::pack(self::BIT_B32, $payloads['replica_id']);
156         $data  .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeOffsetTopic'));
157         $data   = self::encodeString($header . $data, self::PACK_INT32);
158
159         return $this->stream->write($data);
160     }
161
162     // }}}
163     // {{{ public function commitOffsetRequest()
164
165     /**
166      * build consumer commit offset request
167      *
168      * @param array $payloads
169      * @access public
170      * @return string
171      */
172     public function commitOffsetRequest($payloads)
173     {
174         if (!isset($payloads['data'])) {
175             throw new \Kafka\Exception\Protocol('given commit offset data invalid. `data` is undefined.');
176         }
177
178         if (!isset($payloads['group_id'])) {
179             throw new \Kafka\Exception\Protocol('given commit offset data invalid. `group_id` is undefined.');
180         }
181
182         $header = self::requestHeader('kafka-php', 0, self::OFFSET_COMMIT_REQUEST);
183         $data   = self::encodeString($payloads['group_id'], self::PACK_INT16);
184         $data  .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeCommitOffset'));
185         $data   = self::encodeString($header . $data, self::PACK_INT32);
186
187         return $this->stream->write($data);
188     }
189
190     // }}}
191     // {{{ public function fetchOffsetRequest()
192
193     /**
194      * build consumer fetch offset request
195      *
196      * @param array $payloads
197      * @access public
198      * @return string
199      */
200     public function fetchOffsetRequest($payloads)
201     {
202         if (!isset($payloads['data'])) {
203             throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `data` is undefined.');
204         }
205
206         if (!isset($payloads['group_id'])) {
207             throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `group_id` is undefined.');
208         }
209
210         $header = self::requestHeader('kafka-php', 0, self::OFFSET_FETCH_REQUEST);
211         $data   = self::encodeString($payloads['group_id'], self::PACK_INT16);
212         $data  .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeFetchOffset'));
213         $data   = self::encodeString($header . $data, self::PACK_INT32);
214
215         return $this->stream->write($data);
216     }
217
218     // }}}
219     // {{{ public static function encodeString()
220
221     /**
222      * encode pack string type
223      *
224      * @param string $string
225      * @param int $bytes self::PACK_INT32: int32 big endian order. self::PACK_INT16: int16 big endian order.
226      * @param int $compression
227      * @return string
228      * @static
229      * @access public
230      */
231     public static function encodeString($string, $bytes, $compression = self::COMPRESSION_NONE)
232     {
233         $packLen = ($bytes == self::PACK_INT32) ? self::BIT_B32 : self::BIT_B16;
234         switch ($compression) {
235             case self::COMPRESSION_NONE:
236                 break;
237             case self::COMPRESSION_GZIP:
238                 $string = gzencode($string);
239                 break;
240             case self::COMPRESSION_SNAPPY:
241                 throw new \Kafka\Exception\NotSupported('SNAPPY compression not yet implemented');
242             default:
243                 throw new \Kafka\Exception\NotSupported('Unknown compression flag: ' . $compression);
244         }
245         return self::pack($packLen, strlen($string)) . $string;
246     }
247
248     // }}}
249     // {{{ public static function encodeArray()
250
251     /**
252      * encode key array
253      *
254      * @param array $array
255      * @param Callable $func
256      * @param null $options
257      * @return string
258      * @static
259      * @access public
260      */
261     public static function encodeArray(array $array, $func, $options = null)
262     {
263         if (!is_callable($func, false)) {
264             throw new \Kafka\Exception\Protocol('Encode array failed, given function is not callable.');
265         }
266
267         $arrayCount = count($array);
268
269         $body = '';
270         foreach ($array as $value) {
271             if (!is_null($options)) {
272                 $body .= call_user_func($func, $value, $options);
273             } else {
274                 $body .= call_user_func($func, $value);
275             }
276         }
277
278         return self::pack(self::BIT_B32, $arrayCount) . $body;
279     }
280
281     // }}}
282     // {{{ public static function encodeMessageSet()
283
284     /**
285      * encode message set
286      * N.B., MessageSets are not preceded by an int32 like other array elements
287      * in the protocol.
288      *
289      * @param array $messages
290      * @param int $compression
291      * @return string
292      * @static
293      * @access public
294      */
295     public static function encodeMessageSet($messages, $compression = self::COMPRESSION_NONE)
296     {
297         if (!is_array($messages)) {
298             $messages = array($messages);
299         }
300
301         $data = '';
302         foreach ($messages as $message) {
303             $tmpMessage = self::_encodeMessage($message, $compression);
304
305             // int64 -- message offset     Message
306             $data .= self::pack(self::BIT_B64, 0) . self::encodeString($tmpMessage, self::PACK_INT32);
307         }
308         return $data;
309     }
310
311     // }}}
312     // {{{ public static function requestHeader()
313
314     /**
315      * get request header
316      *
317      * @param string $clientId
318      * @param integer $correlationId
319      * @param integer $apiKey
320      * @static
321      * @access public
322      * @return string
323      */
324     public static function requestHeader($clientId, $correlationId, $apiKey)
325     {
326         // int16 -- apiKey int16 -- apiVersion int32 correlationId
327         $binData  = self::pack(self::BIT_B16, $apiKey);
328         $binData .= self::pack(self::BIT_B16, self::API_VERSION);
329         $binData .= self::pack(self::BIT_B32, $correlationId);
330
331         // concat client id
332         $binData .= self::encodeString($clientId, self::PACK_INT16);
333
334         return $binData;
335     }
336
337     // }}}
338     // {{{ protected static function _encodeMessage()
339
340     /**
341      * encode signal message
342      *
343      * @param string $message
344      * @param int $compression
345      * @return string
346      * @static
347      * @access protected
348      */
349     protected static function _encodeMessage($message, $compression = self::COMPRESSION_NONE)
350     {
351         // int8 -- magic  int8 -- attribute
352         $data  = self::pack(self::BIT_B8, self::MESSAGE_MAGIC);
353         $data .= self::pack(self::BIT_B8, $compression);
354
355         // message key
356         $data .= self::encodeString('', self::PACK_INT32);
357
358         // message value
359         $data .= self::encodeString($message, self::PACK_INT32, $compression);
360
361         $crc = crc32($data);
362
363         // int32 -- crc code  string data
364         $message = self::pack(self::BIT_B32, $crc) . $data;
365
366         return $message;
367     }
368
369     // }}}
370     // {{{ protected static function _encodeProcudePartion()
371
372     /**
373      * encode signal part
374      *
375      * @param $values
376      * @param $compression
377      * @return string
378      * @internal param $partions
379      * @static
380      * @access protected
381      */
382     protected static function _encodeProcudePartion($values, $compression)
383     {
384         if (!isset($values['partition_id'])) {
385             throw new \Kafka\Exception\Protocol('given produce data invalid. `partition_id` is undefined.');
386         }
387
388         if (!isset($values['messages']) || empty($values['messages'])) {
389             throw new \Kafka\Exception\Protocol('given produce data invalid. `messages` is undefined.');
390         }
391
392         $data = self::pack(self::BIT_B32, $values['partition_id']);
393         $data .= self::encodeString(self::encodeMessageSet($values['messages'], $compression), self::PACK_INT32);
394
395         return $data;
396     }
397
398     // }}}
399     // {{{ protected static function _encodeProcudeTopic()
400
401     /**
402      * encode signal topic
403      *
404      * @param $values
405      * @param $compression
406      * @return string
407      * @internal param $partions
408      * @static
409      * @access protected
410      */
411     protected static function _encodeProcudeTopic($values, $compression)
412     {
413         if (!isset($values['topic_name'])) {
414             throw new \Kafka\Exception\Protocol('given produce data invalid. `topic_name` is undefined.');
415         }
416
417         if (!isset($values['partitions']) || empty($values['partitions'])) {
418             throw new \Kafka\Exception\Protocol('given produce data invalid. `partitions` is undefined.');
419         }
420
421         $topic = self::encodeString($values['topic_name'], self::PACK_INT16);
422         $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeProcudePartion'), $compression);
423
424         return $topic . $partitions;
425     }
426
427     // }}}
428     // {{{ protected static function _encodeFetchPartion()
429
430     /**
431      * encode signal part
432      *
433      * @param partions
434      * @static
435      * @access protected
436      * @return string
437      */
438     protected static function _encodeFetchPartion($values)
439     {
440         if (!isset($values['partition_id'])) {
441             throw new \Kafka\Exception\Protocol('given fetch data invalid. `partition_id` is undefined.');
442         }
443
444         if (!isset($values['offset'])) {
445             $values['offset'] = 0;
446         }
447
448         if (!isset($values['max_bytes'])) {
449             $values['max_bytes'] = 100 * 1024 * 1024;
450         }
451
452         $data = self::pack(self::BIT_B32, $values['partition_id']);
453         $data .= self::pack(self::BIT_B64, $values['offset']);
454         $data .= self::pack(self::BIT_B32, $values['max_bytes']);
455
456         return $data;
457     }
458
459     // }}}
460     // {{{ protected static function _encodeFetchTopic()
461
462     /**
463      * encode signal topic
464      *
465      * @param partions
466      * @static
467      * @access protected
468      * @return string
469      */
470     protected static function _encodeFetchTopic($values)
471     {
472         if (!isset($values['topic_name'])) {
473             throw new \Kafka\Exception\Protocol('given fetch data invalid. `topic_name` is undefined.');
474         }
475
476         if (!isset($values['partitions']) || empty($values['partitions'])) {
477             throw new \Kafka\Exception\Protocol('given fetch data invalid. `partitions` is undefined.');
478         }
479
480         $topic = self::encodeString($values['topic_name'], self::PACK_INT16);
481         $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeFetchPartion'));
482
483         return $topic . $partitions;
484     }
485
486     // }}}
487     // {{{ protected static function _encodeOffsetPartion()
488
489     /**
490      * encode signal part
491      *
492      * @param partions
493      * @static
494      * @access protected
495      * @return string
496      */
497     protected static function _encodeOffsetPartion($values)
498     {
499         if (!isset($values['partition_id'])) {
500             throw new \Kafka\Exception\Protocol('given offset data invalid. `partition_id` is undefined.');
501         }
502
503         if (!isset($values['time'])) {
504             $values['time'] = -1; // -1
505         }
506
507         if (!isset($values['max_offset'])) {
508             $values['max_offset'] = 100000;
509         }
510
511         $data = self::pack(self::BIT_B32, $values['partition_id']);
512         $data .= self::pack(self::BIT_B64, $values['time']);
513         $data .= self::pack(self::BIT_B32, $values['max_offset']);
514
515         return $data;
516     }
517
518     // }}}
519     // {{{ protected static function _encodeOffsetTopic()
520
521     /**
522      * encode signal topic
523      *
524      * @param partions
525      * @static
526      * @access protected
527      * @return string
528      */
529     protected static function _encodeOffsetTopic($values)
530     {
531         if (!isset($values['topic_name'])) {
532             throw new \Kafka\Exception\Protocol('given offset data invalid. `topic_name` is undefined.');
533         }
534
535         if (!isset($values['partitions']) || empty($values['partitions'])) {
536             throw new \Kafka\Exception\Protocol('given offset data invalid. `partitions` is undefined.');
537         }
538
539         $topic = self::encodeString($values['topic_name'], self::PACK_INT16);
540         $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeOffsetPartion'));
541
542         return $topic . $partitions;
543     }
544
545     // }}}
546     // {{{ protected static function _encodeCommitOffsetPartion()
547
548     /**
549      * encode signal part
550      *
551      * @param partions
552      * @static
553      * @access protected
554      * @return string
555      */
556     protected static function _encodeCommitOffsetPartion($values)
557     {
558         if (!isset($values['partition_id'])) {
559             throw new \Kafka\Exception\Protocol('given commit offset data invalid. `partition_id` is undefined.');
560         }
561
562         if (!isset($values['offset'])) {
563             throw new \Kafka\Exception\Protocol('given commit offset data invalid. `offset` is undefined.');
564         }
565
566         if (!isset($values['time'])) {
567             $values['time'] = -1;
568         }
569
570         if (!isset($values['metadata'])) {
571             $values['metadata'] = 'm';
572         }
573
574         $data = self::pack(self::BIT_B32, $values['partition_id']);
575         $data .= self::pack(self::BIT_B64, $values['offset']);
576         $data .= self::pack(self::BIT_B64, $values['time']);
577         $data .= self::encodeString($values['metadata'], self::PACK_INT16);
578
579         return $data;
580     }
581
582     // }}}
583     // {{{ protected static function _encodeCommitOffset()
584
585     /**
586      * encode signal topic
587      *
588      * @param partions
589      * @static
590      * @access protected
591      * @return string
592      */
593     protected static function _encodeCommitOffset($values)
594     {
595         if (!isset($values['topic_name'])) {
596             throw new \Kafka\Exception\Protocol('given commit offset data invalid. `topic_name` is undefined.');
597         }
598
599         if (!isset($values['partitions']) || empty($values['partitions'])) {
600             throw new \Kafka\Exception\Protocol('given commit offset data invalid. `partitions` is undefined.');
601         }
602
603         $topic = self::encodeString($values['topic_name'], self::PACK_INT16);
604         $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeCommitOffsetPartion'));
605
606         return $topic . $partitions;
607     }
608
609     // }}}
610     // {{{ protected static function _encodeFetchOffsetPartion()
611
612     /**
613      * encode signal part
614      *
615      * @param partions
616      * @static
617      * @access protected
618      * @return string
619      */
620     protected static function _encodeFetchOffsetPartion($values)
621     {
622         if (!isset($values['partition_id'])) {
623             throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `partition_id` is undefined.');
624         }
625
626         $data = self::pack(self::BIT_B32, $values['partition_id']);
627
628         return $data;
629     }
630
631     // }}}
632     // {{{ protected static function _encodeFetchOffset()
633
634     /**
635      * encode signal topic
636      *
637      * @param partions
638      * @static
639      * @access protected
640      * @return string
641      */
642     protected static function _encodeFetchOffset($values)
643     {
644         if (!isset($values['topic_name'])) {
645             throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `topic_name` is undefined.');
646         }
647
648         if (!isset($values['partitions']) || empty($values['partitions'])) {
649             throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `partitions` is undefined.');
650         }
651
652         $topic = self::encodeString($values['topic_name'], self::PACK_INT16);
653         $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeFetchOffsetPartion'));
654
655         return $topic . $partitions;
656     }
657
658     // }}}
659     // }}}
660 }