2 /* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
3 // +---------------------------------------------------------------------------
4 // | SWAN [ $_SWANBR_SLOGAN_$ ]
5 // +---------------------------------------------------------------------------
6 // | Copyright $_SWANBR_COPYRIGHT_$
7 // +---------------------------------------------------------------------------
8 // | Version $_SWANBR_VERSION_$
9 // +---------------------------------------------------------------------------
10 // | Licensed ( $_SWANBR_LICENSED_URL_$ )
11 // +---------------------------------------------------------------------------
12 // | $_SWANBR_WEB_DOMAIN_$
13 // +---------------------------------------------------------------------------
18 +------------------------------------------------------------------------------
19 * Kafka protocol since Kafka v0.8
20 +------------------------------------------------------------------------------
23 * @version $_SWANBR_VERSION_$
25 * @author $_SWANBR_AUTHOR_$
26 +------------------------------------------------------------------------------
29 class ZooKeeper implements \Kafka\ClusterMetaData
// Cluster metadata accessor backed by a \ZooKeeper client. The constants below are
// the znode paths it reads and writes; those containing placeholders are sprintf() templates.

// Znode whose children are enumerated as broker ids by listBrokers().
36 const BROKER_PATH = '/brokers/ids';
// Znode of a single broker; %d is the broker id.
41 const BROKER_DETAIL_PATH = '/brokers/ids/%d';
// Znode of a single topic; %s is the topic name.
// NOTE(review): "PATCH" looks like a typo for "PATH"; the name is left as is because getTopicDetail() references it.
46 const TOPIC_PATCH = '/brokers/topics/%s';
// State znode of one partition; %s is the topic name, %d the partition id.
51 const PARTITION_STATE = '/brokers/topics/%s/partitions/%d/state';
// Registration znode of one consumer; first %s is the group id, second %s the consumer id.
56 const REG_CONSUMER = '/consumers/%s/ids/%s';
// Znode whose children are listed as the consumers of a group; %s is the group id.
61 const LIST_CONSUMER = '/consumers/%s/ids';
// Owner znode of one partition within a group; placeholders are group id, topic name, partition id.
66 const PARTITION_OWNER = '/consumers/%s/owners/%s/%d';
// The \ZooKeeper client instance; null until __construct() assigns it.
77 private $zookeeper = null;
81 // {{{ public function __construct()
88 * @param null $timeout
// Creates the underlying \ZooKeeper client for $hostList and stores it in $this->zookeeper.
// $timeout is forwarded as the third constructor argument only when it is numeric
// (the @param tag above lists null only, but a numeric value is what the check accepts).
90 public function __construct($hostList, $timeout = null)
// is_numeric(null) is already false, so the !is_null() test is redundant but harmless.
92 if (!is_null($timeout) && is_numeric($timeout)) {
// The second argument is passed as null so that the timeout can be given positionally.
93 $this->zookeeper = new \ZooKeeper($hostList, null, $timeout);
// Presumably the else branch (the line between is not visible in this view): build the client from the host list alone.
95 $this->zookeeper = new \ZooKeeper($hostList);
100 // {{{ public function listBrokers()
103 * get broker list using zookeeper
// Collects broker details keyed by broker id: enumerates the children of BROKER_PATH
// and looks each one up with getBrokerDetail().
108 public function listBrokers()
111 $lists = $this->zookeeper->getChildren(self::BROKER_PATH);
112 if (!empty($lists)) {
113 foreach ($lists as $brokerId) {
114 $brokerDetail = $this->getBrokerDetail($brokerId);
// Any falsy detail (false, null, empty array) takes this branch. Its body is not
// visible in this view; presumably the broker is skipped -- TODO confirm.
115 if (!$brokerDetail) {
118 $result[$brokerId] = $brokerDetail;
126 // {{{ public function getBrokerDetail()
131 * @param integer $brokerId
133 * @return string|bool
// Reads the znode of one broker and decodes its JSON payload.
// NOTE(review): the @return tag says string|bool, but the visible code assigns the result of
// json_decode(..., true), i.e. an associative array (or null for invalid JSON). The return
// statement itself is not visible in this view -- confirm before relying on the tag.
135 public function getBrokerDetail($brokerId)
// The id is cast to int before being substituted into the %d placeholder.
138 $path = sprintf(self::BROKER_DETAIL_PATH, (int) $brokerId);
// The node is only read when it exists.
139 if ($this->zookeeper->exists($path)) {
140 $result = $this->zookeeper->get($path);
// Second argument true: JSON objects are decoded as associative arrays.
145 $result = json_decode($result, true);
152 // {{{ public function getTopicDetail()
157 * @param string $topicName
159 * @return string|bool
// Reads the znode of one topic and decodes its JSON payload.
// NOTE(review): the @return tag says string|bool, but the visible code assigns the result of
// json_decode(..., true), i.e. an associative array (or null for invalid JSON). The return
// statement itself is not visible in this view -- confirm before relying on the tag.
161 public function getTopicDetail($topicName)
// TOPIC_PATCH is the topic path template; the topic name is cast to string for the %s placeholder.
164 $path = sprintf(self::TOPIC_PATCH, (string) $topicName);
// The node is only read when it exists.
165 if ($this->zookeeper->exists($path)) {
166 $result = $this->zookeeper->get($path);
// Second argument true: JSON objects are decoded as associative arrays.
170 $result = json_decode($result, true);
177 // {{{ public function getPartitionState()
180 * get partition state
182 * @param string $topicName
183 * @param integer $partitionId
185 * @return string|bool
// Reads the state znode of one partition of a topic and decodes its JSON payload.
// $partitionId defaults to 0.
// NOTE(review): the @return tag says string|bool, but the visible code assigns the result of
// json_decode(..., true), i.e. an associative array (or null for invalid JSON). The return
// statement itself is not visible in this view -- confirm before relying on the tag.
187 public function getPartitionState($topicName, $partitionId = 0)
// Topic name fills the %s placeholder, the int-cast partition id fills %d.
190 $path = sprintf(self::PARTITION_STATE, (string) $topicName, (int) $partitionId);
// The node is only read when it exists.
191 if ($this->zookeeper->exists($path)) {
192 $result = $this->zookeeper->get($path);
// Second argument true: JSON objects are decoded as associative arrays.
196 $result = json_decode($result, true);
203 // {{{ public function registerConsumer()
209 * @param integer $consumerId
210 * @param array $topics
// Stores a consumer's topic subscription as JSON in the REG_CONSUMER znode for
// ($groupId, $consumerId), creating the node (and its ancestors) when it does not exist yet.
// NOTE(review): the @param tag above says integer $consumerId, but the code casts it to string.
213 public function registerConsumer($groupId, $consumerId, $topics = array())
// Guard for an empty topic list; the body of this branch is not visible in this view.
215 if (empty($topics)) {
219 $path = sprintf(self::REG_CONSUMER, (string) $groupId, (string) $consumerId);
// Build the subscription map: topic name => 1.
221 foreach ($topics as $topic) {
222 $subData[$topic] = 1;
// Entries of the registration payload (only these two are visible in this view).
226 'pattern' => 'white_list',
227 'subscription' => $subData,
// First registration: create the missing ancestors, then the node itself carrying the JSON payload.
229 if (!$this->zookeeper->exists($path)) {
230 $this->makeZkPath($path);
231 $this->makeZkNode($path, json_encode($data));
// Presumably the else branch (the line between is not visible here): overwrite the existing node's data.
233 $this->zookeeper->set($path, json_encode($data));
238 // {{{ public function listConsumer()
243 * @param string $groupId
// Lists the children of the group's LIST_CONSUMER znode; getConsumersPerTopic() treats
// each returned name as a consumer id.
247 public function listConsumer($groupId)
249 $path = sprintf(self::LIST_CONSUMER, (string) $groupId);
// Missing-group guard; the body of this branch (what is returned when the path does
// not exist) is not visible in this view.
250 if (!$this->zookeeper->exists($path)) {
253 return $this->zookeeper->getChildren($path);
258 // {{{ public function getConsumersPerTopic()
261 * get consumer per topic
263 * @param string $groupId
// Builds a map of topic name => consumer id from the registration znodes of every
// consumer in the group.
// NOTE(review): each topic key holds a single consumer id, so when several consumers
// subscribe to the same topic the one iterated last overwrites the earlier ones.
267 public function getConsumersPerTopic($groupId)
269 $consumers = $this->listConsumer($groupId);
// Guard for a group without consumers; the body of this branch is not visible in this view.
270 if (empty($consumers)) {
275 foreach ($consumers as $consumerId) {
276 $path = sprintf(self::REG_CONSUMER, (string) $groupId, (string) $consumerId);
// Guard for a consumer whose registration node is missing; the branch body is not
// visible in this view -- presumably the consumer is skipped. TODO confirm.
277 if (!$this->zookeeper->exists($path)) {
281 $info = $this->zookeeper->get($path);
// Second argument true: the registration JSON is decoded as an associative array.
282 $info = json_decode($info, true);
// A registration without a 'subscription' entry contributes no topics.
283 $subTopic = isset($info['subscription']) ? $info['subscription'] : array();
// Only the topic keys are used; the stored value ($num) is ignored.
284 foreach ($subTopic as $topic => $num) {
285 $topics[$topic] = $consumerId;
293 // {{{ public function addPartitionOwner()
296 * add partition owner
298 * @param string $groupId
299 * @param string $topicName
300 * @param integer $partitionId
301 * @param string $consumerId
// Records $consumerId as the data of the PARTITION_OWNER znode for
// ($groupId, $topicName, $partitionId), creating the node (and its ancestors) when it
// does not exist yet.
305 public function addPartitionOwner($groupId, $topicName, $partitionId, $consumerId)
// NOTE(review): the partition id is cast to string although it fills a %d placeholder
// (sprintf converts it back to an integer); the other path builders cast to int.
307 $path = sprintf(self::PARTITION_OWNER, (string) $groupId, $topicName, (string) $partitionId);
// First claim: create the missing ancestors, then the owner node holding the consumer id.
308 if (!$this->zookeeper->exists($path)) {
309 $this->makeZkPath($path);
310 $this->makeZkNode($path, $consumerId);
// Presumably the else branch (the line between is not visible here): overwrite the current owner.
312 $this->zookeeper->set($path, $consumerId);
// {{{ protected function makeZkPath()

/**
 * Equivalent of "mkdir -p" on ZooKeeper: creates every missing ancestor of $path.
 *
 * Only the ancestors are created; the final segment of $path is left for the
 * caller to create (e.g. with makeZkNode()).
 *
 * @param string $path  The path to the node
 * @param mixed  $value The value to assign to each new node along the path
 *
 * @return void
 */
