Added merge() function to BagOStuff for CAS-like functionality.
authorMatthias Mullie <git@mullie.eu>
Mon, 1 Oct 2012 14:05:22 +0000 (16:05 +0200)
committerMatthias Mullie <git@mullie.eu>
Thu, 10 Jan 2013 08:03:09 +0000 (09:03 +0100)
* merge() will use CAS if supported or use locking otherwise
* The lock()/unlock() methods now have a default implementation
* added unit tests for merge

Change-Id: Ic27088488f8532f149cb4b36e156516f22880134

16 files changed:
includes/objectcache/APCBagOStuff.php
includes/objectcache/BagOStuff.php
includes/objectcache/DBABagOStuff.php
includes/objectcache/EhcacheBagOStuff.php
includes/objectcache/EmptyBagOStuff.php
includes/objectcache/HashBagOStuff.php
includes/objectcache/MemcachedBagOStuff.php
includes/objectcache/MemcachedClient.php
includes/objectcache/MemcachedPeclBagOStuff.php
includes/objectcache/MultiWriteBagOStuff.php
includes/objectcache/RedisBagOStuff.php
includes/objectcache/SqlBagOStuff.php
includes/objectcache/WinCacheBagOStuff.php
includes/objectcache/XCacheBagOStuff.php
tests/phpunit/MediaWikiPHPUnitCommand.php
tests/phpunit/includes/objectcache/BagOStuffTest.php [new file with mode: 0644]

