Added pipelining and max connection support to MultiHttpClient
authorAaron Schulz <aschulz@wikimedia.org>
Fri, 17 Jan 2014 21:32:46 +0000 (13:32 -0800)
committerAaron Schulz <aschulz@wikimedia.org>
Tue, 11 Feb 2014 01:56:21 +0000 (17:56 -0800)
* Also added timeout options to the run()/runMulti() methods

Change-Id: I2b080567e17fa29804c36e3b259b1ce214e6f465

includes/libs/MultiHttpClient.php

index 0675b38..350790f 100644 (file)
@@ -24,12 +24,20 @@ class MultiHttpClient {
        /** @var string|null SSL certificates path  */
        protected $caBundlePath;
        /** @var integer */
-       protected $connTimeout;
+       protected $connTimeout = 10;
        /** @var integer */
-       protected $reqTimeout;
+       protected $reqTimeout = 300;
+       /** @var bool */
+       protected $usePipelining = false;
+       /** @var integer */
+       protected $maxConnsPerHost = 50;
 
        /**
         * @param array $options
+        *   - connTimeout     : default connection timeout
+        *   - reqTimeout      : default request timeout
+        *   - usePipelining   : whether to use HTTP pipelining if possible (for all hosts)
+        *   - maxConnsPerHost : maximum number of concurrent connections (per host)
         */
        public function __construct( array $options ) {
                if ( isset( $options['caBundlePath'] ) ) {
@@ -38,9 +46,11 @@ class MultiHttpClient {
                                throw new Exception( "Cannot find CA bundle: " . $this->caBundlePath );
                        }
                }
-               static $defaults = array( 'connTimeout' => 10, 'reqTimeout' => 300 );
-               foreach ( $defaults as $key => $default ) {
-                       $this->$key = isset( $options[$key] ) ? $options[$key] : $default;
+               static $opts = array( 'connTimeout', 'reqTimeout', 'usePipelining', 'maxConnsPerHost' );
+               foreach ( $opts as $key ) {
+                       if ( isset( $options[$key] ) ) {
+                               $this->$key = $options[$key];
+                       }
                }
        }
 
@@ -58,10 +68,13 @@ class MultiHttpClient {
         *              list( $rcode, $rdesc, $rhdrs, $rbody, $rerr ) = $req;
         *  </code>
         * @param array $req HTTP request array
+        * @param array $opts
+        *   - connTimeout    : connection timeout per request
+        *   - reqTimeout     : post-connection timeout per request
         * @return array Response array for request
         */
-       public function run( array $req ) {
-               $req = $this->runMulti( array( $req ) );
+       public function run( array $req, array $opts = array() ) {
+               $req = $this->runMulti( array( $req ), $opts );
                return $req[0]['response'];
        }
 
@@ -82,10 +95,15 @@ class MultiHttpClient {
         * This is true for the request headers and the response headers.
         *
         * @param array $req Map of HTTP request arrays
+        * @param array $opts
+        *   - connTimeout     : connection timeout per request
+        *   - reqTimeout      : post-connection timeout per request
+        *   - usePipelining   : whether to use HTTP pipelining if possible
+        *   - maxConnsPerHost : maximum number of concurrent connections (per host)
         * @return array $reqs With response array populated for each
         */
-       public function runMulti( array $reqs ) {
-               $multiHandle = $this->getCurlMulti();
+       public function runMulti( array $reqs, array $opts = array() ) {
+               $chm = $this->getCurlMulti();
 
                // Normalize $reqs and add all of the required cURL handles...
                $handles = array();
@@ -114,34 +132,53 @@ class MultiHttpClient {
                                $req['body'] = '';
                                $req['headers']['content-length'] = 0;
                        }
-                       $handles[$index] = $this->getCurlHandle( $req );
+                       $handles[$index] = $this->getCurlHandle( $req, $opts );
                        if ( count( $reqs ) > 1 ) {
                                // https://github.com/guzzle/guzzle/issues/349
                                curl_setopt( $handles[$index], CURLOPT_FORBID_REUSE, true );
                        }
-                       curl_multi_add_handle( $multiHandle, $handles[$index] );
                }
 
-               // Execute the cURL handles concurrently...
-               $active = null; // handles still being processed
-               do {
-                       // Do any available work...
+               $indexes = array_keys( $reqs );
+               if ( function_exists( 'curl_multi_setopt' ) ) { // PHP 5.5
+                       if ( isset( $opts['usePipelining'] ) ) {
+                               curl_multi_setopt( $chm, CURLMOPT_PIPELINING, (int)$opts['usePipelining'] );
+                       }
+                       if ( isset( $opts['maxConnsPerHost'] ) ) {
+                               // Keep these sockets around as they may be needed later in the request
+                               curl_multi_setopt( $chm, CURLMOPT_MAXCONNECTS, (int)$opts['maxConnsPerHost'] );
+                       }
+               }
+
+               // @TODO: use a per-host rolling handle window (e.g. CURLMOPT_MAX_HOST_CONNECTIONS)
+               $batches = array_chunk( $indexes, $this->maxConnsPerHost );
+
+               foreach ( $batches as $batch ) {
+                       // Attach all cURL handles for this batch
+                       foreach ( $batch as $index ) {
+                               curl_multi_add_handle( $chm, $handles[$index] );
+                       }
+                       // Execute the cURL handles concurrently...
+                       $active = null; // handles still being processed
                        do {
-                               $mrc = curl_multi_exec( $multiHandle, $active );
-                       } while ( $mrc == CURLM_CALL_MULTI_PERFORM );
-                       // Wait (if possible) for available work...
-                       if ( $active > 0 && $mrc == CURLM_OK ) {
-                               if ( curl_multi_select( $multiHandle, 10 ) == -1 ) {
-                                       // PHP bug 63411; http://curl.haxx.se/libcurl/c/curl_multi_fdset.html
-                                       usleep( 5000 ); // 5ms
+                               // Do any available work...
+                               do {
+                                       $mrc = curl_multi_exec( $chm, $active );
+                               } while ( $mrc == CURLM_CALL_MULTI_PERFORM );
+                               // Wait (if possible) for available work...
+                               if ( $active > 0 && $mrc == CURLM_OK ) {
+                                       if ( curl_multi_select( $chm, 10 ) == -1 ) {
+                                               // PHP bug 63411; http://curl.haxx.se/libcurl/c/curl_multi_fdset.html
+                                               usleep( 5000 ); // 5ms
+                                       }
                                }
-                       }
-               } while ( $active > 0 && $mrc == CURLM_OK );
+                       } while ( $active > 0 && $mrc == CURLM_OK );
+               }
 
                // Remove all of the added cURL handles and check for errors...
                foreach ( $reqs as $index => &$req ) {
                        $ch = $handles[$index];
-                       curl_multi_remove_handle( $multiHandle, $ch );
+                       curl_multi_remove_handle( $chm, $ch );
                        if ( curl_errno( $ch ) !== 0 ) {
                                $req['error'] = "(curl error: " . curl_errno( $ch ) . ") " . curl_error( $ch );
                        }
@@ -159,18 +196,29 @@ class MultiHttpClient {
                        }
                }
 
+               // Restore the default settings
+               if ( function_exists( 'curl_multi_setopt' ) ) { // PHP 5.5
+                       curl_multi_setopt( $chm, CURLMOPT_PIPELINING, (int)$this->usePipelining );
+                       curl_multi_setopt( $chm, CURLMOPT_MAXCONNECTS, (int)$this->maxConnsPerHost );
+               }
+
                return $reqs;
        }
 
        /**
         * @param array $req HTTP request map
+        * @param array $opts
+        *   - connTimeout    : default connection timeout
+        *   - reqTimeout     : default request timeout
         * @return resource
         */
-       protected function getCurlHandle( array &$req ) {
+       protected function getCurlHandle( array &$req, array $opts = array() ) {
                $ch = curl_init();
 
-               curl_setopt( $ch, CURLOPT_CONNECTTIMEOUT, $this->connTimeout );
-               curl_setopt( $ch, CURLOPT_TIMEOUT, $this->reqTimeout );
+               curl_setopt( $ch, CURLOPT_CONNECTTIMEOUT,
+                       isset( $opts['connTimeout'] ) ? $opts['connTimeout'] : $this->connTimeout );
+               curl_setopt( $ch, CURLOPT_TIMEOUT,
+                       isset( $opts['reqTimeout'] ) ? $opts['reqTimeout'] : $this->reqTimeout );
                curl_setopt( $ch, CURLOPT_FOLLOWLOCATION, 1 );
                curl_setopt( $ch, CURLOPT_MAXREDIRS, 4 );
                curl_setopt( $ch, CURLOPT_HEADER, 0 );
@@ -290,7 +338,12 @@ class MultiHttpClient {
         */
        protected function getCurlMulti() {
                if ( !$this->multiHandle ) {
-                       $this->multiHandle = curl_multi_init();
+                       $cmh = curl_multi_init();
+                       if ( function_exists( 'curl_multi_setopt' ) ) { // PHP 5.5
+                               curl_multi_setopt( $cmh, CURLMOPT_PIPELINING, (int)$this->usePipelining );
+                               curl_multi_setopt( $cmh, CURLMOPT_MAXCONNECTS, (int)$this->maxConnsPerHost );
+                       }
+                       $this->multiHandle = $cmh;
                }
                return $this->multiHandle;
        }