Merge "Remove unused message 'livepreview-loading'"
[lhc/web/wiklou.git] / includes / SquidPurgeClient.php
1 <?php
2 /**
3 * Squid and Varnish cache purging.
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 * http://www.gnu.org/copyleft/gpl.html
19 *
20 * @file
21 */
22
/**
 * An HTTP 1.0 client built for the purposes of purging Squid and Varnish.
 * Uses asynchronous I/O, allowing purges to be done in a highly parallel
 * manner.
 *
 * Each instance talks to a single cache server. Requests are queued with
 * queuePurge() and sent one at a time: the next request is only written
 * once the response to the current one has been fully read.
 *
 * Could be replaced by curl_multi_exec() or some such.
 */
class SquidPurgeClient {
	/** @var string Host part of the server string given to the constructor */
	public $host;

	/** @var int|string Port part of the server string, or 80 if none was given */
	public $port;

	/** @var string|bool|null Resolved IPv4 address; null until resolved, false if resolution failed */
	public $ip;

	/** @var string One of 'idle', 'status', 'header' or 'body' */
	public $readState = 'idle';

	/** @var string Bytes of the current request which have not been sent yet */
	public $writeBuffer = '';

	/** @var array Queued raw request strings */
	public $requests = array();

	/** @var int|null Key in $requests of the request being processed, or null if none */
	public $currentRequestIndex;

	const EINTR = 4;
	const EAGAIN = 11;
	const EINPROGRESS = 115;
	const BUFFER_SIZE = 8192;

	/**
	 * The socket resource, or null for unconnected, or false for disabled due to error
	 */
	public $socket;

	/** @var string Received bytes which have not been parsed yet */
	public $readBuffer = '';

	/** @var int|null Body bytes still expected, or null if no Content-Length was seen */
	public $bodyRemaining;

	/**
	 * @param string $server Server in "host" or "host:port" form; the port defaults to 80
	 * @param array $options Currently unused
	 */
	public function __construct( $server, $options = array() ) {
		$parts = explode( ':', $server, 2 );
		$this->host = $parts[0];
		$this->port = isset( $parts[1] ) ? $parts[1] : 80;
	}

	/**
	 * Open a socket if there isn't one open already, return it.
	 * Returns false on error.
	 *
	 * The connection is started in non-blocking mode, so the returned socket
	 * may still be connecting (EINPROGRESS is not treated as an error).
	 *
	 * @return bool|resource
	 */
	protected function getSocket() {
		if ( $this->socket !== null ) {
			// Either an open socket, or false because the client was marked down
			return $this->socket;
		}

		$ip = $this->getIP();
		if ( !$ip ) {
			$this->log( "DNS error" );
			$this->markDown();
			return false;
		}

		wfSuppressWarnings();
		$socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
		wfRestoreWarnings();
		if ( !$socket ) {
			// Without this check a failed socket_create() would be passed on
			// to socket_set_nonblock() and socket_connect()
			$this->log( "socket creation error: " . socket_strerror( socket_last_error() ) );
			$this->markDown();
			return false;
		}
		$this->socket = $socket;

		socket_set_nonblock( $this->socket );
		wfSuppressWarnings();
		$ok = socket_connect( $this->socket, $ip, (int)$this->port );
		wfRestoreWarnings();
		if ( !$ok ) {
			$error = socket_last_error( $this->socket );
			if ( $error !== self::EINPROGRESS ) {
				$this->log( "connection error: " . socket_strerror( $error ) );
				$this->markDown();
				return false;
			}
		}

		return $this->socket;
	}

	/**
	 * Get read socket array for select()
	 *
	 * @return array Empty when idle or when no socket is available,
	 *   otherwise an array containing the socket
	 */
	public function getReadSocketsForSelect() {
		if ( $this->readState == 'idle' ) {
			return array();
		}
		$socket = $this->getSocket();
		if ( $socket === false ) {
			return array();
		}
		return array( $socket );
	}

	/**
	 * Get write socket array for select()
	 *
	 * @return array Empty when there is nothing to write or when no socket
	 *   is available, otherwise an array containing the socket
	 */
	public function getWriteSocketsForSelect() {
		if ( !strlen( $this->writeBuffer ) ) {
			return array();
		}
		$socket = $this->getSocket();
		if ( $socket === false ) {
			return array();
		}
		return array( $socket );
	}

