Merge branch 'main' of https://github.com/apple/foundationdb into rename-rocks-engine

This commit is contained in:
He Liu 2022-03-29 16:24:50 -07:00
commit ca4bfb55d6
43 changed files with 382 additions and 145 deletions

View File

@ -35,7 +35,7 @@ The official docker image for building is [`foundationdb/build`](https://hub.doc
To build outside the official docker image you'll need at least these dependencies:
1. Install cmake Version 3.13 or higher [CMake](https://cmake.org/)
1. Install [Mono](http://www.mono-project.com/download/stable/)
1. Install [Mono](https://www.mono-project.com/download/stable/)
1. Install [Ninja](https://ninja-build.org/) (optional, but recommended)
If compiling for local development, please set `-DUSE_WERROR=ON` in
@ -177,7 +177,7 @@ Under Windows, only Visual Studio with ClangCl is supported
1. Install [Python](https://www.python.org/downloads/) if is not already installed by Visual Studio
1. (Optional) Install [OpenJDK 11](https://developers.redhat.com/products/openjdk/download) to build Java bindings
1. (Optional) Install [OpenSSL 3.x](https://slproweb.com/products/Win32OpenSSL.html) to build with TLS support
1. (Optional) Install [WIX Toolset](http://wixtoolset.org/) to build Windows installer
1. (Optional) Install [WIX Toolset](https://wixtoolset.org/) to build Windows installer
1. `mkdir build && cd build`
1. `cmake -G "Visual Studio 16 2019" -A x64 -T ClangCl <PATH_TO_FOUNDATIONDB_SOURCE>`
1. `msbuild /p:Configuration=Release foundationdb.sln`

View File

@ -124,6 +124,7 @@ if(NOT WIN32)
add_library(fdb_c_performance_test OBJECT test/performance_test.c test/test.h)
add_library(fdb_c_ryw_benchmark OBJECT test/ryw_benchmark.c test/test.h)
add_library(fdb_c_txn_size_test OBJECT test/txn_size_test.c test/test.h)
add_library(fdb_c_client_memory_test OBJECT test/client_memory_test.cpp test/unit/fdb_api.cpp test/unit/fdb_api.hpp)
add_library(mako OBJECT ${MAKO_SRCS})
add_library(fdb_c_setup_tests OBJECT test/unit/setup_tests.cpp)
add_library(fdb_c_unit_tests OBJECT ${UNIT_TEST_SRCS})

View File

@ -138,6 +138,12 @@ Tenant::Tenant(FDBDatabase* db, const uint8_t* name, int name_length) {
}
}
Tenant::~Tenant() {
if (tenant != nullptr) {
fdb_tenant_destroy(tenant);
}
}
// Transaction
Transaction::Transaction(FDBDatabase* db) {
if (fdb_error_t err = fdb_database_create_transaction(db, &tr_)) {
@ -146,7 +152,7 @@ Transaction::Transaction(FDBDatabase* db) {
}
}
Transaction::Transaction(Tenant tenant) {
Transaction::Transaction(Tenant& tenant) {
if (fdb_error_t err = fdb_tenant_create_transaction(tenant.tenant, &tr_)) {
std::cerr << fdb_get_error(err) << std::endl;
std::abort();

View File

@ -206,6 +206,11 @@ public:
class Tenant final {
public:
Tenant(FDBDatabase* db, const uint8_t* name, int name_length);
~Tenant();
Tenant(const Tenant&) = delete;
Tenant& operator=(const Tenant&) = delete;
Tenant(Tenant&&) = delete;
Tenant& operator=(Tenant&&) = delete;
private:
friend class Transaction;
@ -219,7 +224,7 @@ class Transaction final {
public:
// Given an FDBDatabase, initializes a new transaction.
Transaction(FDBDatabase* db);
Transaction(Tenant tenant);
Transaction(Tenant& tenant);
~Transaction();
// Wrapper around fdb_transaction_reset.

View File

@ -101,6 +101,7 @@ std::vector<LogFile> getRelevantLogFiles(const std::vector<LogFile>& files, Vers
struct ConvertParams {
std::string container_url;
Optional<std::string> proxy;
Version begin = invalidVersion;
Version end = invalidVersion;
bool log_enabled = false;
@ -112,6 +113,10 @@ struct ConvertParams {
std::string s;
s.append("ContainerURL:");
s.append(container_url);
if (proxy.present()) {
s.append(" Proxy:");
s.append(proxy.get());
}
s.append(" Begin:");
s.append(format("%" PRId64, begin));
s.append(" End:");
@ -448,7 +453,8 @@ private:
};
ACTOR Future<Void> convert(ConvertParams params) {
state Reference<IBackupContainer> container = IBackupContainer::openContainer(params.container_url);
state Reference<IBackupContainer> container =
IBackupContainer::openContainer(params.container_url, params.proxy, {});
state BackupFileList listing = wait(container->dumpFileList());
std::sort(listing.logs.begin(), listing.logs.end());
TraceEvent("Container").detail("URL", params.container_url).detail("Logs", listing.logs.size());

View File

@ -94,6 +94,7 @@ void printBuildInformation() {
struct DecodeParams {
std::string container_url;
Optional<std::string> proxy;
std::string fileFilter; // only files match the filter will be decoded
bool log_enabled = true;
std::string log_dir, trace_format, trace_log_group;
@ -115,6 +116,10 @@ struct DecodeParams {
std::string s;
s.append("ContainerURL: ");
s.append(container_url);
if (proxy.present()) {
s.append(", Proxy: ");
s.append(proxy.get());
}
s.append(", FileFilter: ");
s.append(fileFilter);
if (log_enabled) {
@ -526,7 +531,8 @@ ACTOR Future<Void> process_file(Reference<IBackupContainer> container, LogFile f
}
ACTOR Future<Void> decode_logs(DecodeParams params) {
state Reference<IBackupContainer> container = IBackupContainer::openContainer(params.container_url);
state Reference<IBackupContainer> container =
IBackupContainer::openContainer(params.container_url, params.proxy, {});
state UID uid = deterministicRandom()->randomUniqueID();
state BackupFileList listing = wait(container->dumpFileList());
// remove partitioned logs

View File

@ -130,6 +130,7 @@ enum {
OPT_USE_PARTITIONED_LOG,
// Backup and Restore constants
OPT_PROXY,
OPT_TAGNAME,
OPT_BACKUPKEYS,
OPT_WAITFORDONE,
@ -234,6 +235,7 @@ CSimpleOpt::SOption g_rgBackupStartOptions[] = {
{ OPT_NOSTOPWHENDONE, "--no-stop-when-done", SO_NONE },
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_PROXY, "--proxy", SO_REQ_SEP },
// Enable "-p" option after GA
// { OPT_USE_PARTITIONED_LOG, "-p", SO_NONE },
{ OPT_USE_PARTITIONED_LOG, "--partitioned-log-experimental", SO_NONE },
@ -294,6 +296,7 @@ CSimpleOpt::SOption g_rgBackupModifyOptions[] = {
{ OPT_MOD_VERIFY_UID, "--verify-uid", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_PROXY, "--proxy", SO_REQ_SEP },
{ OPT_SNAPSHOTINTERVAL, "-s", SO_REQ_SEP },
{ OPT_SNAPSHOTINTERVAL, "--snapshot-interval", SO_REQ_SEP },
{ OPT_MOD_ACTIVE_INTERVAL, "--active-snapshot-interval", SO_REQ_SEP },
@ -482,6 +485,7 @@ CSimpleOpt::SOption g_rgBackupExpireOptions[] = {
{ OPT_CLUSTERFILE, "--cluster-file", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_PROXY, "--proxy", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace-format", SO_REQ_SEP },
@ -517,6 +521,7 @@ CSimpleOpt::SOption g_rgBackupDeleteOptions[] = {
#endif
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_PROXY, "--proxy", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace-format", SO_REQ_SEP },
@ -546,6 +551,7 @@ CSimpleOpt::SOption g_rgBackupDescribeOptions[] = {
{ OPT_CLUSTERFILE, "--cluster-file", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_PROXY, "--proxy", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace-format", SO_REQ_SEP },
@ -578,6 +584,7 @@ CSimpleOpt::SOption g_rgBackupDumpOptions[] = {
{ OPT_CLUSTERFILE, "--cluster-file", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_PROXY, "--proxy", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
@ -652,6 +659,7 @@ CSimpleOpt::SOption g_rgBackupQueryOptions[] = {
{ OPT_RESTORE_TIMESTAMP, "--query-restore-timestamp", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_PROXY, "--proxy", SO_REQ_SEP },
{ OPT_RESTORE_VERSION, "-qrv", SO_REQ_SEP },
{ OPT_RESTORE_VERSION, "--query-restore-version", SO_REQ_SEP },
{ OPT_BACKUPKEYS_FILTER, "-k", SO_REQ_SEP },
@ -689,6 +697,7 @@ CSimpleOpt::SOption g_rgRestoreOptions[] = {
{ OPT_RESTORE_TIMESTAMP, "--timestamp", SO_REQ_SEP },
{ OPT_KNOB, "--knob-", SO_REQ_SEP },
{ OPT_RESTORECONTAINER, "-r", SO_REQ_SEP },
{ OPT_PROXY, "--proxy", SO_REQ_SEP },
{ OPT_PREFIX_ADD, "--add-prefix", SO_REQ_SEP },
{ OPT_PREFIX_REMOVE, "--remove-prefix", SO_REQ_SEP },
{ OPT_TAGNAME, "-t", SO_REQ_SEP },
@ -1920,6 +1929,7 @@ ACTOR Future<Void> submitDBBackup(Database src,
ACTOR Future<Void> submitBackup(Database db,
std::string url,
Optional<std::string> proxy,
int initialSnapshotIntervalSeconds,
int snapshotIntervalSeconds,
Standalone<VectorRef<KeyRangeRef>> backupRanges,
@ -1977,6 +1987,7 @@ ACTOR Future<Void> submitBackup(Database db,
else {
wait(backupAgent.submitBackup(db,
KeyRef(url),
proxy,
initialSnapshotIntervalSeconds,
snapshotIntervalSeconds,
tagName,
@ -2260,8 +2271,9 @@ ACTOR Future<Void> changeDBBackupResumed(Database src, Database dest, bool pause
}
Reference<IBackupContainer> openBackupContainer(const char* name,
std::string destinationContainer,
Optional<std::string> const& encryptionKeyFile = {}) {
const std::string& destinationContainer,
const Optional<std::string>& proxy,
const Optional<std::string>& encryptionKeyFile) {
// Error, if no dest container was specified
if (destinationContainer.empty()) {
fprintf(stderr, "ERROR: No backup destination was specified.\n");
@ -2271,7 +2283,7 @@ Reference<IBackupContainer> openBackupContainer(const char* name,
Reference<IBackupContainer> c;
try {
c = IBackupContainer::openContainer(destinationContainer, encryptionKeyFile);
c = IBackupContainer::openContainer(destinationContainer, proxy, encryptionKeyFile);
} catch (Error& e) {
std::string msg = format("ERROR: '%s' on URL '%s'", e.what(), destinationContainer.c_str());
if (e.code() == error_code_backup_invalid_url && !IBackupContainer::lastOpenError.empty()) {
@ -2291,6 +2303,7 @@ ACTOR Future<Void> runRestore(Database db,
std::string originalClusterFile,
std::string tagName,
std::string container,
Optional<std::string> proxy,
Standalone<VectorRef<KeyRangeRef>> ranges,
Version beginVersion,
Version targetVersion,
@ -2339,7 +2352,7 @@ ACTOR Future<Void> runRestore(Database db,
state FileBackupAgent backupAgent;
state Reference<IBackupContainer> bc =
openBackupContainer(exeRestore.toString().c_str(), container, encryptionKeyFile);
openBackupContainer(exeRestore.toString().c_str(), container, proxy, encryptionKeyFile);
// If targetVersion is unset then use the maximum restorable version from the backup description
if (targetVersion == invalidVersion) {
@ -2368,6 +2381,7 @@ ACTOR Future<Void> runRestore(Database db,
origDb,
KeyRef(tagName),
KeyRef(container),
proxy,
ranges,
waitForDone,
targetVersion,
@ -2411,6 +2425,7 @@ ACTOR Future<Void> runRestore(Database db,
ACTOR Future<Void> runFastRestoreTool(Database db,
std::string tagName,
std::string container,
Optional<std::string> proxy,
Standalone<VectorRef<KeyRangeRef>> ranges,
Version dbVersion,
bool performRestore,
@ -2440,7 +2455,7 @@ ACTOR Future<Void> runFastRestoreTool(Database db,
if (performRestore) {
if (dbVersion == invalidVersion) {
TraceEvent("FastRestoreTool").detail("TargetRestoreVersion", "Largest restorable version");
BackupDescription desc = wait(IBackupContainer::openContainer(container)->describeBackup());
BackupDescription desc = wait(IBackupContainer::openContainer(container, proxy, {})->describeBackup());
if (!desc.maxRestorableVersion.present()) {
fprintf(stderr, "The specified backup is not restorable to any version.\n");
throw restore_error();
@ -2457,6 +2472,7 @@ ACTOR Future<Void> runFastRestoreTool(Database db,
KeyRef(tagName),
ranges,
KeyRef(container),
proxy,
dbVersion,
LockDB::True,
randomUID,
@ -2478,7 +2494,7 @@ ACTOR Future<Void> runFastRestoreTool(Database db,
restoreVersion = dbVersion;
} else {
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(container);
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(container, proxy, {});
state BackupDescription description = wait(bc->describeBackup());
if (dbVersion <= 0) {
@ -2522,9 +2538,10 @@ ACTOR Future<Void> runFastRestoreTool(Database db,
ACTOR Future<Void> dumpBackupData(const char* name,
std::string destinationContainer,
Optional<std::string> proxy,
Version beginVersion,
Version endVersion) {
state Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer);
state Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer, proxy, {});
if (beginVersion < 0 || endVersion < 0) {
BackupDescription desc = wait(c->describeBackup());
@ -2552,6 +2569,7 @@ ACTOR Future<Void> dumpBackupData(const char* name,
ACTOR Future<Void> expireBackupData(const char* name,
std::string destinationContainer,
Optional<std::string> proxy,
Version endVersion,
std::string endDatetime,
Database db,
@ -2577,7 +2595,7 @@ ACTOR Future<Void> expireBackupData(const char* name,
}
try {
Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer, encryptionKeyFile);
Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer, proxy, encryptionKeyFile);
state IBackupContainer::ExpireProgress progress;
state std::string lastProgress;
@ -2623,9 +2641,11 @@ ACTOR Future<Void> expireBackupData(const char* name,
return Void();
}
ACTOR Future<Void> deleteBackupContainer(const char* name, std::string destinationContainer) {
ACTOR Future<Void> deleteBackupContainer(const char* name,
std::string destinationContainer,
Optional<std::string> proxy) {
try {
state Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer);
state Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer, proxy, {});
state int numDeleted = 0;
state Future<Void> done = c->deleteContainer(&numDeleted);
@ -2657,12 +2677,13 @@ ACTOR Future<Void> deleteBackupContainer(const char* name, std::string destinati
ACTOR Future<Void> describeBackup(const char* name,
std::string destinationContainer,
Optional<std::string> proxy,
bool deep,
Optional<Database> cx,
bool json,
Optional<std::string> encryptionKeyFile) {
try {
Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer, encryptionKeyFile);
Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer, proxy, encryptionKeyFile);
state BackupDescription desc = wait(c->describeBackup(deep));
if (cx.present())
wait(desc.resolveVersionTimes(cx.get()));
@ -2688,6 +2709,7 @@ static void reportBackupQueryError(UID operationId, JsonBuilderObject& result, s
// resolved to that timestamp.
ACTOR Future<Void> queryBackup(const char* name,
std::string destinationContainer,
Optional<std::string> proxy,
Standalone<VectorRef<KeyRangeRef>> keyRangesFilter,
Version restoreVersion,
std::string originalClusterFile,
@ -2734,7 +2756,7 @@ ACTOR Future<Void> queryBackup(const char* name,
}
try {
state Reference<IBackupContainer> bc = openBackupContainer(name, destinationContainer);
state Reference<IBackupContainer> bc = openBackupContainer(name, destinationContainer, proxy, {});
if (restoreVersion == invalidVersion) {
BackupDescription desc = wait(bc->describeBackup());
if (desc.maxRestorableVersion.present()) {
@ -2814,9 +2836,9 @@ ACTOR Future<Void> queryBackup(const char* name,
return Void();
}
ACTOR Future<Void> listBackup(std::string baseUrl) {
ACTOR Future<Void> listBackup(std::string baseUrl, Optional<std::string> proxy) {
try {
std::vector<std::string> containers = wait(IBackupContainer::listContainers(baseUrl));
std::vector<std::string> containers = wait(IBackupContainer::listContainers(baseUrl, proxy));
for (std::string container : containers) {
printf("%s\n", container.c_str());
}
@ -2852,6 +2874,7 @@ ACTOR Future<Void> listBackupTags(Database cx) {
struct BackupModifyOptions {
Optional<std::string> verifyUID;
Optional<std::string> destURL;
Optional<std::string> proxy;
Optional<int> snapshotIntervalSeconds;
Optional<int> activeSnapshotIntervalSeconds;
bool hasChanges() const {
@ -2869,7 +2892,7 @@ ACTOR Future<Void> modifyBackup(Database db, std::string tagName, BackupModifyOp
state Reference<IBackupContainer> bc;
if (options.destURL.present()) {
bc = openBackupContainer(exeBackup.toString().c_str(), options.destURL.get());
bc = openBackupContainer(exeBackup.toString().c_str(), options.destURL.get(), options.proxy, {});
try {
wait(timeoutError(bc->create(), 30));
} catch (Error& e) {
@ -3342,6 +3365,7 @@ int main(int argc, char* argv[]) {
break;
}
Optional<std::string> proxy;
std::string destinationContainer;
bool describeDeep = false;
bool describeTimestamps = false;
@ -3595,6 +3619,14 @@ int main(int argc, char* argv[]) {
return FDB_EXIT_ERROR;
}
break;
case OPT_PROXY:
proxy = args->OptionArg();
if (!Hostname::isHostname(proxy.get()) && !NetworkAddress::parseOptional(proxy.get()).present()) {
fprintf(stderr, "ERROR: Proxy format should be either IP:port or host:port\n");
return FDB_EXIT_ERROR;
}
modifyOptions.proxy = proxy;
break;
case OPT_DESTCONTAINER:
destinationContainer = args->OptionArg();
// If the url starts with '/' then prepend "file://" for backwards compatibility
@ -3962,9 +3994,10 @@ int main(int argc, char* argv[]) {
if (!initCluster())
return FDB_EXIT_ERROR;
// Test out the backup url to make sure it parses. Doesn't test to make sure it's actually writeable.
openBackupContainer(argv[0], destinationContainer, encryptionKeyFile);
openBackupContainer(argv[0], destinationContainer, proxy, encryptionKeyFile);
f = stopAfter(submitBackup(db,
destinationContainer,
proxy,
initialSnapshotIntervalSeconds,
snapshotIntervalSeconds,
backupKeys,
@ -4036,6 +4069,7 @@ int main(int argc, char* argv[]) {
}
f = stopAfter(expireBackupData(argv[0],
destinationContainer,
proxy,
expireVersion,
expireDatetime,
db,
@ -4047,7 +4081,7 @@ int main(int argc, char* argv[]) {
case BackupType::DELETE_BACKUP:
initTraceFile();
f = stopAfter(deleteBackupContainer(argv[0], destinationContainer));
f = stopAfter(deleteBackupContainer(argv[0], destinationContainer, proxy));
break;
case BackupType::DESCRIBE:
@ -4060,6 +4094,7 @@ int main(int argc, char* argv[]) {
// given, but quietly skip them if not.
f = stopAfter(describeBackup(argv[0],
destinationContainer,
proxy,
describeDeep,
describeTimestamps ? Optional<Database>(db) : Optional<Database>(),
jsonOutput,
@ -4068,7 +4103,7 @@ int main(int argc, char* argv[]) {
case BackupType::LIST:
initTraceFile();
f = stopAfter(listBackup(baseUrl));
f = stopAfter(listBackup(baseUrl, proxy));
break;
case BackupType::TAGS:
@ -4081,6 +4116,7 @@ int main(int argc, char* argv[]) {
initTraceFile();
f = stopAfter(queryBackup(argv[0],
destinationContainer,
proxy,
backupKeysFilter,
restoreVersion,
restoreClusterFileOrig,
@ -4090,7 +4126,7 @@ int main(int argc, char* argv[]) {
case BackupType::DUMP:
initTraceFile();
f = stopAfter(dumpBackupData(argv[0], destinationContainer, dumpBegin, dumpEnd));
f = stopAfter(dumpBackupData(argv[0], destinationContainer, proxy, dumpBegin, dumpEnd));
break;
case BackupType::UNDEFINED:
@ -4141,6 +4177,7 @@ int main(int argc, char* argv[]) {
restoreClusterFileOrig,
tagName,
restoreContainer,
proxy,
backupKeys,
beginVersion,
restoreVersion,
@ -4218,6 +4255,7 @@ int main(int argc, char* argv[]) {
f = stopAfter(runFastRestoreTool(db,
tagName,
restoreContainer,
proxy,
backupKeys,
restoreVersion,
!dryRun,

View File

@ -51,7 +51,9 @@ ACTOR Future<bool> createTenantCommandActor(Reference<IDatabase> db, std::vector
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
try {
if (!doneExistenceCheck) {
Optional<Value> existingTenant = wait(safeThreadFutureToFuture(tr->get(tenantNameKey)));
// Hold the reference to the standalone's memory
state ThreadFuture<Optional<Value>> existingTenantFuture = tr->get(tenantNameKey);
Optional<Value> existingTenant = wait(safeThreadFutureToFuture(existingTenantFuture));
if (existingTenant.present()) {
throw tenant_already_exists();
}
@ -96,7 +98,9 @@ ACTOR Future<bool> deleteTenantCommandActor(Reference<IDatabase> db, std::vector
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
try {
if (!doneExistenceCheck) {
Optional<Value> existingTenant = wait(safeThreadFutureToFuture(tr->get(tenantNameKey)));
// Hold the reference to the standalone's memory
state ThreadFuture<Optional<Value>> existingTenantFuture = tr->get(tenantNameKey);
Optional<Value> existingTenant = wait(safeThreadFutureToFuture(existingTenantFuture));
if (!existingTenant.present()) {
throw tenant_not_found();
}
@ -163,8 +167,10 @@ ACTOR Future<bool> listTenantsCommandActor(Reference<IDatabase> db, std::vector<
loop {
try {
RangeResult tenants = wait(safeThreadFutureToFuture(
tr->getRange(firstGreaterOrEqual(beginTenantKey), firstGreaterOrEqual(endTenantKey), limit)));
// Hold the reference to the standalone's memory
state ThreadFuture<RangeResult> kvsFuture =
tr->getRange(firstGreaterOrEqual(beginTenantKey), firstGreaterOrEqual(endTenantKey), limit);
RangeResult tenants = wait(safeThreadFutureToFuture(kvsFuture));
if (tenants.empty()) {
if (tokens.size() == 1) {
@ -213,7 +219,9 @@ ACTOR Future<bool> getTenantCommandActor(Reference<IDatabase> db, std::vector<St
loop {
try {
Optional<Value> tenant = wait(safeThreadFutureToFuture(tr->get(tenantNameKey)));
// Hold the reference to the standalone's memory
state ThreadFuture<Optional<Value>> tenantFuture = tr->get(tenantNameKey);
Optional<Value> tenant = wait(safeThreadFutureToFuture(tenantFuture));
if (!tenant.present()) {
throw tenant_not_found();
}

View File

@ -165,6 +165,7 @@ public:
Key backupTag,
Standalone<VectorRef<KeyRangeRef>> backupRanges,
Key bcUrl,
Optional<std::string> proxy,
Version targetVersion,
LockDB lockDB,
UID randomUID,
@ -187,6 +188,7 @@ public:
Optional<Database> cxOrig,
Key tagName,
Key url,
Optional<std::string> proxy,
Standalone<VectorRef<KeyRangeRef>> ranges,
WaitForComplete = WaitForComplete::True,
Version targetVersion = ::invalidVersion,
@ -202,6 +204,7 @@ public:
Optional<Database> cxOrig,
Key tagName,
Key url,
Optional<std::string> proxy,
WaitForComplete waitForComplete = WaitForComplete::True,
Version targetVersion = ::invalidVersion,
Verbose verbose = Verbose::True,
@ -219,6 +222,7 @@ public:
cxOrig,
tagName,
url,
proxy,
rangeRef,
waitForComplete,
targetVersion,
@ -263,6 +267,7 @@ public:
Future<Void> submitBackup(Reference<ReadYourWritesTransaction> tr,
Key outContainer,
Optional<std::string> proxy,
int initialSnapshotIntervalSeconds,
int snapshotIntervalSeconds,
std::string const& tagName,
@ -273,6 +278,7 @@ public:
Optional<std::string> const& encryptionKeyFileName = {});
Future<Void> submitBackup(Database cx,
Key outContainer,
Optional<std::string> proxy,
int initialSnapshotIntervalSeconds,
int snapshotIntervalSeconds,
std::string const& tagName,
@ -284,6 +290,7 @@ public:
return runRYWTransactionFailIfLocked(cx, [=](Reference<ReadYourWritesTransaction> tr) {
return submitBackup(tr,
outContainer,
proxy,
initialSnapshotIntervalSeconds,
snapshotIntervalSeconds,
tagName,
@ -720,20 +727,31 @@ template <>
inline Tuple Codec<Reference<IBackupContainer>>::pack(Reference<IBackupContainer> const& bc) {
Tuple tuple;
tuple.append(StringRef(bc->getURL()));
if (bc->getProxy().present()) {
tuple.append(StringRef(bc->getProxy().get()));
} else {
tuple.append(StringRef());
}
if (bc->getEncryptionKeyFileName().present()) {
tuple.append(bc->getEncryptionKeyFileName().get());
} else {
tuple.append(StringRef());
}
return tuple;
}
template <>
inline Reference<IBackupContainer> Codec<Reference<IBackupContainer>>::unpack(Tuple const& val) {
ASSERT(val.size() == 1 || val.size() == 2);
ASSERT(val.size() == 3);
auto url = val.getString(0).toString();
Optional<std::string> encryptionKeyFileName;
if (val.size() == 2) {
encryptionKeyFileName = val.getString(1).toString();
Optional<std::string> proxy;
if (!val.getString(1).empty()) {
proxy = val.getString(1).toString();
}
return IBackupContainer::openContainer(url, encryptionKeyFileName);
Optional<std::string> encryptionKeyFileName;
if (!val.getString(2).empty()) {
encryptionKeyFileName = val.getString(2).toString();
}
return IBackupContainer::openContainer(url, proxy, encryptionKeyFileName);
}
class BackupConfig : public KeyBackedConfig {

View File

@ -256,7 +256,8 @@ std::vector<std::string> IBackupContainer::getURLFormats() {
// Get an IBackupContainer based on a container URL string
Reference<IBackupContainer> IBackupContainer::openContainer(const std::string& url,
Optional<std::string> const& encryptionKeyFileName) {
const Optional<std::string>& proxy,
const Optional<std::string>& encryptionKeyFileName) {
static std::map<std::string, Reference<IBackupContainer>> m_cache;
Reference<IBackupContainer>& r = m_cache[url];
@ -273,7 +274,7 @@ Reference<IBackupContainer> IBackupContainer::openContainer(const std::string& u
// The URL parameters contain blobstore endpoint tunables as well as possible backup-specific options.
S3BlobStoreEndpoint::ParametersT backupParams;
Reference<S3BlobStoreEndpoint> bstore =
S3BlobStoreEndpoint::fromString(url, &resource, &lastOpenError, &backupParams);
S3BlobStoreEndpoint::fromString(url, proxy, &resource, &lastOpenError, &backupParams);
if (resource.empty())
throw backup_invalid_url();
@ -317,7 +318,7 @@ Reference<IBackupContainer> IBackupContainer::openContainer(const std::string& u
// Get a list of URLS to backup containers based on some a shorter URL. This function knows about some set of supported
// URL types which support this sort of backup discovery.
ACTOR Future<std::vector<std::string>> listContainers_impl(std::string baseURL) {
ACTOR Future<std::vector<std::string>> listContainers_impl(std::string baseURL, Optional<std::string> proxy) {
try {
StringRef u(baseURL);
if (u.startsWith("file://"_sr)) {
@ -327,8 +328,8 @@ ACTOR Future<std::vector<std::string>> listContainers_impl(std::string baseURL)
std::string resource;
S3BlobStoreEndpoint::ParametersT backupParams;
Reference<S3BlobStoreEndpoint> bstore =
S3BlobStoreEndpoint::fromString(baseURL, &resource, &IBackupContainer::lastOpenError, &backupParams);
Reference<S3BlobStoreEndpoint> bstore = S3BlobStoreEndpoint::fromString(
baseURL, proxy, &resource, &IBackupContainer::lastOpenError, &backupParams);
if (!resource.empty()) {
TraceEvent(SevWarn, "BackupContainer")
@ -370,8 +371,9 @@ ACTOR Future<std::vector<std::string>> listContainers_impl(std::string baseURL)
}
}
Future<std::vector<std::string>> IBackupContainer::listContainers(const std::string& baseURL) {
return listContainers_impl(baseURL);
Future<std::vector<std::string>> IBackupContainer::listContainers(const std::string& baseURL,
const Optional<std::string>& proxy) {
return listContainers_impl(baseURL, proxy);
}
ACTOR Future<Version> timeKeeperVersionFromDatetime(std::string datetime, Database db) {

View File

@ -156,6 +156,7 @@ struct BackupFileList {
struct BackupDescription {
BackupDescription() : snapshotBytes(0) {}
std::string url;
Optional<std::string> proxy;
std::vector<KeyspaceSnapshotFile> snapshots;
int64_t snapshotBytes;
// The version before which everything has been deleted by an expire
@ -294,11 +295,14 @@ public:
// Get an IBackupContainer based on a container spec string
static Reference<IBackupContainer> openContainer(const std::string& url,
const Optional<std::string>& encryptionKeyFileName = {});
const Optional<std::string>& proxy,
const Optional<std::string>& encryptionKeyFileName);
static std::vector<std::string> getURLFormats();
static Future<std::vector<std::string>> listContainers(const std::string& baseURL);
static Future<std::vector<std::string>> listContainers(const std::string& baseURL,
const Optional<std::string>& proxy);
std::string const& getURL() const { return URL; }
Optional<std::string> const& getProxy() const { return proxy; }
Optional<std::string> const& getEncryptionKeyFileName() const { return encryptionKeyFileName; }
static std::string lastOpenError;
@ -306,6 +310,7 @@ public:
// TODO: change the following back to `private` once blob obj access is refactored
protected:
std::string URL;
Optional<std::string> proxy;
Optional<std::string> encryptionKeyFileName;
};

View File

@ -409,6 +409,7 @@ public:
Version logStartVersionOverride) {
state BackupDescription desc;
desc.url = bc->getURL();
desc.proxy = bc->getProxy();
TraceEvent("BackupContainerDescribe1")
.detail("URL", bc->getURL())
@ -1500,7 +1501,8 @@ Future<Void> BackupContainerFileSystem::createTestEncryptionKeyFile(std::string
// code but returning a different template type because you can't cast between them
Reference<BackupContainerFileSystem> BackupContainerFileSystem::openContainerFS(
const std::string& url,
Optional<std::string> const& encryptionKeyFileName) {
const Optional<std::string>& proxy,
const Optional<std::string>& encryptionKeyFileName) {
static std::map<std::string, Reference<BackupContainerFileSystem>> m_cache;
Reference<BackupContainerFileSystem>& r = m_cache[url];
@ -1517,7 +1519,7 @@ Reference<BackupContainerFileSystem> BackupContainerFileSystem::openContainerFS(
// The URL parameters contain blobstore endpoint tunables as well as possible backup-specific options.
S3BlobStoreEndpoint::ParametersT backupParams;
Reference<S3BlobStoreEndpoint> bstore =
S3BlobStoreEndpoint::fromString(url, &resource, &lastOpenError, &backupParams);
S3BlobStoreEndpoint::fromString(url, proxy, &resource, &lastOpenError, &backupParams);
if (resource.empty())
throw backup_invalid_url();
@ -1635,7 +1637,9 @@ ACTOR static Future<Void> testWriteSnapshotFile(Reference<IBackupFile> file, Key
return Void();
}
ACTOR Future<Void> testBackupContainer(std::string url, Optional<std::string> encryptionKeyFileName) {
ACTOR Future<Void> testBackupContainer(std::string url,
Optional<std::string> proxy,
Optional<std::string> encryptionKeyFileName) {
state FlowLock lock(100e6);
if (encryptionKeyFileName.present()) {
@ -1644,7 +1648,7 @@ ACTOR Future<Void> testBackupContainer(std::string url, Optional<std::string> en
printf("BackupContainerTest URL %s\n", url.c_str());
state Reference<IBackupContainer> c = IBackupContainer::openContainer(url, encryptionKeyFileName);
state Reference<IBackupContainer> c = IBackupContainer::openContainer(url, proxy, encryptionKeyFileName);
// Make sure container doesn't exist, then create it.
try {
@ -1789,12 +1793,13 @@ ACTOR Future<Void> testBackupContainer(std::string url, Optional<std::string> en
}
TEST_CASE("/backup/containers/localdir/unencrypted") {
wait(testBackupContainer(format("file://%s/fdb_backups/%llx", params.getDataDir().c_str(), timer_int()), {}));
wait(testBackupContainer(format("file://%s/fdb_backups/%llx", params.getDataDir().c_str(), timer_int()), {}, {}));
return Void();
}
TEST_CASE("/backup/containers/localdir/encrypted") {
wait(testBackupContainer(format("file://%s/fdb_backups/%llx", params.getDataDir().c_str(), timer_int()),
{},
format("%s/test_encryption_key", params.getDataDir().c_str())));
return Void();
}
@ -1803,7 +1808,7 @@ TEST_CASE("/backup/containers/url") {
if (!g_network->isSimulated()) {
const char* url = getenv("FDB_TEST_BACKUP_URL");
ASSERT(url != nullptr);
wait(testBackupContainer(url, {}));
wait(testBackupContainer(url, {}, {}));
}
return Void();
}
@ -1813,7 +1818,7 @@ TEST_CASE("/backup/containers_list") {
state const char* url = getenv("FDB_TEST_BACKUP_URL");
ASSERT(url != nullptr);
printf("Listing %s\n", url);
std::vector<std::string> urls = wait(IBackupContainer::listContainers(url));
std::vector<std::string> urls = wait(IBackupContainer::listContainers(url, {}));
for (auto& u : urls) {
printf("%s\n", u.c_str());
}

View File

@ -81,9 +81,9 @@ public:
Future<bool> exists() override = 0;
// TODO: refactor this to separate out the "deal with blob store" stuff from the backup business logic
static Reference<BackupContainerFileSystem> openContainerFS(
const std::string& url,
const Optional<std::string>& encryptionKeyFileName = {});
static Reference<BackupContainerFileSystem> openContainerFS(const std::string& url,
const Optional<std::string>& proxy,
const Optional<std::string>& encryptionKeyFileName);
// Get a list of fileNames and their sizes in the container under the given path
// Although not required, an implementation can avoid traversing unwanted subfolders

View File

@ -52,19 +52,20 @@ struct BlobFilePointerRef {
StringRef filename;
int64_t offset;
int64_t length;
int64_t fullFileLength;
BlobFilePointerRef() {}
BlobFilePointerRef(Arena& to, const std::string& filename, int64_t offset, int64_t length)
: filename(to, filename), offset(offset), length(length) {}
BlobFilePointerRef(Arena& to, const std::string& filename, int64_t offset, int64_t length, int64_t fullFileLength)
: filename(to, filename), offset(offset), length(length), fullFileLength(fullFileLength) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, filename, offset, length);
serializer(ar, filename, offset, length, fullFileLength);
}
std::string toString() const {
std::stringstream ss;
ss << filename.toString() << ":" << offset << ":" << length;
ss << filename.toString() << ":" << offset << ":" << length << ":" << fullFileLength;
return std::move(ss).str();
}
};

View File

@ -240,22 +240,27 @@ static void startLoad(const ReadBlobGranuleContext granuleContext,
// Start load process for all files in chunk
if (chunk.snapshotFile.present()) {
std::string snapshotFname = chunk.snapshotFile.get().filename.toString();
// FIXME: full file length won't always be length of read
// FIXME: remove when we implement file multiplexing
ASSERT(chunk.snapshotFile.get().offset == 0);
ASSERT(chunk.snapshotFile.get().length == chunk.snapshotFile.get().fullFileLength);
loadIds.snapshotId = granuleContext.start_load_f(snapshotFname.c_str(),
snapshotFname.size(),
chunk.snapshotFile.get().offset,
chunk.snapshotFile.get().length,
chunk.snapshotFile.get().length,
chunk.snapshotFile.get().fullFileLength,
granuleContext.userContext);
}
loadIds.deltaIds.reserve(chunk.deltaFiles.size());
for (int deltaFileIdx = 0; deltaFileIdx < chunk.deltaFiles.size(); deltaFileIdx++) {
std::string deltaFName = chunk.deltaFiles[deltaFileIdx].filename.toString();
// FIXME: remove when we implement file multiplexing
ASSERT(chunk.deltaFiles[deltaFileIdx].offset == 0);
ASSERT(chunk.deltaFiles[deltaFileIdx].length == chunk.deltaFiles[deltaFileIdx].fullFileLength);
int64_t deltaLoadId = granuleContext.start_load_f(deltaFName.c_str(),
deltaFName.size(),
chunk.deltaFiles[deltaFileIdx].offset,
chunk.deltaFiles[deltaFileIdx].length,
chunk.deltaFiles[deltaFileIdx].length,
chunk.deltaFiles[deltaFileIdx].fullFileLength,
granuleContext.userContext);
loadIds.deltaIds.push_back(deltaLoadId);
}

View File

@ -4363,13 +4363,14 @@ public:
Key backupTag,
Standalone<VectorRef<KeyRangeRef>> backupRanges,
Key bcUrl,
Optional<std::string> proxy,
Version targetVersion,
LockDB lockDB,
UID randomUID,
Key addPrefix,
Key removePrefix) {
// Sanity check backup is valid
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(bcUrl.toString());
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(bcUrl.toString(), proxy, {});
state BackupDescription desc = wait(bc->describeBackup());
wait(desc.resolveVersionTimes(cx));
@ -4430,6 +4431,7 @@ public:
struct RestoreRequest restoreRequest(restoreIndex,
restoreTag,
bcUrl,
proxy,
targetVersion,
range,
deterministicRandom()->randomUniqueID(),
@ -4510,6 +4512,7 @@ public:
ACTOR static Future<Void> submitBackup(FileBackupAgent* backupAgent,
Reference<ReadYourWritesTransaction> tr,
Key outContainer,
Optional<std::string> proxy,
int initialSnapshotIntervalSeconds,
int snapshotIntervalSeconds,
std::string tagName,
@ -4555,7 +4558,8 @@ public:
backupContainer = joinPath(backupContainer, std::string("backup-") + nowStr.toString());
}
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(backupContainer, encryptionKeyFileName);
state Reference<IBackupContainer> bc =
IBackupContainer::openContainer(backupContainer, proxy, encryptionKeyFileName);
try {
wait(timeoutError(bc->create(), 30));
} catch (Error& e) {
@ -4642,6 +4646,7 @@ public:
Reference<ReadYourWritesTransaction> tr,
Key tagName,
Key backupURL,
Optional<std::string> proxy,
Standalone<VectorRef<KeyRangeRef>> ranges,
Version restoreVersion,
Key addPrefix,
@ -4710,7 +4715,7 @@ public:
// Point the tag to the new uid
tag.set(tr, { uid, false });
Reference<IBackupContainer> bc = IBackupContainer::openContainer(backupURL.toString());
Reference<IBackupContainer> bc = IBackupContainer::openContainer(backupURL.toString(), proxy, {});
// Configure the new restore
restore.tag().set(tr, tagName.toString());
@ -5303,6 +5308,7 @@ public:
Optional<Database> cxOrig,
Key tagName,
Key url,
Optional<std::string> proxy,
Standalone<VectorRef<KeyRangeRef>> ranges,
WaitForComplete waitForComplete,
Version targetVersion,
@ -5320,7 +5326,7 @@ public:
throw restore_error();
}
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(url.toString());
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(url.toString(), proxy, {});
state BackupDescription desc = wait(bc->describeBackup(true));
if (cxOrig.present()) {
@ -5360,6 +5366,7 @@ public:
tr,
tagName,
url,
proxy,
ranges,
targetVersion,
addPrefix,
@ -5499,6 +5506,7 @@ public:
tagName,
ranges,
KeyRef(bc->getURL()),
bc->getProxy(),
targetVersion,
LockDB::True,
randomUid,
@ -5520,6 +5528,7 @@ public:
cx,
tagName,
KeyRef(bc->getURL()),
bc->getProxy(),
ranges,
WaitForComplete::True,
::invalidVersion,
@ -5561,13 +5570,14 @@ Future<Void> FileBackupAgent::submitParallelRestore(Database cx,
Key backupTag,
Standalone<VectorRef<KeyRangeRef>> backupRanges,
Key bcUrl,
Optional<std::string> proxy,
Version targetVersion,
LockDB lockDB,
UID randomUID,
Key addPrefix,
Key removePrefix) {
return FileBackupAgentImpl::submitParallelRestore(
cx, backupTag, backupRanges, bcUrl, targetVersion, lockDB, randomUID, addPrefix, removePrefix);
cx, backupTag, backupRanges, bcUrl, proxy, targetVersion, lockDB, randomUID, addPrefix, removePrefix);
}
Future<Void> FileBackupAgent::atomicParallelRestore(Database cx,
@ -5582,6 +5592,7 @@ Future<Version> FileBackupAgent::restore(Database cx,
Optional<Database> cxOrig,
Key tagName,
Key url,
Optional<std::string> proxy,
Standalone<VectorRef<KeyRangeRef>> ranges,
WaitForComplete waitForComplete,
Version targetVersion,
@ -5598,6 +5609,7 @@ Future<Version> FileBackupAgent::restore(Database cx,
cxOrig,
tagName,
url,
proxy,
ranges,
waitForComplete,
targetVersion,
@ -5639,6 +5651,7 @@ Future<ERestoreState> FileBackupAgent::waitRestore(Database cx, Key tagName, Ver
Future<Void> FileBackupAgent::submitBackup(Reference<ReadYourWritesTransaction> tr,
Key outContainer,
Optional<std::string> proxy,
int initialSnapshotIntervalSeconds,
int snapshotIntervalSeconds,
std::string const& tagName,
@ -5650,6 +5663,7 @@ Future<Void> FileBackupAgent::submitBackup(Reference<ReadYourWritesTransaction>
return FileBackupAgentImpl::submitBackup(this,
tr,
outContainer,
proxy,
initialSnapshotIntervalSeconds,
snapshotIntervalSeconds,
tagName,

View File

@ -7372,6 +7372,7 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesActor(
fmt::print(
"BG Mapping for [{0} - %{1}) too large!\n", keyRange.begin.printable(), keyRange.end.printable());
}
TraceEvent(SevWarn, "BGMappingTooLarge").detail("Range", range).detail("Max", 1000);
throw unsupported_operation();
}
ASSERT(!blobGranuleMapping.more && blobGranuleMapping.size() < CLIENT_KNOBS->TOO_MANY);

View File

@ -49,6 +49,7 @@ struct RestoreRequest {
int index;
Key tagName;
Key url;
Optional<std::string> proxy;
Version targetVersion;
KeyRange range;
UID randomUid;
@ -64,27 +65,29 @@ struct RestoreRequest {
explicit RestoreRequest(const int index,
const Key& tagName,
const Key& url,
const Optional<std::string>& proxy,
Version targetVersion,
const KeyRange& range,
const UID& randomUid,
Key& addPrefix,
Key removePrefix)
: index(index), tagName(tagName), url(url), targetVersion(targetVersion), range(range), randomUid(randomUid),
addPrefix(addPrefix), removePrefix(removePrefix) {}
: index(index), tagName(tagName), url(url), proxy(proxy), targetVersion(targetVersion), range(range),
randomUid(randomUid), addPrefix(addPrefix), removePrefix(removePrefix) {}
// To change this serialization, ProtocolVersion::RestoreRequestValue must be updated, and downgrades need to be
// considered
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, index, tagName, url, targetVersion, range, randomUid, addPrefix, removePrefix, reply);
serializer(ar, index, tagName, url, proxy, targetVersion, range, randomUid, addPrefix, removePrefix, reply);
}
std::string toString() const {
std::stringstream ss;
ss << "index:" << std::to_string(index) << " tagName:" << tagName.contents().toString()
<< " url:" << url.contents().toString() << " targetVersion:" << std::to_string(targetVersion)
<< " range:" << range.toString() << " randomUid:" << randomUid.toString()
<< " addPrefix:" << addPrefix.toString() << " removePrefix:" << removePrefix.toString();
<< " url:" << url.contents().toString() << " proxy:" << (proxy.present() ? proxy.get() : "")
<< " targetVersion:" << std::to_string(targetVersion) << " range:" << range.toString()
<< " randomUid:" << randomUid.toString() << " addPrefix:" << addPrefix.toString()
<< " removePrefix:" << removePrefix.toString();
return ss.str();
}
};

View File

@ -162,7 +162,8 @@ std::string S3BlobStoreEndpoint::BlobKnobs::getURLParameters() const {
return r;
}
Reference<S3BlobStoreEndpoint> S3BlobStoreEndpoint::fromString(std::string const& url,
Reference<S3BlobStoreEndpoint> S3BlobStoreEndpoint::fromString(const std::string& url,
const Optional<std::string>& proxy,
std::string* resourceFromURL,
std::string* error,
ParametersT* ignored_parameters) {
@ -175,6 +176,17 @@ Reference<S3BlobStoreEndpoint> S3BlobStoreEndpoint::fromString(std::string const
if (prefix != LiteralStringRef("blobstore"))
throw format("Invalid blobstore URL prefix '%s'", prefix.toString().c_str());
Optional<std::string> proxyHost, proxyPort;
if (proxy.present()) {
if (!Hostname::isHostname(proxy.get()) && !NetworkAddress::parseOptional(proxy.get()).present()) {
throw format("'%s' is not a valid value for proxy. Format should be either IP:port or host:port.",
proxy.get().c_str());
}
StringRef p(proxy.get());
proxyHost = p.eat(":").toString();
proxyPort = p.eat().toString();
}
Optional<StringRef> cred;
if (url.find("@") != std::string::npos) {
cred = t.eat("@");
@ -261,7 +273,8 @@ Reference<S3BlobStoreEndpoint> S3BlobStoreEndpoint::fromString(std::string const
creds = S3BlobStoreEndpoint::Credentials{ key.toString(), secret.toString(), securityToken.toString() };
}
return makeReference<S3BlobStoreEndpoint>(host.toString(), service.toString(), creds, knobs, extraHeaders);
return makeReference<S3BlobStoreEndpoint>(
host.toString(), service.toString(), proxyHost, proxyPort, creds, knobs, extraHeaders);
} catch (std::string& err) {
if (error != nullptr)
@ -624,11 +637,11 @@ ACTOR Future<S3BlobStoreEndpoint::ReusableConnection> connect_impl(Reference<S3B
return rconn;
}
}
std::string service = b->service;
std::string host = b->host, service = b->service;
if (service.empty())
service = b->knobs.secure_connection ? "https" : "http";
state Reference<IConnection> conn =
wait(INetworkConnections::net()->connect(b->host, service, b->knobs.secure_connection ? true : false));
wait(INetworkConnections::net()->connect(host, service, b->knobs.secure_connection ? true : false));
wait(conn->connectHandshake());
TraceEvent("S3BlobStoreEndpointNewConnection")
@ -1609,7 +1622,7 @@ TEST_CASE("/backup/s3/v4headers") {
S3BlobStoreEndpoint::Credentials creds{ "AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", "" }
// GET without query parameters
{
S3BlobStoreEndpoint s3("s3.amazonaws.com", "s3", creds);
S3BlobStoreEndpoint s3("s3.amazonaws.com", "s3", "proxy", "port", creds);
std::string verb("GET");
std::string resource("/test.txt");
HTTP::Headers headers;
@ -1624,7 +1637,7 @@ TEST_CASE("/backup/s3/v4headers") {
// GET with query parameters
{
S3BlobStoreEndpoint s3("s3.amazonaws.com", "s3", creds);
S3BlobStoreEndpoint s3("s3.amazonaws.com", "s3", "proxy", "port", creds);
std::string verb("GET");
std::string resource("/test/examplebucket?Action=DescribeRegions&Version=2013-10-15");
HTTP::Headers headers;
@ -1639,7 +1652,7 @@ TEST_CASE("/backup/s3/v4headers") {
// POST
{
S3BlobStoreEndpoint s3("s3.us-west-2.amazonaws.com", "s3", creds);
S3BlobStoreEndpoint s3("s3.us-west-2.amazonaws.com", "s3", "proxy", "port", creds);
std::string verb("POST");
std::string resource("/simple.json");
HTTP::Headers headers;

View File

@ -99,11 +99,15 @@ public:
};
S3BlobStoreEndpoint(std::string const& host,
std::string service,
std::string const& service,
Optional<std::string> const& proxyHost,
Optional<std::string> const& proxyPort,
Optional<Credentials> const& creds,
BlobKnobs const& knobs = BlobKnobs(),
HTTP::Headers extraHeaders = HTTP::Headers())
: host(host), service(service), credentials(creds), lookupKey(creds.present() && creds.get().key.empty()),
: host(host), service(service), proxyHost(proxyHost), proxyPort(proxyPort),
useProxy(proxyHost.present() && proxyPort.present()), credentials(creds),
lookupKey(creds.present() && creds.get().key.empty()),
lookupSecret(creds.present() && creds.get().secret.empty()), knobs(knobs), extraHeaders(extraHeaders),
requestRate(new SpeedLimit(knobs.requests_per_second, 1)),
requestRateList(new SpeedLimit(knobs.list_requests_per_second, 1)),
@ -114,7 +118,7 @@ public:
recvRate(new SpeedLimit(knobs.max_recv_bytes_per_second, 1)), concurrentRequests(knobs.concurrent_requests),
concurrentUploads(knobs.concurrent_uploads), concurrentLists(knobs.concurrent_lists) {
if (host.empty())
if (host.empty() || (proxyHost.present() != proxyPort.present()))
throw connection_string_invalid();
}
@ -132,10 +136,11 @@ public:
// Parse url and return a S3BlobStoreEndpoint
// If the url has parameters that S3BlobStoreEndpoint can't consume then an error will be thrown unless
// ignored_parameters is given in which case the unconsumed parameters will be added to it.
static Reference<S3BlobStoreEndpoint> fromString(std::string const& url,
std::string* resourceFromURL = nullptr,
std::string* error = nullptr,
ParametersT* ignored_parameters = nullptr);
static Reference<S3BlobStoreEndpoint> fromString(const std::string& url,
const Optional<std::string>& proxy,
std::string* resourceFromURL,
std::string* error,
ParametersT* ignored_parameters);
// Get a normalized version of this URL with the given resource and any non-default BlobKnob values as URL
// parameters in addition to the passed params string
@ -151,6 +156,10 @@ public:
std::string host;
std::string service;
Optional<std::string> proxyHost;
Optional<std::string> proxyPort;
bool useProxy;
Optional<Credentials> credentials;
bool lookupKey;
bool lookupSecret;

View File

@ -924,7 +924,7 @@ struct OverlappingChangeFeedsReply {
};
struct OverlappingChangeFeedsRequest {
constexpr static FileIdentifier file_identifier = 10726174;
constexpr static FileIdentifier file_identifier = 7228462;
KeyRange range;
Version minVersion;
ReplyPromise<OverlappingChangeFeedsReply> reply;
@ -939,7 +939,7 @@ struct OverlappingChangeFeedsRequest {
};
struct ChangeFeedVersionUpdateReply {
constexpr static FileIdentifier file_identifier = 11815134;
constexpr static FileIdentifier file_identifier = 4246160;
Version version = 0;
ChangeFeedVersionUpdateReply() {}

View File

@ -1190,23 +1190,26 @@ const KeyRange blobGranuleFileKeyRangeFor(UID granuleID) {
return KeyRangeRef(startKey, strinc(startKey));
}
const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length) {
const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length, int64_t fullFileLength) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule()));
wr << filename;
wr << offset;
wr << length;
wr << fullFileLength;
return wr.toValue();
}
std::tuple<Standalone<StringRef>, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value) {
std::tuple<Standalone<StringRef>, int64_t, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value) {
StringRef filename;
int64_t offset;
int64_t length;
int64_t fullFileLength;
BinaryReader reader(value, IncludeVersion());
reader >> filename;
reader >> offset;
reader >> length;
return std::tuple(filename, offset, length);
reader >> fullFileLength;
return std::tuple(filename, offset, length, fullFileLength);
}
const Value blobGranulePruneValueFor(Version version, KeyRange range, bool force) {

View File

@ -572,8 +572,8 @@ const Key blobGranuleFileKeyFor(UID granuleID, Version fileVersion, uint8_t file
std::tuple<UID, Version, uint8_t> decodeBlobGranuleFileKey(KeyRef const& key);
const KeyRange blobGranuleFileKeyRangeFor(UID granuleID);
const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length);
std::tuple<Standalone<StringRef>, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value);
const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length, int64_t fullFileLength);
std::tuple<Standalone<StringRef>, int64_t, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value);
const Value blobGranulePruneValueFor(Version version, KeyRange range, bool force);
std::tuple<Version, KeyRange, bool> decodeBlobGranulePruneValue(ValueRef const& value);

View File

@ -60,13 +60,14 @@ ACTOR Future<Void> readGranuleFiles(Transaction* tr, Key* startKey, Key endKey,
Standalone<StringRef> filename;
int64_t offset;
int64_t length;
int64_t fullFileLength;
std::tie(gid, version, fileType) = decodeBlobGranuleFileKey(it.key);
ASSERT(gid == granuleID);
std::tie(filename, offset, length) = decodeBlobGranuleFileValue(it.value);
std::tie(filename, offset, length, fullFileLength) = decodeBlobGranuleFileValue(it.value);
BlobFileIndex idx(version, filename.toString(), offset, length);
BlobFileIndex idx(version, filename.toString(), offset, length, fullFileLength);
if (fileType == 'S') {
ASSERT(files->snapshotFiles.empty() || files->snapshotFiles.back().version < idx.version);
files->snapshotFiles.push_back(idx);
@ -168,14 +169,16 @@ void GranuleFiles::getFiles(Version beginVersion,
Version lastIncluded = invalidVersion;
if (snapshotF != snapshotFiles.end()) {
chunk.snapshotVersion = snapshotF->version;
chunk.snapshotFile = BlobFilePointerRef(replyArena, snapshotF->filename, snapshotF->offset, snapshotF->length);
chunk.snapshotFile = BlobFilePointerRef(
replyArena, snapshotF->filename, snapshotF->offset, snapshotF->length, snapshotF->fullFileLength);
lastIncluded = chunk.snapshotVersion;
} else {
chunk.snapshotVersion = invalidVersion;
}
while (deltaF != deltaFiles.end() && deltaF->version < readVersion) {
chunk.deltaFiles.emplace_back_deep(replyArena, deltaF->filename, deltaF->offset, deltaF->length);
chunk.deltaFiles.emplace_back_deep(
replyArena, deltaF->filename, deltaF->offset, deltaF->length, deltaF->fullFileLength);
deltaBytesCounter += deltaF->length;
ASSERT(lastIncluded < deltaF->version);
lastIncluded = deltaF->version;
@ -183,7 +186,8 @@ void GranuleFiles::getFiles(Version beginVersion,
}
// include last delta file that passes readVersion, if it exists
if (deltaF != deltaFiles.end() && lastIncluded < readVersion) {
chunk.deltaFiles.emplace_back_deep(replyArena, deltaF->filename, deltaF->offset, deltaF->length);
chunk.deltaFiles.emplace_back_deep(
replyArena, deltaF->filename, deltaF->offset, deltaF->length, deltaF->fullFileLength);
deltaBytesCounter += deltaF->length;
lastIncluded = deltaF->version;
}
@ -194,7 +198,7 @@ static std::string makeTestFileName(Version v) {
}
static BlobFileIndex makeTestFile(Version v, int64_t len) {
return BlobFileIndex(v, makeTestFileName(v), 0, len);
return BlobFileIndex(v, makeTestFileName(v), 0, len, len);
}
static void checkFile(int expectedVersion, const BlobFilePointerRef& actualFile) {

View File

@ -49,11 +49,12 @@ struct BlobFileIndex {
std::string filename;
int64_t offset;
int64_t length;
int64_t fullFileLength;
BlobFileIndex() {}
BlobFileIndex(Version version, std::string filename, int64_t offset, int64_t length)
: version(version), filename(filename), offset(offset), length(length) {}
BlobFileIndex(Version version, std::string filename, int64_t offset, int64_t length, int64_t fullFileLength)
: version(version), filename(filename), offset(offset), length(length), fullFileLength(fullFileLength) {}
// compare on version
bool operator<(const BlobFileIndex& r) const { return version < r.version; }

View File

@ -211,6 +211,24 @@ struct SplitEvaluation {
: epoch(epoch), seqno(seqno), inProgress(inProgress) {}
};
struct BlobManagerStats {
CounterCollection cc;
// FIXME: pruning stats
Counter granuleSplits;
Counter granuleWriteHotSplits;
Future<Void> logger;
// Current stats maintained for a given blob worker process
explicit BlobManagerStats(UID id, double interval, std::unordered_map<UID, BlobWorkerInterface>* workers)
: cc("BlobManagerStats", id.toString()), granuleSplits("GranuleSplits", cc),
granuleWriteHotSplits("GranuleWriteHotSplits", cc) {
specialCounter(cc, "WorkerCount", [workers]() { return workers->size(); });
logger = traceCounters("BlobManagerMetrics", id, interval, &cc, "BlobManagerMetrics");
}
};
struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
UID id;
Database db;
@ -218,6 +236,8 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
PromiseStream<Future<Void>> addActor;
Promise<Void> doLockCheck;
BlobManagerStats stats;
Reference<BackupContainerFileSystem> bstore;
std::unordered_map<UID, BlobWorkerInterface> workersById;
@ -246,8 +266,9 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
PromiseStream<RangeAssignment> rangesToAssign;
BlobManagerData(UID id, Database db, Optional<Key> dcId)
: id(id), db(db), dcId(dcId), knownBlobRanges(false, normalKeys.end),
restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY), recruitingStream(0) {}
: id(id), db(db), dcId(dcId), stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &workersById),
knownBlobRanges(false, normalKeys.end), restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY),
recruitingStream(0) {}
};
ACTOR Future<Standalone<VectorRef<KeyRef>>> splitRange(Reference<BlobManagerData> bmData,
@ -753,6 +774,7 @@ ACTOR Future<Void> monitorClientRanges(Reference<BlobManagerData> bmData) {
}
for (KeyRangeRef range : rangesToRemove) {
TraceEvent("ClientBlobRangeRemoved", bmData->id).detail("Range", range);
if (BM_DEBUG) {
fmt::print(
"BM Got range to revoke [{0} - {1})\n", range.begin.printable(), range.end.printable());
@ -768,6 +790,7 @@ ACTOR Future<Void> monitorClientRanges(Reference<BlobManagerData> bmData) {
state std::vector<Future<Standalone<VectorRef<KeyRef>>>> splitFutures;
// Divide new ranges up into equal chunks by using SS byte sample
for (KeyRangeRef range : rangesToAdd) {
TraceEvent("ClientBlobRangeAdded", bmData->id).detail("Range", range);
splitFutures.push_back(splitRange(bmData, range, false));
}
@ -1096,6 +1119,11 @@ ACTOR Future<Void> maybeSplitRange(Reference<BlobManagerData> bmData,
splitVersion);
}
++bmData->stats.granuleSplits;
if (writeHot) {
++bmData->stats.granuleWriteHotSplits;
}
// transaction committed, send range assignments
// range could have been moved since split eval started, so just revoke from whoever has it
RangeAssignment raRevoke;
@ -1182,6 +1210,8 @@ ACTOR Future<Void> killBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerI
// Remove it from workersById also since otherwise that worker addr will remain excluded
// when we try to recruit new blob workers.
TraceEvent("KillBlobWorker", bmData->id).detail("WorkerId", bwId);
if (registered) {
bmData->deadWorkers.insert(bwId);
bmData->workerStats.erase(bwId);
@ -1581,6 +1611,7 @@ static void addAssignment(KeyRangeMap<std::tuple<UID, int64_t, int64_t>>& map,
}
ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
state double recoveryStartTime = now();
state Promise<Void> workerListReady;
bmData->addActor.send(checkBlobWorkerList(bmData, workerListReady));
wait(workerListReady.getFuture());
@ -1836,7 +1867,8 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
TraceEvent("BlobManagerRecovered", bmData->id)
.detail("Epoch", bmData->epoch)
.detail("Granules", bmData->workerAssignments.size())
.detail("Duration", now() - recoveryStartTime)
.detail("Granules", bmData->workerAssignments.size()) // TODO this includes un-set ranges, so it is inaccurate
.detail("Assigned", explicitAssignments)
.detail("Revoked", outOfDateAssignments.size());
@ -2087,6 +2119,8 @@ ACTOR Future<GranuleFiles> loadHistoryFiles(Reference<BlobManagerData> bmData, U
}
}
// FIXME: trace events for pruning
/*
* Deletes all files pertaining to the granule with id granuleId and
* also removes the history entry for this granule from the system keyspace
@ -2506,7 +2540,7 @@ ACTOR Future<Void> monitorPruneKeys(Reference<BlobManagerData> self) {
if (BM_DEBUG) {
fmt::print("BM constructing backup container from {}\n", SERVER_KNOBS->BG_URL.c_str());
}
self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL);
self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL, {}, {});
if (BM_DEBUG) {
printf("BM constructed backup container\n");
}

View File

@ -206,6 +206,7 @@ struct BlobWorkerData : NonCopyable, ReferenceCounted<BlobWorkerData> {
if (BW_DEBUG) {
fmt::print("BW {0} found new manager epoch {1}\n", id.toString(), currentManagerEpoch);
}
TraceEvent(SevDebug, "BlobWorkerFoundNewManager", id).detail("Epoch", epoch);
}
return true;
@ -511,7 +512,8 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
numIterations++;
Key dfKey = blobGranuleFileKeyFor(granuleID, currentDeltaVersion, 'D');
Value dfValue = blobGranuleFileValueFor(fname, 0, serializedSize);
// TODO change once we support file multiplexing
Value dfValue = blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize);
tr->set(dfKey, dfValue);
if (oldGranuleComplete.present()) {
@ -538,7 +540,8 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
if (BUGGIFY_WITH_PROB(0.01)) {
wait(delay(deterministicRandom()->random01()));
}
return BlobFileIndex(currentDeltaVersion, fname, 0, serializedSize);
// FIXME: change when we implement multiplexing
return BlobFileIndex(currentDeltaVersion, fname, 0, serializedSize, serializedSize);
} catch (Error& e) {
wait(tr->onError(e));
}
@ -648,7 +651,8 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
wait(readAndCheckGranuleLock(tr, keyRange, epoch, seqno));
numIterations++;
Key snapshotFileKey = blobGranuleFileKeyFor(granuleID, version, 'S');
Key snapshotFileValue = blobGranuleFileValueFor(fname, 0, serializedSize);
// TODO change once we support file multiplexing
Key snapshotFileValue = blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize);
tr->set(snapshotFileKey, snapshotFileValue);
// create granule history at version if this is a new granule with the initial dump from FDB
if (createGranuleHistory) {
@ -692,7 +696,8 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
wait(delay(deterministicRandom()->random01()));
}
return BlobFileIndex(version, fname, 0, serializedSize);
// FIXME: change when we implement multiplexing
return BlobFileIndex(version, fname, 0, serializedSize, serializedSize);
}
ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(Reference<BlobWorkerData> bwData,
@ -731,7 +736,7 @@ ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(Reference<BlobWorkerData>
Future<Void> streamFuture =
tr->getTransaction().getRangeStream(rowsStream, metadata->keyRange, GetRangeLimits(), Snapshot::True);
wait(streamFuture && success(snapshotWriter));
TraceEvent("BlobGranuleSnapshotFile", bwData->id)
TraceEvent(SevDebug, "BlobGranuleSnapshotFile", bwData->id)
.detail("Granule", metadata->keyRange)
.detail("Version", readVersion);
DEBUG_KEY_RANGE("BlobWorkerFDBSnapshot", readVersion, metadata->keyRange, bwData->id);
@ -755,7 +760,8 @@ ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(Reference<BlobWorkerData>
wait(tr->onError(e));
retries++;
TEST(true); // Granule initial snapshot failed
TraceEvent(SevWarn, "BlobGranuleInitialSnapshotRetry", bwData->id)
// FIXME: why can't we supress error event?
TraceEvent(retries < 10 ? SevDebug : SevWarn, "BlobGranuleInitialSnapshotRetry", bwData->id)
.error(err)
.detail("Granule", metadata->keyRange)
.detail("Count", retries);
@ -797,7 +803,8 @@ ACTOR Future<BlobFileIndex> compactFromBlob(Reference<BlobWorkerData> bwData,
ASSERT(snapshotVersion < version);
chunk.snapshotFile = BlobFilePointerRef(filenameArena, snapshotF.filename, snapshotF.offset, snapshotF.length);
chunk.snapshotFile = BlobFilePointerRef(
filenameArena, snapshotF.filename, snapshotF.offset, snapshotF.length, snapshotF.fullFileLength);
compactBytesRead += snapshotF.length;
int deltaIdx = files.deltaFiles.size() - 1;
while (deltaIdx >= 0 && files.deltaFiles[deltaIdx].version > snapshotVersion) {
@ -807,7 +814,8 @@ ACTOR Future<BlobFileIndex> compactFromBlob(Reference<BlobWorkerData> bwData,
Version lastDeltaVersion = invalidVersion;
while (deltaIdx < files.deltaFiles.size() && files.deltaFiles[deltaIdx].version <= version) {
BlobFileIndex deltaF = files.deltaFiles[deltaIdx];
chunk.deltaFiles.emplace_back_deep(filenameArena, deltaF.filename, deltaF.offset, deltaF.length);
chunk.deltaFiles.emplace_back_deep(
filenameArena, deltaF.filename, deltaF.offset, deltaF.length, deltaF.fullFileLength);
compactBytesRead += deltaF.length;
lastDeltaVersion = files.deltaFiles[deltaIdx].version;
deltaIdx++;
@ -877,7 +885,7 @@ ACTOR Future<BlobFileIndex> checkSplitAndReSnapshot(Reference<BlobWorkerData> bw
metadata->bytesInNewDeltaFiles);
}
TraceEvent("BlobGranuleSnapshotCheck", bwData->id)
TraceEvent(SevDebug, "BlobGranuleSnapshotCheck", bwData->id)
.detail("Granule", metadata->keyRange)
.detail("Version", reSnapshotVersion);
@ -954,7 +962,7 @@ ACTOR Future<BlobFileIndex> checkSplitAndReSnapshot(Reference<BlobWorkerData> bw
metadata->keyRange.end.printable(),
bytesInNewDeltaFiles);
}
TraceEvent("BlobGranuleSnapshotFile", bwData->id)
TraceEvent(SevDebug, "BlobGranuleSnapshotFile", bwData->id)
.detail("Granule", metadata->keyRange)
.detail("Version", metadata->durableDeltaVersion.get());
@ -1534,7 +1542,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
bwData->id.toString().substr(0, 5).c_str(),
deltas.version,
rollbackVersion);
TraceEvent(SevWarn, "GranuleRollback", bwData->id)
TraceEvent(SevDebug, "GranuleRollback", bwData->id)
.detail("Granule", metadata->keyRange)
.detail("Version", deltas.version)
.detail("RollbackVersion", rollbackVersion);
@ -1648,7 +1656,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
lastDeltaVersion,
oldChangeFeedDataComplete.present() ? ". Finalizing " : "");
}
TraceEvent("BlobGranuleDeltaFile", bwData->id)
TraceEvent(SevDebug, "BlobGranuleDeltaFile", bwData->id)
.detail("Granule", metadata->keyRange)
.detail("Version", lastDeltaVersion);
@ -1825,13 +1833,13 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
}
if (e.code() == error_code_granule_assignment_conflict) {
TraceEvent(SevInfo, "GranuleAssignmentConflict", bwData->id)
TraceEvent("GranuleAssignmentConflict", bwData->id)
.detail("Granule", metadata->keyRange)
.detail("GranuleID", startState.granuleID);
return Void();
}
if (e.code() == error_code_change_feed_popped) {
TraceEvent(SevInfo, "GranuleGotChangeFeedPopped", bwData->id)
TraceEvent("GranuleChangeFeedPopped", bwData->id)
.detail("Granule", metadata->keyRange)
.detail("GranuleID", startState.granuleID);
return Void();
@ -2573,7 +2581,16 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
info.changeFeedStartVersion = tr.getCommittedVersion();
}
TraceEvent("GranuleOpen", bwData->id).detail("Granule", req.keyRange);
TraceEvent openEv("GranuleOpen", bwData->id);
openEv.detail("GranuleID", info.granuleID)
.detail("Granule", req.keyRange)
.detail("Epoch", req.managerEpoch)
.detail("Seqno", req.managerSeqno)
.detail("CFStartVersion", info.changeFeedStartVersion)
.detail("PreviousDurableVersion", info.previousDurableVersion);
if (info.parentGranule.present()) {
openEv.detail("ParentGranuleID", info.parentGranule.get().second);
}
return info;
} catch (Error& e) {
@ -2894,6 +2911,7 @@ ACTOR Future<Void> handleRangeRevoke(Reference<BlobWorkerData> bwData, RevokeBlo
ACTOR Future<Void> registerBlobWorker(Reference<BlobWorkerData> bwData, BlobWorkerInterface interf) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bwData->db);
TraceEvent("BlobWorkerRegister", bwData->id);
loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
@ -2914,6 +2932,7 @@ ACTOR Future<Void> registerBlobWorker(Reference<BlobWorkerData> bwData, BlobWork
if (BW_DEBUG) {
fmt::print("Registered blob worker {}\n", interf.id().toString());
}
TraceEvent("BlobWorkerRegistered", bwData->id);
return Void();
} catch (Error& e) {
if (BW_DEBUG) {
@ -3021,7 +3040,7 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
if (BW_DEBUG) {
fmt::print("BW constructing backup container from {0}\n", SERVER_KNOBS->BG_URL);
}
self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL);
self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL, {}, {});
if (BW_DEBUG) {
printf("BW constructed backup container\n");
}

View File

@ -47,7 +47,8 @@ ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc,
RestoreRequest request);
ACTOR static Future<Void> buildRangeVersions(KeyRangeMap<Version>* pRangeVersions,
std::vector<RestoreFileFR>* pRangeFiles,
Key url);
Key url,
Optional<std::string> proxy);
ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerData> self,
Database cx,
@ -317,7 +318,7 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerDa
state std::vector<RestoreFileFR> allFiles;
state Version minRangeVersion = MAX_VERSION;
self->initBackupContainer(request.url);
self->initBackupContainer(request.url, request.proxy);
// Get all backup files' description and save them to files
state Version targetVersion =
@ -334,7 +335,7 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerDa
// Build range versions: version of key ranges in range file
state KeyRangeMap<Version> rangeVersions(minRangeVersion, allKeys.end);
if (SERVER_KNOBS->FASTRESTORE_GET_RANGE_VERSIONS_EXPENSIVE) {
wait(buildRangeVersions(&rangeVersions, &rangeFiles, request.url));
wait(buildRangeVersions(&rangeVersions, &rangeFiles, request.url, request.proxy));
} else {
// Debug purpose, dump range versions
auto ranges = rangeVersions.ranges();
@ -881,13 +882,14 @@ ACTOR static Future<Void> insertRangeVersion(KeyRangeMap<Version>* pRangeVersion
// Expensive and slow operation that should not run in real prod.
ACTOR static Future<Void> buildRangeVersions(KeyRangeMap<Version>* pRangeVersions,
std::vector<RestoreFileFR>* pRangeFiles,
Key url) {
Key url,
Optional<std::string> proxy) {
if (!g_network->isSimulated()) {
TraceEvent(SevError, "ExpensiveBuildRangeVersions")
.detail("Reason", "Parsing all range files is slow and memory intensive");
return Void();
}
Reference<IBackupContainer> bc = IBackupContainer::openContainer(url.toString());
Reference<IBackupContainer> bc = IBackupContainer::openContainer(url.toString(), proxy, {});
// Key ranges not in range files are empty;
// Assign highest version to avoid applying any mutation in these ranges

View File

@ -446,13 +446,15 @@ struct RestoreControllerData : RestoreRoleData, public ReferenceCounted<RestoreC
}
}
void initBackupContainer(Key url) {
void initBackupContainer(Key url, Optional<std::string> proxy) {
if (bcUrl == url && bc.isValid()) {
return;
}
TraceEvent("FastRestoreControllerInitBackupContainer").detail("URL", url);
TraceEvent("FastRestoreControllerInitBackupContainer")
.detail("URL", url)
.detail("Proxy", proxy.present() ? proxy.get() : "");
bcUrl = url;
bc = IBackupContainer::openContainer(url.toString());
bc = IBackupContainer::openContainer(url.toString(), proxy, {});
}
};

View File

@ -262,7 +262,7 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf,
when(RestoreLoadFileRequest req = waitNext(loaderInterf.loadFile.getFuture())) {
requestTypeStr = "loadFile";
hasQueuedRequests = !self->loadingQueue.empty() || !self->sendingQueue.empty();
self->initBackupContainer(req.param.url);
self->initBackupContainer(req.param.url, req.param.proxy);
self->loadingQueue.push(req);
if (!hasQueuedRequests) {
self->hasPendingRequests->set(true);

View File

@ -226,12 +226,12 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
finishedBatch = NotifiedVersion(0);
}
void initBackupContainer(Key url) {
void initBackupContainer(Key url, Optional<std::string> proxy) {
if (bcUrl == url && bc.isValid()) {
return;
}
bcUrl = url;
bc = IBackupContainer::openContainer(url.toString());
bc = IBackupContainer::openContainer(url.toString(), proxy, {});
}
};

View File

@ -368,6 +368,7 @@ struct LoadingParam {
bool isRangeFile;
Key url;
Optional<std::string> proxy;
Optional<Version> rangeVersion; // range file's version
int64_t blockSize;
@ -386,12 +387,13 @@ struct LoadingParam {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, isRangeFile, url, rangeVersion, blockSize, asset);
serializer(ar, isRangeFile, url, proxy, rangeVersion, blockSize, asset);
}
std::string toString() const {
std::stringstream str;
str << "isRangeFile:" << isRangeFile << " url:" << url.toString()
<< " proxy:" << (proxy.present() ? proxy.get() : "")
<< " rangeVersion:" << (rangeVersion.present() ? rangeVersion.get() : -1) << " blockSize:" << blockSize
<< " RestoreAsset:" << asset.toString();
return str.str();

View File

@ -858,7 +858,7 @@ public:
CounterCollection cc;
Counter allQueries, getKeyQueries, getValueQueries, getRangeQueries, getMappedRangeQueries,
getRangeStreamQueries, finishedQueries, lowPriorityQueries, rowsQueried, bytesQueried, watchQueries,
emptyQueries, feedRowsQueried, feedBytesQueried;
emptyQueries, feedRowsQueried, feedBytesQueried, feedStreamQueries, feedVersionQueries;
// Bytes of the mutations that have been added to the memory of the storage server. When the data is durable
// and cleared from the memory, we do not subtract it but add it to bytesDurable.
@ -930,6 +930,7 @@ public:
lowPriorityQueries("LowPriorityQueries", cc), rowsQueried("RowsQueried", cc),
bytesQueried("BytesQueried", cc), watchQueries("WatchQueries", cc), emptyQueries("EmptyQueries", cc),
feedRowsQueried("FeedRowsQueried", cc), feedBytesQueried("FeedBytesQueried", cc),
feedStreamQueries("FeedStreamQueries", cc), feedVersionQueries("FeedVersionQueries", cc),
bytesInput("BytesInput", cc), logicalBytesInput("LogicalBytesInput", cc),
logicalBytesMoveInOverhead("LogicalBytesMoveInOverhead", cc),
kvCommitLogicalBytes("KVCommitLogicalBytes", cc), kvClearRanges("KVClearRanges", cc),
@ -2436,6 +2437,8 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
req.reply.setByteLimit(std::min((int64_t)req.replyBufferSize, SERVER_KNOBS->CHANGEFEEDSTREAM_LIMIT_BYTES));
}
++data->counters.feedStreamQueries;
wait(delay(0, TaskPriority::DefaultEndpoint));
try {
@ -2587,6 +2590,7 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
}
ACTOR Future<Void> changeFeedVersionUpdateQ(StorageServer* data, ChangeFeedVersionUpdateRequest req) {
++data->counters.feedVersionQueries;
wait(data->version.whenAtLeast(req.minVersion));
wait(delay(0));
Version minVersion = data->minFeedVersionForAddress(req.reply.getEndpoint().getPrimaryAddress());

View File

@ -93,6 +93,7 @@ struct AtomicRestoreWorkload : TestWorkload {
try {
wait(backupAgent.submitBackup(cx,
StringRef(backupContainer),
{},
deterministicRandom()->randomInt(0, 60),
deterministicRandom()->randomInt(0, 100),
BackupAgentBase::getDefaultTagName(),

View File

@ -222,6 +222,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
try {
wait(backupAgent->submitBackup(cx,
StringRef(backupContainer),
{},
deterministicRandom()->randomInt(0, 60),
deterministicRandom()->randomInt(0, 100),
tag.toString(),
@ -377,6 +378,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
cx,
self->backupTag,
KeyRef(lastBackupContainer),
{},
WaitForComplete::True,
::invalidVersion,
Verbose::True,
@ -478,6 +480,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
// the configuration to disable backup workers before restore.
extraBackup = backupAgent.submitBackup(cx,
LiteralStringRef("file://simfdb/backups/"),
{},
deterministicRandom()->randomInt(0, 60),
deterministicRandom()->randomInt(0, 100),
self->backupTag.toString(),
@ -523,7 +526,8 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
.detail("BackupTag", printable(self->backupTag));
// start restoring
auto container = IBackupContainer::openContainer(lastBackupContainer->getURL());
auto container =
IBackupContainer::openContainer(lastBackupContainer->getURL(), lastBackupContainer->getProxy(), {});
BackupDescription desc = wait(container->describeBackup());
ASSERT(self->usePartitionedLogs == desc.partitioned);
ASSERT(desc.minRestorableVersion.present()); // We must have a valid backup now.
@ -566,6 +570,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
self->backupTag,
self->backupRanges,
KeyRef(lastBackupContainer->getURL()),
lastBackupContainer->getProxy(),
targetVersion,
self->locked,
randomID,

View File

@ -266,6 +266,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
try {
wait(backupAgent->submitBackup(cx,
StringRef(backupContainer),
{},
deterministicRandom()->randomInt(0, 60),
deterministicRandom()->randomInt(0, 100),
tag.toString(),
@ -423,6 +424,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
cx,
self->backupTag,
KeyRef(lastBackupContainer),
{},
WaitForComplete::True,
::invalidVersion,
Verbose::True,
@ -523,6 +525,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
try {
extraBackup = backupAgent.submitBackup(cx,
"file://simfdb/backups/"_sr,
{},
deterministicRandom()->randomInt(0, 60),
deterministicRandom()->randomInt(0, 100),
self->backupTag.toString(),
@ -557,7 +560,9 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
.detail("RestoreAfter", self->restoreAfter)
.detail("BackupTag", printable(self->backupTag));
auto container = IBackupContainer::openContainer(lastBackupContainer->getURL());
auto container = IBackupContainer::openContainer(lastBackupContainer->getURL(),
lastBackupContainer->getProxy(),
lastBackupContainer->getEncryptionKeyFileName());
BackupDescription desc = wait(container->describeBackup());
Version targetVersion = -1;
@ -593,6 +598,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
cx,
restoreTag,
KeyRef(lastBackupContainer->getURL()),
lastBackupContainer->getProxy(),
WaitForComplete::True,
targetVersion,
Verbose::True,
@ -616,6 +622,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
cx,
restoreTag,
KeyRef(lastBackupContainer->getURL()),
lastBackupContainer->getProxy(),
self->restoreRanges,
WaitForComplete::True,
targetVersion,
@ -646,6 +653,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
cx,
restoreTags[restoreIndex],
KeyRef(lastBackupContainer->getURL()),
lastBackupContainer->getProxy(),
self->restoreRanges,
WaitForComplete::True,
::invalidVersion,
@ -675,6 +683,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
cx,
restoreTags[restoreIndex],
KeyRef(lastBackupContainer->getURL()),
lastBackupContainer->getProxy(),
WaitForComplete::True,
::invalidVersion,
Verbose::True,

View File

@ -62,6 +62,7 @@ struct BackupToBlobWorkload : TestWorkload {
wait(delay(self->backupAfter));
wait(backupAgent.submitBackup(cx,
self->backupURL,
{},
self->initSnapshotInterval,
self->snapshotInterval,
self->backupTag.toString(),

View File

@ -250,13 +250,13 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
if (BGW_DEBUG) {
printf("Blob Granule Correctness constructing simulated backup container\n");
}
self->bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/");
self->bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/", {}, {});
} else {
if (BGW_DEBUG) {
printf("Blob Granule Correctness constructing backup container from %s\n",
SERVER_KNOBS->BG_URL.c_str());
}
self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL);
self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL, {}, {});
if (BGW_DEBUG) {
printf("Blob Granule Correctness constructed backup container\n");
}

View File

@ -90,13 +90,13 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
if (BGV_DEBUG) {
printf("Blob Granule Verifier constructing simulated backup container\n");
}
bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/");
bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/", {}, {});
} else {
if (BGV_DEBUG) {
printf("Blob Granule Verifier constructing backup container from %s\n",
SERVER_KNOBS->BG_URL.c_str());
}
bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL);
bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL, {}, {});
if (BGV_DEBUG) {
printf("Blob Granule Verifier constructed backup container\n");
}

View File

@ -98,12 +98,12 @@ struct IncrementalBackupWorkload : TestWorkload {
if (!backupContainer.isValid()) {
TraceEvent("IBackupCheckListContainersAttempt").log();
state std::vector<std::string> containers =
wait(IBackupContainer::listContainers(self->backupDir.toString()));
wait(IBackupContainer::listContainers(self->backupDir.toString(), {}));
TraceEvent("IBackupCheckListContainersSuccess")
.detail("Size", containers.size())
.detail("First", containers.front());
if (containers.size()) {
backupContainer = IBackupContainer::openContainer(containers.front());
backupContainer = IBackupContainer::openContainer(containers.front(), {}, {});
}
}
state bool e = wait(backupContainer->exists());
@ -152,6 +152,7 @@ struct IncrementalBackupWorkload : TestWorkload {
try {
wait(self->backupAgent.submitBackup(cx,
self->backupDir,
{},
0,
1e8,
self->tag.toString(),
@ -219,7 +220,7 @@ struct IncrementalBackupWorkload : TestWorkload {
}
TraceEvent("IBackupStartListContainersAttempt").log();
state std::vector<std::string> containers =
wait(IBackupContainer::listContainers(self->backupDir.toString()));
wait(IBackupContainer::listContainers(self->backupDir.toString(), {}));
TraceEvent("IBackupStartListContainersSuccess")
.detail("Size", containers.size())
.detail("First", containers.front());
@ -229,6 +230,7 @@ struct IncrementalBackupWorkload : TestWorkload {
cx,
Key(self->tag.toString()),
backupURL,
{},
WaitForComplete::True,
invalidVersion,
Verbose::True,

View File

@ -114,6 +114,7 @@ struct RestoreBackupWorkload final : TestWorkload {
cx,
self->tag,
Key(self->backupContainer->getURL()),
self->backupContainer->getProxy(),
WaitForComplete::True,
::invalidVersion,
Verbose::True)));

View File

@ -61,8 +61,8 @@ struct RestoreFromBlobWorkload : TestWorkload {
restoreRanges.push_back_deep(restoreRanges.arena(), normalKeys);
wait(delay(self->restoreAfter));
Version v =
wait(backupAgent.restore(cx, {}, self->backupTag, self->backupURL, restoreRanges, self->waitForComplete));
Version v = wait(
backupAgent.restore(cx, {}, self->backupTag, self->backupURL, {}, restoreRanges, self->waitForComplete));
return Void();
}

View File

@ -57,6 +57,7 @@ struct SubmitBackupWorkload final : TestWorkload {
try {
wait(self->backupAgent.submitBackup(cx,
self->backupDir,
{},
self->initSnapshotInterval,
self->snapshotInterval,
self->tag.toString(),