<?php
-
+/**
+ * This code should not require MediaWiki setup or PHP files.
+ */
if ( php_sapi_name() !== 'cli' ) {
die( "This is not a valid entry point.\n" );
}
LockServerDaemon::init(
getopt( '', array(
'address:', 'port:', 'authKey:',
- 'connTimeout::', 'lockTimeout::', 'maxClients::', 'maxBacklog::', 'maxLocks::',
+ 'lockTimeout::', 'maxClients::', 'maxBacklog::', 'maxLocks::',
) )
)->main();
/**
- * Simple lock server daemon that accepts lock/unlock requests.
- * This should not require MediaWiki setup or PHP files.
+ * Simple lock server daemon that accepts lock/unlock requests
*/
class LockServerDaemon {
/** @var resource */
/** @var LockHolder */
protected $lockHolder;
- protected $address; // string (IP/hostname)
+ protected $address; // string IP address
protected $port; // integer
protected $authKey; // string key
- protected $connTimeout; // array ( 'sec' => integer, 'usec' => integer )
protected $lockTimeout; // integer number of seconds
protected $maxBacklog; // integer
protected $maxClients; // integer
protected $startTime; // integer UNIX timestamp
protected $ticks = 0; // integer counter
+ /* @var LockServerDaemon */
protected static $instance = null;
/**
if ( !isset( $config[$par] ) ) {
die( "Usage: php LockServerDaemon.php " .
"--address <address> --port <port> --authkey <key> " .
- "[--connTimeout <seconds>] [--lockTimeout <seconds>] " .
+ "[--lockTimeout <seconds>] " .
"[--maxLocks <integer>] [--maxClients <integer>] [--maxBacklog <integer>]"
);
}
$this->port = $config['port'];
$this->authKey = $config['authKey'];
// Parameters with defaults...
- $connTimeout = isset( $config['connTimeout'] )
- ? $config['connTimeout']
- : 1.5;
- $this->connTimeout = array(
- 'sec' => floor( $connTimeout ),
- 'usec' => floor( ( $connTimeout - floor( $connTimeout ) ) * 1e6 )
- );
$this->lockTimeout = isset( $config['lockTimeout'] )
? (int)$config['lockTimeout']
: 60;
: 100;
$maxLocks = isset( $config['maxLocks'] )
? (int)$config['maxLocks']
- : 5000;
+ : 10000;
$this->lockHolder = new LockHolder( $maxLocks );
}
/**
* @return void
*/
- protected function setupSocket() {
+ protected function setupServerSocket() {
if ( !function_exists( 'socket_create' ) ) {
throw new Exception( "PHP sockets extension missing from PHP CLI mode." );
}
socket_strerror( socket_last_error( $sock ) ) );
}
$this->sock = $sock;
-
$this->startTime = time();
}
/**
- * @return void
+ * Entry-point function that listens to the server socket, accepts
+ * new clients, and recieves/responds to requests to lock resources.
*/
public function main() {
- // Setup socket and start listing
- $this->setupSocket();
- // Create a list of all the clients that will be connected to us.
- $clients = array( $this->sock ); // start off with listening socket
+ $this->setupServerSocket(); // setup listening socket
+ $socketArray = new SocketArray(); // sockets being serviced
+ $socketArray->addSocket( $this->sock ); // add listening socket
do {
- // Create a copy, so $clients doesn't get modified by socket_select()
- $read = $clients; // clients-with-data (plus listening socket)
- // Get a list of all the clients that have data to be read from
- $changed = socket_select( $read, $write = NULL, $except = NULL, NULL );
- if ( $changed === false ) {
- trigger_error( 'socket_listen(): ' . socket_strerror( socket_last_error() ) );
- continue;
- } elseif ( $changed < 1 ) {
+ list( $read, $write ) = $socketArray->socketsForSelect();
+ if ( socket_select( $read, $write, $except = NULL, NULL ) < 1 ) {
continue; // wait
}
// Check if there is a client trying to connect...
- if ( in_array( $this->sock, $read ) && count( $clients ) < $this->maxClients ) {
- // Accept the new client...
- $newsock = socket_accept( $this->sock );
- if ( $newsock ) {
- socket_set_option( $newsock, SOL_SOCKET, SO_KEEPALIVE, 1 );
- socket_set_option( $newsock, SOL_SOCKET, SO_RCVTIMEO, $this->connTimeout );
- socket_set_option( $newsock, SOL_SOCKET, SO_SNDTIMEO, $this->connTimeout );
- $clients[] = $newsock;
- // Remove the listening socket from the clients-with-data array...
- $key = array_search( $this->sock, $read );
- unset( $read[$key] );
+ if ( in_array( $this->sock, $read ) && $socketArray->size() < $this->maxClients ) {
+ $newSock = socket_accept( $this->sock );
+ if ( $newSock ) {
+ socket_set_option( $newSock, SOL_SOCKET, SO_KEEPALIVE, 1 );
+ socket_set_nonblock( $newSock ); // don't block on read()/write()
+ $socketArray->addSocket( $newSock );
}
}
// Loop through all the clients that have data to read...
foreach ( $read as $read_sock ) {
- // Read until newline or 65535 bytes are recieved.
- // socket_read show errors when the client is disconnected.
- $data = @socket_read( $read_sock, 65535, PHP_NORMAL_READ );
+ if ( $read_sock === $this->sock ) {
+ continue; // skip listening socket
+ }
+ // Avoids PHP_NORMAL_READ per https://bugs.php.net/bug.php?id=33471
+ $data = socket_read( $read_sock, 65535 );
// Check if the client is disconnected
- if ( $data === false ) {
- // Remove client from $clients list
- $key = array_search( $read_sock, $clients );
- unset( $clients[$key] );
- // Remove socket's session from tracking (if it exists)
- $session = array_search( $read_sock, $this->sessions );
- if ( $session !== false ) {
- unset( $this->sessions[$session] );
- // Record recently killed sessions that still have locks
- if ( isset( $this->sessionIndexSh[$session] )
- || isset( $this->sessionIndexEx[$session] ) )
- {
- $this->deadSessions[$session] = time();
- }
- }
- } else {
+ if ( $data === false || $data === '' ) {
+ $socketArray->closeSocket( $read_sock );
+ $this->recordDeadSocket( $read_sock ); // remove session
+ // Check if we reached the end of a message
+ } elseif ( substr( $data, -1 ) === "\n" ) {
+ // Newline is the last char (given ping-pong message usage)
+ $cmd = $socketArray->readRcvBuffer( $read_sock ) . $data;
// Perform the requested command...
- $response = $this->doCommand( trim( $data ), $read_sock );
+ $response = $this->doCommand( rtrim( $cmd ), $read_sock );
// Send the response to the client...
- if ( socket_write( $read_sock, "$response\n" ) === false ) {
- trigger_error( 'socket_write(): ' .
- socket_strerror( socket_last_error( $read_sock ) ) );
- }
+ $socketArray->appendSndBuffer( $read_sock, $response . "\n" );
+ // Otherwise, we just have more message data to append
+ } elseif ( !$socketArray->appendRcvBuffer( $read_sock, $data ) ) {
+ $socketArray->closeSocket( $read_sock ); // too big
+ $this->recordDeadSocket( $read_sock ); // remove session
}
}
- // Prune dead locks every 10 socket events...
+ // Loop through all the clients that have data to write...
+ foreach ( $write as $write_sock ) {
+ $bytes = socket_write( $write_sock, $socketArray->readSndBuffer( $write_sock ) );
+ // Check if the client is disconnected
+ if ( $bytes === false ) {
+ $socketArray->closeSocket( $write_sock );
+ $this->recordDeadSocket( $write_sock ); // remove session
+ // Otherwise, truncate these bytes from the start of the write buffer
+ } else {
+ $socketArray->consumeSndBuffer( $write_sock, $bytes );
+ }
+ }
+ // Prune dead locks every few socket events...
if ( ++$this->ticks >= 9 ) {
$this->ticks = 0;
$this->purgeExpiredLocks();
// On first command, track the session => sock correspondence
if ( !isset( $this->sessions[$session] ) ) {
$this->sessions[$session] = $sourceSock;
+ unset( $this->deadSessions[$session] ); // renew if dead
}
if ( $function === 'ACQUIRE' ) {
return $this->lockHolder->lock( $session, $type, $resources );
return 'BAD_FORMAT';
}
+ /**
+ * Remove a socket's corresponding session from tracking and
+ * store it in the dead session tracking if it still has locks.
+ *
+ * @param $socket resource
+ * @return book
+ */
+ protected function recordDeadSocket( $socket ) {
+ $session = array_search( $socket, $this->sessions );
+ if ( $session !== false ) {
+ unset( $this->sessions[$session] );
+ // Record recently killed sessions that still have locks
+ if ( $this->lockHolder->sessionHasLocks( $session ) ) {
+ $this->deadSessions[$session] = time();
+ }
+ return true;
+ }
+ return false;
+ }
+
/**
* Clear locks for sessions that have been dead for a while
*
* @return integer Number of sessions purged
*/
protected function purgeExpiredLocks() {
- $now = time();
$count = 0;
+ $now = time();
foreach ( $this->deadSessions as $session => $timestamp ) {
if ( ( $now - $timestamp ) > $this->lockTimeout ) {
$this->lockHolder->release( $session );
}
/**
- * LockServerDaemon helper class that keeps track of the locks.
- * This should not require MediaWiki setup or PHP files.
+ * LockServerDaemon helper class that keeps track socket states
+ */
+class SocketArray {
+ /* @var Array */
+ protected $clients = array(); // array of client sockets
+ /* @var Array */
+ protected $rBuffers = array(); // corresponding socket read buffers
+ /* @var Array */
+ protected $wBuffers = array(); // corresponding socket write buffers
+
+ const BUFFER_SIZE = 65535;
+
+ /**
+ * @return Array (list of sockets to read, list of sockets to write)
+ */
+ public function socketsForSelect() {
+ $rSockets = array();
+ $wSockets = array();
+ foreach ( $this->clients as $key => $socket ) {
+ if ( $this->wBuffers[$key] !== '' ) {
+ $wSockets[] = $socket; // wait for writing to unblock
+ } else {
+ $rSockets[] = $socket; // wait for reading to unblock
+ }
+ }
+ return array( $rSockets, $wSockets );
+ }
+
+ /**
+ * @return integer Number of client sockets
+ */
+ public function size() {
+ return count( $this->clients );
+ }
+
+ /**
+ * @param $sock resource
+ * @return bool
+ */
+ public function addSocket( $sock ) {
+ $this->clients[] = $sock;
+ $this->rBuffers[] = '';
+ $this->wBuffers[] = '';
+ return true;
+ }
+
+ /**
+ * @param $sock resource
+ * @return bool
+ */
+ public function closeSocket( $sock ) {
+ $key = array_search( $sock, $this->clients );
+ if ( $key === false ) {
+ return false;
+ }
+ socket_close( $sock );
+ unset( $this->clients[$key] );
+ unset( $this->rBuffers[$key] );
+ unset( $this->wBuffers[$key] );
+ return true;
+ }
+
+ /**
+ * @param $sock resource
+ * @param $data string
+ * @return bool
+ */
+ public function appendRcvBuffer( $sock, $data ) {
+ $key = array_search( $sock, $this->clients );
+ if ( $key === false ) {
+ return false;
+ } elseif ( ( strlen( $this->rBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) {
+ return false;
+ }
+ $this->rBuffers[$key] .= $data;
+ return true;
+ }
+
+ /**
+ * @param $sock resource
+ * @return string|false
+ */
+ public function readRcvBuffer( $sock ) {
+ $key = array_search( $sock, $this->clients );
+ if ( $key === false ) {
+ return false;
+ }
+ $data = $this->rBuffers[$key];
+ $this->rBuffers[$key] = ''; // consume data
+ return $data;
+ }
+
+ /**
+ * @param $sock resource
+ * @param $data string
+ * @return bool
+ */
+ public function appendSndBuffer( $sock, $data ) {
+ $key = array_search( $sock, $this->clients );
+ if ( $key === false ) {
+ return false;
+ } elseif ( ( strlen( $this->wBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) {
+ return false;
+ }
+ $this->wBuffers[$key] .= $data;
+ return true;
+ }
+
+ /**
+ * @param $sock resource
+ * @return bool
+ */
+ public function readSndBuffer( $sock ) {
+ $key = array_search( $sock, $this->clients );
+ if ( $key === false ) {
+ return false;
+ }
+ return $this->wBuffers[$key];
+ }
+
+ /**
+ * @param $sock resource
+ * @param $bytes integer
+ * @return bool
+ */
+ public function consumeSndBuffer( $sock, $bytes ) {
+ $key = array_search( $sock, $this->clients );
+ if ( $key === false ) {
+ return false;
+ }
+ $this->wBuffers[$key] = (string)substr( $this->wBuffers[$key], $bytes );
+ return true;
+ }
+}
+
+/**
+ * LockServerDaemon helper class that keeps track of the locks
*/
class LockHolder {
/** @var Array */
$this->maxLocks = $maxLocks;
}
+ /**
+ * @param $session string
+ * @return bool
+ */
+ public function sessionHasLocks( $session ) {
+ return isset( $this->sessionIndexSh[$session] )
+ || isset( $this->sessionIndexEx[$session] );
+ }
+
/**
* @param $session string
* @param $type string