	/**
	 * Get the host's IP address.
	 * Does not support IPv6 at present due to the lack of a convenient interface in PHP.
	 *
	 * The result is cached in $this->ip, including a failed lookup.
	 *
	 * @throws MWException If the configured host is an IPv6 address
	 * @return string|bool The IPv4 address, or false if the host name could not be resolved
	 */
	protected function getIP() {
		if ( $this->ip === null ) {
			if ( IP::isIPv4( $this->host ) ) {
				$this->ip = $this->host;
			} elseif ( IP::isIPv6( $this->host ) ) {
				throw new MWException( '$wgSquidServers does not support IPv6' );
			} else {
				wfSuppressWarnings();
				$this->ip = gethostbyname( $this->host );
				wfRestoreWarnings();
				// gethostbyname() returns its argument unchanged on failure
				if ( $this->ip === $this->host ) {
					$this->ip = false;
				}
			}
		}
		return $this->ip;
	}

	/**
	 * Close the socket and ignore any future purge requests.
	 * This is called if there is a protocol error.
	 */
	protected function markDown() {
		$this->close();
		// false (as opposed to null) stops getSocket() from reconnecting
		$this->socket = false;
	}

	/**
	 * Close the socket but allow it to be reopened for future purge requests
	 */
	public function close() {
		if ( $this->socket ) {
			wfSuppressWarnings();
			socket_set_block( $this->socket );
			socket_shutdown( $this->socket );
			socket_close( $this->socket );
			wfRestoreWarnings();
		}
		$this->socket = null;
		$this->readBuffer = '';
		// Write buffer is kept since it may contain a request for the next socket
	}

	/**
	 * Queue a purge operation
	 *
	 * Builds the raw PURGE request for the URL and appends it to the queue.
	 * If no request is currently in progress, the new one is started.
	 *
	 * @param string $url
	 */
	public function queuePurge( $url ) {
		global $wgSquidPurgeUseHostHeader;
		// Strip both CR and LF so the URL cannot break out of the request line
		$url = SquidUpdate::expand( str_replace( array( "\r", "\n" ), '', $url ) );
		$request = array();
		if ( $wgSquidPurgeUseHostHeader ) {
			$bits = wfParseUrl( $url );
			// NOTE(review): presumably wfParseUrl() gives a non-array for an
			// unparseable URL; without a host no valid request can be built
			if ( !is_array( $bits ) || !isset( $bits['host'] ) ) {
				$this->log( "unable to parse URL for purge: $url" );
				return;
			}
			$host = $bits['host'];
			if ( isset( $bits['port'] ) && strlen( $bits['port'] ) > 0 ) {
				$host .= ":" . $bits['port'];
			}
			// An empty request-target is invalid, send "/" for URLs without a path
			$path = ( isset( $bits['path'] ) && $bits['path'] !== '' ) ? $bits['path'] : '/';
			if ( isset( $bits['query'] ) && is_string( $bits['query'] ) ) {
				$path = wfAppendQuery( $path, $bits['query'] );
			}
			$request[] = "PURGE $path HTTP/1.1";
			$request[] = "Host: $host";
		} else {
			$request[] = "PURGE $url HTTP/1.0";
		}
		$request[] = "Connection: Keep-Alive";
		$request[] = "Proxy-Connection: Keep-Alive";
		$request[] = "User-Agent: " . Http::userAgent() . ' ' . __CLASS__;
		// Two ''s to create \r\n\r\n
		$request[] = '';
		$request[] = '';

		$this->requests[] = implode( "\r\n", $request );
		if ( $this->currentRequestIndex === null ) {
			$this->nextRequest();
		}
	}

	/**
	 * Check whether there is nothing left to write and no response is awaited.
	 *
	 * @return bool
	 */
	public function isIdle() {
		return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle';
	}

