2 /* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
3 // +---------------------------------------------------------------------------
4 // | SWAN [ $_SWANBR_SLOGAN_$ ]
5 // +---------------------------------------------------------------------------
6 // | Copyright $_SWANBR_COPYRIGHT_$
7 // +---------------------------------------------------------------------------
8 // | Version $_SWANBR_VERSION_$
9 // +---------------------------------------------------------------------------
10 // | Licensed ( $_SWANBR_LICENSED_URL_$ )
11 // +---------------------------------------------------------------------------
12 // | $_SWANBR_WEB_DOMAIN_$
13 // +---------------------------------------------------------------------------
18 +------------------------------------------------------------------------------
19 * Kafka protocol since Kafka v0.8
20 +------------------------------------------------------------------------------
23 * @version $_SWANBR_VERSION_$
25 * @author $_SWANBR_AUTHOR_$
26 +------------------------------------------------------------------------------
// \Kafka\Client built on top of the ZooKeeper handle; assigned in the constructor.
41 private $client = null;
44 * send message options cache
// Fetch targets queued by setPartition(), indexed as [topic name][partition id] => offset.
49 private $payload = array();
// When true, setPartition() asks a \Kafka\Offset object for the offset whenever none is passed in.
65 private $fromOffset = true;
// Shared instance created and handed out by getInstance().
73 private static $instance = null;
// Sent as 'max_bytes' for every partition in a fetch request; the default is 1048576 (1024 * 1024).
80 private $maxSize = 1048576;
// Strategy value passed to \Kafka\Offset::getOffset() by setPartition().
86 private $offsetStrategy = \Kafka\Offset::DEFAULT_EARLY;
90 // {{{ public static function getInstance()
97 * @param null $timeout
100 public static function getInstance($hostList, $timeout = null)
// Return the shared consumer, creating it on the first call only.
// Once an instance exists, $hostList and $timeout of later calls are not used.
102 if (is_null(self::$instance)) {
103 self::$instance = new self($hostList, $timeout);
// Same object for every caller.
106 return self::$instance;
110 // {{{ private function __construct()
117 * @param null $timeout
119 private function __construct($hostList, $timeout = null)
// Private constructor: the only visible caller is getInstance().
// $hostList and $timeout are forwarded unchanged to \Kafka\ZooKeeper.
121 $zookeeper = new \Kafka\ZooKeeper($hostList, $timeout);
// The client is built from the ZooKeeper handle and kept for all later calls.
122 $this->client = new \Kafka\Client($zookeeper);
126 // {{{ public function clearPayload()
134 public function clearPayload()
// Forget every topic/partition/offset entry previously queued via setPartition().
136 $this->payload = array();
140 // {{{ public function setTopic()
147 * @param null $defaultOffset
150 public function setTopic($topicName, $defaultOffset = null)
// Queue every partition of $topicName for fetching.
// Ask the client for the topic's details; the partition list is read from its 'partitions' key.
152 $parts = $this->client->getTopicDetail($topicName);
// Guard for a missing or empty partition list. How this case is handled is not visible in this excerpt.
153 if (!isset($parts['partitions']) || empty($parts['partitions'])) {
// Only the partition ids (array keys) are used; $defaultOffset is passed through as-is,
// so a null value lets setPartition() determine the offset itself.
158 foreach ($parts['partitions'] as $partId => $info) {
159 $this->setPartition($topicName, $partId, $defaultOffset);
166 // {{{ public function setPartition()
169 * set topic partition
173 * @param int $partitionId
174 * @param null $offset
177 public function setPartition($topicName, $partitionId = 0, $offset = null)
// Queue one topic partition for the next fetch.
// Only a null $offset triggers the lookup below.
179 if (is_null($offset)) {
// With fromOffset enabled, ask \Kafka\Offset for this group/topic/partition's offset
// using the configured offset strategy. The branch taken when fromOffset is false
// is not visible in this excerpt.
180 if ($this->fromOffset) {
181 $offsetObject = new \Kafka\Offset($this->client, $this->group, $topicName, $partitionId);
182 $offset = $offsetObject->getOffset($this->offsetStrategy);
// Debug trace of the offset that was looked up.
183 \Kafka\Log::log('topic name:' . $topicName . ', part:' . $partitionId . ' get offset from kafka server, offset:' . $offset, LOG_DEBUG);
// Record the offset under [topic][partition]; _formatPayload() reads this map when building requests.
188 $this->payload[$topicName][$partitionId] = $offset;
194 // {{{ public function setFromOffset()
197 * set whether the starting offset is looked up via \Kafka\Offset when none is given
199 * @param boolean $fromOffset
203 public function setFromOffset($fromOffset)
// Coerce to boolean so the check in setPartition() always sees a real true/false.
205 $this->fromOffset = (boolean) $fromOffset;
209 // {{{ public function setMaxBytes()
212 * set fetch message max bytes
214 * @param int $maxSize
218 public function setMaxBytes($maxSize)
// Stored value is later sent as 'max_bytes' for every partition by _formatPayload().
// NOTE(review): unlike setFromOffset() and setGroup(), the argument is stored without a cast.
220 $this->maxSize = $maxSize;
224 // {{{ public function setGroup()
229 * @param string $group
233 public function setGroup($group)
// Group name used by setPartition() for offset lookups and by fetch() for the commit-offset helper.
// NOTE(review): no declaration of the $group property is visible in this excerpt — confirm it exists.
235 $this->group = (string) $group;
240 // {{{ public function fetch()
243 * fetch messages from the brokers
246 * @return \Kafka\Protocol\Fetch\Topic|bool
248 public function fetch()
// Regroup the queued topic/partition offsets into one request payload per broker host.
250 $data = $this->_formatPayload();
// NOTE(review): no initialisation of $streams is visible in this excerpt before the loop —
// confirm it is defined when $data is empty.
// For each host: obtain its stream from the client, issue the fetch request through an
// Encoder bound to that stream, and remember the stream under the connection key.
256 foreach ($data as $host => $requestData) {
257 $connArr = $this->client->getStream($host);
258 $conn = $connArr['stream'];
259 $encoder = new \Kafka\Protocol\Encoder($conn);
260 $encoder->fetchRequest($requestData);
261 $streams[$connArr['key']] = $conn;
// The response object receives every collected stream plus the per-host request data.
264 $fetch = new \Kafka\Protocol\Fetch\Topic($streams, $data);
266 // register fetch helper
267 $freeStream = new \Kafka\Protocol\Fetch\Helper\FreeStream($this->client);
268 $freeStream->setStreams($streams);
269 \Kafka\Protocol\Fetch\Helper\Helper::registerHelper('freeStream', $freeStream);
271 // register partition commit offset
272 $commitOffset = new \Kafka\Protocol\Fetch\Helper\CommitOffset($this->client);
273 $commitOffset->setGroup($this->group);
274 \Kafka\Protocol\Fetch\Helper\Helper::registerHelper('commitOffset', $commitOffset);
// Register a helper that wraps this consumer itself, under the name 'updateConsumer'.
276 $updateConsumer = new \Kafka\Protocol\Fetch\Helper\Consumer($this);
277 \Kafka\Protocol\Fetch\Helper\Helper::registerHelper('updateConsumer', $updateConsumer);
283 // {{{ public function getClient()
291 public function getClient()
// Expose the \Kafka\Client created in the constructor.
293 return $this->client;
297 * pass-through method to the client for setting stream options
299 * @param array $options
301 public function setStreamOptions($options = array())
// Forward $options unchanged to the client; nothing is stored on the consumer itself.
303 $this->client->setStreamOptions($options);
307 // {{{ private function _formatPayload()
310 * format payload array
315 private function _formatPayload()
// Turn the queued [topic][partition] => offset map into per-host request data.
// Guard for an empty queue. The body of this guard is not visible in this excerpt.
317 if (empty($this->payload)) {
// Pass 1: regroup the queue by the host the client reports for each topic partition.
322 foreach ($this->payload as $topicName => $partitions) {
323 foreach ($partitions as $partitionId => $offset) {
324 $host = $this->client->getHostByPartition($topicName, $partitionId);
325 $data[$host][$topicName][$partitionId] = $offset;
// Pass 2: convert each host's map into nested lists of topics and partitions.
329 $requestData = array();
330 foreach ($data as $host => $info) {
331 $topicData = array();
332 foreach ($info as $topicName => $partitions) {
333 $partitionData = array();
334 foreach ($partitions as $partitionId => $offset) {
// One entry per partition; some keys of this array are not visible in this excerpt.
335 $partitionData[] = array(
336 'partition_id' => $partitionId,
// The same byte limit (see setMaxBytes()) is applied to every partition.
338 'max_bytes' => $this->maxSize,
341 $topicData[] = array(
342 'topic_name' => $topicName,
343 'partitions' => $partitionData,
// The result is keyed by host, which is how fetch() iterates it.
347 $requestData[$host] = array(
348 'data' => $topicData,
356 * const LAST_OFFSET = -1;
357 * const EARLIEST_OFFSET = -2;
358 * const DEFAULT_LAST = -2;
359 * const DEFAULT_EARLY = -1;
360 * @param int $offsetStrategy
362 public function setOffsetStrategy($offsetStrategy)
// Stored without validation; setPartition() passes it to \Kafka\Offset::getOffset()
// when it has to look up an offset.
364 $this->offsetStrategy = $offsetStrategy;