protected function makeZkPath($path, $value = 0)
{
    // Drop only empty segments (leading, trailing or doubled slashes). A bare
    // array_filter() would also discard the literal segment "0" -- e.g. partition
    // id 0 at the end of an owner path -- which shortens the segment list and
    // leaves the real parent node uncreated.
    $parts = array();
    foreach (explode('/', $path) as $segment) {
        if ($segment !== '') {
            $parts[] = $segment;
        }
    }

    // Walk down from the root, stopping before the last segment.
    $subpath = '';
    while (count($parts) > 1) {
        $subpath .= '/' . array_shift($parts);
        if (!$this->zookeeper->exists($subpath)) {
            $this->makeZkNode($subpath, $value);
        }
    }
}
341 // {{{ protected function makeZkNode()
344 * Create a node on ZooKeeper at the given path
346 * @param string $path The path to the node
347 * @param mixed $value The value to assign to the new node
// Returns whatever the client's create() call returns for the new node.
351 protected function makeZkNode($path, $value)
// Permission entry granting all permissions; the rest of the array it belongs to is
// not visible in this view. PHP class names are case-insensitive, so \Zookeeper here
// resolves to the same class as \ZooKeeper used in the constructor.
355 'perms' => \Zookeeper::PERM_ALL,
// $params is defined on lines not visible here; presumably it is the ACL list
// containing the entry above -- TODO confirm.
360 return $this->zookeeper->create($path, $value, $params);