From 0a332ee1c17975552784a5689cbf12367fa05302 Mon Sep 17 00:00:00 2001 From: Renxuan Wang Date: Mon, 28 Mar 2022 17:10:49 -0700 Subject: [PATCH 1/8] Add proxy option to backup and restore params. --- fdbbackup/FileConverter.actor.cpp | 8 ++- fdbbackup/FileDecoder.actor.cpp | 8 ++- fdbbackup/backup.actor.cpp | 72 ++++++++++++++----- fdbclient/BackupAgent.actor.h | 28 ++++++-- fdbclient/BackupContainer.actor.cpp | 16 +++-- fdbclient/BackupContainer.h | 9 ++- fdbclient/BackupContainerFileSystem.actor.cpp | 19 +++-- fdbclient/BackupContainerFileSystem.h | 6 +- fdbclient/FileBackupAgent.actor.cpp | 24 +++++-- fdbclient/RestoreInterface.h | 15 ++-- fdbclient/S3BlobStore.actor.cpp | 23 ++++-- fdbclient/S3BlobStore.h | 23 ++++-- fdbserver/BlobManager.actor.cpp | 2 +- fdbserver/BlobWorker.actor.cpp | 2 +- fdbserver/RestoreController.actor.cpp | 12 ++-- fdbserver/RestoreController.actor.h | 8 ++- fdbserver/RestoreLoader.actor.cpp | 2 +- fdbserver/RestoreLoader.actor.h | 4 +- fdbserver/RestoreWorkerInterface.actor.h | 4 +- fdbserver/workloads/AtomicRestore.actor.cpp | 1 + ...kupAndParallelRestoreCorrectness.actor.cpp | 7 +- .../workloads/BackupCorrectness.actor.cpp | 11 ++- fdbserver/workloads/BackupToBlob.actor.cpp | 1 + .../BlobGranuleCorrectnessWorkload.actor.cpp | 4 +- .../workloads/BlobGranuleVerifier.actor.cpp | 4 +- .../workloads/IncrementalBackup.actor.cpp | 8 ++- fdbserver/workloads/RestoreBackup.actor.cpp | 1 + fdbserver/workloads/RestoreFromBlob.actor.cpp | 4 +- fdbserver/workloads/SubmitBackup.actor.cpp | 1 + 29 files changed, 232 insertions(+), 95 deletions(-) diff --git a/fdbbackup/FileConverter.actor.cpp b/fdbbackup/FileConverter.actor.cpp index 1e48bd523d..8aeea5017f 100644 --- a/fdbbackup/FileConverter.actor.cpp +++ b/fdbbackup/FileConverter.actor.cpp @@ -101,6 +101,7 @@ std::vector getRelevantLogFiles(const std::vector& files, Vers struct ConvertParams { std::string container_url; + Optional proxy; Version begin = invalidVersion; Version end = invalidVersion; bool log_enabled = false; @@ -112,6 +113,10 @@ struct ConvertParams { std::string s; s.append("ContainerURL:"); s.append(container_url); + if (proxy.present()) { + s.append(" Proxy:"); + s.append(proxy.get()); + } s.append(" Begin:"); s.append(format("%" PRId64, begin)); s.append(" End:"); @@ -448,7 +453,8 @@ private: }; ACTOR Future convert(ConvertParams params) { - state Reference container = IBackupContainer::openContainer(params.container_url); + state Reference container = + IBackupContainer::openContainer(params.container_url, params.proxy, {}); state BackupFileList listing = wait(container->dumpFileList()); std::sort(listing.logs.begin(), listing.logs.end()); TraceEvent("Container").detail("URL", params.container_url).detail("Logs", listing.logs.size()); diff --git a/fdbbackup/FileDecoder.actor.cpp b/fdbbackup/FileDecoder.actor.cpp index 7e851bf6e0..7d9e27dcb1 100644 --- a/fdbbackup/FileDecoder.actor.cpp +++ b/fdbbackup/FileDecoder.actor.cpp @@ -94,6 +94,7 @@ void printBuildInformation() { struct DecodeParams { std::string container_url; + Optional proxy; std::string fileFilter; // only files match the filter will be decoded bool log_enabled = true; std::string log_dir, trace_format, trace_log_group; @@ -115,6 +116,10 @@ struct DecodeParams { std::string s; s.append("ContainerURL: "); s.append(container_url); + if (proxy.present()) { + s.append(", Proxy: "); + s.append(proxy.get()); + } s.append(", FileFilter: "); s.append(fileFilter); if (log_enabled) { @@ -526,7 +531,8 @@ ACTOR Future process_file(Reference container, LogFile f } ACTOR Future decode_logs(DecodeParams params) { - state Reference container = IBackupContainer::openContainer(params.container_url); + state Reference container = + IBackupContainer::openContainer(params.container_url, params.proxy, {}); state UID uid = deterministicRandom()->randomUniqueID(); state BackupFileList listing = wait(container->dumpFileList()); // remove partitioned logs diff --git a/fdbbackup/backup.actor.cpp b/fdbbackup/backup.actor.cpp index 219d9ab820..431bc7798d 100644 --- a/fdbbackup/backup.actor.cpp +++ b/fdbbackup/backup.actor.cpp @@ -130,6 +130,7 @@ enum { OPT_USE_PARTITIONED_LOG, // Backup and Restore constants + OPT_PROXY, OPT_TAGNAME, OPT_BACKUPKEYS, OPT_WAITFORDONE, @@ -234,6 +235,7 @@ CSimpleOpt::SOption g_rgBackupStartOptions[] = { { OPT_NOSTOPWHENDONE, "--no-stop-when-done", SO_NONE }, { OPT_DESTCONTAINER, "-d", SO_REQ_SEP }, { OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP }, + { OPT_PROXY, "--proxy", SO_REQ_SEP }, // Enable "-p" option after GA // { OPT_USE_PARTITIONED_LOG, "-p", SO_NONE }, { OPT_USE_PARTITIONED_LOG, "--partitioned-log-experimental", SO_NONE }, @@ -294,6 +296,7 @@ CSimpleOpt::SOption g_rgBackupModifyOptions[] = { { OPT_MOD_VERIFY_UID, "--verify-uid", SO_REQ_SEP }, { OPT_DESTCONTAINER, "-d", SO_REQ_SEP }, { OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP }, + { OPT_PROXY, "--proxy", SO_REQ_SEP }, { OPT_SNAPSHOTINTERVAL, "-s", SO_REQ_SEP }, { OPT_SNAPSHOTINTERVAL, "--snapshot-interval", SO_REQ_SEP }, { OPT_MOD_ACTIVE_INTERVAL, "--active-snapshot-interval", SO_REQ_SEP }, @@ -482,6 +485,7 @@ CSimpleOpt::SOption g_rgBackupExpireOptions[] = { { OPT_CLUSTERFILE, "--cluster-file", SO_REQ_SEP }, { OPT_DESTCONTAINER, "-d", SO_REQ_SEP }, { OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP }, + { OPT_PROXY, "--proxy", SO_REQ_SEP }, { OPT_TRACE, "--log", SO_NONE }, { OPT_TRACE_DIR, "--logdir", SO_REQ_SEP }, { OPT_TRACE_FORMAT, "--trace-format", SO_REQ_SEP }, @@ -517,6 +521,7 @@ CSimpleOpt::SOption g_rgBackupDeleteOptions[] = { #endif { OPT_DESTCONTAINER, "-d", SO_REQ_SEP }, { OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP }, + { OPT_PROXY, "--proxy", SO_REQ_SEP }, { OPT_TRACE, "--log", SO_NONE }, { OPT_TRACE_DIR, "--logdir", SO_REQ_SEP }, { OPT_TRACE_FORMAT, "--trace-format", SO_REQ_SEP }, @@ -546,6 +551,7 @@ CSimpleOpt::SOption g_rgBackupDescribeOptions[] = { { OPT_CLUSTERFILE, "--cluster-file", SO_REQ_SEP }, { OPT_DESTCONTAINER, "-d", SO_REQ_SEP }, { OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP }, + { OPT_PROXY, "--proxy", SO_REQ_SEP }, { OPT_TRACE, "--log", SO_NONE }, { OPT_TRACE_DIR, "--logdir", SO_REQ_SEP }, { OPT_TRACE_FORMAT, "--trace-format", SO_REQ_SEP }, @@ -578,6 +584,7 @@ CSimpleOpt::SOption g_rgBackupDumpOptions[] = { { OPT_CLUSTERFILE, "--cluster-file", SO_REQ_SEP }, { OPT_DESTCONTAINER, "-d", SO_REQ_SEP }, { OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP }, + { OPT_PROXY, "--proxy", SO_REQ_SEP }, { OPT_TRACE, "--log", SO_NONE }, { OPT_TRACE_DIR, "--logdir", SO_REQ_SEP }, { OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP }, @@ -652,6 +659,7 @@ CSimpleOpt::SOption g_rgBackupQueryOptions[] = { { OPT_RESTORE_TIMESTAMP, "--query-restore-timestamp", SO_REQ_SEP }, { OPT_DESTCONTAINER, "-d", SO_REQ_SEP }, { OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP }, + { OPT_PROXY, "--proxy", SO_REQ_SEP }, { OPT_RESTORE_VERSION, "-qrv", SO_REQ_SEP }, { OPT_RESTORE_VERSION, "--query-restore-version", SO_REQ_SEP }, { OPT_BACKUPKEYS_FILTER, "-k", SO_REQ_SEP }, @@ -689,6 +697,7 @@ CSimpleOpt::SOption g_rgRestoreOptions[] = { { OPT_RESTORE_TIMESTAMP, "--timestamp", SO_REQ_SEP }, { OPT_KNOB, "--knob-", SO_REQ_SEP }, { OPT_RESTORECONTAINER, "-r", SO_REQ_SEP }, + { OPT_PROXY, "--proxy", SO_REQ_SEP }, { OPT_PREFIX_ADD, "--add-prefix", SO_REQ_SEP }, { OPT_PREFIX_REMOVE, "--remove-prefix", SO_REQ_SEP }, { OPT_TAGNAME, "-t", SO_REQ_SEP }, @@ -1920,6 +1929,7 @@ ACTOR Future submitDBBackup(Database src, ACTOR Future submitBackup(Database db, std::string url, + Optional proxy, int initialSnapshotIntervalSeconds, int snapshotIntervalSeconds, Standalone> backupRanges, @@ -1977,6 +1987,7 @@ ACTOR Future submitBackup(Database db, else { wait(backupAgent.submitBackup(db, KeyRef(url), + proxy, initialSnapshotIntervalSeconds, snapshotIntervalSeconds, tagName, @@ -2260,8 +2271,9 @@ ACTOR Future changeDBBackupResumed(Database src, Database dest, bool pause } Reference openBackupContainer(const char* name, - std::string destinationContainer, - Optional const& encryptionKeyFile = {}) { + const std::string& destinationContainer, + const Optional& proxy, + const Optional& encryptionKeyFile) { // Error, if no dest container was specified if (destinationContainer.empty()) { fprintf(stderr, "ERROR: No backup destination was specified.\n"); @@ -2271,7 +2283,7 @@ Reference openBackupContainer(const char* name, Reference c; try { - c = IBackupContainer::openContainer(destinationContainer, encryptionKeyFile); + c = IBackupContainer::openContainer(destinationContainer, proxy, encryptionKeyFile); } catch (Error& e) { std::string msg = format("ERROR: '%s' on URL '%s'", e.what(), destinationContainer.c_str()); if (e.code() == error_code_backup_invalid_url && !IBackupContainer::lastOpenError.empty()) { @@ -2291,6 +2303,7 @@ ACTOR Future runRestore(Database db, std::string originalClusterFile, std::string tagName, std::string container, + Optional proxy, Standalone> ranges, Version beginVersion, Version targetVersion, @@ -2339,7 +2352,7 @@ ACTOR Future runRestore(Database db, state FileBackupAgent backupAgent; state Reference bc = - openBackupContainer(exeRestore.toString().c_str(), container, encryptionKeyFile); + openBackupContainer(exeRestore.toString().c_str(), container, proxy, encryptionKeyFile); // If targetVersion is unset then use the maximum restorable version from the backup description if (targetVersion == invalidVersion) { @@ -2368,6 +2381,7 @@ ACTOR Future runRestore(Database db, origDb, KeyRef(tagName), KeyRef(container), + proxy, ranges, waitForDone, targetVersion, @@ -2411,6 +2425,7 @@ ACTOR Future runRestore(Database db, ACTOR Future runFastRestoreTool(Database db, std::string tagName, std::string container, + Optional proxy, Standalone> ranges, Version dbVersion, bool performRestore, @@ -2440,7 +2455,7 @@ ACTOR Future runFastRestoreTool(Database db, if (performRestore) { if (dbVersion == invalidVersion) { TraceEvent("FastRestoreTool").detail("TargetRestoreVersion", "Largest restorable version"); - BackupDescription desc = wait(IBackupContainer::openContainer(container)->describeBackup()); + BackupDescription desc = wait(IBackupContainer::openContainer(container, proxy, {})->describeBackup()); if (!desc.maxRestorableVersion.present()) { fprintf(stderr, "The specified backup is not restorable to any version.\n"); throw restore_error(); @@ -2457,6 +2472,7 @@ ACTOR Future runFastRestoreTool(Database db, KeyRef(tagName), ranges, KeyRef(container), + proxy, dbVersion, LockDB::True, randomUID, @@ -2478,7 +2494,7 @@ ACTOR Future runFastRestoreTool(Database db, restoreVersion = dbVersion; } else { - state Reference bc = IBackupContainer::openContainer(container); + state Reference bc = IBackupContainer::openContainer(container, proxy, {}); state BackupDescription description = wait(bc->describeBackup()); if (dbVersion <= 0) { @@ -2522,9 +2538,10 @@ ACTOR Future runFastRestoreTool(Database db, ACTOR Future dumpBackupData(const char* name, std::string destinationContainer, + Optional proxy, Version beginVersion, Version endVersion) { - state Reference c = openBackupContainer(name, destinationContainer); + state Reference c = openBackupContainer(name, destinationContainer, proxy, {}); if (beginVersion < 0 || endVersion < 0) { BackupDescription desc = wait(c->describeBackup()); @@ -2552,6 +2569,7 @@ ACTOR Future dumpBackupData(const char* name, ACTOR Future expireBackupData(const char* name, std::string destinationContainer, + Optional proxy, Version endVersion, std::string endDatetime, Database db, @@ -2577,7 +2595,7 @@ ACTOR Future expireBackupData(const char* name, } try { - Reference c = openBackupContainer(name, destinationContainer, encryptionKeyFile); + Reference c = openBackupContainer(name, destinationContainer, proxy, encryptionKeyFile); state IBackupContainer::ExpireProgress progress; state std::string lastProgress; @@ -2623,9 +2641,11 @@ ACTOR Future expireBackupData(const char* name, return Void(); } -ACTOR Future deleteBackupContainer(const char* name, std::string destinationContainer) { +ACTOR Future deleteBackupContainer(const char* name, + std::string destinationContainer, + Optional proxy) { try { - state Reference c = openBackupContainer(name, destinationContainer); + state Reference c = openBackupContainer(name, destinationContainer, proxy, {}); state int numDeleted = 0; state Future done = c->deleteContainer(&numDeleted); @@ -2657,12 +2677,13 @@ ACTOR Future deleteBackupContainer(const char* name, std::string destinati ACTOR Future describeBackup(const char* name, std::string destinationContainer, + Optional proxy, bool deep, Optional cx, bool json, Optional encryptionKeyFile) { try { - Reference c = openBackupContainer(name, destinationContainer, encryptionKeyFile); + Reference c = openBackupContainer(name, destinationContainer, proxy, encryptionKeyFile); state BackupDescription desc = wait(c->describeBackup(deep)); if (cx.present()) wait(desc.resolveVersionTimes(cx.get())); @@ -2688,6 +2709,7 @@ static void reportBackupQueryError(UID operationId, JsonBuilderObject& result, s // resolved to that timestamp. ACTOR Future queryBackup(const char* name, std::string destinationContainer, + Optional proxy, Standalone> keyRangesFilter, Version restoreVersion, std::string originalClusterFile, @@ -2734,7 +2756,7 @@ ACTOR Future queryBackup(const char* name, } try { - state Reference bc = openBackupContainer(name, destinationContainer); + state Reference bc = openBackupContainer(name, destinationContainer, proxy, {}); if (restoreVersion == invalidVersion) { BackupDescription desc = wait(bc->describeBackup()); if (desc.maxRestorableVersion.present()) { @@ -2814,9 +2836,9 @@ ACTOR Future queryBackup(const char* name, return Void(); } -ACTOR Future listBackup(std::string baseUrl) { +ACTOR Future listBackup(std::string baseUrl, Optional proxy) { try { - std::vector containers = wait(IBackupContainer::listContainers(baseUrl)); + std::vector containers = wait(IBackupContainer::listContainers(baseUrl, proxy)); for (std::string container : containers) { printf("%s\n", container.c_str()); } @@ -2852,6 +2874,7 @@ ACTOR Future listBackupTags(Database cx) { struct BackupModifyOptions { Optional verifyUID; Optional destURL; + Optional proxy; Optional snapshotIntervalSeconds; Optional activeSnapshotIntervalSeconds; bool hasChanges() const { @@ -2869,7 +2892,7 @@ ACTOR Future modifyBackup(Database db, std::string tagName, BackupModifyOp state Reference bc; if (options.destURL.present()) { - bc = openBackupContainer(exeBackup.toString().c_str(), options.destURL.get()); + bc = openBackupContainer(exeBackup.toString().c_str(), options.destURL.get(), options.proxy, {}); try { wait(timeoutError(bc->create(), 30)); } catch (Error& e) { @@ -3342,6 +3365,7 @@ int main(int argc, char* argv[]) { break; } + Optional proxy; std::string destinationContainer; bool describeDeep = false; bool describeTimestamps = false; @@ -3595,6 +3619,10 @@ int main(int argc, char* argv[]) { return FDB_EXIT_ERROR; } break; + case OPT_PROXY: + proxy = args->OptionArg(); + modifyOptions.proxy = proxy; + break; case OPT_DESTCONTAINER: destinationContainer = args->OptionArg(); // If the url starts with '/' then prepend "file://" for backwards compatibility @@ -3962,9 +3990,10 @@ int main(int argc, char* argv[]) { if (!initCluster()) return FDB_EXIT_ERROR; // Test out the backup url to make sure it parses. Doesn't test to make sure it's actually writeable. - openBackupContainer(argv[0], destinationContainer, encryptionKeyFile); + openBackupContainer(argv[0], destinationContainer, proxy, encryptionKeyFile); f = stopAfter(submitBackup(db, destinationContainer, + proxy, initialSnapshotIntervalSeconds, snapshotIntervalSeconds, backupKeys, @@ -4036,6 +4065,7 @@ int main(int argc, char* argv[]) { } f = stopAfter(expireBackupData(argv[0], destinationContainer, + proxy, expireVersion, expireDatetime, db, @@ -4047,7 +4077,7 @@ int main(int argc, char* argv[]) { case BackupType::DELETE_BACKUP: initTraceFile(); - f = stopAfter(deleteBackupContainer(argv[0], destinationContainer)); + f = stopAfter(deleteBackupContainer(argv[0], destinationContainer, proxy)); break; case BackupType::DESCRIBE: @@ -4060,6 +4090,7 @@ int main(int argc, char* argv[]) { // given, but quietly skip them if not. f = stopAfter(describeBackup(argv[0], destinationContainer, + proxy, describeDeep, describeTimestamps ? Optional(db) : Optional(), jsonOutput, @@ -4068,7 +4099,7 @@ int main(int argc, char* argv[]) { case BackupType::LIST: initTraceFile(); - f = stopAfter(listBackup(baseUrl)); + f = stopAfter(listBackup(baseUrl, proxy)); break; case BackupType::TAGS: @@ -4081,6 +4112,7 @@ int main(int argc, char* argv[]) { initTraceFile(); f = stopAfter(queryBackup(argv[0], destinationContainer, + proxy, backupKeysFilter, restoreVersion, restoreClusterFileOrig, @@ -4090,7 +4122,7 @@ int main(int argc, char* argv[]) { case BackupType::DUMP: initTraceFile(); - f = stopAfter(dumpBackupData(argv[0], destinationContainer, dumpBegin, dumpEnd)); + f = stopAfter(dumpBackupData(argv[0], destinationContainer, proxy, dumpBegin, dumpEnd)); break; case BackupType::UNDEFINED: @@ -4141,6 +4173,7 @@ int main(int argc, char* argv[]) { restoreClusterFileOrig, tagName, restoreContainer, + proxy, backupKeys, beginVersion, restoreVersion, @@ -4218,6 +4251,7 @@ int main(int argc, char* argv[]) { f = stopAfter(runFastRestoreTool(db, tagName, restoreContainer, + proxy, backupKeys, restoreVersion, !dryRun, diff --git a/fdbclient/BackupAgent.actor.h b/fdbclient/BackupAgent.actor.h index 94cb10d290..a938dcd51f 100644 --- a/fdbclient/BackupAgent.actor.h +++ b/fdbclient/BackupAgent.actor.h @@ -165,6 +165,7 @@ public: Key backupTag, Standalone> backupRanges, Key bcUrl, + Optional proxy, Version targetVersion, LockDB lockDB, UID randomUID, @@ -187,6 +188,7 @@ public: Optional cxOrig, Key tagName, Key url, + Optional proxy, Standalone> ranges, WaitForComplete = WaitForComplete::True, Version targetVersion = ::invalidVersion, @@ -202,6 +204,7 @@ public: Optional cxOrig, Key tagName, Key url, + Optional proxy, WaitForComplete waitForComplete = WaitForComplete::True, Version targetVersion = ::invalidVersion, Verbose verbose = Verbose::True, @@ -219,6 +222,7 @@ public: cxOrig, tagName, url, + proxy, rangeRef, waitForComplete, targetVersion, @@ -263,6 +267,7 @@ public: Future submitBackup(Reference tr, Key outContainer, + Optional proxy, int initialSnapshotIntervalSeconds, int snapshotIntervalSeconds, std::string const& tagName, @@ -273,6 +278,7 @@ public: Optional const& encryptionKeyFileName = {}); Future submitBackup(Database cx, Key outContainer, + Optional proxy, int initialSnapshotIntervalSeconds, int snapshotIntervalSeconds, std::string const& tagName, @@ -284,6 +290,7 @@ public: return runRYWTransactionFailIfLocked(cx, [=](Reference tr) { return submitBackup(tr, outContainer, + proxy, initialSnapshotIntervalSeconds, snapshotIntervalSeconds, tagName, @@ -720,20 +727,31 @@ template <> inline Tuple Codec>::pack(Reference const& bc) { Tuple tuple; tuple.append(StringRef(bc->getURL())); + if (bc->getProxy().present()) { + tuple.append(StringRef(bc->getProxy().get())); + } else { + tuple.append(StringRef()); + } if (bc->getEncryptionKeyFileName().present()) { tuple.append(bc->getEncryptionKeyFileName().get()); + } else { + tuple.append(StringRef()); } return tuple; } template <> inline Reference Codec>::unpack(Tuple const& val) { - ASSERT(val.size() == 1 || val.size() == 2); + ASSERT(val.size() == 3); auto url = val.getString(0).toString(); - Optional encryptionKeyFileName; - if (val.size() == 2) { - encryptionKeyFileName = val.getString(1).toString(); + Optional proxy; + if (!val.getString(1).empty()) { + proxy = val.getString(1).toString(); } - return IBackupContainer::openContainer(url, encryptionKeyFileName); + Optional encryptionKeyFileName; + if (!val.getString(2).empty()) { + encryptionKeyFileName = val.getString(2).toString(); + } + return IBackupContainer::openContainer(url, proxy, encryptionKeyFileName); } class BackupConfig : public KeyBackedConfig { diff --git a/fdbclient/BackupContainer.actor.cpp b/fdbclient/BackupContainer.actor.cpp index 37b2eae015..416d15c548 100644 --- a/fdbclient/BackupContainer.actor.cpp +++ b/fdbclient/BackupContainer.actor.cpp @@ -256,7 +256,8 @@ std::vector IBackupContainer::getURLFormats() { // Get an IBackupContainer based on a container URL string Reference IBackupContainer::openContainer(const std::string& url, - Optional const& encryptionKeyFileName) { + const Optional& proxy, + const Optional& encryptionKeyFileName) { static std::map> m_cache; Reference& r = m_cache[url]; @@ -273,7 +274,7 @@ Reference IBackupContainer::openContainer(const std::string& u // The URL parameters contain blobstore endpoint tunables as well as possible backup-specific options. S3BlobStoreEndpoint::ParametersT backupParams; Reference bstore = - S3BlobStoreEndpoint::fromString(url, &resource, &lastOpenError, &backupParams); + S3BlobStoreEndpoint::fromString(url, proxy, &resource, &lastOpenError, &backupParams); if (resource.empty()) throw backup_invalid_url(); @@ -317,7 +318,7 @@ Reference IBackupContainer::openContainer(const std::string& u // Get a list of URLS to backup containers based on some a shorter URL. This function knows about some set of supported // URL types which support this sort of backup discovery. -ACTOR Future> listContainers_impl(std::string baseURL) { +ACTOR Future> listContainers_impl(std::string baseURL, Optional proxy) { try { StringRef u(baseURL); if (u.startsWith("file://"_sr)) { @@ -327,8 +328,8 @@ ACTOR Future> listContainers_impl(std::string baseURL) std::string resource; S3BlobStoreEndpoint::ParametersT backupParams; - Reference bstore = - S3BlobStoreEndpoint::fromString(baseURL, &resource, &IBackupContainer::lastOpenError, &backupParams); + Reference bstore = S3BlobStoreEndpoint::fromString( + baseURL, proxy, &resource, &IBackupContainer::lastOpenError, &backupParams); if (!resource.empty()) { TraceEvent(SevWarn, "BackupContainer") @@ -370,8 +371,9 @@ ACTOR Future> listContainers_impl(std::string baseURL) } } -Future> IBackupContainer::listContainers(const std::string& baseURL) { - return listContainers_impl(baseURL); +Future> IBackupContainer::listContainers(const std::string& baseURL, + const Optional& proxy) { + return listContainers_impl(baseURL, proxy); } ACTOR Future timeKeeperVersionFromDatetime(std::string datetime, Database db) { diff --git a/fdbclient/BackupContainer.h b/fdbclient/BackupContainer.h index 312e3b8ac7..36c9ff7cfa 100644 --- a/fdbclient/BackupContainer.h +++ b/fdbclient/BackupContainer.h @@ -156,6 +156,7 @@ struct BackupFileList { struct BackupDescription { BackupDescription() : snapshotBytes(0) {} std::string url; + Optional proxy; std::vector snapshots; int64_t snapshotBytes; // The version before which everything has been deleted by an expire @@ -294,11 +295,14 @@ public: // Get an IBackupContainer based on a container spec string static Reference openContainer(const std::string& url, - const Optional& encryptionKeyFileName = {}); + const Optional& proxy, + const Optional& encryptionKeyFileName); static std::vector getURLFormats(); - static Future> listContainers(const std::string& baseURL); + static Future> listContainers(const std::string& baseURL, + const Optional& proxy); std::string const& getURL() const { return URL; } + Optional const& getProxy() const { return proxy; } Optional const& getEncryptionKeyFileName() const { return encryptionKeyFileName; } static std::string lastOpenError; @@ -306,6 +310,7 @@ public: // TODO: change the following back to `private` once blob obj access is refactored protected: std::string URL; + Optional proxy; Optional encryptionKeyFileName; }; diff --git a/fdbclient/BackupContainerFileSystem.actor.cpp b/fdbclient/BackupContainerFileSystem.actor.cpp index 7acbd227f2..a4778ecc10 100644 --- a/fdbclient/BackupContainerFileSystem.actor.cpp +++ b/fdbclient/BackupContainerFileSystem.actor.cpp @@ -409,6 +409,7 @@ public: Version logStartVersionOverride) { state BackupDescription desc; desc.url = bc->getURL(); + desc.proxy = bc->getProxy(); TraceEvent("BackupContainerDescribe1") .detail("URL", bc->getURL()) @@ -1500,7 +1501,8 @@ Future BackupContainerFileSystem::createTestEncryptionKeyFile(std::string // code but returning a different template type because you can't cast between them Reference BackupContainerFileSystem::openContainerFS( const std::string& url, - Optional const& encryptionKeyFileName) { + const Optional& proxy, + const Optional& encryptionKeyFileName) { static std::map> m_cache; Reference& r = m_cache[url]; @@ -1517,7 +1519,7 @@ Reference BackupContainerFileSystem::openContainerFS( // The URL parameters contain blobstore endpoint tunables as well as possible backup-specific options. S3BlobStoreEndpoint::ParametersT backupParams; Reference bstore = - S3BlobStoreEndpoint::fromString(url, &resource, &lastOpenError, &backupParams); + S3BlobStoreEndpoint::fromString(url, proxy, &resource, &lastOpenError, &backupParams); if (resource.empty()) throw backup_invalid_url(); @@ -1635,7 +1637,9 @@ ACTOR static Future testWriteSnapshotFile(Reference file, Key return Void(); } -ACTOR Future testBackupContainer(std::string url, Optional encryptionKeyFileName) { +ACTOR Future testBackupContainer(std::string url, + Optional proxy, + Optional encryptionKeyFileName) { state FlowLock lock(100e6); if (encryptionKeyFileName.present()) { @@ -1644,7 +1648,7 @@ ACTOR Future testBackupContainer(std::string url, Optional en printf("BackupContainerTest URL %s\n", url.c_str()); - state Reference c = IBackupContainer::openContainer(url, encryptionKeyFileName); + state Reference c = IBackupContainer::openContainer(url, proxy, encryptionKeyFileName); // Make sure container doesn't exist, then create it. try { @@ -1789,12 +1793,13 @@ ACTOR Future testBackupContainer(std::string url, Optional en } TEST_CASE("/backup/containers/localdir/unencrypted") { - wait(testBackupContainer(format("file://%s/fdb_backups/%llx", params.getDataDir().c_str(), timer_int()), {})); + wait(testBackupContainer(format("file://%s/fdb_backups/%llx", params.getDataDir().c_str(), timer_int()), {}, {})); return Void(); } TEST_CASE("/backup/containers/localdir/encrypted") { wait(testBackupContainer(format("file://%s/fdb_backups/%llx", params.getDataDir().c_str(), timer_int()), + {}, format("%s/test_encryption_key", params.getDataDir().c_str()))); return Void(); } @@ -1803,7 +1808,7 @@ TEST_CASE("/backup/containers/url") { if (!g_network->isSimulated()) { const char* url = getenv("FDB_TEST_BACKUP_URL"); ASSERT(url != nullptr); - wait(testBackupContainer(url, {})); + wait(testBackupContainer(url, {}, {})); } return Void(); } @@ -1813,7 +1818,7 @@ TEST_CASE("/backup/containers_list") { state const char* url = getenv("FDB_TEST_BACKUP_URL"); ASSERT(url != nullptr); printf("Listing %s\n", url); - std::vector urls = wait(IBackupContainer::listContainers(url)); + std::vector urls = wait(IBackupContainer::listContainers(url, {})); for (auto& u : urls) { printf("%s\n", u.c_str()); } diff --git a/fdbclient/BackupContainerFileSystem.h b/fdbclient/BackupContainerFileSystem.h index 52c5d3fc54..784b113395 100644 --- a/fdbclient/BackupContainerFileSystem.h +++ b/fdbclient/BackupContainerFileSystem.h @@ -81,9 +81,9 @@ public: Future exists() override = 0; // TODO: refactor this to separate out the "deal with blob store" stuff from the backup business logic - static Reference openContainerFS( - const std::string& url, - const Optional& encryptionKeyFileName = {}); + static Reference openContainerFS(const std::string& url, + const Optional& proxy, + const Optional& encryptionKeyFileName); // Get a list of fileNames and their sizes in the container under the given path // Although not required, an implementation can avoid traversing unwanted subfolders diff --git a/fdbclient/FileBackupAgent.actor.cpp b/fdbclient/FileBackupAgent.actor.cpp index fc1dc558c7..b451747f08 100644 --- a/fdbclient/FileBackupAgent.actor.cpp +++ b/fdbclient/FileBackupAgent.actor.cpp @@ -4363,13 +4363,14 @@ public: Key backupTag, Standalone> backupRanges, Key bcUrl, + Optional proxy, Version targetVersion, LockDB lockDB, UID randomUID, Key addPrefix, Key removePrefix) { // Sanity check backup is valid - state Reference bc = IBackupContainer::openContainer(bcUrl.toString()); + state Reference bc = IBackupContainer::openContainer(bcUrl.toString(), proxy, {}); state BackupDescription desc = wait(bc->describeBackup()); wait(desc.resolveVersionTimes(cx)); @@ -4430,6 +4431,7 @@ public: struct RestoreRequest restoreRequest(restoreIndex, restoreTag, bcUrl, + proxy, targetVersion, range, deterministicRandom()->randomUniqueID(), @@ -4510,6 +4512,7 @@ public: ACTOR static Future submitBackup(FileBackupAgent* backupAgent, Reference tr, Key outContainer, + Optional proxy, int initialSnapshotIntervalSeconds, int snapshotIntervalSeconds, std::string tagName, @@ -4555,7 +4558,8 @@ public: backupContainer = joinPath(backupContainer, std::string("backup-") + nowStr.toString()); } - state Reference bc = IBackupContainer::openContainer(backupContainer, encryptionKeyFileName); + state Reference bc = + IBackupContainer::openContainer(backupContainer, proxy, encryptionKeyFileName); try { wait(timeoutError(bc->create(), 30)); } catch (Error& e) { @@ -4642,6 +4646,7 @@ public: Reference tr, Key tagName, Key backupURL, + Optional proxy, Standalone> ranges, Version restoreVersion, Key addPrefix, @@ -4710,7 +4715,7 @@ public: // Point the tag to the new uid tag.set(tr, { uid, false }); - Reference bc = IBackupContainer::openContainer(backupURL.toString()); + Reference bc = IBackupContainer::openContainer(backupURL.toString(), proxy, {}); // Configure the new restore restore.tag().set(tr, tagName.toString()); @@ -5303,6 +5308,7 @@ public: Optional cxOrig, Key tagName, Key url, + Optional proxy, Standalone> ranges, WaitForComplete waitForComplete, Version targetVersion, @@ -5320,7 +5326,7 @@ public: throw restore_error(); } - state Reference bc = IBackupContainer::openContainer(url.toString()); + state Reference bc = IBackupContainer::openContainer(url.toString(), proxy, {}); state BackupDescription desc = wait(bc->describeBackup(true)); if (cxOrig.present()) { @@ -5360,6 +5366,7 @@ public: tr, tagName, url, + proxy, ranges, targetVersion, addPrefix, @@ -5499,6 +5506,7 @@ public: tagName, ranges, KeyRef(bc->getURL()), + bc->getProxy(), targetVersion, LockDB::True, randomUid, @@ -5520,6 +5528,7 @@ public: cx, tagName, KeyRef(bc->getURL()), + bc->getProxy(), ranges, WaitForComplete::True, ::invalidVersion, @@ -5561,13 +5570,14 @@ Future FileBackupAgent::submitParallelRestore(Database cx, Key backupTag, Standalone> backupRanges, Key bcUrl, + Optional proxy, Version targetVersion, LockDB lockDB, UID randomUID, Key addPrefix, Key removePrefix) { return FileBackupAgentImpl::submitParallelRestore( - cx, backupTag, backupRanges, bcUrl, targetVersion, lockDB, randomUID, addPrefix, removePrefix); + cx, backupTag, backupRanges, bcUrl, proxy, targetVersion, lockDB, randomUID, addPrefix, removePrefix); } Future FileBackupAgent::atomicParallelRestore(Database cx, @@ -5582,6 +5592,7 @@ Future FileBackupAgent::restore(Database cx, Optional cxOrig, Key tagName, Key url, + Optional proxy, Standalone> ranges, WaitForComplete waitForComplete, Version targetVersion, @@ -5598,6 +5609,7 @@ Future FileBackupAgent::restore(Database cx, cxOrig, tagName, url, + proxy, ranges, waitForComplete, targetVersion, @@ -5639,6 +5651,7 @@ Future FileBackupAgent::waitRestore(Database cx, Key tagName, Ver Future FileBackupAgent::submitBackup(Reference tr, Key outContainer, + Optional proxy, int initialSnapshotIntervalSeconds, int snapshotIntervalSeconds, std::string const& tagName, @@ -5650,6 +5663,7 @@ Future FileBackupAgent::submitBackup(Reference return FileBackupAgentImpl::submitBackup(this, tr, outContainer, + proxy, initialSnapshotIntervalSeconds, snapshotIntervalSeconds, tagName, diff --git a/fdbclient/RestoreInterface.h b/fdbclient/RestoreInterface.h index bdb2499298..b7f3b04bcc 100644 --- a/fdbclient/RestoreInterface.h +++ b/fdbclient/RestoreInterface.h @@ -49,6 +49,7 @@ struct RestoreRequest { int index; Key tagName; Key url; + Optional proxy; Version targetVersion; KeyRange range; UID randomUid; @@ -64,27 +65,29 @@ struct RestoreRequest { explicit RestoreRequest(const int index, const Key& tagName, const Key& url, + const Optional& proxy, Version targetVersion, const KeyRange& range, const UID& randomUid, Key& addPrefix, Key removePrefix) - : index(index), tagName(tagName), url(url), targetVersion(targetVersion), range(range), randomUid(randomUid), - addPrefix(addPrefix), removePrefix(removePrefix) {} + : index(index), tagName(tagName), url(url), proxy(proxy), targetVersion(targetVersion), range(range), + randomUid(randomUid), addPrefix(addPrefix), removePrefix(removePrefix) {} // To change this serialization, ProtocolVersion::RestoreRequestValue must be updated, and downgrades need to be // considered template void serialize(Ar& ar) { - serializer(ar, index, tagName, url, targetVersion, range, randomUid, addPrefix, removePrefix, reply); + serializer(ar, index, tagName, url, proxy, targetVersion, range, randomUid, addPrefix, removePrefix, reply); } std::string toString() const { std::stringstream ss; ss << "index:" << std::to_string(index) << " tagName:" << tagName.contents().toString() - << " url:" << url.contents().toString() << " targetVersion:" << std::to_string(targetVersion) - << " range:" << range.toString() << " randomUid:" << randomUid.toString() - << " addPrefix:" << addPrefix.toString() << " removePrefix:" << removePrefix.toString(); + << " url:" << url.contents().toString() << " proxy:" << (proxy.present() ? proxy.get() : "") + << " targetVersion:" << std::to_string(targetVersion) << " range:" << range.toString() + << " randomUid:" << randomUid.toString() << " addPrefix:" << addPrefix.toString() + << " removePrefix:" << removePrefix.toString(); return ss.str(); } }; diff --git a/fdbclient/S3BlobStore.actor.cpp b/fdbclient/S3BlobStore.actor.cpp index a4fa95616a..799f631c6e 100644 --- a/fdbclient/S3BlobStore.actor.cpp +++ b/fdbclient/S3BlobStore.actor.cpp @@ -162,7 +162,8 @@ std::string S3BlobStoreEndpoint::BlobKnobs::getURLParameters() const { return r; } -Reference S3BlobStoreEndpoint::fromString(std::string const& url, +Reference S3BlobStoreEndpoint::fromString(const std::string& url, + const Optional& proxy, std::string* resourceFromURL, std::string* error, ParametersT* ignored_parameters) { @@ -175,6 +176,13 @@ Reference S3BlobStoreEndpoint::fromString(std::string const if (prefix != LiteralStringRef("blobstore")) throw format("Invalid blobstore URL prefix '%s'", prefix.toString().c_str()); + Optional proxyHost, proxyPort; + if (proxy.present()) { + StringRef p(proxy.get()); + proxyHost = p.eat(":").toString(); + proxyPort = p.eat().toString(); + } + Optional cred; if (url.find("@") != std::string::npos) { cred = t.eat("@"); @@ -261,7 +269,8 @@ Reference S3BlobStoreEndpoint::fromString(std::string const creds = S3BlobStoreEndpoint::Credentials{ key.toString(), secret.toString(), securityToken.toString() }; } - return makeReference(host.toString(), service.toString(), creds, knobs, extraHeaders); + return makeReference( + host.toString(), service.toString(), proxyHost, proxyPort, creds, knobs, extraHeaders); } catch (std::string& err) { if (error != nullptr) @@ -624,11 +633,11 @@ ACTOR Future connect_impl(Referenceservice; + std::string host = b->host, service = b->service; if (service.empty()) service = b->knobs.secure_connection ? "https" : "http"; state Reference conn = - wait(INetworkConnections::net()->connect(b->host, service, b->knobs.secure_connection ? true : false)); + wait(INetworkConnections::net()->connect(host, service, b->knobs.secure_connection ? true : false)); wait(conn->connectHandshake()); TraceEvent("S3BlobStoreEndpointNewConnection") @@ -1609,7 +1618,7 @@ TEST_CASE("/backup/s3/v4headers") { S3BlobStoreEndpoint::Credentials creds{ "AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", "" } // GET without query parameters { - S3BlobStoreEndpoint s3("s3.amazonaws.com", "s3", creds); + S3BlobStoreEndpoint s3("s3.amazonaws.com", "s3", "proxy", "port", creds); std::string verb("GET"); std::string resource("/test.txt"); HTTP::Headers headers; @@ -1624,7 +1633,7 @@ TEST_CASE("/backup/s3/v4headers") { // GET with query parameters { - S3BlobStoreEndpoint s3("s3.amazonaws.com", "s3", creds); + S3BlobStoreEndpoint s3("s3.amazonaws.com", "s3", "proxy", "port", creds); std::string verb("GET"); std::string resource("/test/examplebucket?Action=DescribeRegions&Version=2013-10-15"); HTTP::Headers headers; @@ -1639,7 +1648,7 @@ TEST_CASE("/backup/s3/v4headers") { // POST { - S3BlobStoreEndpoint s3("s3.us-west-2.amazonaws.com", "s3", creds); + S3BlobStoreEndpoint s3("s3.us-west-2.amazonaws.com", "s3", "proxy", "port", creds); std::string verb("POST"); std::string resource("/simple.json"); HTTP::Headers headers; diff --git a/fdbclient/S3BlobStore.h b/fdbclient/S3BlobStore.h index 21f39e1d0e..bd29675bae 100644 --- a/fdbclient/S3BlobStore.h +++ b/fdbclient/S3BlobStore.h @@ -99,11 +99,15 @@ public: }; S3BlobStoreEndpoint(std::string const& host, - std::string service, + std::string const& service, + Optional const& proxyHost, + Optional const& proxyPort, Optional const& creds, BlobKnobs const& knobs = BlobKnobs(), HTTP::Headers extraHeaders = HTTP::Headers()) - : host(host), service(service), credentials(creds), lookupKey(creds.present() && creds.get().key.empty()), + : host(host), service(service), proxyHost(proxyHost), proxyPort(proxyPort), + useProxy(proxyHost.present() && proxyPort.present()), credentials(creds), + lookupKey(creds.present() && creds.get().key.empty()), lookupSecret(creds.present() && creds.get().secret.empty()), knobs(knobs), extraHeaders(extraHeaders), requestRate(new SpeedLimit(knobs.requests_per_second, 1)), requestRateList(new SpeedLimit(knobs.list_requests_per_second, 1)), @@ -114,7 +118,7 @@ public: recvRate(new SpeedLimit(knobs.max_recv_bytes_per_second, 1)), concurrentRequests(knobs.concurrent_requests), concurrentUploads(knobs.concurrent_uploads), concurrentLists(knobs.concurrent_lists) { - if (host.empty()) + if (host.empty() || (proxyHost.present() != proxyPort.present())) throw connection_string_invalid(); } @@ -132,10 +136,11 @@ public: // Parse url and return a S3BlobStoreEndpoint // If the url has parameters that S3BlobStoreEndpoint can't consume then an error will be thrown unless // ignored_parameters is given in which case the unconsumed parameters will be added to it. - static Reference fromString(std::string const& url, - std::string* resourceFromURL = nullptr, - std::string* error = nullptr, - ParametersT* ignored_parameters = nullptr); + static Reference fromString(const std::string& url, + const Optional& proxy, + std::string* resourceFromURL, + std::string* error, + ParametersT* ignored_parameters); // Get a normalized version of this URL with the given resource and any non-default BlobKnob values as URL // parameters in addition to the passed params string @@ -151,6 +156,10 @@ public: std::string host; std::string service; + Optional proxyHost; + Optional proxyPort; + bool useProxy; + Optional credentials; bool lookupKey; bool lookupSecret; diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index b81fee7d70..736d5b7a59 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -2506,7 +2506,7 @@ ACTOR Future monitorPruneKeys(Reference self) { if (BM_DEBUG) { fmt::print("BM constructing backup container from {}\n", SERVER_KNOBS->BG_URL.c_str()); } - self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL); + self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL, {}, {}); if (BM_DEBUG) { printf("BM constructed backup container\n"); } diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index 791ee7a05a..906994922b 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -3021,7 +3021,7 @@ ACTOR Future blobWorker(BlobWorkerInterface bwInterf, if (BW_DEBUG) { fmt::print("BW constructing backup container from {0}\n", SERVER_KNOBS->BG_URL); } - self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL); + self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL, {}, {}); if (BW_DEBUG) { printf("BW constructed backup container\n"); } diff --git a/fdbserver/RestoreController.actor.cpp b/fdbserver/RestoreController.actor.cpp index 64d7d3d785..8092ebe39a 100644 --- a/fdbserver/RestoreController.actor.cpp +++ b/fdbserver/RestoreController.actor.cpp @@ -47,7 +47,8 @@ ACTOR static Future collectBackupFiles(Reference bc, RestoreRequest request); ACTOR static Future buildRangeVersions(KeyRangeMap* pRangeVersions, std::vector* pRangeFiles, - Key url); + Key url, + Optional proxy); ACTOR static Future processRestoreRequest(Reference self, Database cx, @@ -317,7 +318,7 @@ ACTOR static Future processRestoreRequest(Reference allFiles; state Version minRangeVersion = MAX_VERSION; - self->initBackupContainer(request.url); + self->initBackupContainer(request.url, request.proxy); // Get all backup files' description and save them to files state Version targetVersion = @@ -334,7 +335,7 @@ ACTOR static Future processRestoreRequest(Reference rangeVersions(minRangeVersion, allKeys.end); if (SERVER_KNOBS->FASTRESTORE_GET_RANGE_VERSIONS_EXPENSIVE) { - wait(buildRangeVersions(&rangeVersions, &rangeFiles, request.url)); + wait(buildRangeVersions(&rangeVersions, &rangeFiles, request.url, request.proxy)); } else { // Debug purpose, dump range versions auto ranges = rangeVersions.ranges(); @@ -881,13 +882,14 @@ ACTOR static Future insertRangeVersion(KeyRangeMap* pRangeVersion // Expensive and slow operation that should not run in real prod. ACTOR static Future buildRangeVersions(KeyRangeMap* pRangeVersions, std::vector* pRangeFiles, - Key url) { + Key url, + Optional proxy) { if (!g_network->isSimulated()) { TraceEvent(SevError, "ExpensiveBuildRangeVersions") .detail("Reason", "Parsing all range files is slow and memory intensive"); return Void(); } - Reference bc = IBackupContainer::openContainer(url.toString()); + Reference bc = IBackupContainer::openContainer(url.toString(), proxy, {}); // Key ranges not in range files are empty; // Assign highest version to avoid applying any mutation in these ranges diff --git a/fdbserver/RestoreController.actor.h b/fdbserver/RestoreController.actor.h index 5c9a271f7a..77aa5e6494 100644 --- a/fdbserver/RestoreController.actor.h +++ b/fdbserver/RestoreController.actor.h @@ -446,13 +446,15 @@ struct RestoreControllerData : RestoreRoleData, public ReferenceCounted proxy) { if (bcUrl == url && bc.isValid()) { return; } - TraceEvent("FastRestoreControllerInitBackupContainer").detail("URL", url); + TraceEvent("FastRestoreControllerInitBackupContainer") + .detail("URL", url) + .detail("Proxy", proxy.present() ? proxy.get() : ""); bcUrl = url; - bc = IBackupContainer::openContainer(url.toString()); + bc = IBackupContainer::openContainer(url.toString(), proxy, {}); } }; diff --git a/fdbserver/RestoreLoader.actor.cpp b/fdbserver/RestoreLoader.actor.cpp index 9aa1aadee3..1afabdcb95 100644 --- a/fdbserver/RestoreLoader.actor.cpp +++ b/fdbserver/RestoreLoader.actor.cpp @@ -262,7 +262,7 @@ ACTOR Future restoreLoaderCore(RestoreLoaderInterface loaderInterf, when(RestoreLoadFileRequest req = waitNext(loaderInterf.loadFile.getFuture())) { requestTypeStr = "loadFile"; hasQueuedRequests = !self->loadingQueue.empty() || !self->sendingQueue.empty(); - self->initBackupContainer(req.param.url); + self->initBackupContainer(req.param.url, req.param.proxy); self->loadingQueue.push(req); if (!hasQueuedRequests) { self->hasPendingRequests->set(true); diff --git a/fdbserver/RestoreLoader.actor.h b/fdbserver/RestoreLoader.actor.h index b16e4c11fa..92b11a5a1c 100644 --- a/fdbserver/RestoreLoader.actor.h +++ b/fdbserver/RestoreLoader.actor.h @@ -226,12 +226,12 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted proxy) { if (bcUrl == url && bc.isValid()) { return; } bcUrl = url; - bc = IBackupContainer::openContainer(url.toString()); + bc = IBackupContainer::openContainer(url.toString(), proxy, {}); } }; diff --git a/fdbserver/RestoreWorkerInterface.actor.h b/fdbserver/RestoreWorkerInterface.actor.h index 065b22c468..3c2830514a 100644 --- a/fdbserver/RestoreWorkerInterface.actor.h +++ b/fdbserver/RestoreWorkerInterface.actor.h @@ -368,6 +368,7 @@ struct LoadingParam { bool isRangeFile; Key url; + Optional proxy; Optional rangeVersion; // range file's version int64_t blockSize; @@ -386,12 +387,13 @@ struct LoadingParam { template void serialize(Ar& ar) { - serializer(ar, isRangeFile, url, rangeVersion, blockSize, asset); + serializer(ar, isRangeFile, url, proxy, rangeVersion, blockSize, asset); } std::string toString() const { std::stringstream str; str << "isRangeFile:" << isRangeFile << " url:" << url.toString() + << " proxy:" << (proxy.present() ? proxy.get() : "") << " rangeVersion:" << (rangeVersion.present() ? rangeVersion.get() : -1) << " blockSize:" << blockSize << " RestoreAsset:" << asset.toString(); return str.str(); diff --git a/fdbserver/workloads/AtomicRestore.actor.cpp b/fdbserver/workloads/AtomicRestore.actor.cpp index 4c3c2703f9..86d90e1093 100644 --- a/fdbserver/workloads/AtomicRestore.actor.cpp +++ b/fdbserver/workloads/AtomicRestore.actor.cpp @@ -93,6 +93,7 @@ struct AtomicRestoreWorkload : TestWorkload { try { wait(backupAgent.submitBackup(cx, StringRef(backupContainer), + {}, deterministicRandom()->randomInt(0, 60), deterministicRandom()->randomInt(0, 100), BackupAgentBase::getDefaultTagName(), diff --git a/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp b/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp index 890bdf6a3a..650ca6f2c6 100644 --- a/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp +++ b/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp @@ -222,6 +222,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { try { wait(backupAgent->submitBackup(cx, StringRef(backupContainer), + {}, deterministicRandom()->randomInt(0, 60), deterministicRandom()->randomInt(0, 100), tag.toString(), @@ -377,6 +378,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { cx, self->backupTag, KeyRef(lastBackupContainer), + {}, WaitForComplete::True, ::invalidVersion, Verbose::True, @@ -478,6 +480,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { // the configuration to disable backup workers before restore. extraBackup = backupAgent.submitBackup(cx, LiteralStringRef("file://simfdb/backups/"), + {}, deterministicRandom()->randomInt(0, 60), deterministicRandom()->randomInt(0, 100), self->backupTag.toString(), @@ -523,7 +526,8 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { .detail("BackupTag", printable(self->backupTag)); // start restoring - auto container = IBackupContainer::openContainer(lastBackupContainer->getURL()); + auto container = + IBackupContainer::openContainer(lastBackupContainer->getURL(), lastBackupContainer->getProxy(), {}); BackupDescription desc = wait(container->describeBackup()); ASSERT(self->usePartitionedLogs == desc.partitioned); ASSERT(desc.minRestorableVersion.present()); // We must have a valid backup now. @@ -566,6 +570,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { self->backupTag, self->backupRanges, KeyRef(lastBackupContainer->getURL()), + lastBackupContainer->getProxy(), targetVersion, self->locked, randomID, diff --git a/fdbserver/workloads/BackupCorrectness.actor.cpp b/fdbserver/workloads/BackupCorrectness.actor.cpp index 92550a23bf..4c82762764 100644 --- a/fdbserver/workloads/BackupCorrectness.actor.cpp +++ b/fdbserver/workloads/BackupCorrectness.actor.cpp @@ -266,6 +266,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { try { wait(backupAgent->submitBackup(cx, StringRef(backupContainer), + {}, deterministicRandom()->randomInt(0, 60), deterministicRandom()->randomInt(0, 100), tag.toString(), @@ -423,6 +424,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { cx, self->backupTag, KeyRef(lastBackupContainer), + {}, WaitForComplete::True, ::invalidVersion, Verbose::True, @@ -523,6 +525,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { try { extraBackup = backupAgent.submitBackup(cx, "file://simfdb/backups/"_sr, + {}, deterministicRandom()->randomInt(0, 60), deterministicRandom()->randomInt(0, 100), self->backupTag.toString(), @@ -557,7 +560,9 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { .detail("RestoreAfter", self->restoreAfter) .detail("BackupTag", printable(self->backupTag)); - auto container = IBackupContainer::openContainer(lastBackupContainer->getURL()); + auto container = IBackupContainer::openContainer(lastBackupContainer->getURL(), + lastBackupContainer->getProxy(), + lastBackupContainer->getEncryptionKeyFileName()); BackupDescription desc = wait(container->describeBackup()); Version targetVersion = -1; @@ -593,6 +598,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { cx, restoreTag, KeyRef(lastBackupContainer->getURL()), + lastBackupContainer->getProxy(), WaitForComplete::True, targetVersion, Verbose::True, @@ -616,6 +622,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { cx, restoreTag, KeyRef(lastBackupContainer->getURL()), + lastBackupContainer->getProxy(), self->restoreRanges, WaitForComplete::True, targetVersion, @@ -646,6 +653,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { cx, restoreTags[restoreIndex], KeyRef(lastBackupContainer->getURL()), + lastBackupContainer->getProxy(), self->restoreRanges, WaitForComplete::True, ::invalidVersion, @@ -675,6 +683,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { cx, restoreTags[restoreIndex], KeyRef(lastBackupContainer->getURL()), + lastBackupContainer->getProxy(), WaitForComplete::True, ::invalidVersion, Verbose::True, diff --git a/fdbserver/workloads/BackupToBlob.actor.cpp b/fdbserver/workloads/BackupToBlob.actor.cpp index ee27e1a480..480ae62466 100644 --- a/fdbserver/workloads/BackupToBlob.actor.cpp +++ b/fdbserver/workloads/BackupToBlob.actor.cpp @@ -62,6 +62,7 @@ struct BackupToBlobWorkload : TestWorkload { wait(delay(self->backupAfter)); wait(backupAgent.submitBackup(cx, self->backupURL, + {}, self->initSnapshotInterval, self->snapshotInterval, self->backupTag.toString(), diff --git a/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp b/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp index e0d309954e..fc6d3035ae 100644 --- a/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp +++ b/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp @@ -250,13 +250,13 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { if (BGW_DEBUG) { printf("Blob Granule Correctness constructing simulated backup container\n"); } - self->bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/"); + self->bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/", {}, {}); } else { if (BGW_DEBUG) { printf("Blob Granule Correctness constructing backup container from %s\n", SERVER_KNOBS->BG_URL.c_str()); } - self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL); + self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL, {}, {}); if (BGW_DEBUG) { printf("Blob Granule Correctness constructed backup container\n"); } diff --git a/fdbserver/workloads/BlobGranuleVerifier.actor.cpp b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp index cd97b1960b..ba49923bf1 100644 --- a/fdbserver/workloads/BlobGranuleVerifier.actor.cpp +++ b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp @@ -90,13 +90,13 @@ struct BlobGranuleVerifierWorkload : TestWorkload { if (BGV_DEBUG) { printf("Blob Granule Verifier constructing simulated backup container\n"); } - bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/"); + bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/", {}, {}); } else { if (BGV_DEBUG) { printf("Blob Granule Verifier constructing backup container from %s\n", SERVER_KNOBS->BG_URL.c_str()); } - bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL); + bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL, {}, {}); if (BGV_DEBUG) { printf("Blob Granule Verifier constructed backup container\n"); } diff --git a/fdbserver/workloads/IncrementalBackup.actor.cpp b/fdbserver/workloads/IncrementalBackup.actor.cpp index 688387023e..e40133ffd0 100644 --- a/fdbserver/workloads/IncrementalBackup.actor.cpp +++ b/fdbserver/workloads/IncrementalBackup.actor.cpp @@ -98,12 +98,12 @@ struct IncrementalBackupWorkload : TestWorkload { if (!backupContainer.isValid()) { TraceEvent("IBackupCheckListContainersAttempt").log(); state std::vector containers = - wait(IBackupContainer::listContainers(self->backupDir.toString())); + wait(IBackupContainer::listContainers(self->backupDir.toString(), {})); TraceEvent("IBackupCheckListContainersSuccess") .detail("Size", containers.size()) .detail("First", containers.front()); if (containers.size()) { - backupContainer = IBackupContainer::openContainer(containers.front()); + backupContainer = IBackupContainer::openContainer(containers.front(), {}, {}); } } state bool e = wait(backupContainer->exists()); @@ -152,6 +152,7 @@ struct IncrementalBackupWorkload : TestWorkload { try { wait(self->backupAgent.submitBackup(cx, self->backupDir, + {}, 0, 1e8, self->tag.toString(), @@ -219,7 +220,7 @@ struct IncrementalBackupWorkload : TestWorkload { } TraceEvent("IBackupStartListContainersAttempt").log(); state std::vector containers = - wait(IBackupContainer::listContainers(self->backupDir.toString())); + wait(IBackupContainer::listContainers(self->backupDir.toString(), {})); TraceEvent("IBackupStartListContainersSuccess") .detail("Size", containers.size()) .detail("First", containers.front()); @@ -229,6 +230,7 @@ struct IncrementalBackupWorkload : TestWorkload { cx, Key(self->tag.toString()), backupURL, + {}, WaitForComplete::True, invalidVersion, Verbose::True, diff --git a/fdbserver/workloads/RestoreBackup.actor.cpp b/fdbserver/workloads/RestoreBackup.actor.cpp index c7122bc107..c08fc7de70 100644 --- a/fdbserver/workloads/RestoreBackup.actor.cpp +++ b/fdbserver/workloads/RestoreBackup.actor.cpp @@ -114,6 +114,7 @@ struct RestoreBackupWorkload final : TestWorkload { cx, self->tag, Key(self->backupContainer->getURL()), + self->backupContainer->getProxy(), WaitForComplete::True, ::invalidVersion, Verbose::True))); diff --git a/fdbserver/workloads/RestoreFromBlob.actor.cpp b/fdbserver/workloads/RestoreFromBlob.actor.cpp index 482f22ded4..9d072bb731 100644 --- a/fdbserver/workloads/RestoreFromBlob.actor.cpp +++ b/fdbserver/workloads/RestoreFromBlob.actor.cpp @@ -61,8 +61,8 @@ struct RestoreFromBlobWorkload : TestWorkload { restoreRanges.push_back_deep(restoreRanges.arena(), normalKeys); wait(delay(self->restoreAfter)); - Version v = - wait(backupAgent.restore(cx, {}, self->backupTag, self->backupURL, restoreRanges, self->waitForComplete)); + Version v = wait( + backupAgent.restore(cx, {}, self->backupTag, self->backupURL, {}, restoreRanges, self->waitForComplete)); return Void(); } diff --git a/fdbserver/workloads/SubmitBackup.actor.cpp b/fdbserver/workloads/SubmitBackup.actor.cpp index 50759bf014..aa4dd13d9b 100644 --- a/fdbserver/workloads/SubmitBackup.actor.cpp +++ b/fdbserver/workloads/SubmitBackup.actor.cpp @@ -57,6 +57,7 @@ struct SubmitBackupWorkload final : TestWorkload { try { wait(self->backupAgent.submitBackup(cx, self->backupDir, + {}, self->initSnapshotInterval, self->snapshotInterval, self->tag.toString(), From 2f8e9d9de036290ff6d99590b2f883595b9c7fa6 Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Mon, 28 Mar 2022 13:48:25 -0500 Subject: [PATCH 2/8] misc bg fixes --- fdbclient/StorageServerInterface.h | 4 ++-- fdbserver/BlobManager.actor.cpp | 2 ++ fdbserver/storageserver.actor.cpp | 6 +++++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h index 6dce9351cb..18c2d1044d 100644 --- a/fdbclient/StorageServerInterface.h +++ b/fdbclient/StorageServerInterface.h @@ -924,7 +924,7 @@ struct OverlappingChangeFeedsReply { }; struct OverlappingChangeFeedsRequest { - constexpr static FileIdentifier file_identifier = 10726174; + constexpr static FileIdentifier file_identifier = 7228462; KeyRange range; Version minVersion; ReplyPromise reply; @@ -939,7 +939,7 @@ struct OverlappingChangeFeedsRequest { }; struct ChangeFeedVersionUpdateReply { - constexpr static FileIdentifier file_identifier = 11815134; + constexpr static FileIdentifier file_identifier = 4246160; Version version = 0; ChangeFeedVersionUpdateReply() {} diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index b81fee7d70..b096056784 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -1581,6 +1581,7 @@ static void addAssignment(KeyRangeMap>& map, } ACTOR Future recoverBlobManager(Reference bmData) { + state double recoveryStartTime = now(); state Promise workerListReady; bmData->addActor.send(checkBlobWorkerList(bmData, workerListReady)); wait(workerListReady.getFuture()); @@ -1836,6 +1837,7 @@ ACTOR Future recoverBlobManager(Reference bmData) { TraceEvent("BlobManagerRecovered", bmData->id) .detail("Epoch", bmData->epoch) + .detail("Duration", now() - recoveryStartTime) .detail("Granules", bmData->workerAssignments.size()) .detail("Assigned", explicitAssignments) .detail("Revoked", outOfDateAssignments.size()); diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index aaf33f3ee7..a56632c1e6 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -858,7 +858,7 @@ public: CounterCollection cc; Counter allQueries, getKeyQueries, getValueQueries, getRangeQueries, getMappedRangeQueries, getRangeStreamQueries, finishedQueries, lowPriorityQueries, rowsQueried, bytesQueried, watchQueries, - emptyQueries, feedRowsQueried, feedBytesQueried; + emptyQueries, feedRowsQueried, feedBytesQueried, feedStreamQueries, feedVersionQueries; // Bytes of the mutations that have been added to the memory of the storage server. When the data is durable // and cleared from the memory, we do not subtract it but add it to bytesDurable. @@ -930,6 +930,7 @@ public: lowPriorityQueries("LowPriorityQueries", cc), rowsQueried("RowsQueried", cc), bytesQueried("BytesQueried", cc), watchQueries("WatchQueries", cc), emptyQueries("EmptyQueries", cc), feedRowsQueried("FeedRowsQueried", cc), feedBytesQueried("FeedBytesQueried", cc), + feedStreamQueries("FeedStreamQueries", cc), feedVersionQueries("FeedVersionQueries", cc), bytesInput("BytesInput", cc), logicalBytesInput("LogicalBytesInput", cc), logicalBytesMoveInOverhead("LogicalBytesMoveInOverhead", cc), kvCommitLogicalBytes("KVCommitLogicalBytes", cc), kvClearRanges("KVClearRanges", cc), @@ -2436,6 +2437,8 @@ ACTOR Future changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques req.reply.setByteLimit(std::min((int64_t)req.replyBufferSize, SERVER_KNOBS->CHANGEFEEDSTREAM_LIMIT_BYTES)); } + ++data->counters.feedStreamQueries; + wait(delay(0, TaskPriority::DefaultEndpoint)); try { @@ -2587,6 +2590,7 @@ ACTOR Future changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques } ACTOR Future changeFeedVersionUpdateQ(StorageServer* data, ChangeFeedVersionUpdateRequest req) { + ++data->counters.feedVersionQueries; wait(data->version.whenAtLeast(req.minVersion)); wait(delay(0)); Version minVersion = data->minFeedVersionForAddress(req.reply.getEndpoint().getPrimaryAddress()); From 61474d5d548071183684bddf5e1d47b8f97424dd Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Mon, 28 Mar 2022 14:48:12 -0500 Subject: [PATCH 3/8] Future-proof blob granules with full file size --- fdbclient/BlobGranuleCommon.h | 9 +++++---- fdbclient/BlobGranuleFiles.cpp | 11 ++++++++--- fdbclient/SystemData.cpp | 9 ++++++--- fdbclient/SystemData.h | 4 ++-- fdbserver/BlobGranuleServerCommon.actor.cpp | 16 ++++++++++------ fdbserver/BlobGranuleServerCommon.actor.h | 5 +++-- fdbserver/BlobWorker.actor.cpp | 18 ++++++++++++------ 7 files changed, 46 insertions(+), 26 deletions(-) diff --git a/fdbclient/BlobGranuleCommon.h b/fdbclient/BlobGranuleCommon.h index c76e72342d..97074326d9 100644 --- a/fdbclient/BlobGranuleCommon.h +++ b/fdbclient/BlobGranuleCommon.h @@ -52,19 +52,20 @@ struct BlobFilePointerRef { StringRef filename; int64_t offset; int64_t length; + int64_t fullFileLength; BlobFilePointerRef() {} - BlobFilePointerRef(Arena& to, const std::string& filename, int64_t offset, int64_t length) - : filename(to, filename), offset(offset), length(length) {} + BlobFilePointerRef(Arena& to, const std::string& filename, int64_t offset, int64_t length, int64_t fullFileLength) + : filename(to, filename), offset(offset), length(length), fullFileLength(fullFileLength) {} template void serialize(Ar& ar) { - serializer(ar, filename, offset, length); + serializer(ar, filename, offset, length, fullFileLength); } std::string toString() const { std::stringstream ss; - ss << filename.toString() << ":" << offset << ":" << length; + ss << filename.toString() << ":" << offset << ":" << length << ":" << fullFileLength; return std::move(ss).str(); } }; diff --git a/fdbclient/BlobGranuleFiles.cpp b/fdbclient/BlobGranuleFiles.cpp index 1697722fb7..469573e3d9 100644 --- a/fdbclient/BlobGranuleFiles.cpp +++ b/fdbclient/BlobGranuleFiles.cpp @@ -240,22 +240,27 @@ static void startLoad(const ReadBlobGranuleContext granuleContext, // Start load process for all files in chunk if (chunk.snapshotFile.present()) { std::string snapshotFname = chunk.snapshotFile.get().filename.toString(); - // FIXME: full file length won't always be length of read + // FIXME: remove when we implement file multiplexing + ASSERT(chunk.snapshotFile.get().offset == 0); + ASSERT(chunk.snapshotFile.get().length == chunk.snapshotFile.get().fullFileLength); loadIds.snapshotId = granuleContext.start_load_f(snapshotFname.c_str(), snapshotFname.size(), chunk.snapshotFile.get().offset, chunk.snapshotFile.get().length, - chunk.snapshotFile.get().length, + chunk.snapshotFile.get().fullFileLength, granuleContext.userContext); } loadIds.deltaIds.reserve(chunk.deltaFiles.size()); for (int deltaFileIdx = 0; deltaFileIdx < chunk.deltaFiles.size(); deltaFileIdx++) { std::string deltaFName = chunk.deltaFiles[deltaFileIdx].filename.toString(); + // FIXME: remove when we implement file multiplexing + ASSERT(chunk.deltaFiles[deltaFileIdx].offset == 0); + ASSERT(chunk.deltaFiles[deltaFileIdx].length == chunk.deltaFiles[deltaFileIdx].fullFileLength); int64_t deltaLoadId = granuleContext.start_load_f(deltaFName.c_str(), deltaFName.size(), chunk.deltaFiles[deltaFileIdx].offset, chunk.deltaFiles[deltaFileIdx].length, - chunk.deltaFiles[deltaFileIdx].length, + chunk.deltaFiles[deltaFileIdx].fullFileLength, granuleContext.userContext); loadIds.deltaIds.push_back(deltaLoadId); } diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index 5c24441f32..42e0f83a26 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -1190,23 +1190,26 @@ const KeyRange blobGranuleFileKeyRangeFor(UID granuleID) { return KeyRangeRef(startKey, strinc(startKey)); } -const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length) { +const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length, int64_t fullFileLength) { BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule())); wr << filename; wr << offset; wr << length; + wr << fullFileLength; return wr.toValue(); } -std::tuple, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value) { +std::tuple, int64_t, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value) { StringRef filename; int64_t offset; int64_t length; + int64_t fullFileLength; BinaryReader reader(value, IncludeVersion()); reader >> filename; reader >> offset; reader >> length; - return std::tuple(filename, offset, length); + reader >> fullFileLength; + return std::tuple(filename, offset, length, fullFileLength); } const Value blobGranulePruneValueFor(Version version, KeyRange range, bool force) { diff --git a/fdbclient/SystemData.h b/fdbclient/SystemData.h index 171130559e..fcbc20bf97 100644 --- a/fdbclient/SystemData.h +++ b/fdbclient/SystemData.h @@ -572,8 +572,8 @@ const Key blobGranuleFileKeyFor(UID granuleID, Version fileVersion, uint8_t file std::tuple decodeBlobGranuleFileKey(KeyRef const& key); const KeyRange blobGranuleFileKeyRangeFor(UID granuleID); -const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length); -std::tuple, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value); +const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length, int64_t fullFileLength); +std::tuple, int64_t, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value); const Value blobGranulePruneValueFor(Version version, KeyRange range, bool force); std::tuple decodeBlobGranulePruneValue(ValueRef const& value); diff --git a/fdbserver/BlobGranuleServerCommon.actor.cpp b/fdbserver/BlobGranuleServerCommon.actor.cpp index 4792984d62..35b8d2e22f 100644 --- a/fdbserver/BlobGranuleServerCommon.actor.cpp +++ b/fdbserver/BlobGranuleServerCommon.actor.cpp @@ -60,13 +60,14 @@ ACTOR Future readGranuleFiles(Transaction* tr, Key* startKey, Key endKey, Standalone filename; int64_t offset; int64_t length; + int64_t fullFileLength; std::tie(gid, version, fileType) = decodeBlobGranuleFileKey(it.key); ASSERT(gid == granuleID); - std::tie(filename, offset, length) = decodeBlobGranuleFileValue(it.value); + std::tie(filename, offset, length, fullFileLength) = decodeBlobGranuleFileValue(it.value); - BlobFileIndex idx(version, filename.toString(), offset, length); + BlobFileIndex idx(version, filename.toString(), offset, length, fullFileLength); if (fileType == 'S') { ASSERT(files->snapshotFiles.empty() || files->snapshotFiles.back().version < idx.version); files->snapshotFiles.push_back(idx); @@ -168,14 +169,16 @@ void GranuleFiles::getFiles(Version beginVersion, Version lastIncluded = invalidVersion; if (snapshotF != snapshotFiles.end()) { chunk.snapshotVersion = snapshotF->version; - chunk.snapshotFile = BlobFilePointerRef(replyArena, snapshotF->filename, snapshotF->offset, snapshotF->length); + chunk.snapshotFile = BlobFilePointerRef( + replyArena, snapshotF->filename, snapshotF->offset, snapshotF->length, snapshotF->fullFileLength); lastIncluded = chunk.snapshotVersion; } else { chunk.snapshotVersion = invalidVersion; } while (deltaF != deltaFiles.end() && deltaF->version < readVersion) { - chunk.deltaFiles.emplace_back_deep(replyArena, deltaF->filename, deltaF->offset, deltaF->length); + chunk.deltaFiles.emplace_back_deep( + replyArena, deltaF->filename, deltaF->offset, deltaF->length, deltaF->fullFileLength); deltaBytesCounter += deltaF->length; ASSERT(lastIncluded < deltaF->version); lastIncluded = deltaF->version; @@ -183,7 +186,8 @@ void GranuleFiles::getFiles(Version beginVersion, } // include last delta file that passes readVersion, if it exists if (deltaF != deltaFiles.end() && lastIncluded < readVersion) { - chunk.deltaFiles.emplace_back_deep(replyArena, deltaF->filename, deltaF->offset, deltaF->length); + chunk.deltaFiles.emplace_back_deep( + replyArena, deltaF->filename, deltaF->offset, deltaF->length, deltaF->fullFileLength); deltaBytesCounter += deltaF->length; lastIncluded = deltaF->version; } @@ -194,7 +198,7 @@ static std::string makeTestFileName(Version v) { } static BlobFileIndex makeTestFile(Version v, int64_t len) { - return BlobFileIndex(v, makeTestFileName(v), 0, len); + return BlobFileIndex(v, makeTestFileName(v), 0, len, len); } static void checkFile(int expectedVersion, const BlobFilePointerRef& actualFile) { diff --git a/fdbserver/BlobGranuleServerCommon.actor.h b/fdbserver/BlobGranuleServerCommon.actor.h index 399ae5b7b0..ea3f8c1e3b 100644 --- a/fdbserver/BlobGranuleServerCommon.actor.h +++ b/fdbserver/BlobGranuleServerCommon.actor.h @@ -49,11 +49,12 @@ struct BlobFileIndex { std::string filename; int64_t offset; int64_t length; + int64_t fullFileLength; BlobFileIndex() {} - BlobFileIndex(Version version, std::string filename, int64_t offset, int64_t length) - : version(version), filename(filename), offset(offset), length(length) {} + BlobFileIndex(Version version, std::string filename, int64_t offset, int64_t length, int64_t fullFileLength) + : version(version), filename(filename), offset(offset), length(length), fullFileLength(fullFileLength) {} // compare on version bool operator<(const BlobFileIndex& r) const { return version < r.version; } diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index 791ee7a05a..79d0981e70 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -511,7 +511,8 @@ ACTOR Future writeDeltaFile(Reference bwData, numIterations++; Key dfKey = blobGranuleFileKeyFor(granuleID, currentDeltaVersion, 'D'); - Value dfValue = blobGranuleFileValueFor(fname, 0, serializedSize); + // TODO change once we support file multiplexing + Value dfValue = blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize); tr->set(dfKey, dfValue); if (oldGranuleComplete.present()) { @@ -538,7 +539,8 @@ ACTOR Future writeDeltaFile(Reference bwData, if (BUGGIFY_WITH_PROB(0.01)) { wait(delay(deterministicRandom()->random01())); } - return BlobFileIndex(currentDeltaVersion, fname, 0, serializedSize); + // FIXME: change when we implement multiplexing + return BlobFileIndex(currentDeltaVersion, fname, 0, serializedSize, serializedSize); } catch (Error& e) { wait(tr->onError(e)); } @@ -648,7 +650,8 @@ ACTOR Future writeSnapshot(Reference bwData, wait(readAndCheckGranuleLock(tr, keyRange, epoch, seqno)); numIterations++; Key snapshotFileKey = blobGranuleFileKeyFor(granuleID, version, 'S'); - Key snapshotFileValue = blobGranuleFileValueFor(fname, 0, serializedSize); + // TODO change once we support file multiplexing + Key snapshotFileValue = blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize); tr->set(snapshotFileKey, snapshotFileValue); // create granule history at version if this is a new granule with the initial dump from FDB if (createGranuleHistory) { @@ -692,7 +695,8 @@ ACTOR Future writeSnapshot(Reference bwData, wait(delay(deterministicRandom()->random01())); } - return BlobFileIndex(version, fname, 0, serializedSize); + // FIXME: change when we implement multiplexing + return BlobFileIndex(version, fname, 0, serializedSize, serializedSize); } ACTOR Future dumpInitialSnapshotFromFDB(Reference bwData, @@ -797,7 +801,8 @@ ACTOR Future compactFromBlob(Reference bwData, ASSERT(snapshotVersion < version); - chunk.snapshotFile = BlobFilePointerRef(filenameArena, snapshotF.filename, snapshotF.offset, snapshotF.length); + chunk.snapshotFile = BlobFilePointerRef( + filenameArena, snapshotF.filename, snapshotF.offset, snapshotF.length, snapshotF.fullFileLength); compactBytesRead += snapshotF.length; int deltaIdx = files.deltaFiles.size() - 1; while (deltaIdx >= 0 && files.deltaFiles[deltaIdx].version > snapshotVersion) { @@ -807,7 +812,8 @@ ACTOR Future compactFromBlob(Reference bwData, Version lastDeltaVersion = invalidVersion; while (deltaIdx < files.deltaFiles.size() && files.deltaFiles[deltaIdx].version <= version) { BlobFileIndex deltaF = files.deltaFiles[deltaIdx]; - chunk.deltaFiles.emplace_back_deep(filenameArena, deltaF.filename, deltaF.offset, deltaF.length); + chunk.deltaFiles.emplace_back_deep( + filenameArena, deltaF.filename, deltaF.offset, deltaF.length, deltaF.fullFileLength); compactBytesRead += deltaF.length; lastDeltaVersion = files.deltaFiles[deltaIdx].version; deltaIdx++; From be0f0ce90393168e9b6c56a8fed0f5b35668606a Mon Sep 17 00:00:00 2001 From: akashhansda <99724223+akashhansda@users.noreply.github.com> Date: Mon, 28 Mar 2022 23:43:10 -0700 Subject: [PATCH 4/8] Update README.md Use https --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index fbcd4d3ef6..e40bf6ae23 100755 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ The official docker image for building is [`foundationdb/build`](https://hub.doc To build outside the official docker image you'll need at least these dependencies: 1. Install cmake Version 3.13 or higher [CMake](https://cmake.org/) -1. Install [Mono](http://www.mono-project.com/download/stable/) +1. Install [Mono](https://www.mono-project.com/download/stable/) 1. Install [Ninja](https://ninja-build.org/) (optional, but recommended) If compiling for local development, please set `-DUSE_WERROR=ON` in @@ -177,7 +177,7 @@ Under Windows, only Visual Studio with ClangCl is supported 1. Install [Python](https://www.python.org/downloads/) if is not already installed by Visual Studio 1. (Optional) Install [OpenJDK 11](https://developers.redhat.com/products/openjdk/download) to build Java bindings 1. (Optional) Install [OpenSSL 3.x](https://slproweb.com/products/Win32OpenSSL.html) to build with TLS support -1. (Optional) Install [WIX Toolset](http://wixtoolset.org/) to build Windows installer +1. (Optional) Install [WIX Toolset](https://wixtoolset.org/) to build Windows installer 1. `mkdir build && cd build` 1. `cmake -G "Visual Studio 16 2019" -A x64 -T ClangCl ` 1. `msbuild /p:Configuration=Release foundationdb.sln` From e1775627ab7793e23920a74c191b748d6a51bd08 Mon Sep 17 00:00:00 2001 From: Renxuan Wang Date: Mon, 28 Mar 2022 22:15:24 -0700 Subject: [PATCH 5/8] Add a check on proxy format. --- fdbbackup/backup.actor.cpp | 4 ++++ fdbclient/S3BlobStore.actor.cpp | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/fdbbackup/backup.actor.cpp b/fdbbackup/backup.actor.cpp index 431bc7798d..a8b4218569 100644 --- a/fdbbackup/backup.actor.cpp +++ b/fdbbackup/backup.actor.cpp @@ -3621,6 +3621,10 @@ int main(int argc, char* argv[]) { break; case OPT_PROXY: proxy = args->OptionArg(); + if (!Hostname::isHostname(proxy.get()) && !NetworkAddress::parseOptional(proxy.get()).present()) { + fprintf(stderr, "ERROR: Proxy format should be either IP:port or host:port\n"); + return FDB_EXIT_ERROR; + } modifyOptions.proxy = proxy; break; case OPT_DESTCONTAINER: diff --git a/fdbclient/S3BlobStore.actor.cpp b/fdbclient/S3BlobStore.actor.cpp index 799f631c6e..edfc1d1bc0 100644 --- a/fdbclient/S3BlobStore.actor.cpp +++ b/fdbclient/S3BlobStore.actor.cpp @@ -178,6 +178,10 @@ Reference S3BlobStoreEndpoint::fromString(const std::string Optional proxyHost, proxyPort; if (proxy.present()) { + if (!Hostname::isHostname(proxy.get()) && !NetworkAddress::parseOptional(proxy.get()).present()) { + throw format("'%s' is not a valid value for proxy. Format should be either IP:port or host:port.", + proxy.get().c_str()); + } StringRef p(proxy.get()); proxyHost = p.eat(":").toString(); proxyPort = p.eat().toString(); From bc3e5cdaa17a6986fd9709cb5ca09dfb35c02fda Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Tue, 29 Mar 2022 12:02:56 -0700 Subject: [PATCH 6/8] fix cmake error when OPEN_FOR_IDE=ON --- bindings/c/CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/bindings/c/CMakeLists.txt b/bindings/c/CMakeLists.txt index 62d369f4dc..c049e6f0fc 100644 --- a/bindings/c/CMakeLists.txt +++ b/bindings/c/CMakeLists.txt @@ -124,6 +124,7 @@ if(NOT WIN32) add_library(fdb_c_performance_test OBJECT test/performance_test.c test/test.h) add_library(fdb_c_ryw_benchmark OBJECT test/ryw_benchmark.c test/test.h) add_library(fdb_c_txn_size_test OBJECT test/txn_size_test.c test/test.h) + add_library(fdb_c_client_memory_test OBJECT test/client_memory_test.cpp test/unit/fdb_api.cpp test/unit/fdb_api.hpp) add_library(mako OBJECT ${MAKO_SRCS}) add_library(fdb_c_setup_tests OBJECT test/unit/setup_tests.cpp) add_library(fdb_c_unit_tests OBJECT ${UNIT_TEST_SRCS}) From d727e7648ecd23010d2561eed064ea83f5d33b9a Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Mon, 28 Mar 2022 14:33:59 -0700 Subject: [PATCH 7/8] Fix a few memory issues found by ASAN --- bindings/c/test/unit/fdb_api.cpp | 8 +++++++- bindings/c/test/unit/fdb_api.hpp | 7 ++++++- fdbcli/TenantCommands.actor.cpp | 18 +++++++++++++----- 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/bindings/c/test/unit/fdb_api.cpp b/bindings/c/test/unit/fdb_api.cpp index 4fc715dbc5..b26d7bdf82 100644 --- a/bindings/c/test/unit/fdb_api.cpp +++ b/bindings/c/test/unit/fdb_api.cpp @@ -138,6 +138,12 @@ Tenant::Tenant(FDBDatabase* db, const uint8_t* name, int name_length) { } } +Tenant::~Tenant() { + if (tenant != nullptr) { + fdb_tenant_destroy(tenant); + } +} + // Transaction Transaction::Transaction(FDBDatabase* db) { if (fdb_error_t err = fdb_database_create_transaction(db, &tr_)) { @@ -146,7 +152,7 @@ Transaction::Transaction(FDBDatabase* db) { } } -Transaction::Transaction(Tenant tenant) { +Transaction::Transaction(Tenant& tenant) { if (fdb_error_t err = fdb_tenant_create_transaction(tenant.tenant, &tr_)) { std::cerr << fdb_get_error(err) << std::endl; std::abort(); diff --git a/bindings/c/test/unit/fdb_api.hpp b/bindings/c/test/unit/fdb_api.hpp index 5653d6e7cb..fcf1c7e5ca 100644 --- a/bindings/c/test/unit/fdb_api.hpp +++ b/bindings/c/test/unit/fdb_api.hpp @@ -206,6 +206,11 @@ public: class Tenant final { public: Tenant(FDBDatabase* db, const uint8_t* name, int name_length); + ~Tenant(); + Tenant(const Tenant&) = delete; + Tenant& operator=(const Tenant&) = delete; + Tenant(Tenant&&) = delete; + Tenant& operator=(Tenant&&) = delete; private: friend class Transaction; @@ -219,7 +224,7 @@ class Transaction final { public: // Given an FDBDatabase, initializes a new transaction. Transaction(FDBDatabase* db); - Transaction(Tenant tenant); + Transaction(Tenant& tenant); ~Transaction(); // Wrapper around fdb_transaction_reset. diff --git a/fdbcli/TenantCommands.actor.cpp b/fdbcli/TenantCommands.actor.cpp index c03bb17c88..6660893a1f 100644 --- a/fdbcli/TenantCommands.actor.cpp +++ b/fdbcli/TenantCommands.actor.cpp @@ -51,7 +51,9 @@ ACTOR Future createTenantCommandActor(Reference db, std::vector tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); try { if (!doneExistenceCheck) { - Optional existingTenant = wait(safeThreadFutureToFuture(tr->get(tenantNameKey))); + // Hold the reference to the standalone's memory + state ThreadFuture> existingTenantFuture = tr->get(tenantNameKey); + Optional existingTenant = wait(safeThreadFutureToFuture(existingTenantFuture)); if (existingTenant.present()) { throw tenant_already_exists(); } @@ -96,7 +98,9 @@ ACTOR Future deleteTenantCommandActor(Reference db, std::vector tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); try { if (!doneExistenceCheck) { - Optional existingTenant = wait(safeThreadFutureToFuture(tr->get(tenantNameKey))); + // Hold the reference to the standalone's memory + state ThreadFuture> existingTenantFuture = tr->get(tenantNameKey); + Optional existingTenant = wait(safeThreadFutureToFuture(existingTenantFuture)); if (!existingTenant.present()) { throw tenant_not_found(); } @@ -163,8 +167,10 @@ ACTOR Future listTenantsCommandActor(Reference db, std::vector< loop { try { - RangeResult tenants = wait(safeThreadFutureToFuture( - tr->getRange(firstGreaterOrEqual(beginTenantKey), firstGreaterOrEqual(endTenantKey), limit))); + // Hold the reference to the standalone's memory + state ThreadFuture kvsFuture = + tr->getRange(firstGreaterOrEqual(beginTenantKey), firstGreaterOrEqual(endTenantKey), limit); + RangeResult tenants = wait(safeThreadFutureToFuture(kvsFuture)); if (tenants.empty()) { if (tokens.size() == 1) { @@ -213,7 +219,9 @@ ACTOR Future getTenantCommandActor(Reference db, std::vector tenant = wait(safeThreadFutureToFuture(tr->get(tenantNameKey))); + // Hold the reference to the standalone's memory + state ThreadFuture> tenantFuture = tr->get(tenantNameKey); + Optional tenant = wait(safeThreadFutureToFuture(tenantFuture)); if (!tenant.present()) { throw tenant_not_found(); } From 7fc6dfa6c5abe914e58ba0c56a794fc217848a21 Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Tue, 29 Mar 2022 13:16:41 -0500 Subject: [PATCH 8/8] Adding useful debugging trace events --- fdbclient/NativeAPI.actor.cpp | 1 + fdbserver/BlobManager.actor.cpp | 38 ++++++++++++++++++++++++++++++--- fdbserver/BlobWorker.actor.cpp | 31 +++++++++++++++++++-------- 3 files changed, 58 insertions(+), 12 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 4b21429737..7a041e1cd9 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -7372,6 +7372,7 @@ ACTOR Future>> readBlobGranulesActor( fmt::print( "BG Mapping for [{0} - %{1}) too large!\n", keyRange.begin.printable(), keyRange.end.printable()); } + TraceEvent(SevWarn, "BGMappingTooLarge").detail("Range", range).detail("Max", 1000); throw unsupported_operation(); } ASSERT(!blobGranuleMapping.more && blobGranuleMapping.size() < CLIENT_KNOBS->TOO_MANY); diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index 79b89f54dc..192475f4dd 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -211,6 +211,24 @@ struct SplitEvaluation { : epoch(epoch), seqno(seqno), inProgress(inProgress) {} }; +struct BlobManagerStats { + CounterCollection cc; + + // FIXME: pruning stats + + Counter granuleSplits; + Counter granuleWriteHotSplits; + Future logger; + + // Current stats maintained for a given blob worker process + explicit BlobManagerStats(UID id, double interval, std::unordered_map* workers) + : cc("BlobManagerStats", id.toString()), granuleSplits("GranuleSplits", cc), + granuleWriteHotSplits("GranuleWriteHotSplits", cc) { + specialCounter(cc, "WorkerCount", [workers]() { return workers->size(); }); + logger = traceCounters("BlobManagerMetrics", id, interval, &cc, "BlobManagerMetrics"); + } +}; + struct BlobManagerData : NonCopyable, ReferenceCounted { UID id; Database db; @@ -218,6 +236,8 @@ struct BlobManagerData : NonCopyable, ReferenceCounted { PromiseStream> addActor; Promise doLockCheck; + BlobManagerStats stats; + Reference bstore; std::unordered_map workersById; @@ -246,8 +266,9 @@ struct BlobManagerData : NonCopyable, ReferenceCounted { PromiseStream rangesToAssign; BlobManagerData(UID id, Database db, Optional dcId) - : id(id), db(db), dcId(dcId), knownBlobRanges(false, normalKeys.end), - restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY), recruitingStream(0) {} + : id(id), db(db), dcId(dcId), stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &workersById), + knownBlobRanges(false, normalKeys.end), restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY), + recruitingStream(0) {} }; ACTOR Future>> splitRange(Reference bmData, @@ -753,6 +774,7 @@ ACTOR Future monitorClientRanges(Reference bmData) { } for (KeyRangeRef range : rangesToRemove) { + TraceEvent("ClientBlobRangeRemoved", bmData->id).detail("Range", range); if (BM_DEBUG) { fmt::print( "BM Got range to revoke [{0} - {1})\n", range.begin.printable(), range.end.printable()); @@ -768,6 +790,7 @@ ACTOR Future monitorClientRanges(Reference bmData) { state std::vector>>> splitFutures; // Divide new ranges up into equal chunks by using SS byte sample for (KeyRangeRef range : rangesToAdd) { + TraceEvent("ClientBlobRangeAdded", bmData->id).detail("Range", range); splitFutures.push_back(splitRange(bmData, range, false)); } @@ -1096,6 +1119,11 @@ ACTOR Future maybeSplitRange(Reference bmData, splitVersion); } + ++bmData->stats.granuleSplits; + if (writeHot) { + ++bmData->stats.granuleWriteHotSplits; + } + // transaction committed, send range assignments // range could have been moved since split eval started, so just revoke from whoever has it RangeAssignment raRevoke; @@ -1182,6 +1210,8 @@ ACTOR Future killBlobWorker(Reference bmData, BlobWorkerI // Remove it from workersById also since otherwise that worker addr will remain excluded // when we try to recruit new blob workers. + TraceEvent("KillBlobWorker", bmData->id).detail("WorkerId", bwId); + if (registered) { bmData->deadWorkers.insert(bwId); bmData->workerStats.erase(bwId); @@ -1838,7 +1868,7 @@ ACTOR Future recoverBlobManager(Reference bmData) { TraceEvent("BlobManagerRecovered", bmData->id) .detail("Epoch", bmData->epoch) .detail("Duration", now() - recoveryStartTime) - .detail("Granules", bmData->workerAssignments.size()) + .detail("Granules", bmData->workerAssignments.size()) // TODO this includes un-set ranges, so it is inaccurate .detail("Assigned", explicitAssignments) .detail("Revoked", outOfDateAssignments.size()); @@ -2089,6 +2119,8 @@ ACTOR Future loadHistoryFiles(Reference bmData, U } } +// FIXME: trace events for pruning + /* * Deletes all files pertaining to the granule with id granuleId and * also removes the history entry for this granule from the system keyspace diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index 7b1eab8477..f44bbf6a2d 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -206,6 +206,7 @@ struct BlobWorkerData : NonCopyable, ReferenceCounted { if (BW_DEBUG) { fmt::print("BW {0} found new manager epoch {1}\n", id.toString(), currentManagerEpoch); } + TraceEvent(SevDebug, "BlobWorkerFoundNewManager", id).detail("Epoch", epoch); } return true; @@ -735,7 +736,7 @@ ACTOR Future dumpInitialSnapshotFromFDB(Reference Future streamFuture = tr->getTransaction().getRangeStream(rowsStream, metadata->keyRange, GetRangeLimits(), Snapshot::True); wait(streamFuture && success(snapshotWriter)); - TraceEvent("BlobGranuleSnapshotFile", bwData->id) + TraceEvent(SevDebug, "BlobGranuleSnapshotFile", bwData->id) .detail("Granule", metadata->keyRange) .detail("Version", readVersion); DEBUG_KEY_RANGE("BlobWorkerFDBSnapshot", readVersion, metadata->keyRange, bwData->id); @@ -759,7 +760,8 @@ ACTOR Future dumpInitialSnapshotFromFDB(Reference wait(tr->onError(e)); retries++; TEST(true); // Granule initial snapshot failed - TraceEvent(SevWarn, "BlobGranuleInitialSnapshotRetry", bwData->id) + // FIXME: why can't we supress error event? + TraceEvent(retries < 10 ? SevDebug : SevWarn, "BlobGranuleInitialSnapshotRetry", bwData->id) .error(err) .detail("Granule", metadata->keyRange) .detail("Count", retries); @@ -883,7 +885,7 @@ ACTOR Future checkSplitAndReSnapshot(Reference bw metadata->bytesInNewDeltaFiles); } - TraceEvent("BlobGranuleSnapshotCheck", bwData->id) + TraceEvent(SevDebug, "BlobGranuleSnapshotCheck", bwData->id) .detail("Granule", metadata->keyRange) .detail("Version", reSnapshotVersion); @@ -960,7 +962,7 @@ ACTOR Future checkSplitAndReSnapshot(Reference bw metadata->keyRange.end.printable(), bytesInNewDeltaFiles); } - TraceEvent("BlobGranuleSnapshotFile", bwData->id) + TraceEvent(SevDebug, "BlobGranuleSnapshotFile", bwData->id) .detail("Granule", metadata->keyRange) .detail("Version", metadata->durableDeltaVersion.get()); @@ -1540,7 +1542,7 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, bwData->id.toString().substr(0, 5).c_str(), deltas.version, rollbackVersion); - TraceEvent(SevWarn, "GranuleRollback", bwData->id) + TraceEvent(SevDebug, "GranuleRollback", bwData->id) .detail("Granule", metadata->keyRange) .detail("Version", deltas.version) .detail("RollbackVersion", rollbackVersion); @@ -1654,7 +1656,7 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, lastDeltaVersion, oldChangeFeedDataComplete.present() ? ". Finalizing " : ""); } - TraceEvent("BlobGranuleDeltaFile", bwData->id) + TraceEvent(SevDebug, "BlobGranuleDeltaFile", bwData->id) .detail("Granule", metadata->keyRange) .detail("Version", lastDeltaVersion); @@ -1831,13 +1833,13 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, } if (e.code() == error_code_granule_assignment_conflict) { - TraceEvent(SevInfo, "GranuleAssignmentConflict", bwData->id) + TraceEvent("GranuleAssignmentConflict", bwData->id) .detail("Granule", metadata->keyRange) .detail("GranuleID", startState.granuleID); return Void(); } if (e.code() == error_code_change_feed_popped) { - TraceEvent(SevInfo, "GranuleGotChangeFeedPopped", bwData->id) + TraceEvent("GranuleChangeFeedPopped", bwData->id) .detail("Granule", metadata->keyRange) .detail("GranuleID", startState.granuleID); return Void(); @@ -2579,7 +2581,16 @@ ACTOR Future openGranule(Reference bwData, As info.changeFeedStartVersion = tr.getCommittedVersion(); } - TraceEvent("GranuleOpen", bwData->id).detail("Granule", req.keyRange); + TraceEvent openEv("GranuleOpen", bwData->id); + openEv.detail("GranuleID", info.granuleID) + .detail("Granule", req.keyRange) + .detail("Epoch", req.managerEpoch) + .detail("Seqno", req.managerSeqno) + .detail("CFStartVersion", info.changeFeedStartVersion) + .detail("PreviousDurableVersion", info.previousDurableVersion); + if (info.parentGranule.present()) { + openEv.detail("ParentGranuleID", info.parentGranule.get().second); + } return info; } catch (Error& e) { @@ -2900,6 +2911,7 @@ ACTOR Future handleRangeRevoke(Reference bwData, RevokeBlo ACTOR Future registerBlobWorker(Reference bwData, BlobWorkerInterface interf) { state Reference tr = makeReference(bwData->db); + TraceEvent("BlobWorkerRegister", bwData->id); loop { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); @@ -2920,6 +2932,7 @@ ACTOR Future registerBlobWorker(Reference bwData, BlobWork if (BW_DEBUG) { fmt::print("Registered blob worker {}\n", interf.id().toString()); } + TraceEvent("BlobWorkerRegistered", bwData->id); return Void(); } catch (Error& e) { if (BW_DEBUG) {