]> scripts.mit.edu Git - autoinstalls/mediawiki.git/blob - vendor/nmred/kafka-php/src/Kafka/Client.php
MediaWiki 1.30.2-scripts2
[autoinstalls/mediawiki.git] / vendor / nmred / kafka-php / src / Kafka / Client.php
1 <?php
2 /* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
3 // +---------------------------------------------------------------------------
4 // | SWAN [ $_SWANBR_SLOGAN_$ ]
5 // +---------------------------------------------------------------------------
6 // | Copyright $_SWANBR_COPYRIGHT_$
7 // +---------------------------------------------------------------------------
8 // | Version  $_SWANBR_VERSION_$
9 // +---------------------------------------------------------------------------
10 // | Licensed ( $_SWANBR_LICENSED_URL_$ )
11 // +---------------------------------------------------------------------------
12 // | $_SWANBR_WEB_DOMAIN_$
13 // +---------------------------------------------------------------------------
14
15 namespace Kafka;
16
17 /**
18 +------------------------------------------------------------------------------
19 * Kafka protocol since Kafka v0.8
20 +------------------------------------------------------------------------------
21 *
22 * @package
23 * @version $_SWANBR_VERSION_$
24 * @copyright Copyleft
25 * @author $_SWANBR_AUTHOR_$
26 +------------------------------------------------------------------------------
27 */
28
29 class Client
30 {
31     // {{{ consts
32     // }}}
33     // {{{ members
34
35     /**
36      * cluster metadata
37      *
38      * @var \Kafka\ClusterMetaData
39      * @access private
40      */
41     private $metadata = null;
42
43     /**
44      * broker host list
45      *
46      * @var array
47      * @access private
48      */
49     private $hostList = array();
50
51     /**
52      * save broker connection
53      *
54      * @var array
55      * @access private
56      */
57     private static $stream = array();
58
59     /**
60      * default stream options
61      *
62      * @var array
63      * @access private
64      */
65     private $streamOptions = array(
66         'RecvTimeoutSec' => 0,
67         'RecvTimeoutUsec' => 750000,
68         'SendTimeoutSec' => 0,
69         'SendTimeoutUsec' => 100000,
70     );
71
72     // }}}
73     // {{{ functions
74     // {{{ public function __construct()
75
76     /**
77      * __construct
78      *
79      * @access public
80      * @param ClusterMetaData $metadata
81      */
82     public function __construct(ClusterMetaData $metadata)
83     {
84         $this->metadata = $metadata;
85         if (method_exists($metadata, 'setClient')) {
86             $this->metadata->setClient($this);
87         }
88     }
89
90     /**
91      * update stream options
92      *
93      * @param array $options
94      */
95     public function setStreamOptions($options = array())
96     {
97         // Merge the arrays
98         $this->streamOptions = array_merge($this->streamOptions, $options);
99         $this->updateStreamOptions();
100     }
101
102     /**
103      * @access public
104      * @param $name - name of stream option
105      * @param $value - value for option
106      */
107     public function setStreamOption($name, $value)
108     {
109         $this->streamOptions[$name] = $value;
110         $this->updateStreamOptions();
111     }
112
113     /**
114      * @access public
115      * @param $name - name of option
116      * @return mixed
117      */
118     public function getStreamOption($name)
119     {
120         if (array_key_exists($name, $this->streamOptions)) {
121             return $this->streamOptions[$name];
122         }
123         return null;
124     }
125
126     /**
127      * @access private
128      */
129     private function updateStreamOptions()
130     {
131         // Loop thru each stream
132         foreach (self::$stream as $host => $streams) {
133             foreach ($streams as $key => $info) {
134                 // Update options
135                 if (isset($info['stream'])) {
136                     /** @var \Kafka\Socket $stream */
137                     $stream = $info['stream'];
138                     $stream->setRecvTimeoutSec($this->streamOptions['RecvTimeoutSec']);
139                     $stream->setRecvTimeoutUsec($this->streamOptions['SendTimeoutUsec']);
140                     $stream->setSendTimeoutSec($this->streamOptions['SendTimeoutSec']);
141                     $stream->setSendTimeoutUsec($this->streamOptions['SendTimeoutUsec']);
142                 }
143             }
144         }
145     }
146
147     // }}}
148     // {{{ public function getBrokers()
149
150     /**
151      * get broker server
152      *
153      * @access public
154      * @return array
155      */
156     public function getBrokers()
157     {
158         if (empty($this->hostList)) {
159             $brokerList = $this->metadata->listBrokers();
160             foreach ($brokerList as $brokerId => $info) {
161                 if (!isset($info['host']) || !isset($info['port'])) {
162                     continue;
163                 }
164                 $this->hostList[$brokerId] = $info['host'] . ':' . $info['port'];
165             }
166         }
167
168         return $this->hostList;
169     }
170
171     // }}}
172     // {{{ public function getHostByPartition()
173
174     /**
175      * get broker host by topic partition
176      *
177      * @param string $topicName
178      * @param int $partitionId
179      * @access public
180      * @return string
181      */
182     public function getHostByPartition($topicName, $partitionId = 0)
183     {
184         $partitionInfo = $this->metadata->getPartitionState($topicName, $partitionId);
185         if (!$partitionInfo) {
186             throw new \Kafka\Exception('topic:' . $topicName . ', partition id: ' . $partitionId . ' is not exists.');
187         }
188
189         $hostList = $this->getBrokers();
190         if (isset($partitionInfo['leader']) && isset($hostList[$partitionInfo['leader']])) {
191             return $hostList[$partitionInfo['leader']];
192         } else {
193             throw new \Kafka\Exception('can\'t find broker host.');
194         }
195     }
196
197     // }}}
198     // {{{ public function getZooKeeper()
199
200     /**
201      * get kafka zookeeper object
202      *
203      * @access public
204      * @return \Kafka\ZooKeeper
205      */
206     public function getZooKeeper()
207     {
208         if ($this->metadata instanceof \Kafka\ZooKeeper) {
209                 return $this->metadata;
210         } else {
211                 throw new \Kafka\Exception( 'ZooKeeper was not provided' );
212         }
213     }
214
215     // }}}
216     // {{{ public function getStream()
217
218     /**
219      * get broker broker connect
220      *
221      * @param string $host
222      * @param null $lockKey
223      * @return array
224      * @access private
225      */
226     public function getStream($host, $lockKey = null)
227     {
228         if (!$lockKey) {
229             $lockKey = uniqid($host);
230         }
231
232         list($hostname, $port) = explode(':', $host);
233         // find unlock stream
234         if (isset(self::$stream[$host])) {
235             foreach (self::$stream[$host] as $key => $info) {
236                 if ($info['locked']) {
237                     continue;
238                 } else {
239                     self::$stream[$host][$key]['locked'] = true;
240                     $info['stream']->connect();
241                     return array('key' => $key, 'stream' => $info['stream']);
242                 }
243             }
244         }
245
246         // no idle stream
247         $stream = new \Kafka\Socket($hostname, $port, $this->getStreamOption('RecvTimeoutSec'), $this->getStreamOption('RecvTimeoutUsec'), $this->getStreamOption('SendTimeoutSec'), $this->getStreamOption('SendTimeoutUsec'));
248         $stream->connect();
249         self::$stream[$host][$lockKey] = array(
250             'locked' => true,
251             'stream' => $stream,
252         );
253         return array('key' => $lockKey, 'stream' => $stream);
254     }
255
256     // }}}
257     // {{{ public function freeStream()
258
259     /**
260      * free stream pool
261      *
262      * @param string $key
263      * @access public
264      * @return void
265      */
266     public function freeStream($key)
267     {
268         foreach (self::$stream as $host => $values) {
269             if (isset($values[$key])) {
270                 self::$stream[$host][$key]['locked'] = false;
271             }
272         }
273     }
274
275     // }}}
276     // {{{ public function getTopicDetail()
277
278     /**
279      * get topic detail info
280      *
281      * @param  string $topicName
282      * @return array
283      */
284     public function getTopicDetail($topicName)
285     {
286         return $this->metadata->getTopicDetail($topicName);
287     }
288
289     // }}}
290     // }}}
291 }