]> scripts.mit.edu Git - autoinstallsdev/mediawiki.git/blob - vendor/nmred/kafka-php/src/Kafka/ZooKeeper.php
MediaWiki 1.30.2-scripts
[autoinstallsdev/mediawiki.git] / vendor / nmred / kafka-php / src / Kafka / ZooKeeper.php
1 <?php
2 /* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
3 // +---------------------------------------------------------------------------
4 // | SWAN [ $_SWANBR_SLOGAN_$ ]
5 // +---------------------------------------------------------------------------
6 // | Copyright $_SWANBR_COPYRIGHT_$
7 // +---------------------------------------------------------------------------
8 // | Version  $_SWANBR_VERSION_$
9 // +---------------------------------------------------------------------------
10 // | Licensed ( $_SWANBR_LICENSED_URL_$ )
11 // +---------------------------------------------------------------------------
12 // | $_SWANBR_WEB_DOMAIN_$
13 // +---------------------------------------------------------------------------
14
15 namespace Kafka;
16
17 /**
18 +------------------------------------------------------------------------------
19 * Kafka protocol since Kafka v0.8
20 +------------------------------------------------------------------------------
21 *
22 * @package
23 * @version $_SWANBR_VERSION_$
24 * @copyright Copyleft
25 * @author $_SWANBR_AUTHOR_$
26 +------------------------------------------------------------------------------
27 */
28
29 class ZooKeeper implements \Kafka\ClusterMetaData
30 {
31     // {{{ consts
32
33     /**
34      * get all broker
35      */
36     const BROKER_PATH = '/brokers/ids';
37
38     /**
39      * get broker detail
40      */
41     const BROKER_DETAIL_PATH = '/brokers/ids/%d';
42
43     /**
44      * get topic detail
45      */
46     const TOPIC_PATCH = '/brokers/topics/%s';
47
48     /**
49      * get partition state
50      */
51     const PARTITION_STATE = '/brokers/topics/%s/partitions/%d/state';
52
53     /**
54      * register consumer
55      */
56     const REG_CONSUMER = '/consumers/%s/ids/%s';
57
58     /**
59      * list consumer
60      */
61     const LIST_CONSUMER = '/consumers/%s/ids';
62
63     /**
64      * partition owner
65      */
66     const PARTITION_OWNER = '/consumers/%s/owners/%s/%d';
67
68     // }}}
69     // {{{ members
70
71     /**
72      * zookeeper
73      *
74      * @var mixed
75      * @access private
76      */
77     private $zookeeper = null;
78
79     // }}}
80     // {{{ functions
81     // {{{ public function __construct()
82
83     /**
84      * __construct
85      *
86      * @access public
87      * @param $hostList
88      * @param null $timeout
89      */
90     public function __construct($hostList, $timeout = null)
91     {
92         if (!is_null($timeout) && is_numeric($timeout)) {
93             $this->zookeeper = new \ZooKeeper($hostList, null, $timeout);
94         } else {
95             $this->zookeeper = new \ZooKeeper($hostList);
96         }
97     }
98
99     // }}}
100     // {{{ public function listBrokers()
101
102     /**
103      * get broker list using zookeeper
104      *
105      * @access public
106      * @return array
107      */
108     public function listBrokers()
109     {
110         $result = array();
111         $lists = $this->zookeeper->getChildren(self::BROKER_PATH);
112         if (!empty($lists)) {
113             foreach ($lists as $brokerId) {
114                 $brokerDetail = $this->getBrokerDetail($brokerId);
115                 if (!$brokerDetail) {
116                     continue;
117                 }
118                 $result[$brokerId] = $brokerDetail;
119             }
120         }
121
122         return $result;
123     }
124
125     // }}}
126     // {{{ public function getBrokerDetail()
127
128     /**
129      * get broker detail
130      *
131      * @param integer $brokerId
132      * @access public
133      * @return string|bool
134      */
135     public function getBrokerDetail($brokerId)
136     {
137         $result = array();
138         $path = sprintf(self::BROKER_DETAIL_PATH, (int) $brokerId);
139         if ($this->zookeeper->exists($path)) {
140             $result = $this->zookeeper->get($path);
141             if (!$result) {
142                 return false;
143             }
144
145             $result = json_decode($result, true);
146         }
147
148         return $result;
149     }
150
151     // }}}
152     // {{{ public function getTopicDetail()
153
154     /**
155      * get topic detail
156      *
157      * @param string $topicName
158      * @access public
159      * @return string|bool
160      */
161     public function getTopicDetail($topicName)
162     {
163         $result = array();
164         $path = sprintf(self::TOPIC_PATCH, (string) $topicName);
165         if ($this->zookeeper->exists($path)) {
166             $result = $this->zookeeper->get($path);
167             if (!$result) {
168                 return false;
169             }
170             $result = json_decode($result, true);
171         }
172
173         return $result;
174     }
175
176     // }}}
177     // {{{ public function getPartitionState()
178
179     /**
180      * get partition state
181      *
182      * @param string $topicName
183      * @param integer $partitionId
184      * @access public
185      * @return string|bool
186      */
187     public function getPartitionState($topicName, $partitionId = 0)
188     {
189         $result = array();
190         $path = sprintf(self::PARTITION_STATE, (string) $topicName, (int) $partitionId);
191         if ($this->zookeeper->exists($path)) {
192             $result = $this->zookeeper->get($path);
193             if (!$result) {
194                 return false;
195             }
196             $result = json_decode($result, true);
197         }
198
199         return $result;
200     }
201
202     // }}}
203     // {{{ public function registerConsumer()
204
205     /**
206      * register consumer
207      *
208      * @param $groupId
209      * @param integer $consumerId
210      * @param array $topics
211      * @access public
212      */
213     public function registerConsumer($groupId, $consumerId, $topics = array())
214     {
215         if (empty($topics)) {
216             return;
217         }
218
219         $path = sprintf(self::REG_CONSUMER, (string) $groupId, (string) $consumerId);
220         $subData = array();
221         foreach ($topics as $topic) {
222             $subData[$topic] = 1;
223         }
224         $data = array(
225             'version' => '1',
226             'pattern' => 'white_list',
227             'subscription' => $subData,
228         );
229         if (!$this->zookeeper->exists($path)) {
230             $this->makeZkPath($path);
231             $this->makeZkNode($path, json_encode($data));
232         } else {
233             $this->zookeeper->set($path, json_encode($data));
234         }
235     }
236
237     // }}}
238     // {{{ public function listConsumer()
239
240     /**
241      * list consumer
242      *
243      * @param string $groupId
244      * @access public
245      * @return array
246      */
247     public function listConsumer($groupId)
248     {
249         $path = sprintf(self::LIST_CONSUMER, (string) $groupId);
250         if (!$this->zookeeper->exists($path)) {
251             return array();
252         } else {
253             return $this->zookeeper->getChildren($path);
254         }
255     }
256
257     // }}}
258     // {{{ public function getConsumersPerTopic()
259
260     /**
261      * get consumer per topic
262      *
263      * @param string $groupId
264      * @access public
265      * @return array
266      */
267     public function getConsumersPerTopic($groupId)
268     {
269         $consumers = $this->listConsumer($groupId);
270         if (empty($consumers)) {
271             return array();
272         }
273
274         $topics = array();
275         foreach ($consumers as $consumerId) {
276             $path = sprintf(self::REG_CONSUMER, (string) $groupId, (string) $consumerId);
277             if (!$this->zookeeper->exists($path)) {
278                 continue;
279             }
280
281             $info = $this->zookeeper->get($path);
282             $info = json_decode($info, true);
283             $subTopic = isset($info['subscription']) ? $info['subscription'] : array();
284             foreach ($subTopic as $topic => $num) {
285                 $topics[$topic] = $consumerId;
286             }
287         }
288
289         return $topics;
290     }
291
292     // }}}
293     // {{{ public function addPartitionOwner()
294
295     /**
296      * add partition owner
297      *
298      * @param string $groupId
299      * @param string $topicName
300      * @param integer $partitionId
301      * @param string $consumerId
302      * @access public
303      * @return void
304      */
305     public function addPartitionOwner($groupId, $topicName, $partitionId, $consumerId)
306     {
307         $path = sprintf(self::PARTITION_OWNER, (string) $groupId, $topicName, (string) $partitionId);
308         if (!$this->zookeeper->exists($path)) {
309             $this->makeZkPath($path);
310             $this->makeZkNode($path, $consumerId);
311         } else {
312             $this->zookeeper->set($path, $consumerId);
313         }
314     }
315
316     // }}}
317     // {{{ protected function makeZkPath()
318
319     /**
320      * Equivalent of "mkdir -p" on ZooKeeper
321      *
322      * @param string $path  The path to the node
323      * @param mixed  $value The value to assign to each new node along the path
324      *
325      * @return bool
326      */
327     protected function makeZkPath($path, $value = 0)
328     {
329         $parts = explode('/', $path);
330         $parts = array_filter($parts);
331         $subpath = '';
332         while (count($parts) > 1) {
333             $subpath .= '/' . array_shift($parts);
334             if (!$this->zookeeper->exists($subpath)) {
335                 $this->makeZkNode($subpath, $value);
336             }
337         }
338     }
339
340     // }}}
341     // {{{ protected function makeZkNode()
342
343     /**
344      * Create a node on ZooKeeper at the given path
345      *
346      * @param string $path  The path to the node
347      * @param mixed  $value The value to assign to the new node
348      *
349      * @return bool
350      */
351     protected function makeZkNode($path, $value)
352     {
353         $params = array(
354             array(
355                 'perms'  => \Zookeeper::PERM_ALL,
356                 'scheme' => 'world',
357                 'id'     => 'anyone',
358             )
359         );
360         return $this->zookeeper->create($path, $value, $params);
361     }
362
363     // }}}
364     // }}}
365 }