X-Git-Url: https://scripts.mit.edu/gitweb/autoinstallsdev/mediawiki.git/blobdiff_plain/19e297c21b10b1b8a3acad5e73fc71dcb35db44a..6932310fd58ebef145fa01eb76edf7150284d8ea:/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php new file mode 100644 index 00000000..64f60f26 --- /dev/null +++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php @@ -0,0 +1,344 @@ +streams = $streams; + $topicInfos = array(); + foreach ($context as $values) { + if (!isset($values['data'])) { + continue; + } + + foreach ($values['data'] as $value) { + if (!isset($value['topic_name']) || !isset($value['partitions'])) { + continue; + } + + $topicName = $value['topic_name']; + foreach ($value['partitions'] as $part) { + $topicInfos[$topicName][$part['partition_id']] = array( + 'offset' => $part['offset'], + ); + } + } + } + $this->context = $topicInfos; + $this->topicCount = $this->getTopicCount(); + } + + // }}} + // {{{ public function current() + + /** + * Iterator function: return the value stored for the current position. + * + * loadNextTopic() assigns a Partition built from this object and its + * context each time it successfully reads a topic. + * + * @access public + * @return mixed + */ + public function current() + { + return $this->current; + } + + // }}} + // {{{ public function key() + + /** + * Iterator function: return the key stored for the current position. + * + * loadNextTopic() sets it to the topic name it read from the stream. + * + * @access public + * @return string + */ + public function key() + { + return $this->key; + } + + // }}} + // {{{ public function rewind() + + /** + * Iterator function: load the next topic and record whether one was available. + * + * NOTE(review): this makes exactly the same call as next(); nothing here + * repositions a stream, so a second rewind() does not restart iteration. + * + * @access public + * @return void + */ + public function rewind() + { + $this->valid = $this->loadNextTopic(); + } + + // }}} + // {{{ public function valid() + + /** + * Iterator function: report whether the last load produced a topic. + * + * Returns the result of the most recent loadNextTopic() call made by + * rewind() or next(). + * + * @access public + * @return bool + */ + public function valid() + { + return $this->valid; + } + + // }}} + // {{{ public function next() + + /** + * Iterator function: load the next topic and record whether one was available. + * + * @access public + * @return void + */ + public function next() + { + $this->valid = $this->loadNextTopic(); + } + + // }}} + // {{{ public function 
/**
 * Countable function: number of topics announced across all streams.
 *
 * Returns the total cached in $this->topicCount when the object is
 * initialised; calling this never reads from the streams.
 *
 * @access public
 * @return integer
 */
public function count()
{
    return $this->topicCount;
}

// }}}
// {{{ protected function getTopicCount()

/**
 * Read the topic count from the head of every stream.
 *
 * For each stream this skips 8 bytes and then decodes a 4-byte topic
 * count. The per-stream counts are stored in $this->topicCounts, indexed
 * by the stream's position, and their sum is returned. Because the call
 * consumes stream data it is only meant to be used during object
 * initialisation.
 *
 * @access protected
 * @return integer total number of topics in all streams
 * @throws \Kafka\Exception\OutOfRange when the running total is not positive
 */
protected function getTopicCount()
{
    $total = 0;
    foreach (array_values($this->streams) as $index => $stream) {
        // Skip the 8 bytes that precede the topic count.
        // NOTE(review): presumably response size + correlation id - confirm
        // against the protocol description.
        $stream->read(8, true);

        $unpacked = Decoder::unpack(Decoder::BIT_B32, $stream->read(4, true));
        $topicCount = array_shift($unpacked);

        $this->topicCounts[$index] = $topicCount;
        $total += $topicCount;
        if ($total <= 0) {
            throw new \Kafka\Exception\OutOfRange($total . ' is not a valid topic count');
        }
    }

    return $total;
}

// }}}
// {{{ public function loadNextTopic()

/**
 * Advance to the next topic and prepare its partition iterator.
 *
 * On success $this->key holds the topic name read from the current stream
 * and $this->current holds a new Partition for it. When the current stream
 * has delivered all of its topics, Helper::onStreamEof() is called for it
 * and reading continues with the next stream.
 *
 * @access public
 * @return bool true when a topic was loaded; false when all topics have
 *              been consumed, no further stream exists, the topic name
 *              length is not positive, or reading raised a \Kafka\Exception
 */
public function loadNextTopic()
{
    // Every announced topic has already been handed out.
    if ($this->validCount >= $this->topicCount) {
        \Kafka\Protocol\Fetch\Helper\Helper::onStreamEof($this->currentStreamLockKey);
        return false;
    }

    // The current stream has delivered all of its topics: move to the next.
    if ($this->currentStreamCount >= $this->topicCounts[$this->currentStreamKey]) {
        \Kafka\Protocol\Fetch\Helper\Helper::onStreamEof($this->currentStreamLockKey);
        $this->currentStreamKey++;
        // topicCounts holds per-stream totals, so the per-stream counter must
        // restart here; otherwise the carried-over count makes the next
        // stream look exhausted before all of its topics were read.
        $this->currentStreamCount = 0;
    }

    $streams = array_values($this->streams);
    if (!isset($streams[$this->currentStreamKey])) {
        return false;
    }

    $lockKeys = array_keys($this->streams);
    $stream = $streams[$this->currentStreamKey];
    $this->currentStreamLockKey = $lockKeys[$this->currentStreamKey];

    try {
        // 2-byte length prefix of the topic name.
        $topicLen = Decoder::unpack(Decoder::BIT_B16, $stream->read(2, true));
        $topicLen = array_shift($topicLen);
        if ($topicLen <= 0) {
            return false;
        }

        // topic name
        $this->key = $stream->read($topicLen, true);
        $this->current = new Partition($this, $this->context);
    } catch (\Kafka\Exception $e) {
        // A failed read is reported to the iterator as "no more topics".
        return false;
    }

    $this->validCount++;
    $this->currentStreamCount++;

    return true;
}

// }}}
// {{{ public function getStream()

/**
 * Get the stream the iterator is currently positioned on.
 *
 * Streams are addressed by position, so the keys of $this->streams are
 * ignored here.
 *
 * @access public
 * @return \Kafka\Socket|null null when the position has moved past the
 *                            last stream
 */
public function getStream()
{
    $streams = array_values($this->streams);
    if (!isset($streams[$this->currentStreamKey])) {
        // Avoid an undefined-offset notice once every stream is exhausted.
        return null;
    }

    return $streams[$this->currentStreamKey];
}

// }}}
// }}}
}