From af1b39c4e0a705a98e571e3b8593a7c38bfb8951 Mon Sep 17 00:00:00 2001 From: Aaron Date: Wed, 20 Jun 2012 14:09:22 -0700 Subject: [PATCH] [LockManager] Added a memcached lock manager class. Change-Id: Ie99a7f1335bb5cceb0cb1c33a40094b3a22101ea --- includes/AutoLoader.php | 1 + .../backend/lockmanager/LockManager.php | 13 +- .../backend/lockmanager/MemcLockManager.php | 306 ++++++++++++++++++ 3 files changed, 315 insertions(+), 5 deletions(-) create mode 100644 includes/filerepo/backend/lockmanager/MemcLockManager.php diff --git a/includes/AutoLoader.php b/includes/AutoLoader.php index 2aa5eee48f..d7927de7f5 100644 --- a/includes/AutoLoader.php +++ b/includes/AutoLoader.php @@ -572,6 +572,7 @@ $wgAutoloadLocalClasses = array( 'FSLockManager' => 'includes/filerepo/backend/lockmanager/FSLockManager.php', 'DBLockManager' => 'includes/filerepo/backend/lockmanager/DBLockManager.php', 'LSLockManager' => 'includes/filerepo/backend/lockmanager/LSLockManager.php', + 'MemcLockManager' => 'includes/filerepo/backend/lockmanager/MemcLockManager.php', 'QuorumLockManager' => 'includes/filerepo/backend/lockmanager/LockManager.php', 'MySqlLockManager'=> 'includes/filerepo/backend/lockmanager/DBLockManager.php', 'NullLockManager' => 'includes/filerepo/backend/lockmanager/LockManager.php', diff --git a/includes/filerepo/backend/lockmanager/LockManager.php b/includes/filerepo/backend/lockmanager/LockManager.php index 0fd3fb6622..07853f8757 100644 --- a/includes/filerepo/backend/lockmanager/LockManager.php +++ b/includes/filerepo/backend/lockmanager/LockManager.php @@ -268,12 +268,11 @@ abstract class QuorumLockManager extends LockManager { } } - // Remove these single locks if the medium supports it + // Remove these specific locks if possible, or at least release + // all locks once this process is currently not holding any locks. foreach ( $pathsToUnlock as $bucket => $paths ) { $status->merge( $this->doUnlockingRequestBucket( $bucket, $paths, $type ) ); } - - // Reference count the locks held and release locks when zero if ( !count( $this->locksHeld ) ) { $status->merge( $this->releaseAllLocks() ); } @@ -378,7 +377,9 @@ abstract class QuorumLockManager extends LockManager { abstract protected function getLocksOnServer( $lockSrv, array $paths, $type ); /** - * Get a connection to a lock server and release locks on $paths + * Get a connection to a lock server and release locks on $paths. + * + * Subclasses must effectively implement this or releaseAllLocks(). * * @param $lockSrv string * @param $paths array @@ -388,7 +389,9 @@ abstract class QuorumLockManager extends LockManager { abstract protected function freeLocksOnServer( $lockSrv, array $paths, $type ); /** - * Release all locks that this session is holding + * Release all locks that this session is holding. + * + * Subclasses must effectively implement this or freeLocksOnServer(). * * @return Status */ diff --git a/includes/filerepo/backend/lockmanager/MemcLockManager.php b/includes/filerepo/backend/lockmanager/MemcLockManager.php new file mode 100644 index 0000000000..add1f2c9e6 --- /dev/null +++ b/includes/filerepo/backend/lockmanager/MemcLockManager.php @@ -0,0 +1,306 @@ + self::LOCK_SH, + self::LOCK_UW => self::LOCK_SH, + self::LOCK_EX => self::LOCK_EX + ); + + /** @var Array Map server names to MemcachedBagOStuff objects */ + protected $bagOStuffs = array(); + /** @var Array */ + protected $serversUp = array(); // (server name => bool) + + protected $lockExpiry; // integer; maximum time locks can be held + protected $session = ''; // string; random SHA-1 UUID + protected $wikiId = ''; // string + + /** + * Construct a new instance from configuration. + * + * $config paramaters include: + * 'lockServers' : Associative array of server names to : strings. + * 'srvsByBucket' : Array of 1-16 consecutive integer keys, starting from 0, + * each having an odd-numbered list of server names (peers) as values. + * 'memcConfig' : Configuration array for ObjectCache::newFromParams. [optional] + * If set, this must use one of the memcached classes. + * 'wikiId' : Wiki ID string that all resources are relative to. [optional] + * + * @param Array $config + */ + public function __construct( array $config ) { + parent::__construct( $config ); + + // Sanitize srvsByBucket config to prevent PHP errors + $this->srvsByBucket = array_filter( $config['srvsByBucket'], 'is_array' ); + $this->srvsByBucket = array_values( $this->srvsByBucket ); // consecutive + + $memcConfig = isset( $config['memcConfig'] ) + ? $config['memcConfig'] + : array( 'class' => 'MemcachedPhpBagOStuff' ); + + foreach ( $config['lockServers'] as $name => $address ) { + $params = array( 'servers' => array( $address ) ) + $memcConfig; + $cache = ObjectCache::newFromParams( $params ); + if ( $cache instanceof MemcachedBagOStuff ) { + $this->bagOStuffs[$name] = $cache; + } else { + throw new MWException( + 'Only MemcachedBagOStuff classes are supported by MemcLockManager.' ); + } + } + + $this->wikiId = isset( $config['wikiId'] ) ? $config['wikiId'] : wfWikiID(); + + $met = ini_get( 'max_execution_time' ); // this is 0 in CLI mode + $this->lockExpiry = $met ? 2*(int)$met : 2*3600; + + $this->session = wfRandomString( 31 ); + } + + /** + * @see QuorumLockManager::getLocksOnServer() + * @return Status + */ + protected function getLocksOnServer( $lockSrv, array $paths, $type ) { + $status = Status::newGood(); + + $memc = $this->getCache( $lockSrv ); + $keys = array_map( array( $this, 'recordKeyForPath' ), $paths ); // lock records + + // Lock all of the active lock record keys... + if ( !$this->acquireMutexes( $memc, $keys ) ) { + foreach ( $paths as $path ) { + $status->fatal( 'lockmanager-fail-acquirelock', $path ); + } + return; + } + + // Fetch all the existing lock records... + $lockRecords = $memc->getMulti( $keys ); + + $now = time(); + // Check if the requested locks conflict with existing ones... + foreach ( $paths as $path ) { + $locksKey = $this->recordKeyForPath( $path ); + $locksHeld = isset( $lockRecords[$locksKey] ) + ? $lockRecords[$locksKey] + : array( self::LOCK_SH => array(), self::LOCK_EX => array() ); // init + foreach ( $locksHeld[self::LOCK_EX] as $session => $expiry ) { + if ( $expiry < $now ) { // stale? + unset( $locksHeld[self::LOCK_EX][$session] ); + } elseif ( $session !== $this->session ) { + $status->fatal( 'lockmanager-fail-acquirelock', $path ); + } + } + if ( $type === self::LOCK_EX ) { + foreach ( $locksHeld[self::LOCK_SH] as $session => $expiry ) { + if ( $expiry < $now ) { // stale? + unset( $locksHeld[self::LOCK_SH][$session] ); + } elseif ( $session !== $this->session ) { + $status->fatal( 'lockmanager-fail-acquirelock', $path ); + } + } + } + if ( $status->isOK() ) { + // Register the session in the lock record array + $locksHeld[$type][$this->session] = $now + $this->lockExpiry; + // We will update this record if none of the other locks conflict + $lockRecords[$locksKey] = $locksHeld; + } + } + + // If there were no lock conflicts, update all the lock records... + if ( $status->isOK() ) { + foreach ( $lockRecords as $locksKey => $locksHeld ) { + $memc->set( $locksKey, $locksHeld ); + wfDebug( __METHOD__ . ": acquired lock on key $locksKey.\n" ); + } + } + + // Unlock all of the active lock record keys... + $this->releaseMutexes( $memc, $keys ); + + return $status; + } + + /** + * @see QuorumLockManager::freeLocksOnServer() + * @return Status + */ + protected function freeLocksOnServer( $lockSrv, array $paths, $type ) { + $status = Status::newGood(); + + $memc = $this->getCache( $lockSrv ); + $keys = array_map( array( $this, 'recordKeyForPath' ), $paths ); // lock records + + // Lock all of the active lock record keys... + if ( !$this->acquireMutexes( $memc, $keys ) ) { + foreach ( $paths as $path ) { + $status->fatal( 'lockmanager-fail-releaselock', $path ); + } + return; + } + + // Fetch all the existing lock records... + $lockRecords = $memc->getMulti( $keys ); + + // Remove the requested locks from all records... + foreach ( $paths as $path ) { + $locksKey = $this->recordKeyForPath( $path ); // lock record + if ( !isset( $lockRecords[$locksKey] ) ) { + continue; // nothing to do + } + $locksHeld = $lockRecords[$locksKey]; + if ( is_array( $locksHeld ) && isset( $locksHeld[$type] ) ) { + unset( $locksHeld[$type][$this->session] ); + $ok = $memc->set( $locksKey, $locksHeld ); + } else { + $ok = true; + } + if ( !$ok ) { + $status->fatal( 'lockmanager-fail-releaselock', $path ); + } + wfDebug( __METHOD__ . ": released lock on key $locksKey.\n" ); + } + + // Unlock all of the active lock record keys... + $this->releaseMutexes( $memc, $keys ); + + return $status; + } + + /** + * @see QuorumLockManager::releaseAllLocks() + * @return Status + */ + protected function releaseAllLocks() { + return Status::newGood(); // not supported + } + + /** + * @see QuorumLockManager::isServerUp() + * @return bool + */ + protected function isServerUp( $lockSrv ) { + return (bool)$this->getCache( $lockSrv ); + } + + /** + * Get the MemcachedBagOStuff object for a $lockSrv + * + * @param $lockSrv string Server name + * @return MemcachedBagOStuff|null + */ + protected function getCache( $lockSrv ) { + $memc = null; + if ( isset( $this->bagOStuffs[$lockSrv] ) ) { + $memc = $this->bagOStuffs[$lockSrv]; + if ( !isset( $this->serversUp[$lockSrv] ) ) { + $this->serversUp[$lockSrv] = $memc->set( 'MemcLockManager:ping', 1, 1 ); + if ( !$this->serversUp[$lockSrv] ) { + trigger_error( __METHOD__ . ": Could not contact $lockSrv.", E_USER_WARNING ); + } + } + if ( !$this->serversUp[$lockSrv] ) { + return null; // server appears to be down + } + } + return $memc; + } + + /** + * @param $path string + * @return string + */ + protected function recordKeyForPath( $path ) { + $hash = LockManager::sha1Base36( $path ); + list( $db, $prefix ) = wfSplitWikiID( $this->wikiId ); + return wfForeignMemcKey( $db, $prefix, __CLASS__, 'locks', $hash ); + } + + /** + * @param $memc MemcachedBagOStuff + * @param $keys Array List of keys to acquire + * @return bool + */ + protected function acquireMutexes( MemcachedBagOStuff $memc, array $keys ) { + $lockedKeys = array(); + + $start = microtime( true ); + do { + foreach ( array_diff( $keys, $lockedKeys ) as $key ) { + if ( $memc->add( "$key:mutex", 1, 180 ) ) { // lock record + $lockedKeys[] = $key; + } + } + } while ( count( $lockedKeys ) < count( $keys ) && ( microtime( true ) - $start ) <= 6 ); + + if ( count( $lockedKeys ) != count( $keys ) ) { + $this->releaseMutexes( $lockedKeys ); // failed; release what was locked + return false; + } + + return true; + } + + /** + * @param $memc MemcachedBagOStuff + * @param $keys Array List of acquired keys + * @return void + */ + protected function releaseMutexes( MemcachedBagOStuff $memc, array $keys ) { + foreach ( $keys as $key ) { + $memc->delete( "$key:mutex" ); + } + } + + /** + * Make sure remaining locks get cleared for sanity + */ + function __destruct() { + while ( count( $this->locksHeld ) ) { + foreach ( $this->locksHeld as $path => $locks ) { + $this->doUnlock( array( $path ), self::LOCK_EX ); + $this->doUnlock( array( $path ), self::LOCK_SH ); + } + } + } +} -- 2.20.1