MVC2.0: Check byte sum on client lib uploads and downloads, rollback upload in case of an error
This commit is contained in:
parent
39017c3b41
commit
436ed7e497
|
@ -24,6 +24,8 @@
|
|||
#include "fdbclient/ManagementAPI.actor.h"
|
||||
#include "fdbclient/ClientKnobs.h"
|
||||
#include "fdbclient/versions.h"
|
||||
#include "fdbclient/md5/md5.h"
|
||||
#include "fdbclient/libb64/encode.h"
|
||||
#include "fdbrpc/IAsyncFile.h"
|
||||
#include "flow/Platform.h"
|
||||
|
||||
|
@ -40,6 +42,7 @@ struct ClientLibBinaryInfo {
|
|||
size_t totalBytes;
|
||||
size_t chunkCnt;
|
||||
size_t chunkSize;
|
||||
std::string sumBytes;
|
||||
ClientLibBinaryInfo() : totalBytes(0), chunkCnt(0), chunkSize(0) {}
|
||||
};
|
||||
|
||||
|
@ -213,6 +216,9 @@ ACTOR Future<Void> uploadClientLibBinary(Database db,
|
|||
state Reference<IAsyncFile> fClientLib = wait(IAsyncFileSystem::filesystem()->open(
|
||||
libFilePath.toString(), IAsyncFile::OPEN_READONLY | IAsyncFile::OPEN_UNCACHED, 0));
|
||||
|
||||
state MD5_CTX sum;
|
||||
::MD5_Init(&sum);
|
||||
|
||||
loop {
|
||||
state Arena arena;
|
||||
state StringRef buf = makeAlignedString(_PAGE_SIZE, transactionSize, arena);
|
||||
|
@ -222,6 +228,8 @@ ACTOR Future<Void> uploadClientLibBinary(Database db,
|
|||
break;
|
||||
}
|
||||
|
||||
::MD5_Update(&sum, buf.begin(), bytesRead);
|
||||
|
||||
state Transaction tr(db);
|
||||
state size_t firstChunkNo = chunkNo;
|
||||
loop {
|
||||
|
@ -250,6 +258,52 @@ ACTOR Future<Void> uploadClientLibBinary(Database db,
|
|||
binInfo->totalBytes = fileOffset;
|
||||
binInfo->chunkCnt = chunkNo;
|
||||
binInfo->chunkSize = chunkSize;
|
||||
|
||||
std::string sumBytes;
|
||||
sumBytes.resize(16);
|
||||
::MD5_Final((unsigned char*)sumBytes.data(), &sum);
|
||||
binInfo->sumBytes = base64::encoder::from_string(sumBytes);
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> deleteClientLibBinary(Database db, Key chunkKeyPrefix, size_t chunkCount) {
|
||||
state size_t chunkNo = 0;
|
||||
state size_t keysPerTransaction = CLIENT_KNOBS->MVC_CLIENTLIB_DELETE_KEYS_PER_TRANSACTION;
|
||||
loop {
|
||||
state Arena arena;
|
||||
state Transaction tr1(db);
|
||||
loop {
|
||||
try {
|
||||
tr1.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
state size_t endChunk = std::min(chunkNo + keysPerTransaction, chunkCount);
|
||||
tr1.clear(KeyRangeRef(chunkKeyFromNo(chunkKeyPrefix, chunkNo, arena),
|
||||
chunkKeyFromNo(chunkKeyPrefix, endChunk, arena)));
|
||||
wait(tr1.commit());
|
||||
chunkNo = endChunk;
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr1.onError(e));
|
||||
}
|
||||
}
|
||||
if (chunkNo == chunkCount) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> deleteClientLibMetadataEntry(Database db, Key clientLibMetaKey) {
|
||||
loop {
|
||||
state Transaction tr2(db);
|
||||
try {
|
||||
tr2.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr2.clear(clientLibMetaKey);
|
||||
wait(tr2.commit());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr2.onError(e));
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -271,7 +325,7 @@ ACTOR Future<Void> uploadClientLibrary(Database db, StringRef metadataString, St
|
|||
throw client_lib_invalid_metadata();
|
||||
}
|
||||
|
||||
std::string clientLibId;
|
||||
state std::string clientLibId;
|
||||
getIdFromMetadataJson(metadataJson, clientLibId);
|
||||
state Key clientLibMetaKey = metadataKeyFromId(clientLibId);
|
||||
state Key clientLibBinPrefix = chunkKeyPrefixFromId(clientLibId);
|
||||
|
@ -327,6 +381,22 @@ ACTOR Future<Void> uploadClientLibrary(Database db, StringRef metadataString, St
|
|||
state ClientLibBinaryInfo binInfo;
|
||||
wait(uploadClientLibBinary(db, libFilePath, clientLibBinPrefix, &binInfo));
|
||||
|
||||
std::string checkSum = getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_CHECKSUM);
|
||||
if (binInfo.sumBytes != checkSum) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryChecksumMismatch")
|
||||
.detail("Expected", checkSum)
|
||||
.detail("Actual", binInfo.sumBytes)
|
||||
.detail("Configuration", metadataString);
|
||||
// Rollback the upload operation
|
||||
try {
|
||||
wait(deleteClientLibBinary(db, clientLibBinPrefix, binInfo.chunkCnt));
|
||||
wait(deleteClientLibMetadataEntry(db, clientLibMetaKey));
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevError, "ClientLibraryUploadRollbackFailed").error(e);
|
||||
}
|
||||
throw client_lib_invalid_binary();
|
||||
}
|
||||
|
||||
/*
|
||||
* Update the metadata entry, with additional information about the binary
|
||||
* and change its state from "uploading" to the given one
|
||||
|
@ -351,7 +421,6 @@ ACTOR Future<Void> uploadClientLibrary(Database db, StringRef metadataString, St
|
|||
}
|
||||
|
||||
TraceEvent("ClientLibraryUploadDone").detail("Key", clientLibMetaKey);
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -395,14 +464,18 @@ ACTOR Future<Void> downloadClientLibrary(Database db, StringRef clientLibId, Str
|
|||
state Reference<IAsyncFile> fClientLib =
|
||||
wait(IAsyncFileSystem::filesystem()->open(libFilePath.toString(), flags, 0666));
|
||||
|
||||
state size_t fileOffset = 0;
|
||||
state std::string checkSum = getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_CHECKSUM);
|
||||
state int transactionSize = getAlignedUpperBound(CLIENT_KNOBS->MVC_CLIENTLIB_TRANSACTION_SIZE, _PAGE_SIZE);
|
||||
state size_t chunkCount = getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_CHUNK_COUNT);
|
||||
state size_t binarySize = getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_SIZE);
|
||||
state size_t expectedChunkSize = getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_CHUNK_SIZE);
|
||||
ASSERT(transactionSize % chunkCount == 0);
|
||||
ASSERT(transactionSize % expectedChunkSize == 0);
|
||||
state size_t fileOffset = 0;
|
||||
state size_t chunkNo = 0;
|
||||
|
||||
state MD5_CTX sum;
|
||||
::MD5_Init(&sum);
|
||||
|
||||
loop {
|
||||
state Arena arena;
|
||||
state Transaction tr1(db);
|
||||
|
@ -454,6 +527,7 @@ ACTOR Future<Void> downloadClientLibrary(Database db, StringRef clientLibId, Str
|
|||
if (bufferOffset > 0) {
|
||||
wait(fClientLib->write(buf.begin(), bufferOffset, fileOffset));
|
||||
fileOffset += bufferOffset;
|
||||
::MD5_Update(&sum, buf.begin(), bufferOffset);
|
||||
}
|
||||
|
||||
if (chunkNo == chunkCount) {
|
||||
|
@ -468,10 +542,21 @@ ACTOR Future<Void> downloadClientLibrary(Database db, StringRef clientLibId, Str
|
|||
throw client_lib_invalid_binary();
|
||||
}
|
||||
|
||||
std::string sumBytes;
|
||||
sumBytes.resize(16);
|
||||
::MD5_Final((unsigned char*)sumBytes.data(), &sum);
|
||||
std::string sumBase64 = base64::encoder::from_string(sumBytes);
|
||||
if (sumBase64 != checkSum) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryChecksumMismatch")
|
||||
.detail("Expected", checkSum)
|
||||
.detail("Actual", sumBase64)
|
||||
.detail("Key", clientLibMetaKey);
|
||||
throw client_lib_invalid_binary();
|
||||
}
|
||||
|
||||
wait(fClientLib->sync());
|
||||
|
||||
TraceEvent("ClientLibraryDownloadDone").detail("Key", clientLibMetaKey);
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -508,41 +593,9 @@ ACTOR Future<Void> deleteClientLibrary(Database db, StringRef clientLibId) {
|
|||
}
|
||||
}
|
||||
|
||||
state size_t chunkNo = 0;
|
||||
state size_t chunkCount = getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_CHUNK_COUNT);
|
||||
state size_t keysPerTransaction = CLIENT_KNOBS->MVC_CLIENTLIB_DELETE_KEYS_PER_TRANSACTION;
|
||||
loop {
|
||||
state Arena arena;
|
||||
state Transaction tr1(db);
|
||||
loop {
|
||||
try {
|
||||
tr1.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
state size_t endChunk = std::min(chunkNo + keysPerTransaction, chunkCount);
|
||||
tr1.clear(KeyRangeRef(chunkKeyFromNo(chunkKeyPrefix, chunkNo, arena),
|
||||
chunkKeyFromNo(chunkKeyPrefix, endChunk, arena)));
|
||||
wait(tr1.commit());
|
||||
chunkNo = endChunk;
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr1.onError(e));
|
||||
}
|
||||
}
|
||||
if (chunkNo == chunkCount) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
state Transaction tr2(db);
|
||||
try {
|
||||
tr2.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr2.clear(clientLibMetaKey);
|
||||
wait(tr2.commit());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr2.onError(e));
|
||||
}
|
||||
}
|
||||
wait(deleteClientLibBinary(db, chunkKeyPrefix, chunkCount));
|
||||
wait(deleteClientLibMetadataEntry(db, clientLibMetaKey));
|
||||
|
||||
TraceEvent("ClientLibraryDeleteDone").detail("Key", clientLibMetaKey);
|
||||
return Void();
|
||||
|
|
|
@ -21,7 +21,8 @@
|
|||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "fdbclient/MultiVersionClientControl.actor.h"
|
||||
#include "fdbserver/workloads/AsyncFile.actor.h"
|
||||
|
||||
#include "fdbclient/md5/md5.h"
|
||||
#include "fdbclient/libb64/encode.h"
|
||||
#include "flow/Error.h"
|
||||
#include "flow/IRandom.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
@ -34,13 +35,14 @@ using namespace ClientLibUtils;
|
|||
*/
|
||||
struct ClientLibManagementWorkload : public TestWorkload {
|
||||
|
||||
static constexpr const char TEST_FILE_NAME[] = "dummyclientlib";
|
||||
static constexpr size_t FILE_CHUNK_SIZE = 128 * 1024;
|
||||
static constexpr size_t TEST_FILE_SIZE = FILE_CHUNK_SIZE * 8; // 1MB
|
||||
|
||||
RandomByteGenerator rbg;
|
||||
std::string uploadedClientLibId;
|
||||
json_spirit::mObject uploadedMetadataJson;
|
||||
std::string generatedChecksum;
|
||||
std::string generatedFileName;
|
||||
bool success;
|
||||
|
||||
/*----------------------------------------------------------------
|
||||
|
@ -51,12 +53,7 @@ struct ClientLibManagementWorkload : public TestWorkload {
|
|||
|
||||
std::string description() const override { return "ClientLibManagement"; }
|
||||
|
||||
Future<Void> setup(Database const& cx) override {
|
||||
if (clientId == 0)
|
||||
return _setup(this);
|
||||
|
||||
return Void();
|
||||
}
|
||||
Future<Void> setup(Database const& cx) override { return _setup(this); }
|
||||
|
||||
Future<Void> start(Database const& cx) override { return _start(this, cx); }
|
||||
|
||||
|
@ -69,18 +66,32 @@ struct ClientLibManagementWorkload : public TestWorkload {
|
|||
*/
|
||||
|
||||
ACTOR Future<Void> _setup(ClientLibManagementWorkload* self) {
|
||||
self->generatedFileName = format("clientLibUpload%d", self->clientId);
|
||||
int64_t flags = IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_CREATE;
|
||||
state Reference<IAsyncFile> file = wait(IAsyncFileSystem::filesystem()->open(TEST_FILE_NAME, flags, 0666));
|
||||
state Reference<IAsyncFile> file =
|
||||
wait(IAsyncFileSystem::filesystem()->open(self->generatedFileName, flags, 0666));
|
||||
ASSERT(file.isValid());
|
||||
|
||||
state Reference<AsyncFileBuffer> data = self->allocateBuffer(FILE_CHUNK_SIZE);
|
||||
state int64_t i;
|
||||
state Future<Void> lastWrite = Void();
|
||||
|
||||
state MD5_CTX sum;
|
||||
::MD5_Init(&sum);
|
||||
|
||||
for (i = 0; i < TEST_FILE_SIZE; i += FILE_CHUNK_SIZE) {
|
||||
self->rbg.writeRandomBytesToBuffer(data->buffer, FILE_CHUNK_SIZE);
|
||||
wait(file->write(data->buffer, FILE_CHUNK_SIZE, i));
|
||||
|
||||
::MD5_Update(&sum, data->buffer, FILE_CHUNK_SIZE);
|
||||
}
|
||||
wait(file->sync());
|
||||
|
||||
std::string sumBytes;
|
||||
sumBytes.resize(16);
|
||||
::MD5_Final((unsigned char*)sumBytes.data(), &sum);
|
||||
self->generatedChecksum = base64::encoder::from_string(sumBytes);
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -125,7 +136,7 @@ struct ClientLibManagementWorkload : public TestWorkload {
|
|||
state StringRef metadataStr = StringRef(testMetadataStr);
|
||||
try {
|
||||
// Try to pass some invalid metadata input
|
||||
wait(uploadClientLibrary(cx, metadataStr, LiteralStringRef(TEST_FILE_NAME)));
|
||||
wait(uploadClientLibrary(cx, metadataStr, self->generatedFileName));
|
||||
self->unexpectedSuccess(
|
||||
"InvalidMetadata", error_code_client_lib_invalid_metadata, metadataStr.toString().c_str());
|
||||
} catch (Error& e) {
|
||||
|
@ -153,8 +164,25 @@ struct ClientLibManagementWorkload : public TestWorkload {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> testUploadClientLibWrongChecksum(ClientLibManagementWorkload* self, Database cx) {
|
||||
validClientLibMetadataSample(self->uploadedMetadataJson);
|
||||
state Standalone<StringRef> metadataStr =
|
||||
StringRef(json_spirit::write_string(json_spirit::mValue(self->uploadedMetadataJson)));
|
||||
getClientLibIdFromMetadataJson(metadataStr, self->uploadedClientLibId);
|
||||
try {
|
||||
wait(uploadClientLibrary(cx, metadataStr, self->generatedFileName));
|
||||
self->unexpectedSuccess("ChecksumMismatch", error_code_client_lib_invalid_binary);
|
||||
} catch (Error& e) {
|
||||
self->testErrorCode("ChecksumMismatch", error_code_client_lib_invalid_binary, e.code());
|
||||
}
|
||||
|
||||
wait(testUploadedClientLibInList(self, cx, ClientLibFilter(), false, "After upload with wrong checksum"));
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> testUploadClientLib(ClientLibManagementWorkload* self, Database cx) {
|
||||
validClientLibMetadataSample(self->uploadedMetadataJson);
|
||||
self->uploadedMetadataJson[CLIENTLIB_ATTR_CHECKSUM] = self->generatedChecksum;
|
||||
state Standalone<StringRef> metadataStr =
|
||||
StringRef(json_spirit::write_string(json_spirit::mValue(self->uploadedMetadataJson)));
|
||||
getClientLibIdFromMetadataJson(metadataStr, self->uploadedClientLibId);
|
||||
|
@ -162,7 +190,7 @@ struct ClientLibManagementWorkload : public TestWorkload {
|
|||
// Test two concurrent uploads of the same library, one of the must fail and another succeed
|
||||
state std::vector<Future<ErrorOr<Void>>> concurrentUploads;
|
||||
for (int i1 = 0; i1 < 2; i1++) {
|
||||
Future<Void> uploadActor = uploadClientLibrary(cx, metadataStr, LiteralStringRef(TEST_FILE_NAME));
|
||||
Future<Void> uploadActor = uploadClientLibrary(cx, metadataStr, self->generatedFileName);
|
||||
concurrentUploads.push_back(errorOr(uploadActor));
|
||||
}
|
||||
|
||||
|
@ -322,7 +350,7 @@ struct ClientLibManagementWorkload : public TestWorkload {
|
|||
metadataJson[CLIENTLIB_ATTR_VERSION] = "7.1.0";
|
||||
metadataJson[CLIENTLIB_ATTR_GIT_HASH] = randomHexadecimalStr(40);
|
||||
metadataJson[CLIENTLIB_ATTR_TYPE] = "debug";
|
||||
metadataJson[CLIENTLIB_ATTR_CHECKSUM] = randomHexadecimalStr(32);
|
||||
metadataJson[CLIENTLIB_ATTR_CHECKSUM] = randomHexadecimalStr(22);
|
||||
metadataJson[CLIENTLIB_ATTR_STATUS] = getStatusName(CLIENTLIB_AVAILABLE);
|
||||
metadataJson[CLIENTLIB_ATTR_API_VERSION] = 710;
|
||||
metadataJson[CLIENTLIB_ATTR_PROTOCOL] = "fdb00b07001001";
|
||||
|
|
Loading…
Reference in New Issue