Merge pull request #5694 from sfc-gh-vgasiunas/multi-version-client-2

Operations to upload and manage client binaries in the system keyspace
This commit is contained in:
A.J. Beamon 2021-10-27 14:28:10 -07:00 committed by GitHub
commit 6174229a1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1378 additions and 4 deletions

View File

@ -78,6 +78,8 @@ set(FDBCLIENT_SRCS
MonitorLeader.actor.cpp
MonitorLeader.h
MultiVersionAssignmentVars.h
ClientLibManagement.actor.cpp
ClientLibManagement.actor.h
MultiVersionTransaction.actor.cpp
MultiVersionTransaction.h
MutationList.h

View File

@ -231,8 +231,8 @@ void ClientKnobs::initialize(Randomize randomize) {
init( CONSISTENCY_CHECK_RATE_LIMIT_MAX, 50e6 ); // Limit in per sec
init( CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME, 7 * 24 * 60 * 60 ); // 7 days
//fdbcli
//fdbcli
init( CLI_CONNECT_PARALLELISM, 400 );
init( CLI_CONNECT_TIMEOUT, 10.0 );
@ -254,7 +254,11 @@ void ClientKnobs::initialize(Randomize randomize) {
init( BUSYNESS_SPIKE_START_THRESHOLD, 0.100 );
init( BUSYNESS_SPIKE_SATURATED_THRESHOLD, 0.500 );
// blob granules
// multi-version client control
init( MVC_CLIENTLIB_CHUNK_SIZE, 8*1024 );
init( MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION, 32 );
// blob granules
init( ENABLE_BLOB_GRANULES, false );
// clang-format on

View File

@ -245,6 +245,10 @@ public:
double BUSYNESS_SPIKE_START_THRESHOLD;
double BUSYNESS_SPIKE_SATURATED_THRESHOLD;
// multi-version client control
int MVC_CLIENTLIB_CHUNK_SIZE;
int MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION;
// blob granules
bool ENABLE_BLOB_GRANULES;

View File

@ -0,0 +1,710 @@
/*
* ClientLibManagement.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/ClientLibManagement.actor.h"
#include "fdbclient/Schemas.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/ClientKnobs.h"
#include "fdbclient/versions.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/Platform.h"
#include <algorithm>
#include <string>
#include <stdio.h>
#include "flow/Trace.h"
#include "flow/actorcompiler.h" // This must be the last #include.
namespace ClientLibManagement {
struct ClientLibBinaryInfo {
size_t totalBytes = 0;
size_t chunkCnt = 0;
size_t chunkSize = 0;
Standalone<StringRef> sumBytes;
};
#define ASSERT_INDEX_IN_RANGE(idx, arr) ASSERT(idx >= 0 && idx < sizeof(arr) / sizeof(arr[0]))
const std::string& getStatusName(ClientLibStatus status) {
static const std::string statusNames[] = { "disabled", "available", "uploading" };
int idx = static_cast<int>(status);
ASSERT_INDEX_IN_RANGE(idx, statusNames);
return statusNames[idx];
}
ClientLibStatus getStatusByName(std::string_view statusName) {
static std::map<std::string_view, ClientLibStatus> statusByName;
// initialize the map on demand
if (statusByName.empty()) {
for (int i = 0; i < static_cast<int>(ClientLibStatus::COUNT); i++) {
ClientLibStatus status = static_cast<ClientLibStatus>(i);
statusByName[getStatusName(status)] = status;
}
}
auto statusIter = statusByName.find(statusName);
if (statusIter == statusByName.cend()) {
TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata")
.detail("Error", format("Unknown status value %s", std::string(statusName).c_str()));
throw client_lib_invalid_metadata();
}
return statusIter->second;
}
const std::string& getPlatformName(ClientLibPlatform platform) {
static const std::string platformNames[] = { "unknown", "x84_64-linux", "x86_64-windows", "x86_64-macos" };
int idx = static_cast<int>(platform);
ASSERT_INDEX_IN_RANGE(idx, platformNames);
return platformNames[idx];
}
ClientLibPlatform getPlatformByName(std::string_view platformName) {
static std::map<std::string_view, ClientLibPlatform> platformByName;
// initialize the map on demand
if (platformByName.empty()) {
for (int i = 0; i < static_cast<int>(ClientLibPlatform::COUNT); i++) {
ClientLibPlatform platform = static_cast<ClientLibPlatform>(i);
platformByName[getPlatformName(platform)] = platform;
}
}
auto platfIter = platformByName.find(platformName);
if (platfIter == platformByName.cend()) {
TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata")
.detail("Error", format("Unknown platform value %s", std::string(platformName).c_str()));
throw client_lib_invalid_metadata();
}
return platfIter->second;
}
const std::string& getChecksumAlgName(ClientLibChecksumAlg checksumAlg) {
static const std::string checksumAlgNames[] = { "md5" };
int idx = static_cast<int>(checksumAlg);
ASSERT_INDEX_IN_RANGE(idx, checksumAlgNames);
return checksumAlgNames[idx];
}
ClientLibChecksumAlg getChecksumAlgByName(std::string_view checksumAlgName) {
static std::map<std::string_view, ClientLibChecksumAlg> checksumAlgByName;
// initialize the map on demand
if (checksumAlgByName.empty()) {
for (int i = 0; i < (int)ClientLibChecksumAlg::COUNT; i++) {
ClientLibChecksumAlg checksumAlg = static_cast<ClientLibChecksumAlg>(i);
checksumAlgByName[getChecksumAlgName(checksumAlg)] = checksumAlg;
}
}
auto iter = checksumAlgByName.find(checksumAlgName);
if (iter == checksumAlgByName.cend()) {
TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata")
.detail("Error", format("Unknown checksum algorithm %s", std::string(checksumAlgName).c_str()));
throw client_lib_invalid_metadata();
}
return iter->second;
}
namespace {
bool isValidTargetStatus(ClientLibStatus status) {
return status == ClientLibStatus::AVAILABLE || status == ClientLibStatus::DISABLED;
}
json_spirit::mObject parseMetadataJson(StringRef metadataString) {
json_spirit::mValue parsedMetadata;
if (!json_spirit::read_string(metadataString.toString(), parsedMetadata) ||
parsedMetadata.type() != json_spirit::obj_type) {
TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata")
.detail("Reason", "InvalidJSON")
.detail("Configuration", metadataString);
throw client_lib_invalid_metadata();
}
return parsedMetadata.get_obj();
}
const std::string& getMetadataStrAttr(const json_spirit::mObject& metadataJson, const std::string& attrName) {
auto attrIter = metadataJson.find(attrName);
if (attrIter == metadataJson.cend() || attrIter->second.type() != json_spirit::str_type) {
TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata")
.detail("Error", format("Missing attribute %s", attrName.c_str()));
throw client_lib_invalid_metadata();
}
return attrIter->second.get_str();
}
int getMetadataIntAttr(const json_spirit::mObject& metadataJson, const std::string& attrName) {
auto attrIter = metadataJson.find(attrName);
if (attrIter == metadataJson.cend() || attrIter->second.type() != json_spirit::int_type) {
TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata")
.detail("Error", format("Missing attribute %s", attrName.c_str()));
throw client_lib_invalid_metadata();
}
return attrIter->second.get_int();
}
bool validVersionPartNum(int num) {
return (num >= 0 && num < 1000);
}
int getNumericVersionEncoding(const std::string& versionStr) {
int major, minor, patch;
int charsScanned;
int numScanned = sscanf(versionStr.c_str(), "%d.%d.%d%n", &major, &minor, &patch, &charsScanned);
if (numScanned != 3 || !validVersionPartNum(major) || !validVersionPartNum(minor) || !validVersionPartNum(patch) ||
charsScanned != versionStr.size()) {
TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata")
.detail("Error", format("Invalid version string %s", versionStr.c_str()));
throw client_lib_invalid_metadata();
}
return ((major * 1000) + minor) * 1000 + patch;
}
Standalone<StringRef> getIdFromMetadataJson(const json_spirit::mObject& metadataJson) {
std::ostringstream libIdBuilder;
libIdBuilder << getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_PLATFORM) << "/";
libIdBuilder << format("%09d", getNumericVersionEncoding(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_VERSION)))
<< "/";
libIdBuilder << getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_TYPE) << "/";
libIdBuilder << getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_CHECKSUM);
return Standalone<StringRef>(libIdBuilder.str());
}
Key metadataKeyFromId(StringRef clientLibId) {
return clientLibId.withPrefix(clientLibMetadataPrefix);
}
Key chunkKeyPrefixFromId(StringRef clientLibId) {
return clientLibId.withPrefix(clientLibBinaryPrefix).withSuffix(LiteralStringRef("/"));
}
KeyRef chunkKeyFromNo(StringRef clientLibBinPrefix, size_t chunkNo, Arena& arena) {
return clientLibBinPrefix.withSuffix(format("%06zu", chunkNo), arena);
}
ClientLibPlatform getCurrentClientPlatform() {
#ifdef __x86_64__
#if defined(_WIN32)
return ClientLibPlatform::X86_64_WINDOWS;
#elif defined(__linux__)
return ClientLibPlatform::X86_64_LINUX;
#elif defined(__FreeBSD__) || defined(__APPLE__)
return ClientLibPlatform::X86_64_MACOS;
#else
return ClientLibPlatform::UNKNOWN;
#endif
#else // not __x86_64__
return ClientLibPlatform::UNKNOWN;
#endif
}
Standalone<StringRef> byteArrayToHexString(StringRef input) {
static const char* digits = "0123456789abcdef";
Standalone<StringRef> output = makeString(input.size() * 2);
char* pout = reinterpret_cast<char*>(mutateString(output));
for (const uint8_t* pin = input.begin(); pin != input.end(); ++pin) {
*pout++ = digits[(*pin >> 4) & 0xF];
*pout++ = digits[(*pin) & 0xF];
}
return output;
}
} // namespace
Standalone<StringRef> md5SumToHexString(MD5_CTX& sum) {
Standalone<StringRef> sumBytes = makeString(16);
::MD5_Final(mutateString(sumBytes), &sum);
return byteArrayToHexString(sumBytes);
}
ClientLibFilter& ClientLibFilter::filterNewerPackageVersion(const std::string& versionStr) {
matchNewerPackageVersion = true;
this->numericPkgVersion = getNumericVersionEncoding(versionStr);
return *this;
}
Standalone<StringRef> getClientLibIdFromMetadataJson(StringRef metadataString) {
json_spirit::mObject parsedMetadata = parseMetadataJson(metadataString);
return getIdFromMetadataJson(parsedMetadata);
}
namespace {
ACTOR Future<Void> uploadClientLibBinary(Database db,
StringRef libFilePath,
KeyRef chunkKeyPrefix,
ClientLibBinaryInfo* binInfo) {
state int chunkSize = getAlignedUpperBound(CLIENT_KNOBS->MVC_CLIENTLIB_CHUNK_SIZE, 1024);
state int transactionSize = std::max(CLIENT_KNOBS->MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION, 1) * chunkSize;
state size_t fileOffset = 0;
state size_t chunkNo = 0;
state MD5_CTX sum;
state Arena arena;
state StringRef buf;
state Transaction tr;
state size_t firstChunkNo;
// Disabling AIO, because it currently supports only page-aligned writes, but the size of a client library
// is not necessariliy page-aligned, need to investigate if it is a limitation of AIO or just the way
// we are wrapping it
state Reference<IAsyncFile> fClientLib = wait(IAsyncFileSystem::filesystem()->open(
libFilePath.toString(), IAsyncFile::OPEN_READONLY | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_NO_AIO, 0));
::MD5_Init(&sum);
loop {
arena = Arena();
// Use page-aligned buffers for enabling possible future use with AIO
buf = makeAlignedString(_PAGE_SIZE, transactionSize, arena);
state int bytesRead = wait(fClientLib->read(mutateString(buf), transactionSize, fileOffset));
fileOffset += bytesRead;
if (bytesRead <= 0) {
break;
}
::MD5_Update(&sum, buf.begin(), bytesRead);
tr = Transaction(db);
firstChunkNo = chunkNo;
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
int bufferOffset = 0;
chunkNo = firstChunkNo;
while (bufferOffset < bytesRead) {
size_t chunkLen = std::min(chunkSize, bytesRead - bufferOffset);
KeyRef chunkKey = chunkKeyFromNo(chunkKeyPrefix, chunkNo, arena);
chunkNo++;
tr.set(chunkKey, ValueRef(mutateString(buf) + bufferOffset, chunkLen));
bufferOffset += chunkLen;
}
wait(tr.commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
if (bytesRead < transactionSize) {
break;
}
}
binInfo->totalBytes = fileOffset;
binInfo->chunkCnt = chunkNo;
binInfo->chunkSize = chunkSize;
binInfo->sumBytes = md5SumToHexString(sum);
return Void();
}
} // namespace
ACTOR Future<Void> uploadClientLibrary(Database db,
Standalone<StringRef> metadataString,
Standalone<StringRef> libFilePath) {
state json_spirit::mObject metadataJson;
state Standalone<StringRef> clientLibId;
state Key clientLibMetaKey;
state Key clientLibBinPrefix;
state std::string jsStr;
state Transaction tr;
state ClientLibBinaryInfo binInfo;
state ClientLibStatus targetStatus;
metadataJson = parseMetadataJson(metadataString);
json_spirit::mValue schema;
if (!json_spirit::read_string(JSONSchemas::clientLibMetadataSchema.toString(), schema)) {
ASSERT(false);
}
std::string errorStr;
if (!schemaMatch(schema.get_obj(), metadataJson, errorStr, SevWarnAlways)) {
TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata")
.detail("Reason", "SchemaMismatch")
.detail("Configuration", metadataString)
.detail("Error", errorStr);
throw client_lib_invalid_metadata();
}
clientLibId = getIdFromMetadataJson(metadataJson);
clientLibMetaKey = metadataKeyFromId(clientLibId);
clientLibBinPrefix = chunkKeyPrefixFromId(clientLibId);
targetStatus = getStatusByName(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_STATUS));
if (!isValidTargetStatus(targetStatus)) {
TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata")
.detail("Reason", "InvalidTargetStatus")
.detail("Configuration", metadataString);
throw client_lib_invalid_metadata();
}
// check if checksumalg and platform attributes have valid values
getChecksumAlgByName(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_CHECKSUM_ALG));
getPlatformByName(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_PLATFORM));
// Check if further mandatory attributes are set
getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_GIT_HASH);
getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_PROTOCOL);
getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_API_VERSION);
metadataJson[CLIENTLIB_ATTR_STATUS] = getStatusName(ClientLibStatus::UPLOADING);
jsStr = json_spirit::write_string(json_spirit::mValue(metadataJson));
/*
* Check if the client library with the same identifier already exists.
* If not, write its metadata with "uploading" state to prevent concurrent uploads
*/
tr = Transaction(db);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> existingMeta = wait(tr.get(clientLibMetaKey));
if (existingMeta.present()) {
TraceEvent(SevWarnAlways, "ClientLibraryAlreadyExists")
.detail("Key", clientLibMetaKey)
.detail("ExistingMetadata", existingMeta.get().toString());
throw client_lib_already_exists();
}
TraceEvent("ClientLibraryBeginUpload").detail("Key", clientLibMetaKey);
tr.set(clientLibMetaKey, ValueRef(jsStr));
wait(tr.commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
/*
* Upload the binary of the client library in chunks
*/
wait(uploadClientLibBinary(db, libFilePath, clientLibBinPrefix, &binInfo));
std::string checkSum = getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_CHECKSUM);
if (binInfo.sumBytes != StringRef(checkSum)) {
TraceEvent(SevWarnAlways, "ClientLibraryChecksumMismatch")
.detail("Expected", checkSum)
.detail("Actual", binInfo.sumBytes)
.detail("Configuration", metadataString);
// Rollback the upload operation
try {
wait(deleteClientLibrary(db, clientLibId));
} catch (Error& e) {
TraceEvent(SevError, "ClientLibraryUploadRollbackFailed").error(e);
}
throw client_lib_invalid_binary();
}
/*
* Update the metadata entry, with additional information about the binary
* and change its state from "uploading" to the given one
*/
metadataJson[CLIENTLIB_ATTR_SIZE] = static_cast<int64_t>(binInfo.totalBytes);
metadataJson[CLIENTLIB_ATTR_CHUNK_COUNT] = static_cast<int64_t>(binInfo.chunkCnt);
metadataJson[CLIENTLIB_ATTR_CHUNK_SIZE] = static_cast<int64_t>(binInfo.chunkSize);
metadataJson[CLIENTLIB_ATTR_FILENAME] = basename(libFilePath.toString());
metadataJson[CLIENTLIB_ATTR_STATUS] = getStatusName(targetStatus);
jsStr = json_spirit::write_string(json_spirit::mValue(metadataJson));
tr.reset();
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.set(clientLibMetaKey, ValueRef(jsStr));
wait(tr.commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
TraceEvent("ClientLibraryUploadDone").detail("Key", clientLibMetaKey);
return Void();
}
ACTOR Future<Void> downloadClientLibrary(Database db,
Standalone<StringRef> clientLibId,
Standalone<StringRef> libFilePath) {
state Key clientLibMetaKey = metadataKeyFromId(clientLibId);
state Key chunkKeyPrefix = chunkKeyPrefixFromId(clientLibId);
state int chunksPerTransaction = std::max(CLIENT_KNOBS->MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION, 1);
state int transactionSize;
state json_spirit::mObject metadataJson;
state std::string checkSum;
state size_t chunkCount;
state size_t binarySize;
state size_t expectedChunkSize;
state Transaction tr;
state size_t fileOffset;
state MD5_CTX sum;
state Arena arena;
state StringRef buf;
state size_t bufferOffset;
state size_t fromChunkNo;
state size_t toChunkNo;
state std::vector<Future<Optional<Value>>> chunkFutures;
TraceEvent("ClientLibraryBeginDownload").detail("Key", clientLibMetaKey);
/*
* First read the metadata to get information about the status and
* the chunk count of the client library
*/
loop {
tr = Transaction(db);
try {
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
Optional<Value> metadataOpt = wait(tr.get(clientLibMetaKey));
if (!metadataOpt.present()) {
TraceEvent(SevWarnAlways, "ClientLibraryNotFound").detail("Key", clientLibMetaKey);
throw client_lib_not_found();
}
metadataJson = parseMetadataJson(metadataOpt.get());
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
// Allow downloading only libraries in the available state
if (getStatusByName(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_STATUS)) != ClientLibStatus::AVAILABLE) {
throw client_lib_not_available();
}
// Disabling AIO, because it currently supports only page-aligned writes, but the size of a client library
// is not necessariliy page-aligned, need to investigate if it is a limitation of AIO or just the way
// we are wrapping it
int64_t flags = IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_CREATE |
IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_NO_AIO;
state Reference<IAsyncFile> fClientLib =
wait(IAsyncFileSystem::filesystem()->open(libFilePath.toString(), flags, 0666));
checkSum = getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_CHECKSUM);
chunkCount = getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_CHUNK_COUNT);
binarySize = getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_SIZE);
expectedChunkSize = getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_CHUNK_SIZE);
transactionSize = chunksPerTransaction * expectedChunkSize;
fileOffset = 0;
fromChunkNo = 0;
::MD5_Init(&sum);
arena = Arena();
// Use page-aligned buffers for enabling possible future use with AIO
buf = makeAlignedString(_PAGE_SIZE, transactionSize, arena);
loop {
if (fromChunkNo == chunkCount) {
break;
}
tr = Transaction(db);
toChunkNo = std::min(chunkCount, fromChunkNo + chunksPerTransaction);
// read a batch of file chunks concurrently
loop {
try {
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
chunkFutures.clear();
for (size_t chunkNo = fromChunkNo; chunkNo < toChunkNo; chunkNo++) {
KeyRef chunkKey = chunkKeyFromNo(chunkKeyPrefix, chunkNo, arena);
chunkFutures.push_back(tr.get(chunkKey));
}
wait(waitForAll(chunkFutures));
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
// check the read chunks and copy them to a buffer
bufferOffset = 0;
size_t chunkNo = fromChunkNo;
for (auto chunkOptFuture : chunkFutures) {
if (!chunkOptFuture.get().present()) {
TraceEvent(SevWarnAlways, "ClientLibraryChunkNotFound")
.detail("Key", chunkKeyFromNo(chunkKeyPrefix, chunkNo, arena));
throw client_lib_invalid_binary();
}
StringRef chunkVal = chunkOptFuture.get().get();
// All chunks exept for the last one must be of the expected size to guarantee
// alignment when writing to file
if ((chunkNo != (chunkCount - 1) && chunkVal.size() != expectedChunkSize) ||
chunkVal.size() > expectedChunkSize) {
TraceEvent(SevWarnAlways, "ClientLibraryInvalidChunkSize")
.detail("Key", chunkKeyFromNo(chunkKeyPrefix, chunkNo, arena))
.detail("MaxSize", expectedChunkSize)
.detail("ActualSize", chunkVal.size());
throw client_lib_invalid_binary();
}
memcpy(mutateString(buf) + bufferOffset, chunkVal.begin(), chunkVal.size());
bufferOffset += chunkVal.size();
chunkNo++;
}
// write the chunks to the file, update checksum
if (bufferOffset > 0) {
wait(fClientLib->write(buf.begin(), bufferOffset, fileOffset));
fileOffset += bufferOffset;
::MD5_Update(&sum, buf.begin(), bufferOffset);
}
// move to the next batch
fromChunkNo = toChunkNo;
}
// check if the downloaded file size is as expected
if (fileOffset != binarySize) {
TraceEvent(SevWarnAlways, "ClientLibraryInvalidSize")
.detail("ExpectedSize", binarySize)
.detail("ActualSize", fileOffset);
throw client_lib_invalid_binary();
}
// check if the checksum of downloaded file is as expected
Standalone<StringRef> sumBytesStr = md5SumToHexString(sum);
if (sumBytesStr != StringRef(checkSum)) {
TraceEvent(SevWarnAlways, "ClientLibraryChecksumMismatch")
.detail("Expected", checkSum)
.detail("Actual", sumBytesStr)
.detail("Key", clientLibMetaKey);
throw client_lib_invalid_binary();
}
wait(fClientLib->sync());
TraceEvent("ClientLibraryDownloadDone").detail("Key", clientLibMetaKey);
return Void();
}
ACTOR Future<Void> deleteClientLibrary(Database db, Standalone<StringRef> clientLibId) {
state Key clientLibMetaKey = metadataKeyFromId(clientLibId.toString());
state Key chunkKeyPrefix = chunkKeyPrefixFromId(clientLibId.toString());
TraceEvent("ClientLibraryBeginDelete").detail("Key", clientLibMetaKey);
loop {
state Transaction tr(db);
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> metadataOpt = wait(tr.get(clientLibMetaKey));
if (!metadataOpt.present()) {
TraceEvent(SevWarnAlways, "ClientLibraryNotFound").detail("Key", clientLibMetaKey);
throw client_lib_not_found();
}
tr.clear(prefixRange(chunkKeyPrefix));
tr.clear(clientLibMetaKey);
wait(tr.commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
TraceEvent("ClientLibraryDeleteDone").detail("Key", clientLibMetaKey);
return Void();
}
namespace {
void applyClientLibFilter(const ClientLibFilter& filter,
const RangeResultRef& scanResults,
Standalone<VectorRef<StringRef>>& filteredResults) {
for (const auto& [k, v] : scanResults) {
try {
json_spirit::mObject metadataJson = parseMetadataJson(v);
if (filter.matchAvailableOnly && getStatusByName(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_STATUS)) !=
ClientLibStatus::AVAILABLE) {
continue;
}
if (filter.matchCompatibleAPI &&
getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_API_VERSION) < filter.apiVersion) {
continue;
}
if (filter.matchNewerPackageVersion && !filter.matchPlatform &&
getNumericVersionEncoding(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_VERSION)) <=
filter.numericPkgVersion) {
continue;
}
filteredResults.push_back_deep(filteredResults.arena(), v);
} catch (Error& e) {
// Entries with invalid metadata on the cluster
// Can happen only if the official management interface is bypassed
ASSERT(e.code() == error_code_client_lib_invalid_metadata);
TraceEvent(SevError, "ClientLibraryIgnoringInvalidMetadata").detail("Metadata", v);
}
}
}
} // namespace
ACTOR Future<Standalone<VectorRef<StringRef>>> listClientLibraries(Database db, ClientLibFilter filter) {
state Standalone<VectorRef<StringRef>> result;
state Transaction tr(db);
state PromiseStream<Standalone<RangeResultRef>> scanResults;
state Key fromKey;
state Key toKey;
state KeyRangeRef scanRange;
state Future<Void> stream;
loop {
try {
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
if (filter.matchPlatform) {
Key prefixWithPlatform =
clientLibMetadataPrefix.withSuffix(std::string(getPlatformName(filter.platformVal)));
fromKey = prefixWithPlatform.withSuffix(LiteralStringRef("/"));
if (filter.matchNewerPackageVersion) {
fromKey = fromKey.withSuffix(format("%09d", filter.numericPkgVersion + 1));
}
toKey = prefixWithPlatform.withSuffix(LiteralStringRef("0"));
scanRange = KeyRangeRef(fromKey, toKey);
} else {
scanRange = clientLibMetadataKeys;
}
scanResults = PromiseStream<Standalone<RangeResultRef>>();
stream = tr.getRangeStream(scanResults, scanRange, GetRangeLimits());
loop {
Standalone<RangeResultRef> scanResultRange = waitNext(scanResults.getFuture());
applyClientLibFilter(filter, scanResultRange, result);
}
} catch (Error& e) {
if (e.code() == error_code_end_of_stream) {
break;
}
wait(tr.onError(e));
}
}
return result;
}
} // namespace ClientLibManagement

