3 /* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
4 // +---------------------------------------------------------------------------
5 // | SWAN [ $_SWANBR_SLOGAN_$ ]
6 // +---------------------------------------------------------------------------
7 // | Copyright $_SWANBR_COPYRIGHT_$
8 // +---------------------------------------------------------------------------
9 // | Version $_SWANBR_VERSION_$
10 // +---------------------------------------------------------------------------
11 // | Licensed ( $_SWANBR_LICENSED_URL_$ )
12 // +---------------------------------------------------------------------------
13 // | $_SWANBR_WEB_DOMAIN_$
14 // +---------------------------------------------------------------------------
/**
+------------------------------------------------------------------------------
* Cluster metadata provided by kafka
+------------------------------------------------------------------------------
*
* Sends metadata requests to the configured brokers through the injected
* \Kafka\Client and caches the returned broker list and topic details on
* this object.
*
* @version $_SWANBR_VERSION_$
* @author ebernhardson@wikimedia.org
+------------------------------------------------------------------------------
*/
class MetaDataFromKafka implements ClusterMetaData
{
    // {{{ members

    /**
     * client used to open streams to the brokers, injected via setClient()
     *
     * @var \Kafka\Client|null
     * @access private
     */
    private $client = null;

    /**
     * list of kafka brokers to get metadata from
     *
     * @var array
     * @access private
     */
    private $hostList = array();

    /**
     * List of all kafka brokers
     *
     * Keyed by broker id (see the merge in loadTopicDetail()).
     *
     * @var array
     * @access private
     */
    private $brokers = array();

    /**
     * List of all loaded topic metadata
     *
     * Keyed by topic name.
     *
     * @var array
     * @access private
     */
    private $topics = array();

    // }}}
    // {{{ functions
    // {{{ public function __construct()

    /**
     * Store the list of brokers that metadata will be requested from.
     *
     * @var string|array $hostList List of kafka brokers to get metadata from,
     *                             either an array of "host:port" entries or a
     *                             comma separated string of them
     * @access public
     * @return void
     */
    public function __construct($hostList)
    {
        if (is_string($hostList)) { // support host list 127.0.0.1:9092,192.168.2.11:9092 form
            $hostList = explode(',', $hostList);
        }

        // Tolerate "host:port, host:port" and stray commas: surrounding
        // whitespace is removed and empty entries are dropped.
        $hosts = array();
        foreach ((array)$hostList as $host) {
            if (is_string($host)) {
                $host = trim($host);
            }
            if ($host !== '') {
                $hosts[] = $host;
            }
        }

        // randomize the order of servers we collect metadata from
        shuffle($hosts);
        $this->hostList = $hosts;
    }

    // }}}
    // {{{ public function setClient()

    /**
     * Inject the client used to talk to the brokers.
     *
     * @var \Kafka\Client $client
     * @access public
     * @return void
     */
    public function setClient(\Kafka\Client $client)
    {
        $this->client = $client;
    }

    // }}}
    // {{{ public function listBrokers()

    /**
     * get broker list from kafka metadata
     *
     * The list is loaded on first use. The previous check compared the
     * property against null, but the property is always an array, so the
     * brokers were never loaded; an empty list now triggers the load.
     *
     * @access public
     * @return array
     * @throws \Kafka\Exception when no client was provided or no broker
     *                          could be reached
     */
    public function listBrokers()
    {
        if (empty($this->brokers)) {
            $this->loadBrokers();
        }

        return $this->brokers;
    }

    // }}}
    // {{{ public function getPartitionState()

    /**
     * Return the cached metadata of one partition, loading the topic first
     * when it is not cached yet.
     *
     * @param string $topicName
     * @param int $partitionId
     * @access public
     * @return mixed the partition entry, or null when it is unknown
     */
    public function getPartitionState($topicName, $partitionId = 0)
    {
        if (!isset($this->topics[$topicName])) {
            $this->loadTopicDetail(array($topicName));
        }

        if (isset($this->topics[$topicName]['partitions'][$partitionId])) {
            return $this->topics[$topicName]['partitions'][$partitionId];
        }

        // NOTE(review): fallback value reconstructed as null -- confirm against callers
        return null;
    }

    // }}}
    // {{{ public function getTopicDetail()

    /**
     * Return the cached metadata of a topic, loading it first when it is
     * not cached yet.
     *
     * @param string $topicName
     * @access public
     * @return array the topic entry, or an empty array when it is unknown
     */
    public function getTopicDetail($topicName)
    {
        if (!isset($this->topics[$topicName])) {
            $this->loadTopicDetail(array($topicName));
        }

        if (isset($this->topics[$topicName])) {
            return $this->topics[$topicName];
        }

        // NOTE(review): fallback value reconstructed as an empty array -- confirm against callers
        return array();
    }

    // }}}
    // {{{ private function loadBrokers()

    /**
     * Reset the broker list and reload it from a metadata response.
     *
     * @access private
     * @return void
     */
    private function loadBrokers()
    {
        $this->brokers = array();
        // not sure how to ask for only the brokers without a topic...
        // just ask for a topic we don't care about
        $this->loadTopicDetail(array('test'));
    }

    // }}}
    // {{{ private function loadTopicDetail()

    /**
     * Request metadata for the given topics and merge it into the caches.
     *
     * The hosts are tried in their (shuffled) order; the first one that
     * answers without raising \Kafka\Exception provides the response.
     *
     * @param array $topics
     * @access private
     * @return void
     * @throws \Kafka\Exception when no client was provided or no host
     *                          produced a response
     */
    private function loadTopicDetail(array $topics)
    {
        if ($this->client === null) {
            throw new \Kafka\Exception('client was not provided');
        }

        $response = null;
        foreach ($this->hostList as $host) {
            try {
                $response = null;
                $stream = $this->client->getStream($host);
                $conn = $stream['stream'];
                $encoder = new \Kafka\Protocol\Encoder($conn);
                $encoder->metadataRequest($topics);
                $decoder = new \Kafka\Protocol\Decoder($conn);
                $response = $decoder->metadataResponse();
                $this->client->freeStream($stream['key']);
                break;
            } catch (\Kafka\Exception $e) {
                // keep trying the remaining hosts
            }
        }

        if (empty($response)) {
            throw new \Kafka\Exception('Could not connect to any kafka brokers');
        }

        // Merge arrays using "+" operator to preserve key (which are broker IDs)
        // instead of array_merge (which reindex numeric keys)
        $this->brokers = $response['brokers'] + $this->brokers;
        // Same concern for topics: PHP turns a numeric topic name such as
        // "2015" into an integer key, which array_merge would renumber.
        // As before, an already cached topic entry takes precedence.
        $this->topics = $this->topics + $response['topics'];
    }

    // }}}
    // }}}
}