From: Aaron Schulz Date: Thu, 8 Oct 2015 07:36:22 +0000 (-0700) Subject: Move SquidPurgeClient under /clientpool X-Git-Tag: 1.31.0-rc.0~9478^2 X-Git-Url: http://git.cyclocoop.org/%22.%24h.%22?a=commitdiff_plain;h=a8ec2f8daf4ed06fbea09b215b5f0b27e71a113a;p=lhc%2Fweb%2Fwiklou.git Move SquidPurgeClient under /clientpool Each class has its own file now too. Change-Id: I11593d6efbfce8e3981895e84edb4e0dea3998e4 --- diff --git a/autoload.php b/autoload.php index a2f432f6fd..269b1427d6 100644 --- a/autoload.php +++ b/autoload.php @@ -1212,8 +1212,8 @@ $wgAutoloadLocalClasses = array( 'SqliteInstaller' => __DIR__ . '/includes/installer/SqliteInstaller.php', 'SqliteMaintenance' => __DIR__ . '/maintenance/sqlite.php', 'SqliteUpdater' => __DIR__ . '/includes/installer/SqliteUpdater.php', - 'SquidPurgeClient' => __DIR__ . '/includes/SquidPurgeClient.php', - 'SquidPurgeClientPool' => __DIR__ . '/includes/SquidPurgeClient.php', + 'SquidPurgeClient' => __DIR__ . '/includes/clientpool/SquidPurgeClient.php', + 'SquidPurgeClientPool' => __DIR__ . '/includes/clientpool/SquidPurgeClientPool.php', 'SquidUpdate' => __DIR__ . '/includes/deferred/SquidUpdate.php', 'SrConverter' => __DIR__ . '/languages/classes/LanguageSr.php', 'StatsOutput' => __DIR__ . '/maintenance/language/StatOutputs.php', diff --git a/includes/SquidPurgeClient.php b/includes/SquidPurgeClient.php deleted file mode 100644 index ca8f11aef1..0000000000 --- a/includes/SquidPurgeClient.php +++ /dev/null @@ -1,484 +0,0 @@ -host = $parts[0]; - $this->port = isset( $parts[1] ) ? $parts[1] : 80; - } - - /** - * Open a socket if there isn't one open already, return it. - * Returns false on error. - * - * @return bool|resource - */ - protected function getSocket() { - if ( $this->socket !== null ) { - return $this->socket; - } - - $ip = $this->getIP(); - if ( !$ip ) { - $this->log( "DNS error" ); - $this->markDown(); - return false; - } - $this->socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP ); - socket_set_nonblock( $this->socket ); - MediaWiki\suppressWarnings(); - $ok = socket_connect( $this->socket, $ip, $this->port ); - MediaWiki\restoreWarnings(); - if ( !$ok ) { - $error = socket_last_error( $this->socket ); - if ( $error !== self::EINPROGRESS ) { - $this->log( "connection error: " . socket_strerror( $error ) ); - $this->markDown(); - return false; - } - } - - return $this->socket; - } - - /** - * Get read socket array for select() - * @return array - */ - public function getReadSocketsForSelect() { - if ( $this->readState == 'idle' ) { - return array(); - } - $socket = $this->getSocket(); - if ( $socket === false ) { - return array(); - } - return array( $socket ); - } - - /** - * Get write socket array for select() - * @return array - */ - public function getWriteSocketsForSelect() { - if ( !strlen( $this->writeBuffer ) ) { - return array(); - } - $socket = $this->getSocket(); - if ( $socket === false ) { - return array(); - } - return array( $socket ); - } - - /** - * Get the host's IP address. - * Does not support IPv6 at present due to the lack of a convenient interface in PHP. - * @throws MWException - * @return string - */ - protected function getIP() { - if ( $this->ip === null ) { - if ( IP::isIPv4( $this->host ) ) { - $this->ip = $this->host; - } elseif ( IP::isIPv6( $this->host ) ) { - throw new MWException( '$wgSquidServers does not support IPv6' ); - } else { - MediaWiki\suppressWarnings(); - $this->ip = gethostbyname( $this->host ); - if ( $this->ip === $this->host ) { - $this->ip = false; - } - MediaWiki\restoreWarnings(); - } - } - return $this->ip; - } - - /** - * Close the socket and ignore any future purge requests. - * This is called if there is a protocol error. - */ - protected function markDown() { - $this->close(); - $this->socket = false; - } - - /** - * Close the socket but allow it to be reopened for future purge requests - */ - public function close() { - if ( $this->socket ) { - MediaWiki\suppressWarnings(); - socket_set_block( $this->socket ); - socket_shutdown( $this->socket ); - socket_close( $this->socket ); - MediaWiki\restoreWarnings(); - } - $this->socket = null; - $this->readBuffer = ''; - // Write buffer is kept since it may contain a request for the next socket - } - - /** - * Queue a purge operation - * - * @param string $url - */ - public function queuePurge( $url ) { - global $wgSquidPurgeUseHostHeader; - $url = SquidUpdate::expand( str_replace( "\n", '', $url ) ); - $request = array(); - if ( $wgSquidPurgeUseHostHeader ) { - $url = wfParseUrl( $url ); - $host = $url['host']; - if ( isset( $url['port'] ) && strlen( $url['port'] ) > 0 ) { - $host .= ":" . $url['port']; - } - $path = $url['path']; - if ( isset( $url['query'] ) && is_string( $url['query'] ) ) { - $path = wfAppendQuery( $path, $url['query'] ); - } - $request[] = "PURGE $path HTTP/1.1"; - $request[] = "Host: $host"; - } else { - $request[] = "PURGE $url HTTP/1.0"; - } - $request[] = "Connection: Keep-Alive"; - $request[] = "Proxy-Connection: Keep-Alive"; - $request[] = "User-Agent: " . Http::userAgent() . ' ' . __CLASS__; - // Two ''s to create \r\n\r\n - $request[] = ''; - $request[] = ''; - - $this->requests[] = implode( "\r\n", $request ); - if ( $this->currentRequestIndex === null ) { - $this->nextRequest(); - } - } - - /** - * @return bool - */ - public function isIdle() { - return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle'; - } - - /** - * Perform pending writes. Call this when socket_select() indicates that writing will not block. - */ - public function doWrites() { - if ( !strlen( $this->writeBuffer ) ) { - return; - } - $socket = $this->getSocket(); - if ( !$socket ) { - return; - } - - if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) { - $buf = $this->writeBuffer; - $flags = MSG_EOR; - } else { - $buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE ); - $flags = 0; - } - MediaWiki\suppressWarnings(); - $bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags ); - MediaWiki\restoreWarnings(); - - if ( $bytesSent === false ) { - $error = socket_last_error( $socket ); - if ( $error != self::EAGAIN && $error != self::EINTR ) { - $this->log( 'write error: ' . socket_strerror( $error ) ); - $this->markDown(); - } - return; - } - - $this->writeBuffer = substr( $this->writeBuffer, $bytesSent ); - } - - /** - * Read some data. Call this when socket_select() indicates that the read buffer is non-empty. - */ - public function doReads() { - $socket = $this->getSocket(); - if ( !$socket ) { - return; - } - - $buf = ''; - MediaWiki\suppressWarnings(); - $bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 ); - MediaWiki\restoreWarnings(); - if ( $bytesRead === false ) { - $error = socket_last_error( $socket ); - if ( $error != self::EAGAIN && $error != self::EINTR ) { - $this->log( 'read error: ' . socket_strerror( $error ) ); - $this->markDown(); - return; - } - } elseif ( $bytesRead === 0 ) { - // Assume EOF - $this->close(); - return; - } - - $this->readBuffer .= $buf; - while ( $this->socket && $this->processReadBuffer() === 'continue' ); - } - - /** - * @throws MWException - * @return string - */ - protected function processReadBuffer() { - switch ( $this->readState ) { - case 'idle': - return 'done'; - case 'status': - case 'header': - $lines = explode( "\r\n", $this->readBuffer, 2 ); - if ( count( $lines ) < 2 ) { - return 'done'; - } - if ( $this->readState == 'status' ) { - $this->processStatusLine( $lines[0] ); - } else { // header - $this->processHeaderLine( $lines[0] ); - } - $this->readBuffer = $lines[1]; - return 'continue'; - case 'body': - if ( $this->bodyRemaining !== null ) { - if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) { - $this->bodyRemaining -= strlen( $this->readBuffer ); - $this->readBuffer = ''; - return 'done'; - } else { - $this->readBuffer = substr( $this->readBuffer, $this->bodyRemaining ); - $this->bodyRemaining = 0; - $this->nextRequest(); - return 'continue'; - } - } else { - // No content length, read all data to EOF - $this->readBuffer = ''; - return 'done'; - } - default: - throw new MWException( __METHOD__ . ': unexpected state' ); - } - } - - /** - * @param string $line - */ - protected function processStatusLine( $line ) { - if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) { - $this->log( 'invalid status line' ); - $this->markDown(); - return; - } - list( , , , $status, $reason ) = $m; - $status = intval( $status ); - if ( $status !== 200 && $status !== 404 ) { - $this->log( "unexpected status code: $status $reason" ); - $this->markDown(); - return; - } - $this->readState = 'header'; - } - - /** - * @param string $line - */ - protected function processHeaderLine( $line ) { - if ( preg_match( '/^Content-Length: (\d+)$/i', $line, $m ) ) { - $this->bodyRemaining = intval( $m[1] ); - } elseif ( $line === '' ) { - $this->readState = 'body'; - } - } - - protected function nextRequest() { - if ( $this->currentRequestIndex !== null ) { - unset( $this->requests[$this->currentRequestIndex] ); - } - if ( count( $this->requests ) ) { - $this->readState = 'status'; - $this->currentRequestIndex = key( $this->requests ); - $this->writeBuffer = $this->requests[$this->currentRequestIndex]; - } else { - $this->readState = 'idle'; - $this->currentRequestIndex = null; - $this->writeBuffer = ''; - } - $this->bodyRemaining = null; - } - - /** - * @param string $msg - */ - protected function log( $msg ) { - wfDebugLog( 'squid', __CLASS__ . " ($this->host): $msg" ); - } -} - -class SquidPurgeClientPool { - /** @var array Array of SquidPurgeClient */ - protected $clients = array(); - - /** @var int */ - protected $timeout = 5; - - /** - * @param array $options - */ - function __construct( $options = array() ) { - if ( isset( $options['timeout'] ) ) { - $this->timeout = $options['timeout']; - } - } - - /** - * @param SquidPurgeClient $client - * @return void - */ - public function addClient( $client ) { - $this->clients[] = $client; - } - - public function run() { - $done = false; - $startTime = microtime( true ); - while ( !$done ) { - $readSockets = $writeSockets = array(); - /** - * @var $client SquidPurgeClient - */ - foreach ( $this->clients as $clientIndex => $client ) { - $sockets = $client->getReadSocketsForSelect(); - foreach ( $sockets as $i => $socket ) { - $readSockets["$clientIndex/$i"] = $socket; - } - $sockets = $client->getWriteSocketsForSelect(); - foreach ( $sockets as $i => $socket ) { - $writeSockets["$clientIndex/$i"] = $socket; - } - } - if ( !count( $readSockets ) && !count( $writeSockets ) ) { - break; - } - $exceptSockets = null; - $timeout = min( $startTime + $this->timeout - microtime( true ), 1 ); - MediaWiki\suppressWarnings(); - $numReady = socket_select( $readSockets, $writeSockets, $exceptSockets, $timeout ); - MediaWiki\restoreWarnings(); - if ( $numReady === false ) { - wfDebugLog( 'squid', __METHOD__ . ': Error in stream_select: ' . - socket_strerror( socket_last_error() ) . "\n" ); - break; - } - // Check for timeout, use 1% tolerance since we aimed at having socket_select() - // exit at precisely the overall timeout - if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) { - wfDebugLog( 'squid', __CLASS__ . ": timeout ({$this->timeout}s)\n" ); - break; - } elseif ( !$numReady ) { - continue; - } - - foreach ( $readSockets as $key => $socket ) { - list( $clientIndex, ) = explode( '/', $key ); - $client = $this->clients[$clientIndex]; - $client->doReads(); - } - foreach ( $writeSockets as $key => $socket ) { - list( $clientIndex, ) = explode( '/', $key ); - $client = $this->clients[$clientIndex]; - $client->doWrites(); - } - - $done = true; - foreach ( $this->clients as $client ) { - if ( !$client->isIdle() ) { - $done = false; - } - } - } - foreach ( $this->clients as $client ) { - $client->close(); - } - } -} diff --git a/includes/clientpool/SquidPurgeClient.php b/includes/clientpool/SquidPurgeClient.php new file mode 100644 index 0000000000..91100e9270 --- /dev/null +++ b/includes/clientpool/SquidPurgeClient.php @@ -0,0 +1,396 @@ +host = $parts[0]; + $this->port = isset( $parts[1] ) ? $parts[1] : 80; + } + + /** + * Open a socket if there isn't one open already, return it. + * Returns false on error. + * + * @return bool|resource + */ + protected function getSocket() { + if ( $this->socket !== null ) { + return $this->socket; + } + + $ip = $this->getIP(); + if ( !$ip ) { + $this->log( "DNS error" ); + $this->markDown(); + return false; + } + $this->socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP ); + socket_set_nonblock( $this->socket ); + MediaWiki\suppressWarnings(); + $ok = socket_connect( $this->socket, $ip, $this->port ); + MediaWiki\restoreWarnings(); + if ( !$ok ) { + $error = socket_last_error( $this->socket ); + if ( $error !== self::EINPROGRESS ) { + $this->log( "connection error: " . socket_strerror( $error ) ); + $this->markDown(); + return false; + } + } + + return $this->socket; + } + + /** + * Get read socket array for select() + * @return array + */ + public function getReadSocketsForSelect() { + if ( $this->readState == 'idle' ) { + return array(); + } + $socket = $this->getSocket(); + if ( $socket === false ) { + return array(); + } + return array( $socket ); + } + + /** + * Get write socket array for select() + * @return array + */ + public function getWriteSocketsForSelect() { + if ( !strlen( $this->writeBuffer ) ) { + return array(); + } + $socket = $this->getSocket(); + if ( $socket === false ) { + return array(); + } + return array( $socket ); + } + + /** + * Get the host's IP address. + * Does not support IPv6 at present due to the lack of a convenient interface in PHP. + * @throws MWException + * @return string + */ + protected function getIP() { + if ( $this->ip === null ) { + if ( IP::isIPv4( $this->host ) ) { + $this->ip = $this->host; + } elseif ( IP::isIPv6( $this->host ) ) { + throw new MWException( '$wgSquidServers does not support IPv6' ); + } else { + MediaWiki\suppressWarnings(); + $this->ip = gethostbyname( $this->host ); + if ( $this->ip === $this->host ) { + $this->ip = false; + } + MediaWiki\restoreWarnings(); + } + } + return $this->ip; + } + + /** + * Close the socket and ignore any future purge requests. + * This is called if there is a protocol error. + */ + protected function markDown() { + $this->close(); + $this->socket = false; + } + + /** + * Close the socket but allow it to be reopened for future purge requests + */ + public function close() { + if ( $this->socket ) { + MediaWiki\suppressWarnings(); + socket_set_block( $this->socket ); + socket_shutdown( $this->socket ); + socket_close( $this->socket ); + MediaWiki\restoreWarnings(); + } + $this->socket = null; + $this->readBuffer = ''; + // Write buffer is kept since it may contain a request for the next socket + } + + /** + * Queue a purge operation + * + * @param string $url + */ + public function queuePurge( $url ) { + global $wgSquidPurgeUseHostHeader; + $url = SquidUpdate::expand( str_replace( "\n", '', $url ) ); + $request = array(); + if ( $wgSquidPurgeUseHostHeader ) { + $url = wfParseUrl( $url ); + $host = $url['host']; + if ( isset( $url['port'] ) && strlen( $url['port'] ) > 0 ) { + $host .= ":" . $url['port']; + } + $path = $url['path']; + if ( isset( $url['query'] ) && is_string( $url['query'] ) ) { + $path = wfAppendQuery( $path, $url['query'] ); + } + $request[] = "PURGE $path HTTP/1.1"; + $request[] = "Host: $host"; + } else { + $request[] = "PURGE $url HTTP/1.0"; + } + $request[] = "Connection: Keep-Alive"; + $request[] = "Proxy-Connection: Keep-Alive"; + $request[] = "User-Agent: " . Http::userAgent() . ' ' . __CLASS__; + // Two ''s to create \r\n\r\n + $request[] = ''; + $request[] = ''; + + $this->requests[] = implode( "\r\n", $request ); + if ( $this->currentRequestIndex === null ) { + $this->nextRequest(); + } + } + + /** + * @return bool + */ + public function isIdle() { + return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle'; + } + + /** + * Perform pending writes. Call this when socket_select() indicates that writing will not block. + */ + public function doWrites() { + if ( !strlen( $this->writeBuffer ) ) { + return; + } + $socket = $this->getSocket(); + if ( !$socket ) { + return; + } + + if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) { + $buf = $this->writeBuffer; + $flags = MSG_EOR; + } else { + $buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE ); + $flags = 0; + } + MediaWiki\suppressWarnings(); + $bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags ); + MediaWiki\restoreWarnings(); + + if ( $bytesSent === false ) { + $error = socket_last_error( $socket ); + if ( $error != self::EAGAIN && $error != self::EINTR ) { + $this->log( 'write error: ' . socket_strerror( $error ) ); + $this->markDown(); + } + return; + } + + $this->writeBuffer = substr( $this->writeBuffer, $bytesSent ); + } + + /** + * Read some data. Call this when socket_select() indicates that the read buffer is non-empty. + */ + public function doReads() { + $socket = $this->getSocket(); + if ( !$socket ) { + return; + } + + $buf = ''; + MediaWiki\suppressWarnings(); + $bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 ); + MediaWiki\restoreWarnings(); + if ( $bytesRead === false ) { + $error = socket_last_error( $socket ); + if ( $error != self::EAGAIN && $error != self::EINTR ) { + $this->log( 'read error: ' . socket_strerror( $error ) ); + $this->markDown(); + return; + } + } elseif ( $bytesRead === 0 ) { + // Assume EOF + $this->close(); + return; + } + + $this->readBuffer .= $buf; + while ( $this->socket && $this->processReadBuffer() === 'continue' ); + } + + /** + * @throws MWException + * @return string + */ + protected function processReadBuffer() { + switch ( $this->readState ) { + case 'idle': + return 'done'; + case 'status': + case 'header': + $lines = explode( "\r\n", $this->readBuffer, 2 ); + if ( count( $lines ) < 2 ) { + return 'done'; + } + if ( $this->readState == 'status' ) { + $this->processStatusLine( $lines[0] ); + } else { // header + $this->processHeaderLine( $lines[0] ); + } + $this->readBuffer = $lines[1]; + return 'continue'; + case 'body': + if ( $this->bodyRemaining !== null ) { + if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) { + $this->bodyRemaining -= strlen( $this->readBuffer ); + $this->readBuffer = ''; + return 'done'; + } else { + $this->readBuffer = substr( $this->readBuffer, $this->bodyRemaining ); + $this->bodyRemaining = 0; + $this->nextRequest(); + return 'continue'; + } + } else { + // No content length, read all data to EOF + $this->readBuffer = ''; + return 'done'; + } + default: + throw new MWException( __METHOD__ . ': unexpected state' ); + } + } + + /** + * @param string $line + */ + protected function processStatusLine( $line ) { + if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) { + $this->log( 'invalid status line' ); + $this->markDown(); + return; + } + list( , , , $status, $reason ) = $m; + $status = intval( $status ); + if ( $status !== 200 && $status !== 404 ) { + $this->log( "unexpected status code: $status $reason" ); + $this->markDown(); + return; + } + $this->readState = 'header'; + } + + /** + * @param string $line + */ + protected function processHeaderLine( $line ) { + if ( preg_match( '/^Content-Length: (\d+)$/i', $line, $m ) ) { + $this->bodyRemaining = intval( $m[1] ); + } elseif ( $line === '' ) { + $this->readState = 'body'; + } + } + + protected function nextRequest() { + if ( $this->currentRequestIndex !== null ) { + unset( $this->requests[$this->currentRequestIndex] ); + } + if ( count( $this->requests ) ) { + $this->readState = 'status'; + $this->currentRequestIndex = key( $this->requests ); + $this->writeBuffer = $this->requests[$this->currentRequestIndex]; + } else { + $this->readState = 'idle'; + $this->currentRequestIndex = null; + $this->writeBuffer = ''; + } + $this->bodyRemaining = null; + } + + /** + * @param string $msg + */ + protected function log( $msg ) { + wfDebugLog( 'squid', __CLASS__ . " ($this->host): $msg" ); + } +} diff --git a/includes/clientpool/SquidPurgeClientPool.php b/includes/clientpool/SquidPurgeClientPool.php new file mode 100644 index 0000000000..feb80df9f8 --- /dev/null +++ b/includes/clientpool/SquidPurgeClientPool.php @@ -0,0 +1,108 @@ +timeout = $options['timeout']; + } + } + + /** + * @param SquidPurgeClient $client + * @return void + */ + public function addClient( $client ) { + $this->clients[] = $client; + } + + public function run() { + $done = false; + $startTime = microtime( true ); + while ( !$done ) { + $readSockets = $writeSockets = array(); + /** + * @var $client SquidPurgeClient + */ + foreach ( $this->clients as $clientIndex => $client ) { + $sockets = $client->getReadSocketsForSelect(); + foreach ( $sockets as $i => $socket ) { + $readSockets["$clientIndex/$i"] = $socket; + } + $sockets = $client->getWriteSocketsForSelect(); + foreach ( $sockets as $i => $socket ) { + $writeSockets["$clientIndex/$i"] = $socket; + } + } + if ( !count( $readSockets ) && !count( $writeSockets ) ) { + break; + } + $exceptSockets = null; + $timeout = min( $startTime + $this->timeout - microtime( true ), 1 ); + MediaWiki\suppressWarnings(); + $numReady = socket_select( $readSockets, $writeSockets, $exceptSockets, $timeout ); + MediaWiki\restoreWarnings(); + if ( $numReady === false ) { + wfDebugLog( 'squid', __METHOD__ . ': Error in stream_select: ' . + socket_strerror( socket_last_error() ) . "\n" ); + break; + } + // Check for timeout, use 1% tolerance since we aimed at having socket_select() + // exit at precisely the overall timeout + if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) { + wfDebugLog( 'squid', __CLASS__ . ": timeout ({$this->timeout}s)\n" ); + break; + } elseif ( !$numReady ) { + continue; + } + + foreach ( $readSockets as $key => $socket ) { + list( $clientIndex, ) = explode( '/', $key ); + $client = $this->clients[$clientIndex]; + $client->doReads(); + } + foreach ( $writeSockets as $key => $socket ) { + list( $clientIndex, ) = explode( '/', $key ); + $client = $this->clients[$clientIndex]; + $client->doWrites(); + } + + $done = true; + foreach ( $this->clients as $client ) { + if ( !$client->isIdle() ) { + $done = false; + } + } + } + foreach ( $this->clients as $client ) { + $client->close(); + } + } +}