]> scripts.mit.edu Git - autoinstalls/mediawiki.git/blob - vendor/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php
MediaWiki 1.30.2-scripts2
[autoinstalls/mediawiki.git] / vendor / nmred / kafka-php / src / Kafka / MetaDataFromKafka.php
1 <?php
2
3 /* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
4 // +---------------------------------------------------------------------------
5 // | SWAN [ $_SWANBR_SLOGAN_$ ]
6 // +---------------------------------------------------------------------------
7 // | Copyright $_SWANBR_COPYRIGHT_$
8 // +---------------------------------------------------------------------------
9 // | Version  $_SWANBR_VERSION_$
10 // +---------------------------------------------------------------------------
11 // | Licensed ( $_SWANBR_LICENSED_URL_$ )
12 // +---------------------------------------------------------------------------
13 // | $_SWANBR_WEB_DOMAIN_$
14 // +---------------------------------------------------------------------------
15
16 namespace Kafka;
17
18 /**
19 +------------------------------------------------------------------------------
20 * Cluster metadata provided by kafka
21 +------------------------------------------------------------------------------
22 *
23 * @package
24 * @version $_SWANBR_VERSION_$
25 * @copyright Copyleft
26 * @author ebernhardson@wikimedia.org
27 +------------------------------------------------------------------------------
28 */
29
class MetaDataFromKafka implements ClusterMetaData
{
    // {{{ consts
    // }}}
    // {{{ members

    /**
     * Client used to obtain and release broker streams
     *
     * @var \Kafka\Client
     * @access private
     */
    private $client;

    /**
     * List of kafka brokers ("host:port" strings) to get metadata from
     *
     * @var array
     * @access private
     */
    private $hostList;

    /**
     * List of all kafka brokers, keyed by broker ID
     *
     * @var array
     * @access private
     */
    private $brokers = array();

    /**
     * List of all loaded topic metadata, keyed by topic name
     *
     * @var array
     * @access private
     */
    private $topics = array();

    // }}}
    // {{{ functions
    // {{{ public function __construct()

    /**
     * Store the list of brokers that metadata will be requested from.
     *
     * A string is split on commas; surrounding whitespace and empty entries
     * (e.g. from a trailing comma) are discarded. Any other value is cast to
     * an array and used as given.
     *
     * @param string|array $hostList List of kafka brokers to get metadata from
     * @access public
     */
    public function __construct($hostList)
    {
        if (is_string($hostList)) { // support host list 127.0.0.1:9092,192.168.2.11:9092 form
            $hosts = array_map('trim', explode(',', $hostList));
            // drop blank entries so that "a:9092, b:9092," does not yield unusable hosts
            $this->hostList = array_values(array_filter($hosts, 'strlen'));
        } else {
            $this->hostList = (array)$hostList;
        }
        // randomize the order of servers we collect metadata from
        shuffle($this->hostList);
    }

    // }}}
    // {{{ public function setClient()

    /**
     * Provide the client used to talk to the brokers. Must be called before
     * any metadata can be loaded.
     *
     * @param \Kafka\Client $client
     * @access public
     * @return void
     */
    public function setClient(\Kafka\Client $client)
    {
        $this->client = $client;
    }

    // }}}
    // {{{ public function listBrokers()

    /**
     * get broker list from kafka metadata
     *
     * The list is fetched on first use and then served from memory. Brokers
     * learned as a side effect of an earlier topic lookup are reused as well.
     *
     * @access public
     * @return array
     * @throws \Kafka\Exception when no client was set or no broker could be reached
     */
    public function listBrokers()
    {
        // $brokers is initialised to an empty array and is never null, so
        // emptiness (not null-ness) tells us that nothing has been loaded yet.
        if (empty($this->brokers)) {
            $this->loadBrokers();
        }
        return $this->brokers;
    }

    // }}}
    // {{{ public function getPartitionState()

    /**
     * Get the metadata stored for one partition of a topic, loading the
     * topic metadata first when it is not cached yet.
     *
     * @param string $topicName
     * @param int $partitionId
     * @return mixed|null partition entry from the metadata response, or null
     *                    when the topic or partition is unknown
     * @throws \Kafka\Exception when no client was set or no broker could be reached
     */
    public function getPartitionState($topicName, $partitionId = 0)
    {
        if (!isset($this->topics[$topicName])) {
            $this->loadTopicDetail(array($topicName));
        }
        if (isset($this->topics[$topicName]['partitions'][$partitionId])) {
            return $this->topics[$topicName]['partitions'][$partitionId];
        } else {
            return null;
        }
    }

    // }}}
    // {{{ public function getTopicDetail()

    /**
     * Get the metadata stored for a topic, loading it first when it is not
     * cached yet.
     *
     * @param string $topicName
     * @access public
     * @return array topic metadata, or an empty array when the topic is unknown
     * @throws \Kafka\Exception when no client was set or no broker could be reached
     */
    public function getTopicDetail($topicName)
    {
        if (!isset($this->topics[$topicName])) {
            $this->loadTopicDetail(array($topicName));
        }
        if (isset($this->topics[$topicName])) {
            return $this->topics[$topicName];
        } else {
            return array();
        }
    }

    // }}}
    // {{{ private function loadBrokers()

    /**
     * Reset the broker list and repopulate it from a metadata request.
     *
     * @return void
     * @throws \Kafka\Exception when no client was set or no broker could be reached
     */
    private function loadBrokers()
    {
        $this->brokers = array();
        // not sure how to ask for only the brokers without a topic...
        // just ask for a topic we don't care about
        // NOTE(review): on clusters with topic auto-creation enabled this
        // request may create a topic named 'test' - confirm this is acceptable.
        $this->loadTopicDetail(array('test'));
    }

    // }}}
    // {{{ private function loadTopicDetail()

    /**
     * Request metadata for the given topics and merge the answer into the
     * cached broker and topic lists.
     *
     * Hosts are tried in (shuffled) order; the first one that answers wins.
     *
     * @param array $topics topic names to request metadata for
     * @return void
     * @throws \Kafka\Exception when no client was set or no broker could be reached
     */
    private function loadTopicDetail(array $topics)
    {
        if ($this->client === null) {
            throw new \Kafka\Exception('client was not provided');
        }
        $response = null;
        foreach ($this->hostList as $host) {
            try {
                // discard anything left over from a previous, failed attempt
                $response = null;
                $stream = $this->client->getStream($host);
                $conn = $stream['stream'];
                $encoder = new \Kafka\Protocol\Encoder($conn);
                $encoder->metadataRequest($topics);
                $decoder = new \Kafka\Protocol\Decoder($conn);
                $response = $decoder->metadataResponse();
                // NOTE(review): the stream is only released on success; if the
                // request or decode throws it is never handed back - confirm
                // whether the client expects freeStream() on failure as well.
                $this->client->freeStream($stream['key']);
                break;
            } catch (\Kafka\Exception $e) {
                // keep trying
            }
        }
        if ($response) {
            // Merge arrays using "+" operator to preserve key (which are broker IDs)
            // instead of array_merge (which reindex numeric keys)
            $this->brokers = $response['brokers'] + $this->brokers;
            // Same concern for topics: array_merge would renumber integer-like
            // topic names (e.g. "123"), making them impossible to look up by
            // name. Cached entries stay on the left so they keep precedence
            // on a key clash, exactly as they did before.
            $this->topics = $this->topics + $response['topics'];
        } else {
            throw new \Kafka\Exception('Could not connect to any kafka brokers');
        }
    }

    // }}}
    // }}}
}