	/**
	 * Perform pending writes. Call this when socket_select() indicates that writing will not block.
	 *
	 * At most BUFFER_SIZE bytes are sent per call; whatever was accepted by
	 * the socket is removed from the write buffer.
	 */
	public function doWrites() {
		if ( !strlen( $this->writeBuffer ) ) {
			return;
		}
		$socket = $this->getSocket();
		if ( !$socket ) {
			return;
		}

		if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) {
			// The rest of the request fits into a single send
			$buf = $this->writeBuffer;
			$flags = MSG_EOR;
		} else {
			$buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE );
			$flags = 0;
		}
		wfSuppressWarnings();
		$bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags );
		wfRestoreWarnings();

		if ( $bytesSent === false ) {
			$error = socket_last_error( $socket );
			// EAGAIN and EINTR are transient, the write is retried on the next call
			if ( $error != self::EAGAIN && $error != self::EINTR ) {
				$this->log( 'write error: ' . socket_strerror( $error ) );
				$this->markDown();
			}
			return;
		}

		$this->writeBuffer = substr( $this->writeBuffer, $bytesSent );
	}

	/**
	 * Read some data. Call this when socket_select() indicates that the read buffer is non-empty.
	 *
	 * Received data is appended to the read buffer, which is then parsed
	 * for as long as complete protocol elements are available.
	 */
	public function doReads() {
		$socket = $this->getSocket();
		if ( !$socket ) {
			return;
		}

		$buf = '';
		wfSuppressWarnings();
		$bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 );
		wfRestoreWarnings();
		if ( $bytesRead === false ) {
			$error = socket_last_error( $socket );
			if ( $error != self::EAGAIN && $error != self::EINTR ) {
				$this->log( 'read error: ' . socket_strerror( $error ) );
				$this->markDown();
			}
			// Nothing was received, so there is nothing new to parse
			return;
		} elseif ( $bytesRead === 0 ) {
			// Assume EOF
			$this->close();
			return;
		}

		$this->readBuffer .= $buf;
		// Stop as soon as the socket goes away (protocol error) or more data is needed
		while ( $this->socket && $this->processReadBuffer() === 'continue' ) {
			// All the work is done in the loop condition
		}
	}

	/**
	 * Consume one protocol element (status line, header line or body data)
	 * from the read buffer, according to the current read state.
	 *
	 * @throws MWException If the read state is not one of the known states
	 * @return string 'continue' if the buffer may hold more to process,
	 *   'done' if more data has to be received first
	 */
	protected function processReadBuffer() {
		switch ( $this->readState ) {
		case 'idle':
			return 'done';
		case 'status':
		case 'header':
			$lines = explode( "\r\n", $this->readBuffer, 2 );
			if ( count( $lines ) < 2 ) {
				// No complete line in the buffer yet
				return 'done';
			}
			if ( $this->readState == 'status' ) {
				$this->processStatusLine( $lines[0] );
			} else { // header
				$this->processHeaderLine( $lines[0] );
			}
			$this->readBuffer = $lines[1];
			return 'continue';
		case 'body':
			if ( $this->bodyRemaining !== null ) {
				if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) {
					// The whole buffer is body data, and more is expected
					$this->bodyRemaining -= strlen( $this->readBuffer );
					$this->readBuffer = '';
					return 'done';
				} else {
					// Body complete; anything after it belongs to the next response
					$this->readBuffer = substr( $this->readBuffer, $this->bodyRemaining );
					$this->bodyRemaining = 0;
					$this->nextRequest();
					return 'continue';
				}
			} else {
				// No content length, read all data to EOF
				$this->readBuffer = '';
				return 'done';
			}
		default:
			throw new MWException( __METHOD__ . ': unexpected state' );
		}
	}

	/**
	 * Parse the status line of a response. Anything other than a well-formed
	 * line with status 200 or 404 marks the client as down.
	 *
	 * @param string $line Status line without the trailing CRLF
	 * @return void
	 */
	protected function processStatusLine( $line ) {
		if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) {
			$this->log( 'invalid status line' );
			$this->markDown();
			return;
		}
		// $m is: full match, major version, minor version, status code, reason phrase
		list( , , , $status, $reason ) = $m;
		$status = intval( $status );
		if ( $status !== 200 && $status !== 404 ) {
			$this->log( "unexpected status code: $status $reason" );
			$this->markDown();
			return;
		}
		$this->readState = 'header';
	}

	/**
	 * Parse one header line of a response. Only Content-Length and the empty
	 * line which ends the header section are acted upon.
	 *
	 * @param string $line Header line without the trailing CRLF
	 */
	protected function processHeaderLine( $line ) {
		// Whitespace around the field value is optional in HTTP
		if ( preg_match( '/^Content-Length:\s*(\d+)\s*$/i', $line, $m ) ) {
			$this->bodyRemaining = intval( $m[1] );
		} elseif ( $line === '' ) {
			$this->readState = 'body';
		}
	}

	/**
	 * Drop the current request from the queue, if any, and start the oldest
	 * remaining one, or become idle if the queue is empty.
	 */
	protected function nextRequest() {
		if ( $this->currentRequestIndex !== null ) {
			unset( $this->requests[$this->currentRequestIndex] );
		}
		if ( count( $this->requests ) ) {
			$this->readState = 'status';
			// Rewind first: the position of the internal array pointer after
			// unset() and later appends is not something to rely on
			reset( $this->requests );
			$this->currentRequestIndex = key( $this->requests );
			$this->writeBuffer = $this->requests[$this->currentRequestIndex];
		} else {
			$this->readState = 'idle';
			$this->currentRequestIndex = null;
			$this->writeBuffer = '';
		}
		$this->bodyRemaining = null;
	}

	/**
	 * Write a message to the 'squid' debug log group, prefixed with the
	 * class name and the host.
	 *
	 * @param string $msg
	 */
	protected function log( $msg ) {
		wfDebugLog( 'squid', __CLASS__ . " ($this->host): $msg" );
	}
}
380
/**
 * Runs a set of SquidPurgeClient objects in parallel, multiplexing their
 * sockets with socket_select() until every client is idle, no client has a
 * usable socket left, or the overall timeout has been reached.
 */
