2 /* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
3 // +---------------------------------------------------------------------------
4 // | SWAN [ $_SWANBR_SLOGAN_$ ]
5 // +---------------------------------------------------------------------------
6 // | Copyright $_SWANBR_COPYRIGHT_$
7 // +---------------------------------------------------------------------------
8 // | Version $_SWANBR_VERSION_$
9 // +---------------------------------------------------------------------------
10 // | Licensed ( $_SWANBR_LICENSED_URL_$ )
11 // +---------------------------------------------------------------------------
12 // | $_SWANBR_WEB_DOMAIN_$
13 // +---------------------------------------------------------------------------
18 +------------------------------------------------------------------------------
19 * Kafka protocol since Kafka v0.8
20 +------------------------------------------------------------------------------
23 * @version $_SWANBR_VERSION_$
25 * @author $_SWANBR_AUTHOR_$
26 +------------------------------------------------------------------------------
41 private $client = null;
44 * send message options cache
49 private $payload = array();
52 * default the server will not send any response
57 private $requiredAck = 0;
60 * default timeout is 100ms
65 private $timeout = 100;
73 private static $instance = null;
77 // {{{ public function static getInstance()
85 * @param null $kafkaHostList
88 public static function getInstance($hostList, $timeout, $kafkaHostList = null)
90 if (is_null(self::$instance)) {
91 self::$instance = new self($hostList, $timeout, $kafkaHostList);
94 return self::$instance;
98 // {{{ public function __construct()
105 * @param null $timeout
106 * @param null $kafkaHostList
108 public function __construct($hostList, $timeout = null, $kafkaHostList = null)
110 if ($hostList instanceof \Kafka\ClusterMetaData) {
111 $metadata = $hostList;
112 } elseif ( $kafkaHostList !== null ) {
113 $metadata = new \Kafka\MetaDataFromKafka($kafkaHostList);
115 $metadata = new \Kafka\ZooKeeper($hostList, $timeout);
117 $this->client = new \Kafka\Client($metadata);
121 // {{{ public function setMessages()
128 * @param int $partitionId
129 * @param array $messages
132 public function setMessages($topicName, $partitionId = 0, $messages = array())
134 if (isset($this->payload[$topicName][$partitionId])) {
135 $this->payload[$topicName][$partitionId] =
136 array_merge($this->payload[$topicName][$partitionId], $messages);
138 $this->payload[$topicName][$partitionId] = $messages;
145 // {{{ public function setRequireAck()
149 * This field indicates how many acknowledgements the servers should receive
150 * before responding to the request. If it is 0 the server will not send any
151 * response (this is the only case where the server will not reply to a
152 * request). If it is 1, the server will wait the data is written to the
153 * local log before sending a response. If it is -1 the server will block
154 * until the message is committed by all in sync replicas before sending a
155 * response. For any number > 1 the server will block waiting for this
156 * number of acknowledgements to occur (but the server will never wait for
157 * more acknowledgements than there are in-sync replicas).
163 public function setRequireAck($ack = 0)
166 $this->requiredAck = (int) $ack;
173 // {{{ public function setTimeOut()
176 * set request timeout
178 * @param int $timeout
182 public function setTimeOut($timeout = 100)
184 if ((int) $timeout) {
185 $this->timeout = (int) $timeout;
191 // {{{ public function send()
194 * send message to broker
199 public function send()
201 $data = $this->_formatPayload();
206 $responseData = array();
207 foreach ($data as $host => $requestData) {
208 $stream = $this->client->getStream($host);
209 $conn = $stream['stream'];
210 $encoder = new \Kafka\Protocol\Encoder($conn);
211 $encoder->produceRequest($requestData);
212 if ((int) $this->requiredAck !== 0) { // get broker response
213 $decoder = new \Kafka\Protocol\Decoder($conn);
214 $response = $decoder->produceResponse();
215 foreach ($response as $topicName => $info) {
216 if (!isset($responseData[$topicName])) {
217 $responseData[$topicName] = $info;
219 $responseData[$topicName] = array_merge($info, $responseData[$topicName]);
224 $this->client->freeStream($stream['key']);
227 $this->payload = array();
228 return $responseData;
232 // {{{ public function getClient()
240 public function getClient()
242 return $this->client;
246 * passthru method to client for setting stream options
249 * @param array $options
251 public function setStreamOptions($options = array())
253 $this->client->setStreamOptions($options);
257 // {{{ public function getAvailablePartitions()
260 * get available partition
266 public function getAvailablePartitions($topicName)
268 $topicDetail = $this->client->getTopicDetail($topicName);
269 if (is_array($topicDetail) && isset($topicDetail['partitions'])) {
270 $topicPartitiions = array_keys($topicDetail['partitions']);
272 $topicPartitiions = array();
275 return $topicPartitiions;
279 // {{{ private function _formatPayload()
282 * format payload array
287 private function _formatPayload()
289 if (empty($this->payload)) {
294 foreach ($this->payload as $topicName => $partitions) {
295 foreach ($partitions as $partitionId => $messages) {
296 $host = $this->client->getHostByPartition($topicName, $partitionId);
297 $data[$host][$topicName][$partitionId] = $messages;
301 $requestData = array();
302 foreach ($data as $host => $info) {
303 $topicData = array();
304 foreach ($info as $topicName => $partitions) {
305 $partitionData = array();
306 foreach ($partitions as $partitionId => $messages) {
307 $partitionData[] = array(
308 'partition_id' => $partitionId,
309 'messages' => $messages,
312 $topicData[] = array(
313 'topic_name' => $topicName,
314 'partitions' => $partitionData,
318 $requestData[$host] = array(
319 'required_ack' => $this->requiredAck,
320 'timeout' => $this->timeout,
321 'data' => $topicData,