View File

@ -0,0 +1,139 @@
/*
* ClientLibManagement.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_MULTI_VERSION_CLIENT_CONTROL_ACTOR_G_H)
#define FDBCLIENT_MULTI_VERSION_CLIENT_CONTROL_ACTOR_G_H
#include "fdbclient/ClientLibManagement.actor.g.h"
#elif !defined(FDBCLIENT_MULTI_VERSION_CLIENT_CONTROL_ACTOR_H)
#define FDBCLIENT_MULTI_VERSION_CLIENT_CONTROL_ACTOR_H
#include <string>
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/md5/md5.h"
#include "flow/actorcompiler.h" // has to be last include
namespace ClientLibManagement {
enum class ClientLibStatus {
DISABLED = 0,
AVAILABLE, // 1
UPLOADING, // 2
COUNT // must be the last one
};
enum class ClientLibPlatform {
UNKNOWN = 0,
X86_64_LINUX,
X86_64_WINDOWS,
X86_64_MACOS,
COUNT // must be the last one
};
// Currently we support only one,
// but we may want to change it in the future
enum class ClientLibChecksumAlg {
MD5 = 0,
COUNT // must be the last one
};
inline const std::string CLIENTLIB_ATTR_PLATFORM{ "platform" };
inline const std::string CLIENTLIB_ATTR_STATUS{ "status" };
inline const std::string CLIENTLIB_ATTR_CHECKSUM{ "checksum" };
inline const std::string CLIENTLIB_ATTR_VERSION{ "version" };
inline const std::string CLIENTLIB_ATTR_TYPE{ "type" };
inline const std::string CLIENTLIB_ATTR_API_VERSION{ "apiversion" };
inline const std::string CLIENTLIB_ATTR_PROTOCOL{ "protocol" };
inline const std::string CLIENTLIB_ATTR_GIT_HASH{ "githash" };
inline const std::string CLIENTLIB_ATTR_FILENAME{ "filename" };
inline const std::string CLIENTLIB_ATTR_SIZE{ "size" };
inline const std::string CLIENTLIB_ATTR_CHUNK_COUNT{ "chunkcount" };
inline const std::string CLIENTLIB_ATTR_CHUNK_SIZE{ "chunksize" };
inline const std::string CLIENTLIB_ATTR_CHECKSUM_ALG{ "checksumalg" };
struct ClientLibFilter {
bool matchAvailableOnly = false;
bool matchPlatform = false;
bool matchCompatibleAPI = false;
bool matchNewerPackageVersion = false;
ClientLibPlatform platformVal = ClientLibPlatform::UNKNOWN;
int apiVersion = 0;
int numericPkgVersion = 0;
ClientLibFilter& filterAvailable() {
matchAvailableOnly = true;
return *this;
}
ClientLibFilter& filterPlatform(ClientLibPlatform platformVal) {
matchPlatform = true;
this->platformVal = platformVal;
return *this;
}
ClientLibFilter& filterCompatibleAPI(int apiVersion) {
matchCompatibleAPI = true;
this->apiVersion = apiVersion;
return *this;
}
// expects a version string like "6.3.10"
ClientLibFilter& filterNewerPackageVersion(const std::string& versionStr);
};
const std::string& getStatusName(ClientLibStatus status);
ClientLibStatus getStatusByName(std::string_view statusName);
const std::string& getPlatformName(ClientLibPlatform platform);
ClientLibPlatform getPlatformByName(std::string_view platformName);
const std::string& getChecksumAlgName(ClientLibChecksumAlg checksumAlg);
ClientLibChecksumAlg getChecksumAlgByName(std::string_view checksumAlgName);
// encodes MD5 result to a hexadecimal string to be provided in the checksum attribute
Standalone<StringRef> md5SumToHexString(MD5_CTX& sum);
// Upload a client library binary from a file and associated metadata JSON
// to the system keyspace of the database
ACTOR Future<Void> uploadClientLibrary(Database db,
Standalone<StringRef> metadataString,
Standalone<StringRef> libFilePath);
// Determine clientLibId from the relevant attributes of the metadata JSON
Standalone<StringRef> getClientLibIdFromMetadataJson(StringRef metadataString);
// Download a client library binary from the system keyspace of the database
// and save it at the given file path
ACTOR Future<Void> downloadClientLibrary(Database db,
Standalone<StringRef> clientLibId,
Standalone<StringRef> libFilePath);
// Delete the client library binary from to the system keyspace of the database
ACTOR Future<Void> deleteClientLibrary(Database db, Standalone<StringRef> clientLibId);
// List client libraries available on the cluster, with the specified filter
// Returns metadata JSON of each library
ACTOR Future<Standalone<VectorRef<StringRef>>> listClientLibraries(Database db, ClientLibFilter filter);
} // namespace ClientLibManagement
#include "flow/unactorcompiler.h"
#endif

View File

@ -1040,3 +1040,19 @@ const KeyRef JSONSchemas::managementApiErrorSchema = LiteralStringRef(R"""(
"message": "The reason of the error"
}
)""");
const KeyRef JSONSchemas::clientLibMetadataSchema = LiteralStringRef(R"""(
{
"platform": "x86_64-linux",
"version": "7.1.0",
"githash": "e28fef6264d05ab0c9488238022d1ee885a30bea",
"type": "debug",
"checksum": "fcef53fb4ae86d2c4fff4dc17c7e5d08",
"checksumalg": "md5",
"apiversion": 710,
"protocol": "fdb00b07001001",
"filename": "libfdb_c.7.1.0.so",
"size" : 19467552,
"chunkcount" : 2377,
"status": "available"
})""");

View File

@ -35,6 +35,7 @@ struct JSONSchemas {
static const KeyRef storageHealthSchema;
static const KeyRef aggregateHealthSchema;
static const KeyRef managementApiErrorSchema;
static const KeyRef clientLibMetadataSchema;
};
#endif /* FDBCLIENT_SCHEMAS_H */

