From 03833b54e8050005a81eacf7941fe7bc0b6afbb2 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Tue, 11 Dec 2012 16:04:03 -0800 Subject: [PATCH] [JobQueue] Added a JobQueueRedis class. * The Job class now includes an extra "metadata" field for usage by the queue classes. This is useful for stores that lack IDs. * A RedisConnectionPool class was added for reusability. Change-Id: Ie72536240e27fe1caf11096cdc4c85dfa821d181 --- includes/AutoLoader.php | 5 + includes/clientpool/RedisConnectionPool.php | 298 ++++++++++ includes/job/Job.php | 3 + includes/job/JobQueueRedis.php | 610 ++++++++++++++++++++ includes/job/jobs/DuplicateJob.php | 7 +- 5 files changed, 920 insertions(+), 3 deletions(-) create mode 100644 includes/clientpool/RedisConnectionPool.php create mode 100644 includes/job/JobQueueRedis.php diff --git a/includes/AutoLoader.php b/includes/AutoLoader.php index 5eb497d317..f4552f4dcf 100644 --- a/includes/AutoLoader.php +++ b/includes/AutoLoader.php @@ -449,6 +449,10 @@ $wgAutoloadLocalClasses = array( 'TitleDependency' => 'includes/cache/CacheDependency.php', 'TitleListDependency' => 'includes/cache/CacheDependency.php', + # includes/clientpool + 'RedisConnectionPool' => 'includes/clientpool/RedisConnectionPool.php', + 'RedisConnRef' => 'includes/clientpool/RedisConnectionPool.php', + # includes/context 'ContextSource' => 'includes/context/ContextSource.php', 'DerivativeContext' => 'includes/context/DerivativeContext.php', @@ -653,6 +657,7 @@ $wgAutoloadLocalClasses = array( 'JobQueue' => 'includes/job/JobQueue.php', 'JobQueueDB' => 'includes/job/JobQueueDB.php', 'JobQueueGroup' => 'includes/job/JobQueueGroup.php', + 'JobQueueRedis' => 'includes/job/JobQueueRedis.php', # includes/job/jobs 'DoubleRedirectJob' => 'includes/job/jobs/DoubleRedirectJob.php', diff --git a/includes/clientpool/RedisConnectionPool.php b/includes/clientpool/RedisConnectionPool.php new file mode 100644 index 0000000000..95a0afa1ba --- /dev/null +++ 
b/includes/clientpool/RedisConnectionPool.php @@ -0,0 +1,298 @@ + ((connection info array),...) */ + protected $connections = array(); + /** @var Array (server name => UNIX timestamp) */ + protected $downServers = array(); + + /** @var Array */ + protected static $instances = array(); // (pool ID => RedisConnectionPool) + + const SERVER_DOWN_TTL = 30; // integer; seconds to cache servers as "down" + + /** + * $options include: + * - connectTimeout : The timeout for new connections, in seconds. + * Optional, default is 1 second. + * - persistent : Set this to true to allow connections to persist across + * multiple web requests. False by default. + * - poolSize : Maximum number of idle connections. Default is 5. + * - password : The authentication password, will be sent to Redis in clear text. + * Optional, if it is unspecified, no AUTH command will be sent. + * - serializer : Set to "php" or "igbinary". Default is "php". + * @param array $options + */ + protected function __construct( array $options ) { + if ( !extension_loaded( 'redis' ) ) { + throw new MWException( __CLASS__. ' requires the phpredis extension: ' . + 'https://github.com/nicolasff/phpredis' ); + } + $this->connectTimeout = isset( $options['connectTimeout'] ) + ? $options['connectTimeout'] + : 1; + $this->persistent = isset( $options['persistent'] ) + ? $options['persistent'] + : false; + $this->password = isset( $options['password'] ) + ? $options['password'] + : ''; + $this->poolSize = isset( $options['poolSize'] ) + ? $options['poolSize'] + : 5; + if ( !isset( $options['serializer'] ) || $options['serializer'] === 'php' ) { + $this->serializer = Redis::SERIALIZER_PHP; + } elseif ( $options['serializer'] === 'igbinary' ) { + $this->serializer = Redis::SERIALIZER_IGBINARY; + } else { + throw new MWException( "Invalid serializer specified." 
); + } + } + + /** + * @param $options Array + * @return RedisConnectionPool + */ + public static function singleton( array $options ) { + $id = sha1( serialize( $options ) ); + if ( !isset( self::$instances[$id] ) ) { + self::$instances[$id] = new self( $options ); + wfDebug( "Creating a new " . __CLASS__ . " instance with id $id." ); + } + return self::$instances[$id]; + } + + /** + * Get a connection to a redis server. Based on code in RedisBagOStuff.php. + * + * @param $server string A hostname/port combination or the absolute path of a UNIX socket. + * If a hostname is specified but no port, port 6379 will be used. + * @return RedisConnRef|bool Returns false on failure + * @throws MWException + */ + public function getConnection( $server ) { + // Check the listing "dead" servers which have had a connection errors. + // Servers are marked dead for a limited period of time, to + // avoid excessive overhead from repeated connection timeouts. + if ( isset( $this->downServers[$server] ) ) { + $now = time(); + if ( $now > $this->downServers[$server] ) { + // Dead time expired + unset( $this->downServers[$server] ); + } else { + // Server is dead + wfDebug( "server $server is marked down for another " . + ( $this->downServers[$server] - $now ) . " seconds, can't get connection" ); + return false; + } + } + + // Check if a connection is already free for use + if ( isset( $this->connections[$server] ) ) { + foreach ( $this->connections[$server] as &$connection ) { + if ( $connection['free'] ) { + $connection['free'] = false; + --$this->idlePoolSize; + return new RedisConnRef( $this, $server, $connection['conn'] ); + } + } + } + + if ( substr( $server, 0, 1 ) === '/' ) { + // UNIX domain socket + // These are required by the redis extension to start with a slash, but + // we still need to set the port to a special value to make it work. 
+ $host = $server; + $port = 0; + } else { + // TCP connection + $hostPort = IP::splitHostAndPort( $server ); + if ( !$hostPort ) { + throw new MWException( __CLASS__.": invalid configured server \"$server\"" ); + } + list( $host, $port ) = $hostPort; + if ( $port === false ) { + $port = 6379; + } + } + + $conn = new Redis(); + try { + if ( $this->persistent ) { + $result = $conn->pconnect( $host, $port, $this->connectTimeout ); + } else { + $result = $conn->connect( $host, $port, $this->connectTimeout ); + } + if ( !$result ) { + wfDebugLog( 'redis', "Could not connect to server $server" ); + // Mark server down for some time to avoid further timeouts + $this->downServers[$server] = time() + self::SERVER_DOWN_TTL; + return false; + } + if ( $this->password !== null ) { + if ( !$conn->auth( $this->password ) ) { + wfDebugLog( 'redis', "Authentication error connecting to $server" ); + } + } + } catch ( RedisException $e ) { + $this->downServers[$server] = time() + self::SERVER_DOWN_TTL; + wfDebugLog( 'redis', "Redis exception: " . $e->getMessage() . 
"\n" ); + return false; + } + + if ( $conn ) { + $conn->setOption( Redis::OPT_SERIALIZER, $this->serializer ); + $this->connections[$server][] = array( 'conn' => $conn, 'free' => false ); + return new RedisConnRef( $this, $server, $conn ); + } else { + return false; + } + } + + /** + * Mark a connection to a server as free to return to the pool + * + * @param $server string + * @param $conn Redis + * @return boolean + */ + public function freeConnection( $server, Redis $conn ) { + $found = false; + + foreach ( $this->connections[$server] as &$connection ) { + if ( $connection['conn'] === $conn && !$connection['free'] ) { + $connection['free'] = true; + ++$this->idlePoolSize; + break; + } + } + + $this->closeExcessIdleConections(); + + return $found; + } + + /** + * Close any extra idle connections if there are more than the limit + * + * @return void + */ + protected function closeExcessIdleConections() { + if ( $this->idlePoolSize <= $this->poolSize ) { + return; // nothing to do + } + + foreach ( $this->connections as $server => &$serverConnections ) { + foreach ( $serverConnections as $key => &$connection ) { + if ( $connection['free'] ) { + unset( $serverConnections[$key] ); + if ( --$this->idlePoolSize <= $this->poolSize ) { + return; // done + } + } + } + } + } + + /** + * The redis extension throws an exception in response to various read, write + * and protocol errors. Sometimes it also closes the connection, sometimes + * not. The safest response for us is to explicitly destroy the connection + * object and let it be reopened during the next request. + * + * @param $server string + * @param $conn RedisConnRef + * @param $e RedisException + * @return void + */ + public function handleException( $server, RedisConnRef $conn, RedisException $e ) { + wfDebugLog( 'redis', + "Redis exception on server $server: " . $e->getMessage() . 
"\n" ); + foreach ( $this->connections[$server] as $key => $connection ) { + if ( $connection['conn'] === $conn ) { + $this->idlePoolSize -= $connection['free'] ? 1 : 0; + unset( $this->connections[$server][$key] ); + break; + } + } + } +} + +/** + * Helper class to handle automatically marking connectons as reusable (via RAII pattern) + * + * @ingroup Redis + * @since 1.21 + */ +class RedisConnRef { + /** @var RedisConnectionPool */ + protected $pool; + + protected $server; // string + + /** @var Redis */ + protected $conn; + + /** + * @param $pool RedisConnectionPool + * @param $server string + * @param $conn Redis + */ + public function __construct( RedisConnectionPool $pool, $server, Redis $conn ) { + $this->pool = $pool; + $this->server = $server; + $this->conn = $conn; + } + + public function __call( $name, $arguments ) { + return call_user_func_array( array( $this->conn, $name ), $arguments ); + } + + function __destruct() { + $this->pool->freeConnection( $this->server, $this->conn ); + } +} diff --git a/includes/job/Job.php b/includes/job/Job.php index 927ca4e302..c47569491f 100644 --- a/includes/job/Job.php +++ b/includes/job/Job.php @@ -39,6 +39,9 @@ abstract class Job { $removeDuplicates, $error; + /** @var Array Additional queue metadata */ + public $metadata = array(); + /*------------------------------------------------------------------------- * Abstract functions *------------------------------------------------------------------------*/ diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php new file mode 100644 index 0000000000..6c908a28c3 --- /dev/null +++ b/includes/job/JobQueueRedis.php @@ -0,0 +1,610 @@ +server = $params['redisConf']['server']; + $this->redisPool = RedisConnectionPool::singleton( $params['redisConf'] ); + } + + /** + * @see JobQueue::doIsEmpty() + * @return bool + * @throws MWException + */ + protected function doIsEmpty() { + if ( mt_rand( 0, 99 ) == 0 ) { + $this->doInternalMaintenance(); + } + + $conn = 
$this->getConnection(); + try { + return ( $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ) == 0 ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * @see JobQueue::doGetSize() + * @return integer + * @throws MWException + */ + protected function doGetSize() { + if ( mt_rand( 0, 99 ) == 0 ) { + $this->doInternalMaintenance(); + } + + $conn = $this->getConnection(); + try { + return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * @see JobQueue::doGetAcquiredCount() + * @return integer + * @throws MWException + */ + protected function doGetAcquiredCount() { + if ( mt_rand( 0, 99 ) == 0 ) { + $this->doInternalMaintenance(); + } + + $conn = $this->getConnection(); + try { + if ( $this->claimTTL > 0 ) { + return $conn->lSize( $this->getQueueKey( 'l-claimed' ) ); + } else { + return 0; + } + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * @see JobQueue::doBatchPush() + * @param array $jobs + * @param $flags + * @return bool + * @throws MWException + */ + protected function doBatchPush( array $jobs, $flags ) { + if ( !count( $jobs ) ) { + return true; + } + + // Convert the jobs into a list of field maps + $items = array(); // (uid => job fields map) + foreach ( $jobs as $job ) { + $item = $this->getNewJobFields( $job ); + $items[$item['uid']] = $item; + } + + $dedupUids = array(); // list of uids to check for duplicates + foreach ( $items as $item ) { + if ( $this->isHashUid( $item['uid'] ) ) { // hash identifier => de-duplicate + $dedupUids[] = $item['uid']; + } + } + + $conn = $this->getConnection(); + try { + // Find which of these jobs are duplicates unclaimed jobs in the queue... 
+ if ( count( $dedupUids ) ) { + $conn->multi( Redis::PIPELINE ); + foreach ( $dedupUids as $uid ) { // check if job data exists + $conn->exists( $this->prefixWithQueueKey( 'data', $uid ) ); + } + if ( $this->claimTTL > 0 ) { // check which jobs were claimed + foreach ( $dedupUids as $uid ) { + $conn->hExists( $this->prefixWithQueueKey( 'h-meta', $uid ), 'ctime' ); + } + list( $exists, $claimed ) = array_chunk( $conn->exec(), count( $dedupUids ) ); + } else { + $exists = $conn->exec(); + $claimed = array(); // no claim system + } + // Remove the duplicate jobs to cut down on pushing duplicate uids... + foreach ( $dedupUids as $k => $uid ) { + if ( $exists[$k] && empty( $claimed[$k] ) ) { + unset( $items[$uid] ); + } + } + } + // Actually push the non-duplicate jobs into the queue... + if ( count( $items ) ) { + $uids = array_keys( $items ); + $conn->multi( Redis::MULTI ); // begin (atomic trx) + $conn->mSet( $this->prefixKeysWithQueueKey( 'data', $items ) ); + call_user_func_array( + array( $conn, 'lPush' ), + array_merge( array( $this->getQueueKey( 'l-unclaimed' ) ), $uids ) + ); + $res = $conn->exec(); // commit (atomic trx) + if ( in_array( false, $res, true ) ) { + wfDebugLog( 'JobQueueRedis', "Could not insert {$this->type} job(s)." 
); + return false; + } + } + wfIncrStats( 'job-insert', count( $items ) ); + wfIncrStats( 'job-insert-duplicate', count( $jobs ) - count( $items ) ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + + return true; + } + + /** + * @see JobQueue::doPop() + * @return Job|bool + * @throws MWException + */ + protected function doPop() { + $job = false; + + if ( mt_rand( 0, 99 ) == 0 ) { + $this->doInternalMaintenance(); + } + + $conn = $this->getConnection(); + try { + do { + // Atomically pop an item off the queue and onto the "claimed" list + $uid = $conn->rpoplpush( + $this->getQueueKey( 'l-unclaimed' ), + $this->getQueueKey( 'l-claimed' ) + ); + if ( $uid === false ) { + break; // no jobs; nothing to do + } + + wfIncrStats( 'job-pop' ); + $conn->multi( Redis::PIPELINE ); + $conn->get( $this->prefixWithQueueKey( 'data', $uid ) ); + if ( $this->claimTTL > 0 ) { + // Set the claim timestamp metadata. If this step fails, then + // the timestamp will be assumed to be the current timestamp by + // recycleAndDeleteStaleJobs() as of the next time that it runs. + // If two runners claim duplicate jobs, one will abort here. + $conn->hSetNx( $this->prefixWithQueueKey( 'h-meta', $uid ), 'ctime', time() ); + } else { + // If this fails, the message key will be deleted in cleanupClaimedJobs(). + // If two runners claim duplicate jobs, one of them will abort here. + $conn->delete( + $this->prefixWithQueueKey( 'h-meta', $uid ), + $this->prefixWithQueueKey( 'data', $uid ) ); + } + list( $item, $ok ) = $conn->exec(); + if ( $item === false || ( $this->claimTTL && !$ok ) ) { + wfDebug( "Could not find or delete job $uid; probably was a duplicate." 
); + continue; // job was probably a duplicate + } + + // If $item is invalid, recycleAndDeleteStaleJobs() will cleanup as needed + $job = $this->getJobFromFields( $item ); // may be false + } while ( !$job ); // job may be false if invalid + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + + // Flag this job as an old duplicate based on its "root" job... + try { + if ( $job && $this->isRootJobOldDuplicate( $job ) ) { + wfIncrStats( 'job-pop-duplicate' ); + return DuplicateJob::newFromJob( $job ); // convert to a no-op + } + } catch ( MWException $e ) {} // don't lose jobs over this + + return $job; + } + + /** + * @see JobQueue::doAck() + * @param Job $job + * @return Job|bool + * @throws MWException + */ + protected function doAck( Job $job ) { + if ( $this->claimTTL > 0 ) { + $conn = $this->getConnection(); + try { + // Get the exact field map this Job came from, regardless of whether + // the job was transformed into a DuplicateJob or anything of the sort. + $item = $job->metadata['sourceFields']; + + $conn->multi( Redis::MULTI ); // begin (atomic trx) + // Remove the first instance of this job scanning right-to-left. + // This is O(N) in the worst case, but is likely to be much faster since + // jobs are pushed to the left and we are starting from the right, where + // the longest running jobs are likely to be. These should be the first + // jobs to be acknowledged assuming that job run times are roughly equal. + $conn->lRem( $this->getQueueKey( 'l-claimed' ), $item['uid'], -1 ); + // Delete the job data and its claim metadata + $conn->delete( + $this->prefixWithQueueKey( 'h-meta', $item['uid'] ), + $this->prefixWithQueueKey( 'data', $item['uid'] ) ); + $res = $conn->exec(); // commit (atomic trx) + + if ( in_array( false, $res, true ) ) { + wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." 
); + return false; + } + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + return true; + } + + /** + * @see JobQueue::doDeduplicateRootJob() + * @param Job $job + * @return bool + * @throws MWException + */ + protected function doDeduplicateRootJob( Job $job ) { + $params = $job->getParams(); + if ( !isset( $params['rootJobSignature'] ) ) { + throw new MWException( "Cannot register root job; missing 'rootJobSignature'." ); + } elseif ( !isset( $params['rootJobTimestamp'] ) ) { + throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." ); + } + $key = $this->getRootJobKey( $params['rootJobSignature'] ); + + $conn = $this->getConnection(); + try { + $timestamp = $conn->get( $key ); // current last timestamp of this job + if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { + return true; // a newer version of this root job was enqueued + } + // Update the timestamp of the last root job started at the location... + return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + } + + /** + * Check if the "root" job of a given job has been superseded by a newer one + * + * @param $job Job + * @return bool + * @throws MWException + */ + protected function isRootJobOldDuplicate( Job $job ) { + $params = $job->getParams(); + if ( !isset( $params['rootJobSignature'] ) ) { + return false; // job has no de-duplication info + } elseif ( !isset( $params['rootJobTimestamp'] ) ) { + wfDebugLog( 'JobQueueRedis', "Cannot check root job; missing 'rootJobTimestamp'." 
); + return false; + } + + $conn = $this->getConnection(); + try { + // Get the last time this root job was enqueued + $timestamp = $conn->get( $this->getRootJobKey( $params['rootJobSignature'] ) ); + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + + // Check if a new root job was started at the location after this one's... + return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); + } + + /** + * Do any job recycling or queue cleanup as needed + * + * Recycles stale claimed jobs if claimTTL is set; otherwise just deletes claimed jobs + * @return integer Number of jobs recycled/deleted + * @throws MWException + */ + protected function doInternalMaintenance() { + return ( $this->claimTTL > 0 ) ? + $this->recycleAndDeleteStaleJobs() : $this->cleanupClaimedJobs(); + } + + /** + * Recycle or destroy any jobs that have been claimed for too long + * + * @return integer Number of jobs recycled/deleted + * @throws MWException + */ + protected function recycleAndDeleteStaleJobs() { + $count = 0; + // For each job item that can be retried, we need to add it back to the + // main queue and remove it from the list of currently claimed job items. 
+ $conn = $this->getConnection(); + try { + // Avoid duplicate insertions of items to be re-enqueued + $conn->multi( Redis::MULTI ); + $conn->setnx( $this->getQueueKey( 'lock' ), 1 ); + $conn->expire( $this->getQueueKey( 'lock' ), 3600 ); + if ( $conn->exec() !== array( true, true ) ) { // lock + return $count; // already in progress + } + + $now = time(); + $claimCutoff = $now - $this->claimTTL; + $pruneCutoff = $now - self::MAX_AGE_PRUNE; + + // Get the list of all claimed jobs + $claimedUids = $conn->lRange( $this->getQueueKey( 'l-claimed' ), 0, -1 ); + // Get a map of (uid => claim metadata) for all claimed jobs + $metadata = $conn->mGet( $this->prefixValuesWithQueueKey( 'h-meta', $claimedUids ) ); + + $uidsPush = array(); // items IDs to move to the "unclaimed" queue + $uidsRemove = array(); // item IDs to remove from "claimed" queue + foreach ( $claimedUids as $i => $uid ) { // all claimed items + $info = $metadata[$i] ? $metadata[$i] : array(); + if ( isset( $info['ctime'] ) || isset( $info['rctime'] ) ) { + // Prefer "ctime" (set by pop()) over "rctime" (set by this function) + $ctime = isset( $info['ctime'] ) ? $info['ctime'] : $info['rctime']; + // Claimed job claimed for too long? + if ( $ctime < $claimCutoff ) { + // Get the number of failed attempts + $attempts = isset( $info['attempts'] ) ? $info['attempts'] : 0; + if ( $attempts < self::MAX_ATTEMPTS ) { + $uidsPush[] = $uid; // retry it + } elseif ( $ctime < $pruneCutoff ) { + $uidsRemove[] = $uid; // just remove it + } + } + } else { + // If pop() failed to set the claim timestamp, set it to the current time. + // Since that function sets this non-atomically *after* moving the job to + // the "claimed" queue, it may be the case that it just didn't set it yet. 
+ $conn->hSet( $this->prefixWithQueueKey( 'h-meta', $uid ), 'rctime', $now ); + } + } + + $conn->multi( Redis::MULTI ); // begin (atomic trx) + if ( count( $uidsPush ) ) { // move from "l-claimed" to "l-unclaimed" + call_user_func_array( + array( $conn, 'lPush' ), + array_merge( array( $this->getQueueKey( 'l-unclaimed' ) ), $uidsPush ) + ); + foreach ( $uidsPush as $uid ) { + $conn->lRem( $this->getQueueKey( 'l-claimed' ), $uid, -1 ); + $conn->hDel( $this->prefixWithQueueKey( 'h-meta', $uid ), 'ctime', 'rctime' ); + $conn->hIncrBy( $this->prefixWithQueueKey( 'h-meta', $uid ), 'attempts', 1 ); + } + } + foreach ( $uidsRemove as $uid ) { // remove from "l-claimed" + $conn->lRem( $this->getQueueKey( 'l-claimed' ), $uid, -1 ); + $conn->delete( // delete job data and metadata + $this->prefixWithQueueKey( 'h-meta', $uid ), + $this->prefixWithQueueKey( 'data', $uid ) ); + } + $res = $conn->exec(); // commit (atomic trx) + + if ( in_array( false, $res, true ) ) { + wfDebugLog( 'JobQueueRedis', "Could not recycle {$this->type} job(s)." ); + } else { + $count += ( count( $uidsPush ) + count( $uidsRemove ) ); + wfIncrStats( 'job-recycle', count( $uidsPush ) ); + } + + $conn->delete( $this->getQueueKey( 'lock' ) ); // unlock + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + + return $count; + } + + /** + * Destroy any jobs that have been claimed + * + * @return integer Number of jobs deleted + * @throws MWException + */ + protected function cleanupClaimedJobs() { + $count = 0; + // Make sure the message for claimed jobs was deleted + // and remove the claimed job IDs from the "claimed" list. 
+ $conn = $this->getConnection(); + try { + // Avoid races and duplicate effort + $conn->multi( Redis::MULTI ); + $conn->setnx( $this->getQueueKey( 'lock' ), 1 ); + $conn->expire( $this->getQueueKey( 'lock' ), 3600 ); + if ( $conn->exec() !== array( true, true ) ) { // lock + return $count; // already in progress + } + // Get the list of all claimed jobs + $uids = $conn->lRange( $this->getQueueKey( 'l-claimed' ), 0, -1 ); + if ( count( $uids ) ) { + // Delete the message keys and delist the corresponding ids. + // Since the only other changes to "l-claimed" are left pushes, we can just strip + // off the elements read here using a right trim based on the number of ids read. + $conn->multi( Redis::MULTI ); // begin (atomic trx) + $conn->lTrim( $this->getQueueKey( 'l-claimed' ), 0, -count( $uids ) - 1 ); + $conn->delete( array_merge( + $this->prefixValuesWithQueueKey( 'h-meta', $uids ), + $this->prefixValuesWithQueueKey( 'data', $uids ) + ) ); + $res = $conn->exec(); // commit (atomic trx) + + if ( in_array( false, $res, true ) ) { + wfDebugLog( 'JobQueueRedis', "Could not purge {$this->type} job(s)." ); + } else { + $count += count( $uids ); + } + } + $conn->delete( $this->getQueueKey( 'lock' ) ); // unlock + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + + return $count; + } + + /** + * @param $job Job + * @return array + */ + protected function getNewJobFields( Job $job ) { + return array( + // Fields that describe the nature of the job + 'type' => $job->getType(), + 'namespace' => $job->getTitle()->getNamespace(), + 'title' => $job->getTitle()->getDBkey(), + 'params' => $job->getParams(), + // Additional metadata + 'uid' => $job->ignoreDuplicates() + ? 
wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 ) + : wfRandomString( 32 ), + 'timestamp' => time() // UNIX timestamp + ); + } + + /** + * @param $fields array + * @return Job|bool + */ + protected function getJobFromFields( array $fields ) { + $title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] ); + if ( $title ) { + $job = Job::factory( $fields['type'], $title, $fields['params'] ); + $job->metadata['sourceFields'] = $fields; + return $job; + } + return false; + } + + /** + * @param $uid string Job UID + * @return bool Whether $uid is a SHA-1 hash based identifier for de-duplication + */ + protected function isHashUid( $uid ) { + return strlen( $uid ) == 31; + } + + /** + * Get a connection to the server that handles all sub-queues for this queue + * + * @return Array (server name, Redis instance) + * @throws MWException + */ + protected function getConnection() { + $conn = $this->redisPool->getConnection( $this->server ); + if ( !$conn ) { + throw new MWException( "Unable to connect to redis server." 
); + } + return $conn; + } + + /** + * @param $server string + * @param $conn RedisConnRef + * @param $e RedisException + * @throws MWException + */ + protected function throwRedisException( $server, RedisConnRef $conn, $e ) { + $this->redisPool->handleException( $server, $conn, $e ); + throw new MWException( "Redis server error: {$e->getMessage()}\n" ); + } + + /** + * @param $prop string + * @return string + */ + private function getQueueKey( $prop ) { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop ); + } + + /** + * @param string $signature Hash identifier of the root job + * @return string + */ + private function getRootJobKey( $signature ) { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature ); + } + + /** + * @param $prop string + * @param $string string + * @return string + */ + private function prefixWithQueueKey( $prop, $string ) { + return $this->getQueueKey( $prop ) . ':' . 
$string; + } + + /** + * @param $prop string + * @param $items array + * @return Array + */ + private function prefixValuesWithQueueKey( $prop, array $items ) { + $res = array(); + foreach ( $items as $item ) { + $res[] = $this->prefixWithQueueKey( $prop, $item ); + } + return $res; + } + + /** + * @param $prop string + * @param $items array + * @return Array + */ + private function prefixKeysWithQueueKey( $prop, array $items ) { + $res = array(); + foreach ( $items as $key => $item ) { + $res[$this->prefixWithQueueKey( $prop, $key )] = $item; + } + return $res; + } +} diff --git a/includes/job/jobs/DuplicateJob.php b/includes/job/jobs/DuplicateJob.php index 23418e303d..1b64b1010a 100644 --- a/includes/job/jobs/DuplicateJob.php +++ b/includes/job/jobs/DuplicateJob.php @@ -46,9 +46,10 @@ final class DuplicateJob extends Job { */ public static function newFromJob( Job $job ) { $djob = new self( $job->getTitle(), $job->getParams(), $job->getId() ); - $djob->command = $job->getType(); - $djob->params = is_array( $djob->params ) ? $djob->params : array(); - $djob->params = array( 'isDuplicate' => true ) + $djob->params; + $djob->command = $job->getType(); + $djob->params = is_array( $djob->params ) ? $djob->params : array(); + $djob->params = array( 'isDuplicate' => true ) + $djob->params; + $djob->metadata = $job->metadata; return $djob; } -- 2.20.1