Worked around lack of timeouts in fwrite(), using non-blocking I/O. Also flush the...
authorTim Starling <tstarling@users.mediawiki.org>
Wed, 7 Sep 2005 10:10:57 +0000 (10:10 +0000)
committerTim Starling <tstarling@users.mediawiki.org>
Wed, 7 Sep 2005 10:10:57 +0000 (10:10 +0000)
includes/memcached-client.php

index 4ff0da9..7d08964 100644 (file)
@@ -313,7 +313,7 @@ class memcached
       
       @$this->stats['delete']++;
       $cmd = "delete $key $time\r\n";
-      if(!fwrite($sock, $cmd, strlen($cmd)))
+      if(!$this->_safe_fwrite($sock, $cmd, strlen($cmd)))
       {
          $this->_dead_sock($sock);
          return false;
@@ -385,20 +385,28 @@ class memcached
     */
    function get ($key)
    {
-      if (!$this->_active)
+      $fname = 'memcached::get';
+      wfProfileIn( $fname );
+
+      if (!$this->_active) {
+        wfProfileOut( $fname );
          return false;
+      }
          
       $sock = $this->get_sock($key);
       
-      if (!is_resource($sock))
+      if (!is_resource($sock)) {
+        wfProfileOut( $fname );
          return false;
+      }
          
       @$this->stats['get']++;
       
       $cmd = "get $key\r\n";
-      if (!fwrite($sock, $cmd, strlen($cmd)))
+      if (!$this->_safe_fwrite($sock, $cmd, strlen($cmd)))
       {
          $this->_dead_sock($sock);
+        wfProfileOut( $fname );
          return false;
       }
       
@@ -409,6 +417,7 @@ class memcached
          foreach ($val as $k => $v)
             $this->_debugprint(@sprintf("MemCache: sock %s got %s => %s\r\n", serialize($sock), $k, $v));
 
+      wfProfileOut( $fname );
       return @$val[$key];
    }
 
@@ -453,7 +462,7 @@ class memcached
          }
          $cmd .= "\r\n";
          
-         if (fwrite($sock, $cmd, strlen($cmd)))
+         if ($this->_safe_fwrite($sock, $cmd, strlen($cmd)))
          {
             $gather[] = $sock;
          } else
@@ -535,7 +544,7 @@ class memcached
       if (!is_resource($sock))
          return array();
       
-      if (!fwrite($sock, $cmd, strlen($cmd)))
+      if (!$this->_safe_fwrite($sock, $cmd, strlen($cmd)))
          return array();
          
       while (true)
@@ -729,8 +738,10 @@ class memcached
       if (!$this->_active)
          return false;
 
-      if ($this->_single_sock !== null)
+      if ($this->_single_sock !== null) {
+         $this->_flush_read_buffer($this->_single_sock);
          return $this->sock_to_host($this->_single_sock);
+      }
       
       $hv = is_array($key) ? intval($key[0]) : $this->_hashfunc($key);
       
@@ -756,8 +767,10 @@ class memcached
       {
          $host = $this->_buckets[$hv % $this->_bucketcount];
          $sock = $this->sock_to_host($host);
-         if (is_resource($sock))
+         if (is_resource($sock)) {
+            $this->_flush_read_buffer($sock);
             return $sock;
+         }
          $hv += $this->_hashfunc($tries . $realkey);
       }
       
@@ -807,7 +820,7 @@ class memcached
          
       $key = is_array($key) ? $key[1] : $key;
       @$this->stats[$cmd]++;
-      if (!fwrite($sock, "$cmd $key $amt\r\n"))
+      if (!$this->_safe_fwrite($sock, "$cmd $key $amt\r\n"))
          return $this->_dead_sock($sock);
          
       stream_set_timeout($sock, 1, 0);
@@ -932,7 +945,7 @@ class memcached
             $flags |= MEMCACHE_COMPRESSED;
          }
       }
-      if (!fwrite($sock, "$cmd $key $flags $exp $len\r\n$val\r\n"))
+      if (!$this->_safe_fwrite($sock, "$cmd $key $flags $exp $len\r\n$val\r\n"))
          return $this->_dead_sock($sock);
          
       $line = trim(fgets($sock));
@@ -985,10 +998,46 @@ class memcached
       print($str);
    }
 
+   /**
+    * Write to a stream, timing out after the correct amount of time
+    * 
+    * @return bool false on failure, true on success
+    */
+   function _safe_fwrite($f, $buf, $len = false) {
+      stream_set_blocking($f, 0);
+
+      if ($len === false) {
+         $bytesWritten = fwrite($f, $buf);
+      } else {
+         $bytesWritten = fwrite($f, $buf, $len);
+      }
+      $n = stream_select($r=NULL, $w = array($f), $e = NULL, 
+         $this->_timeout_seconds, $this->_timeout_microseconds);
+
+      stream_set_blocking($f, 1);
+      return $n == 1;
+   }
+
+   /**
+    * Flush the read buffer of a stream
+    */
+   function _flush_read_buffer($f) {
+      if (!is_resource($f)) {
+         return;
+      }
+      $n = stream_select($r=array($f), $w = NULL, $e = NULL, 0, 0);
+      while ($n == 1 && !feof($f)) {
+         fread($f, 1024);
+         $n = stream_select($r=array($f), $w = NULL, $e = NULL, 0, 0);
+      }
+   }
+
    // }}}
    // }}}
    // }}}
 }
 
+// vim: sts=3 sw=3 et
+
 // }}}
 ?>