]> scripts.mit.edu Git - autoinstallsdev/mediawiki.git/blobdiff - vendor/nmred/kafka-php/src/Kafka/Consumer.php
MediaWiki 1.30.2
[autoinstallsdev/mediawiki.git] / vendor / nmred / kafka-php / src / Kafka / Consumer.php
diff --git a/vendor/nmred/kafka-php/src/Kafka/Consumer.php b/vendor/nmred/kafka-php/src/Kafka/Consumer.php
new file mode 100644 (file)
index 0000000..1b9d42c
--- /dev/null
@@ -0,0 +1,369 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version  $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka;
+
+/**
++------------------------------------------------------------------------------
+* Kafka consumer: fetches messages from brokers (Kafka protocol since v0.8)
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Consumer
+{
+    // {{{ consts
+    // }}}
+    // {{{ members
+
+    /**
+     * Kafka client used for topic metadata lookups and broker streams
+     *
+     * @var Client
+     * @access private
+     */
+    private $client = null;
+
+    /**
+     * pending fetch positions, stored as [topic name][partition id] => offset
+     *
+     * @var array
+     * @access private
+     */
+    private $payload = array();
+
+    /**
+     * consumer group name, handed to the offset lookup and to the
+     * commit-offset fetch helper
+     *
+     * @var string
+     * @access private
+     */
+    private $group = '';
+
+    /**
+     * whether a partition registered without an explicit offset asks
+     * \Kafka\Offset for its start position (true) or starts at 0 (false)
+     *
+     * @var bool
+     * @access private
+     */
+    private $fromOffset = true;
+
+    /**
+     * shared consumer instance handed out by getInstance()
+     *
+     * @var \Kafka\Consumer
+     * @access private
+     */
+    private static $instance = null;
+
+    /**
+     * value sent as 'max_bytes' for every partition of a fetch request
+     *
+     * @var integer
+     */
+    private $maxSize = 1048576;
+
+    /**
+     * strategy passed to \Kafka\Offset::getOffset() when an offset is looked up
+     *
+     * @var integer
+     */
+    private $offsetStrategy = \Kafka\Offset::DEFAULT_EARLY;
+
+    // }}}
+    // {{{ functions
+    // {{{ public function static getInstance()
+
+    /**
+     * get the shared consumer, creating it on the first call
+     *
+     * The instance is built only once: on every later call $hostList and
+     * $timeout are ignored and the already created consumer is returned.
+     *
+     * @access public
+     * @param mixed $hostList forwarded to \Kafka\ZooKeeper
+     * @param mixed $timeout  forwarded to \Kafka\ZooKeeper
+     * @return Consumer
+     */
+    public static function getInstance($hostList, $timeout = null)
+    {
+        if (self::$instance === null) {
+            self::$instance = new self($hostList, $timeout);
+        }
+
+        return self::$instance;
+    }
+
+    // }}}
+    // {{{ private function __construct()
+
+    /**
+     * __construct
+     *
+     * Private: instances are obtained through getInstance(). Builds a
+     * \Kafka\ZooKeeper from the arguments and wraps it in a \Kafka\Client.
+     *
+     * @access private
+     * @param mixed $hostList forwarded to \Kafka\ZooKeeper
+     * @param mixed $timeout  forwarded to \Kafka\ZooKeeper
+     */
+    private function __construct($hostList, $timeout = null)
+    {
+        $zookeeper = new \Kafka\ZooKeeper($hostList, $timeout);
+        $this->client = new \Kafka\Client($zookeeper);
+    }
+
+    // }}}
+    // {{{ public function clearPayload()
+
+    /**
+     * forget every topic/partition offset registered so far
+     *
+     * @access public
+     * @return void
+     */
+    public function clearPayload()
+    {
+        $this->payload = array();
+    }
+
+    // }}}
+    // {{{ public function setTopic()
+
+    /**
+     * register every partition of a topic for the next fetch
+     *
+     * The partition list comes from the client's topic detail. When that
+     * detail has no partitions nothing is registered and the consumer is
+     * returned unchanged.
+     *
+     * @access public
+     * @param string $topicName
+     * @param mixed $defaultOffset offset used for all partitions, null to
+     *                             let setPartition() choose it
+     * @return Consumer
+     */
+    public function setTopic($topicName, $defaultOffset = null)
+    {
+        $detail = $this->client->getTopicDetail($topicName);
+        // empty() also covers a missing 'partitions' key, no isset() needed
+        if (empty($detail['partitions'])) {
+            // set topic fail.
+            return $this;
+        }
+
+        foreach ($detail['partitions'] as $partId => $info) {
+            $this->setPartition($topicName, $partId, $defaultOffset);
+        }
+
+        return $this;
+    }
+
+    // }}}
+    // {{{ public function setPartition()
+
+    /**
+     * register one topic partition and its start offset for the next fetch
+     *
+     * With a null $offset the start position is taken from \Kafka\Offset
+     * (using the configured group and offset strategy) when offset lookup
+     * is enabled, otherwise it is 0. An explicit offset is stored as given.
+     *
+     * @access public
+     * @param string $topicName
+     * @param int $partitionId
+     * @param mixed $offset explicit start offset or null
+     * @return Consumer
+     */
+    public function setPartition($topicName, $partitionId = 0, $offset = null)
+    {
+        if ($offset === null) {
+            if ($this->fromOffset) {
+                $offsetObject = new \Kafka\Offset($this->client, $this->group, $topicName, $partitionId);
+                $offset = $offsetObject->getOffset($this->offsetStrategy);
+                \Kafka\Log::log(
+                    'topic name:' . $topicName . ', part:' . $partitionId
+                    . ', get offset from kafka server, offset:' . $offset,
+                    LOG_DEBUG
+                );
+            } else {
+                $offset = 0;
+            }
+        }
+        $this->payload[$topicName][$partitionId] = $offset;
+
+        return $this;
+    }
+
+    // }}}
+    // {{{ public function setFromOffset()
+
+    /**
+     * enable or disable the offset lookup done by setPartition() for
+     * partitions registered without an explicit offset
+     *
+     * @param boolean $fromOffset
+     * @access public
+     * @return void
+     */
+    public function setFromOffset($fromOffset)
+    {
+        // (bool) is the canonical cast name; (boolean) is deprecated in newer PHP
+        $this->fromOffset = (bool) $fromOffset;
+    }
+
+    // }}}
+    // {{{ public function setMaxBytes()
+
+    /**
+     * set the 'max_bytes' value sent for each partition of a fetch request
+     *
+     * @param int $maxSize
+     * @access public
+     * @return void
+     */
+    public function setMaxBytes($maxSize)
+    {
+        $this->maxSize = $maxSize;
+    }
+
+    // }}}
+    // {{{ public function setGroup()
+
+    /**
+     * set consumer group
+     *
+     * @param string $group
+     * @access public
+     * @return Consumer
+     */
+    public function setGroup($group)
+    {
+        $this->group = (string) $group;
+        return $this;
+    }
+
+    // }}}
+    // {{{ public function fetch()
+
+    /**
+     * send the registered fetch requests and return the response reader
+     *
+     * One fetch request is written per host returned by _formatPayload().
+     * The 'freeStream', 'commitOffset' and 'updateConsumer' fetch helpers
+     * are (re)registered on every call.
+     *
+     * @access public
+     * @return \Kafka\Protocol\Fetch\Topic|bool false when no topic/partition
+     *                                          has been registered
+     */
+    public function fetch()
+    {
+        $data = $this->_formatPayload();
+        if (empty($data)) {
+            return false;
+        }
+
+        // write one request per host, remembering each stream under its key
+        $streams = array();
+        foreach ($data as $host => $requestData) {
+            $connArr = $this->client->getStream($host);
+            $conn    = $connArr['stream'];
+            $encoder = new \Kafka\Protocol\Encoder($conn);
+            $encoder->fetchRequest($requestData);
+            $streams[$connArr['key']] = $conn;
+        }
+
+        $fetch = new \Kafka\Protocol\Fetch\Topic($streams, $data);
+
+        // register fetch helper
+        $freeStream = new \Kafka\Protocol\Fetch\Helper\FreeStream($this->client);
+        $freeStream->setStreams($streams);
+        \Kafka\Protocol\Fetch\Helper\Helper::registerHelper('freeStream', $freeStream);
+
+        // register partition commit offset
+        $commitOffset = new \Kafka\Protocol\Fetch\Helper\CommitOffset($this->client);
+        $commitOffset->setGroup($this->group);
+        \Kafka\Protocol\Fetch\Helper\Helper::registerHelper('commitOffset', $commitOffset);
+
+        // register the helper that is given this consumer
+        $updateConsumer = new \Kafka\Protocol\Fetch\Helper\Consumer($this);
+        \Kafka\Protocol\Fetch\Helper\Helper::registerHelper('updateConsumer', $updateConsumer);
+
+        return $fetch;
+    }
+
+    // }}}
+    // {{{ public function getClient()
+
+    /**
+     * get client object
+     *
+     * @access public
+     * @return Client
+     */
+    public function getClient()
+    {
+        return $this->client;
+    }
+
+    // }}}
+    // {{{ public function setStreamOptions()
+
+    /**
+     * passthru method to client for setting stream options
+     *
+     * @access public
+     * @param array $options
+     * @return void
+     */
+    public function setStreamOptions($options = array())
+    {
+        $this->client->setStreamOptions($options);
+    }
+
+    // }}}
+    // {{{ private function _formatPayload()
+
+    /**
+     * turn the registered offsets into per-host fetch request data
+     *
+     * Result shape:
+     *   host => array('data' => list of
+     *       array('topic_name' => ..., 'partitions' => list of
+     *           array('partition_id' => ..., 'offset' => ..., 'max_bytes' => ...)))
+     *
+     * @access private
+     * @return array empty when nothing has been registered
+     */
+    private function _formatPayload()
+    {
+        if (empty($this->payload)) {
+            return array();
+        }
+
+        // group the registered offsets by the host the client reports for each partition
+        $data = array();
+        foreach ($this->payload as $topicName => $partitions) {
+            foreach ($partitions as $partitionId => $offset) {
+                $host = $this->client->getHostByPartition($topicName, $partitionId);
+                $data[$host][$topicName][$partitionId] = $offset;
+            }
+        }
+
+        // convert each host's topic/partition map into the request structure
+        $requestData = array();
+        foreach ($data as $host => $info) {
+            $topicData = array();
+            foreach ($info as $topicName => $partitions) {
+                $partitionData = array();
+                foreach ($partitions as $partitionId => $offset) {
+                    $partitionData[] = array(
+                        'partition_id' => $partitionId,
+                        'offset'       => $offset,
+                        'max_bytes'    => $this->maxSize,
+                    );
+                }
+                $topicData[] = array(
+                    'topic_name' => $topicName,
+                    'partitions' => $partitionData,
+                );
+            }
+
+            $requestData[$host] = array(
+                'data' => $topicData,
+            );
+        }
+
+        return $requestData;
+    }
+
+    // }}}
+    // {{{ public function setOffsetStrategy()
+
+    /**
+     * set the strategy handed to \Kafka\Offset::getOffset()
+     *
+     * Defaults to \Kafka\Offset::DEFAULT_EARLY. Values as listed by the
+     * original author (check against \Kafka\Offset before relying on them):
+     * const LAST_OFFSET = -1;
+     * const EARLIEST_OFFSET = -2;
+     * const DEFAULT_LAST  = -2;
+     * const DEFAULT_EARLY = -1;
+     *
+     * @access public
+     * @param int $offsetStrategy
+     * @return void
+     */
+    public function setOffsetStrategy($offsetStrategy)
+    {
+        $this->offsetStrategy = $offsetStrategy;
+    }
+
+    // }}}
+    // }}}
+}