]> scripts.mit.edu Git - autoinstalls/mediawiki.git/blob - vendor/nmred/kafka-php/src/Kafka/Consumer.php
MediaWiki 1.30.2-scripts
[autoinstalls/mediawiki.git] / vendor / nmred / kafka-php / src / Kafka / Consumer.php
1 <?php
2 /* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
3 // +---------------------------------------------------------------------------
4 // | SWAN [ $_SWANBR_SLOGAN_$ ]
5 // +---------------------------------------------------------------------------
6 // | Copyright $_SWANBR_COPYRIGHT_$
7 // +---------------------------------------------------------------------------
8 // | Version  $_SWANBR_VERSION_$
9 // +---------------------------------------------------------------------------
10 // | Licensed ( $_SWANBR_LICENSED_URL_$ )
11 // +---------------------------------------------------------------------------
12 // | $_SWANBR_WEB_DOMAIN_$
13 // +---------------------------------------------------------------------------
14
15 namespace Kafka;
16
17 /**
18 +------------------------------------------------------------------------------
19 * Kafka protocol since Kafka v0.8
20 +------------------------------------------------------------------------------
21 *
22 * @package
23 * @version $_SWANBR_VERSION_$
24 * @copyright Copyleft
25 * @author $_SWANBR_AUTHOR_$
26 +------------------------------------------------------------------------------
27 */
28
29 class Consumer
30 {
31     // {{{ consts
32     // }}}
33     // {{{ members
34
35     /**
36      * client
37      *
38      * @var Client
39      * @access private
40      */
41     private $client = null;
42
43     /**
44      * send message options cache
45      *
46      * @var array
47      * @access private
48      */
49     private $payload = array();
50
51     /**
52      * consumer group
53      *
54      * @var string
55      * @access private
56      */
57     private $group = '';
58
59     /**
60      * from offset
61      *
62      * @var mixed
63      * @access private
64      */
65     private $fromOffset = true;
66
67     /**
68      * produce instance
69      *
70      * @var \Kafka\Produce
71      * @access private
72      */
73     private static $instance = null;
74
75     /**
76      * maxSize
77      *
78      * @var integer
79      */
80     private $maxSize = 1048576;
81
82     /**
83      * offsetStrategy
84      * @var integer
85      */
86     private $offsetStrategy = \Kafka\Offset::DEFAULT_EARLY;
87
88     // }}}
89     // {{{ functions
90     // {{{ public function static getInstance()
91
92     /**
93      * set send messages
94      *
95      * @access public
96      * @param $hostList
97      * @param null $timeout
98      * @return Consumer
99      */
100     public static function getInstance($hostList, $timeout = null)
101     {
102         if (is_null(self::$instance)) {
103             self::$instance = new self($hostList, $timeout);
104         }
105
106         return self::$instance;
107     }
108
109     // }}}
110     // {{{ private function __construct()
111
112     /**
113      * __construct
114      *
115      * @access public
116      * @param $hostList
117      * @param null $timeout
118      */
119     private function __construct($hostList, $timeout = null)
120     {
121         $zookeeper = new \Kafka\ZooKeeper($hostList, $timeout);
122         $this->client = new \Kafka\Client($zookeeper);
123     }
124
125     // }}}
126     // {{{ public function clearPayload()
127
128     /**
129      * clearPayload
130      *
131      * @access public
132      * @return void
133      */
134     public function clearPayload()
135     {
136         $this->payload = array();
137     }
138
139     // }}}
140     // {{{ public function setTopic()
141
142     /**
143      * set topic name
144      *
145      * @access public
146      * @param $topicName
147      * @param null $defaultOffset
148      * @return Consumer
149      */
150     public function setTopic($topicName, $defaultOffset = null)
151     {
152         $parts = $this->client->getTopicDetail($topicName);
153         if (!isset($parts['partitions']) || empty($parts['partitions'])) {
154             // set topic fail.
155             return $this;
156         }
157
158         foreach ($parts['partitions'] as $partId => $info) {
159             $this->setPartition($topicName, $partId, $defaultOffset);
160         }
161
162         return $this;
163     }
164
165     // }}}
166     // {{{ public function setPartition()
167
168     /**
169      * set topic partition
170      *
171      * @access public
172      * @param $topicName
173      * @param int $partitionId
174      * @param null $offset
175      * @return Consumer
176      */
177     public function setPartition($topicName, $partitionId = 0, $offset = null)
178     {
179         if (is_null($offset)) {
180             if ($this->fromOffset) {
181                 $offsetObject = new \Kafka\Offset($this->client, $this->group, $topicName, $partitionId);
182                 $offset = $offsetObject->getOffset($this->offsetStrategy);
183                 \Kafka\Log::log('topic name:' . $topicName . ', part:' . $partitionId . 'get offset from kafka server, offet:' . $offset, LOG_DEBUG);
184             } else {
185                 $offset = 0;
186             }
187         }
188         $this->payload[$topicName][$partitionId] = $offset;
189
190         return $this;
191     }
192
193     // }}}
194     // {{{ public function setFromOffset()
195
196     /**
197      * set whether starting offset fetch
198      *
199      * @param boolean $fromOffset
200      * @access public
201      * @return void
202      */
203     public function setFromOffset($fromOffset)
204     {
205         $this->fromOffset = (boolean) $fromOffset;
206     }
207
208     // }}}
209     // {{{ public function setMaxBytes()
210
211     /**
212      * set fetch message max bytes
213      *
214      * @param int $maxSize
215      * @access public
216      * @return void
217      */
218     public function setMaxBytes($maxSize)
219     {
220         $this->maxSize = $maxSize;
221     }
222
223     // }}}
224     // {{{ public function setGroup()
225
226     /**
227      * set consumer group
228      *
229      * @param string $group
230      * @access public
231      * @return Consumer
232      */
233     public function setGroup($group)
234     {
235         $this->group = (string) $group;
236         return $this;
237     }
238
239     // }}}
240     // {{{ public function fetch()
241
242     /**
243      * fetch message to broker
244      *
245      * @access public
246      * @return \Kafka\Protocol\Fetch\Topic|bool
247      */
248     public function fetch()
249     {
250         $data = $this->_formatPayload();
251         if (empty($data)) {
252             return false;
253         }
254
255         $streams = array();
256         foreach ($data as $host => $requestData) {
257             $connArr = $this->client->getStream($host);
258             $conn    = $connArr['stream'];
259             $encoder = new \Kafka\Protocol\Encoder($conn);
260             $encoder->fetchRequest($requestData);
261             $streams[$connArr['key']] = $conn;
262         }
263
264         $fetch = new \Kafka\Protocol\Fetch\Topic($streams, $data);
265
266         // register fetch helper
267         $freeStream = new \Kafka\Protocol\Fetch\Helper\FreeStream($this->client);
268         $freeStream->setStreams($streams);
269         \Kafka\Protocol\Fetch\Helper\Helper::registerHelper('freeStream', $freeStream);
270
271         // register partition commit offset
272         $commitOffset = new \Kafka\Protocol\Fetch\Helper\CommitOffset($this->client);
273         $commitOffset->setGroup($this->group);
274         \Kafka\Protocol\Fetch\Helper\Helper::registerHelper('commitOffset', $commitOffset);
275
276         $updateConsumer = new \Kafka\Protocol\Fetch\Helper\Consumer($this);
277         \Kafka\Protocol\Fetch\Helper\Helper::registerHelper('updateConsumer', $updateConsumer);
278
279         return $fetch;
280     }
281
282     // }}}
283     // {{{ public function getClient()
284
285     /**
286      * get client object
287      *
288      * @access public
289      * @return Client
290      */
291     public function getClient()
292     {
293         return $this->client;
294     }
295
296     /**
297      * passthru method to client for setting stream options
298      *
299      * @param array $options
300      */
301     public function setStreamOptions($options = array())
302     {
303         $this->client->setStreamOptions($options);
304     }
305
306     // }}}
307     // {{{ private function _formatPayload()
308
309     /**
310      * format payload array
311      *
312      * @access private
313      * @return array
314      */
315     private function _formatPayload()
316     {
317         if (empty($this->payload)) {
318             return array();
319         }
320
321         $data = array();
322         foreach ($this->payload as $topicName => $partitions) {
323             foreach ($partitions as $partitionId => $offset) {
324                 $host = $this->client->getHostByPartition($topicName, $partitionId);
325                 $data[$host][$topicName][$partitionId] = $offset;
326             }
327         }
328
329         $requestData = array();
330         foreach ($data as $host => $info) {
331             $topicData = array();
332             foreach ($info as $topicName => $partitions) {
333                 $partitionData = array();
334                 foreach ($partitions as $partitionId => $offset) {
335                     $partitionData[] = array(
336                         'partition_id' => $partitionId,
337                         'offset'       => $offset,
338                         'max_bytes'    => $this->maxSize,
339                     );
340                 }
341                 $topicData[] = array(
342                     'topic_name' => $topicName,
343                     'partitions' => $partitionData,
344                 );
345             }
346
347             $requestData[$host] = array(
348                 'data' => $topicData,
349             );
350         }
351
352        return $requestData;
353     }
354
355     /**
356      * const LAST_OFFSET = -1;
357      * const EARLIEST_OFFSET = -2;
358      * const DEFAULT_LAST  = -2;
359      * const DEFAULT_EARLY = -1;
360      * @param int $offsetStrategy
361      */
362     public function setOffsetStrategy($offsetStrategy)
363     {
364         $this->offsetStrategy = $offsetStrategy;
365     }
366
367     // }}}
368     // }}}
369 }