2 /* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
3 // +---------------------------------------------------------------------------
4 // | SWAN [ $_SWANBR_SLOGAN_$ ]
5 // +---------------------------------------------------------------------------
6 // | Copyright $_SWANBR_COPYRIGHT_$
7 // +---------------------------------------------------------------------------
8 // | Version $_SWANBR_VERSION_$
9 // +---------------------------------------------------------------------------
10 // | Licensed ( $_SWANBR_LICENSED_URL_$ )
11 // +---------------------------------------------------------------------------
12 // | $_SWANBR_WEB_DOMAIN_$
13 // +---------------------------------------------------------------------------
18 +------------------------------------------------------------------------------
19 * Kafka protocol since Kafka v0.8
20 +------------------------------------------------------------------------------
23 * @version $_SWANBR_VERSION_$
25 * @author $_SWANBR_AUTHOR_$
26 +------------------------------------------------------------------------------
33 const READ_MAX_LEN = 5242880; // read socket max length 5MB
36 * max write socket buffer
37 * fixed:send of 8192 bytes failed with errno=11 Resource temporarily
38 * unavailable error info
40 const MAX_WRITE_BUFFER = 4096;
46 * Send timeout in seconds.
51 private $sendTimeoutSec = 0;
54 * Send timeout in microseconds.
59 private $sendTimeoutUsec = 100000;
62 * Recv timeout in seconds
67 private $recvTimeoutSec = 0;
70 * Recv timeout in microseconds
75 private $recvTimeoutUsec = 750000;
83 private $stream = null;
103 // {{{ public function __construct()
111 * @param int $recvTimeoutSec
112 * @param int $recvTimeoutUsec
113 * @param int $sendTimeoutSec
114 * @param int $sendTimeoutUsec
116 public function __construct($host, $port, $recvTimeoutSec = 0, $recvTimeoutUsec = 750000, $sendTimeoutSec = 0, $sendTimeoutUsec = 100000)
120 $this->setRecvTimeoutSec($recvTimeoutSec);
121 $this->setRecvTimeoutUsec($recvTimeoutUsec);
122 $this->setSendTimeoutSec($sendTimeoutSec);
123 $this->setSendTimeoutUsec($sendTimeoutUsec);
127 * @param float $sendTimeoutSec
129 public function setSendTimeoutSec($sendTimeoutSec)
131 $this->sendTimeoutSec = $sendTimeoutSec;
135 * @param float $sendTimeoutUsec
137 public function setSendTimeoutUsec($sendTimeoutUsec)
139 $this->sendTimeoutUsec = $sendTimeoutUsec;
143 * @param float $recvTimeoutSec
145 public function setRecvTimeoutSec($recvTimeoutSec)
147 $this->recvTimeoutSec = $recvTimeoutSec;
151 * @param float $recvTimeoutUsec
153 public function setRecvTimeoutUsec($recvTimeoutUsec)
155 $this->recvTimeoutUsec = $recvTimeoutUsec;
161 // {{{ public static function createFromStream()
164 * Optional method to set the internal stream handle
171 public static function createFromStream($stream)
173 $socket = new self('localhost', 0);
174 $socket->setStream($stream);
179 // {{{ public function setStream()
182 * Optional method to set the internal stream handle
184 * @param mixed $stream
188 public function setStream($stream)
190 $this->stream = $stream;
194 // {{{ public function connect()
197 * Connects the socket
202 public function connect()
204 if (is_resource($this->stream)) {
208 if (empty($this->host)) {
209 throw new \Kafka\Exception('Cannot open null host.');
211 if ($this->port <= 0) {
212 throw new \Kafka\Exception('Cannot open without port.');
215 $this->stream = @fsockopen(
220 $this->sendTimeoutSec + ($this->sendTimeoutUsec / 1000000)
223 if ($this->stream == false) {
224 $error = 'Could not connect to '
225 . $this->host . ':' . $this->port
226 . ' ('.$errstr.' ['.$errno.'])';
227 throw new \Kafka\Exception\SocketConnect($error);
230 stream_set_blocking($this->stream, 0);
234 // {{{ public function close()
242 public function close()
244 if (is_resource($this->stream)) {
245 fclose($this->stream);
250 // {{{ public function read()
253 * Read from the socket at most $len bytes.
255 * This method will not wait for all the requested data, it will return as
256 * soon as any data is received.
258 * @param integer $len Maximum number of bytes to read.
259 * @param boolean $verifyExactLength Throw an exception if the number of read bytes is less than $len
261 * @return string Binary data
262 * @throws \Kafka\Exception\SocketEOF
264 public function read($len, $verifyExactLength = false)
266 if ($len > self::READ_MAX_LEN) {
267 throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream, length too longer.');
271 $read = array($this->stream);
272 $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);
274 $remainingBytes = $len;
276 while ($remainingBytes > 0) {
277 $chunk = fread($this->stream, $remainingBytes);
278 if ($chunk === false) {
280 throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (no data)');
282 if (strlen($chunk) === 0) {
283 // Zero bytes because of EOF?
284 if (feof($this->stream)) {
286 throw new \Kafka\Exception\SocketEOF('Unexpected EOF while reading '.$len.' bytes from stream (no data)');
288 // Otherwise wait for bytes
289 $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);
290 if ($readable !== 1) {
291 throw new \Kafka\Exception\SocketTimeout('Timed out reading socket while reading ' . $len . ' bytes with ' . $remainingBytes . ' bytes to go');
293 continue; // attempt another read
296 $remainingBytes -= strlen($chunk);
298 if ($len === $remainingBytes || ($verifyExactLength && $len !== strlen($data))) {
299 // couldn't read anything at all OR reached EOF sooner than expected
301 throw new \Kafka\Exception\SocketEOF('Read ' . strlen($data) . ' bytes instead of the requested ' . $len . ' bytes');
306 if (false !== $readable) {
307 $res = stream_get_meta_data($this->stream);
308 if (!empty($res['timed_out'])) {
310 throw new \Kafka\Exception\SocketTimeout('Timed out reading '.$len.' bytes from stream');
314 throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (not readable)');
319 // {{{ public function write()
322 * Write to the socket.
324 * @param string $buf The data to write
327 * @throws \Kafka\Exception\SocketEOF
329 public function write($buf)
332 $write = array($this->stream);
334 // fwrite to a socket may be partial, so loop until we
335 // are done with the entire buffer
337 $buflen = strlen($buf);
338 while ( $written < $buflen ) {
339 // wait for stream to become available for writing
340 $writable = stream_select($null, $write, $null, $this->sendTimeoutSec, $this->sendTimeoutUsec);
342 if ($buflen - $written > self::MAX_WRITE_BUFFER) {
343 // write max buffer size
344 $wrote = fwrite($this->stream, substr($buf, $written, self::MAX_WRITE_BUFFER));
346 // write remaining buffer bytes to stream
347 $wrote = fwrite($this->stream, substr($buf, $written));
349 if ($wrote === -1 || $wrote === false) {
350 throw new \Kafka\Exception\Socket('Could not write ' . strlen($buf) . ' bytes to stream, completed writing only ' . $written . ' bytes');
355 if (false !== $writable) {
356 $res = stream_get_meta_data($this->stream);
357 if (!empty($res['timed_out'])) {
358 throw new \Kafka\Exception\SocketTimeout('Timed out writing ' . strlen($buf) . ' bytes to stream after writing ' . $written . ' bytes');
361 throw new \Kafka\Exception\Socket('Could not write ' . strlen($buf) . ' bytes to stream');
367 // {{{ public function rewind()
374 public function rewind()
376 if (is_resource($this->stream)) {
377 rewind($this->stream);