* @author Aaron Schulz
*/
use MediaWiki\MediaWikiServices;
+use Wikimedia\ScopedCallback;
/**
* Class to handle job queues stored in the DB
* @return bool
*/
protected function doIsEmpty() {
- $dbr = $this->getSlaveDB();
+ $dbr = $this->getReplicaDB();
try {
$found = $dbr->selectField( // unclaimed job
'job', '1', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__
}
try {
- $dbr = $this->getSlaveDB();
+ $dbr = $this->getReplicaDB();
$size = (int)$dbr->selectField( 'job', 'COUNT(*)',
[ 'job_cmd' => $this->type, 'job_token' => '' ],
__METHOD__
return $count;
}
- $dbr = $this->getSlaveDB();
+ $dbr = $this->getReplicaDB();
try {
$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
[ 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ],
return $count;
}
- $dbr = $this->getSlaveDB();
+ $dbr = $this->getReplicaDB();
try {
$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
[
$dbw->onTransactionIdle(
function () use ( $dbw, $jobs, $flags, $method ) {
$this->doBatchPushInternal( $dbw, $jobs, $flags, $method );
- }
+ },
+ __METHOD__
);
}
$invertedDirection = false; // whether one job_random direction was already scanned
// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
- // not replication safe. Due to http://bugs.mysql.com/bug.php?id=6980, subqueries cannot
+ // not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
// be used here with MySQL.
do {
if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
$row = false; // the row acquired
do {
if ( $dbw->getType() === 'mysql' ) {
- // Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
+ // Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
// Oracle and PostgreSQL have no such limitation. However, MySQL offers an
// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
// jobs to become no-ops without any actual jobs that made them redundant.
$dbw = $this->getMasterDB();
$cache = $this->dupCache;
- $dbw->onTransactionIdle( function () use ( $cache, $params, $key, $dbw ) {
- $timestamp = $cache->get( $key ); // current last timestamp of this job
- if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
- return true; // a newer version of this root job was enqueued
- }
+ $dbw->onTransactionIdle(
+ function () use ( $cache, $params, $key, $dbw ) {
+ $timestamp = $cache->get( $key ); // current last timestamp of this job
+ if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
+ return true; // a newer version of this root job was enqueued
+ }
- // Update the timestamp of the last root job started at the location...
- return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
- } );
+ // Update the timestamp of the last root job started at the location...
+ return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
+ },
+ __METHOD__
+ );
return true;
}
* @return Iterator
*/
protected function getJobIterator( array $conds ) {
- $dbr = $this->getSlaveDB();
+ $dbr = $this->getReplicaDB();
try {
return new MappedIterator(
$dbr->select( 'job', self::selectFields(), $conds ),
}
protected function doGetSiblingQueuesWithJobs( array $types ) {
- $dbr = $this->getSlaveDB();
+ $dbr = $this->getReplicaDB();
// @note: this does not check whether the jobs are claimed or not.
// This is useful so JobQueueGroup::pop() also sees queues that only
// have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
}
protected function doGetSiblingQueueSizes( array $types ) {
- $dbr = $this->getSlaveDB();
+ $dbr = $this->getReplicaDB();
$res = $dbr->select( 'job', [ 'job_cmd', 'COUNT(*) AS count' ],
[ 'job_cmd' => $types ], __METHOD__, [ 'GROUP BY' => 'job_cmd' ] );
* @throws JobQueueConnectionError
* @return DBConnRef
*/
- protected function getSlaveDB() {
+ protected function getReplicaDB() {
try {
return $this->getDB( DB_REPLICA );
} catch ( DBConnectionError $e ) {