]> scripts.mit.edu Git - autoinstalls/mediawiki.git/blobdiff - includes/memcached-client.php
MediaWiki 1.15.4-scripts
[autoinstalls/mediawiki.git] / includes / memcached-client.php
index 4ff0da91ced1cae282aae8bbcb12e9902c7fa6a7..79745309293740973b465cb12ea6de58078aeb2c 100644 (file)
@@ -43,9 +43,9 @@
  * Usage example:
  *
  * require_once 'memcached.php';
- * 
+ *
  * $mc = new memcached(array(
- *              'servers' => array('127.0.0.1:10000', 
+ *              'servers' => array('127.0.0.1:10000',
  *                                 array('192.0.0.1:10010', 2),
  *                                 '127.0.0.1:10020'),
  *              'debug'   => false,
  * $val = $mc->get('key');
  *
  * @author  Ryan T. Dean <rtdean@cytherianage.net>
- * @package memcached-client
  * @version 0.1.2
  */
 
 // {{{ requirements
 // }}}
 
-// {{{ constants
-// {{{ flags
-
-/**
- * Flag: indicates data is serialized
- */
-define("MEMCACHE_SERIALIZED", 1<<0);
-
-/**
- * Flag: indicates data is compressed
- */
-define("MEMCACHE_COMPRESSED", 1<<1);
-
-// }}}
-
-/**
- * Minimum savings to store data compressed
- */
-define("COMPRESSION_SAVINGS", 0.20);
-
-// }}}
-
 // {{{ class memcached
 /**
  * memcached client class implemented using (p)fsockopen()
  *
  * @author  Ryan T. Dean <rtdean@cytherianage.net>
- * @package memcached-client
+ * @ingroup Cache
  */
 class memcached
 {
    // {{{ properties
    // {{{ public
 
+               // {{{ constants
+               // {{{ flags
+
+               /**
+                * Flag: indicates data is serialized
+                */
+               const SERIALIZED = 1;
+
+               /**
+                * Flag: indicates data is compressed
+                */
+               const COMPRESSED = 2;
+
+               // }}}
+
+               /**
+                * Minimum savings to store data compressed
+                */
+               const COMPRESSION_SAVINGS = 0.20;
+
+               // }}}
+
+
    /**
     * Command statistics
     *
@@ -105,7 +105,7 @@ class memcached
     * @access  public
     */
    var $stats;
-   
+
    // }}}
    // {{{ private
 
@@ -116,7 +116,7 @@ class memcached
     * @access  private
     */
    var $_cache_sock;
-   
+
    /**
     * Current debug status; 0 - none to 9 - profiling
     *
@@ -124,7 +124,7 @@ class memcached
     * @access  private
     */
    var $_debug;
-   
+
    /**
     * Dead hosts, assoc array, 'host'=>'unixtime when ok to check again'
     *
@@ -132,7 +132,7 @@ class memcached
     * @access  private
     */
    var $_host_dead;
-   
+
    /**
     * Is compression available?
     *
@@ -140,7 +140,7 @@ class memcached
     * @access  private
     */
    var $_have_zlib;
-   
+
    /**
     * Do we want to use compression?
     *
@@ -148,15 +148,15 @@ class memcached
     * @access  private
     */
    var $_compress_enable;
-   
+
    /**
     * At how many bytes should we compress?
     *
-    * @var     interger
+    * @var     integer
     * @access  private
     */
    var $_compress_threshold;
-   
+
    /**
     * Are we using persistant links?
     *
@@ -164,7 +164,7 @@ class memcached
     * @access  private
     */
    var $_persistant;
-   
+
    /**
     * If only using one server; contains ip:port to connect to
     *
@@ -172,7 +172,7 @@ class memcached
     * @access  private
     */
    var $_single_sock;
-   
+
    /**
     * Array containing ip:port or array(ip:port, weight)
     *
@@ -180,7 +180,7 @@ class memcached
     * @access  private
     */
    var $_servers;
-   
+
    /**
     * Our bit buckets
     *
@@ -188,19 +188,19 @@ class memcached
     * @access  private
     */
    var $_buckets;
-   
+
    /**
     * Total # of bit buckets we have
     *
-    * @var     interger
+    * @var     integer
     * @access  private
     */
    var $_bucketcount;
-   
+
    /**
     * # of total servers we have
     *
-    * @var     interger
+    * @var     integer
     * @access  private
     */
    var $_active;
@@ -221,6 +221,16 @@ class memcached
     */
    var $_timeout_microseconds;
 
+   /**
+    * Connect timeout in seconds
+    */
+   var $_connect_timeout;
+
+   /**
+    * Number of connection attempts for each server
+    */
+   var $_connect_attempts;
+
    // }}}
    // }}}
    // {{{ methods
@@ -244,24 +254,27 @@ class memcached
       $this->_persistant = array_key_exists('persistant', $args) ? (@$args['persistant']) : false;
       $this->_compress_enable = true;
       $this->_have_zlib = function_exists("gzcompress");
-      
+
       $this->_cache_sock = array();
       $this->_host_dead = array();
 
       $this->_timeout_seconds = 1;
       $this->_timeout_microseconds = 0;
+
+      $this->_connect_timeout = 0.01;
+      $this->_connect_attempts = 3;
    }
 
    // }}}
    // {{{ add()
 
    /**
-    * Adds a key/value to the memcache server if one isn't already set with 
+    * Adds a key/value to the memcache server if one isn't already set with
     * that key
     *
-    * @param   string   $key     Key to set with data
-    * @param   mixed    $val     Value to store
-    * @param   interger $exp     (optional) Time to expire data at
+    * @param   string  $key     Key to set with data
+    * @param   mixed   $val     Value to store
+    * @param   integer $exp     (optional) Time to expire data at
     *
     * @return  boolean
     * @access  public
@@ -278,7 +291,7 @@ class memcached
     * Decriment a value stored on the memcache server
     *
     * @param   string   $key     Key to decriment
-    * @param   interger $amt     (optional) Amount to decriment
+    * @param   integer  $amt     (optional) Amount to decriment
     *
     * @return  mixed    FALSE on failure, value on success
     * @access  public
@@ -295,7 +308,7 @@ class memcached
     * Deletes a key from the server, optionally after $time
     *
     * @param   string   $key     Key to delete
-    * @param   interger $time    (optional) How long to wait before deleting
+    * @param   integer  $time    (optional) How long to wait before deleting
     *
     * @return  boolean  TRUE on success, FALSE on failure
     * @access  public
@@ -304,25 +317,25 @@ class memcached
    {
       if (!$this->_active)
          return false;
-         
+
       $sock = $this->get_sock($key);
       if (!is_resource($sock))
          return false;
-      
+
       $key = is_array($key) ? $key[1] : $key;
-      
+
       @$this->stats['delete']++;
       $cmd = "delete $key $time\r\n";
-      if(!fwrite($sock, $cmd, strlen($cmd)))
+      if(!$this->_safe_fwrite($sock, $cmd, strlen($cmd)))
       {
          $this->_dead_sock($sock);
          return false;
       }
       $res = trim(fgets($sock));
-      
+
       if ($this->_debug)
          $this->_debugprint(sprintf("MemCache: delete %s (%s)\n", $key, $res));
-      
+
       if ($res == "DELETED")
          return true;
       return false;
@@ -385,30 +398,43 @@ class memcached
     */
    function get ($key)
    {
-      if (!$this->_active)
+      $fname = 'memcached::get';
+      wfProfileIn( $fname );
+
+      if ( $this->_debug ) {
+         $this->_debugprint( "get($key)\n" );
+      }
+
+      if (!$this->_active) {
+        wfProfileOut( $fname );
          return false;
-         
+      }
+
       $sock = $this->get_sock($key);
-      
-      if (!is_resource($sock))
+
+      if (!is_resource($sock)) {
+        wfProfileOut( $fname );
          return false;
-         
+      }
+
       @$this->stats['get']++;
-      
+
       $cmd = "get $key\r\n";
-      if (!fwrite($sock, $cmd, strlen($cmd)))
+      if (!$this->_safe_fwrite($sock, $cmd, strlen($cmd)))
       {
          $this->_dead_sock($sock);
+        wfProfileOut( $fname );
          return false;
       }
-      
+
       $val = array();
       $this->_load_items($sock, $val);
-      
+
       if ($this->_debug)
          foreach ($val as $k => $v)
-            $this->_debugprint(@sprintf("MemCache: sock %s got %s => %s\r\n", serialize($sock), $k, $v));
+            $this->_debugprint(sprintf("MemCache: sock %s got %s\n", serialize($sock), $k));
 
+      wfProfileOut( $fname );
       return @$val[$key];
    }
 
@@ -427,9 +453,10 @@ class memcached
    {
       if (!$this->_active)
          return false;
-         
-      $this->stats['get_multi']++;
-      
+
+      @$this->stats['get_multi']++;
+      $sock_keys = array();
+
       foreach ($keys as $key)
       {
          $sock = $this->get_sock($key);
@@ -442,7 +469,7 @@ class memcached
          }
          $sock_keys[$sock][] = $key;
       }
-      
+
       // Send out the requests
       foreach ($socks as $sock)
       {
@@ -452,8 +479,8 @@ class memcached
             $cmd .= " ". $key;
          }
          $cmd .= "\r\n";
-         
-         if (fwrite($sock, $cmd, strlen($cmd)))
+
+         if ($this->_safe_fwrite($sock, $cmd, strlen($cmd)))
          {
             $gather[] = $sock;
          } else
@@ -461,18 +488,18 @@ class memcached
             $this->_dead_sock($sock);
          }
       }
-      
+
       // Parse responses
       $val = array();
       foreach ($gather as $sock)
       {
          $this->_load_items($sock, $val);
       }
-      
+
       if ($this->_debug)
          foreach ($val as $k => $v)
-            $this->_debugprint(sprintf("MemCache: got %s => %s\r\n", $k, $v));
-            
+            $this->_debugprint(sprintf("MemCache: got %s\n", $k));
+
       return $val;
    }
 
