]> scripts.mit.edu Git - autoinstallsdev/mediawiki.git/blob - includes/SquidPurgeClient.php
MediaWiki 1.16.0
[autoinstallsdev/mediawiki.git] / includes / SquidPurgeClient.php
1 <?php
2 /**
3  * An HTTP 1.0 client built for the purposes of purging Squid and Varnish. 
4  * Uses asynchronous I/O, allowing purges to be done in a highly parallel 
5  * manner. 
6  *
7  * Could be replaced by curl_multi_exec() or some such.
8  */
9 class SquidPurgeClient {
10         var $host, $port, $ip;
11
12         var $readState = 'idle';
13         var $writeBuffer = '';
14         var $requests = array();
15         var $currentRequestIndex;
16
17         const EINTR = 4;
18         const EAGAIN = 11;
19         const EINPROGRESS = 115;
20         const BUFFER_SIZE = 8192;
21
22         /**
23          * The socket resource, or null for unconnected, or false for disabled due to error
24          */
25         var $socket;
26         
27         public function __construct( $server, $options = array() ) {
28                 $parts = explode( ':', $server, 2 );
29                 $this->host = $parts[0];
30                 $this->port = isset( $parts[1] ) ? $parts[1] : 80;
31         }
32
33         /**
34          * Open a socket if there isn't one open already, return it.
35          * Returns false on error.
36          */
37         protected function getSocket() {
38                 if ( $this->socket !== null ) {
39                         return $this->socket;
40                 }
41
42                 $ip = $this->getIP();
43                 if ( !$ip ) {
44                         $this->log( "DNS error" );
45                         $this->markDown();
46                         return false;
47                 }
48                 $this->socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
49                 socket_set_nonblock( $this->socket );
50                 wfSuppressWarnings();
51                 $ok = socket_connect( $this->socket, $ip, $this->port );
52                 wfRestoreWarnings();
53                 if ( !$ok ) {
54                         $error = socket_last_error( $this->socket );
55                         if ( $error !== self::EINPROGRESS ) {
56                                 $this->log( "connection error: " . socket_strerror( $error ) );
57                                 $this->markDown();
58                                 return false;
59                         }
60                 }
61
62                 return $this->socket;
63         }
64
65         /**
66          * Get read socket array for select()
67          */
68         public function getReadSocketsForSelect() {
69                 if ( $this->readState == 'idle' ) {
70                         return array();
71                 }
72                 $socket = $this->getSocket();
73                 if ( $socket === false ) {
74                         return array();
75                 }
76                 return array( $socket );
77         }
78
79         /**
80          * Get write socket array for select()
81          */
82         public function getWriteSocketsForSelect() {
83                 if ( !strlen( $this->writeBuffer ) ) {
84                         return array();
85                 }
86                 $socket = $this->getSocket();
87                 if ( $socket === false ) {
88                         return array();
89                 }
90                 return array( $socket );
91         }
92
93         /** 
94          * Get the host's IP address.
95          * Does not support IPv6 at present due to the lack of a convenient interface in PHP.
96          */
97         protected function getIP() {
98                 if ( $this->ip === null ) {
99                         if ( IP::isIPv4( $this->host ) ) {
100                                 $this->ip = $this->host;
101                         } elseif ( IP::isIPv6( $this->host ) ) {
102                                 throw new MWException( '$wgSquidServers does not support IPv6' );
103                         } else {
104                                 wfSuppressWarnings();
105                                 $this->ip = gethostbyname( $this->host );
106                                 if ( $this->ip === $this->host ) {
107                                         $this->ip = false;
108                                 }
109                                 wfRestoreWarnings();
110                         }
111                 }
112                 return $this->ip;
113         }
114
115         /**
116          * Close the socket and ignore any future purge requests.
117          * This is called if there is a protocol error.
118          */
119         protected function markDown() {
120                 $this->close();
121                 $this->socket = false;
122         }
123
124         /**
125          * Close the socket but allow it to be reopened for future purge requests
126          */
127         public function close() {
128                 if ( $this->socket ) {
129                         wfSuppressWarnings();
130                         socket_set_block( $this->socket );
131                         socket_shutdown( $this->socket );
132                         socket_close( $this->socket );
133                         wfRestoreWarnings();
134                 }
135                 $this->socket = null;
136                 $this->readBuffer = '';
137                 // Write buffer is kept since it may contain a request for the next socket
138         }
139
140         /**
141          * Queue a purge operation
142          */
143         public function queuePurge( $url ) {
144                 $url = str_replace( "\n", '', $url );
145                 $this->requests[] = "PURGE $url HTTP/1.0\r\n" .
146                         "Connection: Keep-Alive\r\n" .
147                         "Proxy-Connection: Keep-Alive\r\n" .
148                         "User-Agent: " . Http::userAgent() . ' ' . __CLASS__ . "\r\n\r\n";
149                 if ( $this->currentRequestIndex === null ) {
150                         $this->nextRequest();
151                 }
152         }
153
154         public function isIdle() {
155                 return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle';
156         }
157
158         /**
159          * Perform pending writes. Call this when socket_select() indicates that writing will not block.
160          */
161         public function doWrites() {
162                 if ( !strlen( $this->writeBuffer ) ) {
163                         return;
164                 }
165                 $socket = $this->getSocket();
166                 if ( !$socket ) {
167                         return;
168                 }
169
170                 if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) {
171                         $buf = $this->writeBuffer;
172                         $flags = MSG_EOR;
173                 } else {
174                         $buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE );
175                         $flags = 0;
176                 }
177                 wfSuppressWarnings();
178                 $bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags );
179                 wfRestoreWarnings();
180
181                 if ( $bytesSent === false ) {
182                         $error = socket_last_error( $socket );
183                         if ( $error != self::EAGAIN && $error != self::EINTR ) {
184                                 $this->log( 'write error: ' . socket_strerror( $error ) );
185                                 $this->markDown();
186                         }
187                         return;
188                 }
189
190                 $this->writeBuffer = substr( $this->writeBuffer, $bytesSent );
191         }
192
193         /**
194          * Read some data. Call this when socket_select() indicates that the read buffer is non-empty.
195          */
196         public function doReads() {
197                 $socket = $this->getSocket();
198                 if ( !$socket ) {
199                         return;
200                 }
201
202                 $buf = '';
203                 wfSuppressWarnings();
204                 $bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 );
205                 wfRestoreWarnings();
206                 if ( $bytesRead === false ) {
207                         $error = socket_last_error( $socket );
208                         if ( $error != self::EAGAIN && $error != self::EINTR ) {
209                                 $this->log( 'read error: ' . socket_strerror( $error ) );
210                                 $this->markDown();
211                                 return;
212                         }
213                 } elseif ( $bytesRead === 0 ) {
214                         // Assume EOF
215                         $this->close();
216                         return;
217                 }
218
219                 $this->readBuffer .= $buf;
220                 while ( $this->socket && $this->processReadBuffer() === 'continue' );
221         }
222
223         protected function processReadBuffer() {
224                 switch ( $this->readState ) {
225                 case 'idle':
226                         return 'done';
227                 case 'status':
228                 case 'header':
229                         $lines = explode( "\r\n", $this->readBuffer, 2 );
230                         if ( count( $lines ) < 2 ) {
231                                 return 'done';
232                         }
233                         if ( $this->readState == 'status' )  {
234                                 $this->processStatusLine( $lines[0] );
235                         } else { // header
236                                 $this->processHeaderLine( $lines[0] );
237                         }
238                         $this->readBuffer = $lines[1];
239                         return 'continue';
240                 case 'body':
241                         if ( $this->bodyRemaining !== null ) {
242                                 if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) {
243                                         $this->bodyRemaining -= strlen( $this->readBuffer );
244                                         $this->readBuffer = '';
245                                         return 'done';
246                                 } else {
247                                         $this->readBuffer = substr( $this->readBuffer, $this->bodyRemaining );
248                                         $this->bodyRemaining = 0;
249                                         $this->nextRequest();
250                                         return 'continue';
251                                 }
252                         } else {
253                                 // No content length, read all data to EOF
254                                 $this->readBuffer = '';
255                                 return 'done';
256                         }
257                 default:
258                         throw new MWException( __METHOD__.': unexpected state' );
259                 }
260         }
261
262         protected function processStatusLine( $line ) {
263                 if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) {
264                         $this->log( 'invalid status line' );
265                         $this->markDown();
266                         return;
267                 }
268                 list( $all, $major, $minor, $status, $reason ) = $m;
269                 $status = intval( $status );
270                 if ( $status !== 200 && $status !== 404 ) {
271                         $this->log( "unexpected status code: $status $reason" );
272                         $this->markDown();
273                         return;
274                 }
275                 $this->readState = 'header';
276         }
277
278         protected function processHeaderLine( $line ) {
279                 if ( preg_match( '/^Content-Length: (\d+)$/i', $line, $m ) ) {
280                         $this->bodyRemaining = intval( $m[1] );
281                 } elseif ( $line === '' ) {
282                         $this->readState = 'body';
283                 }
284         }
285
286         protected function nextRequest() {
287                 if ( $this->currentRequestIndex !== null ) {
288                         unset( $this->requests[$this->currentRequestIndex] );
289                 }
290                 if ( count( $this->requests ) ) {
291                         $this->readState = 'status';
292                         $this->currentRequestIndex = key( $this->requests );
293                         $this->writeBuffer = $this->requests[$this->currentRequestIndex];
294                 } else {
295                         $this->readState = 'idle';
296                         $this->currentRequestIndex = null;
297                         $this->writeBuffer = '';
298                 }
299                 $this->bodyRemaining = null;
300         }
301
302         protected function log( $msg ) {
303                 wfDebugLog( 'squid', __CLASS__." ($this->host): $msg\n" );
304         }
305 }
306
307 class SquidPurgeClientPool {
308         var $clients = array();
309         var $timeout = 5;
310
311         function __construct( $options = array() ) {
312                 if ( isset( $options['timeout'] ) ) {
313                         $this->timeout = $options['timeout'];
314                 }
315         }
316
317         public function addClient( $client ) {
318                 $this->clients[] = $client;
319         }
320
321         public function run() {
322                 $done = false;
323                 $startTime = microtime( true );
324                 while ( !$done ) {
325                         $readSockets = $writeSockets = array();
326                         foreach ( $this->clients as $clientIndex => $client ) {
327                                 $sockets = $client->getReadSocketsForSelect();
328                                 foreach ( $sockets as $i => $socket ) {
329                                         $readSockets["$clientIndex/$i"] = $socket;
330                                 }
331                                 $sockets = $client->getWriteSocketsForSelect();
332                                 foreach ( $sockets as $i => $socket ) {
333                                         $writeSockets["$clientIndex/$i"] = $socket;
334                                 }
335                         }
336                         if ( !count( $readSockets ) && !count( $writeSockets ) ) {
337                                 break;
338                         }
339                         $exceptSockets = null;
340                         $timeout = min( $startTime + $this->timeout - microtime( true ), 1 );
341                         wfSuppressWarnings();
342                         $numReady = socket_select( $readSockets, $writeSockets, $exceptSockets, $timeout );
343                         wfRestoreWarnings();
344                         if ( $numReady === false ) {
345                                 wfDebugLog( 'squid', __METHOD__.': Error in stream_select: ' . 
346                                         socket_strerror( socket_last_error() ) . "\n" );
347                                 break;
348                         }
349                         // Check for timeout, use 1% tolerance since we aimed at having socket_select()
350                         // exit at precisely the overall timeout
351                         if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) {
352                                 wfDebugLog( 'squid', __CLASS__.": timeout ({$this->timeout}s)\n" );
353                                 break;
354                         } elseif ( !$numReady ) {
355                                 continue;
356                         }
357
358                         foreach ( $readSockets as $key => $socket ) {
359                                 list( $clientIndex, $i ) = explode( '/', $key );
360                                 $client = $this->clients[$clientIndex];
361                                 $client->doReads();
362                         }
363                         foreach ( $writeSockets as $key => $socket ) {
364                                 list( $clientIndex, $i ) = explode( '/', $key );
365                                 $client = $this->clients[$clientIndex];
366                                 $client->doWrites();
367                         }
368
369                         $done = true;
370                         foreach ( $this->clients as $client ) {
371                                 if ( !$client->isIdle() ) {
372                                         $done = false;
373                                 }
374                         }
375                 }
376                 foreach ( $this->clients as $client ) {
377                         $client->close();
378                 }
379         }
380 }