View File

@ -1025,6 +1025,14 @@ std::pair<Key, Version> decodeHealthyZoneValue(ValueRef const& value) {
return std::make_pair(zoneId, version);
}
const KeyRangeRef clientLibMetadataKeys(LiteralStringRef("\xff\x02/clientlib/meta/"),
LiteralStringRef("\xff\x02/clientlib/meta0"));
const KeyRef clientLibMetadataPrefix = clientLibMetadataKeys.begin;
const KeyRangeRef clientLibBinaryKeys(LiteralStringRef("\xff\x02/clientlib/bin/"),
LiteralStringRef("\xff\x02/clientlib/bin0"));
const KeyRef clientLibBinaryPrefix = clientLibBinaryKeys.begin;
const KeyRangeRef testOnlyTxnStateStorePrefixRange(LiteralStringRef("\xff/TESTONLYtxnStateStore/"),
LiteralStringRef("\xff/TESTONLYtxnStateStore0"));

View File

@ -480,6 +480,14 @@ extern const KeyRef rebalanceDDIgnoreKey;
const Value healthyZoneValue(StringRef const& zoneId, Version version);
std::pair<Key, Version> decodeHealthyZoneValue(ValueRef const&);
// Key ranges reserved for storing client library binaries and respective
// json documents with the metadata describing the libaries
extern const KeyRangeRef clientLibMetadataKeys;
extern const KeyRef clientLibMetadataPrefix;
extern const KeyRangeRef clientLibBinaryKeys;
extern const KeyRef clientLibBinaryPrefix;
// All mutations done to this range are blindly copied into txnStateStore.
// Used to create artifically large txnStateStore instances in testing.
extern const KeyRangeRef testOnlyTxnStateStorePrefixRange;

