From: Aaron Schulz Date: Wed, 2 Apr 2014 18:08:22 +0000 (-0700) Subject: Re-organized PoolCounter files X-Git-Tag: 1.31.0-rc.0~16397 X-Git-Url: https://git.cyclocoop.org/%242?a=commitdiff_plain;h=fbf97f572cd26f3dadd323ee31036ba7a1d9d11f;p=lhc%2Fweb%2Fwiklou.git Re-organized PoolCounter files * No actual changes to the classes Change-Id: I4e35a2b097e4bc72315688d0390d3f5c4c1ca4ed --- diff --git a/includes/AutoLoader.php b/includes/AutoLoader.php index d9bb4bf613..b7d7d8c762 100644 --- a/includes/AutoLoader.php +++ b/includes/AutoLoader.php @@ -155,11 +155,11 @@ $wgAutoloadLocalClasses = array( 'PathRouter' => 'includes/PathRouter.php', 'PathRouterPatternReplacer' => 'includes/PathRouter.php', 'PhpHttpRequest' => 'includes/HttpFunctions.php', - 'PoolCounter' => 'includes/PoolCounter.php', - 'PoolCounter_Stub' => 'includes/PoolCounter.php', - 'PoolCounterRedis' => 'includes/PoolCounterRedis.php', - 'PoolCounterWork' => 'includes/PoolCounter.php', - 'PoolCounterWorkViaCallback' => 'includes/PoolCounter.php', + 'PoolCounter' => 'includes/poolcounter/PoolCounter.php', + 'PoolCounter_Stub' => 'includes/poolcounter/PoolCounter.php', + 'PoolCounterRedis' => 'includes/poolcounter/PoolCounterRedis.php', + 'PoolCounterWork' => 'includes/poolcounter/PoolCounterWork.php', + 'PoolCounterWorkViaCallback' => 'includes/poolcounter/PoolCounterWork.php', 'PoolWorkArticleView' => 'includes/WikiPage.php', 'Preferences' => 'includes/Preferences.php', 'PreferencesForm' => 'includes/Preferences.php', diff --git a/includes/PoolCounter.php b/includes/PoolCounter.php deleted file mode 100644 index 85c4c79a34..0000000000 --- a/includes/PoolCounter.php +++ /dev/null @@ -1,339 +0,0 @@ -key = $key; - $this->workers = $conf['workers']; - $this->maxqueue = $conf['maxqueue']; - $this->timeout = $conf['timeout']; - } - - /** - * Create a Pool counter. This should only be called from the PoolWorks. - * - * @param $type - * @param $key - * - * @return PoolCounter - */ - public static function factory( $type, $key ) { - global $wgPoolCounterConf; - if ( !isset( $wgPoolCounterConf[$type] ) ) { - return new PoolCounter_Stub; - } - $conf = $wgPoolCounterConf[$type]; - $class = $conf['class']; - - return new $class( $conf, $type, $key ); - } - - /** - * @return string - */ - public function getKey() { - return $this->key; - } - - /** - * I want to do this task and I need to do it myself. - * - * @return Status Value is one of Locked/Error - */ - abstract public function acquireForMe(); - - /** - * I want to do this task, but if anyone else does it - * instead, it's also fine for me. I will read its cached data. - * - * @return Status Value is one of Locked/Done/Error - */ - abstract public function acquireForAnyone(); - - /** - * I have successfully finished my task. - * Lets another one grab the lock, and returns the workers - * waiting on acquireForAnyone() - * - * @return Status value is one of Released/NotLocked/Error - */ - abstract public function release(); -} - -class PoolCounter_Stub extends PoolCounter { - public function __construct() { - /* No parameters needed */ - } - - public function acquireForMe() { - return Status::newGood( PoolCounter::LOCKED ); - } - - public function acquireForAnyone() { - return Status::newGood( PoolCounter::LOCKED ); - } - - public function release() { - return Status::newGood( PoolCounter::RELEASED ); - } -} - -/** - * Class for dealing with PoolCounters using class members - */ -abstract class PoolCounterWork { - protected $cacheable = false; //Does this override getCachedWork() ? - - /** - * @param string $type The type of PoolCounter to use - * @param string $key Key that identifies the queue this work is placed on - */ - public function __construct( $type, $key ) { - $this->poolCounter = PoolCounter::factory( $type, $key ); - } - - /** - * Actually perform the work, caching it if needed - * @return mixed work result or false - */ - abstract public function doWork(); - - /** - * Retrieve the work from cache - * @return mixed work result or false - */ - public function getCachedWork() { - return false; - } - - /** - * A work not so good (eg. expired one) but better than an error - * message. - * @return mixed work result or false - */ - public function fallback() { - return false; - } - - /** - * Do something with the error, like showing it to the user. - * @return bool - */ - public function error( $status ) { - return false; - } - - /** - * Log an error - * - * @param $status Status - * @return void - */ - public function logError( $status ) { - $key = $this->poolCounter->getKey(); - - wfDebugLog( 'poolcounter', "Pool key '$key': " - . $status->getMessage()->inLanguage( 'en' )->useDatabase( false )->text() ); - } - - /** - * Get the result of the work (whatever it is), or the result of the error() function. - * This returns the result of the first applicable method that returns a non-false value, - * where the methods are checked in the following order: - * - a) doWork() : Applies if the work is exclusive or no another process - * is doing it, and on the condition that either this process - * successfully entered the pool or the pool counter is down. - * - b) doCachedWork() : Applies if the work is cacheable and this blocked on another - * process which finished the work. - * - c) fallback() : Applies for all remaining cases. - * If these all fall through (by returning false), then the result of error() is returned. - * - * @param $skipcache bool - * @return mixed - */ - public function execute( $skipcache = false ) { - if ( $this->cacheable && !$skipcache ) { - $status = $this->poolCounter->acquireForAnyone(); - } else { - $status = $this->poolCounter->acquireForMe(); - } - - if ( !$status->isOK() ) { - // Respond gracefully to complete server breakage: just log it and do the work - $this->logError( $status ); - return $this->doWork(); - } - - switch ( $status->value ) { - case PoolCounter::LOCKED: - $result = $this->doWork(); - $this->poolCounter->release(); - return $result; - - case PoolCounter::DONE: - $result = $this->getCachedWork(); - if ( $result === false ) { - /* That someone else work didn't serve us. - * Acquire the lock for me - */ - return $this->execute( true ); - } - return $result; - - case PoolCounter::QUEUE_FULL: - case PoolCounter::TIMEOUT: - $result = $this->fallback(); - - if ( $result !== false ) { - return $result; - } - /* no break */ - - /* These two cases should never be hit... */ - case PoolCounter::ERROR: - default: - $errors = array( - PoolCounter::QUEUE_FULL => 'pool-queuefull', - PoolCounter::TIMEOUT => 'pool-timeout' ); - - $status = Status::newFatal( isset( $errors[$status->value] ) - ? $errors[$status->value] - : 'pool-errorunknown' ); - $this->logError( $status ); - return $this->error( $status ); - } - } -} - -/** - * Convenience class for dealing with PoolCounters using callbacks - * @since 1.22 - */ -class PoolCounterWorkViaCallback extends PoolCounterWork { - /** @var callable */ - protected $doWork; - /** @var callable|null */ - protected $doCachedWork; - /** @var callable|null */ - protected $fallback; - /** @var callable|null */ - protected $error; - - /** - * Build a PoolCounterWork class from a type, key, and callback map. - * - * The callback map must at least have a callback for the 'doWork' method. - * Additionally, callbacks can be provided for the 'doCachedWork', 'fallback', - * and 'error' methods. Methods without callbacks will be no-ops that return false. - * If a 'doCachedWork' callback is provided, then execute() may wait for any prior - * process in the pool to finish and reuse its cached result. - * - * @param string $type - * @param string $key - * @param array $callbacks Map of callbacks - * @throws MWException - */ - public function __construct( $type, $key, array $callbacks ) { - parent::__construct( $type, $key ); - foreach ( array( 'doWork', 'doCachedWork', 'fallback', 'error' ) as $name ) { - if ( isset( $callbacks[$name] ) ) { - if ( !is_callable( $callbacks[$name] ) ) { - throw new MWException( "Invalid callback provided for '$name' function." ); - } - $this->$name = $callbacks[$name]; - } - } - if ( !isset( $this->doWork ) ) { - throw new MWException( "No callback provided for 'doWork' function." ); - } - $this->cacheable = isset( $this->doCachedWork ); - } - - public function doWork() { - return call_user_func_array( $this->doWork, array() ); - } - - public function getCachedWork() { - if ( $this->doCachedWork ) { - return call_user_func_array( $this->doCachedWork, array() ); - } - return false; - } - - public function fallback() { - if ( $this->fallback ) { - return call_user_func_array( $this->fallback, array() ); - } - return false; - } - - public function error( $status ) { - if ( $this->error ) { - return call_user_func_array( $this->error, array( $status ) ); - } - return false; - } -} diff --git a/includes/PoolCounterRedis.php b/includes/PoolCounterRedis.php deleted file mode 100644 index 1bc10f21f4..0000000000 --- a/includes/PoolCounterRedis.php +++ /dev/null @@ -1,412 +0,0 @@ - host) map */ - protected $serversByLabel; - /** @var string SHA-1 of the key */ - protected $keySha1; - /** @var integer TTL for locks to expire (work should finish in this time) */ - protected $lockTTL; - - /** @var RedisConnRef */ - protected $conn; - /** @var string Pool slot value */ - protected $slot; - /** @var integer AWAKE_* constant */ - protected $onRelease; - /** @var string Unique string to identify this process */ - protected $session; - /** @var integer UNIX timestamp */ - protected $slotTime; - - const AWAKE_ONE = 1; // wake-up if when a slot can be taken from an existing process - const AWAKE_ALL = 2; // wake-up if an existing process finishes and wake up such others - - /** @var Array List of active PoolCounterRedis objects in this script */ - protected static $active = null; - - function __construct( $conf, $type, $key ) { - parent::__construct( $conf, $type, $key ); - - $this->serversByLabel = $conf['servers']; - $this->ring = new HashRing( array_fill_keys( array_keys( $conf['servers'] ), 100 ) ); - - $conf['redisConfig']['serializer'] = 'none'; // for use with Lua - $this->pool = RedisConnectionPool::singleton( $conf['redisConfig'] ); - - $this->keySha1 = sha1( $this->key ); - $met = ini_get( 'max_execution_time' ); // usually 0 in CLI mode - $this->lockTTL = $met ? 2*$met : 3600; - - if ( self::$active === null ) { - self::$active = array(); - register_shutdown_function( array( __CLASS__, 'releaseAll' ) ); - } - } - - /** - * @return Status Uses RediConnRef as value on success - */ - protected function getConnection() { - if ( !isset( $this->conn ) ) { - $conn = false; - $servers = $this->ring->getLocations( $this->key, 3 ); - ArrayUtils::consistentHashSort( $servers, $this->key ); - foreach ( $servers as $server ) { - $conn = $this->pool->getConnection( $this->serversByLabel[$server] ); - if ( $conn ) { - break; - } - } - if ( !$conn ) { - return Status::newFatal( 'pool-servererror', implode( ', ', $servers ) ); - } - $this->conn = $conn; - } - return Status::newGood( $this->conn ); - } - - function acquireForMe() { - $section = new ProfileSection( __METHOD__ ); - - return $this->waitForSlotOrNotif( self::AWAKE_ONE ); - } - - function acquireForAnyone() { - $section = new ProfileSection( __METHOD__ ); - - return $this->waitForSlotOrNotif( self::AWAKE_ALL ); - } - - function release() { - $section = new ProfileSection( __METHOD__ ); - - if ( $this->slot === null ) { - return Status::newGood( PoolCounter::NOT_LOCKED ); // not locked - } - - $status = $this->getConnection(); - if ( !$status->isOK() ) { - return $status; - } - $conn = $status->value; - - static $script = -<<= (1*rMaxWorkers - 1) then - -- Clear list to save space; it will re-init as needed - redis.call('del',kSlots,kSlotsNextRelease) - else - -- Add slot back to pool and update the "next release" time - redis.call('rPush',kSlots,rSlot) - redis.call('zAdd',kSlotsNextRelease,rTime + 30,rSlot) - -- Always keep renewing the expiry on use - redis.call('expireAt',kSlots,math.ceil(rTime + rExpiry)) - redis.call('expireAt',kSlotsNextRelease,math.ceil(rTime + rExpiry)) - end - end - -- Update an ephemeral list to wake up other clients that can - -- reuse any cached work from this process. Only do this if no - -- slots are currently free (e.g. clients could be waiting). - if 1*rAwakeAll == 1 then - local count = redis.call('zCard',kWaiting) - for i = 1,count do - redis.call('rPush',kWakeup,'w') - end - redis.call('pexpire',kWakeup,1) - end - return 1 -LUA; - try { - $res = $conn->luaEval( $script, - array( - $this->getSlotListKey(), - $this->getSlotRTimeSetKey(), - $this->getWakeupListKey(), - $this->getWaitSetKey(), - $this->workers, - $this->lockTTL, - $this->slot, - $this->slotTime, // used for CAS-style sanity check - ( $this->onRelease === self::AWAKE_ALL ) ? 1 : 0, - microtime( true ) - ), - 4 # number of first argument(s) that are keys - ); - } catch ( RedisException $e ) { - return Status::newFatal( 'pool-error-unknown', $e->getMessage() ); - } - - $this->slot = null; - $this->slotTime = null; - $this->onRelease = null; - unset( self::$active[$this->session] ); - - return Status::newGood( PoolCounter::RELEASED ); - } - - /** - * @param int $doWakeup AWAKE_* constant - * @return Status - */ - protected function waitForSlotOrNotif( $doWakeup ) { - if ( $this->slot !== null ) { - return Status::newGood( PoolCounter::LOCK_HELD ); // already acquired - } - - $status = $this->getConnection(); - if ( !$status->isOK() ) { - return $status; - } - $conn = $status->value; - - $now = microtime( true ); - try { - $slot = $this->initAndPopPoolSlotList( $conn, $now ); - if ( ctype_digit( $slot ) ) { - // Pool slot acquired by this process - $slotTime = $now; - } elseif ( $slot === 'QUEUE_FULL' ) { - // Too many processes are waiting for pooled processes to finish - return Status::newGood( PoolCounter::QUEUE_FULL ); - } elseif ( $slot === 'QUEUE_WAIT' ) { - // This process is now registered as waiting - $keys = ( $doWakeup == self::AWAKE_ALL ) - // Wait for an open slot or wake-up signal (preferring the later) - ? array( $this->getWakeupListKey(), $this->getSlotListKey() ) - // Just wait for an actual pool slot - : array( $this->getSlotListKey() ); - - $res = $conn->blPop( $keys, $this->timeout ); - if ( $res === array() ) { - $conn->zRem( $this->getWaitSetKey(), $this->session ); // no longer waiting - return Status::newGood( PoolCounter::TIMEOUT ); - } - - $slot = $res[1]; // pool slot or "w" for wake-up notifications - $slotTime = microtime( true ); // last microtime() was a few RTTs ago - // Unregister this process as waiting and bump slot "next release" time - $this->registerAcquisitionTime( $conn, $slot, $slotTime ); - } else { - return Status::newFatal( 'pool-error-unknown', "Server gave slot '$slot'." ); - } - } catch ( RedisException $e ) { - return Status::newFatal( 'pool-error-unknown', $e->getMessage() ); - } - - if ( $slot !== 'w' ) { - $this->slot = $slot; - $this->slotTime = $slotTime; - $this->onRelease = $doWakeup; - self::$active[$this->session] = $this; - } - - return Status::newGood( $slot === 'w' ? PoolCounter::DONE : PoolCounter::LOCKED ); - } - - /** - * @param RedisConnRef $conn - * @param float UNIX timestamp - * @return string|bool False on failure - */ - protected function initAndPopPoolSlotList( RedisConnRef $conn, $now ) { - static $script = -<< 0 then - slot = redis.call('lPop',kSlots) - -- Update the slot "next release" time - redis.call('zAdd',kSlotsNextRelease,rTime + rExpiry,slot) - elseif redis.call('zCard',kSlotWaits) >= 1*rMaxQueue then - slot = 'QUEUE_FULL' - else - slot = 'QUEUE_WAIT' - -- Register this process as waiting - redis.call('zAdd',kSlotWaits,rTime,rSess) - redis.call('expireAt',kSlotWaits,math.ceil(rTime + 2*rTimeout)) - end - -- Always keep renewing the expiry on use - redis.call('expireAt',kSlots,math.ceil(rTime + rExpiry)) - redis.call('expireAt',kSlotsNextRelease,math.ceil(rTime + rExpiry)) - return slot -LUA; - return $conn->luaEval( $script, - array( - $this->getSlotListKey(), - $this->getSlotRTimeSetKey(), - $this->getWaitSetKey(), - $this->workers, - $this->maxqueue, - $this->timeout, - $this->lockTTL, - $this->session, - $now - ), - 3 # number of first argument(s) that are keys - ); - } - - /** - * @param RedisConnRef $conn - * @param string $slot - * @param float $now - * @return int|bool False on failure - */ - protected function registerAcquisitionTime( RedisConnRef $conn, $slot, $now ) { - static $script = -<<luaEval( $script, - array( - $this->getSlotListKey(), - $this->getSlotRTimeSetKey(), - $this->getWaitSetKey(), - $slot, - $this->lockTTL, - $this->session, - $now - ), - 3 # number of first argument(s) that are keys - ); - } - - /** - * @return string - */ - protected function getSlotListKey() { - return "poolcounter:l-slots-{$this->keySha1}-{$this->workers}"; - } - - /** - * @return string - */ - protected function getSlotRTimeSetKey() { - return "poolcounter:z-renewtime-{$this->keySha1}-{$this->workers}"; - } - - /** - * @return string - */ - protected function getWaitSetKey() { - return "poolcounter:z-wait-{$this->keySha1}-{$this->workers}"; - } - - /** - * @return string - */ - protected function getWakeupListKey() { - return "poolcounter:l-wakeup-{$this->keySha1}-{$this->workers}"; - } - - /** - * Try to make sure that locks get released (even with exceptions and fatals) - */ - public static function releaseAll() { - foreach ( self::$active as $poolCounter ) { - try { - if ( $poolCounter->slot !== null ) { - $poolCounter->release(); - } - } catch ( Exception $e ) {} - } - } -} diff --git a/includes/poolcounter/PoolCounter.php b/includes/poolcounter/PoolCounter.php new file mode 100644 index 0000000000..1b209e4d04 --- /dev/null +++ b/includes/poolcounter/PoolCounter.php @@ -0,0 +1,142 @@ +key = $key; + $this->workers = $conf['workers']; + $this->maxqueue = $conf['maxqueue']; + $this->timeout = $conf['timeout']; + } + + /** + * Create a Pool counter. This should only be called from the PoolWorks. + * + * @param $type + * @param $key + * + * @return PoolCounter + */ + public static function factory( $type, $key ) { + global $wgPoolCounterConf; + if ( !isset( $wgPoolCounterConf[$type] ) ) { + return new PoolCounter_Stub; + } + $conf = $wgPoolCounterConf[$type]; + $class = $conf['class']; + + return new $class( $conf, $type, $key ); + } + + /** + * @return string + */ + public function getKey() { + return $this->key; + } + + /** + * I want to do this task and I need to do it myself. + * + * @return Status Value is one of Locked/Error + */ + abstract public function acquireForMe(); + + /** + * I want to do this task, but if anyone else does it + * instead, it's also fine for me. I will read its cached data. + * + * @return Status Value is one of Locked/Done/Error + */ + abstract public function acquireForAnyone(); + + /** + * I have successfully finished my task. + * Lets another one grab the lock, and returns the workers + * waiting on acquireForAnyone() + * + * @return Status value is one of Released/NotLocked/Error + */ + abstract public function release(); +} + +class PoolCounter_Stub extends PoolCounter { + public function __construct() { + /* No parameters needed */ + } + + public function acquireForMe() { + return Status::newGood( PoolCounter::LOCKED ); + } + + public function acquireForAnyone() { + return Status::newGood( PoolCounter::LOCKED ); + } + + public function release() { + return Status::newGood( PoolCounter::RELEASED ); + } +} diff --git a/includes/poolcounter/PoolCounterRedis.php b/includes/poolcounter/PoolCounterRedis.php new file mode 100644 index 0000000000..1bc10f21f4 --- /dev/null +++ b/includes/poolcounter/PoolCounterRedis.php @@ -0,0 +1,412 @@ + host) map */ + protected $serversByLabel; + /** @var string SHA-1 of the key */ + protected $keySha1; + /** @var integer TTL for locks to expire (work should finish in this time) */ + protected $lockTTL; + + /** @var RedisConnRef */ + protected $conn; + /** @var string Pool slot value */ + protected $slot; + /** @var integer AWAKE_* constant */ + protected $onRelease; + /** @var string Unique string to identify this process */ + protected $session; + /** @var integer UNIX timestamp */ + protected $slotTime; + + const AWAKE_ONE = 1; // wake-up if when a slot can be taken from an existing process + const AWAKE_ALL = 2; // wake-up if an existing process finishes and wake up such others + + /** @var Array List of active PoolCounterRedis objects in this script */ + protected static $active = null; + + function __construct( $conf, $type, $key ) { + parent::__construct( $conf, $type, $key ); + + $this->serversByLabel = $conf['servers']; + $this->ring = new HashRing( array_fill_keys( array_keys( $conf['servers'] ), 100 ) ); + + $conf['redisConfig']['serializer'] = 'none'; // for use with Lua + $this->pool = RedisConnectionPool::singleton( $conf['redisConfig'] ); + + $this->keySha1 = sha1( $this->key ); + $met = ini_get( 'max_execution_time' ); // usually 0 in CLI mode + $this->lockTTL = $met ? 2*$met : 3600; + + if ( self::$active === null ) { + self::$active = array(); + register_shutdown_function( array( __CLASS__, 'releaseAll' ) ); + } + } + + /** + * @return Status Uses RediConnRef as value on success + */ + protected function getConnection() { + if ( !isset( $this->conn ) ) { + $conn = false; + $servers = $this->ring->getLocations( $this->key, 3 ); + ArrayUtils::consistentHashSort( $servers, $this->key ); + foreach ( $servers as $server ) { + $conn = $this->pool->getConnection( $this->serversByLabel[$server] ); + if ( $conn ) { + break; + } + } + if ( !$conn ) { + return Status::newFatal( 'pool-servererror', implode( ', ', $servers ) ); + } + $this->conn = $conn; + } + return Status::newGood( $this->conn ); + } + + function acquireForMe() { + $section = new ProfileSection( __METHOD__ ); + + return $this->waitForSlotOrNotif( self::AWAKE_ONE ); + } + + function acquireForAnyone() { + $section = new ProfileSection( __METHOD__ ); + + return $this->waitForSlotOrNotif( self::AWAKE_ALL ); + } + + function release() { + $section = new ProfileSection( __METHOD__ ); + + if ( $this->slot === null ) { + return Status::newGood( PoolCounter::NOT_LOCKED ); // not locked + } + + $status = $this->getConnection(); + if ( !$status->isOK() ) { + return $status; + } + $conn = $status->value; + + static $script = +<<= (1*rMaxWorkers - 1) then + -- Clear list to save space; it will re-init as needed + redis.call('del',kSlots,kSlotsNextRelease) + else + -- Add slot back to pool and update the "next release" time + redis.call('rPush',kSlots,rSlot) + redis.call('zAdd',kSlotsNextRelease,rTime + 30,rSlot) + -- Always keep renewing the expiry on use + redis.call('expireAt',kSlots,math.ceil(rTime + rExpiry)) + redis.call('expireAt',kSlotsNextRelease,math.ceil(rTime + rExpiry)) + end + end + -- Update an ephemeral list to wake up other clients that can + -- reuse any cached work from this process. Only do this if no + -- slots are currently free (e.g. clients could be waiting). + if 1*rAwakeAll == 1 then + local count = redis.call('zCard',kWaiting) + for i = 1,count do + redis.call('rPush',kWakeup,'w') + end + redis.call('pexpire',kWakeup,1) + end + return 1 +LUA; + try { + $res = $conn->luaEval( $script, + array( + $this->getSlotListKey(), + $this->getSlotRTimeSetKey(), + $this->getWakeupListKey(), + $this->getWaitSetKey(), + $this->workers, + $this->lockTTL, + $this->slot, + $this->slotTime, // used for CAS-style sanity check + ( $this->onRelease === self::AWAKE_ALL ) ? 1 : 0, + microtime( true ) + ), + 4 # number of first argument(s) that are keys + ); + } catch ( RedisException $e ) { + return Status::newFatal( 'pool-error-unknown', $e->getMessage() ); + } + + $this->slot = null; + $this->slotTime = null; + $this->onRelease = null; + unset( self::$active[$this->session] ); + + return Status::newGood( PoolCounter::RELEASED ); + } + + /** + * @param int $doWakeup AWAKE_* constant + * @return Status + */ + protected function waitForSlotOrNotif( $doWakeup ) { + if ( $this->slot !== null ) { + return Status::newGood( PoolCounter::LOCK_HELD ); // already acquired + } + + $status = $this->getConnection(); + if ( !$status->isOK() ) { + return $status; + } + $conn = $status->value; + + $now = microtime( true ); + try { + $slot = $this->initAndPopPoolSlotList( $conn, $now ); + if ( ctype_digit( $slot ) ) { + // Pool slot acquired by this process + $slotTime = $now; + } elseif ( $slot === 'QUEUE_FULL' ) { + // Too many processes are waiting for pooled processes to finish + return Status::newGood( PoolCounter::QUEUE_FULL ); + } elseif ( $slot === 'QUEUE_WAIT' ) { + // This process is now registered as waiting + $keys = ( $doWakeup == self::AWAKE_ALL ) + // Wait for an open slot or wake-up signal (preferring the later) + ? array( $this->getWakeupListKey(), $this->getSlotListKey() ) + // Just wait for an actual pool slot + : array( $this->getSlotListKey() ); + + $res = $conn->blPop( $keys, $this->timeout ); + if ( $res === array() ) { + $conn->zRem( $this->getWaitSetKey(), $this->session ); // no longer waiting + return Status::newGood( PoolCounter::TIMEOUT ); + } + + $slot = $res[1]; // pool slot or "w" for wake-up notifications + $slotTime = microtime( true ); // last microtime() was a few RTTs ago + // Unregister this process as waiting and bump slot "next release" time + $this->registerAcquisitionTime( $conn, $slot, $slotTime ); + } else { + return Status::newFatal( 'pool-error-unknown', "Server gave slot '$slot'." ); + } + } catch ( RedisException $e ) { + return Status::newFatal( 'pool-error-unknown', $e->getMessage() ); + } + + if ( $slot !== 'w' ) { + $this->slot = $slot; + $this->slotTime = $slotTime; + $this->onRelease = $doWakeup; + self::$active[$this->session] = $this; + } + + return Status::newGood( $slot === 'w' ? PoolCounter::DONE : PoolCounter::LOCKED ); + } + + /** + * @param RedisConnRef $conn + * @param float UNIX timestamp + * @return string|bool False on failure + */ + protected function initAndPopPoolSlotList( RedisConnRef $conn, $now ) { + static $script = +<< 0 then + slot = redis.call('lPop',kSlots) + -- Update the slot "next release" time + redis.call('zAdd',kSlotsNextRelease,rTime + rExpiry,slot) + elseif redis.call('zCard',kSlotWaits) >= 1*rMaxQueue then + slot = 'QUEUE_FULL' + else + slot = 'QUEUE_WAIT' + -- Register this process as waiting + redis.call('zAdd',kSlotWaits,rTime,rSess) + redis.call('expireAt',kSlotWaits,math.ceil(rTime + 2*rTimeout)) + end + -- Always keep renewing the expiry on use + redis.call('expireAt',kSlots,math.ceil(rTime + rExpiry)) + redis.call('expireAt',kSlotsNextRelease,math.ceil(rTime + rExpiry)) + return slot +LUA; + return $conn->luaEval( $script, + array( + $this->getSlotListKey(), + $this->getSlotRTimeSetKey(), + $this->getWaitSetKey(), + $this->workers, + $this->maxqueue, + $this->timeout, + $this->lockTTL, + $this->session, + $now + ), + 3 # number of first argument(s) that are keys + ); + } + + /** + * @param RedisConnRef $conn + * @param string $slot + * @param float $now + * @return int|bool False on failure + */ + protected function registerAcquisitionTime( RedisConnRef $conn, $slot, $now ) { + static $script = +<<luaEval( $script, + array( + $this->getSlotListKey(), + $this->getSlotRTimeSetKey(), + $this->getWaitSetKey(), + $slot, + $this->lockTTL, + $this->session, + $now + ), + 3 # number of first argument(s) that are keys + ); + } + + /** + * @return string + */ + protected function getSlotListKey() { + return "poolcounter:l-slots-{$this->keySha1}-{$this->workers}"; + } + + /** + * @return string + */ + protected function getSlotRTimeSetKey() { + return "poolcounter:z-renewtime-{$this->keySha1}-{$this->workers}"; + } + + /** + * @return string + */ + protected function getWaitSetKey() { + return "poolcounter:z-wait-{$this->keySha1}-{$this->workers}"; + } + + /** + * @return string + */ + protected function getWakeupListKey() { + return "poolcounter:l-wakeup-{$this->keySha1}-{$this->workers}"; + } + + /** + * Try to make sure that locks get released (even with exceptions and fatals) + */ + public static function releaseAll() { + foreach ( self::$active as $poolCounter ) { + try { + if ( $poolCounter->slot !== null ) { + $poolCounter->release(); + } + } catch ( Exception $e ) {} + } + } +} diff --git a/includes/poolcounter/PoolCounterWork.php b/includes/poolcounter/PoolCounterWork.php new file mode 100644 index 0000000000..50ddd90a77 --- /dev/null +++ b/includes/poolcounter/PoolCounterWork.php @@ -0,0 +1,219 @@ +poolCounter = PoolCounter::factory( $type, $key ); + } + + /** + * Actually perform the work, caching it if needed + * @return mixed work result or false + */ + abstract public function doWork(); + + /** + * Retrieve the work from cache + * @return mixed work result or false + */ + public function getCachedWork() { + return false; + } + + /** + * A work not so good (eg. expired one) but better than an error + * message. + * @return mixed work result or false + */ + public function fallback() { + return false; + } + + /** + * Do something with the error, like showing it to the user. + * @return bool + */ + public function error( $status ) { + return false; + } + + /** + * Log an error + * + * @param $status Status + * @return void + */ + public function logError( $status ) { + $key = $this->poolCounter->getKey(); + + wfDebugLog( 'poolcounter', "Pool key '$key': " + . $status->getMessage()->inLanguage( 'en' )->useDatabase( false )->text() ); + } + + /** + * Get the result of the work (whatever it is), or the result of the error() function. + * This returns the result of the first applicable method that returns a non-false value, + * where the methods are checked in the following order: + * - a) doWork() : Applies if the work is exclusive or no another process + * is doing it, and on the condition that either this process + * successfully entered the pool or the pool counter is down. + * - b) doCachedWork() : Applies if the work is cacheable and this blocked on another + * process which finished the work. + * - c) fallback() : Applies for all remaining cases. + * If these all fall through (by returning false), then the result of error() is returned. + * + * @param $skipcache bool + * @return mixed + */ + public function execute( $skipcache = false ) { + if ( $this->cacheable && !$skipcache ) { + $status = $this->poolCounter->acquireForAnyone(); + } else { + $status = $this->poolCounter->acquireForMe(); + } + + if ( !$status->isOK() ) { + // Respond gracefully to complete server breakage: just log it and do the work + $this->logError( $status ); + return $this->doWork(); + } + + switch ( $status->value ) { + case PoolCounter::LOCKED: + $result = $this->doWork(); + $this->poolCounter->release(); + return $result; + + case PoolCounter::DONE: + $result = $this->getCachedWork(); + if ( $result === false ) { + /* That someone else work didn't serve us. + * Acquire the lock for me + */ + return $this->execute( true ); + } + return $result; + + case PoolCounter::QUEUE_FULL: + case PoolCounter::TIMEOUT: + $result = $this->fallback(); + + if ( $result !== false ) { + return $result; + } + /* no break */ + + /* These two cases should never be hit... */ + case PoolCounter::ERROR: + default: + $errors = array( + PoolCounter::QUEUE_FULL => 'pool-queuefull', + PoolCounter::TIMEOUT => 'pool-timeout' ); + + $status = Status::newFatal( isset( $errors[$status->value] ) + ? $errors[$status->value] + : 'pool-errorunknown' ); + $this->logError( $status ); + return $this->error( $status ); + } + } +} + +/** + * Convenience class for dealing with PoolCounters using callbacks + * @since 1.22 + */ +class PoolCounterWorkViaCallback extends PoolCounterWork { + /** @var callable */ + protected $doWork; + /** @var callable|null */ + protected $doCachedWork; + /** @var callable|null */ + protected $fallback; + /** @var callable|null */ + protected $error; + + /** + * Build a PoolCounterWork class from a type, key, and callback map. + * + * The callback map must at least have a callback for the 'doWork' method. + * Additionally, callbacks can be provided for the 'doCachedWork', 'fallback', + * and 'error' methods. Methods without callbacks will be no-ops that return false. + * If a 'doCachedWork' callback is provided, then execute() may wait for any prior + * process in the pool to finish and reuse its cached result. + * + * @param string $type + * @param string $key + * @param array $callbacks Map of callbacks + * @throws MWException + */ + public function __construct( $type, $key, array $callbacks ) { + parent::__construct( $type, $key ); + foreach ( array( 'doWork', 'doCachedWork', 'fallback', 'error' ) as $name ) { + if ( isset( $callbacks[$name] ) ) { + if ( !is_callable( $callbacks[$name] ) ) { + throw new MWException( "Invalid callback provided for '$name' function." ); + } + $this->$name = $callbacks[$name]; + } + } + if ( !isset( $this->doWork ) ) { + throw new MWException( "No callback provided for 'doWork' function." ); + } + $this->cacheable = isset( $this->doCachedWork ); + } + + public function doWork() { + return call_user_func_array( $this->doWork, array() ); + } + + public function getCachedWork() { + if ( $this->doCachedWork ) { + return call_user_func_array( $this->doCachedWork, array() ); + } + return false; + } + + public function fallback() { + if ( $this->fallback ) { + return call_user_func_array( $this->fallback, array() ); + } + return false; + } + + public function error( $status ) { + if ( $this->error ) { + return call_user_func_array( $this->error, array( $status ) ); + } + return false; + } +}