From cc3938316c32b3e60236ec7c8c33c550b0f2fd27 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Thu, 29 Mar 2012 14:47:39 -0700 Subject: [PATCH] [FileBackend] Syncing from journal support. * Added FileJournal::getChangeEntries() and FileBackend::getJournal(). * Added a script to sync one file backend from another, using the journal of the latter. * Removed some overzealous exception catching in FileJournal. Change-Id: I6cc8d4fa2479dcf88878dc0b351b3cc92f4a5ad5 --- includes/filerepo/backend/FileBackend.php | 9 + .../backend/filejournal/DBFileJournal.php | 74 ++++-- .../backend/filejournal/FileJournal.php | 61 ++++- maintenance/syncFileBackend.php | 236 ++++++++++++++++++ 4 files changed, 345 insertions(+), 35 deletions(-) create mode 100644 maintenance/syncFileBackend.php diff --git a/includes/filerepo/backend/FileBackend.php b/includes/filerepo/backend/FileBackend.php index 544c9c28dc..672bae896f 100644 --- a/includes/filerepo/backend/FileBackend.php +++ b/includes/filerepo/backend/FileBackend.php @@ -693,6 +693,15 @@ abstract class FileBackend { return "mwstore://{$this->name}"; } + /** + * Get the file journal object for this backend + * + * @return FileJournal + */ + final public function getJournal() { + return $this->fileJournal; + } + /** * Check if a given path is a "mwstore://" path. * This does not do any further validation or any existence checks. diff --git a/includes/filerepo/backend/filejournal/DBFileJournal.php b/includes/filerepo/backend/filejournal/DBFileJournal.php index 1eb9ecada1..bd993c69bc 100644 --- a/includes/filerepo/backend/filejournal/DBFileJournal.php +++ b/includes/filerepo/backend/filejournal/DBFileJournal.php @@ -16,7 +16,7 @@ class DBFileJournal extends FileJournal { * Construct a new instance from configuration. 
* $config includes: * 'wiki' : wiki name to use for LoadBalancer - * + * * @param $config Array */ protected function __construct( array $config ) { @@ -27,16 +27,18 @@ class DBFileJournal extends FileJournal { /** * @see FileJournal::logChangeBatch() - * @return Status + * @return Status */ protected function doLogChangeBatch( array $entries, $batchId ) { $status = Status::newGood(); - $dbw = $this->getMasterDB(); - if ( !$dbw ) { + try { + $dbw = $this->getMasterDB(); + } catch ( DBError $e ) { $status->fatal( 'filejournal-fail-dbconnect', $this->backend ); return $status; } + $now = wfTimestamp( TS_UNIX ); $data = array(); @@ -64,9 +66,39 @@ class DBFileJournal extends FileJournal { return $status; } + /** + * @see FileJournal::doGetChangeEntries() + * @return Array + * @throws DBError + */ + protected function doGetChangeEntries( $start, $limit ) { + $dbw = $this->getMasterDB(); + + $res = $dbw->select( 'filejournal', '*', + array( + 'fj_backend' => $this->backend, + 'fj_id >= ' . $dbw->addQuotes( (int)$start ) ), // $start may be 0 + __METHOD__, + array_merge( array( 'ORDER BY' => 'fj_id ASC' ), + $limit ? array( 'LIMIT' => $limit ) : array() ) + ); + + $entries = array(); + foreach ( $res as $row ) { + $item = array(); + foreach ( (array)$row as $key => $value ) { + $item[substr( $key, 3 )] = $value; // "fj_op" => "op" + } + $entries[] = $item; + } + + return $entries; + } + /** * @see FileJournal::purgeOldLogs() * @return Status + * @throws DBError */ protected function doPurgeOldLogs() { $status = Status::newGood(); @@ -75,38 +107,26 @@ class DBFileJournal extends FileJournal { } $dbw = $this->getMasterDB(); - if ( !$dbw ) { - $status->fatal( 'filejournal-fail-dbconnect', $this->backend ); - return $status; - } $dbCutoff = $dbw->timestamp( time() - 86400 * $this->ttlDays ); - try { - $dbw->begin(); - $dbw->delete( 'filejournal', - array( 'fj_timestamp < ' . 
$dbw->addQuotes( $dbCutoff ) ), - __METHOD__ - ); - $dbw->commit(); - } catch ( DBError $e ) { - $status->fatal( 'filejournal-fail-dbquery', $this->backend ); - return $status; - } + $dbw->begin(); + $dbw->delete( 'filejournal', + array( 'fj_timestamp < ' . $dbw->addQuotes( $dbCutoff ) ), + __METHOD__ + ); + $dbw->commit(); return $status; } /** * Get a master connection to the logging DB - * - * @return DatabaseBase|null + * + * @return DatabaseBase + * @throws DBError */ protected function getMasterDB() { - try { - $lb = wfGetLBFactory()->newMainLB(); - return $lb->getConnection( DB_MASTER, array(), $this->wiki ); - } catch ( DBConnectionError $e ) { - return null; - } + $lb = wfGetLBFactory()->newMainLB(); + return $lb->getConnection( DB_MASTER, array(), $this->wiki ); } } diff --git a/includes/filerepo/backend/filejournal/FileJournal.php b/includes/filerepo/backend/filejournal/FileJournal.php index f60b7f9b4e..738a5c96bb 100644 --- a/includes/filerepo/backend/filejournal/FileJournal.php +++ b/includes/filerepo/backend/filejournal/FileJournal.php @@ -26,7 +26,7 @@ abstract class FileJournal { * Construct a new instance from configuration. * $config includes: * 'ttlDays' : days to keep log entries around (false means "forever") - * + * * @param $config Array */ protected function __construct( array $config ) { @@ -35,7 +35,7 @@ abstract class FileJournal { /** * Create an appropriate FileJournal object from config - * + * * @param $config Array * @param $backend string A registered file backend name * @return FileJournal @@ -52,7 +52,7 @@ abstract class FileJournal { /** * Get a statistically unique ID string - * + * * @return string <9 char TS_MW timestamp in base 36><22 random base 36 chars> */ final public function getTimestampedUUID() { @@ -71,7 +71,7 @@ abstract class FileJournal { * path : The storage path of the file * newSha1 : The final base 36 SHA-1 of the file * Note that 'false' should be used as the SHA-1 for non-existing files. 
- * + * @param $entries Array List of file operations (each an array of parameters) * @param $batchId string UUID string that identifies the operation batch * @return Status @@ -85,17 +85,54 @@ abstract class FileJournal { /** * @see FileJournal::logChangeBatch() - * + * * @param $entries Array List of file operations (each an array of parameters) * @param $batchId string UUID string that identifies the operation batch * @return Status */ abstract protected function doLogChangeBatch( array $entries, $batchId ); + /** + * Get an array of file change log entries. + * A starting change ID and/or limit can be specified. + * + * The result is a list of associative arrays, each having: + * id : unique, monotonic, ID for this change + * batch_uuid : UUID for an operation batch + * backend : the backend name + * op : primitive operation (create,update,delete) + * path : affected storage path + * path_sha1 : base 36 sha1 of the affected storage path + * timestamp : TS_MW timestamp of the batch change + * + * Also, $next is updated to the ID of the next entry. + * + * @param $start integer Starting change ID or null + * @param $limit integer Maximum number of items to return + * @param &$next string + * @return Array + */ + final public function getChangeEntries( $start = null, $limit = 0, &$next = null ) { + $entries = $this->doGetChangeEntries( $start, $limit ? 
$limit + 1 : 0 ); + if ( $limit && count( $entries ) > $limit ) { + $last = array_pop( $entries ); // remove the extra entry + $next = $last['id']; // update for next call + } else { + $next = null; // end of list + } + return $entries; + } + + /** + * @see FileJournal::getChangeEntries() + * @return Array + */ + abstract protected function doGetChangeEntries( $start, $limit ); + /** * Purge any old log entries - * - * @return Status + * + * @return Status */ final public function purgeOldLogs() { return $this->doPurgeOldLogs(); @@ -115,12 +152,20 @@ abstract class FileJournal { class NullFileJournal extends FileJournal { /** * @see FileJournal::logChangeBatch() - * @return Status + * @return Status */ protected function doLogChangeBatch( array $entries, $batchId ) { return Status::newGood(); } + /** + * @see FileJournal::doGetChangeEntries() + * @return Array + */ + protected function doGetChangeEntries( $start, $limit ) { + return array(); + } + /** * @see FileJournal::purgeOldLogs() * @return Status diff --git a/maintenance/syncFileBackend.php b/maintenance/syncFileBackend.php new file mode 100644 index 0000000000..0d5c9deec6 --- /dev/null +++ b/maintenance/syncFileBackend.php @@ -0,0 +1,236 @@ +mDescription = "Sync one file backend with another using the journal"; + $this->addOption( 'src', 'Name of backend to sync from', true, true ); + $this->addOption( 'dst', 'Name of destination backend to sync', true, true ); + $this->addOption( 'start', 'Starting journal ID', false, true ); + $this->addOption( 'end', 'Ending journal ID', false, true ); + $this->addOption( 'posdir', 'Directory to read/record journal positions', false, true ); + $this->addOption( 'verbose', 'Verbose mode', false, false, 'v' ); + $this->setBatchSize( 50 ); + } + + public function execute() { + $src = FileBackendGroup::singleton()->get( $this->getOption( 'src' ) ); + $dst = FileBackendGroup::singleton()->get( $this->getOption( 'dst' ) ); + + $posFile = $this->getOption( 'posdir' ) + ? 
$this->getOption( 'posdir' ) . '/' . wfWikiID() + : false; + + $start = $this->getOption( 'start', 0 ); + if ( !$start && $posFile ) { + $start = is_file( $posFile ) + ? (int)trim( file_get_contents( $posFile ) ) + : 0; + ++$start; // we already did this ID, start with the next one + $startFromPosFile = true; + } else { + $startFromPosFile = false; + } + $end = $this->getOption( 'end', INF ); + + $this->output( "Synchronizing backend '{$dst->getName()}' to '{$src->getName()}'...\n" ); + $this->output( "Starting journal position is $start.\n" ); + if ( is_finite( $end ) ) { + $this->output( "Ending journal position is $end.\n" ); + } + + // Actually sync the dest backend with the reference backend + $lastOKPos = $this->syncBackends( $src, $dst, $start, $end ); + + // Update the sync position file + if ( $startFromPosFile && $lastOKPos >= $start ) { // successfully advanced + file_put_contents( $posFile, $lastOKPos, LOCK_EX ); + $this->output( "Updated journal position file.\n" ); + } + + if ( $lastOKPos === false ) { + if ( !$start ) { + $this->output( "No journal entries found.\n" ); + } else { + $this->output( "No new journal entries found.\n" ); + } + } else { + $this->output( "Stopped synchronization at journal position $lastOKPos.\n" ); + } + + if ( $this->isQuiet() ) { + print $lastOKPos; // give a single machine-readable number + } + } + + /** + * Sync $dst backend to $src backend based on the $src logs given after $start. + * Returns the journal entry ID this advanced to and handled (inclusive). 
+ * + * @param $src FileBackend + * @param $dst FileBackend + * @param $start integer Starting journal position + * @param $end integer Ending journal position + * @return integer|false Journal entry ID or false if there are none + */ + protected function syncBackends( FileBackend $src, FileBackend $dst, $start, $end ) { + $lastOKPos = 0; // failed + $first = true; // first batch + + if ( $start > $end ) { // sanity + $this->error( "Error: given starting ID greater than ending ID.", 1 ); + } + + do { + $limit = min( $this->mBatchSize, $end - $start + 1 ); // don't go past ending ID + $this->output( "Doing id $start to " . ( $start + $limit - 1 ) . "...\n" ); + + $entries = $src->getJournal()->getChangeEntries( $start, $limit, $next ); + $start = $next; // start where we left off next time + if ( $first && !count( $entries ) ) { + return false; // nothing to do + } + $first = false; + + $lastPosInBatch = 0; + $pathsInBatch = array(); // changed paths + foreach ( $entries as $entry ) { + if ( $entry['op'] !== 'null' ) { // null ops are just for reference + $pathsInBatch[$entry['path']] = 1; // remove duplicates + } + $lastPosInBatch = $entry['id']; + } + + $status = $this->syncFileBatch( array_keys( $pathsInBatch ), $src, $dst ); + if ( $status->isOK() ) { + $lastOKPos = max( $lastOKPos, $lastPosInBatch ); + } else { + $this->output( print_r( $status->getErrorsArray(), true ) ); + break; // no gaps; everything up to $lastOKPos must be OK + } + + if ( !$start ) { + $this->output( "End of journal entries.\n" ); + } + } while ( $start && $start <= $end ); + + return $lastOKPos; + } + + /** + * Sync particular files of backend $src to the corresponding $dst backend files + * + * @param $paths Array + * @param $src FileBackend + * @param $dst FileBackend + * @return Status + */ + protected function syncFileBatch( array $paths, FileBackend $src, FileBackend $dst ) { + $status = Status::newGood(); + if ( !count( $paths ) ) { + return $status; // nothing to do + } + + // 
Source: convert internal backend names (FileBackendMultiWrite) to the public one + $sPaths = $this->replaceNamePaths( $paths, $src ); + // Destination: get corresponding path name + $dPaths = $this->replaceNamePaths( $paths, $dst ); + + // Lock the live backend paths from modification + $sLock = $src->getScopedFileLocks( $sPaths, LockManager::LOCK_UW, $status ); + $eLock = $dst->getScopedFileLocks( $dPaths, LockManager::LOCK_EX, $status ); + if ( !$status->isOK() ) { + return $status; + } + + $ops = array(); + $fsFiles = array(); + foreach ( $sPaths as $i => $sPath ) { + $dPath = $dPaths[$i]; // destination + $sExists = $src->fileExists( array( 'src' => $sPath, 'latest' => 1 ) ); + if ( $sExists === true ) { // exists in source + if ( $this->filesAreSame( $src, $dst, $sPath, $dPath ) ) { + continue; // avoid local copies for non-FS backends + } + // Note: getLocalReference() is fast for FS backends + $fsFile = $src->getLocalReference( array( 'src' => $sPath, 'latest' => 1 ) ); + if ( !$fsFile ) { + $this->error( "Unable to sync '$dPath': could not get local copy." ); + $status->fatal( 'backend-fail-internal', $src->getName() ); + return $status; + } + $fsFiles[] = $fsFile; // keep TempFSFile objects alive as needed + // Note: prepare() is usually fast for key/value backends + $status->merge( $dst->prepare( array( 'dir' => dirname( $dPath ) ) ) ); + if ( !$status->isOK() ) { + return $status; + } + $ops[] = array( 'op' => 'store', + 'src' => $fsFile->getPath(), 'dst' => $dPath, 'overwrite' => 1 ); + } elseif ( $sExists === false ) { // does not exist in source + $ops[] = array( 'op' => 'delete', 'src' => $dPath, 'ignoreMissingSource' => 1 ); + } else { // error + $this->error( "Unable to sync '$dPath': could not stat file." 
); + $status->fatal( 'backend-fail-internal', $src->getName() ); + return $status; + } + } + + $status->merge( $dst->doOperations( $ops, + array( 'nonLocking' => 1, 'nonJournaled' => 1 ) ) ); + if ( $status->isOK() && $this->getOption( 'verbose' ) ) { + $this->output( "Synchronized these file(s):\n" . implode( "\n", $dPaths ) . "\n" ); + } + + return $status; + } + + /** + * Substitute the backend name of storage paths with that of a given one + * + * @param $paths Array|string List of paths or single string path + * @return Array|string + */ + protected function replaceNamePaths( $paths, FileBackend $backend ) { + return preg_replace( + '!^mwstore://([^/]+)!', + StringUtils::escapeRegexReplacement( "mwstore://" . $backend->getName() ), + $paths // string or array + ); + } + + protected function filesAreSame( FileBackend $src, FileBackend $dst, $sPath, $dPath ) { + return ( + ( $src->getFileSize( array( 'src' => $sPath ) ) + === $dst->getFileSize( array( 'src' => $dPath ) ) // short-circuit + ) && ( $src->getFileSha1Base36( array( 'src' => $sPath ) ) + === $dst->getFileSha1Base36( array( 'src' => $dPath ) ) + ) + ); + } +} + +$maintClass = "SyncFileBackend"; +require_once( RUN_MAINTENANCE_IF_MAIN ); -- 2.20.1