client = $client; $this->groupId = $groupId; $this->topicName = $topicName; $this->partitionId = $partitionId; $host = $this->client->getHostByPartition($topicName, $partitionId); $stream = $this->client->getStream($host); $conn = $stream['stream']; $this->streamKey = $stream['key']; $this->encoder = new \Kafka\Protocol\Encoder($conn); $this->decoder = new \Kafka\Protocol\Decoder($conn); } // }}} // {{{ public function setOffset() /** * set consumer offset * * @param integer $offset * @access public * @return void */ public function setOffset($offset) { $maxOffset = $this->getProduceOffset(); if ($offset > $maxOffset) { throw new \Kafka\Exception('this offset is invalid. must less than max offset:' . $maxOffset); } $data = array( 'group_id' => $this->groupId, 'data' => array( array( 'topic_name' => $this->topicName, 'partitions' => array( array( 'partition_id' => $this->partitionId, 'offset' => $offset, ), ), ), ), ); $topicName = $this->topicName; $partitionId = $this->partitionId; $this->encoder->commitOffsetRequest($data); $result = $this->decoder->commitOffsetResponse(); $this->client->freeStream($this->streamKey); if (!isset($result[$topicName][$partitionId]['errCode'])) { throw new \Kafka\Exception('commit topic offset failed.'); } if ($result[$topicName][$partitionId]['errCode'] != 0) { throw new \Kafka\Exception(\Kafka\Protocol\Decoder::getError($result[$topicName][$partitionId]['errCode'])); } } // }}} // {{{ public function getOffset() /** * get consumer offset * * @param integer $defaultOffset * if defaultOffset -1 instead of early offset * if defaultOffset -2 instead of last offset * @access public * @return int */ public function getOffset($defaultOffset = self::DEFAULT_LAST) { $maxOffset = $this->getProduceOffset(self::LAST_OFFSET); $minOffset = $this->getProduceOffset(self::EARLIEST_OFFSET); $data = array( 'group_id' => $this->groupId, 'data' => array( array( 'topic_name' => $this->topicName, 'partitions' => array( array( 'partition_id' => $this->partitionId, ), ), ), ), ); $this->encoder->fetchOffsetRequest($data); $result = $this->decoder->fetchOffsetResponse(); $this->client->freeStream($this->streamKey); $topicName = $this->topicName; $partitionId = $this->partitionId; if (!isset($result[$topicName][$partitionId]['errCode'])) { throw new \Kafka\Exception('fetch topic offset failed.'); } if ($result[$topicName][$partitionId]['errCode'] == 3) { switch ($defaultOffset) { case self::DEFAULT_LAST: Log::log("topic name: $topicName, partitionId: $partitionId, get offset value is default last.", LOG_INFO); return $maxOffset; case self::DEFAULT_EARLY: Log::log("topic name: $topicName, partitionId: $partitionId, get offset value is default early.", LOG_INFO); return $minOffset; default: $this->setOffset($defaultOffset); Log::log("topic name: $topicName, partitionId: $partitionId, get offset value is default $defaultOffset.", LOG_INFO); return $defaultOffset; } } elseif ($result[$topicName][$partitionId]['errCode'] == 0) { $offset = $result[$topicName][$partitionId]['offset']; if ($offset > $maxOffset || $offset < $minOffset) { if ($defaultOffset == self::DEFAULT_EARLY) { $offset = $minOffset; } else { $offset = $maxOffset; } } Log::log("topic name: $topicName, partitionId: $partitionId, get offset value is $offset.", LOG_INFO); return $offset; } else { throw new \Kafka\Exception(\Kafka\Protocol\Decoder::getError($result[$topicName][$partitionId]['errCode'])); } } // }}} // {{{ public function getProduceOffset() /** * get produce server offset * * @param integer $timeLine * @access public * @return int */ public function getProduceOffset($timeLine = self::LAST_OFFSET) { $topicName = $this->topicName; $partitionId = $this->partitionId; $requestData = array( 'data' => array( array( 'topic_name' => $this->topicName, 'partitions' => array( array( 'partition_id' => $this->partitionId, 'time' => $timeLine, 'max_offset' => 1, ), ), ), ), ); $this->encoder->offsetRequest($requestData); $result = $this->decoder->offsetResponse(); $this->client->freeStream($this->streamKey); if (!isset($result[$topicName][$partitionId]['offset'])) { if (isset($result[$topicName][$partitionId]['errCode'])) { throw new \Kafka\Exception(\Kafka\Protocol\Decoder::getError($result[$topicName][$partitionId]['errCode'])); } else { throw new \Kafka\Exception('get offset failed. topic name:' . $this->topicName . ' partitionId: ' . $this->partitionId); } } return array_shift($result[$topicName][$partitionId]['offset']); } // }}} // }}} }