]> scripts.mit.edu Git - autoinstallsdev/mediawiki.git/blob - vendor/nmred/kafka-php/src/Kafka/Produce.php
MediaWiki 1.30.2
[autoinstallsdev/mediawiki.git] / vendor / nmred / kafka-php / src / Kafka / Produce.php
1 <?php
2 /* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
3 // +---------------------------------------------------------------------------
4 // | SWAN [ $_SWANBR_SLOGAN_$ ]
5 // +---------------------------------------------------------------------------
6 // | Copyright $_SWANBR_COPYRIGHT_$
7 // +---------------------------------------------------------------------------
8 // | Version  $_SWANBR_VERSION_$
9 // +---------------------------------------------------------------------------
10 // | Licensed ( $_SWANBR_LICENSED_URL_$ )
11 // +---------------------------------------------------------------------------
12 // | $_SWANBR_WEB_DOMAIN_$
13 // +---------------------------------------------------------------------------
14
15 namespace Kafka;
16
17 /**
18 +------------------------------------------------------------------------------
19 * Kafka protocol since Kafka v0.8
20 +------------------------------------------------------------------------------
21 *
22 * @package
23 * @version $_SWANBR_VERSION_$
24 * @copyright Copyleft
25 * @author $_SWANBR_AUTHOR_$
26 +------------------------------------------------------------------------------
27 */
28
29 class Produce
30 {
31     // {{{ consts
32     // }}}
33     // {{{ members
34
35     /**
36      * client
37      *
38      * @var mixed
39      * @access private
40      */
41     private $client = null;
42
43     /**
44      * send message options cache
45      *
46      * @var array
47      * @access private
48      */
49     private $payload = array();
50
51     /**
52      * default the server will not send any response
53      *
54      * @var float
55      * @access private
56      */
57     private $requiredAck = 0;
58
59     /**
60      * default timeout is 100ms
61      *
62      * @var float
63      * @access private
64      */
65     private $timeout = 100;
66
67     /**
68      * produce instance
69      *
70      * @var \Kafka\Produce
71      * @access private
72      */
73     private static $instance = null;
74
75     // }}}
76     // {{{ functions
77     // {{{ public function static getInstance()
78
79     /**
80      * set send messages
81      *
82      * @access public
83      * @param $hostList
84      * @param $timeout
85      * @param null $kafkaHostList
86      * @return Produce
87      */
88     public static function getInstance($hostList, $timeout, $kafkaHostList = null)
89     {
90         if (is_null(self::$instance)) {
91             self::$instance = new self($hostList, $timeout, $kafkaHostList);
92         }
93
94         return self::$instance;
95     }
96
97     // }}}
98     // {{{ public function __construct()
99
100     /**
101      * __construct
102      *
103      * @access public
104      * @param $hostList
105      * @param null $timeout
106      * @param null $kafkaHostList
107      */
108     public function __construct($hostList, $timeout = null, $kafkaHostList = null)
109     {
110         if ($hostList instanceof \Kafka\ClusterMetaData) {
111             $metadata = $hostList;
112         } elseif ( $kafkaHostList !== null ) {
113             $metadata = new \Kafka\MetaDataFromKafka($kafkaHostList);
114         } else {
115             $metadata = new \Kafka\ZooKeeper($hostList, $timeout);
116         }
117         $this->client = new \Kafka\Client($metadata);
118     }
119
120     // }}}
121     // {{{ public function setMessages()
122
123     /**
124      * set send messages
125      *
126      * @access public
127      * @param $topicName
128      * @param int $partitionId
129      * @param array $messages
130      * @return Produce
131      */
132     public function setMessages($topicName, $partitionId = 0, $messages = array())
133     {
134         if (isset($this->payload[$topicName][$partitionId])) {
135             $this->payload[$topicName][$partitionId] =
136                     array_merge($this->payload[$topicName][$partitionId], $messages);
137         } else {
138             $this->payload[$topicName][$partitionId] = $messages;
139         }
140
141         return $this;
142     }
143
144     // }}}
145     // {{{ public function setRequireAck()
146
147     /**
148      * set request mode
149      * This field indicates how many acknowledgements the servers should receive
150      * before responding to the request. If it is 0 the server will not send any
151      * response (this is the only case where the server will not reply to a
152      * request). If it is 1, the server will wait the data is written to the
153      * local log before sending a response. If it is -1 the server will block
154      * until the message is committed by all in sync replicas before sending a
155      * response. For any number > 1 the server will block waiting for this
156      * number of acknowledgements to occur (but the server will never wait for
157      * more acknowledgements than there are in-sync replicas).
158      *
159      * @param int $ack
160      * @access public
161      * @return Produce
162      */
163     public function setRequireAck($ack = 0)
164     {
165         if ($ack >= -1) {
166             $this->requiredAck = (int) $ack;
167         }
168
169         return $this;
170     }
171
172     // }}}
173     // {{{ public function setTimeOut()
174
175     /**
176      * set request timeout
177      *
178      * @param int $timeout
179      * @access public
180      * @return Produce
181      */
182     public function setTimeOut($timeout = 100)
183     {
184         if ((int) $timeout) {
185             $this->timeout = (int) $timeout;
186         }
187         return $this;
188     }
189
190     // }}}
191     // {{{ public function send()
192
193     /**
194      * send message to broker
195      *
196      * @access public
197      * @return bool|array
198      */
199     public function send()
200     {
201         $data = $this->_formatPayload();
202         if (empty($data)) {
203             return false;
204         }
205
206         $responseData = array();
207         foreach ($data as $host => $requestData) {
208             $stream = $this->client->getStream($host);
209             $conn   = $stream['stream'];
210             $encoder = new \Kafka\Protocol\Encoder($conn);
211             $encoder->produceRequest($requestData);
212             if ((int) $this->requiredAck !== 0) { // get broker response
213                 $decoder = new \Kafka\Protocol\Decoder($conn);
214                 $response = $decoder->produceResponse();
215                 foreach ($response as $topicName => $info) {
216                     if (!isset($responseData[$topicName])) {
217                         $responseData[$topicName] = $info;
218                     } else {
219                         $responseData[$topicName] = array_merge($info, $responseData[$topicName]);
220                     }
221                 }
222             }
223
224             $this->client->freeStream($stream['key']);
225         }
226
227         $this->payload = array();
228         return $responseData;
229     }
230
231     // }}}
232     // {{{ public function getClient()
233
234     /**
235      * get client object
236      *
237      * @access public
238      * @return Client
239      */
240     public function getClient()
241     {
242         return $this->client;
243     }
244
245     /**
246      * passthru method to client for setting stream options
247      *
248      * @access public
249      * @param array $options
250      */
251     public function setStreamOptions($options = array())
252     {
253         $this->client->setStreamOptions($options);
254     }
255
256     // }}}
257     // {{{ public function getAvailablePartitions()
258
259     /**
260      * get available partition
261      *
262      * @access public
263      * @param $topicName
264      * @return array
265      */
266     public function getAvailablePartitions($topicName)
267     {
268         $topicDetail = $this->client->getTopicDetail($topicName);
269         if (is_array($topicDetail) && isset($topicDetail['partitions'])) {
270             $topicPartitiions = array_keys($topicDetail['partitions']);
271         } else {
272             $topicPartitiions = array();
273         }
274
275         return $topicPartitiions;
276     }
277
278     // }}}
279     // {{{ private function _formatPayload()
280
281     /**
282      * format payload array
283      *
284      * @access private
285      * @return array
286      */
287     private function _formatPayload()
288     {
289         if (empty($this->payload)) {
290             return array();
291         }
292
293         $data = array();
294         foreach ($this->payload as $topicName => $partitions) {
295             foreach ($partitions as $partitionId => $messages) {
296                 $host = $this->client->getHostByPartition($topicName, $partitionId);
297                 $data[$host][$topicName][$partitionId] = $messages;
298             }
299         }
300
301         $requestData = array();
302         foreach ($data as $host => $info) {
303             $topicData = array();
304             foreach ($info as $topicName => $partitions) {
305                 $partitionData = array();
306                 foreach ($partitions as $partitionId => $messages) {
307                     $partitionData[] = array(
308                         'partition_id' => $partitionId,
309                         'messages'     => $messages,
310                     );
311                 }
312                 $topicData[] = array(
313                     'topic_name' => $topicName,
314                     'partitions' => $partitionData,
315                 );
316             }
317
318             $requestData[$host] = array(
319                 'required_ack' => $this->requiredAck,
320                 'timeout'      => $this->timeout,
321                 'data' => $topicData,
322             );
323         }
324
325        return $requestData;
326     }
327
328     // }}}
329     // }}}
330 }