--- /dev/null
+<?php
+/**
+ * An HTTP 1.0 client built for the purposes of purging Squid and Varnish.
+ * Uses asynchronous I/O, allowing purges to be done in a highly parallel
+ * manner.
+ *
+ * Could be replaced by curl_multi_exec() or some such.
+ */
+class SquidPurgeClient {
+ var $host, $port, $ip;
+
+ var $readState = 'idle';
+ var $writeBuffer = '';
+ var $requests = array();
+ var $currentRequestIndex;
+
+ const EINTR = 4;
+ const EAGAIN = 11;
+ const EINPROGRESS = 115;
+ const BUFFER_SIZE = 8192;
+
+ /**
+ * The socket resource, or null for unconnected, or false for disabled due to error
+ */
+ var $socket;
+
+ public function __construct( $server, $options = array() ) {
+ $parts = explode( ':', $server, 2 );
+ $this->host = $parts[0];
+ $this->port = isset( $parts[1] ) ? $parts[1] : 80;
+ }
+
+ /**
+ * Open a socket if there isn't one open already, return it.
+ * Returns false on error.
+ */
+ protected function getSocket() {
+ if ( $this->socket !== null ) {
+ return $this->socket;
+ }
+
+ $ip = $this->getIP();
+ if ( !$ip ) {
+ $this->log( "DNS error" );
+ $this->markDown();
+ return false;
+ }
+ $this->socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
+ socket_set_nonblock( $this->socket );
+ wfSuppressWarnings();
+ $ok = socket_connect( $this->socket, $ip, $this->port );
+ wfRestoreWarnings();
+ if ( !$ok ) {
+ $error = socket_last_error( $this->socket );
+ if ( $error !== self::EINPROGRESS ) {
+ $this->log( "connection error: " . socket_strerror( $error ) );
+ $this->markDown();
+ return false;
+ }
+ }
+
+ return $this->socket;
+ }
+
+ /**
+ * Get read socket array for select()
+ */
+ public function getReadSocketsForSelect() {
+ if ( $this->readState == 'idle' ) {
+ return array();
+ }
+ $socket = $this->getSocket();
+ if ( $socket === false ) {
+ return array();
+ }
+ return array( $socket );
+ }
+
+ /**
+ * Get write socket array for select()
+ */
+ public function getWriteSocketsForSelect() {
+ if ( !strlen( $this->writeBuffer ) ) {
+ return array();
+ }
+ $socket = $this->getSocket();
+ if ( $socket === false ) {
+ return array();
+ }
+ return array( $socket );
+ }
+
+ /**
+ * Get the host's IP address.
+ * Does not support IPv6 at present due to the lack of a convenient interface in PHP.
+ */
+ protected function getIP() {
+ if ( $this->ip === null ) {
+ if ( IP::isIPv4( $this->host ) ) {
+ $this->ip = $this->host;
+ } elseif ( IP::isIPv6( $this->host ) ) {
+ throw new MWException( '$wgSquidServers does not support IPv6' );
+ } else {
+ wfSuppressWarnings();
+ $this->ip = gethostbyname( $this->host );
+ if ( $this->ip === $this->host ) {
+ $this->ip = false;
+ }
+ wfRestoreWarnings();
+ }
+ }
+ return $this->ip;
+ }
+
+ /**
+ * Close the socket and ignore any future purge requests.
+ * This is called if there is a protocol error.
+ */
+ protected function markDown() {
+ $this->close();
+ $this->socket = false;
+ }
+
+ /**
+ * Close the socket but allow it to be reopened for future purge requests
+ */
+ public function close() {
+ if ( $this->socket ) {
+ wfSuppressWarnings();
+ socket_set_block( $this->socket );
+ socket_shutdown( $this->socket );
+ socket_close( $this->socket );
+ wfRestoreWarnings();
+ }
+ $this->socket = null;
+ $this->readBuffer = '';
+ // Write buffer is kept since it may contain a request for the next socket
+ }
+
+ /**
+ * Queue a purge operation
+ */
+ public function queuePurge( $url ) {
+ $url = str_replace( "\n", '', $url );
+ $this->requests[] = "PURGE $url HTTP/1.0\r\n" .
+ "Connection: Keep-Alive\r\n" .
+ "Proxy-Connection: Keep-Alive\r\n" .
+ "User-Agent: " . Http::userAgent() . ' ' . __CLASS__ . "\r\n\r\n";
+ if ( $this->currentRequestIndex === null ) {
+ $this->nextRequest();
+ }
+ }
+
+ public function isIdle() {
+ return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle';
+ }
+
+ /**
+ * Perform pending writes. Call this when socket_select() indicates that writing will not block.
+ */
+ public function doWrites() {
+ if ( !strlen( $this->writeBuffer ) ) {
+ return;
+ }
+ $socket = $this->getSocket();
+ if ( !$socket ) {
+ return;
+ }
+
+ if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) {
+ $buf = $this->writeBuffer;
+ $flags = MSG_EOR;
+ } else {
+ $buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE );
+ $flags = 0;
+ }
+ wfSuppressWarnings();
+ $bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags );
+ wfRestoreWarnings();
+
+ if ( $bytesSent === false ) {
+ $error = socket_last_error( $socket );
+ if ( $error != self::EAGAIN && $error != self::EINTR ) {
+ $this->log( 'write error: ' . socket_strerror( $error ) );
+ $this->markDown();
+ }
+ return;
+ }
+
+ $this->writeBuffer = substr( $this->writeBuffer, $bytesSent );
+ }
+
+ /**
+ * Read some data. Call this when socket_select() indicates that the read buffer is non-empty.
+ */
+ public function doReads() {
+ $socket = $this->getSocket();
+ if ( !$socket ) {
+ return;
+ }
+
+ $buf = '';
+ wfSuppressWarnings();
+ $bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 );
+ wfRestoreWarnings();
+ if ( $bytesRead === false ) {
+ $error = socket_last_error( $socket );
+ if ( $error != self::EAGAIN && $error != self::EINTR ) {
+ $this->log( 'read error: ' . socket_strerror( $error ) );
+ $this->markDown();
+ return;
+ }
+ } elseif ( $bytesRead === 0 ) {
+ // Assume EOF
+ $this->close();
+ return;
+ }
+
+ $this->readBuffer .= $buf;
+ while ( $this->socket && $this->processReadBuffer() === 'continue' );
+ }
+
+ protected function processReadBuffer() {
+ switch ( $this->readState ) {
+ case 'idle':
+ return 'done';
+ case 'status':
+ case 'header':
+ $lines = explode( "\r\n", $this->readBuffer, 2 );
+ if ( count( $lines ) < 2 ) {
+ return 'done';
+ }
+ if ( $this->readState == 'status' ) {
+ $this->processStatusLine( $lines[0] );
+ } else { // header
+ $this->processHeaderLine( $lines[0] );
+ }
+ $this->readBuffer = $lines[1];
+ return 'continue';
+ case 'body':
+ if ( $this->bodyRemaining !== null ) {
+ if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) {
+ $this->bodyRemaining -= strlen( $this->readBuffer );
+ $this->readBuffer = '';
+ return 'done';
+ } else {
+ $this->readBuffer = substr( $this->readBuffer, $this->bodyRemaining );
+ $this->bodyRemaining = 0;
+ $this->nextRequest();
+ return 'continue';
+ }
+ } else {
+ // No content length, read all data to EOF
+ $this->readBuffer = '';
+ return 'done';
+ }
+ default:
+ throw new MWException( __METHOD__.': unexpected state' );
+ }
+ }
+
+ protected function processStatusLine( $line ) {
+ if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) {
+ $this->log( 'invalid status line' );
+ $this->markDown();
+ return;
+ }
+ list( $all, $major, $minor, $status, $reason ) = $m;
+ $status = intval( $status );
+ if ( $status !== 200 && $status !== 404 ) {
+ $this->log( "unexpected status code: $status $reason" );
+ $this->markDown();
+ return;
+ }
+ $this->readState = 'header';
+ }
+
+ protected function processHeaderLine( $line ) {
+ if ( preg_match( '/^Content-Length: (\d+)$/i', $line, $m ) ) {
+ $this->bodyRemaining = intval( $m[1] );
+ } elseif ( $line === '' ) {
+ $this->readState = 'body';
+ }
+ }
+
+ protected function nextRequest() {
+ if ( $this->currentRequestIndex !== null ) {
+ unset( $this->requests[$this->currentRequestIndex] );
+ }
+ if ( count( $this->requests ) ) {
+ $this->readState = 'status';
+ $this->currentRequestIndex = key( $this->requests );
+ $this->writeBuffer = $this->requests[$this->currentRequestIndex];
+ } else {
+ $this->readState = 'idle';
+ $this->currentRequestIndex = null;
+ $this->writeBuffer = '';
+ }
+ $this->bodyRemaining = null;
+ }
+
+ protected function log( $msg ) {
+ wfDebugLog( 'squid', __CLASS__." ($this->host): $msg\n" );
+ }
+}
+
+class SquidPurgeClientPool {
+ var $clients = array();
+ var $timeout = 5;
+
+ function __construct( $options = array() ) {
+ if ( isset( $options['timeout'] ) ) {
+ $this->timeout = $options['timeout'];
+ }
+ }
+
+ public function addClient( $client ) {
+ $this->clients[] = $client;
+ }
+
+ public function run() {
+ $done = false;
+ $startTime = microtime( true );
+ while ( !$done ) {
+ $readSockets = $writeSockets = array();
+ foreach ( $this->clients as $clientIndex => $client ) {
+ $sockets = $client->getReadSocketsForSelect();
+ foreach ( $sockets as $i => $socket ) {
+ $readSockets["$clientIndex/$i"] = $socket;
+ }
+ $sockets = $client->getWriteSocketsForSelect();
+ foreach ( $sockets as $i => $socket ) {
+ $writeSockets["$clientIndex/$i"] = $socket;
+ }
+ }
+ if ( !count( $readSockets ) && !count( $writeSockets ) ) {
+ break;
+ }
+ $exceptSockets = null;
+ $timeout = min( $startTime + $this->timeout - microtime( true ), 1 );
+ wfSuppressWarnings();
+ $numReady = socket_select( $readSockets, $writeSockets, $exceptSockets, $timeout );
+ wfRestoreWarnings();
+ if ( $numReady === false ) {
+ wfDebugLog( 'squid', __METHOD__.': Error in stream_select: ' .
+ socket_strerror( socket_last_error() ) . "\n" );
+ break;
+ }
+ // Check for timeout, use 1% tolerance since we aimed at having socket_select()
+ // exit at precisely the overall timeout
+ if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) {
+ wfDebugLog( 'squid', __CLASS__.": timeout ({$this->timeout}s)\n" );
+ break;
+ } elseif ( !$numReady ) {
+ continue;
+ }
+
+ foreach ( $readSockets as $key => $socket ) {
+ list( $clientIndex, $i ) = explode( '/', $key );
+ $client = $this->clients[$clientIndex];
+ $client->doReads();
+ }
+ foreach ( $writeSockets as $key => $socket ) {
+ list( $clientIndex, $i ) = explode( '/', $key );
+ $client = $this->clients[$clientIndex];
+ $client->doWrites();
+ }
+
+ $done = true;
+ foreach ( $this->clients as $client ) {
+ if ( !$client->isIdle() ) {
+ $done = false;
+ }
+ }
+ }
+ foreach ( $this->clients as $client ) {
+ $client->close();
+ }
+ }
+}
XXX report broken Squids per mail or log */
static function purge( $urlArr ) {
- global $wgSquidServers, $wgHTCPMulticastAddress, $wgHTCPPort, $wgSquidResponseLimit;
+ global $wgSquidServers, $wgHTCPMulticastAddress, $wgHTCPPort;
/*if ( (@$wgSquidServers[0]) == 'echo' ) {
echo implode("<br />\n", $urlArr) . "<br />\n";
return;
}*/
- if( empty( $urlArr ) ) {
+ if( !$urlArr ) {
return;
}
wfProfileIn( __METHOD__ );
- $maxsocketspersquid = 8; // socket cap per Squid
- $urlspersocket = 400; // 400 seems to be a good tradeoff, opening a socket takes a while
- $firsturl = SquidUpdate::expand( $urlArr[0] );
- unset($urlArr[0]);
- $urlArr = array_values($urlArr);
- $sockspersq = max(ceil(count($urlArr) / $urlspersocket ),1);
- if ($sockspersq == 1) {
- /* the most common case */
- $urlspersocket = count($urlArr);
- } else if ($sockspersq > $maxsocketspersquid ) {
- $urlspersocket = ceil(count($urlArr) / $maxsocketspersquid);
- $sockspersq = $maxsocketspersquid;
+ $maxSocketsPerSquid = 8; // socket cap per Squid
+ $urlsPerSocket = 400; // 400 seems to be a good tradeoff, opening a socket takes a while
+ $socketsPerSquid = ceil( count( $urlArr ) / $urlsPerSocket );
+ if ( $socketsPerSquid > $maxSocketsPerSquid ) {
+ $socketsPerSquid = $maxSocketsPerSquid;
}
- $totalsockets = count($wgSquidServers) * $sockspersq;
- $sockets = Array();
- /* this sets up the sockets and tests the first socket for each server. */
- for ($ss=0;$ss < count($wgSquidServers);$ss++) {
- $failed = false;
- $so = 0;
- while ($so < $sockspersq && !$failed) {
- if ($so == 0) {
- /* first socket for this server, do the tests */
- @list($server, $port) = explode(':', $wgSquidServers[$ss]);
- if(!isset($port)) $port = 80;
- #$this->debug("Opening socket to $server:$port");
- $error = $errstr = false;
- $socket = @fsockopen($server, $port, $error, $errstr, 3);
- #$this->debug("\n");
- if (!$socket) {
- $failed = true;
- $totalsockets -= $sockspersq;
- } else {
- $msg = 'PURGE ' . $firsturl . " HTTP/1.0\r\n".
- "Connection: Keep-Alive\r\n\r\n";
- #$this->debug($msg);
- @fputs($socket,$msg);
- #$this->debug("...");
- $res = @fread($socket,512);
- #$this->debug("\n");
- /* Squid only returns http headers with 200 or 404 status,
- if there's more returned something's wrong */
- if (strlen($res) > $wgSquidResponseLimit) {
- fclose($socket);
- $failed = true;
- $totalsockets -= $sockspersq;
- } else {
- @stream_set_blocking($socket,false);
- $sockets[] = $socket;
- }
- }
- } else {
- /* open the remaining sockets for this server */
- list($server, $port) = explode(':', $wgSquidServers[$ss]);
- if(!isset($port)) $port = 80;
- $socket = @fsockopen($server, $port, $error, $errstr, 2);
- @stream_set_blocking($socket,false);
- $sockets[] = $socket;
+ $pool = new SquidPurgeClientPool;
+ $chunks = array_chunk( $urlArr, ceil( count( $urlArr ) / $socketsPerSquid ) );
+ foreach ( $wgSquidServers as $server ) {
+ foreach ( $chunks as $chunk ) {
+ $client = new SquidPurgeClient( $server );
+ foreach ( $chunk as $url ) {
+ $client->queuePurge( $url );
}
- $so++;
+ $pool->addClient( $client );
}
}
+ $pool->run();
- if ($urlspersocket > 0) {
- /* now do the heavy lifting. The fread() relies on Squid returning only the headers */
- for ($r=0;$r < $urlspersocket;$r++) {
- for ($s=0;$s < $totalsockets;$s++) {
- if($r != 0) {
- $res = '';
- $esc = 0;
- while (strlen($res) < 100 && $esc < 200 ) {
- $res .= @fread($sockets[$s],512);
- $esc++;
- usleep(20);
- }
- }
- $urindex = $r + $urlspersocket * ($s - $sockspersq * floor($s / $sockspersq));
- $url = SquidUpdate::expand( $urlArr[$urindex] );
- $msg = 'PURGE ' . $url . " HTTP/1.0\r\n".
- "Connection: Keep-Alive\r\n\r\n";
- #$this->debug($msg);
- @fputs($sockets[$s],$msg);
- #$this->debug("\n");
- }
- }
- }
- #$this->debug("Reading response...");
- foreach ($sockets as $socket) {
- $res = '';
- $esc = 0;
- while (strlen($res) < 100 && $esc < 200 ) {
- $res .= @fread($socket,1024);
- $esc++;
- usleep(20);
- }
-
- @fclose($socket);
- }
- #$this->debug("\n");
wfProfileOut( __METHOD__ );
}
wfProfileOut( __METHOD__ );
}
- function debug( $text ) {
- global $wgDebugSquid;
- if ( $wgDebugSquid ) {
- wfDebug( $text );
- }
- }
-
/**
* Expand local URLs to fully-qualified URLs using the internal protocol
* and host defined in $wgInternalServer. Input that's already fully-