index 1a0de21..cc306e8 100644 (file)
 class APCBagOStuff extends BagOStuff {
        /**
         * @param $key string
+        * @param $casToken[optional] int
         * @return mixed
         */
-       public function get( $key ) {
+       public function get( $key, &$casToken = null ) {
                $val = apc_fetch( $key );
 
+               $casToken = $val;
+
                if ( is_string( $val ) ) {
                        if ( $this->isInteger( $val ) ) {
                                $val = intval( $val );
@@ -61,6 +64,18 @@ class APCBagOStuff extends BagOStuff {
                return true;
        }
 
+       /**
+        * @param $casToken mixed
+        * @param $key string
+        * @param $value mixed
+        * @param $exptime int
+        * @return bool
+        */
+       public function cas( $casToken, $key, $value, $exptime = 0 ) {
+               // APC's CAS functions only work on integers
+               throw new MWException( "CAS is not implemented in " . __CLASS__ );
+       }
+
        /**
         * @param $key string
         * @param $time int
@@ -72,6 +87,17 @@ class APCBagOStuff extends BagOStuff {
                return true;
        }
 
+       /**
+        * @param $key string
+        * @param $callback closure Callback method to be executed
+        * @param $exptime int Either an interval in seconds or a unix timestamp for expiry
+        * @param $attempts int The amount of times to attempt a merge in case of failure
+        * @return bool success
+        */
+       public function merge( $key, closure $callback, $exptime = 0, $attempts = 10 ) {
+               return $this->mergeViaLock( $key, $callback, $exptime, $attempts );
+       }
+
        public function incr( $key, $value = 1 ) {
                return apc_inc( $key, $value );
        }
index 7bbaff9..32afe13 100644 (file)
@@ -56,9 +56,10 @@ abstract class BagOStuff {
        /**
         * Get an item with the given key. Returns false if it does not exist.
         * @param $key string
+        * @param $casToken[optional] mixed
         * @return mixed Returns false on failure
         */
-       abstract public function get( $key );
+       abstract public function get( $key, &$casToken = null );
 
        /**
         * Set an item.
@@ -69,6 +70,16 @@ abstract class BagOStuff {
         */
        abstract public function set( $key, $value, $exptime = 0 );
 
+       /**
+        * Check and set an item.
+        * @param $casToken mixed
+        * @param $key string
+        * @param $value mixed
+        * @param $exptime int Either an interval in seconds or a unix timestamp for expiry
+        * @return bool success
+        */
+       abstract public function cas( $casToken, $key, $value, $exptime = 0 );
+
        /**
         * Delete an item.
         * @param $key string
@@ -78,13 +89,105 @@ abstract class BagOStuff {
        abstract public function delete( $key, $time = 0 );
 
        /**
+        * Merge changes into the existing cache value (possibly creating a new one)
+        *
         * @param $key string
-        * @param $timeout integer
+        * @param $callback closure Callback method to be executed
+        * @param $exptime int Either an interval in seconds or a unix timestamp for expiry
+        * @param $attempts int The amount of times to attempt a merge in case of failure
         * @return bool success
         */
-       public function lock( $key, $timeout = 0 ) {
-               /* stub */
-               return true;
+       public function merge( $key, closure $callback, $exptime = 0, $attempts = 10 ) {
+               return $this->mergeViaCas( $key, $callback, $exptime, $attempts );
+       }
+
+       /**
+        * @see BagOStuff::merge()
+        *
+        * @param $key string
+        * @param $callback closure Callback method to be executed
+        * @param $exptime int Either an interval in seconds or a unix timestamp for expiry
+        * @param $attempts int The amount of times to attempt a merge in case of failure
+        * @return bool success
+        */
+       protected function mergeViaCas( $key, closure $callback, $exptime = 0, $attempts = 10 ) {
+               do {
+                       $casToken = null; // passed by reference
+                       $currentValue = $this->get( $key, $casToken ); // get the old value
+                       $value = $callback( $this, $key, $currentValue ); // derive the new value
+
+                       if ( $value === false ) {
+                               $success = true; // do nothing
+                       } elseif ( $currentValue === false ) {
+                               // Try to create the key, failing if it gets created in the meantime
+                               $success = $this->add( $key, $value, $exptime );
+                       } else {
+                               // Try to update the key, failing if it gets changed in the meantime
+                               $success = $this->cas( $casToken, $key, $value, $exptime );
+                       }
+               } while ( !$success && --$attempts );
+
+               return $success;
+       }
+
+       /**
+        * @see BagOStuff::merge()
+        *
+        * @param $key string
+        * @param $callback closure Callback method to be executed
+        * @param $exptime int Either an interval in seconds or a unix timestamp for expiry
+        * @param $attempts int The amount of times to attempt a merge in case of failure
+        * @return bool success
+        */
+       protected function mergeViaLock( $key, closure $callback, $exptime = 0, $attempts = 10 ) {
+               if ( !$this->lock( $key, 60 ) ) {
+                       return false;
+               }
+
+               $currentValue = $this->get( $key ); // get the old value
+               $value = $callback( $this, $key, $currentValue ); // derive the new value
+
+               if ( $value === false ) {
+                       $success = true; // do nothing
+               } else {
+                       $success = $this->set( $key, $value, $exptime ); // set the new value
+               }
+
+               if ( !$this->unlock( $key ) ) {
+                       // this should never happen
+                       trigger_error( "Could not release lock for key '$key'." );
+               }
+
+               return $success;
+       }
+
+       /**
+        * @param $key string
+        * @param $timeout integer [optional]
+        * @return bool success
+        */
+       public function lock( $key, $timeout = 60 ) {
+               $timestamp = microtime( true ); // starting UNIX timestamp
+               if ( $this->add( "{$key}:lock", $timeout ) ) {
+                       return true;
+               }
+
+               $uRTT  = ceil( 1e6 * ( microtime( true ) - $timestamp ) ); // estimate RTT (us)
+               $sleep = 2*$uRTT; // rough time to do get()+set()
+
+               $locked   = false; // lock acquired
+               $attempts = 0; // failed attempts
+               do {
+                       if ( ++$attempts >= 3 && $sleep <= 1e6 ) {
+                               // Exponentially back off after failed attempts to avoid network spam.
+                               // About 2*$uRTT*(2^n-1) us of "sleep" happen for the next n attempts.
+                               $sleep *= 2;
+                       }
+                       usleep( $sleep ); // back off
+                       $locked = $this->add( "{$key}:lock", $timeout );
+               } while( !$locked );
+
+               return $locked;
        }
 
        /**
@@ -92,8 +195,7 @@ abstract class BagOStuff {
         * @return bool success
         */
        public function unlock( $key ) {
-               /* stub */
-               return true;
+               return $this->delete( "{$key}:lock" );
        }
 
        /**
index 36ced49..ee2500d 100644 (file)
@@ -111,9 +111,10 @@ class DBABagOStuff extends BagOStuff {
 
        /**
         * @param $key string
+        * @param $casToken[optional] mixed
         * @return mixed
         */
-       public function get( $key ) {
+       public function get( $key, &$casToken = null ) {
                wfProfileIn( __METHOD__ );
                wfDebug( __METHOD__ . "($key)\n" );
 
@@ -138,7 +139,10 @@ class DBABagOStuff extends BagOStuff {
                        $val = false;
                }
 
+               $casToken = $val;
+
                wfProfileOut( __METHOD__ );
+
                return $val;
        }
 
@@ -167,6 +171,41 @@ class DBABagOStuff extends BagOStuff {
                return $ret;
        }
 
+       /**
+        * @param $casToken mixed
+        * @param $key string
+        * @param $value mixed
+        * @param $exptime int
+        * @return bool
+        */
+       public function cas( $casToken, $key, $value, $exptime = 0 ) {
+               wfProfileIn( __METHOD__ );
+               wfDebug( __METHOD__ . "($key)\n" );
+
+               $blob = $this->encode( $value, $exptime );
+
+               $handle = $this->getWriter();
+               if ( !$handle ) {
+                       wfProfileOut( __METHOD__ );
+                       return false;
+               }
+
+               // DBA is locked to any other write connection, so we can safely
+               // compare the current & previous value before saving new value
+               $val = dba_fetch( $key, $handle );
+               list( $val, $exptime ) = $this->decode( $val );
+               if ( $casToken !== $val ) {
+                       dba_close( $handle );
+                       return false;
+               }
+
+               $ret = dba_replace( $key, $blob, $handle );
+               dba_close( $handle );
+
+               wfProfileOut( __METHOD__ );
+               return $ret;
+       }
+
        /**
         * @param $key string
         * @param $time int
index 60d0645..b8a1bc2 100644 (file)
@@ -65,9 +65,10 @@ class EhcacheBagOStuff extends BagOStuff {
 
        /**
         * @param $key string
+        * @param $casToken[optional] mixed
         * @return bool|mixed
         */
-       public function get( $key ) {
+       public function get( $key, &$casToken = null ) {
                wfProfileIn( __METHOD__ );
                $response = $this->doItemRequest( $key );
                if ( !$response || $response['http_code'] == 404 ) {
@@ -97,6 +98,8 @@ class EhcacheBagOStuff extends BagOStuff {
                        return false;
                }
 
+               $casToken = $body;
+
                wfProfileOut( __METHOD__ );
                return $data;
        }
@@ -144,6 +147,20 @@ class EhcacheBagOStuff extends BagOStuff {
                return $result;
        }
 
+       /**
+        * @param $casToken mixed
+        * @param $key string
+        * @param $value mixed
+        * @param $exptime int
+        * @return bool
+        */
+       public function cas( $casToken, $key, $value, $exptime = 0 ) {
+               // Not sure if we can implement CAS for ehcache. There appears to be CAS-support per
+               // http://ehcache.org/documentation/get-started/consistency-options#cas-cache-operations,
+               // but I can't find any docs for our current implementation.
+               throw new MWException( "CAS is not implemented in " . __CLASS__ );
+       }
+
        /**
         * @param $key string
         * @param $time int
@@ -164,6 +181,14 @@ class EhcacheBagOStuff extends BagOStuff {
                return $result;
        }
 
+       /**
+        * @see BagOStuff::merge()
+        * @return bool success
+        */
+       public function merge( $key, closure $callback, $exptime = 0, $attempts = 10 ) {
+               return $this->mergeViaLock( $key, $callback, $exptime, $attempts );
+       }
+
        /**
         * @param $key string
         * @return string
index bd28b24..cc57ff1 100644 (file)
@@ -30,9 +30,10 @@ class EmptyBagOStuff extends BagOStuff {
 
        /**
         * @param $key string
+        * @param $casToken[optional] mixed
         * @return bool
         */
-       function get( $key ) {
+       function get( $key, &$casToken = null ) {
                return false;
        }
 
@@ -46,6 +47,17 @@ class EmptyBagOStuff extends BagOStuff {
                return true;
        }
 
+       /**
+        * @param $casToken mixed
+        * @param $key string
+        * @param $value mixed
+        * @param $exp int
+        * @return bool
+        */
+       function cas( $casToken, $key, $value, $exp = 0 ) {
+               return true;
+       }
+
        /**
         * @param $key string
         * @param $time int
@@ -54,6 +66,17 @@ class EmptyBagOStuff extends BagOStuff {
        function delete( $key, $time = 0 ) {
                return true;
        }
+
+       /**
+        * @param $key string
+        * @param $callback closure Callback method to be executed
+        * @param $exptime int Either an interval in seconds or a unix timestamp for expiry
+        * @param $attempts int The amount of times to attempt a merge in case of failure
+        * @return bool success
+        */
+       public function merge( $key, closure $callback, $exptime = 0, $attempts = 10 ) {
+               return true;
+       }
 }
 
 /**
index 799f26a..ce0e4b6 100644 (file)
@@ -52,9 +52,10 @@ class HashBagOStuff extends BagOStuff {
 
        /**
         * @param $key string
+        * @param $casToken[optional] mixed
         * @return bool|mixed
         */
-       function get( $key ) {
+       function get( $key, &$casToken = null ) {
                if ( !isset( $this->bag[$key] ) ) {
                        return false;
                }
@@ -63,6 +64,8 @@ class HashBagOStuff extends BagOStuff {
                        return false;
                }
 
+               $casToken = $this->bag[$key][0];
+
                return $this->bag[$key][0];
        }
 
@@ -77,6 +80,21 @@ class HashBagOStuff extends BagOStuff {
                return true;
        }
 
+       /**
+        * @param $casToken mixed
+        * @param $key string
+        * @param $value mixed
+        * @param $exptime int
+        * @return bool
+        */
+       function cas( $casToken, $key, $value, $exptime = 0 ) {
+               if ( $this->get( $key ) === $casToken ) {
+                       return $this->set( $key, $value, $exptime );
+               }
+
+               return false;
+       }
+
        /**
         * @param $key string
         * @param $time int
index 643d2e9..922c8e6 100644 (file)
@@ -57,10 +57,11 @@ class MemcachedBagOStuff extends BagOStuff {
 
        /**
         * @param $key string
+        * @param $casToken[optional] mixed
         * @return Mixed
         */
-       public function get( $key ) {
-               return $this->client->get( $this->encodeKey( $key ) );
+       public function get( $key, &$casToken = null ) {
+               return $this->client->get( $this->encodeKey( $key ), $casToken );
        }
 
        /**
@@ -74,6 +75,18 @@ class MemcachedBagOStuff extends BagOStuff {
                        $this->fixExpiry( $exptime ) );
        }
 
+       /**
+        * @param $key string
+        * @param $casToken mixed
+        * @param $value
+        * @param $exptime int
+        * @return bool
+        */
+       public function cas( $casToken, $key, $value, $exptime = 0 ) {
+               return $this->client->cas( $casToken, $this->encodeKey( $key ),
+                       $value, $this->fixExpiry( $exptime ) );
+       }
+
        /**
         * @param $key string
         * @param $time int
index 787a168..1ab59c6 100644 (file)
@@ -408,10 +408,11 @@ class MWMemcached {
         * Retrieves the value associated with the key from the memcache server
         *
         * @param $key array|string key to retrieve
+        * @param $casToken[optional] Float
         *
         * @return Mixed
         */
-       public function get( $key ) {
+       public function get( $key, &$casToken = null ) {
                wfProfileIn( __METHOD__ );
 
                if ( $this->_debug ) {
@@ -437,14 +438,14 @@ class MWMemcached {
                        $this->stats['get'] = 1;
                }
 
-               $cmd = "get $key\r\n";
+               $cmd = "gets $key\r\n";
                if ( !$this->_fwrite( $sock, $cmd ) ) {
                        wfProfileOut( __METHOD__ );
                        return false;
                }
 
                $val = array();
-               $this->_load_items( $sock, $val );
+               $this->_load_items( $sock, $val, $casToken );
 
                if ( $this->_debug ) {
                        foreach ( $val as $k => $v ) {
@@ -498,7 +499,7 @@ class MWMemcached {
                $gather = array();
                // Send out the requests
                foreach ( $socks as $sock ) {
-                       $cmd = 'get';
+                       $cmd = 'gets';
                        foreach ( $sock_keys[ intval( $sock ) ] as $key ) {
                                $cmd .= ' ' . $key;
                        }
@@ -512,7 +513,7 @@ class MWMemcached {
                // Parse responses
                $val = array();
                foreach ( $gather as $sock ) {
-                       $this->_load_items( $sock, $val );
+                       $this->_load_items( $sock, $val, $casToken );
                }
 
                if ( $this->_debug ) {
@@ -617,6 +618,28 @@ class MWMemcached {
                return $this->_set( 'set', $key, $value, $exp );
        }
 
+       // }}}
+       // {{{ cas()
+
+       /**
+        * Sets a key to a given value in the memcache if the current value still corresponds
+        * to a known, given value.  Returns true if set successfully.
+        *
+        * @param $casToken Float: current known value
+        * @param $key String: key to set value as
+        * @param $value Mixed: value to set
+        * @param $exp Integer: (optional) Expiration time. This can be a number of seconds
+        * to cache for (up to 30 days inclusive).  Any timespans of 30 days + 1 second or
+        * longer must be the timestamp of the time at which the mapping should expire. It
+        * is safe to use timestamps in all cases, regardless of exipration
+        * eg: strtotime("+3 hour")
+        *
+        * @return Boolean: TRUE on success
+        */
+       public function cas( $casToken, $key, $value, $exp = 0 ) {
+               return $this->_set( 'cas', $key, $value, $exp, $casToken );
+       }
+
        // }}}
        // {{{ set_compress_threshold()
 
@@ -879,19 +902,20 @@ class MWMemcached {
         *
         * @param $sock Resource: socket to read from
         * @param $ret Array: returned values
+        * @param $casToken[optional] Float
         * @return boolean True for success, false for failure
         *
         * @access private
         */
-       function _load_items( $sock, &$ret ) {
+       function _load_items( $sock, &$ret, &$casToken = null ) {
                while ( 1 ) {
                        $decl = $this->_fgets( $sock );
                        if( $decl === false ) {
                                return false;
                        } elseif ( $decl == "END" ) {
                                return true;
-                       } elseif ( preg_match( '/^VALUE (\S+) (\d+) (\d+)$/', $decl, $match ) ) {
-                               list( $rkey, $flags, $len ) = array( $match[1], $match[2], $match[3] );
+                       } elseif ( preg_match( '/^VALUE (\S+) (\d+) (\d+) (\d+)$/', $decl, $match ) ) {
+                               list( $rkey, $flags, $len, $casToken ) = array( $match[1], $match[2], $match[3], $match[4] );
                                $data = $this->_fread( $sock, $len + 2 );
                                if ( $data === false ) {
                                        return false;
@@ -933,11 +957,12 @@ class MWMemcached {
         * longer must be the timestamp of the time at which the mapping should expire. It
         * is safe to use timestamps in all cases, regardless of exipration
         * eg: strtotime("+3 hour")
+        * @param $casToken[optional] Float
         *
         * @return Boolean
         * @access private
         */
-       function _set( $cmd, $key, $val, $exp ) {
+       function _set( $cmd, $key, $val, $exp, $casToken = null ) {
                if ( !$this->_active ) {
                        return false;
                }
@@ -980,7 +1005,13 @@ class MWMemcached {
                                $flags |= self::COMPRESSED;
                        }
                }
-               if ( !$this->_fwrite( $sock, "$cmd $key $flags $exp $len\r\n$val\r\n" ) ) {
+
+               $command = "$cmd $key $flags $exp $len";
+               if ( $casToken ) {
+                       $command .= " $casToken";
+               }
+
+               if ( !$this->_fwrite( $sock, "$command\r\n$val\r\n" ) ) {
                        return false;
                }
 
index 7793710..9f06fa0 100644 (file)
@@ -104,14 +104,16 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff {
 
        /**
         * @param $key string
+        * @param $casToken[optional] float
         * @return Mixed
         */
-       public function get( $key ) {
+       public function get( $key, &$casToken = null ) {
                wfProfileIn( __METHOD__ );
                $this->debugLog( "get($key)" );
-               $value = $this->checkResult( $key, parent::get( $key ) );
+               $result = $this->client->get( $this->encodeKey( $key ), null, $casToken );
+               $result = $this->checkResult( $key, $result );
                wfProfileOut( __METHOD__ );
-               return $value;
+               return $result;
        }
 
        /**
@@ -125,6 +127,18 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff {
                return $this->checkResult( $key, parent::set( $key, $value, $exptime ) );
        }
 
+       /**
+        * @param $casToken float
+        * @param $key string
+        * @param $value
+        * @param $exptime int
+        * @return bool
+        */
+       public function cas( $casToken, $key, $value, $exptime = 0 ) {
+               $this->debugLog( "cas($key)" );
+               return $this->checkResult( $key, parent::cas( $casToken, $key, $value, $exptime ) );
+       }
+
        /**
         * @param $key string
         * @param $time int
index 2f37c23..4120749 100644 (file)
@@ -61,9 +61,10 @@ class MultiWriteBagOStuff extends BagOStuff {
 
        /**
         * @param $key string
+        * @param $casToken[optional] mixed
         * @return bool|mixed
         */
-       public function get( $key ) {
+       public function get( $key, &$casToken = null ) {
                foreach ( $this->caches as $cache ) {
                        $value = $cache->get( $key );
                        if ( $value !== false ) {
@@ -73,6 +74,17 @@ class MultiWriteBagOStuff extends BagOStuff {
                return false;
        }
 
+       /**
+        * @param $casToken mixed
+        * @param $key string
+        * @param $value mixed
+        * @param $exptime int
+        * @return bool
+        */
+       public function cas( $casToken, $key, $value, $exptime = 0 ) {
+               throw new MWException( "CAS is not implemented in " . __CLASS__ );
+       }
+
        /**
         * @param $key string
         * @param $value mixed
@@ -156,6 +168,17 @@ class MultiWriteBagOStuff extends BagOStuff {
                }
        }
 
+       /**
+        * @param $key string
+        * @param $callback closure Callback method to be executed
+        * @param $exptime int Either an interval in seconds or a unix timestamp for expiry
+        * @param $attempts int The amount of times to attempt a merge in case of failure
+        * @return bool success
+        */
+       public function merge( $key, closure $callback, $exptime = 0, $attempts = 10 ) {
+               return $this->doWrite( 'merge', $key, $callback, $exptime );
+       }
+
        /**
         * @param $method string
         * @return bool
index 4715859..bd5b354 100644 (file)
@@ -90,7 +90,7 @@ class RedisBagOStuff extends BagOStuff {
                }
        }
 
-       public function get( $key ) {
+       public function get( $key, &$casToken = null ) {
                wfProfileIn( __METHOD__ );
                list( $server, $conn ) = $this->getConnection( $key );
                if ( !$conn ) {
@@ -103,6 +103,7 @@ class RedisBagOStuff extends BagOStuff {
                        $result = false;
                        $this->handleException( $server, $e );
                }
+               $casToken = $result;
                $this->logRequest( 'get', $key, $server, $result );
                wfProfileOut( __METHOD__ );
                return $result;
@@ -133,6 +134,49 @@ class RedisBagOStuff extends BagOStuff {
                return $result;
        }
 
+       /**
+        * @param $casToken mixed
+        * @param $key string
+        * @param $value mixed
+        * @param $exptime int
+        * @return bool
+        */
+       public function cas( $casToken, $key, $value, $expiry = 0 ) {
+               wfProfileIn( __METHOD__ );
+               list( $server, $conn ) = $this->getConnection( $key );
+               if ( !$conn ) {
+                       wfProfileOut( __METHOD__ );
+                       return false;
+               }
+               $expiry = $this->convertToRelative( $expiry );
+               try {
+                       $conn->watch( $key );
+
+                       if ( $this->get( $key ) !== $casToken ) {
+                               wfProfileOut( __METHOD__ );
+                               return false;
+                       }
+
+                       $conn->multi();
+
+                       if ( !$expiry ) {
+                               // No expiry, that is very different from zero expiry in Redis
+                               $conn->set( $key, $value );
+                       } else {
+                               $conn->setex( $key, $expiry, $value );
+                       }
+
+                       $result = $conn->exec();
+               } catch ( RedisException $e ) {
+                       $result = false;
+                       $this->handleException( $server, $e );
+               }
+
+               $this->logRequest( 'cas', $key, $server, $result );
+               wfProfileOut( __METHOD__ );
+               return $result;
+       }
+
        public function delete( $key, $time = 0 ) {
                wfProfileIn( __METHOD__ );
                list( $server, $conn ) = $this->getConnection( $key );
index eccfe00..80a9468 100644 (file)
@@ -193,11 +193,16 @@ class SqlBagOStuff extends BagOStuff {
 
        /**
         * @param $key string
+        * @param $casToken[optional] mixed
         * @return mixed
         */
-       public function get( $key ) {
+       public function get( $key, &$casToken = null ) {
                $values = $this->getMulti( array( $key ) );
-               return array_key_exists( $key, $values ) ? $values[$key] : false;
+               if ( array_key_exists( $key, $values ) ) {
+                       $casToken = $values[$key];
+                       return $values[$key];
+               }
+               return false;
        }
 
        /**
@@ -311,6 +316,55 @@ class SqlBagOStuff extends BagOStuff {
                return true;
        }
 
+       /**
+        * @param $casToken mixed
+        * @param $key string
+        * @param $value mixed
+        * @param $exptime int
+        * @return bool
+        */
+       public function cas( $casToken, $key, $value, $exptime = 0 ) {
+               $db = $this->getDB();
+               $exptime = intval( $exptime );
+
+               if ( $exptime < 0 ) {
+                       $exptime = 0;
+               }
+
+               if ( $exptime == 0 ) {
+                       $encExpiry = $this->getMaxDateTime();
+               } else {
+                       if ( $exptime < 3.16e8 ) { # ~10 years
+                               $exptime += time();
+                       }
+
+                       $encExpiry = $db->timestamp( $exptime );
+               }
+               try {
+                       $db->begin( __METHOD__ );
+                       // (bug 24425) use a replace if the db supports it instead of
+                       // delete/insert to avoid clashes with conflicting keynames
+                       $db->update(
+                               $this->getTableByKey( $key ),
+                               array(
+                                       'keyname' => $key,
+                                       'value' => $db->encodeBlob( $this->serialize( $value ) ),
+                                       'exptime' => $encExpiry
+                               ),
+                               array(
+                                       'keyname' => $key,
+                                       'value' => $db->encodeBlob( $this->serialize( $casToken ) )
+                               ), __METHOD__ );
+                       $db->commit( __METHOD__ );
+               } catch ( DBQueryError $e ) {
+                       $this->handleWriteError( $e );
+
+                       return false;
+               }
+
+               return (bool) $db->affectedRows();
+       }
+
        /**
         * @param $key string
         * @param $time int
index 21aa39e..7ced561 100644 (file)
@@ -33,11 +33,14 @@ class WinCacheBagOStuff extends BagOStuff {
         * Get a value from the WinCache object cache
         *
         * @param $key String: cache key
+        * @param $casToken[optional] int: cas token
         * @return mixed
         */
-       public function get( $key ) {
+       public function get( $key, &$casToken = null ) {
                $val = wincache_ucache_get( $key );
 
+               $casToken = $val;
+
                if ( is_string( $val ) ) {
                        $val = unserialize( $val );
                }
@@ -61,6 +64,19 @@ class WinCacheBagOStuff extends BagOStuff {
                return ( is_array( $result ) && $result === array() ) || $result;
        }
 
+       /**
+        * Store a value in the WinCache object cache, race condition-safe
+        *
+        * @param $casToken int: cas token
+        * @param $key String: cache key
+        * @param $value int: object to store
+        * @param $exptime Int: expiration time
+        * @return bool
+        */
+       public function cas( $casToken, $key, $value, $exptime = 0 ) {
+               return wincache_ucache_cas( $key, $casToken, serialize( $value ) );
+       }
+
        /**
         * Remove a value from the WinCache object cache
         *
index 6c88289..f740ae8 100644 (file)
@@ -32,9 +32,10 @@ class XCacheBagOStuff extends BagOStuff {
         * Get a value from the XCache object cache
         *
         * @param $key String: cache key
+        * @param $casToken mixed: cas token
         * @return mixed
         */
-       public function get( $key ) {
+       public function get( $key, &$casToken = null ) {
                $val = xcache_get( $key );
 
                if ( is_string( $val ) ) {
@@ -65,6 +66,18 @@ class XCacheBagOStuff extends BagOStuff {
                return true;
        }
 
+       /**
+        * @param $casToken mixed
+        * @param $key string
+        * @param $value mixed
+        * @param $exptime int
+        * @return bool
+        */
+       public function cas( $casToken, $key, $value, $exptime = 0 ) {
+               // Can't find any documentation on xcache cas
+               throw new MWException( "CAS is not implemented in " . __CLASS__ );
+       }
+
        /**
         * Remove a value from the XCache object cache
         *
@@ -77,6 +90,21 @@ class XCacheBagOStuff extends BagOStuff {
                return true;
        }
 
+       /**
+        * Merge an item.
+        * XCache does not seem to support any way of performing CAS - this however will
+        * provide a way to perform CAS-like functionality.
+        *
+        * @param $key string
+        * @param $callback closure Callback method to be executed
+        * @param $exptime int Either an interval in seconds or a unix timestamp for expiry
+        * @param $attempts int The amount of times to attempt a merge in case of failure
+        * @return bool success
+        */
+       public function merge( $key, closure $callback, $exptime = 0, $attempts = 10 ) {
+               return $this->mergeViaLock( $key, $callback, $exptime, $attempts );
+       }
+
        public function incr( $key, $value = 1 ) {
                return xcache_inc( $key, $value );
        }
index b675000..3894435 100644 (file)
@@ -6,6 +6,7 @@ class MediaWikiPHPUnitCommand extends PHPUnit_TextUI_Command {
                'regex=' => false,
                'file=' => false,
                'use-filebackend=' => false,
+               'use-bagostuff=' => false,
                'keep-uploads' => false,
                'use-normal-tables' => false,
                'reuse-db' => false,
diff --git a/tests/phpunit/includes/objectcache/BagOStuffTest.php b/tests/phpunit/includes/objectcache/BagOStuffTest.php
new file mode 100644 (file)
index 0000000..ab3d811
--- /dev/null
@@ -0,0 +1,108 @@
+<?php
+/**
+ * This class will test BagOStuff.
+ *
+ * @author     Matthias Mullie <mmullie@wikimedia.org>
+ */
+class BagOStuffTest extends MediaWikiTestCase {
+       private $cache;
+
+       protected function setUp() {
+               parent::setUp();
+
+               // type defined through parameter
+               if ( $this->getCliArg( 'use-bagostuff=' ) ) {
+                       $name = $this->getCliArg( 'use-bagostuff=' );
+
+                       $this->cache = ObjectCache::newFromId( $name );
+
+               // no type defined - use simple hash
+               } else {
+                       $this->cache = new HashBagOStuff;
+               }
+
+               $this->cache->delete( wfMemcKey( 'test' ) );
+       }
+
+       protected function tearDown() {
+       }
+
+       public function testMerge() {
+               $key = wfMemcKey( 'test' );
+
+               $usleep = 0;
+
+               /**
+                * Callback method: append "merged" to whatever is in cache.
+                *
+                * @param BagOStuff $cache
+                * @param string $key
+                * @param int $existingValue
+                * @use int $usleep
+                * @return int
+                */
+               $callback = function( BagOStuff $cache, $key, $existingValue ) use ( &$usleep ) {
+                       // let's pretend this is an expensive callback to test concurrent merge attempts
+                       usleep( $usleep );
+
+                       if ( $existingValue === false ) {
+                               return 'merged';
+                       }
+
+                       return $existingValue . 'merged';
+               };
+
+               // merge on non-existing value
+               $merged = $this->cache->merge( $key, $callback, 0 );
+               $this->assertTrue( $merged );
+               $this->assertEquals( $this->cache->get( $key ), 'merged' );
+
+               // merge on existing value
+               $merged = $this->cache->merge( $key, $callback, 0 );
+               $this->assertTrue( $merged );
+               $this->assertEquals( $this->cache->get( $key ), 'mergedmerged' );
+
+               /*
+                * Test concurrent merges by forking this process, if:
+                * - not manually called with --use-bagostuff
+                * - pcntl_fork is supported by the system
+                * - cache type will correctly support calls over forks
+                */
+               $fork = (bool) $this->getCliArg( 'use-bagostuff=' );
+               $fork &= function_exists( 'pcntl_fork' );
+               $fork &= !$this->cache instanceof HashBagOStuff;
+               $fork &= !$this->cache instanceof EmptyBagOStuff;
+               $fork &= !$this->cache instanceof MultiWriteBagOStuff;
+               if ( $fork ) {
+                       // callback should take awhile now so that we can test concurrent merge attempts
+                       $usleep = 5000;
+
+                       $pid = pcntl_fork();
+                       if ( $pid == -1 ) {
+                               // can't fork, ignore this test...
+                       } elseif ( $pid ) {
+                               // wait a little, making sure that the child process is calling merge
+                               usleep( 3000 );
+
+                               // attempt a merge - this should fail
+                               $merged = $this->cache->merge( $key, $callback, 0, 1 );
+
+                               // merge has failed because child process was merging (and we only attempted once)
+                               $this->assertFalse( $merged );
+
+                               // make sure the child's merge is completed and verify
+                               usleep( 3000 );
+                               $this->assertEquals( $this->cache->get( $key ), 'mergedmergedmerged' );
+                       } else {
+                               $this->cache->merge( $key, $callback, 0, 1 );
+
+                               // Note: I'm not even going to check if the merge worked, I'll
+                               // compare values in the parent process to test if this merge worked.
+                               // I'm just going to exit this child process, since I don't want the
+                               // child to output any test results (would be rather confusing to
+                               // have test output twice)
+                               exit;
+                       }
+               }
+       }
+}