class SquidPurgeClientPool {

	/**
	 * @var array of SquidPurgeClient
	 */
	public $clients = array();

	/** @var int|float Overall time limit for run(), in seconds */
	public $timeout = 5;

	/**
	 * @param array $options Recognised key: 'timeout' (seconds, default 5)
	 */
	public function __construct( $options = array() ) {
		if ( isset( $options['timeout'] ) ) {
			$this->timeout = $options['timeout'];
		}
	}

	/**
	 * @param SquidPurgeClient $client
	 * @return void
	 */
	public function addClient( $client ) {
		$this->clients[] = $client;
	}

	/**
	 * Split a duration in seconds into the whole-second and microsecond
	 * parts which socket_select() takes as separate integer arguments.
	 * Non-positive durations yield a zero timeout.
	 *
	 * @param int|float $seconds
	 * @return array Two integers: seconds, microseconds
	 */
	protected static function splitTimeout( $seconds ) {
		if ( $seconds <= 0 ) {
			return array( 0, 0 );
		}
		$sec = (int)floor( $seconds );
		$usec = (int)round( ( $seconds - $sec ) * 1000000 );
		if ( $usec >= 1000000 ) {
			// Rounding carried the fraction over into a whole second
			$sec++;
			$usec = 0;
		}
		return array( $sec, $usec );
	}

	/**
	 * Process the queued requests of all clients, then close every client.
	 *
	 * @return void
	 */
	public function run() {
		$done = false;
		$startTime = microtime( true );
		while ( !$done ) {
			$readSockets = $writeSockets = array();
			/**
			 * @var $client SquidPurgeClient
			 */
			foreach ( $this->clients as $clientIndex => $client ) {
				// Keys encode the owning client so it can be found again after select
				$sockets = $client->getReadSocketsForSelect();
				foreach ( $sockets as $i => $socket ) {
					$readSockets["$clientIndex/$i"] = $socket;
				}
				$sockets = $client->getWriteSocketsForSelect();
				foreach ( $sockets as $i => $socket ) {
					$writeSockets["$clientIndex/$i"] = $socket;
				}
			}
			if ( !count( $readSockets ) && !count( $writeSockets ) ) {
				break;
			}
			$exceptSockets = null;
			// Wait for at most one second, and never beyond the overall deadline.
			// The duration has to be given as integer seconds plus microseconds:
			// a fractional value passed as the seconds argument would be
			// truncated, turning the last second into a busy loop.
			list( $timeoutSec, $timeoutUsec ) = self::splitTimeout(
				min( $startTime + $this->timeout - microtime( true ), 1 )
			);
			wfSuppressWarnings();
			$numReady = socket_select( $readSockets, $writeSockets, $exceptSockets,
				$timeoutSec, $timeoutUsec );
			wfRestoreWarnings();
			if ( $numReady === false ) {
				wfDebugLog( 'squid', __METHOD__ . ': Error in socket_select: ' .
					socket_strerror( socket_last_error() ) . "\n" );
				break;
			}
			// Check for timeout, use 1% tolerance since we aimed at having socket_select()
			// exit at precisely the overall timeout
			if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) {
				wfDebugLog( 'squid', __CLASS__ . ": timeout ({$this->timeout}s)\n" );
				break;
			} elseif ( !$numReady ) {
				continue;
			}

			// socket_select() has reduced both arrays to the sockets which are ready
			foreach ( $readSockets as $key => $socket ) {
				list( $clientIndex, ) = explode( '/', $key );
				$client = $this->clients[$clientIndex];
				$client->doReads();
			}
			foreach ( $writeSockets as $key => $socket ) {
				list( $clientIndex, ) = explode( '/', $key );
				$client = $this->clients[$clientIndex];
				$client->doWrites();
			}

			$done = true;
			foreach ( $this->clients as $client ) {
				if ( !$client->isIdle() ) {
					$done = false;
				}
			}
		}
		foreach ( $this->clients as $client ) {
			$client->close();
		}
	}
}