Merge pull request from sfc-gh-huliu/applylog

blobrestore - apply mutation log
This commit is contained in:
Hui Liu 2022-12-02 13:03:13 -08:00 committed by GitHub
commit d76822bc12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 105 additions and 19 deletions
fdbclient
fdbserver

View File

@ -1031,6 +1031,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( BLOB_FULL_RESTORE_MODE, false );
init( BLOB_MIGRATOR_CHECK_INTERVAL, isSimulated ? 1.0 : 5.0);
init( BLOB_MANIFEST_RW_ROWS, isSimulated ? 10 : 1000);
init( BLOB_RESTORE_MLOGS_URL, isSimulated ? "file://simfdb/fdbblob/mlogs" : "");
init( BGCC_TIMEOUT, isSimulated ? 10.0 : 120.0 );
init( BGCC_MIN_INTERVAL, isSimulated ? 1.0 : 10.0 );

View File

@ -999,6 +999,7 @@ public:
bool BLOB_FULL_RESTORE_MODE;
double BLOB_MIGRATOR_CHECK_INTERVAL;
int BLOB_MANIFEST_RW_ROWS;
std::string BLOB_RESTORE_MLOGS_URL;
// Blob metadata
int64_t BLOB_METADATA_CACHE_TTL;

View File

@ -17,13 +17,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/BlobGranuleCommon.h"
#include <algorithm>
#include <string>
#include "flow/network.h"
#include "flow/flow.h"
#include "flow/ActorCollection.h"
#include "flow/FastRef.h"
#include "flow/IRandom.h"
#include "flow/Platform.h"
#include "flow/Trace.h"
#include "flow/flow.h"
#include "fdbclient/BlobGranuleCommon.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/BlobConnectionProvider.h"
#include "fdbclient/FDBTypes.h"
@ -31,6 +34,7 @@
#include "fdbclient/SystemData.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbserver/ServerDBInfo.actor.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/MoveKeys.actor.h"
@ -38,9 +42,6 @@
#include "fdbserver/BlobMigratorInterface.h"
#include "fdbserver/Knobs.h"
#include "flow/actorcompiler.h" // has to be last include
#include "flow/network.h"
#include <algorithm>
#include <string>
#define ENABLE_DEBUG_MG true
@ -82,7 +83,7 @@ private:
if (!granules.empty()) {
self->blobGranules_ = granules;
for (BlobGranuleRestoreVersion granule : granules) {
TraceEvent("RestorableGranule")
TraceEvent("RestorableGranule", self->interf_.id())
.detail("GranuleId", granule.granuleID.toString())
.detail("KeyRange", granule.keyRange.toString())
.detail("Version", granule.version)
@ -167,7 +168,7 @@ private:
if (owning) {
wait(krmSetRangeCoalescing(&tr, serverKeysPrefixFor(id), keys, allKeys, serverKeysFalse));
dprint("Unassign {} from storage server {}\n", keys.toString(), id.toString());
TraceEvent("UnassignKeys").detail("Keys", keys.toString()).detail("From", id.toString());
TraceEvent("UnassignKeys", self->interf_.id()).detail("Keys", keys).detail("SS", id);
}
}
wait(tr.commit());
@ -183,9 +184,9 @@ private:
loop {
bool done = wait(checkProgress(self));
if (done) {
BlobRestoreStatus status(BlobRestorePhase::DONE);
wait(updateRestoreStatus(self->db_, normalKeys, status));
wait(updateRestoreStatus(self->db_, normalKeys, BlobRestoreStatus(BlobRestorePhase::APPLY_MLOGS)));
wait(applyMutationLogs(self));
wait(updateRestoreStatus(self->db_, normalKeys, BlobRestoreStatus(BlobRestorePhase::DONE)));
return Void();
}
wait(delay(SERVER_KNOBS->BLOB_MIGRATOR_CHECK_INTERVAL));
@ -221,7 +222,7 @@ private:
int progress = (total - incompleted) * 100 / total;
state bool done = incompleted == 0;
dprint("Migration progress :{}%. done {}\n", progress, done);
TraceEvent("BlobMigratorProgress").detail("Progress", progress).detail("Done", done);
TraceEvent("BlobMigratorProgress", self->interf_.id()).detail("Progress", progress);
BlobRestoreStatus status(BlobRestorePhase::MIGRATE, progress);
wait(updateRestoreStatus(self->db_, normalKeys, status));
return done;
@ -239,12 +240,12 @@ private:
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
Version currentVersion = wait(tr.getRawReadVersion());
Version expectedVersion = maxVersion(self);
if (currentVersion <= expectedVersion) {
tr.set(minRequiredCommitVersionKey, BinaryWriter::toValue(expectedVersion + 1, Unversioned()));
dprint("Advance version from {} to {}\n", currentVersion, expectedVersion);
TraceEvent("AdvanceVersion").detail("Current", currentVersion).detail("New", expectedVersion);
Version current = wait(tr.getRawReadVersion());
Version expected = maxVersion(self);
if (current <= expected) {
tr.set(minRequiredCommitVersionKey, BinaryWriter::toValue(expected + 1, Unversioned()));
dprint("Advance version from {} to {}\n", current, expected);
TraceEvent("AdvanceVersion", self->interf_.id()).detail("From", current).detail("To", expected);
wait(tr.commit());
}
return Void();
@ -254,6 +255,88 @@ private:
}
}
// Find mutation logs url
static std::string mlogsUrl(Reference<BlobMigrator> self) {
std::string url = SERVER_KNOBS->BLOB_RESTORE_MLOGS_URL;
// A special case for local directory.
// See FileBackupAgent.actor.cpp. if the container string describes a local directory then "/backup-<timestamp>"
// will be added to it. so we need to append this directory name to the url
if (url.find("file://") == 0) {
std::string path = url.substr(7);
path.erase(path.find_last_not_of("\\/") + 1); // Remove trailing slashes
std::vector<std::string> dirs = platform::listDirectories(path);
if (dirs.empty()) {
TraceEvent(SevError, "BlobMigratorMissingMutationLogs").detail("Url", url);
throw restore_missing_data();
}
// Pick the newest backup folder
std::sort(dirs.begin(), dirs.end());
std::string name = dirs.back();
url.erase(url.find_last_not_of("\\/") + 1); // Remove trailing slashes
return url + "/" + name;
}
return url;
}
// Apply mutation logs to blob granules so that they reach to a consistent version for all blob granules
ACTOR static Future<Void> applyMutationLogs(Reference<BlobMigrator> self) {
state std::string mutationLogsUrl = mlogsUrl(self);
TraceEvent("ApplyMutationLogs", self->interf_.id()).detail("Url", mutationLogsUrl);
// check last version in mutation logs
Optional<std::string> proxy; // unused
Optional<std::string> encryptionKeyFile; // unused
Reference<IBackupContainer> bc = IBackupContainer::openContainer(mutationLogsUrl, proxy, encryptionKeyFile);
BackupDescription desc = wait(bc->describeBackup());
if (!desc.contiguousLogEnd.present()) {
TraceEvent(SevError, "BlobMigratorInvalidMutationLogs").detail("Url", mutationLogsUrl);
throw restore_missing_data();
}
Version targetVersion = desc.contiguousLogEnd.get() - 1;
TraceEvent("ApplyMutationLogs", self->interf_.id()).detail("Version", targetVersion);
// restore to target version
Standalone<VectorRef<KeyRangeRef>> ranges;
Standalone<VectorRef<Version>> beginVersions;
for (auto& granule : self->blobGranules_) {
ranges.push_back(ranges.arena(), granule.keyRange);
beginVersions.push_back(beginVersions.arena(), granule.version);
}
std::string tagName = "blobrestore-" + self->interf_.id().shortString();
wait(submitRestore(self, KeyRef(tagName), KeyRef(mutationLogsUrl), ranges, beginVersions));
return Void();
}
// Submit restore task to backup agent
ACTOR static Future<Void> submitRestore(Reference<BlobMigrator> self,
Key tagName,
Key mutationLogsUrl,
Standalone<VectorRef<KeyRangeRef>> ranges,
Standalone<VectorRef<Version>> beginVersions) {
state Optional<std::string> proxy; // unused
state Optional<Database> origDb; // unused
TraceEvent("ApplyMutationLogsStart", self->interf_.id()).detail("Tag", tagName);
Version version = wait(self->backupAgent_.restore(self->db_,
origDb,
KeyRef(tagName),
KeyRef(mutationLogsUrl),
proxy,
ranges,
beginVersions,
WaitForComplete::True,
invalidVersion,
Verbose::False,
""_sr, // addPrefix
""_sr, // removePrefix
LockDB::False,
UnlockDB::False,
OnlyApplyMutationLogs::True));
TraceEvent("ApplyMutationLogsComplete", self->interf_.id()).detail("Version", version);
return Void();
}
// Main server loop
ACTOR static Future<Void> serverLoop(Reference<BlobMigrator> self) {
self->actors_.add(waitFailureServer(self->interf_.waitFailure.getFuture()));
@ -435,11 +518,12 @@ private:
BlobGranuleRestoreVersionVector blobGranules_;
BlobMigratorInterface interf_;
ActorCollection actors_;
FileBackupAgent backupAgent_;
};
// Main entry point
ACTOR Future<Void> blobMigrator(BlobMigratorInterface interf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
TraceEvent("StartBlobMigrator").detail("Interface", interf.id().toString());
TraceEvent("StartBlobMigrator", interf.id()).detail("Interface", interf.id().toString());
dprint("Starting blob migrator {}\n", interf.id().toString());
try {
Reference<BlobMigrator> self = makeReference<BlobMigrator>(dbInfo, interf);