From: Aaron Schulz Date: Sat, 28 Jan 2012 20:54:19 +0000 (+0000) Subject: In LockServerDaemon: X-Git-Tag: 1.31.0-rc.0~25035 X-Git-Url: http://git.cyclocoop.org/%7B%24www_url%7Dadmin/compta/exercices/?a=commitdiff_plain;h=da8a394d3e25951238bac72c1d4f33a4623864ac;p=lhc%2Fweb%2Fwiklou.git In LockServerDaemon: * r109802: fixed references to bogus vars that are now in LockHolder. * Tweaked doCommand() so that if a client re-connects, it's removed from the deadSessions member. * Made all socket operations non-blocking. Uses new SocketArray helper class. * Bumped default 'maxLocks' parameter to 10000. --- diff --git a/maintenance/locking/LockServerDaemon.php b/maintenance/locking/LockServerDaemon.php index cba0454659..629daf533a 100644 --- a/maintenance/locking/LockServerDaemon.php +++ b/maintenance/locking/LockServerDaemon.php @@ -1,5 +1,7 @@ main(); /** - * Simple lock server daemon that accepts lock/unlock requests. - * This should not require MediaWiki setup or PHP files. + * Simple lock server daemon that accepts lock/unlock requests */ class LockServerDaemon { /** @var resource */ @@ -29,10 +30,9 @@ class LockServerDaemon { /** @var LockHolder */ protected $lockHolder; - protected $address; // string (IP/hostname) + protected $address; // string IP address protected $port; // integer protected $authKey; // string key - protected $connTimeout; // array ( 'sec' => integer, 'usec' => integer ) protected $lockTimeout; // integer number of seconds protected $maxBacklog; // integer protected $maxClients; // integer @@ -40,6 +40,7 @@ class LockServerDaemon { protected $startTime; // integer UNIX timestamp protected $ticks = 0; // integer counter + /* @var LockServerDaemon */ protected static $instance = null; /** @@ -54,7 +55,7 @@ class LockServerDaemon { if ( !isset( $config[$par] ) ) { die( "Usage: php LockServerDaemon.php " . "--address
--port --authkey " . - "[--connTimeout ] [--lockTimeout ] " . + "[--lockTimeout ] " . "[--maxLocks ] [--maxClients ] [--maxBacklog ]" ); } @@ -72,13 +73,6 @@ class LockServerDaemon { $this->port = $config['port']; $this->authKey = $config['authKey']; // Parameters with defaults... - $connTimeout = isset( $config['connTimeout'] ) - ? $config['connTimeout'] - : 1.5; - $this->connTimeout = array( - 'sec' => floor( $connTimeout ), - 'usec' => floor( ( $connTimeout - floor( $connTimeout ) ) * 1e6 ) - ); $this->lockTimeout = isset( $config['lockTimeout'] ) ? (int)$config['lockTimeout'] : 60; @@ -90,7 +84,7 @@ class LockServerDaemon { : 100; $maxLocks = isset( $config['maxLocks'] ) ? (int)$config['maxLocks'] - : 5000; + : 10000; $this->lockHolder = new LockHolder( $maxLocks ); } @@ -98,7 +92,7 @@ class LockServerDaemon { /** * @return void */ - protected function setupSocket() { + protected function setupServerSocket() { if ( !function_exists( 'socket_create' ) ) { throw new Exception( "PHP sockets extension missing from PHP CLI mode." ); } @@ -116,75 +110,69 @@ class LockServerDaemon { socket_strerror( socket_last_error( $sock ) ) ); } $this->sock = $sock; - $this->startTime = time(); } /** - * @return void + * Entry-point function that listens to the server socket, accepts + * new clients, and recieves/responds to requests to lock resources. */ public function main() { - // Setup socket and start listing - $this->setupSocket(); - // Create a list of all the clients that will be connected to us. - $clients = array( $this->sock ); // start off with listening socket + $this->setupServerSocket(); // setup listening socket + $socketArray = new SocketArray(); // sockets being serviced + $socketArray->addSocket( $this->sock ); // add listening socket do { - // Create a copy, so $clients doesn't get modified by socket_select() - $read = $clients; // clients-with-data (plus listening socket) - // Get a list of all the clients that have data to be read from - $changed = socket_select( $read, $write = NULL, $except = NULL, NULL ); - if ( $changed === false ) { - trigger_error( 'socket_listen(): ' . socket_strerror( socket_last_error() ) ); - continue; - } elseif ( $changed < 1 ) { + list( $read, $write ) = $socketArray->socketsForSelect(); + if ( socket_select( $read, $write, $except = NULL, NULL ) < 1 ) { continue; // wait } // Check if there is a client trying to connect... - if ( in_array( $this->sock, $read ) && count( $clients ) < $this->maxClients ) { - // Accept the new client... - $newsock = socket_accept( $this->sock ); - if ( $newsock ) { - socket_set_option( $newsock, SOL_SOCKET, SO_KEEPALIVE, 1 ); - socket_set_option( $newsock, SOL_SOCKET, SO_RCVTIMEO, $this->connTimeout ); - socket_set_option( $newsock, SOL_SOCKET, SO_SNDTIMEO, $this->connTimeout ); - $clients[] = $newsock; - // Remove the listening socket from the clients-with-data array... - $key = array_search( $this->sock, $read ); - unset( $read[$key] ); + if ( in_array( $this->sock, $read ) && $socketArray->size() < $this->maxClients ) { + $newSock = socket_accept( $this->sock ); + if ( $newSock ) { + socket_set_option( $newSock, SOL_SOCKET, SO_KEEPALIVE, 1 ); + socket_set_nonblock( $newSock ); // don't block on read()/write() + $socketArray->addSocket( $newSock ); } } // Loop through all the clients that have data to read... foreach ( $read as $read_sock ) { - // Read until newline or 65535 bytes are recieved. - // socket_read show errors when the client is disconnected. - $data = @socket_read( $read_sock, 65535, PHP_NORMAL_READ ); + if ( $read_sock === $this->sock ) { + continue; // skip listening socket + } + // Avoids PHP_NORMAL_READ per https://bugs.php.net/bug.php?id=33471 + $data = socket_read( $read_sock, 65535 ); // Check if the client is disconnected - if ( $data === false ) { - // Remove client from $clients list - $key = array_search( $read_sock, $clients ); - unset( $clients[$key] ); - // Remove socket's session from tracking (if it exists) - $session = array_search( $read_sock, $this->sessions ); - if ( $session !== false ) { - unset( $this->sessions[$session] ); - // Record recently killed sessions that still have locks - if ( isset( $this->sessionIndexSh[$session] ) - || isset( $this->sessionIndexEx[$session] ) ) - { - $this->deadSessions[$session] = time(); - } - } - } else { + if ( $data === false || $data === '' ) { + $socketArray->closeSocket( $read_sock ); + $this->recordDeadSocket( $read_sock ); // remove session + // Check if we reached the end of a message + } elseif ( substr( $data, -1 ) === "\n" ) { + // Newline is the last char (given ping-pong message usage) + $cmd = $socketArray->readRcvBuffer( $read_sock ) . $data; // Perform the requested command... - $response = $this->doCommand( trim( $data ), $read_sock ); + $response = $this->doCommand( rtrim( $cmd ), $read_sock ); // Send the response to the client... - if ( socket_write( $read_sock, "$response\n" ) === false ) { - trigger_error( 'socket_write(): ' . - socket_strerror( socket_last_error( $read_sock ) ) ); - } + $socketArray->appendSndBuffer( $read_sock, $response . "\n" ); + // Otherwise, we just have more message data to append + } elseif ( !$socketArray->appendRcvBuffer( $read_sock, $data ) ) { + $socketArray->closeSocket( $read_sock ); // too big + $this->recordDeadSocket( $read_sock ); // remove session } } - // Prune dead locks every 10 socket events... + // Loop through all the clients that have data to write... + foreach ( $write as $write_sock ) { + $bytes = socket_write( $write_sock, $socketArray->readSndBuffer( $write_sock ) ); + // Check if the client is disconnected + if ( $bytes === false ) { + $socketArray->closeSocket( $write_sock ); + $this->recordDeadSocket( $write_sock ); // remove session + // Otherwise, truncate these bytes from the start of the write buffer + } else { + $socketArray->consumeSndBuffer( $write_sock, $bytes ); + } + } + // Prune dead locks every few socket events... if ( ++$this->ticks >= 9 ) { $this->ticks = 0; $this->purgeExpiredLocks(); @@ -206,6 +194,7 @@ class LockServerDaemon { // On first command, track the session => sock correspondence if ( !isset( $this->sessions[$session] ) ) { $this->sessions[$session] = $sourceSock; + unset( $this->deadSessions[$session] ); // renew if dead } if ( $function === 'ACQUIRE' ) { return $this->lockHolder->lock( $session, $type, $resources ); @@ -259,14 +248,34 @@ class LockServerDaemon { return 'BAD_FORMAT'; } + /** + * Remove a socket's corresponding session from tracking and + * store it in the dead session tracking if it still has locks. + * + * @param $socket resource + * @return book + */ + protected function recordDeadSocket( $socket ) { + $session = array_search( $socket, $this->sessions ); + if ( $session !== false ) { + unset( $this->sessions[$session] ); + // Record recently killed sessions that still have locks + if ( $this->lockHolder->sessionHasLocks( $session ) ) { + $this->deadSessions[$session] = time(); + } + return true; + } + return false; + } + /** * Clear locks for sessions that have been dead for a while * * @return integer Number of sessions purged */ protected function purgeExpiredLocks() { - $now = time(); $count = 0; + $now = time(); foreach ( $this->deadSessions as $session => $timestamp ) { if ( ( $now - $timestamp ) > $this->lockTimeout ) { $this->lockHolder->release( $session ); @@ -288,8 +297,143 @@ class LockServerDaemon { } /** - * LockServerDaemon helper class that keeps track of the locks. - * This should not require MediaWiki setup or PHP files. + * LockServerDaemon helper class that keeps track socket states + */ +class SocketArray { + /* @var Array */ + protected $clients = array(); // array of client sockets + /* @var Array */ + protected $rBuffers = array(); // corresponding socket read buffers + /* @var Array */ + protected $wBuffers = array(); // corresponding socket write buffers + + const BUFFER_SIZE = 65535; + + /** + * @return Array (list of sockets to read, list of sockets to write) + */ + public function socketsForSelect() { + $rSockets = array(); + $wSockets = array(); + foreach ( $this->clients as $key => $socket ) { + if ( $this->wBuffers[$key] !== '' ) { + $wSockets[] = $socket; // wait for writing to unblock + } else { + $rSockets[] = $socket; // wait for reading to unblock + } + } + return array( $rSockets, $wSockets ); + } + + /** + * @return integer Number of client sockets + */ + public function size() { + return count( $this->clients ); + } + + /** + * @param $sock resource + * @return bool + */ + public function addSocket( $sock ) { + $this->clients[] = $sock; + $this->rBuffers[] = ''; + $this->wBuffers[] = ''; + return true; + } + + /** + * @param $sock resource + * @return bool + */ + public function closeSocket( $sock ) { + $key = array_search( $sock, $this->clients ); + if ( $key === false ) { + return false; + } + socket_close( $sock ); + unset( $this->clients[$key] ); + unset( $this->rBuffers[$key] ); + unset( $this->wBuffers[$key] ); + return true; + } + + /** + * @param $sock resource + * @param $data string + * @return bool + */ + public function appendRcvBuffer( $sock, $data ) { + $key = array_search( $sock, $this->clients ); + if ( $key === false ) { + return false; + } elseif ( ( strlen( $this->rBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) { + return false; + } + $this->rBuffers[$key] .= $data; + return true; + } + + /** + * @param $sock resource + * @return string|false + */ + public function readRcvBuffer( $sock ) { + $key = array_search( $sock, $this->clients ); + if ( $key === false ) { + return false; + } + $data = $this->rBuffers[$key]; + $this->rBuffers[$key] = ''; // consume data + return $data; + } + + /** + * @param $sock resource + * @param $data string + * @return bool + */ + public function appendSndBuffer( $sock, $data ) { + $key = array_search( $sock, $this->clients ); + if ( $key === false ) { + return false; + } elseif ( ( strlen( $this->wBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) { + return false; + } + $this->wBuffers[$key] .= $data; + return true; + } + + /** + * @param $sock resource + * @return bool + */ + public function readSndBuffer( $sock ) { + $key = array_search( $sock, $this->clients ); + if ( $key === false ) { + return false; + } + return $this->wBuffers[$key]; + } + + /** + * @param $sock resource + * @param $bytes integer + * @return bool + */ + public function consumeSndBuffer( $sock, $bytes ) { + $key = array_search( $sock, $this->clients ); + if ( $key === false ) { + return false; + } + $this->wBuffers[$key] = (string)substr( $this->wBuffers[$key], $bytes ); + return true; + } +} + +/** + * LockServerDaemon helper class that keeps track of the locks */ class LockHolder { /** @var Array */ @@ -312,6 +456,15 @@ class LockHolder { $this->maxLocks = $maxLocks; } + /** + * @param $session string + * @return bool + */ + public function sessionHasLocks( $session ) { + return isset( $this->sessionIndexSh[$session] ) + || isset( $this->sessionIndexEx[$session] ); + } + /** * @param $session string * @param $type string