@@ -483,9 +510,9 @@ class memcached
     * Increments $key (optionally) by $amt
     *
     * @param   string   $key     Key to increment
-    * @param   interger $amt     (optional) amount to increment
+    * @param   integer  $amt     (optional) amount to increment
     *
-    * @return  interger New key value?
+    * @return  integer  New key value?
     * @access  public
     */
    function incr ($key, $amt=1)
@@ -501,7 +528,7 @@ class memcached
     *
     * @param   string   $key     Key to set value as
     * @param   mixed    $value   Value to store
-    * @param   interger $exp     (optional) Experiation time
+    * @param   integer  $exp     (optional) Experiation time
     *
     * @return  boolean
     * @access  public
@@ -515,7 +542,7 @@ class memcached
    // {{{ run_command()
 
    /**
-    * Passes through $cmd to the memcache server connected by $sock; returns 
+    * Passes through $cmd to the memcache server connected by $sock; returns
     * output as an array (null array if no output)
     *
     * NOTE: due to a possible bug in how PHP reads while using fgets(), each
@@ -534,10 +561,10 @@ class memcached
    {
       if (!is_resource($sock))
          return array();
-      
-      if (!fwrite($sock, $cmd, strlen($cmd)))
+
+      if (!$this->_safe_fwrite($sock, $cmd, strlen($cmd)))
          return array();
-         
+
       while (true)
       {
          $res = fgets($sock);
@@ -559,7 +586,7 @@ class memcached
     *
     * @param   string   $key     Key to set value as
     * @param   mixed    $value   Value to set
-    * @param   interger $exp     (optional) Experiation time
+    * @param   integer  $exp     (optional) Experiation time
     *
     * @return  boolean  TRUE on success
     * @access  public
@@ -575,7 +602,7 @@ class memcached
    /**
     * Sets the compression threshold
     *
-    * @param   interger $thresh  Threshold to compress if larger than
+    * @param   integer  $thresh  Threshold to compress if larger than
     *
     * @access  public
     */
