Merge pull request #4233 from sfc-gh-tclinkenbeard/encrypt-backup-files

Added AsyncFileEncrypted
This commit is contained in:
Trevor Clinkenbeard 2021-07-07 13:28:28 -07:00 committed by GitHub
commit f5ade03538
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 1095 additions and 144 deletions

View File

@ -133,6 +133,7 @@ enum {
OPT_WAITFORDONE,
OPT_BACKUPKEYS_FILTER,
OPT_INCREMENTALONLY,
OPT_ENCRYPTION_KEY_FILE,
// Backup Modify
OPT_MOD_ACTIVE_INTERVAL,
@ -259,6 +260,7 @@ CSimpleOpt::SOption g_rgBackupStartOptions[] = {
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
{ OPT_BLOB_CREDENTIALS, "--blob_credentials", SO_REQ_SEP },
{ OPT_INCREMENTALONLY, "--incremental", SO_NONE },
{ OPT_ENCRYPTION_KEY_FILE, "--encryption_key_file", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -697,6 +699,7 @@ CSimpleOpt::SOption g_rgRestoreOptions[] = {
{ OPT_INCREMENTALONLY, "--incremental", SO_NONE },
{ OPT_RESTORE_BEGIN_VERSION, "--begin_version", SO_REQ_SEP },
{ OPT_RESTORE_INCONSISTENT_SNAPSHOT_ONLY, "--inconsistent_snapshot_only", SO_NONE },
{ OPT_ENCRYPTION_KEY_FILE, "--encryption_key_file", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
@ -1089,6 +1092,8 @@ static void printBackupUsage(bool devhelp) {
" Performs incremental backup without the base backup.\n"
" This option indicates to the backup agent that it will only need to record the log files, "
"and ignore the range files.\n");
printf(" --encryption_key_file"
" The AES-128-GCM key in the provided file is used for encrypting backup files.\n");
#ifndef TLS_DISABLED
printf(TLS_HELP);
#endif
@ -1162,6 +1167,8 @@ static void printRestoreUsage(bool devhelp) {
" To be used in conjunction with incremental restore.\n"
" Indicates to the backup agent to only begin replaying log files from a certain version, "
"instead of the entire set.\n");
printf(" --encryption_key_file"
" The AES-128-GCM key in the provided file is used for decrypting backup files.\n");
#ifndef TLS_DISABLED
printf(TLS_HELP);
#endif
@ -2220,7 +2227,9 @@ ACTOR Future<Void> changeDBBackupResumed(Database src, Database dest, bool pause
return Void();
}
Reference<IBackupContainer> openBackupContainer(const char* name, std::string destinationContainer) {
Reference<IBackupContainer> openBackupContainer(const char* name,
std::string destinationContainer,
Optional<std::string> const& encryptionKeyFile = {}) {
// Error, if no dest container was specified
if (destinationContainer.empty()) {
fprintf(stderr, "ERROR: No backup destination was specified.\n");
@ -2230,7 +2239,7 @@ Reference<IBackupContainer> openBackupContainer(const char* name, std::string de
Reference<IBackupContainer> c;
try {
c = IBackupContainer::openContainer(destinationContainer);
c = IBackupContainer::openContainer(destinationContainer, encryptionKeyFile);
} catch (Error& e) {
std::string msg = format("ERROR: '%s' on URL '%s'", e.what(), destinationContainer.c_str());
if (e.code() == error_code_backup_invalid_url && !IBackupContainer::lastOpenError.empty()) {
@ -2259,8 +2268,9 @@ ACTOR Future<Void> runRestore(Database db,
bool waitForDone,
std::string addPrefix,
std::string removePrefix,
bool onlyAppyMutationLogs,
bool inconsistentSnapshotOnly) {
bool onlyApplyMutationLogs,
bool inconsistentSnapshotOnly,
Optional<std::string> encryptionKeyFile) {
if (ranges.empty()) {
ranges.push_back_deep(ranges.arena(), normalKeys);
}
@ -2296,7 +2306,8 @@ ACTOR Future<Void> runRestore(Database db,
try {
state FileBackupAgent backupAgent;
state Reference<IBackupContainer> bc = openBackupContainer(exeRestore.toString().c_str(), container);
state Reference<IBackupContainer> bc =
openBackupContainer(exeRestore.toString().c_str(), container, encryptionKeyFile);
// If targetVersion is unset then use the maximum restorable version from the backup description
if (targetVersion == invalidVersion) {
@ -2306,7 +2317,7 @@ ACTOR Future<Void> runRestore(Database db,
BackupDescription desc = wait(bc->describeBackup());
if (onlyAppyMutationLogs && desc.contiguousLogEnd.present()) {
if (onlyApplyMutationLogs && desc.contiguousLogEnd.present()) {
targetVersion = desc.contiguousLogEnd.get() - 1;
} else if (desc.maxRestorableVersion.present()) {
targetVersion = desc.maxRestorableVersion.get();
@ -2331,9 +2342,10 @@ ACTOR Future<Void> runRestore(Database db,
KeyRef(addPrefix),
KeyRef(removePrefix),
true,
onlyAppyMutationLogs,
onlyApplyMutationLogs,
inconsistentSnapshotOnly,
beginVersion));
beginVersion,
encryptionKeyFile));
if (waitForDone && verbose) {
// If restore is now complete then report version restored
@ -2512,7 +2524,8 @@ ACTOR Future<Void> expireBackupData(const char* name,
Database db,
bool force,
Version restorableAfterVersion,
std::string restorableAfterDatetime) {
std::string restorableAfterDatetime,
Optional<std::string> encryptionKeyFile) {
if (!endDatetime.empty()) {
Version v = wait(timeKeeperVersionFromDatetime(endDatetime, db));
endVersion = v;
@ -2531,7 +2544,7 @@ ACTOR Future<Void> expireBackupData(const char* name,
}
try {
Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer);
Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer, encryptionKeyFile);
state IBackupContainer::ExpireProgress progress;
state std::string lastProgress;
@ -2613,9 +2626,10 @@ ACTOR Future<Void> describeBackup(const char* name,
std::string destinationContainer,
bool deep,
Optional<Database> cx,
bool json) {
bool json,
Optional<std::string> encryptionKeyFile) {
try {
Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer);
Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer, encryptionKeyFile);
state BackupDescription desc = wait(c->describeBackup(deep));
if (cx.present())
wait(desc.resolveVersionTimes(cx.get()));
@ -3248,7 +3262,7 @@ int main(int argc, char* argv[]) {
bool stopWhenDone = true;
bool usePartitionedLog = false; // Set to true to use new backup system
bool incrementalBackupOnly = false;
bool onlyAppyMutationLogs = false;
bool onlyApplyMutationLogs = false;
bool inconsistentSnapshotOnly = false;
bool forceAction = false;
bool trace = false;
@ -3272,6 +3286,7 @@ int main(int argc, char* argv[]) {
std::string restoreClusterFileOrig;
bool jsonOutput = false;
bool deleteData = false;
Optional<std::string> encryptionKeyFile;
BackupModifyOptions modifyOptions;
@ -3513,7 +3528,10 @@ int main(int argc, char* argv[]) {
break;
case OPT_INCREMENTALONLY:
incrementalBackupOnly = true;
onlyAppyMutationLogs = true;
onlyApplyMutationLogs = true;
break;
case OPT_ENCRYPTION_KEY_FILE:
encryptionKeyFile = args->OptionArg();
break;
case OPT_RESTORECONTAINER:
restoreContainer = args->OptionArg();
@ -3853,7 +3871,7 @@ int main(int argc, char* argv[]) {
if (!initCluster())
return FDB_EXIT_ERROR;
// Test out the backup url to make sure it parses. Doesn't test to make sure it's actually writeable.
openBackupContainer(argv[0], destinationContainer);
openBackupContainer(argv[0], destinationContainer, encryptionKeyFile);
f = stopAfter(submitBackup(db,
destinationContainer,
initialSnapshotIntervalSeconds,
@ -3932,7 +3950,8 @@ int main(int argc, char* argv[]) {
db,
forceAction,
expireRestorableAfterVersion,
expireRestorableAfterDatetime));
expireRestorableAfterDatetime,
encryptionKeyFile));
break;
case BackupType::DELETE_BACKUP:
@ -3952,7 +3971,8 @@ int main(int argc, char* argv[]) {
destinationContainer,
describeDeep,
describeTimestamps ? Optional<Database>(db) : Optional<Database>(),
jsonOutput));
jsonOutput,
encryptionKeyFile));
break;
case BackupType::LIST:
@ -4033,8 +4053,9 @@ int main(int argc, char* argv[]) {
waitForDone,
addPrefix,
removePrefix,
onlyAppyMutationLogs,
inconsistentSnapshotOnly));
onlyApplyMutationLogs,
inconsistentSnapshotOnly,
encryptionKeyFile));
break;
case RestoreType::WAIT:
f = stopAfter(success(ba.waitRestore(db, KeyRef(tagName), true)));

View File

@ -256,7 +256,7 @@ public:
m_concurrentUploads(bstore->knobs.concurrent_writes_per_file) {
// Add first part
m_parts.push_back(Reference<Part>(new Part(1, m_bstore->knobs.multipart_min_part_size)));
m_parts.push_back(makeReference<Part>(1, m_bstore->knobs.multipart_min_part_size));
}
};

View File

@ -83,6 +83,6 @@ TEST_CASE("/asynctaskthread/add") {
clients.push_back(asyncTaskThreadClient(&asyncTaskThread, &sum, 100));
}
wait(waitForAll(clients));
ASSERT(sum == 1000);
ASSERT_EQ(sum, 1000);
return Void();
}

View File

@ -164,20 +164,21 @@ public:
Key url,
Standalone<VectorRef<KeyRangeRef>> ranges,
bool waitForComplete = true,
Version targetVersion = -1,
Version targetVersion = ::invalidVersion,
bool verbose = true,
Key addPrefix = Key(),
Key removePrefix = Key(),
bool lockDB = true,
bool onlyAppyMutationLogs = false,
bool inconsistentSnapshotOnly = false,
Version beginVersion = -1);
Version beginVersion = ::invalidVersion,
Optional<std::string> const& encryptionKeyFileName = {});
Future<Version> restore(Database cx,
Optional<Database> cxOrig,
Key tagName,
Key url,
bool waitForComplete = true,
Version targetVersion = -1,
Version targetVersion = ::invalidVersion,
bool verbose = true,
KeyRange range = normalKeys,
Key addPrefix = Key(),
@ -185,7 +186,8 @@ public:
bool lockDB = true,
bool onlyAppyMutationLogs = false,
bool inconsistentSnapshotOnly = false,
Version beginVersion = -1) {
Version beginVersion = ::invalidVersion,
Optional<std::string> const& encryptionKeyFileName = {}) {
Standalone<VectorRef<KeyRangeRef>> rangeRef;
rangeRef.push_back_deep(rangeRef.arena(), range);
return restore(cx,
@ -201,7 +203,8 @@ public:
lockDB,
onlyAppyMutationLogs,
inconsistentSnapshotOnly,
beginVersion);
beginVersion,
encryptionKeyFileName);
}
Future<Version> atomicRestore(Database cx,
Key tagName,
@ -237,20 +240,22 @@ public:
Key outContainer,
int initialSnapshotIntervalSeconds,
int snapshotIntervalSeconds,
std::string tagName,
std::string const& tagName,
Standalone<VectorRef<KeyRangeRef>> backupRanges,
bool stopWhenDone = true,
bool partitionedLog = false,
bool incrementalBackupOnly = false);
bool incrementalBackupOnly = false,
Optional<std::string> const& encryptionKeyFileName = {});
Future<Void> submitBackup(Database cx,
Key outContainer,
int initialSnapshotIntervalSeconds,
int snapshotIntervalSeconds,
std::string tagName,
std::string const& tagName,
Standalone<VectorRef<KeyRangeRef>> backupRanges,
bool stopWhenDone = true,
bool partitionedLog = false,
bool incrementalBackupOnly = false) {
bool incrementalBackupOnly = false,
Optional<std::string> const& encryptionKeyFileName = {}) {
return runRYWTransactionFailIfLocked(cx, [=](Reference<ReadYourWritesTransaction> tr) {
return submitBackup(tr,
outContainer,
@ -260,7 +265,8 @@ public:
backupRanges,
stopWhenDone,
partitionedLog,
incrementalBackupOnly);
incrementalBackupOnly,
encryptionKeyFileName);
});
}

View File

@ -58,6 +58,7 @@ ACTOR Future<Void> appendStringRefWithLen(Reference<IBackupFile> file, Standalon
wait(file->append(s.begin(), s.size()));
return Void();
}
} // namespace IBackupFile_impl
Future<Void> IBackupFile::appendStringRefWithLen(Standalone<StringRef> s) {
@ -253,7 +254,8 @@ std::vector<std::string> IBackupContainer::getURLFormats() {
}
// Get an IBackupContainer based on a container URL string
Reference<IBackupContainer> IBackupContainer::openContainer(const std::string& url) {
Reference<IBackupContainer> IBackupContainer::openContainer(const std::string& url,
Optional<std::string> const& encryptionKeyFileName) {
static std::map<std::string, Reference<IBackupContainer>> m_cache;
Reference<IBackupContainer>& r = m_cache[url];
@ -262,9 +264,9 @@ Reference<IBackupContainer> IBackupContainer::openContainer(const std::string& u
try {
StringRef u(url);
if (u.startsWith(LiteralStringRef("file://"))) {
r = Reference<IBackupContainer>(new BackupContainerLocalDirectory(url));
} else if (u.startsWith(LiteralStringRef("blobstore://"))) {
if (u.startsWith("file://"_sr)) {
r = makeReference<BackupContainerLocalDirectory>(url, encryptionKeyFileName);
} else if (u.startsWith("blobstore://"_sr)) {
std::string resource;
// The URL parameters contain blobstore endpoint tunables as well as possible backup-specific options.
@ -277,15 +279,16 @@ Reference<IBackupContainer> IBackupContainer::openContainer(const std::string& u
for (auto c : resource)
if (!isalnum(c) && c != '_' && c != '-' && c != '.' && c != '/')
throw backup_invalid_url();
r = Reference<IBackupContainer>(new BackupContainerS3BlobStore(bstore, resource, backupParams));
r = makeReference<BackupContainerS3BlobStore>(bstore, resource, backupParams, encryptionKeyFileName);
}
#ifdef BUILD_AZURE_BACKUP
else if (u.startsWith(LiteralStringRef("azure://"))) {
u.eat(LiteralStringRef("azure://"));
auto address = NetworkAddress::parse(u.eat(LiteralStringRef("/")).toString());
auto containerName = u.eat(LiteralStringRef("/")).toString();
auto accountName = u.eat(LiteralStringRef("/")).toString();
r = Reference<IBackupContainer>(new BackupContainerAzureBlobStore(address, containerName, accountName));
else if (u.startsWith("azure://"_sr)) {
u.eat("azure://"_sr);
auto address = NetworkAddress::parse(u.eat("/"_sr).toString());
auto containerName = u.eat("/"_sr).toString();
auto accountName = u.eat("/"_sr).toString();
r = makeReference<BackupContainerAzureBlobStore>(
address, containerName, accountName, encryptionKeyFileName);
}
#endif
else {
@ -315,10 +318,10 @@ Reference<IBackupContainer> IBackupContainer::openContainer(const std::string& u
ACTOR Future<std::vector<std::string>> listContainers_impl(std::string baseURL) {
try {
StringRef u(baseURL);
if (u.startsWith(LiteralStringRef("file://"))) {
if (u.startsWith("file://"_sr)) {
std::vector<std::string> results = wait(BackupContainerLocalDirectory::listURLs(baseURL));
return results;
} else if (u.startsWith(LiteralStringRef("blobstore://"))) {
} else if (u.startsWith("blobstore://"_sr)) {
std::string resource;
S3BlobStoreEndpoint::ParametersT backupParams;
@ -333,14 +336,14 @@ ACTOR Future<std::vector<std::string>> listContainers_impl(std::string baseURL)
}
// Create a dummy container to parse the backup-specific parameters from the URL and get a final bucket name
BackupContainerS3BlobStore dummy(bstore, "dummy", backupParams);
BackupContainerS3BlobStore dummy(bstore, "dummy", backupParams, {});
std::vector<std::string> results = wait(BackupContainerS3BlobStore::listURLs(bstore, dummy.getBucket()));
return results;
}
// TODO: Enable this when Azure backups are ready
/*
else if (u.startsWith(LiteralStringRef("azure://"))) {
else if (u.startsWith("azure://"_sr)) {
std::vector<std::string> results = wait(BackupContainerAzureBlobStore::listURLs(baseURL));
return results;
}

View File

@ -293,7 +293,8 @@ public:
Version beginVersion = -1) = 0;
// Get an IBackupContainer based on a container spec string
static Reference<IBackupContainer> openContainer(const std::string& url);
static Reference<IBackupContainer> openContainer(const std::string& url,
const Optional<std::string>& encryptionKeyFileName = {});
static std::vector<std::string> getURLFormats();
static Future<std::vector<std::string>> listContainers(const std::string& baseURL);

View File

@ -19,6 +19,7 @@
*/
#include "fdbclient/BackupContainerAzureBlobStore.h"
#include "fdbrpc/AsyncFileEncrypted.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -167,8 +168,12 @@ public:
if (!exists) {
throw file_not_found();
}
return Reference<IAsyncFile>(
new ReadFile(self->asyncTaskThread, self->containerName, fileName, self->client.get()));
Reference<IAsyncFile> f =
makeReference<ReadFile>(self->asyncTaskThread, self->containerName, fileName, self->client.get());
if (self->usesEncryption()) {
f = makeReference<AsyncFileEncrypted>(f, false);
}
return f;
}
ACTOR static Future<Reference<IBackupFile>> writeFile(BackupContainerAzureBlobStore* self, std::string fileName) {
@ -177,10 +182,11 @@ public:
auto outcome = client->create_append_blob(containerName, fileName).get();
return Void();
}));
return Reference<IBackupFile>(
new BackupFile(fileName,
Reference<IAsyncFile>(new WriteFile(
self->asyncTaskThread, self->containerName, fileName, self->client.get()))));
auto f = makeReference<WriteFile>(self->asyncTaskThread, self->containerName, fileName, self->client.get());
if (self->usesEncryption()) {
f = makeReference<AsyncFileEncrypted>(f, true);
}
return makeReference<BackupFile>(fileName, f);
}
static void listFiles(AzureClient* client,
@ -213,6 +219,16 @@ public:
}
return Void();
}
ACTOR static Future<Void> create(BackupContainerAzureBlobStore* self) {
state Future<Void> f1 =
self->asyncTaskThread.execAsync([containerName = self->containerName, client = self->client.get()] {
client->create_container(containerName).wait();
return Void();
});
state Future<Void> f2 = self->usesEncryption() ? self->encryptionSetupComplete() : Void();
return f1 && f2;
}
};
Future<bool> BackupContainerAzureBlobStore::blobExists(const std::string& fileName) {
@ -225,10 +241,11 @@ Future<bool> BackupContainerAzureBlobStore::blobExists(const std::string& fileNa
BackupContainerAzureBlobStore::BackupContainerAzureBlobStore(const NetworkAddress& address,
const std::string& accountName,
const std::string& containerName)
const std::string& containerName,
const Optional<std::string>& encryptionKeyFileName)
: containerName(containerName) {
setEncryptionKey(encryptionKeyFileName);
std::string accountKey = std::getenv("AZURE_KEY");
auto credential = std::make_shared<azure::storage_lite::shared_key_credential>(accountName, accountKey);
auto storageAccount = std::make_shared<azure::storage_lite::storage_account>(
accountName, credential, false, format("http://%s/%s", address.toString().c_str(), accountName.c_str()));
@ -244,10 +261,7 @@ void BackupContainerAzureBlobStore::delref() {
}
Future<Void> BackupContainerAzureBlobStore::create() {
return asyncTaskThread.execAsync([containerName = this->containerName, client = this->client.get()] {
client->create_container(containerName).wait();
return Void();
});
return BackupContainerAzureBlobStoreImpl::create(this);
}
Future<bool> BackupContainerAzureBlobStore::exists() {
return asyncTaskThread.execAsync([containerName = this->containerName, client = this->client.get()] {

View File

@ -44,7 +44,8 @@ class BackupContainerAzureBlobStore final : public BackupContainerFileSystem,
public:
BackupContainerAzureBlobStore(const NetworkAddress& address,
const std::string& accountName,
const std::string& containerName);
const std::string& containerName,
const Optional<std::string>& encryptionKeyFileName);
void addref() override;
void delref() override;

View File

@ -23,6 +23,7 @@
#include "fdbclient/BackupContainerFileSystem.h"
#include "fdbclient/BackupContainerLocalDirectory.h"
#include "fdbclient/JsonBuilder.h"
#include "flow/StreamCipher.h"
#include "flow/UnitTest.h"
#include <algorithm>
@ -290,13 +291,13 @@ public:
std::map<int, std::vector<int>> tagIndices; // tagId -> indices in files
for (int i = 0; i < logs.size(); i++) {
ASSERT(logs[i].tagId >= 0);
ASSERT(logs[i].tagId < logs[i].totalTags);
ASSERT_GE(logs[i].tagId, 0);
ASSERT_LT(logs[i].tagId, logs[i].totalTags);
auto& indices = tagIndices[logs[i].tagId];
// filter out if indices.back() is subset of files[i] or vice versa
if (!indices.empty()) {
if (logs[indices.back()].isSubset(logs[i])) {
ASSERT(logs[indices.back()].fileSize <= logs[i].fileSize);
ASSERT_LE(logs[indices.back()].fileSize, logs[i].fileSize);
indices.back() = i;
} else if (!logs[i].isSubset(logs[indices.back()])) {
indices.push_back(i);
@ -864,7 +865,7 @@ public:
int i = 0;
for (int j = 1; j < logs.size(); j++) {
if (logs[j].isSubset(logs[i])) {
ASSERT(logs[j].fileSize <= logs[i].fileSize);
ASSERT_LE(logs[j].fileSize, logs[i].fileSize);
continue;
}
@ -1032,10 +1033,10 @@ public:
}
static std::string versionFolderString(Version v, int smallestBucket) {
ASSERT(smallestBucket < 14);
ASSERT_LT(smallestBucket, 14);
// Get a 0-padded fixed size representation of v
std::string vFixedPrecision = format("%019lld", v);
ASSERT(vFixedPrecision.size() == 19);
ASSERT_EQ(vFixedPrecision.size(), 19);
// Truncate smallestBucket from the fixed length representation
vFixedPrecision.resize(vFixedPrecision.size() - smallestBucket);
@ -1126,6 +1127,42 @@ public:
return false;
}
ACTOR static Future<Void> createTestEncryptionKeyFile(std::string filename) {
state Reference<IAsyncFile> keyFile = wait(IAsyncFileSystem::filesystem()->open(
filename,
IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_CREATE,
0600));
StreamCipher::Key::RawKeyType testKey;
generateRandomData(testKey.data(), testKey.size());
keyFile->write(testKey.data(), testKey.size(), 0);
wait(keyFile->sync());
return Void();
}
ACTOR static Future<Void> readEncryptionKey(std::string encryptionKeyFileName) {
state Reference<IAsyncFile> keyFile;
state StreamCipher::Key::RawKeyType key;
try {
Reference<IAsyncFile> _keyFile =
wait(IAsyncFileSystem::filesystem()->open(encryptionKeyFileName, 0x0, 0400));
keyFile = _keyFile;
} catch (Error& e) {
TraceEvent(SevWarnAlways, "FailedToOpenEncryptionKeyFile")
.detail("FileName", encryptionKeyFileName)
.error(e);
throw e;
}
int bytesRead = wait(keyFile->read(key.data(), key.size(), 0));
if (bytesRead != key.size()) {
TraceEvent(SevWarnAlways, "InvalidEncryptionKeyFileSize")
.detail("ExpectedSize", key.size())
.detail("ActualSize", bytesRead);
throw invalid_encryption_key_file();
}
ASSERT_EQ(bytesRead, key.size());
StreamCipher::Key::initializeKey(std::move(key));
return Void();
}
}; // class BackupContainerFileSystemImpl
Future<Reference<IBackupFile>> BackupContainerFileSystem::writeLogFile(Version beginVersion,
@ -1432,6 +1469,20 @@ BackupContainerFileSystem::VersionProperty BackupContainerFileSystem::unreliable
BackupContainerFileSystem::VersionProperty BackupContainerFileSystem::logType() {
return { Reference<BackupContainerFileSystem>::addRef(this), "mutation_log_type" };
}
bool BackupContainerFileSystem::usesEncryption() const {
return encryptionSetupFuture.isValid();
}
Future<Void> BackupContainerFileSystem::encryptionSetupComplete() const {
return encryptionSetupFuture;
}
void BackupContainerFileSystem::setEncryptionKey(Optional<std::string> const& encryptionKeyFileName) {
if (encryptionKeyFileName.present()) {
encryptionSetupFuture = BackupContainerFileSystemImpl::readEncryptionKey(encryptionKeyFileName.get());
}
}
Future<Void> BackupContainerFileSystem::createTestEncryptionKeyFile(std::string const &filename) {
return BackupContainerFileSystemImpl::createTestEncryptionKeyFile(filename);
}
namespace backup_test {
@ -1466,12 +1517,12 @@ ACTOR Future<Void> writeAndVerifyFile(Reference<IBackupContainer> c, Reference<I
state Reference<IAsyncFile> inputFile = wait(c->readFile(f->getFileName()));
int64_t fileSize = wait(inputFile->size());
ASSERT(size == fileSize);
ASSERT_EQ(size, fileSize);
if (size > 0) {
state Standalone<VectorRef<uint8_t>> buf;
buf.resize(buf.arena(), fileSize);
int b = wait(inputFile->read(buf.begin(), buf.size(), 0));
ASSERT(b == buf.size());
ASSERT_EQ(b, buf.size());
ASSERT(buf == content);
}
return Void();
@ -1485,7 +1536,7 @@ Version nextVersion(Version v) {
// Write a snapshot file with only begin & end key
ACTOR static Future<Void> testWriteSnapshotFile(Reference<IBackupFile> file, Key begin, Key end, uint32_t blockSize) {
ASSERT(blockSize > 3 * sizeof(uint32_t) + begin.size() + end.size());
ASSERT_GT(blockSize, 3 * sizeof(uint32_t) + begin.size() + end.size());
uint32_t fileVersion = BACKUP_AGENT_SNAPSHOT_FILE_VERSION;
// write Header
@ -1506,12 +1557,16 @@ ACTOR static Future<Void> testWriteSnapshotFile(Reference<IBackupFile> file, Key
return Void();
}
ACTOR static Future<Void> testBackupContainer(std::string url) {
ACTOR Future<Void> testBackupContainer(std::string url, Optional<std::string> encryptionKeyFileName) {
state FlowLock lock(100e6);
if (encryptionKeyFileName.present()) {
wait(BackupContainerFileSystem::createTestEncryptionKeyFile(encryptionKeyFileName.get()));
}
printf("BackupContainerTest URL %s\n", url.c_str());
state Reference<IBackupContainer> c = IBackupContainer::openContainer(url);
state Reference<IBackupContainer> c = IBackupContainer::openContainer(url, encryptionKeyFileName);
// Make sure container doesn't exist, then create it.
try {
@ -1597,9 +1652,9 @@ ACTOR static Future<Void> testBackupContainer(std::string url) {
wait(waitForAll(writes));
state BackupFileList listing = wait(c->dumpFileList());
ASSERT(listing.ranges.size() == nRangeFiles);
ASSERT(listing.logs.size() == logs.size());
ASSERT(listing.snapshots.size() == snapshots.size());
ASSERT_EQ(listing.ranges.size(), nRangeFiles);
ASSERT_EQ(listing.logs.size(), logs.size());
ASSERT_EQ(listing.snapshots.size(), snapshots.size());
state BackupDescription desc = wait(c->describeBackup());
printf("\n%s\n", desc.toString().c_str());
@ -1629,8 +1684,8 @@ ACTOR static Future<Void> testBackupContainer(std::string url) {
// If there is an error, it must be backup_cannot_expire and we have to be on the last snapshot
if (f.isError()) {
ASSERT(f.getError().code() == error_code_backup_cannot_expire);
ASSERT(i == listing.snapshots.size() - 1);
ASSERT_EQ(f.getError().code(), error_code_backup_cannot_expire);
ASSERT_EQ(i, listing.snapshots.size() - 1);
wait(c->expireData(expireVersion, true));
}
@ -1646,31 +1701,34 @@ ACTOR static Future<Void> testBackupContainer(std::string url) {
ASSERT(d.isError() && d.getError().code() == error_code_backup_does_not_exist);
BackupFileList empty = wait(c->dumpFileList());
ASSERT(empty.ranges.size() == 0);
ASSERT(empty.logs.size() == 0);
ASSERT(empty.snapshots.size() == 0);
ASSERT_EQ(empty.ranges.size(), 0);
ASSERT_EQ(empty.logs.size(), 0);
ASSERT_EQ(empty.snapshots.size(), 0);
printf("BackupContainerTest URL=%s PASSED.\n", url.c_str());
return Void();
}
TEST_CASE("/backup/containers/localdir") {
if (g_network->isSimulated())
wait(testBackupContainer(format("file://simfdb/backups/%llx", timer_int())));
else
wait(testBackupContainer(format("file:///private/tmp/fdb_backups/%llx", timer_int())));
TEST_CASE("/backup/containers/localdir/unencrypted") {
wait(testBackupContainer(format("file://%s/fdb_backups/%llx", params.getDataDir().c_str(), timer_int()), {}));
return Void();
};
}
TEST_CASE("/backup/containers/localdir/encrypted") {
wait(testBackupContainer(format("file://%s/fdb_backups/%llx", params.getDataDir().c_str(), timer_int()),
format("%s/test_encryption_key", params.getDataDir().c_str())));
return Void();
}
TEST_CASE("/backup/containers/url") {
if (!g_network->isSimulated()) {
const char* url = getenv("FDB_TEST_BACKUP_URL");
ASSERT(url != nullptr);
wait(testBackupContainer(url));
wait(testBackupContainer(url, {}));
}
return Void();
};
}
TEST_CASE("/backup/containers_list") {
if (!g_network->isSimulated()) {
@ -1683,7 +1741,7 @@ TEST_CASE("/backup/containers_list") {
}
}
return Void();
};
}
TEST_CASE("/backup/time") {
// test formatTime()

View File

@ -153,6 +153,13 @@ public:
bool logsOnly,
Version beginVersion) final;
static Future<Void> createTestEncryptionKeyFile(std::string const& filename);
protected:
bool usesEncryption() const;
void setEncryptionKey(Optional<std::string> const& encryptionKeyFileName);
Future<Void> encryptionSetupComplete() const;
private:
struct VersionProperty {
VersionProperty(Reference<BackupContainerFileSystem> bc, const std::string& name)
@ -186,6 +193,8 @@ private:
Future<std::vector<RangeFile>> old_listRangeFiles(Version beginVersion, Version endVersion);
friend class BackupContainerFileSystemImpl;
Future<Void> encryptionSetupFuture;
};
#endif

View File

@ -131,7 +131,10 @@ std::string BackupContainerLocalDirectory::getURLFormat() {
return "file://</path/to/base/dir/>";
}
BackupContainerLocalDirectory::BackupContainerLocalDirectory(const std::string& url) {
BackupContainerLocalDirectory::BackupContainerLocalDirectory(const std::string& url,
const Optional<std::string>& encryptionKeyFileName) {
setEncryptionKey(encryptionKeyFileName);
std::string path;
if (url.find("file://") != 0) {
TraceEvent(SevWarn, "BackupContainerLocalDirectory")
@ -193,7 +196,10 @@ Future<std::vector<std::string>> BackupContainerLocalDirectory::listURLs(const s
}
Future<Void> BackupContainerLocalDirectory::create() {
// Nothing should be done here because create() can be called by any process working with the container URL,
if (usesEncryption()) {
return encryptionSetupComplete();
}
// No directory should be created here because create() can be called by any process working with the container URL,
// such as fdbbackup. Since "local directory" containers are by definition local to the machine they are
// accessed from, the container's creation (in this case the creation of a directory) must be ensured prior to
// every file creation, which is done in openFile(). Creating the directory here will result in unnecessary
@ -207,6 +213,9 @@ Future<bool> BackupContainerLocalDirectory::exists() {
Future<Reference<IAsyncFile>> BackupContainerLocalDirectory::readFile(const std::string& path) {
int flags = IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_READONLY | IAsyncFile::OPEN_UNCACHED;
if (usesEncryption()) {
flags |= IAsyncFile::OPEN_ENCRYPTED;
}
// Simulation does not properly handle opening the same file from multiple machines using a shared filesystem,
// so create a symbolic link to make each file opening appear to be unique. This could also work in production
// but only if the source directory is writeable which shouldn't be required for a restore.
@ -258,8 +267,11 @@ Future<Reference<IAsyncFile>> BackupContainerLocalDirectory::readFile(const std:
}
Future<Reference<IBackupFile>> BackupContainerLocalDirectory::writeFile(const std::string& path) {
int flags = IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE |
IAsyncFile::OPEN_READWRITE;
int flags = IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_CREATE |
IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE;
if (usesEncryption()) {
flags |= IAsyncFile::OPEN_ENCRYPTED;
}
std::string fullPath = joinPath(m_path, path);
platform::createDirectory(parentDirectory(fullPath));
std::string temp = fullPath + "." + deterministicRandom()->randomUniqueID().toString() + ".temp";

View File

@ -33,7 +33,7 @@ public:
static std::string getURLFormat();
BackupContainerLocalDirectory(const std::string& url);
BackupContainerLocalDirectory(const std::string& url, Optional<std::string> const& encryptionKeyFileName);
static Future<std::vector<std::string>> listURLs(const std::string& url);

View File

@ -20,6 +20,7 @@
#include "fdbclient/AsyncFileS3BlobStore.actor.h"
#include "fdbclient/BackupContainerS3BlobStore.h"
#include "fdbrpc/AsyncFileEncrypted.h"
#include "fdbrpc/AsyncFileReadAhead.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -103,6 +104,10 @@ public:
wait(bc->m_bstore->writeEntireFile(bc->m_bucket, bc->indexEntry(), ""));
}
if (bc->usesEncryption()) {
wait(bc->encryptionSetupComplete());
}
return Void();
}
@ -137,9 +142,10 @@ std::string BackupContainerS3BlobStore::indexEntry() {
BackupContainerS3BlobStore::BackupContainerS3BlobStore(Reference<S3BlobStoreEndpoint> bstore,
const std::string& name,
const S3BlobStoreEndpoint::ParametersT& params)
const S3BlobStoreEndpoint::ParametersT& params,
const Optional<std::string>& encryptionKeyFileName)
: m_bstore(bstore), m_name(name), m_bucket("FDB_BACKUPS_V2") {
setEncryptionKey(encryptionKeyFileName);
// Currently only one parameter is supported, "bucket"
for (const auto& [name, value] : params) {
if (name == "bucket") {
@ -164,12 +170,16 @@ std::string BackupContainerS3BlobStore::getURLFormat() {
}
Future<Reference<IAsyncFile>> BackupContainerS3BlobStore::readFile(const std::string& path) {
return Reference<IAsyncFile>(new AsyncFileReadAheadCache(
Reference<IAsyncFile>(new AsyncFileS3BlobStoreRead(m_bstore, m_bucket, dataPath(path))),
m_bstore->knobs.read_block_size,
m_bstore->knobs.read_ahead_blocks,
m_bstore->knobs.concurrent_reads_per_file,
m_bstore->knobs.read_cache_blocks_per_file));
Reference<IAsyncFile> f = makeReference<AsyncFileS3BlobStoreRead>(m_bstore, m_bucket, dataPath(path));
if (usesEncryption()) {
f = makeReference<AsyncFileEncrypted>(f, AsyncFileEncrypted::Mode::READ_ONLY);
}
f = makeReference<AsyncFileReadAheadCache>(f,
m_bstore->knobs.read_block_size,
m_bstore->knobs.read_ahead_blocks,
m_bstore->knobs.concurrent_reads_per_file,
m_bstore->knobs.read_cache_blocks_per_file);
return f;
}
Future<std::vector<std::string>> BackupContainerS3BlobStore::listURLs(Reference<S3BlobStoreEndpoint> bstore,
@ -178,8 +188,11 @@ Future<std::vector<std::string>> BackupContainerS3BlobStore::listURLs(Reference<
}
Future<Reference<IBackupFile>> BackupContainerS3BlobStore::writeFile(const std::string& path) {
return Reference<IBackupFile>(new BackupContainerS3BlobStoreImpl::BackupFile(
path, Reference<IAsyncFile>(new AsyncFileS3BlobStoreWrite(m_bstore, m_bucket, dataPath(path)))));
Reference<IAsyncFile> f = makeReference<AsyncFileS3BlobStoreWrite>(m_bstore, m_bucket, dataPath(path));
if (usesEncryption()) {
f = makeReference<AsyncFileEncrypted>(f, AsyncFileEncrypted::Mode::APPEND_ONLY);
}
return Future<Reference<IBackupFile>>(makeReference<BackupContainerS3BlobStoreImpl::BackupFile>(path, f));
}
Future<Void> BackupContainerS3BlobStore::deleteFile(const std::string& path) {

View File

@ -43,7 +43,8 @@ class BackupContainerS3BlobStore final : public BackupContainerFileSystem,
public:
BackupContainerS3BlobStore(Reference<S3BlobStoreEndpoint> bstore,
const std::string& name,
const S3BlobStoreEndpoint::ParametersT& params);
const S3BlobStoreEndpoint::ParametersT& params,
const Optional<std::string>& encryptionKeyFileName);
void addref() override;
void delref() override;

View File

@ -4506,6 +4506,7 @@ public:
}
}
// TODO: Get rid of all of these confusing boolean flags
ACTOR static Future<Void> submitBackup(FileBackupAgent* backupAgent,
Reference<ReadYourWritesTransaction> tr,
Key outContainer,
@ -4515,7 +4516,8 @@ public:
Standalone<VectorRef<KeyRangeRef>> backupRanges,
bool stopWhenDone,
bool partitionedLog,
bool incrementalBackupOnly) {
bool incrementalBackupOnly,
Optional<std::string> encryptionKeyFileName) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::COMMIT_ON_FIRST_PROXY);
@ -4553,7 +4555,7 @@ public:
backupContainer = joinPath(backupContainer, std::string("backup-") + nowStr.toString());
}
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(backupContainer);
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(backupContainer, encryptionKeyFileName);
try {
wait(timeoutError(bc->create(), 30));
} catch (Error& e) {
@ -5310,6 +5312,7 @@ public:
bool onlyAppyMutationLogs,
bool inconsistentSnapshotOnly,
Version beginVersion,
Optional<std::string> encryptionKeyFileName,
UID randomUid) {
// The restore command line tool won't allow ranges to be empty, but correctness workloads somehow might.
if (ranges.empty()) {
@ -5523,6 +5526,7 @@ public:
false,
false,
invalidVersion,
{},
randomUid));
return ver;
}
@ -5582,7 +5586,8 @@ Future<Version> FileBackupAgent::restore(Database cx,
bool lockDB,
bool onlyAppyMutationLogs,
bool inconsistentSnapshotOnly,
Version beginVersion) {
Version beginVersion,
Optional<std::string> const& encryptionKeyFileName) {
return FileBackupAgentImpl::restore(this,
cx,
cxOrig,
@ -5598,6 +5603,7 @@ Future<Version> FileBackupAgent::restore(Database cx,
onlyAppyMutationLogs,
inconsistentSnapshotOnly,
beginVersion,
encryptionKeyFileName,
deterministicRandom()->randomUniqueID());
}
@ -5629,11 +5635,12 @@ Future<Void> FileBackupAgent::submitBackup(Reference<ReadYourWritesTransaction>
Key outContainer,
int initialSnapshotIntervalSeconds,
int snapshotIntervalSeconds,
std::string tagName,
std::string const& tagName,
Standalone<VectorRef<KeyRangeRef>> backupRanges,
bool stopWhenDone,
bool partitionedLog,
bool incrementalBackupOnly) {
bool incrementalBackupOnly,
Optional<std::string> const& encryptionKeyFileName) {
return FileBackupAgentImpl::submitBackup(this,
tr,
outContainer,
@ -5643,7 +5650,8 @@ Future<Void> FileBackupAgent::submitBackup(Reference<ReadYourWritesTransaction>
backupRanges,
stopWhenDone,
partitionedLog,
incrementalBackupOnly);
incrementalBackupOnly,
encryptionKeyFileName);
}
Future<Void> FileBackupAgent::discontinueBackup(Reference<ReadYourWritesTransaction> tr, Key tagName) {
@ -5737,8 +5745,8 @@ ACTOR static Future<Void> writeKVs(Database cx, Standalone<VectorRef<KeyValueRef
state ReadYourWritesTransaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
KeyRef k1 = kvs[begin].key;
KeyRef k2 = end < kvs.size() ? kvs[end].key : normalKeys.end;
TraceEvent(SevFRTestInfo, "TransformDatabaseContentsWriteKVReadBack")

View File

@ -0,0 +1,274 @@
/*
* AsyncFileEncrypted.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbrpc/AsyncFileEncrypted.h"
#include "flow/StreamCipher.h"
#include "flow/UnitTest.h"
#include "flow/xxhash.h"
#include "flow/actorcompiler.h" // must be last include
class AsyncFileEncryptedImpl {
public:
// Determine the initialization for the first block of a file based on a hash of
// the filename.
static auto getFirstBlockIV(const std::string& filename) {
StreamCipher::IV iv;
auto salt = basename(filename);
auto pos = salt.find('.');
salt = salt.substr(0, pos);
auto hash = XXH3_128bits(salt.c_str(), salt.size());
auto high = reinterpret_cast<unsigned char*>(&hash.high64);
auto low = reinterpret_cast<unsigned char*>(&hash.low64);
std::copy(high, high + 8, &iv[0]);
std::copy(low, low + 6, &iv[8]);
iv[14] = iv[15] = 0; // last 16 bits identify block
return iv;
}
// Read a single block of size ENCRYPTION_BLOCK_SIZE bytes, and decrypt.
ACTOR static Future<Standalone<StringRef>> readBlock(AsyncFileEncrypted* self, uint16_t block) {
state Arena arena;
state unsigned char* encrypted = new (arena) unsigned char[FLOW_KNOBS->ENCRYPTION_BLOCK_SIZE];
int bytes = wait(
self->file->read(encrypted, FLOW_KNOBS->ENCRYPTION_BLOCK_SIZE, FLOW_KNOBS->ENCRYPTION_BLOCK_SIZE * block));
DecryptionStreamCipher decryptor(StreamCipher::Key::getKey(), self->getIV(block));
auto decrypted = decryptor.decrypt(encrypted, bytes, arena);
return Standalone<StringRef>(decrypted, arena);
}
ACTOR static Future<int> read(AsyncFileEncrypted* self, void* data, int length, int offset) {
state const uint16_t firstBlock = offset / FLOW_KNOBS->ENCRYPTION_BLOCK_SIZE;
state const uint16_t lastBlock = (offset + length - 1) / FLOW_KNOBS->ENCRYPTION_BLOCK_SIZE;
state uint16_t block;
state unsigned char* output = reinterpret_cast<unsigned char*>(data);
state int bytesRead = 0;
ASSERT(self->mode == AsyncFileEncrypted::Mode::READ_ONLY);
for (block = firstBlock; block <= lastBlock; ++block) {
state StringRef plaintext;
auto cachedBlock = self->readBuffers.get(block);
if (cachedBlock.present()) {
plaintext = cachedBlock.get();
} else {
Standalone<StringRef> _plaintext = wait(readBlock(self, block));
self->readBuffers.insert(block, _plaintext);
plaintext = _plaintext;
}
auto start = (block == firstBlock) ? plaintext.begin() + (offset % FLOW_KNOBS->ENCRYPTION_BLOCK_SIZE)
: plaintext.begin();
auto end = (block == lastBlock)
? plaintext.begin() + ((offset + length) % FLOW_KNOBS->ENCRYPTION_BLOCK_SIZE)
: plaintext.end();
if ((offset + length) % FLOW_KNOBS->ENCRYPTION_BLOCK_SIZE == 0) {
end = plaintext.end();
}
std::copy(start, end, output);
output += (end - start);
bytesRead += (end - start);
}
return bytesRead;
}
ACTOR static Future<Void> write(AsyncFileEncrypted* self, void const* data, int length, int64_t offset) {
ASSERT(self->mode == AsyncFileEncrypted::Mode::APPEND_ONLY);
// All writes must append to the end of the file:
ASSERT_EQ(offset, self->currentBlock * FLOW_KNOBS->ENCRYPTION_BLOCK_SIZE + self->offsetInBlock);
state unsigned char const* input = reinterpret_cast<unsigned char const*>(data);
while (length > 0) {
const auto chunkSize = std::min(length, FLOW_KNOBS->ENCRYPTION_BLOCK_SIZE - self->offsetInBlock);
Arena arena;
auto encrypted = self->encryptor->encrypt(input, chunkSize, arena);
std::copy(encrypted.begin(), encrypted.end(), &self->writeBuffer[self->offsetInBlock]);
offset += encrypted.size();
self->offsetInBlock += chunkSize;
length -= chunkSize;
input += chunkSize;
if (self->offsetInBlock == FLOW_KNOBS->ENCRYPTION_BLOCK_SIZE) {
wait(self->writeLastBlockToFile());
self->offsetInBlock = 0;
ASSERT_LT(self->currentBlock, std::numeric_limits<uint16_t>::max());
++self->currentBlock;
self->encryptor = std::make_unique<EncryptionStreamCipher>(StreamCipher::Key::getKey(),
self->getIV(self->currentBlock));
}
}
return Void();
}
ACTOR static Future<Void> sync(AsyncFileEncrypted* self) {
ASSERT(self->mode == AsyncFileEncrypted::Mode::APPEND_ONLY);
wait(self->writeLastBlockToFile());
wait(self->file->sync());
return Void();
}
ACTOR static Future<Void> zeroRange(AsyncFileEncrypted* self, int64_t offset, int64_t length) {
ASSERT(self->mode == AsyncFileEncrypted::Mode::APPEND_ONLY);
// TODO: Could optimize this
Arena arena;
auto zeroes = new (arena) unsigned char[length];
memset(zeroes, 0, length);
wait(self->write(zeroes, length, offset));
return Void();
}
};
AsyncFileEncrypted::AsyncFileEncrypted(Reference<IAsyncFile> file, Mode mode)
: file(file), mode(mode), currentBlock(0), readBuffers(FLOW_KNOBS->MAX_DECRYPTED_BLOCKS) {
firstBlockIV = AsyncFileEncryptedImpl::getFirstBlockIV(file->getFilename());
if (mode == Mode::APPEND_ONLY) {
encryptor = std::make_unique<EncryptionStreamCipher>(StreamCipher::Key::getKey(), getIV(currentBlock));
writeBuffer = std::vector<unsigned char>(FLOW_KNOBS->ENCRYPTION_BLOCK_SIZE, 0);
}
}
void AsyncFileEncrypted::addref() {
ReferenceCounted<AsyncFileEncrypted>::addref();
}
void AsyncFileEncrypted::delref() {
ReferenceCounted<AsyncFileEncrypted>::delref();
}
Future<int> AsyncFileEncrypted::read(void* data, int length, int64_t offset) {
return AsyncFileEncryptedImpl::read(this, data, length, offset);
}
Future<Void> AsyncFileEncrypted::write(void const* data, int length, int64_t offset) {
return AsyncFileEncryptedImpl::write(this, data, length, offset);
}
Future<Void> AsyncFileEncrypted::zeroRange(int64_t offset, int64_t length) {
return AsyncFileEncryptedImpl::zeroRange(this, offset, length);
}
Future<Void> AsyncFileEncrypted::truncate(int64_t size) {
ASSERT(mode == Mode::APPEND_ONLY);
return file->truncate(size);
}
Future<Void> AsyncFileEncrypted::sync() {
ASSERT(mode == Mode::APPEND_ONLY);
return AsyncFileEncryptedImpl::sync(this);
}
Future<Void> AsyncFileEncrypted::flush() {
ASSERT(mode == Mode::APPEND_ONLY);
return Void();
}
Future<int64_t> AsyncFileEncrypted::size() const {
ASSERT(mode == Mode::READ_ONLY);
return file->size();
}
std::string AsyncFileEncrypted::getFilename() const {
return file->getFilename();
}
Future<Void> AsyncFileEncrypted::readZeroCopy(void** data, int* length, int64_t offset) {
throw io_error();
return Void();
}
void AsyncFileEncrypted::releaseZeroCopy(void* data, int length, int64_t offset) {
throw io_error();
}
int64_t AsyncFileEncrypted::debugFD() const {
return file->debugFD();
}
StreamCipher::IV AsyncFileEncrypted::getIV(uint16_t block) const {
auto iv = firstBlockIV;
iv[14] = block / 256;
iv[15] = block % 256;
return iv;
}
Future<Void> AsyncFileEncrypted::writeLastBlockToFile() {
return file->write(&writeBuffer[0], offsetInBlock, currentBlock * FLOW_KNOBS->ENCRYPTION_BLOCK_SIZE);
}
size_t AsyncFileEncrypted::RandomCache::evict() {
ASSERT_EQ(vec.size(), maxSize);
auto index = deterministicRandom()->randomInt(0, maxSize);
hashMap.erase(vec[index]);
return index;
}
AsyncFileEncrypted::RandomCache::RandomCache(size_t maxSize) : maxSize(maxSize) {
vec.reserve(maxSize);
}
void AsyncFileEncrypted::RandomCache::insert(uint16_t block, const Standalone<StringRef>& value) {
auto [_, found] = hashMap.insert({ block, value });
if (found) {
return;
} else if (vec.size() < maxSize) {
vec.push_back(block);
} else {
auto index = evict();
vec[index] = block;
}
}
Optional<Standalone<StringRef>> AsyncFileEncrypted::RandomCache::get(uint16_t block) const {
auto it = hashMap.find(block);
if (it == hashMap.end()) {
return {};
} else {
return it->second;
}
}
// This test writes random data into an encrypted file in random increments,
// then reads this data back from the file in random increments, then confirms that
// the bytes read match the bytes written.
TEST_CASE("fdbrpc/AsyncFileEncrypted") {
state const int bytes = FLOW_KNOBS->ENCRYPTION_BLOCK_SIZE * deterministicRandom()->randomInt(0, 1000);
state std::vector<unsigned char> writeBuffer(bytes, 0);
generateRandomData(&writeBuffer.front(), bytes);
state std::vector<unsigned char> readBuffer(bytes, 0);
ASSERT(g_network->isSimulated());
StreamCipher::Key::initializeRandomTestKey();
int flags = IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE |
IAsyncFile::OPEN_UNBUFFERED | IAsyncFile::OPEN_ENCRYPTED | IAsyncFile::OPEN_UNCACHED |
IAsyncFile::OPEN_NO_AIO;
state Reference<IAsyncFile> file =
wait(IAsyncFileSystem::filesystem()->open(joinPath(params.getDataDir(), "test-encrypted-file"), flags, 0600));
state int bytesWritten = 0;
while (bytesWritten < bytes) {
chunkSize = std::min(deterministicRandom()->randomInt(0, 100), bytes - bytesWritten);
wait(file->write(&writeBuffer[bytesWritten], chunkSize, bytesWritten));
bytesWritten += chunkSize;
}
wait(file->sync());
state int bytesRead = 0;
state int chunkSize;
while (bytesRead < bytes) {
chunkSize = std::min(deterministicRandom()->randomInt(0, 100), bytes - bytesRead);
int bytesReadInChunk = wait(file->read(&readBuffer[bytesRead], chunkSize, bytesRead));
ASSERT_EQ(bytesReadInChunk, chunkSize);
bytesRead += bytesReadInChunk;
}
ASSERT(writeBuffer == readBuffer);
return Void();
}

View File

@ -0,0 +1,81 @@
/*
* AsyncFileEncrypted.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "fdbrpc/IAsyncFile.h"
#include "flow/FastRef.h"
#include "flow/flow.h"
#include "flow/IRandom.h"
#include "flow/StreamCipher.h"
#include <array>
/*
* Append-only file encrypted using AES-128-GCM.
* */
class AsyncFileEncrypted : public IAsyncFile, public ReferenceCounted<AsyncFileEncrypted> {
public:
enum class Mode { APPEND_ONLY, READ_ONLY };
private:
Reference<IAsyncFile> file;
StreamCipher::IV firstBlockIV;
StreamCipher::IV getIV(uint16_t block) const;
Mode mode;
Future<Void> writeLastBlockToFile();
friend class AsyncFileEncryptedImpl;
// Reading:
class RandomCache {
size_t maxSize;
std::vector<uint16_t> vec;
std::unordered_map<uint16_t, Standalone<StringRef>> hashMap;
size_t evict();
public:
RandomCache(size_t maxSize);
void insert(uint16_t block, const Standalone<StringRef>& value);
Optional<Standalone<StringRef>> get(uint16_t block) const;
} readBuffers;
// Writing (append only):
std::unique_ptr<EncryptionStreamCipher> encryptor;
uint16_t currentBlock{ 0 };
int offsetInBlock{ 0 };
std::vector<unsigned char> writeBuffer;
Future<Void> initialize();
public:
AsyncFileEncrypted(Reference<IAsyncFile>, Mode);
void addref() override;
void delref() override;
Future<int> read(void* data, int length, int64_t offset) override;
Future<Void> write(void const* data, int length, int64_t offset) override;
Future<Void> zeroRange(int64_t offset, int64_t length) override;
Future<Void> truncate(int64_t size) override;
Future<Void> sync() override;
Future<Void> flush() override;
Future<int64_t> size() const override;
std::string getFilename() const override;
Future<Void> readZeroCopy(void** data, int* length, int64_t offset) override;
void releaseZeroCopy(void* data, int length, int64_t offset) override;
int64_t debugFD() const override;
};

View File

@ -33,6 +33,13 @@ set(FDBRPC_SRCS
TraceFileIO.cpp
TSSComparison.h)
if(WITH_TLS AND NOT WIN32)
set(FDBRPC_SRCS
${FDBRPC_SRCS}
AsyncFileEncrypted.h
AsyncFileEncrypted.actor.cpp)
endif()
set(COMPILE_EIO OFF)
if(NOT WIN32)

View File

@ -53,7 +53,8 @@ public:
OPEN_LARGE_PAGES = 0x100000,
OPEN_NO_AIO =
0x200000, // Don't use AsyncFileKAIO or similar implementations that rely on filesystem support for AIO
OPEN_CACHED_READ_ONLY = 0x400000 // AsyncFileCached opens files read/write even if you specify read only
OPEN_CACHED_READ_ONLY = 0x400000, // AsyncFileCached opens files read/write even if you specify read only
OPEN_ENCRYPTED = 0x800000 // File is encrypted using AES-128-GCM (must be either read-only or write-only)
};
virtual void addref() = 0;

View File

@ -32,6 +32,9 @@
#include "fdbrpc/AsyncFileCached.actor.h"
#include "fdbrpc/AsyncFileEIO.actor.h"
#if (!defined(TLS_DISABLED) && !defined(_WIN32))
#include "fdbrpc/AsyncFileEncrypted.h"
#endif
#include "fdbrpc/AsyncFileWinASIO.actor.h"
#include "fdbrpc/AsyncFileKAIO.actor.h"
#include "flow/AsioReactor.h"
@ -76,6 +79,14 @@ Future<Reference<class IAsyncFile>> Net2FileSystem::open(const std::string& file
static_cast<boost::asio::io_service*>((void*)g_network->global(INetwork::enASIOService)));
if (FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0)
f = map(f, [=](Reference<IAsyncFile> r) { return Reference<IAsyncFile>(new AsyncFileWriteChecker(r)); });
#if (!defined(TLS_DISABLED) && !defined(_WIN32))
if (flags & IAsyncFile::OPEN_ENCRYPTED)
f = map(f, [flags](Reference<IAsyncFile> r) {
auto mode = flags & IAsyncFile::OPEN_READWRITE ? AsyncFileEncrypted::Mode::APPEND_ONLY
: AsyncFileEncrypted::Mode::READ_ONLY;
return Reference<IAsyncFile>(new AsyncFileEncrypted(r, mode));
});
#endif
return f;
}

View File

@ -33,6 +33,9 @@
#include "flow/Util.h"
#include "fdbrpc/IAsyncFile.h"
#include "fdbrpc/AsyncFileCached.actor.h"
#if (!defined(TLS_DISABLED) && !defined(_WIN32))
#include "fdbrpc/AsyncFileEncrypted.h"
#endif
#include "fdbrpc/AsyncFileNonDurable.actor.h"
#include "flow/crc32c.h"
#include "fdbrpc/TraceFileIO.h"
@ -2473,6 +2476,14 @@ Future<Reference<class IAsyncFile>> Sim2FileSystem::open(const std::string& file
f = AsyncFileDetachable::open(f);
if (FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0)
f = map(f, [=](Reference<IAsyncFile> r) { return Reference<IAsyncFile>(new AsyncFileWriteChecker(r)); });
#if (!defined(TLS_DISABLED) && !defined(_WIN32))
if (flags & IAsyncFile::OPEN_ENCRYPTED)
f = map(f, [flags](Reference<IAsyncFile> r) {
auto mode = flags & IAsyncFile::OPEN_READWRITE ? AsyncFileEncrypted::Mode::APPEND_ONLY
: AsyncFileEncrypted::Mode::READ_ONLY;
return Reference<IAsyncFile>(new AsyncFileEncrypted(r, mode));
});
#endif
return f;
} else
return AsyncFileCached::open(filename, flags, mode);

View File

@ -21,6 +21,7 @@
#include "fdbrpc/simulator.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BackupContainer.h"
#include "fdbclient/BackupContainerFileSystem.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -41,35 +42,39 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
bool allowPauses;
bool shareLogRange;
bool shouldSkipRestoreRanges;
Optional<std::string> encryptionKeyFileName;
BackupAndRestoreCorrectnessWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
locked = sharedRandomNumber % 2;
backupAfter = getOption(options, LiteralStringRef("backupAfter"), 10.0);
restoreAfter = getOption(options, LiteralStringRef("restoreAfter"), 35.0);
performRestore = getOption(options, LiteralStringRef("performRestore"), true);
backupTag = getOption(options, LiteralStringRef("backupTag"), BackupAgentBase::getDefaultTag());
backupRangesCount = getOption(options, LiteralStringRef("backupRangesCount"), 5);
backupRangeLengthMax = getOption(options, LiteralStringRef("backupRangeLengthMax"), 1);
backupAfter = getOption(options, "backupAfter"_sr, 10.0);
restoreAfter = getOption(options, "restoreAfter"_sr, 35.0);
performRestore = getOption(options, "performRestore"_sr, true);
backupTag = getOption(options, "backupTag"_sr, BackupAgentBase::getDefaultTag());
backupRangesCount = getOption(options, "backupRangesCount"_sr, 5);
backupRangeLengthMax = getOption(options, "backupRangeLengthMax"_sr, 1);
abortAndRestartAfter =
getOption(options,
LiteralStringRef("abortAndRestartAfter"),
"abortAndRestartAfter"_sr,
deterministicRandom()->random01() < 0.5
? deterministicRandom()->random01() * (restoreAfter - backupAfter) + backupAfter
: 0.0);
differentialBackup = getOption(
options, LiteralStringRef("differentialBackup"), deterministicRandom()->random01() < 0.5 ? true : false);
differentialBackup =
getOption(options, "differentialBackup"_sr, deterministicRandom()->random01() < 0.5 ? true : false);
stopDifferentialAfter =
getOption(options,
LiteralStringRef("stopDifferentialAfter"),
"stopDifferentialAfter"_sr,
differentialBackup ? deterministicRandom()->random01() *
(restoreAfter - std::max(abortAndRestartAfter, backupAfter)) +
std::max(abortAndRestartAfter, backupAfter)
: 0.0);
agentRequest = getOption(options, LiteralStringRef("simBackupAgents"), true);
allowPauses = getOption(options, LiteralStringRef("allowPauses"), true);
shareLogRange = getOption(options, LiteralStringRef("shareLogRange"), false);
restorePrefixesToInclude = getOption(options, LiteralStringRef("restorePrefixesToInclude"), std::vector<std::string>());
agentRequest = getOption(options, "simBackupAgents"_sr, true);
allowPauses = getOption(options, "allowPauses"_sr, true);
shareLogRange = getOption(options, "shareLogRange"_sr, false);
restorePrefixesToInclude = getOption(options, "restorePrefixesToInclude"_sr, std::vector<std::string>());
shouldSkipRestoreRanges = deterministicRandom()->random01() < 0.3 ? true : false;
if (getOption(options, "encrypted"_sr, deterministicRandom()->random01() < 0.1)) {
encryptionKeyFileName = "simfdb/test_encryption_key_file";
}
TraceEvent("BARW_ClientId").detail("Id", wcx.clientId);
UID randomID = nondeterministicRandom()->randomUniqueID();
@ -77,11 +82,10 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
if (shareLogRange) {
bool beforePrefix = sharedRandomNumber & 1;
if (beforePrefix)
backupRanges.push_back_deep(backupRanges.arena(),
KeyRangeRef(normalKeys.begin, LiteralStringRef("\xfe\xff\xfe")));
backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef(normalKeys.begin, "\xfe\xff\xfe"_sr));
else
backupRanges.push_back_deep(backupRanges.arena(),
KeyRangeRef(strinc(LiteralStringRef("\x00\x00\x01")), normalKeys.end));
KeyRangeRef(strinc("\x00\x00\x01"_sr), normalKeys.end));
} else if (backupRangesCount <= 0) {
backupRanges.push_back_deep(backupRanges.arena(), normalKeys);
} else {
@ -168,6 +172,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
TraceEvent(SevInfo, "BARW_Param").detail("DifferentialBackup", differentialBackup);
TraceEvent(SevInfo, "BARW_Param").detail("StopDifferentialAfter", stopDifferentialAfter);
TraceEvent(SevInfo, "BARW_Param").detail("AgentRequest", agentRequest);
TraceEvent(SevInfo, "BARW_Param").detail("Encrypted", encryptionKeyFileName.present());
return _start(cx, this);
}
@ -265,7 +270,10 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
deterministicRandom()->randomInt(0, 100),
tag.toString(),
backupRanges,
stopDifferentialDelay ? false : true));
stopDifferentialDelay ? false : true,
false,
false,
self->encryptionKeyFileName));
} catch (Error& e) {
TraceEvent("BARW_DoBackupSubmitBackupException", randomID).error(e).detail("Tag", printable(tag));
if (e.code() != error_code_backup_unneeded && e.code() != error_code_backup_duplicate)
@ -456,6 +464,10 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
BackupAndRestoreCorrectnessWorkload::backupAgentRequests++;
}
if (self->encryptionKeyFileName.present()) {
wait(BackupContainerFileSystem::createTestEncryptionKeyFile(self->encryptionKeyFileName.get()));
}
try {
state Future<Void> startRestore = delay(self->restoreAfter);
@ -510,7 +522,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
TraceEvent("BARW_SubmitBackup2", randomID).detail("Tag", printable(self->backupTag));
try {
extraBackup = backupAgent.submitBackup(cx,
LiteralStringRef("file://simfdb/backups/"),
"file://simfdb/backups/"_sr,
deterministicRandom()->randomInt(0, 60),
deterministicRandom()->randomInt(0, 100),
self->backupTag.toString(),
@ -587,7 +599,11 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
range,
Key(),
Key(),
self->locked));
self->locked,
false,
false,
::invalidVersion,
self->encryptionKeyFileName));
}
} else {
multipleRangesInOneTag = true;
@ -606,7 +622,11 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
true,
Key(),
Key(),
self->locked));
self->locked,
false,
false,
::invalidVersion,
self->encryptionKeyFileName));
}
// Sometimes kill and restart the restore
@ -632,7 +652,11 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
true,
Key(),
Key(),
self->locked);
self->locked,
false,
false,
::invalidVersion,
self->encryptionKeyFileName);
}
} else {
for (restoreIndex = 0; restoreIndex < restores.size(); restoreIndex++) {
@ -657,7 +681,11 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
self->restoreRanges[restoreIndex],
Key(),
Key(),
self->locked);
self->locked,
false,
false,
::invalidVersion,
self->encryptionKeyFileName);
}
}
}
@ -721,7 +749,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
.detail("TaskCount", taskCount)
.detail("WaitCycles", waitCycles);
printf("EndingNonZeroTasks: %ld\n", (long)taskCount);
wait(TaskBucket::debugPrintRange(cx, LiteralStringRef("\xff"), StringRef()));
wait(TaskBucket::debugPrintRange(cx, normalKeys.end, StringRef()));
}
loop {
@ -820,7 +848,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
}
if (displaySystemKeys) {
wait(TaskBucket::debugPrintRange(cx, LiteralStringRef("\xff"), StringRef()));
wait(TaskBucket::debugPrintRange(cx, normalKeys.end, StringRef()));
}
TraceEvent("BARW_Complete", randomID).detail("BackupTag", printable(self->backupTag));

View File

@ -28,6 +28,9 @@ void forceLinkFlowTests();
void forceLinkVersionedMapTests();
void forceLinkMemcpyTests();
void forceLinkMemcpyPerfTests();
#if (!defined(TLS_DISABLED) && !defined(_WIN32))
void forceLinkStreamCipherTests();
#endif
void forceLinkParallelStreamTests();
void forceLinkSimExternalConnectionTests();
void forceLinkIThreadPoolTests();
@ -37,6 +40,7 @@ struct UnitTestWorkload : TestWorkload {
std::string testPattern;
int testRunLimit;
UnitTestParameters testParams;
bool cleanupAfterTests;
PerfIntCounter testsAvailable, testsExecuted, testsFailed;
PerfDoubleCounter totalWallTime, totalSimTime;
@ -46,9 +50,14 @@ struct UnitTestWorkload : TestWorkload {
testsFailed("Test Cases Failed"), totalWallTime("Total wall clock time (s)"),
totalSimTime("Total flow time (s)") {
enabled = !clientId; // only do this on the "first" client
testPattern = getOption(options, LiteralStringRef("testsMatching"), Value()).toString();
testRunLimit = getOption(options, LiteralStringRef("maxTestCases"), -1);
testParams.setDataDir(getOption(options, LiteralStringRef("dataDir"), "simfdb/unittests/"_sr).toString());
testPattern = getOption(options, "testsMatching"_sr, Value()).toString();
testRunLimit = getOption(options, "maxTestCases"_sr, -1);
if (g_network->isSimulated()) {
testParams.setDataDir(getOption(options, "dataDir"_sr, "simfdb/unittests/"_sr).toString());
} else {
testParams.setDataDir(getOption(options, "dataDir"_sr, "/private/tmp/"_sr).toString());
}
cleanupAfterTests = getOption(options, "cleanupAfterTests"_sr, true);
// Consume all remaining options as testParams which the unit test can access
for (auto& kv : options) {
@ -63,6 +72,9 @@ struct UnitTestWorkload : TestWorkload {
forceLinkVersionedMapTests();
forceLinkMemcpyTests();
forceLinkMemcpyPerfTests();
#if (!defined(TLS_DISABLED) && !defined(_WIN32))
forceLinkStreamCipherTests();
#endif
forceLinkParallelStreamTests();
forceLinkSimExternalConnectionTests();
forceLinkIThreadPoolTests();
@ -117,7 +129,9 @@ struct UnitTestWorkload : TestWorkload {
++self->testsFailed;
result = e;
}
platform::eraseDirectoryRecursive(self->testParams.getDataDir());
if (self->cleanupAfterTests) {
platform::eraseDirectoryRecursive(self->testParams.getDataDir());
}
++self->testsExecuted;
double wallTime = timer() - start_timer;
double simTime = now() - start_now;

View File

@ -96,6 +96,13 @@ set(FLOW_SRCS
xxhash.c
xxhash.h)
if(WITH_TLS AND NOT WIN32)
set(FLOW_SRCS
${FLOW_SRCS}
StreamCipher.cpp
StreamCipher.h)
endif()
add_library(stacktrace stacktrace.amalgamation.cpp stacktrace.h)
if (USE_ASAN)
target_compile_definitions(stacktrace PRIVATE ADDRESS_SANITIZER)
@ -133,8 +140,7 @@ target_link_libraries(flow PRIVATE ${FLOW_LIBS})
if(USE_VALGRIND)
target_link_libraries(flow PUBLIC Valgrind)
endif()
# TODO(atn34) Re-enable TLS for OPEN_FOR_IDE build once #2201 is resolved
if(NOT WITH_TLS OR OPEN_FOR_IDE)
if(NOT WITH_TLS)
target_compile_definitions(flow PUBLIC TLS_DISABLED)
else()
target_link_libraries(flow PUBLIC OpenSSL::SSL)

View File

@ -129,6 +129,10 @@ void FlowKnobs::initialize(Randomize _randomize, IsSimulated _isSimulated) {
init( EIO_MAX_PARALLELISM, 4 );
init( EIO_USE_ODIRECT, 0 );
//AsyncFileEncrypted
init( ENCRYPTION_BLOCK_SIZE, 4096 );
init( MAX_DECRYPTED_BLOCKS, 10 );
//AsyncFileKAIO
init( MAX_OUTSTANDING, 64 );
init( MIN_SUBMIT, 10 );

View File

@ -168,6 +168,10 @@ public:
int EIO_MAX_PARALLELISM;
int EIO_USE_ODIRECT;
// AsyncFileEncrypted
int ENCRYPTION_BLOCK_SIZE;
int MAX_DECRYPTED_BLOCKS;
// AsyncFileKAIO
int MAX_OUTSTANDING;
int MIN_SUBMIT;

View File

@ -30,6 +30,9 @@
#include "flow/Platform.actor.h"
#include "flow/Arena.h"
#if (!defined(TLS_DISABLED) && !defined(_WIN32))
#include "flow/StreamCipher.h"
#endif
#include "flow/Trace.h"
#include "flow/Error.h"
@ -3420,6 +3423,10 @@ void crashHandler(int sig) {
bool error = (sig != SIGUSR2);
#if (!defined(TLS_DISABLED) && !defined(_WIN32))
StreamCipher::cleanup();
#endif
fflush(stdout);
{
TraceEvent te(error ? SevError : SevInfo, error ? "Crash" : "ProcessTerminated");

192
flow/StreamCipher.cpp Normal file
View File

@ -0,0 +1,192 @@
/*
* StreamCipher.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/StreamCipher.h"
#include "flow/UnitTest.h"
std::unordered_set<EVP_CIPHER_CTX*> StreamCipher::ctxs;
std::unique_ptr<StreamCipher::Key> StreamCipher::Key::globalKey;
StreamCipher::StreamCipher() : ctx(EVP_CIPHER_CTX_new()) {
ctxs.insert(ctx);
}
StreamCipher::~StreamCipher() {
EVP_CIPHER_CTX_free(ctx);
ctxs.erase(ctx);
}
EVP_CIPHER_CTX* StreamCipher::getCtx() {
return ctx;
}
void StreamCipher::cleanup() noexcept {
Key::cleanup();
for (auto ctx : ctxs) {
EVP_CIPHER_CTX_free(ctx);
}
}
void StreamCipher::Key::initializeKey(RawKeyType&& arr) {
if (globalKey) {
ASSERT(globalKey->arr == arr);
}
globalKey = std::make_unique<Key>(ConstructorTag{});
globalKey->arr = std::move(arr);
memset(arr.data(), 0, arr.size());
}
void StreamCipher::Key::initializeRandomTestKey() {
ASSERT(g_network->isSimulated());
if (globalKey) return;
globalKey = std::make_unique<Key>(ConstructorTag{});
generateRandomData(globalKey->arr.data(), globalKey->arr.size());
}
const StreamCipher::Key& StreamCipher::Key::getKey() {
ASSERT(globalKey);
return *globalKey;
}
StreamCipher::Key::Key(Key&& rhs) : arr(std::move(rhs.arr)) {
memset(arr.data(), 0, arr.size());
}
StreamCipher::Key& StreamCipher::Key::operator=(Key&& rhs) {
arr = std::move(rhs.arr);
memset(arr.data(), 0, arr.size());
return *this;
}
StreamCipher::Key::~Key() {
memset(arr.data(), 0, arr.size());
}
void StreamCipher::Key::cleanup() noexcept {
globalKey.reset();
}
EncryptionStreamCipher::EncryptionStreamCipher(const StreamCipher::Key& key, const StreamCipher::IV& iv) {
EVP_EncryptInit_ex(cipher.getCtx(), EVP_aes_128_gcm(), nullptr, nullptr, nullptr);
EVP_CIPHER_CTX_ctrl(cipher.getCtx(), EVP_CTRL_AEAD_SET_IVLEN, iv.size(), nullptr);
EVP_EncryptInit_ex(cipher.getCtx(), nullptr, nullptr, key.data(), iv.data());
}
StringRef EncryptionStreamCipher::encrypt(unsigned char const* plaintext, int len, Arena& arena) {
auto ciphertext = new (arena) unsigned char[len + AES_BLOCK_SIZE];
int bytes{ 0 };
EVP_EncryptUpdate(cipher.getCtx(), ciphertext, &bytes, plaintext, len);
return StringRef(ciphertext, bytes);
}
StringRef EncryptionStreamCipher::finish(Arena& arena) {
auto ciphertext = new (arena) unsigned char[AES_BLOCK_SIZE];
int bytes{ 0 };
EVP_EncryptFinal_ex(cipher.getCtx(), ciphertext, &bytes);
return StringRef(ciphertext, bytes);
}
DecryptionStreamCipher::DecryptionStreamCipher(const StreamCipher::Key& key, const StreamCipher::IV& iv) {
EVP_DecryptInit_ex(cipher.getCtx(), EVP_aes_128_gcm(), nullptr, nullptr, nullptr);
EVP_CIPHER_CTX_ctrl(cipher.getCtx(), EVP_CTRL_AEAD_SET_IVLEN, iv.size(), nullptr);
EVP_DecryptInit_ex(cipher.getCtx(), nullptr, nullptr, key.data(), iv.data());
}
StringRef DecryptionStreamCipher::decrypt(unsigned char const* ciphertext, int len, Arena& arena) {
auto plaintext = new (arena) unsigned char[len];
int bytesDecrypted{ 0 };
EVP_DecryptUpdate(cipher.getCtx(), plaintext, &bytesDecrypted, ciphertext, len);
int finalBlockBytes{ 0 };
EVP_DecryptFinal_ex(cipher.getCtx(), plaintext + bytesDecrypted, &finalBlockBytes);
return StringRef(plaintext, bytesDecrypted + finalBlockBytes);
}
StringRef DecryptionStreamCipher::finish(Arena& arena) {
auto plaintext = new (arena) unsigned char[AES_BLOCK_SIZE];
int finalBlockBytes{ 0 };
EVP_DecryptFinal_ex(cipher.getCtx(), plaintext, &finalBlockBytes);
return StringRef(plaintext, finalBlockBytes);
}
// Only used to link unit tests
void forceLinkStreamCipherTests() {}
// Tests both encryption and decryption of random data
// using the StreamCipher class
TEST_CASE("flow/StreamCipher") {
StreamCipher::Key::initializeRandomTestKey();
const auto& key = StreamCipher::Key::getKey();
StreamCipher::IV iv;
generateRandomData(iv.data(), iv.size());
Arena arena;
std::vector<unsigned char> plaintext(deterministicRandom()->randomInt(0, 10001));
generateRandomData(&plaintext.front(), plaintext.size());
std::vector<unsigned char> ciphertext(plaintext.size() + AES_BLOCK_SIZE);
std::vector<unsigned char> decryptedtext(plaintext.size() + AES_BLOCK_SIZE);
TraceEvent("StreamCipherTestStart")
.detail("PlaintextSize", plaintext.size())
.detail("AESBlockSize", AES_BLOCK_SIZE);
{
EncryptionStreamCipher encryptor(key, iv);
int index = 0;
int encryptedOffset = 0;
while (index < plaintext.size()) {
const auto chunkSize = std::min<int>(deterministicRandom()->randomInt(1, 101), plaintext.size() - index);
const auto encrypted = encryptor.encrypt(&plaintext[index], chunkSize, arena);
TraceEvent("StreamCipherTestEcryptedChunk")
.detail("EncryptedSize", encrypted.size())
.detail("EncryptedOffset", encryptedOffset)
.detail("Index", index);
std::copy(encrypted.begin(), encrypted.end(), &ciphertext[encryptedOffset]);
encryptedOffset += encrypted.size();
index += chunkSize;
}
const auto encrypted = encryptor.finish(arena);
std::copy(encrypted.begin(), encrypted.end(), &ciphertext[encryptedOffset]);
ciphertext.resize(encryptedOffset + encrypted.size());
}
{
DecryptionStreamCipher decryptor(key, iv);
int index = 0;
int decryptedOffset = 0;
while (index < plaintext.size()) {
const auto chunkSize = std::min<int>(deterministicRandom()->randomInt(1, 101), plaintext.size() - index);
const auto decrypted = decryptor.decrypt(&ciphertext[index], chunkSize, arena);
TraceEvent("StreamCipherTestDecryptedChunk")
.detail("DecryptedSize", decrypted.size())
.detail("DecryptedOffset", decryptedOffset)
.detail("Index", index);
std::copy(decrypted.begin(), decrypted.end(), &decryptedtext[decryptedOffset]);
decryptedOffset += decrypted.size();
index += chunkSize;
}
const auto decrypted = decryptor.finish(arena);
std::copy(decrypted.begin(), decrypted.end(), &decryptedtext[decryptedOffset]);
ASSERT_EQ(decryptedOffset + decrypted.size(), plaintext.size());
decryptedtext.resize(decryptedOffset + decrypted.size());
}
ASSERT(plaintext == decryptedtext);
return Void();
}

80
flow/StreamCipher.h Normal file
View File

@ -0,0 +1,80 @@
/*
* StreamCipher.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/flow.h"
#include <openssl/aes.h>
#include <openssl/evp.h>
#include <string>
#include <unordered_set>
#include <vector>
// Wrapper class for openssl implementation of AES-128-GCM
// encryption/decryption
class StreamCipher final : NonCopyable {
static std::unordered_set<EVP_CIPHER_CTX*> ctxs;
EVP_CIPHER_CTX* ctx;
public:
StreamCipher();
~StreamCipher();
EVP_CIPHER_CTX* getCtx();
class Key : NonCopyable {
std::array<unsigned char, 16> arr;
static std::unique_ptr<Key> globalKey;
struct ConstructorTag {};
public:
using RawKeyType = decltype(arr);
Key(ConstructorTag) {}
Key(Key&&);
Key& operator=(Key&&);
~Key();
unsigned char const* data() const { return arr.data(); }
static void initializeKey(RawKeyType&&);
static void initializeRandomTestKey();
static const Key& getKey();
static void cleanup() noexcept;
};
static void cleanup() noexcept;
using IV = std::array<unsigned char, 16>;
};
class EncryptionStreamCipher final : NonCopyable, public ReferenceCounted<EncryptionStreamCipher> {
StreamCipher cipher;
public:
EncryptionStreamCipher(const StreamCipher::Key& key, const StreamCipher::IV& iv);
StringRef encrypt(unsigned char const* plaintext, int len, Arena&);
StringRef finish(Arena&);
};
class DecryptionStreamCipher final : NonCopyable, public ReferenceCounted<DecryptionStreamCipher> {
StreamCipher cipher;
public:
DecryptionStreamCipher(const StreamCipher::Key& key, const StreamCipher::IV& iv);
StringRef decrypt(unsigned char const* ciphertext, int len, Arena&);
StringRef finish(Arena&);
};

View File

@ -227,6 +227,7 @@ ERROR( restore_destination_not_empty, 2370, "Attempted to restore into a non-emp
ERROR( restore_duplicate_uid, 2371, "Attempted to restore using a UID that had been used for an aborted restore")
ERROR( task_invalid_version, 2381, "Invalid task version")
ERROR( task_interrupted, 2382, "Task execution stopped due to timeout, abort, or completion by another worker")
ERROR( invalid_encryption_key_file, 2383, "The provided encryption key file has invalid contents" )
ERROR( key_not_found, 2400, "Expected key is missing")
ERROR( json_malformed, 2401, "JSON string was malformed")

View File

@ -0,0 +1,77 @@
/*
* BenchEncrypt.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "benchmark/benchmark.h"
#include "flow/StreamCipher.h"
#include "flowbench/GlobalData.h"
static StreamCipher::IV getRandomIV() {
StreamCipher::IV iv;
generateRandomData(iv.data(), iv.size());
return iv;
}
static inline Standalone<StringRef> encrypt(const StreamCipher::Key& key, const StreamCipher::IV& iv,
unsigned char const* data, size_t len) {
EncryptionStreamCipher encryptor(key, iv);
Arena arena;
auto encrypted = encryptor.encrypt(data, len, arena);
return Standalone<StringRef>(encrypted, arena);
}
static void bench_encrypt(benchmark::State& state) {
auto bytes = state.range(0);
auto chunks = state.range(1);
auto chunkSize = bytes / chunks;
StreamCipher::Key::initializeRandomTestKey();
const auto& key = StreamCipher::Key::getKey();
auto iv = getRandomIV();
auto data = getKey(bytes);
while (state.KeepRunning()) {
for (int chunk = 0; chunk < chunks; ++chunk) {
benchmark::DoNotOptimize(encrypt(key, iv, data.begin() + chunk * chunkSize, chunkSize));
}
}
state.SetBytesProcessed(bytes * static_cast<long>(state.iterations()));
}
static void bench_decrypt(benchmark::State& state) {
auto bytes = state.range(0);
auto chunks = state.range(1);
auto chunkSize = bytes / chunks;
StreamCipher::Key::initializeRandomTestKey();
const auto& key = StreamCipher::Key::getKey();
auto iv = getRandomIV();
auto data = getKey(bytes);
auto encrypted = encrypt(key, iv, data.begin(), data.size());
while (state.KeepRunning()) {
Arena arena;
DecryptionStreamCipher decryptor(key, iv);
for (int chunk = 0; chunk < chunks; ++chunk) {
benchmark::DoNotOptimize(
Standalone<StringRef>(decryptor.decrypt(encrypted.begin() + chunk * chunkSize, chunkSize, arena)));
}
}
state.SetBytesProcessed(bytes * static_cast<long>(state.iterations()));
}
BENCHMARK(bench_encrypt)->Ranges({ { 1 << 12, 1 << 20 }, { 1, 1 << 12 } });
BENCHMARK(bench_decrypt)->Ranges({ { 1 << 12, 1 << 20 }, { 1, 1 << 12 } });

View File

@ -11,6 +11,12 @@ set(FLOWBENCH_SRCS
GlobalData.h
GlobalData.cpp)
if(WITH_TLS AND NOT WIN32)
set(FLOWBENCH_SRCS
${FLOWBENCH_SRCS}
BenchEncrypt.cpp)
endif()
project (flowbench)
# include the configurations from benchmark.cmake
configure_file(benchmark.cmake googlebenchmark-download/CMakeLists.txt)