View File

@ -164,6 +164,7 @@ set(FDBSERVER_SRCS
workloads/BulkSetup.actor.h
workloads/Cache.actor.cpp
workloads/ChangeConfig.actor.cpp
workloads/ClientLibManagementWorkload.actor.cpp
workloads/ClientTransactionProfileCorrectness.actor.cpp
workloads/TriggerRecovery.actor.cpp
workloads/SuspendProcesses.actor.cpp

View File

@ -1703,3 +1703,70 @@ ACTOR Future<Void> runTests(Reference<IClusterConnectionRecord> connRecord,
}
}
}
namespace {
ACTOR Future<Void> testExpectedErrorImpl(Future<Void> test,
const char* testDescr,
Optional<Error> expectedError,
Optional<bool*> successFlag,
std::map<std::string, std::string> details,
Optional<Error> throwOnError,
UID id) {
state Error actualError;
try {
wait(test);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw e;
}
actualError = e;
// The test failed as expected
if (!expectedError.present() || actualError.code() == expectedError.get().code()) {
return Void();
}
}
// The test has failed
if (successFlag.present()) {
*(successFlag.get()) = false;
}
TraceEvent evt(SevError, "TestErrorFailed", id);
evt.detail("TestDescription", testDescr);
if (expectedError.present()) {
evt.detail("ExpectedError", expectedError.get().name());
evt.detail("ExpectedErrorCode", expectedError.get().code());
}
if (actualError.isValid()) {
evt.detail("ActualError", actualError.name());
evt.detail("ActualErrorCode", actualError.code());
} else {
evt.detail("Reason", "Unexpected success");
}
// Make sure that no duplicate details were provided
ASSERT(details.count("TestDescription") == 0);
ASSERT(details.count("ExpectedError") == 0);
ASSERT(details.count("ExpectedErrorCode") == 0);
ASSERT(details.count("ActualError") == 0);
ASSERT(details.count("ActualErrorCode") == 0);
ASSERT(details.count("Reason") == 0);
for (auto& p : details) {
evt.detail(p.first.c_str(), p.second);
}
if (throwOnError.present()) {
throw throwOnError.get();
}
return Void();
}
} // namespace
Future<Void> testExpectedError(Future<Void> test,
const char* testDescr,
Optional<Error> expectedError,
Optional<bool*> successFlag,
std::map<std::string, std::string> details,
Optional<Error> throwOnError,
UID id) {
return testExpectedErrorImpl(test, testDescr, expectedError, successFlag, details, throwOnError, id);
}

