dépôts
/
lhc
/
web
/
wiklou.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge "Add missing subjectspace pages to watchlist on update."
[lhc/web/wiklou.git]
/
includes
/
job
/
JobQueueDB.php
diff --git
a/includes/job/JobQueueDB.php
b/includes/job/JobQueueDB.php
index
c39083d
..
79ff4e8
100644
(file)
--- a/
includes/job/JobQueueDB.php
+++ b/
includes/job/JobQueueDB.php
@@
-37,7
+37,8
@@
class JobQueueDB extends JobQueue {
/** @var BagOStuff */
protected $cache;
/** @var BagOStuff */
protected $cache;
- protected $cluster = false; // string; name of an external DB cluster
+ /** @var bool|string Name of an external DB cluster. False if not set */
+ protected $cluster = false;
/**
* Additional parameters include:
/**
* Additional parameters include:
@@
-45,7
+46,7
@@
class JobQueueDB extends JobQueue {
* If not specified, the primary DB cluster for the wiki will be used.
* This can be overridden with a custom cluster so that DB handles will
* be retrieved via LBFactory::getExternalLB() and getConnection().
* If not specified, the primary DB cluster for the wiki will be used.
* This can be overridden with a custom cluster so that DB handles will
* be retrieved via LBFactory::getExternalLB() and getConnection().
- * @param
$params array
+ * @param
array $params
*/
protected function __construct( array $params ) {
global $wgMemc;
*/
protected function __construct( array $params ) {
global $wgMemc;
@@
-94,7
+95,7
@@
class JobQueueDB extends JobQueue {
/**
* @see JobQueue::doGetSize()
/**
* @see JobQueue::doGetSize()
- * @return int
eger
+ * @return int
*/
protected function doGetSize() {
$key = $this->getCacheKey( 'size' );
*/
protected function doGetSize() {
$key = $this->getCacheKey( 'size' );
@@
-120,7
+121,7
@@
class JobQueueDB extends JobQueue {
/**
* @see JobQueue::doGetAcquiredCount()
/**
* @see JobQueue::doGetAcquiredCount()
- * @return int
eger
+ * @return int
*/
protected function doGetAcquiredCount() {
if ( $this->claimTTL <= 0 ) {
*/
protected function doGetAcquiredCount() {
if ( $this->claimTTL <= 0 ) {
@@
-150,7
+151,7
@@
class JobQueueDB extends JobQueue {
/**
* @see JobQueue::doGetAbandonedCount()
/**
* @see JobQueue::doGetAbandonedCount()
- * @return int
eger
+ * @return int
* @throws MWException
*/
protected function doGetAbandonedCount() {
* @throws MWException
*/
protected function doGetAbandonedCount() {
@@
-198,7
+199,7
@@
class JobQueueDB extends JobQueue {
$that = $this;
$method = __METHOD__;
$dbw->onTransactionIdle(
$that = $this;
$method = __METHOD__;
$dbw->onTransactionIdle(
- function() use ( $dbw, $that, $jobs, $flags, $method ) {
+ function
() use ( $dbw, $that, $jobs, $flags, $method ) {
$that->doBatchPushInternal( $dbw, $jobs, $flags, $method );
}
);
$that->doBatchPushInternal( $dbw, $jobs, $flags, $method );
}
);
@@
-209,12
+210,12
@@
class JobQueueDB extends JobQueue {
/**
* This function should *not* be called outside of JobQueueDB
*
/**
* This function should *not* be called outside of JobQueueDB
*
- * @param
DatabaseB
ase $dbw
+ * @param
IDatab
ase $dbw
* @param array $jobs
* @param int $flags
* @param string $method
* @param array $jobs
* @param int $flags
* @param string $method
- * @
return boolean
- * @
throws type
+ * @
throws DBError
+ * @
return bool
*/
public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
if ( !count( $jobs ) ) {
*/
public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
if ( !count( $jobs ) ) {
@@
-258,8
+259,11
@@
class JobQueueDB extends JobQueue {
$dbw->insert( 'job', $rowBatch, $method );
}
JobQueue::incrStats( 'job-insert', $this->type, count( $rows ) );
$dbw->insert( 'job', $rowBatch, $method );
}
JobQueue::incrStats( 'job-insert', $this->type, count( $rows ) );
- JobQueue::incrStats( 'job-insert-duplicate', $this->type,
- count( $rowSet ) + count( $rowList ) - count( $rows ) );
+ JobQueue::incrStats(
+ 'job-insert-duplicate',
+ $this->type,
+ count( $rowSet ) + count( $rowList ) - count( $rows )
+ );
} catch ( DBError $e ) {
if ( $flags & self::QOS_ATOMIC ) {
$dbw->rollback( $method );
} catch ( DBError $e ) {
if ( $flags & self::QOS_ATOMIC ) {
$dbw->rollback( $method );
@@
-289,7
+293,7
@@
class JobQueueDB extends JobQueue {
$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
- $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) {
+ $scopedReset = new ScopedCallback( function
() use ( $dbw, $autoTrx ) {
$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
} );
$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
} );
@@
-320,7
+324,6
@@
class JobQueueDB extends JobQueue {
$job = Job::factory( $row->job_cmd, $title,
self::extractBlob( $row->job_params ), $row->job_id );
$job->metadata['id'] = $row->job_id;
$job = Job::factory( $row->job_cmd, $title,
self::extractBlob( $row->job_params ), $row->job_id );
$job->metadata['id'] = $row->job_id;
- $job->id = $row->job_id; // XXX: work around broken subclasses
break; // done
} while ( true );
} catch ( DBError $e ) {
break; // done
} while ( true );
} catch ( DBError $e ) {
@@
-336,7
+339,7
@@
class JobQueueDB extends JobQueue {
* @param string $uuid 32 char hex string
* @param $rand integer Random unsigned integer (31 bits)
* @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
* @param string $uuid 32 char hex string
* @param $rand integer Random unsigned integer (31 bits)
* @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
- * @return Row|false
+ * @return
stdClass|bool
Row|false
*/
protected function claimRandom( $uuid, $rand, $gte ) {
$dbw = $this->getMasterDB();
*/
protected function claimRandom( $uuid, $rand, $gte ) {
$dbw = $this->getMasterDB();
@@
-386,6
+389,7
@@
class JobQueueDB extends JobQueue {
continue; // use job_random
}
}
continue; // use job_random
}
}
+
if ( $row ) { // claim the job
$dbw->update( 'job', // update by PK
array(
if ( $row ) { // claim the job
$dbw->update( 'job', // update by PK
array(
@@
-412,7
+416,7
@@
class JobQueueDB extends JobQueue {
* Reserve a row with a single UPDATE without holding row locks over RTTs...
*
* @param string $uuid 32 char hex string
* Reserve a row with a single UPDATE without holding row locks over RTTs...
*
* @param string $uuid 32 char hex string
- * @return Row|false
+ * @return
stdClass|bool
Row|false
*/
protected function claimOldest( $uuid ) {
$dbw = $this->getMasterDB();
*/
protected function claimOldest( $uuid ) {
$dbw = $this->getMasterDB();
@@
-485,7
+489,7
@@
class JobQueueDB extends JobQueue {
$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
- $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) {
+ $scopedReset = new ScopedCallback( function
() use ( $dbw, $autoTrx ) {
$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
} );
$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
} );
@@
-520,7
+524,7
@@
class JobQueueDB extends JobQueue {
// jobs to become no-ops without any actual jobs that made them redundant.
$dbw = $this->getMasterDB();
$cache = $this->dupCache;
// jobs to become no-ops without any actual jobs that made them redundant.
$dbw = $this->getMasterDB();
$cache = $this->dupCache;
- $dbw->onTransactionIdle( function() use ( $cache, $params, $key, $dbw ) {
+ $dbw->onTransactionIdle( function
() use ( $cache, $params, $key, $dbw ) {
$timestamp = $cache->get( $key ); // current last timestamp of this job
if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
return true; // a newer version of this root job was enqueued
$timestamp = $cache->get( $key ); // current last timestamp of this job
if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
return true; // a newer version of this root job was enqueued
@@
-544,6
+548,7
@@
class JobQueueDB extends JobQueue {
} catch ( DBError $e ) {
$this->throwDBException( $e );
}
} catch ( DBError $e ) {
$this->throwDBException( $e );
}
+
return true;
}
return true;
}
@@
-556,7
+561,7
@@
class JobQueueDB extends JobQueue {
}
/**
}
/**
- * @return
A
rray
+ * @return
a
rray
*/
protected function doGetPeriodicTasks() {
return array(
*/
protected function doGetPeriodicTasks() {
return array(
@@
-586,15
+591,13
@@
class JobQueueDB extends JobQueue {
return new MappedIterator(
$dbr->select( 'job', '*',
array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
return new MappedIterator(
$dbr->select( 'job', '*',
array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
- function( $row ) use ( $dbr ) {
+ function
( $row ) use ( $dbr ) {
$job = Job::factory(
$row->job_cmd,
Title::makeTitle( $row->job_namespace, $row->job_title ),
$job = Job::factory(
$row->job_cmd,
Title::makeTitle( $row->job_namespace, $row->job_title ),
- strlen( $row->job_params ) ? unserialize( $row->job_params ) : false,
- $row->job_id
+ strlen( $row->job_params ) ? unserialize( $row->job_params ) : false
);
$job->metadata['id'] = $row->job_id;
);
$job->metadata['id'] = $row->job_id;
- $job->id = $row->job_id; // XXX: work around broken subclasses
return $job;
}
);
return $job;
}
);
@@
-618,6
+621,7
@@
class JobQueueDB extends JobQueue {
foreach ( $res as $row ) {
$types[] = $row->job_cmd;
}
foreach ( $res as $row ) {
$types[] = $row->job_cmd;
}
+
return $types;
}
return $types;
}
@@
-630,13
+634,14
@@
class JobQueueDB extends JobQueue {
foreach ( $res as $row ) {
$sizes[$row->job_cmd] = (int)$row->count;
}
foreach ( $res as $row ) {
$sizes[$row->job_cmd] = (int)$row->count;
}
+
return $sizes;
}
/**
* Recycle or destroy any jobs that have been claimed for too long
*
return $sizes;
}
/**
* Recycle or destroy any jobs that have been claimed for too long
*
- * @return int
eger
Number of jobs recycled/deleted
+ * @return int Number of jobs recycled/deleted
*/
public function recycleAndDeleteStaleJobs() {
$now = time();
*/
public function recycleAndDeleteStaleJobs() {
$now = time();
@@
-663,7
+668,7
@@
class JobQueueDB extends JobQueue {
__METHOD__
);
$ids = array_map(
__METHOD__
);
$ids = array_map(
- function( $o ) {
+ function
( $o ) {
return $o->job_id;
}, iterator_to_array( $res )
);
return $o->job_id;
}, iterator_to_array( $res )
);
@@
-699,7
+704,7
@@
class JobQueueDB extends JobQueue {
// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
$res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
$ids = array_map(
// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
$res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
$ids = array_map(
- function( $o ) {
+ function
( $o ) {
return $o->job_id;
}, iterator_to_array( $res )
);
return $o->job_id;
}, iterator_to_array( $res )
);
@@
-718,29
+723,31
@@
class JobQueueDB extends JobQueue {
}
/**
}
/**
- * @param
$job J
ob
+ * @param
Job $j
ob
* @return array
*/
protected function insertFields( Job $job ) {
$dbw = $this->getMasterDB();
* @return array
*/
protected function insertFields( Job $job ) {
$dbw = $this->getMasterDB();
+
return array(
// Fields that describe the nature of the job
return array(
// Fields that describe the nature of the job
- 'job_cmd'
=> $job->getType(),
+ 'job_cmd' => $job->getType(),
'job_namespace' => $job->getTitle()->getNamespace(),
'job_namespace' => $job->getTitle()->getNamespace(),
- 'job_title'
=> $job->getTitle()->getDBkey(),
- 'job_params'
=> self::makeBlob( $job->getParams() ),
+ 'job_title' => $job->getTitle()->getDBkey(),
+ 'job_params' => self::makeBlob( $job->getParams() ),
// Additional job metadata
// Additional job metadata
- 'job_id'
=> $dbw->nextSequenceValue( 'job_job_id_seq' ),
+ 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
'job_timestamp' => $dbw->timestamp(),
'job_timestamp' => $dbw->timestamp(),
- 'job_sha1'
=> wfBaseConvert(
+ 'job_sha1' => wfBaseConvert(
sha1( serialize( $job->getDeduplicationInfo() ) ),
16, 36, 31
),
sha1( serialize( $job->getDeduplicationInfo() ) ),
16, 36, 31
),
- 'job_random'
=> mt_rand( 0, self::MAX_JOB_RANDOM )
+ 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
);
}
/**
);
}
/**
+ * @throws JobQueueConnectionError
* @return DBConnRef
*/
protected function getSlaveDB() {
* @return DBConnRef
*/
protected function getSlaveDB() {
@@
-752,6
+759,7
@@
class JobQueueDB extends JobQueue {
}
/**
}
/**
+ * @throws JobQueueConnectionError
* @return DBConnRef
*/
protected function getMasterDB() {
* @return DBConnRef
*/
protected function getMasterDB() {
@@
-770,15
+778,18
@@
class JobQueueDB extends JobQueue {
$lb = ( $this->cluster !== false )
? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki )
: wfGetLB( $this->wiki );
$lb = ( $this->cluster !== false )
? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki )
: wfGetLB( $this->wiki );
+
return $lb->getConnectionRef( $index, array(), $this->wiki );
}
/**
return $lb->getConnectionRef( $index, array(), $this->wiki );
}
/**
+ * @param $property
* @return string
*/
private function getCacheKey( $property ) {
list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
$cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
* @return string
*/
private function getCacheKey( $property ) {
list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
$cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
+
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property );
}
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property );
}