*/
use MediaWiki\Logger\LegacyLogger;
+use MediaWiki\MediaWikiServices;
$optionsWithArgs = RecompressTracked::getOptionsWithArgs();
require __DIR__ . '/../commandLine.inc';
public $numProcs = 1;
public $numBatches = 0;
public $pageBlobClass, $orphanBlobClass;
- public $slavePipes, $slaveProcs, $prevSlaveId;
+ public $replicaPipes, $replicaProcs, $prevReplicaId;
public $copyOnly = false;
public $isChild = false;
- public $slaveId = false;
+ public $replicaId = false;
public $noCount = false;
public $debugLog, $infoLog, $criticalLog;
public $store;
private static $optionsWithArgs = [
'procs',
- 'slave-id',
+ 'replica-id',
'debug-log',
'info-log',
'critical-log'
'procs' => 'numProcs',
'copy-only' => 'copyOnly',
'child' => 'isChild',
- 'slave-id' => 'slaveId',
+ 'replica-id' => 'replicaId',
'debug-log' => 'debugLog',
'info-log' => 'infoLog',
'critical-log' => 'criticalLog',
$this->store = new ExternalStoreDB;
if ( !$this->isChild ) {
$GLOBALS['wgDebugLogPrefix'] = "RCT M: ";
- } elseif ( $this->slaveId !== false ) {
- $GLOBALS['wgDebugLogPrefix'] = "RCT {$this->slaveId}: ";
+ } elseif ( $this->replicaId !== false ) {
+ $GLOBALS['wgDebugLogPrefix'] = "RCT {$this->replicaId}: ";
}
$this->pageBlobClass = function_exists( 'xdiff_string_bdiff' ) ?
'DiffHistoryBlob' : 'ConcatenatedGzipHistoryBlob';
function logToFile( $msg, $file ) {
$header = '[' . date( 'd\TH:i:s' ) . '] ' . wfHostname() . ' ' . posix_getpid();
- if ( $this->slaveId !== false ) {
- $header .= "({$this->slaveId})";
+ if ( $this->replicaId !== false ) {
+ $header .= "({$this->replicaId})";
}
$header .= ' ' . wfWikiID();
LegacyLogger::emit( sprintf( "%-50s %s\n", $header, $msg ), $file );
}
$this->syncDBs();
- $this->startSlaveProcs();
+ $this->startReplicaProcs();
$this->doAllPages();
$this->doAllOrphans();
- $this->killSlaveProcs();
+ $this->killReplicaProcs();
}
/**
* This necessary because text recompression is slow: loading, compressing and
* writing are all slow.
*/
- function startSlaveProcs() {
+ function startReplicaProcs() {
$cmd = 'php ' . wfEscapeShellArg( __FILE__ );
foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
- if ( $cmdOption == 'slave-id' ) {
+ if ( $cmdOption == 'replica-id' ) {
continue;
} elseif ( in_array( $cmdOption, self::$optionsWithArgs ) && isset( $this->$classOption ) ) {
$cmd .= " --$cmdOption " . wfEscapeShellArg( $this->$classOption );
' --wiki ' . wfEscapeShellArg( wfWikiID() ) .
' ' . call_user_func_array( 'wfEscapeShellArg', $this->destClusters );
- $this->slavePipes = $this->slaveProcs = [];
+ $this->replicaPipes = $this->replicaProcs = [];
for ( $i = 0; $i < $this->numProcs; $i++ ) {
$pipes = [];
$spec = [
[ 'file', 'php://stderr', 'w' ]
];
MediaWiki\suppressWarnings();
- $proc = proc_open( "$cmd --slave-id $i", $spec, $pipes );
+ $proc = proc_open( "$cmd --replica-id $i", $spec, $pipes );
MediaWiki\restoreWarnings();
if ( !$proc ) {
$this->critical( "Error opening replica DB process: $cmd" );
exit( 1 );
}
- $this->slaveProcs[$i] = $proc;
- $this->slavePipes[$i] = $pipes[0];
+ $this->replicaProcs[$i] = $proc;
+ $this->replicaPipes[$i] = $pipes[0];
}
- $this->prevSlaveId = -1;
+ $this->prevReplicaId = -1;
}
/**
* Gracefully terminate the child processes
*/
- function killSlaveProcs() {
+ function killReplicaProcs() {
$this->info( "Waiting for replica DB processes to finish..." );
for ( $i = 0; $i < $this->numProcs; $i++ ) {
- $this->dispatchToSlave( $i, 'quit' );
+ $this->dispatchToReplica( $i, 'quit' );
}
for ( $i = 0; $i < $this->numProcs; $i++ ) {
- $status = proc_close( $this->slaveProcs[$i] );
+ $status = proc_close( $this->replicaProcs[$i] );
if ( $status ) {
$this->critical( "Warning: child #$i exited with status $status" );
}
*/
function dispatch( /*...*/ ) {
$args = func_get_args();
- $pipes = $this->slavePipes;
+ $pipes = $this->replicaPipes;
$numPipes = stream_select( $x = [], $pipes, $y = [], 3600 );
if ( !$numPipes ) {
$this->critical( "Error waiting to write to replica DBs. Aborting" );
exit( 1 );
}
for ( $i = 0; $i < $this->numProcs; $i++ ) {
- $slaveId = ( $i + $this->prevSlaveId + 1 ) % $this->numProcs;
- if ( isset( $pipes[$slaveId] ) ) {
- $this->prevSlaveId = $slaveId;
- $this->dispatchToSlave( $slaveId, $args );
+ $replicaId = ( $i + $this->prevReplicaId + 1 ) % $this->numProcs;
+ if ( isset( $pipes[$replicaId] ) ) {
+ $this->prevReplicaId = $replicaId;
+ $this->dispatchToReplica( $replicaId, $args );
return;
}
/**
* Dispatch a command to a specified replica DB
- * @param int $slaveId
+ * @param int $replicaId
* @param array|string $args
*/
- function dispatchToSlave( $slaveId, $args ) {
+ function dispatchToReplica( $replicaId, $args ) {
$args = (array)$args;
$cmd = implode( ' ', $args );
- fwrite( $this->slavePipes[$slaveId], "$cmd\n" );
+ fwrite( $this->replicaPipes[$replicaId], "$cmd\n" );
}
/**
if ( $current == $end || $this->numBatches >= $this->reportingInterval ) {
$this->numBatches = 0;
$this->info( "$label: $current / $end" );
- wfWaitForSlaves();
+ MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
}
}
case 'quit':
return;
}
- wfWaitForSlaves();
+ MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
}
}
$startId = 0;
$trx = new CgzCopyTransaction( $this, $this->pageBlobClass );
+ $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
while ( true ) {
$res = $dbr->select(
[ 'blob_tracking', 'text' ],
$this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" );
$trx->commit();
$trx = new CgzCopyTransaction( $this, $this->pageBlobClass );
- wfWaitForSlaves();
+ $lbFactory->waitForReplication();
}
}
}
*/
function finishIncompleteMoves( $conds ) {
$dbr = wfGetDB( DB_REPLICA );
+ $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
$startId = 0;
$conds = array_merge( $conds, [
$startId = $row->bt_text_id;
$this->moveTextRow( $row->bt_text_id, $row->bt_new_url );
if ( $row->bt_text_id % 10 == 0 ) {
- wfWaitForSlaves();
+ $lbFactory->waitForReplication();
}
}
}
$trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );
+ $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
$res = wfGetDB( DB_REPLICA )->select(
[ 'text', 'blob_tracking' ],
[ 'old_id', 'old_text', 'old_flags' ],
$this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" );
$trx->commit();
$trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );
- wfWaitForSlaves();
+ $lbFactory->waitForReplication();
}
}
$this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" );