View File

@ -0,0 +1,374 @@
/*
* ClientLibManagementWorkload.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbrpc/IAsyncFile.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbclient/ClientLibManagement.actor.h"
#include "fdbserver/workloads/AsyncFile.actor.h"
#include "fdbclient/md5/md5.h"
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/actorcompiler.h" // This must be the last #include.
using namespace ClientLibManagement;
/**
* Workload for testing ClientLib management operations, declared in
* MultiVersionClientControl.actor.h
*/
struct ClientLibManagementWorkload : public TestWorkload {
static constexpr size_t FILE_CHUNK_SIZE = 128 * 1024; // Used for test setup only
size_t testFileSize = 0;
RandomByteGenerator rbg;
Standalone<StringRef> uploadedClientLibId;
json_spirit::mObject uploadedMetadataJson;
Standalone<StringRef> generatedChecksum;
std::string generatedFileName;
bool success = true;
/*----------------------------------------------------------------
* Interface
*/
ClientLibManagementWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
int minTestFileSize = getOption(options, LiteralStringRef("minTestFileSize"), 0);
int maxTestFileSize = getOption(options, LiteralStringRef("maxTestFileSize"), 1024 * 1024);
testFileSize = deterministicRandom()->randomInt(minTestFileSize, maxTestFileSize + 1);
}
std::string description() const override { return "ClientLibManagement"; }
Future<Void> setup(Database const& cx) override { return _setup(this); }
Future<Void> start(Database const& cx) override { return _start(this, cx); }
Future<bool> check(Database const& cx) override { return success; }
void getMetrics(std::vector<PerfMetric>& m) override {}
/*----------------------------------------------------------------
* Setup
*/
ACTOR Future<Void> _setup(ClientLibManagementWorkload* self) {
state Reference<AsyncFileBuffer> data = self->allocateBuffer(FILE_CHUNK_SIZE);
state size_t fileOffset;
state MD5_CTX sum;
state size_t bytesToWrite;
self->generatedFileName = format("clientLibUpload%d", self->clientId);
int64_t flags = IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE |
IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_NO_AIO;
state Reference<IAsyncFile> file =
wait(IAsyncFileSystem::filesystem()->open(self->generatedFileName, flags, 0666));
::MD5_Init(&sum);
for (fileOffset = 0; fileOffset < self->testFileSize; fileOffset += FILE_CHUNK_SIZE) {
self->rbg.writeRandomBytesToBuffer(data->buffer, FILE_CHUNK_SIZE);
bytesToWrite = std::min(FILE_CHUNK_SIZE, self->testFileSize - fileOffset);
wait(file->write(data->buffer, bytesToWrite, fileOffset));
::MD5_Update(&sum, data->buffer, bytesToWrite);
}
wait(file->sync());
self->generatedChecksum = md5SumToHexString(sum);
return Void();
}
/*----------------------------------------------------------------
* Tests
*/
ACTOR static Future<Void> _start(ClientLibManagementWorkload* self, Database cx) {
wait(testUploadClientLibInvalidInput(self, cx));
wait(testClientLibUploadFileDoesNotExist(self, cx));
wait(testUploadClientLib(self, cx));
wait(testClientLibListAfterUpload(self, cx));
wait(testDownloadClientLib(self, cx));
wait(testClientLibDownloadNotExisting(self, cx));
wait(testDeleteClientLib(self, cx));
wait(testUploadedClientLibInList(self, cx, ClientLibFilter(), false, "No filter, after delete"));
return Void();
}
ACTOR static Future<Void> testUploadClientLibInvalidInput(ClientLibManagementWorkload* self, Database cx) {
state std::vector<std::string> invalidMetadataStrs = {
"{foo", // invalid json
"[]", // json array
};
state StringRef metadataStr;
// add garbage attribute
json_spirit::mObject metadataJson;
validClientLibMetadataSample(metadataJson);
metadataJson["unknownattr"] = "someval";
invalidMetadataStrs.push_back(json_spirit::write_string(json_spirit::mValue(metadataJson)));
const std::string mandatoryAttrs[] = { CLIENTLIB_ATTR_PLATFORM, CLIENTLIB_ATTR_VERSION,
CLIENTLIB_ATTR_CHECKSUM, CLIENTLIB_ATTR_TYPE,
CLIENTLIB_ATTR_GIT_HASH, CLIENTLIB_ATTR_PROTOCOL,
CLIENTLIB_ATTR_API_VERSION, CLIENTLIB_ATTR_CHECKSUM_ALG };
for (const std::string& attr : mandatoryAttrs) {
validClientLibMetadataSample(metadataJson);
metadataJson.erase(attr);
invalidMetadataStrs.push_back(json_spirit::write_string(json_spirit::mValue(metadataJson)));
}
for (auto& testMetadataStr : invalidMetadataStrs) {
metadataStr = StringRef(testMetadataStr);
wait(testExpectedError(uploadClientLibrary(cx, metadataStr, StringRef(self->generatedFileName)),
"uploadClientLibrary with invalid metadata",
client_lib_invalid_metadata(),
&self->success,
{ { "Metadata", metadataStr.toString().c_str() } }));
}
return Void();
}
ACTOR static Future<Void> testClientLibUploadFileDoesNotExist(ClientLibManagementWorkload* self, Database cx) {
state Standalone<StringRef> metadataStr;
json_spirit::mObject metadataJson;
validClientLibMetadataSample(metadataJson);
metadataStr = StringRef(json_spirit::write_string(json_spirit::mValue(metadataJson)));
wait(testExpectedError(uploadClientLibrary(cx, metadataStr, "some_not_existing_file_name"_sr),
"uploadClientLibrary with a not existing file",
file_not_found(),
&self->success));
return Void();
}
ACTOR static Future<Void> testUploadClientLibWrongChecksum(ClientLibManagementWorkload* self, Database cx) {
state Standalone<StringRef> metadataStr;
validClientLibMetadataSample(self->uploadedMetadataJson);
metadataStr = StringRef(json_spirit::write_string(json_spirit::mValue(self->uploadedMetadataJson)));
self->uploadedClientLibId = getClientLibIdFromMetadataJson(metadataStr);
wait(testExpectedError(uploadClientLibrary(cx, metadataStr, StringRef(self->generatedFileName)),
"uploadClientLibrary wrong checksum",
client_lib_invalid_binary(),
&self->success));
wait(testUploadedClientLibInList(self, cx, ClientLibFilter(), false, "After upload with wrong checksum"));
return Void();
}
ACTOR static Future<Void> testUploadClientLib(ClientLibManagementWorkload* self, Database cx) {
state Standalone<StringRef> metadataStr;
state std::vector<Future<ErrorOr<Void>>> concurrentUploads;
validClientLibMetadataSample(self->uploadedMetadataJson);
self->uploadedMetadataJson[CLIENTLIB_ATTR_CHECKSUM] = self->generatedChecksum.toString();
// avoid clientLibId clashes, when multiple clients try to upload the same file
self->uploadedMetadataJson[CLIENTLIB_ATTR_TYPE] = format("devbuild%d", self->clientId);
metadataStr = StringRef(json_spirit::write_string(json_spirit::mValue(self->uploadedMetadataJson)));
self->uploadedClientLibId = getClientLibIdFromMetadataJson(metadataStr);
// Test two concurrent uploads of the same library, one of the must fail and another succeed
for (int i1 = 0; i1 < 2; i1++) {
Future<Void> uploadActor = uploadClientLibrary(cx, metadataStr, StringRef(self->generatedFileName));
concurrentUploads.push_back(errorOr(uploadActor));
}
wait(waitForAll(concurrentUploads));
int successCnt = 0;
for (auto uploadRes : concurrentUploads) {
if (uploadRes.get().isError()) {
self->testErrorCode(
"concurrent client lib upload", client_lib_already_exists(), uploadRes.get().getError());
} else {
successCnt++;
}
}
if (successCnt == 0) {
TraceEvent(SevError, "ClientLibUploadFailed").log();
self->success = false;
throw operation_failed();
} else if (successCnt > 1) {
TraceEvent(SevError, "ClientLibConflictingUpload").log();
self->success = false;
}
return Void();
}
ACTOR static Future<Void> testClientLibDownloadNotExisting(ClientLibManagementWorkload* self, Database cx) {
// Generate a random valid clientLibId
state Standalone<StringRef> clientLibId;
state std::string destFileName;
json_spirit::mObject metadataJson;
validClientLibMetadataSample(metadataJson);
Standalone<StringRef> metadataStr = StringRef(json_spirit::write_string(json_spirit::mValue(metadataJson)));
clientLibId = getClientLibIdFromMetadataJson(metadataStr);
destFileName = format("clientLibDownload%d", self->clientId);
wait(testExpectedError(downloadClientLibrary(cx, StringRef(clientLibId), StringRef(destFileName)),
"download not existing client library",
client_lib_not_found(),
&self->success));
return Void();
}
ACTOR static Future<Void> testDownloadClientLib(ClientLibManagementWorkload* self, Database cx) {
state std::string destFileName = format("clientLibDownload%d", self->clientId);
wait(downloadClientLibrary(cx, self->uploadedClientLibId, StringRef(destFileName)));
FILE* f = fopen(destFileName.c_str(), "r");
if (f == nullptr) {
TraceEvent(SevError, "ClientLibDownloadFileDoesNotExist").detail("FileName", destFileName);
self->success = false;
} else {
fseek(f, 0L, SEEK_END);
size_t fileSize = ftell(f);
if (fileSize != self->testFileSize) {
TraceEvent(SevError, "ClientLibDownloadFileSizeMismatch")
.detail("ExpectedSize", self->testFileSize)
.detail("ActualSize", fileSize);
self->success = false;
}
fclose(f);
}
return Void();
}
ACTOR static Future<Void> testDeleteClientLib(ClientLibManagementWorkload* self, Database cx) {
wait(deleteClientLibrary(cx, self->uploadedClientLibId));
return Void();
}
ACTOR static Future<Void> testClientLibListAfterUpload(ClientLibManagementWorkload* self, Database cx) {
state int uploadedApiVersion = self->uploadedMetadataJson[CLIENTLIB_ATTR_API_VERSION].get_int();
state ClientLibPlatform uploadedPlatform =
getPlatformByName(self->uploadedMetadataJson[CLIENTLIB_ATTR_PLATFORM].get_str());
state std::string uploadedVersion = self->uploadedMetadataJson[CLIENTLIB_ATTR_VERSION].get_str();
state ClientLibFilter filter;
filter = ClientLibFilter();
wait(testUploadedClientLibInList(self, cx, filter, true, "No filter"));
filter = ClientLibFilter().filterAvailable();
wait(testUploadedClientLibInList(self, cx, filter, true, "Filter available"));
filter = ClientLibFilter().filterAvailable().filterCompatibleAPI(uploadedApiVersion);
wait(testUploadedClientLibInList(self, cx, filter, true, "Filter available, the same API"));
filter = ClientLibFilter().filterAvailable().filterCompatibleAPI(uploadedApiVersion + 1);
wait(testUploadedClientLibInList(self, cx, filter, false, "Filter available, newer API"));
filter = ClientLibFilter().filterCompatibleAPI(uploadedApiVersion).filterPlatform(uploadedPlatform);
wait(testUploadedClientLibInList(self, cx, filter, true, "Filter the same API, the same platform"));
ASSERT(uploadedPlatform != ClientLibPlatform::X86_64_WINDOWS);
filter = ClientLibFilter().filterAvailable().filterPlatform(ClientLibPlatform::X86_64_WINDOWS);
wait(testUploadedClientLibInList(self, cx, filter, false, "Filter available, different platform"));
filter = ClientLibFilter().filterAvailable().filterNewerPackageVersion(uploadedVersion);
wait(testUploadedClientLibInList(self, cx, filter, false, "Filter available, the same version"));
filter =
ClientLibFilter().filterAvailable().filterNewerPackageVersion("1.15.10").filterPlatform(uploadedPlatform);
wait(testUploadedClientLibInList(
self, cx, filter, true, "Filter available, an older version, the same platform"));
filter = ClientLibFilter()
.filterAvailable()
.filterNewerPackageVersion(uploadedVersion)
.filterPlatform(uploadedPlatform);
wait(testUploadedClientLibInList(
self, cx, filter, false, "Filter available, the same version, the same platform"));
filter = ClientLibFilter().filterNewerPackageVersion("100.1.1");
wait(testUploadedClientLibInList(self, cx, filter, false, "Filter a newer version"));
filter = ClientLibFilter().filterNewerPackageVersion("1.15.10");
wait(testUploadedClientLibInList(self, cx, filter, true, "Filter an older version"));
return Void();
}
ACTOR static Future<Void> testUploadedClientLibInList(ClientLibManagementWorkload* self,
Database cx,
ClientLibFilter filter,
bool expectInList,
const char* testDescr) {
Standalone<VectorRef<StringRef>> allLibs = wait(listClientLibraries(cx, filter));
bool found = false;
for (StringRef metadataJson : allLibs) {
Standalone<StringRef> clientLibId;
clientLibId = getClientLibIdFromMetadataJson(metadataJson);
if (clientLibId == self->uploadedClientLibId) {
found = true;
}
}
if (found != expectInList) {
TraceEvent(SevError, "ClientLibInListTestFailed")
.detail("Test", testDescr)
.detail("ClientLibId", self->uploadedClientLibId)
.detail("Expected", expectInList)
.detail("Actual", found);
self->success = false;
}
return Void();
}
/* ----------------------------------------------------------------
* Utility methods
*/
Reference<AsyncFileBuffer> allocateBuffer(size_t size) { return makeReference<AsyncFileBuffer>(size, false); }
static std::string randomHexadecimalStr(int length) {
std::string s;
s.reserve(length);
for (int i = 0; i < length; i++) {
uint32_t hexDigit = static_cast<char>(deterministicRandom()->randomUInt32() % 16);
char ch = (hexDigit >= 10 ? hexDigit - 10 + 'a' : hexDigit + '0');
s += ch;
}
return s;
}
static void validClientLibMetadataSample(json_spirit::mObject& metadataJson) {
metadataJson.clear();
metadataJson[CLIENTLIB_ATTR_PLATFORM] = getPlatformName(ClientLibPlatform::X86_64_LINUX);
metadataJson[CLIENTLIB_ATTR_VERSION] = "7.1.0";
metadataJson[CLIENTLIB_ATTR_GIT_HASH] = randomHexadecimalStr(40);
metadataJson[CLIENTLIB_ATTR_TYPE] = "debug";
metadataJson[CLIENTLIB_ATTR_CHECKSUM] = randomHexadecimalStr(32);
metadataJson[CLIENTLIB_ATTR_STATUS] = getStatusName(ClientLibStatus::AVAILABLE);
metadataJson[CLIENTLIB_ATTR_API_VERSION] = 710;
metadataJson[CLIENTLIB_ATTR_PROTOCOL] = "fdb00b07001001";
metadataJson[CLIENTLIB_ATTR_CHECKSUM_ALG] = "md5";
}
void testErrorCode(const char* testDescr,
Error expectedError,
Error actualError,
std::map<std::string, std::string> details = {},
UID id = UID()) {
ASSERT(expectedError.isValid());
ASSERT(actualError.isValid());
if (expectedError.code() != actualError.code()) {
TraceEvent evt(SevError, "TestErrorCodeFailed", id);
evt.detail("TestDescription", testDescr);
evt.detail("ExpectedError", expectedError.code());
evt.error(actualError);
for (auto& p : details) {
evt.detail(p.first.c_str(), p.second);
}
success = false;
}
}
};
WorkloadFactory<ClientLibManagementWorkload> ClientLibOperationsWorkloadFactory("ClientLibManagement");