@@ -619,7 +646,7 @@ class memcached
       $this->_active = count($list);
       $this->_buckets = null;
       $this->_bucketcount = 0;
-      
+
       $this->_single_sock = null;
       if ($this->_active == 1)
          $this->_single_sock = $this->_servers[0];
@@ -630,7 +657,7 @@ class memcached
     *
     * @param   integer  $seconds Number of seconds
     * @param   integer  $microseconds  Number of microseconds
-    * 
+    *
     * @access  public
     */
    function set_timeout ($seconds, $microseconds)
@@ -638,7 +665,7 @@ class memcached
       $this->_timeout_seconds = $seconds;
       $this->_timeout_microseconds = $microseconds;
    }
-   
+
    // }}}
    // }}}
    // {{{ private methods
@@ -664,24 +691,36 @@ class memcached
    /**
     * Connects $sock to $host, timing out after $timeout
     *
-    * @param   interger $sock    Socket to connect
+    * @param   integer  $sock    Socket to connect
     * @param   string   $host    Host:IP to connect to
-    * @param   float    $timeout (optional) Timeout value, defaults to 0.25s
     *
     * @return  boolean
     * @access  private
     */
-   function _connect_sock (&$sock, $host, $timeout = 0.25)
+   function _connect_sock (&$sock, $host)
    {
       list ($ip, $port) = explode(":", $host);
-      if ($this->_persistant == 1)
-      {
-         $sock = @pfsockopen($ip, $port, $errno, $errstr, $timeout);
-      } else
-      {
-         $sock = @fsockopen($ip, $port, $errno, $errstr, $timeout);
+      $sock = false;
+      $timeout = $this->_connect_timeout;
+      $errno = $errstr = null;
+      for ($i = 0; !$sock && $i < $this->_connect_attempts; $i++) {
+         if ($i > 0) {
+            # Sleep until the timeout, in case it failed fast
+            $elapsed = microtime(true) - $t;
+            if ( $elapsed < $timeout ) {
+               usleep(($timeout - $elapsed) * 1e6);
+            }
+            $timeout *= 2;
+         }
+         $t = microtime(true);
+         if ($this->_persistant == 1)
+         {
+            $sock = @pfsockopen($ip, $port, $errno, $errstr, $timeout);
+         } else
+         {
+            $sock = @fsockopen($ip, $port, $errno, $errstr, $timeout);
+         }
       }
-      
       if (!$sock) {
          if ($this->_debug)
             $this->_debugprint( "Error connecting to $host: $errstr\n" );
@@ -707,7 +746,7 @@ class memcached
    function _dead_sock ($sock)
    {
       $host = array_search($sock, $this->_cache_sock);
-      @list ($ip, $port) = explode(":", $host);
+      @list ($ip, /* $port */) = explode(":", $host);
       $this->_host_dead[$ip] = time() + 30 + intval(rand(0, 10));
       $this->_host_dead[$host] = $this->_host_dead[$ip];
       unset($this->_cache_sock[$host]);
@@ -729,11 +768,13 @@ class memcached
       if (!$this->_active)
          return false;
 
-      if ($this->_single_sock !== null)
+      if ($this->_single_sock !== null) {
+         $this->_flush_read_buffer($this->_single_sock);
          return $this->sock_to_host($this->_single_sock);
-      
+      }
+
       $hv = is_array($key) ? intval($key[0]) : $this->_hashfunc($key);
-      
+
       if ($this->_buckets === null)
       {
          foreach ($this->_servers as $v)
@@ -750,17 +791,19 @@ class memcached
          $this->_buckets = $bu;
          $this->_bucketcount = count($bu);
       }
-      
+
       $realkey = is_array($key) ? $key[1] : $key;
       for ($tries = 0; $tries<20; $tries++)
       {
          $host = $this->_buckets[$hv % $this->_bucketcount];
          $sock = $this->sock_to_host($host);
-         if (is_resource($sock))
+         if (is_resource($sock)) {
+            $this->_flush_read_buffer($sock);
             return $sock;
-         $hv += $this->_hashfunc($tries . $realkey);
+                }
+         $hv = $this->_hashfunc( $hv . $realkey );
       }
-      
+
       return false;
    }
 
@@ -768,17 +811,17 @@ class memcached
    // {{{ _hashfunc()
 
    /**
-    * Creates a hash interger based on the $key
+    * Creates a hash integer  based on the $key
     *
     * @param   string   $key     Key to hash
     *
-    * @return  interger Hash value
+    * @return  integer  Hash value
     * @access  private
     */
    function _hashfunc ($key)
    {
       # Hash function must on [0,0x7ffffff]
-      # We take the first 31 bits of the MD5 hash, which unlike the hash 
+      # We take the first 31 bits of the MD5 hash, which unlike the hash
       # function used in a previous version of this client, works
       return hexdec(substr(md5($key),0,8)) & 0x7fffffff;
    }
@@ -791,27 +834,28 @@ class memcached
     *
     * @param   string   $cmd     Command to perform
     * @param   string   $key     Key to perform it on
-    * @param   interger $amt     Amount to adjust
+    * @param   integer  $amt     Amount to adjust
     *
-    * @return  interger    New value of $key
+    * @return  integer     New value of $key
     * @access  private
     */
    function _incrdecr ($cmd, $key, $amt=1)
    {
       if (!$this->_active)
          return null;
-         
+
       $sock = $this->get_sock($key);
       if (!is_resource($sock))
          return null;
-         
+
       $key = is_array($key) ? $key[1] : $key;
       @$this->stats[$cmd]++;
-      if (!fwrite($sock, "$cmd $key $amt\r\n"))
+      if (!$this->_safe_fwrite($sock, "$cmd $key $amt\r\n"))
          return $this->_dead_sock($sock);
-         
+
       stream_set_timeout($sock, 1, 0);
       $line = fgets($sock);
+      $match = array();
       if (!preg_match('/^(\d+)/', $line, $match))
          return null;
       return $match[1];
@@ -841,7 +885,7 @@ class memcached
             list($rkey, $flags, $len) = array($match[1], $match[2], $match[3]);
             $bneed = $len+2;
             $offset = 0;
-            
+
             while ($bneed > 0)
             {
                $data = fread($sock, $bneed);
@@ -852,7 +896,7 @@ class memcached
                $bneed -= $n;
                @$ret[$rkey] .= $data;
             }
-            
+
             if ($offset != $len+2)
             {
                // Something is borked!
@@ -863,16 +907,16 @@ class memcached
                $this->_close_sock($sock);
                return false;
             }
-            
-            if ($this->_have_zlib && $flags & MEMCACHE_COMPRESSED)
+
+            if ($this->_have_zlib && $flags & memcached::COMPRESSED)
                $ret[$rkey] = gzuncompress($ret[$rkey]);
 
             $ret[$rkey] = rtrim($ret[$rkey]);
 
-            if ($flags & MEMCACHE_SERIALIZED)
+            if ($flags & memcached::SERIALIZED)
                $ret[$rkey] = unserialize($ret[$rkey]);
 
-         } else 
+         } else
          {
             $this->_debugprint("Error parsing memcached response\n");
             return 0;
@@ -889,7 +933,7 @@ class memcached
     * @param   string   $cmd     Command to perform
     * @param   string   $key     Key to act on
     * @param   mixed    $val     What we need to store
-    * @param   interger $exp     When it should expire
+    * @param   integer  $exp     When it should expire
     *
     * @return  boolean
     * @access  private
@@ -898,50 +942,48 @@ class memcached
    {
       if (!$this->_active)
          return false;
-         
+
       $sock = $this->get_sock($key);
       if (!is_resource($sock))
          return false;
-         
+
       @$this->stats[$cmd]++;
-      
+
       $flags = 0;
-      
+
       if (!is_scalar($val))
       {
          $val = serialize($val);
-         $flags |= MEMCACHE_SERIALIZED;
+         $flags |= memcached::SERIALIZED;
          if ($this->_debug)
             $this->_debugprint(sprintf("client: serializing data as it is not scalar\n"));
       }
-      
+
       $len = strlen($val);
-      
-      if ($this->_have_zlib && $this->_compress_enable && 
+
+      if ($this->_have_zlib && $this->_compress_enable &&
           $this->_compress_threshold && $len >= $this->_compress_threshold)
       {
          $c_val = gzcompress($val, 9);
          $c_len = strlen($c_val);
-         
-         if ($c_len < $len*(1 - COMPRESSION_SAVINGS))
+
+         if ($c_len < $len*(1 - memcached::COMPRESSION_SAVINGS))
          {
             if ($this->_debug)
                $this->_debugprint(sprintf("client: compressing data; was %d bytes is now %d bytes\n", $len, $c_len));
             $val = $c_val;
             $len = $c_len;
-            $flags |= MEMCACHE_COMPRESSED;
+            $flags |= memcached::COMPRESSED;
          }
       }
-      if (!fwrite($sock, "$cmd $key $flags $exp $len\r\n$val\r\n"))
+      if (!$this->_safe_fwrite($sock, "$cmd $key $flags $exp $len\r\n$val\r\n"))
          return $this->_dead_sock($sock);
-         
+
       $line = trim(fgets($sock));
-      
+
       if ($this->_debug)
       {
-         if ($flags & MEMCACHE_COMPRESSED)
-            $val = 'compressed data';
-         $this->_debugprint(sprintf("MemCache: %s %s => %s (%s)\n", $cmd, $key, $val, $line));
+         $this->_debugprint(sprintf("%s %s (%s)\n", $cmd, $key, $line));
       }
       if ($line == "STORED")
          return true;
@@ -963,21 +1005,22 @@ class memcached
    {
       if (isset($this->_cache_sock[$host]))
          return $this->_cache_sock[$host];
-      
+
+      $sock = null;
       $now = time();
-      list ($ip, $port) = explode (":", $host);
+      list ($ip, /* $port */) = explode (":", $host);
       if (isset($this->_host_dead[$host]) && $this->_host_dead[$host] > $now ||
           isset($this->_host_dead[$ip]) && $this->_host_dead[$ip] > $now)
          return null;
-         
+
       if (!$this->_connect_sock($sock, $host))
          return $this->_dead_sock($host);
-         
+
       // Do not buffer writes
       stream_set_write_buffer($sock, 0);
-      
+
       $this->_cache_sock[$host] = $sock;
-      
+
       return $this->_cache_sock[$host];
    }
 
@@ -985,10 +1028,62 @@ class memcached
       print($str);
    }
 
+   /**
+    * Write to a stream, timing out after the correct amount of time
+    *
+    * @return bool false on failure, true on success
+    */
+    /*
+   function _safe_fwrite($f, $buf, $len = false) {
+      stream_set_blocking($f, 0);
+
+      if ($len === false) {
+         wfDebug("Writing " . strlen( $buf ) . " bytes\n");
+         $bytesWritten = fwrite($f, $buf);
+      } else {
+         wfDebug("Writing $len bytes\n");
+         $bytesWritten = fwrite($f, $buf, $len);
+      }
+      $n = stream_select($r=NULL, $w = array($f), $e = NULL, 10, 0);
+      #   $this->_timeout_seconds, $this->_timeout_microseconds);
+
+      wfDebug("stream_select returned $n\n");
+      stream_set_blocking($f, 1);
+      return $n == 1;
+      return $bytesWritten;
+   }*/
+
+   /**
+    * Original behaviour
+    */
+   function _safe_fwrite($f, $buf, $len = false) {
+      if ($len === false) {
+         $bytesWritten = fwrite($f, $buf);
+      } else {
+         $bytesWritten = fwrite($f, $buf, $len);
+      }
+      return $bytesWritten;
+   }
+
+   /**
+    * Flush the read buffer of a stream
+    */
+   function _flush_read_buffer($f) {
+      if (!is_resource($f)) {
+         return;
+      }
+      $n = stream_select($r=array($f), $w = NULL, $e = NULL, 0, 0);
+      while ($n == 1 && !feof($f)) {
+         fread($f, 1024);
+         $n = stream_select($r=array($f), $w = NULL, $e = NULL, 0, 0);
+      }
+   }
+
    // }}}
    // }}}
    // }}}
 }
 
+// vim: sts=3 sw=3 et
+
 // }}}
-?>