*/
class JobQueueGroup {
/** @var JobQueueGroup[] */
- protected static $instances = array();
+ protected static $instances = [];
/** @var ProcessCacheLRU */
protected $cache;
/** @var string Wiki ID */
protected $wiki;
+ /** @var string|bool Read only rationale (or false if r/w) */
+ protected $readOnlyReason;
/** @var array Map of (bucket => (queue => JobQueue, types => list of types) */
protected $coalescedQueues;
/** @var Job[] */
- protected $bufferedJobs = array();
+ protected $bufferedJobs = [];
const TYPE_DEFAULT = 1; // integer; jobs popped by default
const TYPE_ANY = 2; // integer; any job
/**
* @param string $wiki Wiki ID
+ * @param string|bool $readOnlyReason Read-only reason or false
*/
- protected function __construct( $wiki ) {
+ protected function __construct( $wiki, $readOnlyReason ) {
$this->wiki = $wiki;
+ $this->readOnlyReason = $readOnlyReason;
$this->cache = new ProcessCacheLRU( 10 );
}
public static function singleton( $wiki = false ) {
$wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
if ( !isset( self::$instances[$wiki] ) ) {
- self::$instances[$wiki] = new self( $wiki );
+ self::$instances[$wiki] = new self( $wiki, wfConfiguredReadOnlyReason() );
}
return self::$instances[$wiki];
* @return void
*/
public static function destroySingletons() {
- self::$instances = array();
+ self::$instances = [];
}
/**
public function get( $type ) {
global $wgJobTypeConf;
- $conf = array( 'wiki' => $this->wiki, 'type' => $type );
+ $conf = [ 'wiki' => $this->wiki, 'type' => $type ];
if ( isset( $wgJobTypeConf[$type] ) ) {
$conf = $conf + $wgJobTypeConf[$type];
} else {
$conf = $conf + $wgJobTypeConf['default'];
}
$conf['aggregator'] = JobQueueAggregator::singleton();
+ if ( $this->readOnlyReason !== false ) {
+ $conf['readOnlyReason'] = $this->readOnlyReason;
+ }
return JobQueue::factory( $conf );
}
* @return void
*/
public function push( $jobs ) {
- $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+ $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
if ( !count( $jobs ) ) {
return;
}
$this->assertValidJobs( $jobs );
- $jobsByType = array(); // (job type => list of jobs)
+ $jobsByType = []; // (job type => list of jobs)
foreach ( $jobs as $job ) {
$jobsByType[$job->getType()][] = $job;
}
return;
}
- $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+ $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
// Throw errors now instead of on push(), when other jobs may be buffered
$this->assertValidJobs( $jobs );
*/
public static function pushLazyJobs() {
foreach ( self::$instances as $group ) {
- $group->push( $group->bufferedJobs );
- $group->bufferedJobs = array();
+ try {
+ $group->push( $group->bufferedJobs );
+ $group->bufferedJobs = [];
+ } catch ( Exception $e ) {
+ // Get in as many jobs as possible and let other post-send updates happen
+ MWExceptionHandler::logException( $e );
+ }
}
}
* @param array $blacklist List of job types to ignore
* @return Job|bool Returns false on failure
*/
- public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = array() ) {
+ public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) {
$job = false;
if ( is_string( $qtype ) ) { // specific job type
* @since 1.23
*/
public function queuesHaveJobs( $type = self::TYPE_ANY ) {
- global $wgMemc;
-
$key = wfMemcKey( 'jobqueue', 'queueshavejobs', $type );
+ $cache = ObjectCache::getLocalClusterInstance();
- $value = $wgMemc->get( $key );
+ $value = $cache->get( $key );
if ( $value === false ) {
$queues = $this->getQueuesWithJobs();
if ( $type == self::TYPE_DEFAULT ) {
$queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
}
$value = count( $queues ) ? 'true' : 'false';
- $wgMemc->add( $key, $value, 15 );
+ $cache->add( $key, $value, 15 );
}
return ( $value === 'true' );
* @return array List of job types that have non-empty queues
*/
public function getQueuesWithJobs() {
- $types = array();
+ $types = [];
foreach ( $this->getCoalescedQueues() as $info ) {
$nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
if ( is_array( $nonEmpty ) ) { // batching features supported
* @return array Map of (job type => size)
*/
public function getQueueSizes() {
- $sizeMap = array();
+ $sizeMap = [];
foreach ( $this->getCoalescedQueues() as $info ) {
$sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
if ( is_array( $sizes ) ) { // batching features supported
global $wgJobTypeConf;
if ( $this->coalescedQueues === null ) {
- $this->coalescedQueues = array();
+ $this->coalescedQueues = [];
foreach ( $wgJobTypeConf as $type => $conf ) {
$queue = JobQueue::factory(
- array( 'wiki' => $this->wiki, 'type' => 'null' ) + $conf );
+ [ 'wiki' => $this->wiki, 'type' => 'null' ] + $conf );
$loc = $queue->getCoalesceLocationInternal();
if ( !isset( $this->coalescedQueues[$loc] ) ) {
$this->coalescedQueues[$loc]['queue'] = $queue;
- $this->coalescedQueues[$loc]['types'] = array();
+ $this->coalescedQueues[$loc]['types'] = [];
}
if ( $type === 'default' ) {
$this->coalescedQueues[$loc]['types'] = array_merge(
* @return mixed
*/
private function getCachedConfigVar( $name ) {
- global $wgConf, $wgMemc;
-
+ // @TODO: cleanup this whole method with a proper config system
if ( $this->wiki === wfWikiID() ) {
return $GLOBALS[$name]; // common case
} else {
- list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
- $key = wfForeignMemcKey( $db, $prefix, 'configvalue', $name );
- $value = $wgMemc->get( $key ); // ('v' => ...) or false
- if ( is_array( $value ) ) {
- return $value['v'];
- } else {
- $value = $wgConf->getConfig( $this->wiki, $name );
- $wgMemc->set( $key, array( 'v' => $value ), 86400 + mt_rand( 0, 86400 ) );
-
- return $value;
- }
+ $wiki = $this->wiki;
+ $cache = ObjectCache::getMainWANInstance();
+ $value = $cache->getWithSetCallback(
+ $cache->makeGlobalKey( 'jobqueue', 'configvalue', $wiki, $name ),
+ $cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY ),
+ function () use ( $wiki, $name ) {
+ global $wgConf;
+
+ return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
+ },
+ [ 'pcTTL' => 30 ]
+ );
+
+ return $value['v'];
}
}