From cb982cfa1cf5a99a53e704e25616a48825a176bd Mon Sep 17 00:00:00 2001 From: Tim Starling Date: Fri, 5 Feb 2010 03:36:04 +0000 Subject: [PATCH] * (bug 21551) Rewrote the Squid purge HTTP client to provide a more robust and general implementation of HTTP, allowing it to purge non-Squid caches such as Varnish. Tested against Squid (keep-alive on or off), Varnish, various kinds of network error, benchmarked at ~3k req/s. * Reverted what was left of r59178 * Removed $wgDebugSquid, didn't do anything anyway --- CREDITS | 1 - RELEASE-NOTES | 4 +- includes/AutoLoader.php | 2 + includes/DefaultSettings.php | 7 - includes/SquidPurgeClient.php | 380 ++++++++++++++++++++++++++++++++++ includes/SquidUpdate.php | 118 ++--------- 6 files changed, 401 insertions(+), 111 deletions(-) create mode 100644 includes/SquidPurgeClient.php diff --git a/CREDITS b/CREDITS index 71ef151cf9..e24bab9d4c 100644 --- a/CREDITS +++ b/CREDITS @@ -112,7 +112,6 @@ following names for their contribution to the product. * René Kijewski * Robert Treat * RockMFR -* Roi Avinoam * ST47 * Scott Colcord * Simon Walker diff --git a/RELEASE-NOTES b/RELEASE-NOTES index e5cee933cd..292dbe4e06 100644 --- a/RELEASE-NOTES +++ b/RELEASE-NOTES @@ -671,7 +671,9 @@ it from source control: http://www.mediawiki.org/wiki/Download_from_SVN * (bug 19391) Fix caching for Recent ChangesFeed. * (bug 21455) Fixed "Watch this page" checkbox appearing on some special pages even to non-logged in users -* (bug 21551) Make Squid reponse limit configurable +* (bug 21551) Rewrote the Squid purge HTTP client to provide a more robust and + general implementation of HTTP, allowing it to purge non-Squid caches such as + Varnish. * Fixed corruption of long UDP debug log messages by using socket_sendto() instead of fsockopen() with fwrite(). * (bug 16884) Fixed feed links in sidebar not complying with URL parameters diff --git a/includes/AutoLoader.php b/includes/AutoLoader.php index c3fe7b1453..7cd9b5856f 100644 --- a/includes/AutoLoader.php +++ b/includes/AutoLoader.php @@ -214,6 +214,8 @@ $wgAutoloadLocalClasses = array( 'SpecialRedirectToSpecial' => 'includes/SpecialPage.php', 'SqlBagOStuff' => 'includes/BagOStuff.php', 'SquidUpdate' => 'includes/SquidUpdate.php', + 'SquidPurgeClient' => 'includes/SquidPurgeClient.php', + 'SquidPurgeClientPool' => 'includes/SquidPurgeClient.php', 'Status' => 'includes/Status.php', 'StubContLang' => 'includes/StubObject.php', 'StubUser' => 'includes/StubObject.php', diff --git a/includes/DefaultSettings.php b/includes/DefaultSettings.php index 3b419c00bf..a7e57d7699 100644 --- a/includes/DefaultSettings.php +++ b/includes/DefaultSettings.php @@ -1834,11 +1834,6 @@ $wgSquidServers = array(); */ $wgSquidServersNoPurge = array(); -/** - * Default character limit for squid purge responses - */ -$wgSquidResponseLimit = 250; - /** Maximum number of titles to purge in any one client operation */ $wgMaxSquidPurgeTitles = 400; @@ -2011,8 +2006,6 @@ $wgUDPProfilerPort = '3811'; $wgDebugProfiling = false; /** Output debug message on every wfProfileIn/wfProfileOut */ $wgDebugFunctionEntry = 0; -/** Lots of debugging output from SquidUpdate.php */ -$wgDebugSquid = false; /* * Destination for wfIncrStats() data... diff --git a/includes/SquidPurgeClient.php b/includes/SquidPurgeClient.php new file mode 100644 index 0000000000..65da5c1acc --- /dev/null +++ b/includes/SquidPurgeClient.php @@ -0,0 +1,380 @@ +host = $parts[0]; + $this->port = isset( $parts[1] ) ? $parts[1] : 80; + } + + /** + * Open a socket if there isn't one open already, return it. + * Returns false on error. + */ + protected function getSocket() { + if ( $this->socket !== null ) { + return $this->socket; + } + + $ip = $this->getIP(); + if ( !$ip ) { + $this->log( "DNS error" ); + $this->markDown(); + return false; + } + $this->socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP ); + socket_set_nonblock( $this->socket ); + wfSuppressWarnings(); + $ok = socket_connect( $this->socket, $ip, $this->port ); + wfRestoreWarnings(); + if ( !$ok ) { + $error = socket_last_error( $this->socket ); + if ( $error !== self::EINPROGRESS ) { + $this->log( "connection error: " . socket_strerror( $error ) ); + $this->markDown(); + return false; + } + } + + return $this->socket; + } + + /** + * Get read socket array for select() + */ + public function getReadSocketsForSelect() { + if ( $this->readState == 'idle' ) { + return array(); + } + $socket = $this->getSocket(); + if ( $socket === false ) { + return array(); + } + return array( $socket ); + } + + /** + * Get write socket array for select() + */ + public function getWriteSocketsForSelect() { + if ( !strlen( $this->writeBuffer ) ) { + return array(); + } + $socket = $this->getSocket(); + if ( $socket === false ) { + return array(); + } + return array( $socket ); + } + + /** + * Get the host's IP address. + * Does not support IPv6 at present due to the lack of a convenient interface in PHP. + */ + protected function getIP() { + if ( $this->ip === null ) { + if ( IP::isIPv4( $this->host ) ) { + $this->ip = $this->host; + } elseif ( IP::isIPv6( $this->host ) ) { + throw new MWException( '$wgSquidServers does not support IPv6' ); + } else { + wfSuppressWarnings(); + $this->ip = gethostbyname( $this->host ); + if ( $this->ip === $this->host ) { + $this->ip = false; + } + wfRestoreWarnings(); + } + } + return $this->ip; + } + + /** + * Close the socket and ignore any future purge requests. + * This is called if there is a protocol error. + */ + protected function markDown() { + $this->close(); + $this->socket = false; + } + + /** + * Close the socket but allow it to be reopened for future purge requests + */ + public function close() { + if ( $this->socket ) { + wfSuppressWarnings(); + socket_set_block( $this->socket ); + socket_shutdown( $this->socket ); + socket_close( $this->socket ); + wfRestoreWarnings(); + } + $this->socket = null; + $this->readBuffer = ''; + // Write buffer is kept since it may contain a request for the next socket + } + + /** + * Queue a purge operation + */ + public function queuePurge( $url ) { + $url = str_replace( "\n", '', $url ); + $this->requests[] = "PURGE $url HTTP/1.0\r\n" . + "Connection: Keep-Alive\r\n" . + "Proxy-Connection: Keep-Alive\r\n" . + "User-Agent: " . Http::userAgent() . ' ' . __CLASS__ . "\r\n\r\n"; + if ( $this->currentRequestIndex === null ) { + $this->nextRequest(); + } + } + + public function isIdle() { + return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle'; + } + + /** + * Perform pending writes. Call this when socket_select() indicates that writing will not block. + */ + public function doWrites() { + if ( !strlen( $this->writeBuffer ) ) { + return; + } + $socket = $this->getSocket(); + if ( !$socket ) { + return; + } + + if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) { + $buf = $this->writeBuffer; + $flags = MSG_EOR; + } else { + $buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE ); + $flags = 0; + } + wfSuppressWarnings(); + $bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags ); + wfRestoreWarnings(); + + if ( $bytesSent === false ) { + $error = socket_last_error( $socket ); + if ( $error != self::EAGAIN && $error != self::EINTR ) { + $this->log( 'write error: ' . socket_strerror( $error ) ); + $this->markDown(); + } + return; + } + + $this->writeBuffer = substr( $this->writeBuffer, $bytesSent ); + } + + /** + * Read some data. Call this when socket_select() indicates that the read buffer is non-empty. + */ + public function doReads() { + $socket = $this->getSocket(); + if ( !$socket ) { + return; + } + + $buf = ''; + wfSuppressWarnings(); + $bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 ); + wfRestoreWarnings(); + if ( $bytesRead === false ) { + $error = socket_last_error( $socket ); + if ( $error != self::EAGAIN && $error != self::EINTR ) { + $this->log( 'read error: ' . socket_strerror( $error ) ); + $this->markDown(); + return; + } + } elseif ( $bytesRead === 0 ) { + // Assume EOF + $this->close(); + return; + } + + $this->readBuffer .= $buf; + while ( $this->socket && $this->processReadBuffer() === 'continue' ); + } + + protected function processReadBuffer() { + switch ( $this->readState ) { + case 'idle': + return 'done'; + case 'status': + case 'header': + $lines = explode( "\r\n", $this->readBuffer, 2 ); + if ( count( $lines ) < 2 ) { + return 'done'; + } + if ( $this->readState == 'status' ) { + $this->processStatusLine( $lines[0] ); + } else { // header + $this->processHeaderLine( $lines[0] ); + } + $this->readBuffer = $lines[1]; + return 'continue'; + case 'body': + if ( $this->bodyRemaining !== null ) { + if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) { + $this->bodyRemaining -= strlen( $this->readBuffer ); + $this->readBuffer = ''; + return 'done'; + } else { + $this->readBuffer = substr( $this->readBuffer, $this->bodyRemaining ); + $this->bodyRemaining = 0; + $this->nextRequest(); + return 'continue'; + } + } else { + // No content length, read all data to EOF + $this->readBuffer = ''; + return 'done'; + } + default: + throw new MWException( __METHOD__.': unexpected state' ); + } + } + + protected function processStatusLine( $line ) { + if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) { + $this->log( 'invalid status line' ); + $this->markDown(); + return; + } + list( $all, $major, $minor, $status, $reason ) = $m; + $status = intval( $status ); + if ( $status !== 200 && $status !== 404 ) { + $this->log( "unexpected status code: $status $reason" ); + $this->markDown(); + return; + } + $this->readState = 'header'; + } + + protected function processHeaderLine( $line ) { + if ( preg_match( '/^Content-Length: (\d+)$/i', $line, $m ) ) { + $this->bodyRemaining = intval( $m[1] ); + } elseif ( $line === '' ) { + $this->readState = 'body'; + } + } + + protected function nextRequest() { + if ( $this->currentRequestIndex !== null ) { + unset( $this->requests[$this->currentRequestIndex] ); + } + if ( count( $this->requests ) ) { + $this->readState = 'status'; + $this->currentRequestIndex = key( $this->requests ); + $this->writeBuffer = $this->requests[$this->currentRequestIndex]; + } else { + $this->readState = 'idle'; + $this->currentRequestIndex = null; + $this->writeBuffer = ''; + } + $this->bodyRemaining = null; + } + + protected function log( $msg ) { + wfDebugLog( 'squid', __CLASS__." ($this->host): $msg\n" ); + } +} + +class SquidPurgeClientPool { + var $clients = array(); + var $timeout = 5; + + function __construct( $options = array() ) { + if ( isset( $options['timeout'] ) ) { + $this->timeout = $options['timeout']; + } + } + + public function addClient( $client ) { + $this->clients[] = $client; + } + + public function run() { + $done = false; + $startTime = microtime( true ); + while ( !$done ) { + $readSockets = $writeSockets = array(); + foreach ( $this->clients as $clientIndex => $client ) { + $sockets = $client->getReadSocketsForSelect(); + foreach ( $sockets as $i => $socket ) { + $readSockets["$clientIndex/$i"] = $socket; + } + $sockets = $client->getWriteSocketsForSelect(); + foreach ( $sockets as $i => $socket ) { + $writeSockets["$clientIndex/$i"] = $socket; + } + } + if ( !count( $readSockets ) && !count( $writeSockets ) ) { + break; + } + $exceptSockets = null; + $timeout = min( $startTime + $this->timeout - microtime( true ), 1 ); + wfSuppressWarnings(); + $numReady = socket_select( $readSockets, $writeSockets, $exceptSockets, $timeout ); + wfRestoreWarnings(); + if ( $numReady === false ) { + wfDebugLog( 'squid', __METHOD__.': Error in stream_select: ' . + socket_strerror( socket_last_error() ) . "\n" ); + break; + } + // Check for timeout, use 1% tolerance since we aimed at having socket_select() + // exit at precisely the overall timeout + if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) { + wfDebugLog( 'squid', __CLASS__.": timeout ({$this->timeout}s)\n" ); + break; + } elseif ( !$numReady ) { + continue; + } + + foreach ( $readSockets as $key => $socket ) { + list( $clientIndex, $i ) = explode( '/', $key ); + $client = $this->clients[$clientIndex]; + $client->doReads(); + } + foreach ( $writeSockets as $key => $socket ) { + list( $clientIndex, $i ) = explode( '/', $key ); + $client = $this->clients[$clientIndex]; + $client->doWrites(); + } + + $done = true; + foreach ( $this->clients as $client ) { + if ( !$client->isIdle() ) { + $done = false; + } + } + } + foreach ( $this->clients as $client ) { + $client->close(); + } + } +} diff --git a/includes/SquidUpdate.php b/includes/SquidUpdate.php index 1323ffad39..66517719a7 100644 --- a/includes/SquidUpdate.php +++ b/includes/SquidUpdate.php @@ -81,14 +81,14 @@ class SquidUpdate { XXX report broken Squids per mail or log */ static function purge( $urlArr ) { - global $wgSquidServers, $wgHTCPMulticastAddress, $wgHTCPPort, $wgSquidResponseLimit; + global $wgSquidServers, $wgHTCPMulticastAddress, $wgHTCPPort; /*if ( (@$wgSquidServers[0]) == 'echo' ) { echo implode("
\n", $urlArr) . "
\n"; return; }*/ - if( empty( $urlArr ) ) { + if( !$urlArr ) { return; } @@ -98,105 +98,26 @@ class SquidUpdate { wfProfileIn( __METHOD__ ); - $maxsocketspersquid = 8; // socket cap per Squid - $urlspersocket = 400; // 400 seems to be a good tradeoff, opening a socket takes a while - $firsturl = SquidUpdate::expand( $urlArr[0] ); - unset($urlArr[0]); - $urlArr = array_values($urlArr); - $sockspersq = max(ceil(count($urlArr) / $urlspersocket ),1); - if ($sockspersq == 1) { - /* the most common case */ - $urlspersocket = count($urlArr); - } else if ($sockspersq > $maxsocketspersquid ) { - $urlspersocket = ceil(count($urlArr) / $maxsocketspersquid); - $sockspersq = $maxsocketspersquid; + $maxSocketsPerSquid = 8; // socket cap per Squid + $urlsPerSocket = 400; // 400 seems to be a good tradeoff, opening a socket takes a while + $socketsPerSquid = ceil( count( $urlArr ) / $urlsPerSocket ); + if ( $socketsPerSquid > $maxSocketsPerSquid ) { + $socketsPerSquid = $maxSocketsPerSquid; } - $totalsockets = count($wgSquidServers) * $sockspersq; - $sockets = Array(); - /* this sets up the sockets and tests the first socket for each server. */ - for ($ss=0;$ss < count($wgSquidServers);$ss++) { - $failed = false; - $so = 0; - while ($so < $sockspersq && !$failed) { - if ($so == 0) { - /* first socket for this server, do the tests */ - @list($server, $port) = explode(':', $wgSquidServers[$ss]); - if(!isset($port)) $port = 80; - #$this->debug("Opening socket to $server:$port"); - $error = $errstr = false; - $socket = @fsockopen($server, $port, $error, $errstr, 3); - #$this->debug("\n"); - if (!$socket) { - $failed = true; - $totalsockets -= $sockspersq; - } else { - $msg = 'PURGE ' . $firsturl . " HTTP/1.0\r\n". - "Connection: Keep-Alive\r\n\r\n"; - #$this->debug($msg); - @fputs($socket,$msg); - #$this->debug("..."); - $res = @fread($socket,512); - #$this->debug("\n"); - /* Squid only returns http headers with 200 or 404 status, - if there's more returned something's wrong */ - if (strlen($res) > $wgSquidResponseLimit) { - fclose($socket); - $failed = true; - $totalsockets -= $sockspersq; - } else { - @stream_set_blocking($socket,false); - $sockets[] = $socket; - } - } - } else { - /* open the remaining sockets for this server */ - list($server, $port) = explode(':', $wgSquidServers[$ss]); - if(!isset($port)) $port = 80; - $socket = @fsockopen($server, $port, $error, $errstr, 2); - @stream_set_blocking($socket,false); - $sockets[] = $socket; + $pool = new SquidPurgeClientPool; + $chunks = array_chunk( $urlArr, ceil( count( $urlArr ) / $socketsPerSquid ) ); + foreach ( $wgSquidServers as $server ) { + foreach ( $chunks as $chunk ) { + $client = new SquidPurgeClient( $server ); + foreach ( $chunk as $url ) { + $client->queuePurge( $url ); } - $so++; + $pool->addClient( $client ); } } + $pool->run(); - if ($urlspersocket > 0) { - /* now do the heavy lifting. The fread() relies on Squid returning only the headers */ - for ($r=0;$r < $urlspersocket;$r++) { - for ($s=0;$s < $totalsockets;$s++) { - if($r != 0) { - $res = ''; - $esc = 0; - while (strlen($res) < 100 && $esc < 200 ) { - $res .= @fread($sockets[$s],512); - $esc++; - usleep(20); - } - } - $urindex = $r + $urlspersocket * ($s - $sockspersq * floor($s / $sockspersq)); - $url = SquidUpdate::expand( $urlArr[$urindex] ); - $msg = 'PURGE ' . $url . " HTTP/1.0\r\n". - "Connection: Keep-Alive\r\n\r\n"; - #$this->debug($msg); - @fputs($sockets[$s],$msg); - #$this->debug("\n"); - } - } - } - #$this->debug("Reading response..."); - foreach ($sockets as $socket) { - $res = ''; - $esc = 0; - while (strlen($res) < 100 && $esc < 200 ) { - $res .= @fread($socket,1024); - $esc++; - usleep(20); - } - - @fclose($socket); - } - #$this->debug("\n"); wfProfileOut( __METHOD__ ); } @@ -259,13 +180,6 @@ class SquidUpdate { wfProfileOut( __METHOD__ ); } - function debug( $text ) { - global $wgDebugSquid; - if ( $wgDebugSquid ) { - wfDebug( $text ); - } - } - /** * Expand local URLs to fully-qualified URLs using the internal protocol * and host defined in $wgInternalServer. Input that's already fully- -- 2.20.1