]> scripts.mit.edu Git - autoinstallsdev/mediawiki.git/blob - vendor/nmred/kafka-php/src/Kafka/Socket.php
MediaWiki 1.30.2
[autoinstallsdev/mediawiki.git] / vendor / nmred / kafka-php / src / Kafka / Socket.php
1 <?php
2 /* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
3 // +---------------------------------------------------------------------------
4 // | SWAN [ $_SWANBR_SLOGAN_$ ]
5 // +---------------------------------------------------------------------------
6 // | Copyright $_SWANBR_COPYRIGHT_$
7 // +---------------------------------------------------------------------------
8 // | Version  $_SWANBR_VERSION_$
9 // +---------------------------------------------------------------------------
10 // | Licensed ( $_SWANBR_LICENSED_URL_$ )
11 // +---------------------------------------------------------------------------
12 // | $_SWANBR_WEB_DOMAIN_$
13 // +---------------------------------------------------------------------------
14
15 namespace Kafka;
16
17 /**
18 +------------------------------------------------------------------------------
19 * Kafka protocol since Kafka v0.8
20 +------------------------------------------------------------------------------
21 *
22 * @package
23 * @version $_SWANBR_VERSION_$
24 * @copyright Copyleft
25 * @author $_SWANBR_AUTHOR_$
26 +------------------------------------------------------------------------------
27 */
28
29 class Socket
30 {
31     // {{{ consts
32
33     const READ_MAX_LEN = 5242880; // read socket max length 5MB
34
35     /**
36      * max write socket buffer
37      * fixed:send of 8192 bytes failed with errno=11 Resource temporarily
38      * unavailable error info
39      */
40     const MAX_WRITE_BUFFER = 4096;
41
42     // }}}
43     // {{{ members
44
45     /**
46      * Send timeout in seconds.
47      *
48      * @var float
49      * @access private
50      */
51     private $sendTimeoutSec = 0;
52
53     /**
54      * Send timeout in microseconds.
55      *
56      * @var float
57      * @access private
58      */
59     private $sendTimeoutUsec = 100000;
60
61     /**
62      * Recv timeout in seconds
63      *
64      * @var float
65      * @access private
66      */
67     private $recvTimeoutSec = 0;
68
69     /**
70      * Recv timeout in microseconds
71      *
72      * @var float
73      * @access private
74      */
75     private $recvTimeoutUsec = 750000;
76
77     /**
78      * Stream resource
79      *
80      * @var mixed
81      * @access private
82      */
83     private $stream = null;
84
85     /**
86      * Socket host
87      *
88      * @var mixed
89      * @access private
90      */
91     private $host = null;
92
93     /**
94      * Socket port
95      *
96      * @var mixed
97      * @access private
98      */
99     private $port = -1;
100
101     // }}}
102     // {{{ functions
103     // {{{ public function __construct()
104
105     /**
106      * __construct
107      *
108      * @access public
109      * @param $host
110      * @param $port
111      * @param int $recvTimeoutSec
112      * @param int $recvTimeoutUsec
113      * @param int $sendTimeoutSec
114      * @param int $sendTimeoutUsec
115      */
116     public function __construct($host, $port, $recvTimeoutSec = 0, $recvTimeoutUsec = 750000, $sendTimeoutSec = 0, $sendTimeoutUsec = 100000)
117     {
118         $this->host = $host;
119         $this->port = $port;
120         $this->setRecvTimeoutSec($recvTimeoutSec);
121         $this->setRecvTimeoutUsec($recvTimeoutUsec);
122         $this->setSendTimeoutSec($sendTimeoutSec);
123         $this->setSendTimeoutUsec($sendTimeoutUsec);
124     }
125
126     /**
127      * @param float $sendTimeoutSec
128      */
129     public function setSendTimeoutSec($sendTimeoutSec)
130     {
131         $this->sendTimeoutSec = $sendTimeoutSec;
132     }
133
134     /**
135      * @param float $sendTimeoutUsec
136      */
137     public function setSendTimeoutUsec($sendTimeoutUsec)
138     {
139         $this->sendTimeoutUsec = $sendTimeoutUsec;
140     }
141
142     /**
143      * @param float $recvTimeoutSec
144      */
145     public function setRecvTimeoutSec($recvTimeoutSec)
146     {
147         $this->recvTimeoutSec = $recvTimeoutSec;
148     }
149
150     /**
151      * @param float $recvTimeoutUsec
152      */
153     public function setRecvTimeoutUsec($recvTimeoutUsec)
154     {
155         $this->recvTimeoutUsec = $recvTimeoutUsec;
156     }
157
158
159
160     // }}}
161     // {{{ public static function createFromStream()
162
163     /**
164      * Optional method to set the internal stream handle
165      *
166      * @static
167      * @access public
168      * @param $stream
169      * @return Socket
170      */
171     public static function createFromStream($stream)
172     {
173         $socket = new self('localhost', 0);
174         $socket->setStream($stream);
175         return $socket;
176     }
177
178     // }}}
179     // {{{ public function setStream()
180
181     /**
182      * Optional method to set the internal stream handle
183      *
184      * @param mixed $stream
185      * @access public
186      * @return void
187      */
188     public function setStream($stream)
189     {
190         $this->stream = $stream;
191     }
192
193     // }}}
194     // {{{ public function connect()
195
196     /**
197      * Connects the socket
198      *
199      * @access public
200      * @return void
201      */
202     public function connect()
203     {
204         if (is_resource($this->stream)) {
205             return;
206         }
207
208         if (empty($this->host)) {
209             throw new \Kafka\Exception('Cannot open null host.');
210         }
211         if ($this->port <= 0) {
212             throw new \Kafka\Exception('Cannot open without port.');
213         }
214
215         $this->stream = @fsockopen(
216             $this->host,
217             $this->port,
218             $errno,
219             $errstr,
220             $this->sendTimeoutSec + ($this->sendTimeoutUsec / 1000000)
221         );
222
223         if ($this->stream == false) {
224             $error = 'Could not connect to '
225                     . $this->host . ':' . $this->port
226                     . ' ('.$errstr.' ['.$errno.'])';
227             throw new \Kafka\Exception\SocketConnect($error);
228         }
229
230         stream_set_blocking($this->stream, 0);
231     }
232
233     // }}}
234     // {{{ public function close()
235
236     /**
237      * close the socket
238      *
239      * @access public
240      * @return void
241      */
242     public function close()
243     {
244         if (is_resource($this->stream)) {
245             fclose($this->stream);
246         }
247     }
248
249     // }}}
250     // {{{ public function read()
251
252     /**
253      * Read from the socket at most $len bytes.
254      *
255      * This method will not wait for all the requested data, it will return as
256      * soon as any data is received.
257      *
258      * @param integer $len               Maximum number of bytes to read.
259      * @param boolean $verifyExactLength Throw an exception if the number of read bytes is less than $len
260      *
261      * @return string Binary data
262      * @throws \Kafka\Exception\SocketEOF
263      */
264     public function read($len, $verifyExactLength = false)
265     {
266         if ($len > self::READ_MAX_LEN) {
267             throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream, length too longer.');
268         }
269
270         $null = null;
271         $read = array($this->stream);
272         $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);
273         if ($readable > 0) {
274             $remainingBytes = $len;
275             $data = $chunk = '';
276             while ($remainingBytes > 0) {
277                 $chunk = fread($this->stream, $remainingBytes);
278                 if ($chunk === false) {
279                     $this->close();
280                     throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (no data)');
281                 }
282                 if (strlen($chunk) === 0) {
283                     // Zero bytes because of EOF?
284                     if (feof($this->stream)) {
285                         $this->close();
286                         throw new \Kafka\Exception\SocketEOF('Unexpected EOF while reading '.$len.' bytes from stream (no data)');
287                     }
288                     // Otherwise wait for bytes
289                     $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);
290                     if ($readable !== 1) {
291                         throw new \Kafka\Exception\SocketTimeout('Timed out reading socket while reading ' . $len . ' bytes with ' . $remainingBytes . ' bytes to go');
292                     }
293                     continue; // attempt another read
294                 }
295                 $data .= $chunk;
296                 $remainingBytes -= strlen($chunk);
297             }
298             if ($len === $remainingBytes || ($verifyExactLength && $len !== strlen($data))) {
299                 // couldn't read anything at all OR reached EOF sooner than expected
300                 $this->close();
301                 throw new \Kafka\Exception\SocketEOF('Read ' . strlen($data) . ' bytes instead of the requested ' . $len . ' bytes');
302             }
303
304             return $data;
305         }
306         if (false !== $readable) {
307             $res = stream_get_meta_data($this->stream);
308             if (!empty($res['timed_out'])) {
309                 $this->close();
310                 throw new \Kafka\Exception\SocketTimeout('Timed out reading '.$len.' bytes from stream');
311             }
312         }
313         $this->close();
314         throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (not readable)');
315
316     }
317
318     // }}}
319     // {{{ public function write()
320
321     /**
322      * Write to the socket.
323      *
324      * @param string $buf The data to write
325      *
326      * @return integer
327      * @throws \Kafka\Exception\SocketEOF
328      */
329     public function write($buf)
330     {
331         $null = null;
332         $write = array($this->stream);
333
334         // fwrite to a socket may be partial, so loop until we
335         // are done with the entire buffer
336         $written = 0;
337         $buflen = strlen($buf);
338         while ( $written < $buflen ) {
339             // wait for stream to become available for writing
340             $writable = stream_select($null, $write, $null, $this->sendTimeoutSec, $this->sendTimeoutUsec);
341             if ($writable > 0) {
342                 if ($buflen - $written > self::MAX_WRITE_BUFFER) {
343                     // write max buffer size
344                     $wrote = fwrite($this->stream, substr($buf, $written, self::MAX_WRITE_BUFFER));
345                 } else {
346                     // write remaining buffer bytes to stream
347                     $wrote = fwrite($this->stream, substr($buf, $written));
348                 }
349                 if ($wrote === -1 || $wrote === false) {
350                     throw new \Kafka\Exception\Socket('Could not write ' . strlen($buf) . ' bytes to stream, completed writing only ' . $written . ' bytes');
351                 }
352                 $written += $wrote;
353                 continue;
354             }
355             if (false !== $writable) {
356                 $res = stream_get_meta_data($this->stream);
357                 if (!empty($res['timed_out'])) {
358                     throw new \Kafka\Exception\SocketTimeout('Timed out writing ' . strlen($buf) . ' bytes to stream after writing ' . $written . ' bytes');
359                 }
360             }
361             throw new \Kafka\Exception\Socket('Could not write ' . strlen($buf) . ' bytes to stream');
362         }
363         return $written;
364     }
365
366     // }}}
367     // {{{ public function rewind()
368
369     /**
370      * Rewind the stream
371      *
372      * @return void
373      */
374     public function rewind()
375     {
376         if (is_resource($this->stream)) {
377             rewind($this->stream);
378         }
379     }
380
381     // }}}
382     // }}}
383 }