View File

@ -230,6 +230,24 @@ Future<Void> quietDatabase(Database const& cx,
int64_t maxDataDistributionQueueSize = 0,
int64_t maxPoppedVersionLag = 30e6);
/**
* A utility function for testing error situations. It succeeds if the given test
* throws an error. If expectedError is provided, it additionally checks if the
* error code is as expected.
*
* In case of a failure, logs an corresponding error in the trace with the given
* description and details, sets the given success flag to false (optional)
* and throws the given exception (optional). Note that in case of a successful
* test execution, the success flag is kept unchanged.
*/
Future<Void> testExpectedError(Future<Void> test,
const char* testDescr,
Optional<Error> expectedError = Optional<Error>(),
Optional<bool*> successFlag = Optional<bool*>(),
std::map<std::string, std::string> details = {},
Optional<Error> throwOnError = Optional<Error>(),
UID id = UID());
#include "flow/unactorcompiler.h"
#endif

View File

@ -687,6 +687,10 @@ inline StringRef operator"" _sr(const char* str, size_t size) {
return StringRef(reinterpret_cast<const uint8_t*>(str), size);
}
inline static uintptr_t getAlignedUpperBound(uintptr_t value, uintptr_t alignment) {
return ((value + alignment - 1) / alignment) * alignment;
}
// makeString is used to allocate a Standalone<StringRef> of a known length for later
// mutation (via mutateString). If you need to append to a string of unknown length,
// consider factoring StringBuffer from DiskQueue.actor.cpp.
@ -700,7 +704,7 @@ inline static Standalone<StringRef> makeString(int length) {
inline static Standalone<StringRef> makeAlignedString(int alignment, int length) {
Standalone<StringRef> returnString;
uint8_t* outData = new (returnString.arena()) uint8_t[alignment + length];
outData = (uint8_t*)((((uintptr_t)outData + (alignment - 1)) / alignment) * alignment);
outData = (uint8_t*)getAlignedUpperBound((uintptr_t)outData, alignment);
((StringRef&)returnString) = StringRef(outData, length);
return returnString;
}
@ -710,6 +714,12 @@ inline static StringRef makeString(int length, Arena& arena) {
return StringRef(outData, length);
}
inline static StringRef makeAlignedString(int alignment, int length, Arena& arena) {
uint8_t* outData = new (arena) uint8_t[alignment + length];
outData = (uint8_t*)getAlignedUpperBound((uintptr_t)outData, alignment);
return StringRef(outData, length);
}
// mutateString() simply casts away const and returns a pointer that can be used to mutate the
// contents of the given StringRef (it will also accept Standalone<StringRef>). Obviously this
// is only legitimate if you know where the StringRef's memory came from and that it is not shared!

View File

@ -178,6 +178,11 @@ ERROR( special_keys_write_disabled, 2114, "Special Key space is not allowed to w
ERROR( special_keys_no_write_module_found, 2115, "Special key space key or keyrange in set or clear does not intersect a module" )
ERROR( special_keys_cross_module_clear, 2116, "Special key space clear crosses modules" )
ERROR( special_keys_api_failure, 2117, "Api call through special keys failed. For more information, call get on special key 0xff0xff/error_message to get a json string of the error message." )
ERROR( client_lib_invalid_metadata, 2118, "Invalid client library metadata." )
ERROR( client_lib_already_exists, 2119, "Client library with same identifier already exists on the cluster." )
ERROR( client_lib_not_found, 2120, "Client library for the given identifier not found." )
ERROR( client_lib_not_available, 2121, "Client library exists, but is not available for download." )
ERROR( client_lib_invalid_binary, 2122, "Invalid client library binary." )
// 2200 - errors from bindings and official APIs
ERROR( api_version_unset, 2200, "API version is not set" )

View File

@ -0,0 +1,7 @@
[[test]]
testTitle = 'ClientLibManagement'
[[test.workload]]
testName = 'ClientLibManagement'
minTestFileSize = 0
maxTestFileSize = 1048576