Merge remote-tracking branch 'origin/main' into dd-refactor
commit 32bbabefd7
@@ -86,8 +86,6 @@ set(FDBCLIENT_SRCS
MonitorLeader.actor.cpp
MonitorLeader.h
MultiVersionAssignmentVars.h
ClientLibManagement.actor.cpp
ClientLibManagement.actor.h
MultiVersionTransaction.actor.cpp
MultiVersionTransaction.h
MutationList.h

@@ -263,10 +263,6 @@ void ClientKnobs::initialize(Randomize randomize) {
init( BUSYNESS_SPIKE_START_THRESHOLD, 0.100 );
init( BUSYNESS_SPIKE_SATURATED_THRESHOLD, 0.500 );

// multi-version client control
init( MVC_CLIENTLIB_CHUNK_SIZE, 8*1024 );
init( MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION, 32 );

// blob granules
init( ENABLE_BLOB_GRANULES, false );

@@ -254,10 +254,6 @@ public:
double BUSYNESS_SPIKE_START_THRESHOLD;
double BUSYNESS_SPIKE_SATURATED_THRESHOLD;

// multi-version client control
int MVC_CLIENTLIB_CHUNK_SIZE;
int MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION;

// blob granules
bool ENABLE_BLOB_GRANULES;

@@ -1,801 +0,0 @@
/*
|
||||
* ClientLibManagement.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/ClientLibManagement.actor.h"
|
||||
#include "fdbclient/Schemas.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbclient/ManagementAPI.actor.h"
|
||||
#include "fdbclient/ClientKnobs.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/versions.h"
|
||||
#include "fdbrpc/IAsyncFile.h"
|
||||
#include "flow/Platform.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <string>
|
||||
#include <stdio.h>
|
||||
|
||||
#include "flow/Trace.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
namespace ClientLibManagement {
|
||||
|
||||
struct ClientLibBinaryInfo {
|
||||
size_t totalBytes = 0;
|
||||
size_t chunkCnt = 0;
|
||||
size_t chunkSize = 0;
|
||||
Standalone<StringRef> sumBytes;
|
||||
};
|
||||
|
||||
#define ASSERT_INDEX_IN_RANGE(idx, arr) ASSERT(idx >= 0 && idx < sizeof(arr) / sizeof(arr[0]))
|
||||
|
||||
const std::string& getStatusName(ClientLibStatus status) {
|
||||
static const std::string statusNames[] = { "disabled", "uploading", "download", "active" };
|
||||
int idx = static_cast<int>(status);
|
||||
ASSERT_INDEX_IN_RANGE(idx, statusNames);
|
||||
return statusNames[idx];
|
||||
}
|
||||
|
||||
ClientLibStatus getStatusByName(std::string_view statusName) {
|
||||
static std::map<std::string_view, ClientLibStatus> statusByName;
|
||||
// initialize the map on demand
|
||||
if (statusByName.empty()) {
|
||||
for (int i = 0; i < static_cast<int>(ClientLibStatus::COUNT); i++) {
|
||||
ClientLibStatus status = static_cast<ClientLibStatus>(i);
|
||||
statusByName[getStatusName(status)] = status;
|
||||
}
|
||||
}
|
||||
auto statusIter = statusByName.find(statusName);
|
||||
if (statusIter == statusByName.cend()) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata")
|
||||
.detail("Error", format("Unknown status value %s", std::string(statusName).c_str()));
|
||||
throw client_lib_invalid_metadata();
|
||||
}
|
||||
return statusIter->second;
|
||||
}
|
||||
|
||||
const std::string& getPlatformName(ClientLibPlatform platform) {
|
||||
static const std::string platformNames[] = { "unknown", "x86_64-linux", "x86_64-windows", "x86_64-macos" };
|
||||
int idx = static_cast<int>(platform);
|
||||
ASSERT_INDEX_IN_RANGE(idx, platformNames);
|
||||
return platformNames[idx];
|
||||
}
|
||||
|
||||
ClientLibPlatform getPlatformByName(std::string_view platformName) {
|
||||
static std::map<std::string_view, ClientLibPlatform> platformByName;
|
||||
// initialize the map on demand
|
||||
if (platformByName.empty()) {
|
||||
for (int i = 0; i < static_cast<int>(ClientLibPlatform::COUNT); i++) {
|
||||
ClientLibPlatform platform = static_cast<ClientLibPlatform>(i);
|
||||
platformByName[getPlatformName(platform)] = platform;
|
||||
}
|
||||
}
|
||||
auto platfIter = platformByName.find(platformName);
|
||||
if (platfIter == platformByName.cend()) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata")
|
||||
.detail("Error", format("Unknown platform value %s", std::string(platformName).c_str()));
|
||||
throw client_lib_invalid_metadata();
|
||||
}
|
||||
return platfIter->second;
|
||||
}
|
||||
|
||||
const std::string& getChecksumAlgName(ClientLibChecksumAlg checksumAlg) {
|
||||
static const std::string checksumAlgNames[] = { "md5" };
|
||||
int idx = static_cast<int>(checksumAlg);
|
||||
ASSERT_INDEX_IN_RANGE(idx, checksumAlgNames);
|
||||
return checksumAlgNames[idx];
|
||||
}
|
||||
|
||||
ClientLibChecksumAlg getChecksumAlgByName(std::string_view checksumAlgName) {
|
||||
static std::map<std::string_view, ClientLibChecksumAlg> checksumAlgByName;
|
||||
// initialize the map on demand
|
||||
if (checksumAlgByName.empty()) {
|
||||
for (int i = 0; i < (int)ClientLibChecksumAlg::COUNT; i++) {
|
||||
ClientLibChecksumAlg checksumAlg = static_cast<ClientLibChecksumAlg>(i);
|
||||
checksumAlgByName[getChecksumAlgName(checksumAlg)] = checksumAlg;
|
||||
}
|
||||
}
|
||||
auto iter = checksumAlgByName.find(checksumAlgName);
|
||||
if (iter == checksumAlgByName.cend()) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata")
|
||||
.detail("Error", format("Unknown checksum algorithm %s", std::string(checksumAlgName).c_str()));
|
||||
throw client_lib_invalid_metadata();
|
||||
}
|
||||
return iter->second;
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
bool isValidTargetStatus(ClientLibStatus status) {
|
||||
return status == ClientLibStatus::DISABLED || status == ClientLibStatus::DOWNLOAD ||
|
||||
status == ClientLibStatus::ACTIVE;
|
||||
}
|
||||
|
||||
bool isAvailableForDownload(ClientLibStatus status) {
|
||||
return status == ClientLibStatus::DOWNLOAD || status == ClientLibStatus::ACTIVE;
|
||||
}
|
||||
|
||||
void updateClientLibChangeCounter(Transaction& tr, ClientLibStatus prevStatus, ClientLibStatus newStatus) {
|
||||
static const int64_t counterIncVal = 1;
|
||||
if ((prevStatus != newStatus) &&
|
||||
(newStatus == ClientLibStatus::DOWNLOAD || newStatus == ClientLibStatus::ACTIVE ||
|
||||
prevStatus == ClientLibStatus::DOWNLOAD || prevStatus == ClientLibStatus::ACTIVE)) {
|
||||
tr.atomicOp(clientLibChangeCounterKey,
|
||||
StringRef(reinterpret_cast<const uint8_t*>(&counterIncVal), sizeof(counterIncVal)),
|
||||
MutationRef::AddValue);
|
||||
}
|
||||
}
|
||||
|
||||
json_spirit::mObject parseMetadataJson(StringRef metadataString) {
|
||||
json_spirit::mValue parsedMetadata;
|
||||
if (!json_spirit::read_string(metadataString.toString(), parsedMetadata) ||
|
||||
parsedMetadata.type() != json_spirit::obj_type) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata")
|
||||
.detail("Reason", "InvalidJSON")
|
||||
.detail("Configuration", metadataString);
|
||||
throw client_lib_invalid_metadata();
|
||||
}
|
||||
|
||||
return parsedMetadata.get_obj();
|
||||
}
|
||||
|
||||
const std::string& getMetadataStrAttr(const json_spirit::mObject& metadataJson, const std::string& attrName) {
|
||||
auto attrIter = metadataJson.find(attrName);
|
||||
if (attrIter == metadataJson.cend() || attrIter->second.type() != json_spirit::str_type) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata")
|
||||
.detail("Error", format("Missing attribute %s", attrName.c_str()));
|
||||
throw client_lib_invalid_metadata();
|
||||
}
|
||||
return attrIter->second.get_str();
|
||||
}
|
||||
|
||||
int getMetadataIntAttr(const json_spirit::mObject& metadataJson, const std::string& attrName) {
|
||||
auto attrIter = metadataJson.find(attrName);
|
||||
if (attrIter == metadataJson.cend() || attrIter->second.type() != json_spirit::int_type) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata")
|
||||
.detail("Error", format("Missing attribute %s", attrName.c_str()));
|
||||
throw client_lib_invalid_metadata();
|
||||
}
|
||||
return attrIter->second.get_int();
|
||||
}
|
||||
|
||||
bool validVersionPartNum(int num) {
|
||||
return (num >= 0 && num < 1000);
|
||||
}
|
||||
|
||||
int getNumericVersionEncoding(const std::string& versionStr) {
|
||||
int major, minor, patch;
|
||||
int charsScanned;
|
||||
int numScanned = sscanf(versionStr.c_str(), "%d.%d.%d%n", &major, &minor, &patch, &charsScanned);
|
||||
if (numScanned != 3 || !validVersionPartNum(major) || !validVersionPartNum(minor) || !validVersionPartNum(patch) ||
|
||||
charsScanned != versionStr.size()) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata")
|
||||
.detail("Error", format("Invalid version string %s", versionStr.c_str()));
|
||||
throw client_lib_invalid_metadata();
|
||||
}
|
||||
return ((major * 1000) + minor) * 1000 + patch;
|
||||
}
|
||||
|
||||
Standalone<StringRef> getIdFromMetadataJson(const json_spirit::mObject& metadataJson) {
|
||||
std::ostringstream libIdBuilder;
|
||||
libIdBuilder << getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_PLATFORM) << "/";
|
||||
libIdBuilder << format("%09d", getNumericVersionEncoding(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_VERSION)))
|
||||
<< "/";
|
||||
libIdBuilder << getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_TYPE) << "/";
|
||||
libIdBuilder << getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_CHECKSUM);
|
||||
return Standalone<StringRef>(libIdBuilder.str());
|
||||
}
|
||||
|
||||
Key metadataKeyFromId(StringRef clientLibId) {
|
||||
return clientLibId.withPrefix(clientLibMetadataPrefix);
|
||||
}
|
||||
|
||||
Key chunkKeyPrefixFromId(StringRef clientLibId) {
|
||||
return clientLibId.withPrefix(clientLibBinaryPrefix).withSuffix(LiteralStringRef("/"));
|
||||
}
|
||||
|
||||
KeyRef chunkKeyFromNo(StringRef clientLibBinPrefix, size_t chunkNo, Arena& arena) {
|
||||
return clientLibBinPrefix.withSuffix(format("%06zu", chunkNo), arena);
|
||||
}
|
||||
|
||||
[[maybe_unused]] ClientLibPlatform getCurrentClientPlatform() {
|
||||
#ifdef __x86_64__
|
||||
#if defined(_WIN32)
|
||||
return ClientLibPlatform::X86_64_WINDOWS;
|
||||
#elif defined(__linux__)
|
||||
return ClientLibPlatform::X86_64_LINUX;
|
||||
#elif defined(__FreeBSD__) || defined(__APPLE__)
|
||||
return ClientLibPlatform::X86_64_MACOS;
|
||||
#else
|
||||
return ClientLibPlatform::UNKNOWN;
|
||||
#endif
|
||||
#else // not __x86_64__
|
||||
return ClientLibPlatform::UNKNOWN;
|
||||
#endif
|
||||
}
|
||||
|
||||
Standalone<StringRef> byteArrayToHexString(StringRef input) {
|
||||
static const char* digits = "0123456789abcdef";
|
||||
Standalone<StringRef> output = makeString(input.size() * 2);
|
||||
char* pout = reinterpret_cast<char*>(mutateString(output));
|
||||
for (const uint8_t* pin = input.begin(); pin != input.end(); ++pin) {
|
||||
*pout++ = digits[(*pin >> 4) & 0xF];
|
||||
*pout++ = digits[(*pin) & 0xF];
|
||||
}
|
||||
return output;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
Standalone<StringRef> md5SumToHexString(MD5_CTX& sum) {
|
||||
Standalone<StringRef> sumBytes = makeString(16);
|
||||
::MD5_Final(mutateString(sumBytes), &sum);
|
||||
return byteArrayToHexString(sumBytes);
|
||||
}
|
||||
|
||||
ClientLibFilter& ClientLibFilter::filterNewerPackageVersion(const std::string& versionStr) {
|
||||
matchNewerPackageVersion = true;
|
||||
this->numericPkgVersion = getNumericVersionEncoding(versionStr);
|
||||
return *this;
|
||||
}
|
||||
|
||||
Standalone<StringRef> getClientLibIdFromMetadataJson(StringRef metadataString) {
|
||||
json_spirit::mObject parsedMetadata = parseMetadataJson(metadataString);
|
||||
return getIdFromMetadataJson(parsedMetadata);
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
ACTOR Future<Void> uploadClientLibBinary(Database db,
|
||||
StringRef libFilePath,
|
||||
KeyRef chunkKeyPrefix,
|
||||
ClientLibBinaryInfo* binInfo) {
|
||||
|
||||
state int chunkSize = getAlignedUpperBound(CLIENT_KNOBS->MVC_CLIENTLIB_CHUNK_SIZE, 1024);
|
||||
state int transactionSize = std::max(CLIENT_KNOBS->MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION, 1) * chunkSize;
|
||||
state size_t fileOffset = 0;
|
||||
state size_t chunkNo = 0;
|
||||
state MD5_CTX sum;
|
||||
state Arena arena;
|
||||
state StringRef buf;
|
||||
state Transaction tr;
|
||||
state size_t firstChunkNo;
|
||||
|
||||
// Disabling AIO, because it currently supports only page-aligned writes, but the size of a client library
// is not necessarily page-aligned; need to investigate if it is a limitation of AIO or just the way
// we are wrapping it
|
||||
state Reference<IAsyncFile> fClientLib = wait(IAsyncFileSystem::filesystem()->open(
|
||||
libFilePath.toString(), IAsyncFile::OPEN_READONLY | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_NO_AIO, 0));
|
||||
|
||||
::MD5_Init(&sum);
|
||||
|
||||
loop {
|
||||
arena = Arena();
|
||||
// Use page-aligned buffers for enabling possible future use with AIO
|
||||
buf = makeAlignedString(_PAGE_SIZE, transactionSize, arena);
|
||||
state int bytesRead = wait(fClientLib->read(mutateString(buf), transactionSize, fileOffset));
|
||||
fileOffset += bytesRead;
|
||||
if (bytesRead <= 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
::MD5_Update(&sum, buf.begin(), bytesRead);
|
||||
|
||||
tr = Transaction(db);
|
||||
firstChunkNo = chunkNo;
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
int bufferOffset = 0;
|
||||
chunkNo = firstChunkNo;
|
||||
while (bufferOffset < bytesRead) {
|
||||
size_t chunkLen = std::min(chunkSize, bytesRead - bufferOffset);
|
||||
KeyRef chunkKey = chunkKeyFromNo(chunkKeyPrefix, chunkNo, arena);
|
||||
chunkNo++;
|
||||
tr.set(chunkKey, ValueRef(mutateString(buf) + bufferOffset, chunkLen));
|
||||
bufferOffset += chunkLen;
|
||||
}
|
||||
wait(tr.commit());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
if (bytesRead < transactionSize) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
binInfo->totalBytes = fileOffset;
|
||||
binInfo->chunkCnt = chunkNo;
|
||||
binInfo->chunkSize = chunkSize;
|
||||
binInfo->sumBytes = md5SumToHexString(sum);
|
||||
return Void();
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
ACTOR Future<Void> uploadClientLibrary(Database db,
|
||||
Standalone<StringRef> metadataString,
|
||||
Standalone<StringRef> libFilePath) {
|
||||
state json_spirit::mObject metadataJson;
|
||||
state Standalone<StringRef> clientLibId;
|
||||
state Key clientLibMetaKey;
|
||||
state Key clientLibBinPrefix;
|
||||
state std::string jsStr;
|
||||
state Transaction tr;
|
||||
state ClientLibBinaryInfo binInfo;
|
||||
state ClientLibStatus targetStatus;
|
||||
|
||||
metadataJson = parseMetadataJson(metadataString);
|
||||
|
||||
json_spirit::mValue schema;
|
||||
if (!json_spirit::read_string(JSONSchemas::clientLibMetadataSchema.toString(), schema)) {
|
||||
ASSERT(false);
|
||||
}
|
||||
|
||||
std::string errorStr;
|
||||
if (!schemaMatch(schema.get_obj(), metadataJson, errorStr, SevWarnAlways)) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata")
|
||||
.detail("Reason", "SchemaMismatch")
|
||||
.detail("Configuration", metadataString)
|
||||
.detail("Error", errorStr);
|
||||
throw client_lib_invalid_metadata();
|
||||
}
|
||||
|
||||
clientLibId = getIdFromMetadataJson(metadataJson);
|
||||
clientLibMetaKey = metadataKeyFromId(clientLibId);
|
||||
clientLibBinPrefix = chunkKeyPrefixFromId(clientLibId);
|
||||
|
||||
targetStatus = getStatusByName(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_STATUS));
|
||||
if (!isValidTargetStatus(targetStatus)) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata")
|
||||
.detail("Reason", "InvalidTargetStatus")
|
||||
.detail("Configuration", metadataString);
|
||||
throw client_lib_invalid_metadata();
|
||||
}
|
||||
|
||||
// check if checksumalg and platform attributes have valid values
|
||||
getChecksumAlgByName(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_CHECKSUM_ALG));
|
||||
getPlatformByName(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_PLATFORM));
|
||||
|
||||
// Check if further mandatory attributes are set
|
||||
getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_GIT_HASH);
|
||||
getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_PROTOCOL);
|
||||
getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_API_VERSION);
|
||||
|
||||
metadataJson[CLIENTLIB_ATTR_STATUS] = getStatusName(ClientLibStatus::UPLOADING);
|
||||
jsStr = json_spirit::write_string(json_spirit::mValue(metadataJson));
|
||||
|
||||
/*
|
||||
* Check if the client library with the same identifier already exists.
|
||||
* If not, write its metadata with "uploading" state to prevent concurrent uploads
|
||||
*/
|
||||
tr = Transaction(db);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Optional<Value> existingMeta = wait(tr.get(clientLibMetaKey));
|
||||
if (existingMeta.present()) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryAlreadyExists")
|
||||
.detail("Key", clientLibMetaKey)
|
||||
.detail("ExistingMetadata", existingMeta.get().toString());
|
||||
throw client_lib_already_exists();
|
||||
}
|
||||
|
||||
TraceEvent("ClientLibraryBeginUpload").detail("Key", clientLibMetaKey);
|
||||
|
||||
tr.set(clientLibMetaKey, ValueRef(jsStr));
|
||||
wait(tr.commit());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Upload the binary of the client library in chunks
|
||||
*/
|
||||
wait(uploadClientLibBinary(db, libFilePath, clientLibBinPrefix, &binInfo));
|
||||
|
||||
std::string checkSum = getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_CHECKSUM);
|
||||
if (binInfo.sumBytes != StringRef(checkSum)) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryChecksumMismatch")
|
||||
.detail("Expected", checkSum)
|
||||
.detail("Actual", binInfo.sumBytes)
|
||||
.detail("Configuration", metadataString);
|
||||
// Rollback the upload operation
|
||||
try {
|
||||
wait(deleteClientLibrary(db, clientLibId));
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevError, "ClientLibraryUploadRollbackFailed").error(e);
|
||||
}
|
||||
throw client_lib_invalid_binary();
|
||||
}
|
||||
|
||||
/*
|
||||
* Update the metadata entry, with additional information about the binary
|
||||
* and change its state from "uploading" to the given one
|
||||
*/
|
||||
metadataJson[CLIENTLIB_ATTR_SIZE] = static_cast<int64_t>(binInfo.totalBytes);
|
||||
metadataJson[CLIENTLIB_ATTR_CHUNK_COUNT] = static_cast<int64_t>(binInfo.chunkCnt);
|
||||
metadataJson[CLIENTLIB_ATTR_CHUNK_SIZE] = static_cast<int64_t>(binInfo.chunkSize);
|
||||
metadataJson[CLIENTLIB_ATTR_FILENAME] = basename(libFilePath.toString());
|
||||
metadataJson[CLIENTLIB_ATTR_STATUS] = getStatusName(targetStatus);
|
||||
jsStr = json_spirit::write_string(json_spirit::mValue(metadataJson));
|
||||
|
||||
tr.reset();
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr.set(clientLibMetaKey, ValueRef(jsStr));
|
||||
updateClientLibChangeCounter(tr, ClientLibStatus::DISABLED, targetStatus);
|
||||
wait(tr.commit());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent("ClientLibraryUploadDone").detail("Key", clientLibMetaKey);
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> downloadClientLibrary(Database db,
|
||||
Standalone<StringRef> clientLibId,
|
||||
Standalone<StringRef> libFilePath) {
|
||||
state Key clientLibMetaKey = metadataKeyFromId(clientLibId);
|
||||
state Key chunkKeyPrefix = chunkKeyPrefixFromId(clientLibId);
|
||||
state int chunksPerTransaction = std::max(CLIENT_KNOBS->MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION, 1);
|
||||
state int transactionSize;
|
||||
state json_spirit::mObject metadataJson;
|
||||
state std::string checkSum;
|
||||
state size_t chunkCount;
|
||||
state size_t binarySize;
|
||||
state size_t expectedChunkSize;
|
||||
state Transaction tr;
|
||||
state size_t fileOffset;
|
||||
state MD5_CTX sum;
|
||||
state Arena arena;
|
||||
state StringRef buf;
|
||||
state size_t bufferOffset;
|
||||
state size_t fromChunkNo;
|
||||
state size_t toChunkNo;
|
||||
state std::vector<Future<Optional<Value>>> chunkFutures;
|
||||
|
||||
TraceEvent("ClientLibraryBeginDownload").detail("Key", clientLibMetaKey);
|
||||
|
||||
/*
|
||||
* First read the metadata to get information about the status and
|
||||
* the chunk count of the client library
|
||||
*/
|
||||
loop {
|
||||
tr = Transaction(db);
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
|
||||
Optional<Value> metadataOpt = wait(tr.get(clientLibMetaKey));
|
||||
if (!metadataOpt.present()) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryNotFound").detail("Key", clientLibMetaKey);
|
||||
throw client_lib_not_found();
|
||||
}
|
||||
metadataJson = parseMetadataJson(metadataOpt.get());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
// Prevent downloading libraries that are not yet fully uploaded or are disabled
|
||||
if (!isAvailableForDownload(getStatusByName(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_STATUS)))) {
|
||||
throw client_lib_not_available();
|
||||
}
|
||||
|
||||
// Disabling AIO, because it currently supports only page-aligned writes, but the size of a client library
// is not necessarily page-aligned; need to investigate if it is a limitation of AIO or just the way
// we are wrapping it
|
||||
int64_t flags = IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_CREATE |
|
||||
IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_NO_AIO;
|
||||
state Reference<IAsyncFile> fClientLib =
|
||||
wait(IAsyncFileSystem::filesystem()->open(libFilePath.toString(), flags, 0666));
|
||||
|
||||
checkSum = getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_CHECKSUM);
|
||||
chunkCount = getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_CHUNK_COUNT);
|
||||
binarySize = getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_SIZE);
|
||||
expectedChunkSize = getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_CHUNK_SIZE);
|
||||
transactionSize = chunksPerTransaction * expectedChunkSize;
|
||||
fileOffset = 0;
|
||||
fromChunkNo = 0;
|
||||
|
||||
::MD5_Init(&sum);
|
||||
|
||||
arena = Arena();
|
||||
// Use page-aligned buffers for enabling possible future use with AIO
|
||||
buf = makeAlignedString(_PAGE_SIZE, transactionSize, arena);
|
||||
|
||||
loop {
|
||||
if (fromChunkNo == chunkCount) {
|
||||
break;
|
||||
}
|
||||
|
||||
tr = Transaction(db);
|
||||
toChunkNo = std::min(chunkCount, fromChunkNo + chunksPerTransaction);
|
||||
|
||||
// read a batch of file chunks concurrently
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
|
||||
|
||||
chunkFutures.clear();
|
||||
for (size_t chunkNo = fromChunkNo; chunkNo < toChunkNo; chunkNo++) {
|
||||
KeyRef chunkKey = chunkKeyFromNo(chunkKeyPrefix, chunkNo, arena);
|
||||
chunkFutures.push_back(tr.get(chunkKey));
|
||||
}
|
||||
|
||||
wait(waitForAll(chunkFutures));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
// check the read chunks and copy them to a buffer
|
||||
bufferOffset = 0;
|
||||
size_t chunkNo = fromChunkNo;
|
||||
for (auto chunkOptFuture : chunkFutures) {
|
||||
if (!chunkOptFuture.get().present()) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryChunkNotFound")
|
||||
.detail("Key", chunkKeyFromNo(chunkKeyPrefix, chunkNo, arena));
|
||||
throw client_lib_invalid_binary();
|
||||
}
|
||||
StringRef chunkVal = chunkOptFuture.get().get();
|
||||
|
||||
// All chunks except for the last one must be of the expected size to guarantee
// alignment when writing to file
|
||||
if ((chunkNo != (chunkCount - 1) && chunkVal.size() != expectedChunkSize) ||
|
||||
chunkVal.size() > expectedChunkSize) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryInvalidChunkSize")
|
||||
.detail("Key", chunkKeyFromNo(chunkKeyPrefix, chunkNo, arena))
|
||||
.detail("MaxSize", expectedChunkSize)
|
||||
.detail("ActualSize", chunkVal.size());
|
||||
throw client_lib_invalid_binary();
|
||||
}
|
||||
|
||||
memcpy(mutateString(buf) + bufferOffset, chunkVal.begin(), chunkVal.size());
|
||||
bufferOffset += chunkVal.size();
|
||||
chunkNo++;
|
||||
}
|
||||
|
||||
// write the chunks to the file, update checksum
|
||||
if (bufferOffset > 0) {
|
||||
wait(fClientLib->write(buf.begin(), bufferOffset, fileOffset));
|
||||
fileOffset += bufferOffset;
|
||||
::MD5_Update(&sum, buf.begin(), bufferOffset);
|
||||
}
|
||||
|
||||
// move to the next batch
|
||||
fromChunkNo = toChunkNo;
|
||||
}
|
||||
|
||||
// check if the downloaded file size is as expected
|
||||
if (fileOffset != binarySize) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryInvalidSize")
|
||||
.detail("ExpectedSize", binarySize)
|
||||
.detail("ActualSize", fileOffset);
|
||||
throw client_lib_invalid_binary();
|
||||
}
|
||||
|
||||
// check if the checksum of downloaded file is as expected
|
||||
Standalone<StringRef> sumBytesStr = md5SumToHexString(sum);
|
||||
if (sumBytesStr != StringRef(checkSum)) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryChecksumMismatch")
|
||||
.detail("Expected", checkSum)
|
||||
.detail("Actual", sumBytesStr)
|
||||
.detail("Key", clientLibMetaKey);
|
||||
throw client_lib_invalid_binary();
|
||||
}
|
||||
|
||||
wait(fClientLib->sync());
|
||||
|
||||
TraceEvent("ClientLibraryDownloadDone").detail("Key", clientLibMetaKey);
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> deleteClientLibrary(Database db, Standalone<StringRef> clientLibId) {
|
||||
state Key clientLibMetaKey = metadataKeyFromId(clientLibId.toString());
|
||||
state Key chunkKeyPrefix = chunkKeyPrefixFromId(clientLibId.toString());
|
||||
|
||||
TraceEvent("ClientLibraryBeginDelete").detail("Key", clientLibMetaKey);
|
||||
|
||||
loop {
|
||||
state Transaction tr(db);
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Optional<Value> metadataOpt = wait(tr.get(clientLibMetaKey));
|
||||
if (!metadataOpt.present()) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryNotFound").detail("Key", clientLibMetaKey);
|
||||
throw client_lib_not_found();
|
||||
}
|
||||
json_spirit::mObject metadataJson = parseMetadataJson(metadataOpt.get());
|
||||
ClientLibStatus status = getStatusByName(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_STATUS));
|
||||
tr.clear(prefixRange(chunkKeyPrefix));
|
||||
tr.clear(clientLibMetaKey);
|
||||
updateClientLibChangeCounter(tr, status, ClientLibStatus::DISABLED);
|
||||
wait(tr.commit());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent("ClientLibraryDeleteDone").detail("Key", clientLibMetaKey);
|
||||
return Void();
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
void applyClientLibFilter(const ClientLibFilter& filter,
|
||||
const RangeResultRef& scanResults,
|
||||
Standalone<VectorRef<StringRef>>& filteredResults) {
|
||||
for (const auto& [k, v] : scanResults) {
|
||||
try {
|
||||
json_spirit::mObject metadataJson = parseMetadataJson(v);
|
||||
if (filter.matchAvailableOnly &&
|
||||
!isAvailableForDownload(getStatusByName(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_STATUS)))) {
|
||||
continue;
|
||||
}
|
||||
if (filter.matchCompatibleAPI &&
|
||||
getMetadataIntAttr(metadataJson, CLIENTLIB_ATTR_API_VERSION) < filter.apiVersion) {
|
||||
continue;
|
||||
}
|
||||
if (filter.matchNewerPackageVersion && !filter.matchPlatform &&
|
||||
getNumericVersionEncoding(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_VERSION)) <=
|
||||
filter.numericPkgVersion) {
|
||||
continue;
|
||||
}
|
||||
filteredResults.push_back_deep(filteredResults.arena(), v);
|
||||
} catch (Error& e) {
|
||||
// Entries with invalid metadata on the cluster
|
||||
// Can happen only if the official management interface is bypassed
|
||||
ASSERT(e.code() == error_code_client_lib_invalid_metadata);
|
||||
TraceEvent(SevError, "ClientLibraryIgnoringInvalidMetadata").detail("Metadata", v);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
ACTOR Future<Standalone<VectorRef<StringRef>>> listClientLibraries(Database db, ClientLibFilter filter) {
|
||||
state Standalone<VectorRef<StringRef>> result;
|
||||
state Transaction tr(db);
|
||||
state PromiseStream<Standalone<RangeResultRef>> scanResults;
|
||||
state Key fromKey;
|
||||
state Key toKey;
|
||||
state KeyRangeRef scanRange;
|
||||
state Future<Void> stream;
|
||||
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
|
||||
if (filter.matchPlatform) {
|
||||
Key prefixWithPlatform =
|
||||
clientLibMetadataPrefix.withSuffix(std::string(getPlatformName(filter.platformVal)));
|
||||
fromKey = prefixWithPlatform.withSuffix(LiteralStringRef("/"));
|
||||
if (filter.matchNewerPackageVersion) {
|
||||
fromKey = fromKey.withSuffix(format("%09d", filter.numericPkgVersion + 1));
|
||||
}
|
||||
toKey = prefixWithPlatform.withSuffix(LiteralStringRef("0"));
|
||||
scanRange = KeyRangeRef(fromKey, toKey);
|
||||
} else {
|
||||
scanRange = clientLibMetadataKeys;
|
||||
}
|
||||
scanResults = PromiseStream<Standalone<RangeResultRef>>();
|
||||
stream = tr.getRangeStream(scanResults, scanRange, GetRangeLimits());
|
||||
loop {
|
||||
Standalone<RangeResultRef> scanResultRange = waitNext(scanResults.getFuture());
|
||||
applyClientLibFilter(filter, scanResultRange, result);
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_end_of_stream) {
|
||||
break;
|
||||
}
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
ACTOR Future<ClientLibStatus> getClientLibraryStatus(Database db, Standalone<StringRef> clientLibId) {
|
||||
state Key clientLibMetaKey = metadataKeyFromId(clientLibId);
|
||||
state Transaction tr(db);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
|
||||
Optional<Value> metadataOpt = wait(tr.get(clientLibMetaKey));
|
||||
if (!metadataOpt.present()) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryNotFound").detail("Key", clientLibMetaKey);
|
||||
throw client_lib_not_found();
|
||||
}
|
||||
json_spirit::mObject metadataJson = parseMetadataJson(metadataOpt.get());
|
||||
return getStatusByName(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_STATUS));
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> changeClientLibraryStatus(Database db,
|
||||
Standalone<StringRef> clientLibId,
|
||||
ClientLibStatus newStatus) {
|
||||
state Key clientLibMetaKey = metadataKeyFromId(clientLibId);
|
||||
state json_spirit::mObject metadataJson;
|
||||
state std::string jsStr;
|
||||
state Transaction tr;
|
||||
|
||||
if (!isValidTargetStatus(newStatus)) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryInvalidMetadata")
|
||||
.detail("Reason", "InvalidTargetStatus")
|
||||
.detail("Status", getStatusName(newStatus));
|
||||
throw client_lib_invalid_metadata();
|
||||
}
|
||||
|
||||
loop {
|
||||
tr = Transaction(db);
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Optional<Value> metadataOpt = wait(tr.get(clientLibMetaKey));
|
||||
if (!metadataOpt.present()) {
|
||||
TraceEvent(SevWarnAlways, "ClientLibraryNotFound").detail("Key", clientLibMetaKey);
|
||||
throw client_lib_not_found();
|
||||
}
|
||||
metadataJson = parseMetadataJson(metadataOpt.get());
|
||||
ClientLibStatus prevStatus = getStatusByName(getMetadataStrAttr(metadataJson, CLIENTLIB_ATTR_STATUS));
|
||||
if (prevStatus == newStatus) {
|
||||
return Void();
|
||||
}
|
||||
metadataJson[CLIENTLIB_ATTR_STATUS] = getStatusName(newStatus);
|
||||
jsStr = json_spirit::write_string(json_spirit::mValue(metadataJson));
|
||||
tr.set(clientLibMetaKey, ValueRef(jsStr));
|
||||
|
||||
updateClientLibChangeCounter(tr, prevStatus, newStatus);
|
||||
|
||||
wait(tr.commit());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_client_lib_not_found) {
|
||||
throw;
|
||||
}
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent("ClientLibraryStatusChanged").detail("Key", clientLibMetaKey).detail("Status", getStatusName(newStatus));
|
||||
return Void();
|
||||
}
|
||||
|
||||
} // namespace ClientLibManagement
|
|
@@ -1,146 +0,0 @@
/*
|
||||
* ClientLibManagement.actor.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_MULTI_VERSION_CLIENT_CONTROL_ACTOR_G_H)
|
||||
#define FDBCLIENT_MULTI_VERSION_CLIENT_CONTROL_ACTOR_G_H
|
||||
#include "fdbclient/ClientLibManagement.actor.g.h"
|
||||
#elif !defined(FDBCLIENT_MULTI_VERSION_CLIENT_CONTROL_ACTOR_H)
|
||||
#define FDBCLIENT_MULTI_VERSION_CLIENT_CONTROL_ACTOR_H
|
||||
|
||||
#include <string>
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbclient/md5/md5.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
|
||||
namespace ClientLibManagement {
|
||||
|
||||
enum class ClientLibStatus {
|
||||
DISABLED = 0,
|
||||
UPLOADING, // 1
|
||||
DOWNLOAD, // 2
|
||||
ACTIVE, // 3
|
||||
COUNT // must be the last one
|
||||
};
|
||||
|
||||
enum class ClientLibPlatform {
|
||||
UNKNOWN = 0,
|
||||
X86_64_LINUX,
|
||||
X86_64_WINDOWS,
|
||||
X86_64_MACOS,
|
||||
COUNT // must be the last one
|
||||
};
|
||||
|
||||
// Currently we support only one checksum algorithm,
// but we may want to change that in the future
|
||||
enum class ClientLibChecksumAlg {
|
||||
MD5 = 0,
|
||||
COUNT // must be the last one
|
||||
};
|
||||
|
||||
inline const std::string CLIENTLIB_ATTR_PLATFORM{ "platform" };
|
||||
inline const std::string CLIENTLIB_ATTR_STATUS{ "status" };
|
||||
inline const std::string CLIENTLIB_ATTR_CHECKSUM{ "checksum" };
|
||||
inline const std::string CLIENTLIB_ATTR_VERSION{ "version" };
|
||||
inline const std::string CLIENTLIB_ATTR_TYPE{ "type" };
|
||||
inline const std::string CLIENTLIB_ATTR_API_VERSION{ "apiversion" };
|
||||
inline const std::string CLIENTLIB_ATTR_PROTOCOL{ "protocol" };
|
||||
inline const std::string CLIENTLIB_ATTR_GIT_HASH{ "githash" };
|
||||
inline const std::string CLIENTLIB_ATTR_FILENAME{ "filename" };
|
||||
inline const std::string CLIENTLIB_ATTR_SIZE{ "size" };
|
||||
inline const std::string CLIENTLIB_ATTR_CHUNK_COUNT{ "chunkcount" };
|
||||
inline const std::string CLIENTLIB_ATTR_CHUNK_SIZE{ "chunksize" };
|
||||
inline const std::string CLIENTLIB_ATTR_CHECKSUM_ALG{ "checksumalg" };
|
||||
|
||||
struct ClientLibFilter {
|
||||
bool matchAvailableOnly = false;
|
||||
bool matchPlatform = false;
|
||||
bool matchCompatibleAPI = false;
|
||||
bool matchNewerPackageVersion = false;
|
||||
ClientLibPlatform platformVal = ClientLibPlatform::UNKNOWN;
|
||||
int apiVersion = 0;
|
||||
int numericPkgVersion = 0;
|
||||
|
||||
ClientLibFilter& filterAvailable() {
|
||||
matchAvailableOnly = true;
|
||||
return *this;
|
||||
}
|
||||
|
||||
ClientLibFilter& filterPlatform(ClientLibPlatform platformVal) {
|
||||
matchPlatform = true;
|
||||
this->platformVal = platformVal;
|
||||
return *this;
|
||||
}
|
||||
|
||||
ClientLibFilter& filterCompatibleAPI(int apiVersion) {
|
||||
matchCompatibleAPI = true;
|
||||
this->apiVersion = apiVersion;
|
||||
return *this;
|
||||
}
|
||||
|
||||
// expects a version string like "6.3.10"
|
||||
ClientLibFilter& filterNewerPackageVersion(const std::string& versionStr);
|
||||
};
|
||||
|
||||
const std::string& getStatusName(ClientLibStatus status);
|
||||
ClientLibStatus getStatusByName(std::string_view statusName);
|
||||
|
||||
const std::string& getPlatformName(ClientLibPlatform platform);
|
||||
ClientLibPlatform getPlatformByName(std::string_view platformName);
|
||||
|
||||
const std::string& getChecksumAlgName(ClientLibChecksumAlg checksumAlg);
|
||||
ClientLibChecksumAlg getChecksumAlgByName(std::string_view checksumAlgName);
|
||||
|
||||
// encodes MD5 result to a hexadecimal string to be provided in the checksum attribute
|
||||
Standalone<StringRef> md5SumToHexString(MD5_CTX& sum);
|
||||
|
||||
// Upload a client library binary from a file and associated metadata JSON
|
||||
// to the system keyspace of the database
|
||||
ACTOR Future<Void> uploadClientLibrary(Database db,
|
||||
Standalone<StringRef> metadataString,
|
||||
Standalone<StringRef> libFilePath);
|
||||
|
||||
// Determine clientLibId from the relevant attributes of the metadata JSON
|
||||
Standalone<StringRef> getClientLibIdFromMetadataJson(StringRef metadataString);
|
||||
|
||||
// Download a client library binary from the system keyspace of the database
|
||||
// and save it at the given file path
|
||||
ACTOR Future<Void> downloadClientLibrary(Database db,
|
||||
Standalone<StringRef> clientLibId,
|
||||
Standalone<StringRef> libFilePath);
|
||||
|
||||
// Delete the client library binary from the system keyspace of the database
|
||||
ACTOR Future<Void> deleteClientLibrary(Database db, Standalone<StringRef> clientLibId);
|
||||
|
||||
// List client libraries available on the cluster, with the specified filter
|
||||
// Returns metadata JSON of each library
|
||||
ACTOR Future<Standalone<VectorRef<StringRef>>> listClientLibraries(Database db, ClientLibFilter filter);
|
||||
|
||||
// Get the current status of an uploaded client library
|
||||
ACTOR Future<ClientLibStatus> getClientLibraryStatus(Database db, Standalone<StringRef> clientLibId);
|
||||
|
||||
// Change client library metadata status
|
||||
ACTOR Future<Void> changeClientLibraryStatus(Database db, Standalone<StringRef> clientLibId, ClientLibStatus newStatus);
|
||||
|
||||
} // namespace ClientLibManagement
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
|
@@ -115,9 +115,6 @@ struct ClientDBInfo {
firstCommitProxy; // not serialized, used for commitOnFirstProxy when the commit proxies vector has been shrunk
Optional<Value> forward;
std::vector<VersionHistory> history;
// a counter that is incremented on every change to the uploaded client libraries
// that clients need to be aware of
uint64_t clientLibChangeCounter = 0;

ClientDBInfo() {}

@@ -129,7 +126,7 @@ struct ClientDBInfo {
if constexpr (!is_fb_function<Archive>) {
ASSERT(ar.protocolVersion().isValid());
}
serializer(ar, grvProxies, commitProxies, id, forward, history, clientLibChangeCounter);
serializer(ar, grvProxies, commitProxies, id, forward, history);
}
};

@@ -249,7 +249,6 @@ public:
Future<Reference<CommitProxyInfo>> getCommitProxiesFuture(UseProvisionalProxies useProvisionalProxies);
Reference<GrvProxyInfo> getGrvProxies(UseProvisionalProxies useProvisionalProxies);
Future<Void> onProxiesChanged() const;
Future<Void> onClientLibStatusChanged() const;
Future<HealthMetrics> getHealthMetrics(bool detailed);
// Pass a negative value for `shardLimit` to indicate no limit on the shard number.
Future<StorageMetrics> getStorageMetrics(KeyRange const& keys, int shardLimit);

@@ -347,7 +346,6 @@ public:
// Key DB-specific information
Reference<AsyncVar<Reference<IClusterConnectionRecord>>> connectionRecord;
AsyncTrigger proxiesChangeTrigger;
AsyncTrigger clientLibChangeTrigger;
Future<Void> clientDBInfoMonitor;
Future<Void> monitorTssInfoChange;
Future<Void> tssMismatchHandler;

@@ -732,15 +732,12 @@ Future<Void> attemptGRVFromOldProxies(std::vector<GrvProxyInterface> oldProxies,

ACTOR static Future<Void> monitorClientDBInfoChange(DatabaseContext* cx,
Reference<AsyncVar<ClientDBInfo> const> clientDBInfo,
AsyncTrigger* proxyChangeTrigger,
AsyncTrigger* clientLibChangeTrigger) {
AsyncTrigger* proxyChangeTrigger) {
state std::vector<CommitProxyInterface> curCommitProxies;
state std::vector<GrvProxyInterface> curGrvProxies;
state ActorCollection actors(false);
state uint64_t curClientLibChangeCounter;
curCommitProxies = clientDBInfo->get().commitProxies;
curGrvProxies = clientDBInfo->get().grvProxies;
curClientLibChangeCounter = clientDBInfo->get().clientLibChangeCounter;

loop {
choose {

@@ -763,9 +760,6 @@ ACTOR static Future<Void> monitorClientDBInfoChange(DatabaseContext* cx,
curGrvProxies = clientDBInfo->get().grvProxies;
proxyChangeTrigger->trigger();
}
if (curClientLibChangeCounter != clientDBInfo->get().clientLibChangeCounter) {
clientLibChangeTrigger->trigger();
}
}
when(wait(actors.getResult())) { UNSTOPPABLE_ASSERT(false); }
}

@@ -1255,7 +1249,7 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
getValueSubmitted.init(LiteralStringRef("NativeAPI.GetValueSubmitted"));
getValueCompleted.init(LiteralStringRef("NativeAPI.GetValueCompleted"));

clientDBInfoMonitor = monitorClientDBInfoChange(this, clientInfo, &proxiesChangeTrigger, &clientLibChangeTrigger);
clientDBInfoMonitor = monitorClientDBInfoChange(this, clientInfo, &proxiesChangeTrigger);
tssMismatchHandler = handleTssMismatches(this);
clientStatusUpdater.actor = clientStatusUpdateActor(this);
cacheListMonitor = monitorCacheList(this);

@@ -1606,10 +1600,6 @@ Future<Void> DatabaseContext::onProxiesChanged() const {
return this->proxiesChangeTrigger.onTrigger();
}

Future<Void> DatabaseContext::onClientLibStatusChanged() const {
return this->clientLibChangeTrigger.onTrigger();
}

bool DatabaseContext::sampleReadTags() const {
double sampleRate = GlobalConfig::globalConfig().get(transactionTagSampleRate, CLIENT_KNOBS->READ_TAG_SAMPLE_RATE);
return sampleRate > 0 && deterministicRandom()->random01() <= sampleRate;

@@ -1077,19 +1077,3 @@ const KeyRef JSONSchemas::managementApiErrorSchema = LiteralStringRef(R"""(
"message": "The reason of the error"
}
)""");

const KeyRef JSONSchemas::clientLibMetadataSchema = LiteralStringRef(R"""(
{
"platform": "x86_64-linux",
"version": "7.1.0",
"githash": "e28fef6264d05ab0c9488238022d1ee885a30bea",
"type": "debug",
"checksum": "fcef53fb4ae86d2c4fff4dc17c7e5d08",
"checksumalg": "md5",
"apiversion": 710,
"protocol": "fdb00b07001001",
"filename": "libfdb_c.7.1.0.so",
"size" : 19467552,
"chunkcount" : 2377,
"status": "available"
})""");

@@ -35,7 +35,6 @@ struct JSONSchemas {
static const KeyRef storageHealthSchema;
static const KeyRef aggregateHealthSchema;
static const KeyRef managementApiErrorSchema;
static const KeyRef clientLibMetadataSchema;
};

#endif /* FDBCLIENT_SCHEMAS_H */

@@ -1033,16 +1033,6 @@ std::pair<Key, Version> decodeHealthyZoneValue(ValueRef const& value) {
return std::make_pair(zoneId, version);
}

const KeyRangeRef clientLibMetadataKeys(LiteralStringRef("\xff\x02/clientlib/meta/"),
LiteralStringRef("\xff\x02/clientlib/meta0"));
const KeyRef clientLibMetadataPrefix = clientLibMetadataKeys.begin;

const KeyRangeRef clientLibBinaryKeys(LiteralStringRef("\xff\x02/clientlib/bin/"),
LiteralStringRef("\xff\x02/clientlib/bin0"));
const KeyRef clientLibBinaryPrefix = clientLibBinaryKeys.begin;

const KeyRef clientLibChangeCounterKey = "\xff\x02/clientlib/changeCounter"_sr;

const KeyRangeRef testOnlyTxnStateStorePrefixRange(LiteralStringRef("\xff/TESTONLYtxnStateStore/"),
LiteralStringRef("\xff/TESTONLYtxnStateStore0"));

@@ -488,16 +488,6 @@ extern const KeyRef rebalanceDDIgnoreKey;
const Value healthyZoneValue(StringRef const& zoneId, Version version);
std::pair<Key, Version> decodeHealthyZoneValue(ValueRef const&);

// Key ranges reserved for storing client library binaries and respective
// json documents with the metadata describing the libraries
extern const KeyRangeRef clientLibMetadataKeys;
extern const KeyRef clientLibMetadataPrefix;

extern const KeyRangeRef clientLibBinaryKeys;
extern const KeyRef clientLibBinaryPrefix;

extern const KeyRef clientLibChangeCounterKey;

// All mutations done to this range are blindly copied into txnStateStore.
// Used to create artificially large txnStateStore instances in testing.
extern const KeyRangeRef testOnlyTxnStateStorePrefixRange;

@@ -1205,8 +1205,8 @@ ACTOR static Future<Void> connectionReader(TransportData* transport,
FLOW_KNOBS->CONNECTION_REJECTED_MESSAGE_DELAY) {
TraceEvent(SevWarn, "ConnectionRejected", conn->getDebugID())
.detail("Reason", "IncompatibleProtocolVersion")
.detail("LocalVersion", g_network->protocolVersion().version())
.detail("RejectedVersion", pkt.protocolVersion.version())
.detail("LocalVersion", g_network->protocolVersion())
.detail("RejectedVersion", pkt.protocolVersion)
.detail("Peer",
pkt.canonicalRemotePort
? NetworkAddress(pkt.canonicalRemoteIp(), pkt.canonicalRemotePort)

@@ -174,7 +174,6 @@ set(FDBSERVER_SRCS
workloads/Cache.actor.cpp
workloads/ChangeConfig.actor.cpp
workloads/ClearSingleRange.actor.cpp
workloads/ClientLibManagementWorkload.actor.cpp
workloads/ClientTransactionProfileCorrectness.actor.cpp
workloads/TriggerRecovery.actor.cpp
workloads/SuspendProcesses.actor.cpp

@@ -1633,48 +1633,6 @@ ACTOR Future<Void> monitorGlobalConfig(ClusterControllerData::DBInfo* db) {
}
}

ACTOR Future<Void> monitorClientLibChangeCounter(ClusterControllerData::DBInfo* db) {
state ClientDBInfo clientInfo;
state ReadYourWritesTransaction tr;
state Future<Void> clientLibChangeFuture;

loop {
tr = ReadYourWritesTransaction(db->db);
loop {
try {
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);

Optional<Value> counterVal = wait(tr.get(clientLibChangeCounterKey));
if (counterVal.present() && counterVal.get().size() == sizeof(uint64_t)) {
uint64_t changeCounter = *reinterpret_cast<const uint64_t*>(counterVal.get().begin());

clientInfo = db->serverInfo->get().client;
if (changeCounter != clientInfo.clientLibChangeCounter) {
TraceEvent("ClientLibChangeCounterChanged").detail("Value", changeCounter);
clientInfo.id = deterministicRandom()->randomUniqueID();
clientInfo.clientLibChangeCounter = changeCounter;
db->clientInfo->set(clientInfo);

ServerDBInfo serverInfo = db->serverInfo->get();
serverInfo.id = deterministicRandom()->randomUniqueID();
serverInfo.infoGeneration = ++db->dbInfoCount;
serverInfo.client = clientInfo;
db->serverInfo->set(serverInfo);
}
}

clientLibChangeFuture = tr.watch(clientLibChangeCounterKey);
wait(tr.commit());
wait(clientLibChangeFuture);
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
}

ACTOR Future<Void> updatedChangingDatacenters(ClusterControllerData* self) {
// do not change the cluster controller until all the processes have had a chance to register
wait(delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY));

@@ -2466,7 +2424,6 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
self.addActor.send(monitorProcessClasses(&self));
self.addActor.send(monitorServerInfoConfig(&self.db));
self.addActor.send(monitorGlobalConfig(&self.db));
self.addActor.send(monitorClientLibChangeCounter(&self.db));
self.addActor.send(updatedChangingDatacenters(&self));
self.addActor.send(updatedChangedDatacenters(&self));
self.addActor.send(updateDatacenterVersionDifference(&self));

@@ -19,6 +19,7 @@
*/

#include "fdbserver/DDTeamCollection.h"
#include "flow/Trace.h"
#include "flow/actorcompiler.h" // This must be the last #include.

FDB_DEFINE_BOOLEAN_PARAM(IsPrimary);

@@ -5058,3 +5059,594 @@ Future<Void> DDTeamCollection::run(Reference<DDTeamCollection> teamCollection,
Future<Void> DDTeamCollection::printSnapshotTeamsInfo(Reference<DDTeamCollection> self) {
|
||||
return DDTeamCollectionImpl::printSnapshotTeamsInfo(self);
|
||||
}
|
||||
|
||||
std::unique_ptr<DDTeamCollection> testTeamCollection(int teamSize,
|
||||
Reference<IReplicationPolicy> policy,
|
||||
int processCount) {
|
||||
Database database = DatabaseContext::create(
|
||||
makeReference<AsyncVar<ClientDBInfo>>(), Never(), LocalityData(), EnableLocalityLoadBalance::False);
|
||||
|
||||
DatabaseConfiguration conf;
|
||||
conf.storageTeamSize = teamSize;
|
||||
conf.storagePolicy = policy;
|
||||
|
||||
auto collection =
|
||||
std::unique_ptr<DDTeamCollection>(new DDTeamCollection(database,
|
||||
UID(0, 0),
|
||||
MoveKeysLock(),
|
||||
PromiseStream<RelocateShard>(),
|
||||
makeReference<ShardsAffectedByTeamFailure>(),
|
||||
conf,
|
||||
{},
|
||||
{},
|
||||
Future<Void>(Void()),
|
||||
makeReference<AsyncVar<bool>>(true),
|
||||
IsPrimary::True,
|
||||
makeReference<AsyncVar<bool>>(false),
|
||||
makeReference<AsyncVar<bool>>(false),
|
||||
PromiseStream<GetMetricsRequest>(),
|
||||
Promise<UID>(),
|
||||
PromiseStream<Promise<int>>()));
|
||||
|
||||
for (int id = 1; id <= processCount; ++id) {
|
||||
UID uid(id, 0);
|
||||
StorageServerInterface interface;
|
||||
interface.uniqueID = uid;
|
||||
interface.locality.set(LiteralStringRef("machineid"), Standalone<StringRef>(std::to_string(id)));
|
||||
interface.locality.set(LiteralStringRef("zoneid"), Standalone<StringRef>(std::to_string(id % 5)));
|
||||
interface.locality.set(LiteralStringRef("data_hall"), Standalone<StringRef>(std::to_string(id % 3)));
|
||||
collection->server_info[uid] = makeReference<TCServerInfo>(
|
||||
interface, collection.get(), ProcessClass(), true, collection->storageServerSet);
|
||||
collection->server_status.set(uid, ServerStatus(false, false, false, interface.locality));
|
||||
collection->checkAndCreateMachine(collection->server_info[uid]);
|
||||
}
|
||||
|
||||
return collection;
|
||||
}
|
||||
|
||||
std::unique_ptr<DDTeamCollection> testMachineTeamCollection(int teamSize,
|
||||
Reference<IReplicationPolicy> policy,
|
||||
int processCount) {
|
||||
Database database = DatabaseContext::create(
|
||||
makeReference<AsyncVar<ClientDBInfo>>(), Never(), LocalityData(), EnableLocalityLoadBalance::False);
|
||||
|
||||
DatabaseConfiguration conf;
|
||||
conf.storageTeamSize = teamSize;
|
||||
conf.storagePolicy = policy;
|
||||
|
||||
auto collection =
|
||||
std::unique_ptr<DDTeamCollection>(new DDTeamCollection(database,
|
||||
UID(0, 0),
|
||||
MoveKeysLock(),
|
||||
PromiseStream<RelocateShard>(),
|
||||
makeReference<ShardsAffectedByTeamFailure>(),
|
||||
conf,
|
||||
{},
|
||||
{},
|
||||
Future<Void>(Void()),
|
||||
makeReference<AsyncVar<bool>>(true),
|
||||
IsPrimary::True,
|
||||
makeReference<AsyncVar<bool>>(false),
|
||||
makeReference<AsyncVar<bool>>(false),
|
||||
PromiseStream<GetMetricsRequest>(),
|
||||
Promise<UID>(),
|
||||
PromiseStream<Promise<int>>()));
|
||||
|
||||
for (int id = 1; id <= processCount; id++) {
|
||||
UID uid(id, 0);
|
||||
StorageServerInterface interface;
|
||||
interface.uniqueID = uid;
|
||||
int process_id = id;
|
||||
int dc_id = process_id / 1000;
|
||||
int data_hall_id = process_id / 100;
|
||||
int zone_id = process_id / 10;
|
||||
int machine_id = process_id / 5;
|
||||
|
||||
printf("testMachineTeamCollection: process_id:%d zone_id:%d machine_id:%d ip_addr:%s\n",
|
||||
process_id,
|
||||
zone_id,
|
||||
machine_id,
|
||||
interface.address().toString().c_str());
|
||||
interface.locality.set(LiteralStringRef("processid"), Standalone<StringRef>(std::to_string(process_id)));
|
||||
interface.locality.set(LiteralStringRef("machineid"), Standalone<StringRef>(std::to_string(machine_id)));
|
||||
interface.locality.set(LiteralStringRef("zoneid"), Standalone<StringRef>(std::to_string(zone_id)));
|
||||
interface.locality.set(LiteralStringRef("data_hall"), Standalone<StringRef>(std::to_string(data_hall_id)));
|
||||
interface.locality.set(LiteralStringRef("dcid"), Standalone<StringRef>(std::to_string(dc_id)));
|
||||
collection->server_info[uid] = makeReference<TCServerInfo>(
|
||||
interface, collection.get(), ProcessClass(), true, collection->storageServerSet);
|
||||
|
||||
collection->server_status.set(uid, ServerStatus(false, false, false, interface.locality));
|
||||
}
|
||||
|
||||
int totalServerIndex = collection->constructMachinesFromServers();
|
||||
printf("testMachineTeamCollection: construct machines for %d servers\n", totalServerIndex);
|
||||
|
||||
return collection;
|
||||
}
|
||||
|
||||
TEST_CASE("DataDistribution/AddTeamsBestOf/UseMachineID") {
    wait(Future<Void>(Void()));

    int teamSize = 3; // replication size
    int processSize = 60;
    int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
    int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;

    Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(
        new PolicyAcross(teamSize, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
    state std::unique_ptr<DDTeamCollection> collection = testMachineTeamCollection(teamSize, policy, processSize);

    collection->addTeamsBestOf(30, desiredTeams, maxTeams);

    ASSERT(collection->sanityCheckTeams() == true);

    return Void();
}

TEST_CASE("DataDistribution/AddTeamsBestOf/NotUseMachineID") {
    wait(Future<Void>(Void()));

    int teamSize = 3; // replication size
    int processSize = 60;
    int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
    int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;

    Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(
        new PolicyAcross(teamSize, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
    state std::unique_ptr<DDTeamCollection> collection = testMachineTeamCollection(teamSize, policy, processSize);

    if (collection == nullptr) {
        fprintf(stderr, "collection is null\n");
        return Void();
    }

    collection->addBestMachineTeams(30); // Create machine teams to help debug
    collection->addTeamsBestOf(30, desiredTeams, maxTeams);
    collection->sanityCheckTeams(); // Server team may happen to be on the same machine team, although unlikely

    return Void();
}

TEST_CASE("DataDistribution/AddAllTeams/isExhaustive") {
    Reference<IReplicationPolicy> policy =
        Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
    state int processSize = 10;
    state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
    state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;
    state std::unique_ptr<DDTeamCollection> collection = testTeamCollection(3, policy, processSize);

    int result = collection->addTeamsBestOf(200, desiredTeams, maxTeams);

    // The maximum number of available server teams without considering machine locality is 120
    // The maximum number of available server teams with machine locality constraint is 120 - 40, because
    // the 40 (5*4*2) server teams whose servers come from the same machine are invalid.
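    // (Sanity check of the arithmetic: testTeamCollection assigns zoneid = id % 5, giving five groups of two
    // servers; a valid team must span three distinct groups, so C(5,3) * 2^3 = 80 of the C(10,3) = 120
    // possible triples remain, which is what the assertion below expects.)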
    ASSERT(result == 80);

    return Void();
}

TEST_CASE("/DataDistribution/AddAllTeams/withLimit") {
    Reference<IReplicationPolicy> policy =
        Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
    state int processSize = 10;
    state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
    state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;

    state std::unique_ptr<DDTeamCollection> collection = testTeamCollection(3, policy, processSize);

    int result = collection->addTeamsBestOf(10, desiredTeams, maxTeams);

    ASSERT(result >= 10);

    return Void();
}

TEST_CASE("/DataDistribution/AddTeamsBestOf/SkippingBusyServers") {
    wait(Future<Void>(Void()));
    Reference<IReplicationPolicy> policy =
        Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
    state int processSize = 10;
    state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
    state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;
    state int teamSize = 3;
    // state int targetTeamsPerServer = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (teamSize + 1) / 2;
    state std::unique_ptr<DDTeamCollection> collection = testTeamCollection(teamSize, policy, processSize);

    collection->addTeam(std::set<UID>({ UID(1, 0), UID(2, 0), UID(3, 0) }), true);
    collection->addTeam(std::set<UID>({ UID(1, 0), UID(3, 0), UID(4, 0) }), true);

    state int result = collection->addTeamsBestOf(8, desiredTeams, maxTeams);

    ASSERT(result >= 8);

    for (auto process = collection->server_info.begin(); process != collection->server_info.end(); process++) {
        auto teamCount = process->second->getTeams().size();
        ASSERT(teamCount >= 1);
        // ASSERT(teamCount <= targetTeamsPerServer);
    }

    return Void();
}

// Due to the randomness in choosing the machine team and the server team from the machine team, it is possible that
// we may not find the remaining several (e.g., 1 or 2) available teams.
// It is hard to conclude what is the minimum number of teams the addTeamsBestOf() should create in this situation.
TEST_CASE("/DataDistribution/AddTeamsBestOf/NotEnoughServers") {
    wait(Future<Void>(Void()));

    Reference<IReplicationPolicy> policy =
        Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
    state int processSize = 5;
    state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
    state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;
    state int teamSize = 3;
    state std::unique_ptr<DDTeamCollection> collection = testTeamCollection(teamSize, policy, processSize);

    collection->addTeam(std::set<UID>({ UID(1, 0), UID(2, 0), UID(3, 0) }), true);
    collection->addTeam(std::set<UID>({ UID(1, 0), UID(3, 0), UID(4, 0) }), true);

    collection->addBestMachineTeams(10);
    int result = collection->addTeamsBestOf(10, desiredTeams, maxTeams);

    if (collection->machineTeams.size() != 10 || result != 8) {
        collection->traceAllInfo(true); // Debug message
    }

    // NOTE: Due to the pure randomness in selecting a machine for a machine team,
    // we cannot guarantee that all machine teams are created.
    // When we change the selectReplicas function to achieve such a guarantee, we can enable the following ASSERT
    ASSERT(collection->machineTeams.size() == 10); // Should create all machine teams

    // We need to guarantee that a server always has at least one team so that it can participate in data distribution
    for (auto process = collection->server_info.begin(); process != collection->server_info.end(); process++) {
        auto teamCount = process->second->getTeams().size();
        ASSERT(teamCount >= 1);
    }

    // If we find all available teams, result will be 8 because we prebuild 2 teams
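    // (With five servers in five distinct zones, C(5,3) = 10 valid server teams exist in total; two of them
    // were prebuilt above, leaving at most 8 that addTeamsBestOf() can newly create.)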
    ASSERT(result == 8);

    return Void();
}

TEST_CASE("/DataDistribution/GetTeam/NewServersNotNeeded") {

    Reference<IReplicationPolicy> policy =
        Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
    state int processSize = 5;
    state int teamSize = 3;
    state std::unique_ptr<DDTeamCollection> collection = testTeamCollection(teamSize, policy, processSize);

    GetStorageMetricsReply mid_avail;
    mid_avail.capacity.bytes = 1000 * 1024 * 1024;
    mid_avail.available.bytes = 400 * 1024 * 1024;
    mid_avail.load.bytes = 100 * 1024 * 1024;

    GetStorageMetricsReply high_avail;
    high_avail.capacity.bytes = 1000 * 1024 * 1024;
    high_avail.available.bytes = 800 * 1024 * 1024;
    high_avail.load.bytes = 90 * 1024 * 1024;

    collection->addTeam(std::set<UID>({ UID(1, 0), UID(2, 0), UID(3, 0) }), true);
    collection->addTeam(std::set<UID>({ UID(2, 0), UID(3, 0), UID(4, 0) }), true);
    collection->disableBuildingTeams();
    collection->setCheckTeamDelay();

    collection->server_info[UID(1, 0)]->setServerMetrics(mid_avail);
    collection->server_info[UID(2, 0)]->setServerMetrics(high_avail);
    collection->server_info[UID(3, 0)]->setServerMetrics(high_avail);
    collection->server_info[UID(4, 0)]->setServerMetrics(high_avail);

    /*
     * Suppose 1, 2 and 3 are complete sources, i.e., they have all shards in
     * the key range being considered for movement. If the caller says that they
     * don't strictly need new servers and all of these servers are healthy,
     * maintain status quo.
     */

    bool wantsNewServers = false;
    bool wantsTrueBest = true;
    bool preferLowerUtilization = true;
    bool teamMustHaveShards = false;
    std::vector<UID> completeSources{ UID(1, 0), UID(2, 0), UID(3, 0) };

    state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards);
    req.completeSources = completeSources;

    wait(collection->getTeam(req));

    std::pair<Optional<Reference<IDataDistributionTeam>>, bool> resTeam = req.reply.getFuture().get();

    std::set<UID> expectedServers{ UID(1, 0), UID(2, 0), UID(3, 0) };
    ASSERT(resTeam.first.present());
    auto servers = resTeam.first.get()->getServerIDs();
    const std::set<UID> selectedServers(servers.begin(), servers.end());
    ASSERT(expectedServers == selectedServers);

    return Void();
}

TEST_CASE("/DataDistribution/GetTeam/HealthyCompleteSource") {

    Reference<IReplicationPolicy> policy =
        Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
    state int processSize = 5;
    state int teamSize = 3;
    state std::unique_ptr<DDTeamCollection> collection = testTeamCollection(teamSize, policy, processSize);

    GetStorageMetricsReply mid_avail;
    mid_avail.capacity.bytes = 1000 * 1024 * 1024;
    mid_avail.available.bytes = 400 * 1024 * 1024;
    mid_avail.load.bytes = 100 * 1024 * 1024;

    GetStorageMetricsReply high_avail;
    high_avail.capacity.bytes = 1000 * 1024 * 1024;
    high_avail.available.bytes = 800 * 1024 * 1024;
    high_avail.load.bytes = 90 * 1024 * 1024;

    collection->addTeam(std::set<UID>({ UID(1, 0), UID(2, 0), UID(3, 0) }), true);
    collection->addTeam(std::set<UID>({ UID(2, 0), UID(3, 0), UID(4, 0) }), true);
    collection->disableBuildingTeams();
    collection->setCheckTeamDelay();

    collection->server_info[UID(1, 0)]->setServerMetrics(mid_avail);
    collection->server_info[UID(2, 0)]->setServerMetrics(high_avail);
    collection->server_info[UID(3, 0)]->setServerMetrics(high_avail);
    collection->server_info[UID(4, 0)]->setServerMetrics(high_avail);
    collection->server_info[UID(1, 0)]->markTeamUnhealthy(0);

    /*
     * Suppose 1, 2, 3 and 4 are complete sources, i.e., they have all shards in
     * the key range being considered for movement. If the caller says that they don't
     * strictly need new servers but '1' is not healthy, see that the other team of
     * complete sources is selected.
     */

    bool wantsNewServers = false;
    bool wantsTrueBest = true;
    bool preferLowerUtilization = true;
    bool teamMustHaveShards = false;
    std::vector<UID> completeSources{ UID(1, 0), UID(2, 0), UID(3, 0), UID(4, 0) };

    state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards);
    req.completeSources = completeSources;

    wait(collection->getTeam(req));

    std::pair<Optional<Reference<IDataDistributionTeam>>, bool> resTeam = req.reply.getFuture().get();

    std::set<UID> expectedServers{ UID(2, 0), UID(3, 0), UID(4, 0) };
    ASSERT(resTeam.first.present());
    auto servers = resTeam.first.get()->getServerIDs();
    const std::set<UID> selectedServers(servers.begin(), servers.end());

    ASSERT(expectedServers == selectedServers);
    return Void();
}

TEST_CASE("/DataDistribution/GetTeam/TrueBestLeastUtilized") {

    Reference<IReplicationPolicy> policy =
        Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
    state int processSize = 5;
    state int teamSize = 3;
    state std::unique_ptr<DDTeamCollection> collection = testTeamCollection(teamSize, policy, processSize);

    GetStorageMetricsReply mid_avail;
    mid_avail.capacity.bytes = 1000 * 1024 * 1024;
    mid_avail.available.bytes = 400 * 1024 * 1024;
    mid_avail.load.bytes = 100 * 1024 * 1024;

    GetStorageMetricsReply high_avail;
    high_avail.capacity.bytes = 1000 * 1024 * 1024;
    high_avail.available.bytes = 800 * 1024 * 1024;
    high_avail.load.bytes = 90 * 1024 * 1024;

    collection->addTeam(std::set<UID>({ UID(1, 0), UID(2, 0), UID(3, 0) }), true);
    collection->addTeam(std::set<UID>({ UID(2, 0), UID(3, 0), UID(4, 0) }), true);
    collection->disableBuildingTeams();
    collection->setCheckTeamDelay();

    /*
     * Among server teams that have healthy space available, pick the team that is
     * least utilized, if the caller says they preferLowerUtilization.
     */

    collection->server_info[UID(1, 0)]->setServerMetrics(mid_avail);
    collection->server_info[UID(2, 0)]->setServerMetrics(high_avail);
    collection->server_info[UID(3, 0)]->setServerMetrics(high_avail);
    collection->server_info[UID(4, 0)]->setServerMetrics(high_avail);

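    // With these metrics, team {1, 2, 3} contains server 1 with only 400MB of its 1000MB free, while every
    // member of {2, 3, 4} has 800MB free, so the lower-utilization preference below should pick {2, 3, 4}.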
    bool wantsNewServers = true;
    bool wantsTrueBest = true;
    bool preferLowerUtilization = true;
    bool teamMustHaveShards = false;
    std::vector<UID> completeSources{ UID(1, 0), UID(2, 0), UID(3, 0) };

    state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards);
    req.completeSources = completeSources;

    wait(collection->getTeam(req));

    std::pair<Optional<Reference<IDataDistributionTeam>>, bool> resTeam = req.reply.getFuture().get();

    std::set<UID> expectedServers{ UID(2, 0), UID(3, 0), UID(4, 0) };
    ASSERT(resTeam.first.present());
    auto servers = resTeam.first.get()->getServerIDs();
    const std::set<UID> selectedServers(servers.begin(), servers.end());
    ASSERT(expectedServers == selectedServers);

    return Void();
}

TEST_CASE("/DataDistribution/GetTeam/TrueBestMostUtilized") {

    Reference<IReplicationPolicy> policy =
        Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
    state int processSize = 5;
    state int teamSize = 3;
    state std::unique_ptr<DDTeamCollection> collection = testTeamCollection(teamSize, policy, processSize);

    GetStorageMetricsReply mid_avail;
    mid_avail.capacity.bytes = 1000 * 1024 * 1024;
    mid_avail.available.bytes = 400 * 1024 * 1024;
    mid_avail.load.bytes = 100 * 1024 * 1024;

    GetStorageMetricsReply high_avail;
    high_avail.capacity.bytes = 1000 * 1024 * 1024;
    high_avail.available.bytes = 800 * 1024 * 1024;
    high_avail.load.bytes = 90 * 1024 * 1024;

    collection->addTeam(std::set<UID>({ UID(1, 0), UID(2, 0), UID(3, 0) }), true);
    collection->addTeam(std::set<UID>({ UID(2, 0), UID(3, 0), UID(4, 0) }), true);
    collection->disableBuildingTeams();
    collection->setCheckTeamDelay();

    collection->server_info[UID(1, 0)]->setServerMetrics(mid_avail);
    collection->server_info[UID(2, 0)]->setServerMetrics(high_avail);
    collection->server_info[UID(3, 0)]->setServerMetrics(high_avail);
    collection->server_info[UID(4, 0)]->setServerMetrics(high_avail);

    /*
     * Among server teams that have healthy space available, pick the team that is
     * most utilized, if the caller says they don't preferLowerUtilization.
     */

    bool wantsNewServers = true;
    bool wantsTrueBest = true;
    bool preferLowerUtilization = false;
    bool teamMustHaveShards = false;
    std::vector<UID> completeSources{ UID(1, 0), UID(2, 0), UID(3, 0) };

    state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards);
    req.completeSources = completeSources;

    wait(collection->getTeam(req));

    std::pair<Optional<Reference<IDataDistributionTeam>>, bool> resTeam = req.reply.getFuture().get();

    std::set<UID> expectedServers{ UID(1, 0), UID(2, 0), UID(3, 0) };
    ASSERT(resTeam.first.present());
    auto servers = resTeam.first.get()->getServerIDs();
    const std::set<UID> selectedServers(servers.begin(), servers.end());
    ASSERT(expectedServers == selectedServers);

    return Void();
}

TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationBelowCutoff") {

    Reference<IReplicationPolicy> policy =
        Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
    state int processSize = 5;
    state int teamSize = 3;
    state std::unique_ptr<DDTeamCollection> collection = testTeamCollection(teamSize, policy, processSize);

    GetStorageMetricsReply low_avail;
    low_avail.capacity.bytes = SERVER_KNOBS->MIN_AVAILABLE_SPACE * 20;
    low_avail.available.bytes = SERVER_KNOBS->MIN_AVAILABLE_SPACE / 2;
    low_avail.load.bytes = 90 * 1024 * 1024;

    GetStorageMetricsReply high_avail;
    high_avail.capacity.bytes = 2000 * 1024 * 1024;
    high_avail.available.bytes = 800 * 1024 * 1024;
    high_avail.load.bytes = 90 * 1024 * 1024;

    collection->addTeam(std::set<UID>({ UID(1, 0), UID(2, 0), UID(3, 0) }), true);
    collection->addTeam(std::set<UID>({ UID(2, 0), UID(3, 0), UID(4, 0) }), true);
    collection->disableBuildingTeams();
    collection->setCheckTeamDelay();

    collection->server_info[UID(1, 0)]->setServerMetrics(high_avail);
    collection->server_info[UID(2, 0)]->setServerMetrics(low_avail);
    collection->server_info[UID(3, 0)]->setServerMetrics(high_avail);
    collection->server_info[UID(4, 0)]->setServerMetrics(low_avail);
    collection->server_info[UID(1, 0)]->markTeamUnhealthy(0);

    /*
     * If the only available team is one where at least one server is low on
     * space, decline to pick that team. Every server must have some minimum
     * free space defined by the MIN_AVAILABLE_SPACE server knob.
     */

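    // Concretely: servers 2 and 4 report only MIN_AVAILABLE_SPACE / 2 bytes free, and each candidate team
    // built above contains at least one of them, so the request below is expected to come back empty.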
    bool wantsNewServers = true;
    bool wantsTrueBest = true;
    bool preferLowerUtilization = true;
    bool teamMustHaveShards = false;
    std::vector<UID> completeSources{ UID(1, 0), UID(2, 0), UID(3, 0) };

    state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards);
    req.completeSources = completeSources;

    wait(collection->getTeam(req));

    std::pair<Optional<Reference<IDataDistributionTeam>>, bool> resTeam = req.reply.getFuture().get();

    ASSERT(!resTeam.first.present());

    return Void();
}

TEST_CASE("/DataDistribution/GetTeam/ServerUtilizationNearCutoff") {

    Reference<IReplicationPolicy> policy =
        Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
    state int processSize = 5;
    state int teamSize = 3;
    state std::unique_ptr<DDTeamCollection> collection = testTeamCollection(teamSize, policy, processSize);

    GetStorageMetricsReply low_avail;
    if (SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO > 0) {
        /* Pick a capacity where MIN_AVAILABLE_SPACE_RATIO of the capacity would be higher than MIN_AVAILABLE_SPACE */
        low_avail.capacity.bytes = SERVER_KNOBS->MIN_AVAILABLE_SPACE * (2 / SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO);
    } else {
        low_avail.capacity.bytes = 2000 * 1024 * 1024;
    }
    low_avail.available.bytes = (SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO * 1.1) * low_avail.capacity.bytes;
    low_avail.load.bytes = 90 * 1024 * 1024;
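    // With that capacity, the ratio-based cutoff is MIN_AVAILABLE_SPACE_RATIO * capacity = 2 * MIN_AVAILABLE_SPACE,
    // and available is set 10% above it, so these servers sit just above the cutoff and should stay eligible.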
    GetStorageMetricsReply high_avail;
    high_avail.capacity.bytes = 2000 * 1024 * 1024;
    high_avail.available.bytes = 800 * 1024 * 1024;
    high_avail.load.bytes = 90 * 1024 * 1024;

    collection->addTeam(std::set<UID>({ UID(1, 0), UID(2, 0), UID(3, 0) }), true);
    collection->addTeam(std::set<UID>({ UID(2, 0), UID(3, 0), UID(4, 0) }), true);
    collection->addTeam(std::set<UID>({ UID(3, 0), UID(4, 0), UID(5, 0) }), true);
    collection->disableBuildingTeams();
    collection->setCheckTeamDelay();

    collection->server_info[UID(1, 0)]->setServerMetrics(high_avail);
    collection->server_info[UID(2, 0)]->setServerMetrics(low_avail);
    collection->server_info[UID(3, 0)]->setServerMetrics(high_avail);
    collection->server_info[UID(4, 0)]->setServerMetrics(low_avail);
    collection->server_info[UID(5, 0)]->setServerMetrics(high_avail);
    collection->server_info[UID(1, 0)]->markTeamUnhealthy(0);

    /*
     * If the only available team is one where all servers are low on space,
     * test that each server has at least MIN_AVAILABLE_SPACE_RATIO (server knob)
     * percentage points of capacity free before picking that team.
     */

    bool wantsNewServers = true;
    bool wantsTrueBest = true;
    bool preferLowerUtilization = true;
    bool teamMustHaveShards = false;
    std::vector<UID> completeSources{ UID(1, 0), UID(2, 0), UID(3, 0) };

    state GetTeamRequest req(wantsNewServers, wantsTrueBest, preferLowerUtilization, teamMustHaveShards);
    req.completeSources = completeSources;

    wait(collection->getTeam(req));

    std::pair<Optional<Reference<IDataDistributionTeam>>, bool> resTeam = req.reply.getFuture().get();

    std::set<UID> expectedServers{ UID(2, 0), UID(3, 0), UID(4, 0) };
    ASSERT(resTeam.first.present());
    auto servers = resTeam.first.get()->getServerIDs();
    const std::set<UID> selectedServers(servers.begin(), servers.end());
    ASSERT(expectedServers == selectedServers);

    return Void();
}

@@ -595,6 +595,12 @@ public:

    void addTeam(std::set<UID> const& team, bool isInitialTeam) { addTeam(team.begin(), team.end(), isInitialTeam); }

    // FIXME: Public for testing only
    void disableBuildingTeams() { doBuildTeams = false; }

    // FIXME: Public for testing only
    void setCheckTeamDelay() { this->checkTeamDelay = Void(); }

    // FIXME: Public for testing only
    // Group storage servers (process) based on their machineId in LocalityData
    // All created machines are healthy

@@ -1259,252 +1259,3 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV

    return Void();
}

std::unique_ptr<DDTeamCollection> testTeamCollection(int teamSize,
                                                     Reference<IReplicationPolicy> policy,
                                                     int processCount) {
    Database database = DatabaseContext::create(
        makeReference<AsyncVar<ClientDBInfo>>(), Never(), LocalityData(), EnableLocalityLoadBalance::False);

    DatabaseConfiguration conf;
    conf.storageTeamSize = teamSize;
    conf.storagePolicy = policy;

    auto collection =
        std::unique_ptr<DDTeamCollection>(new DDTeamCollection(database,
                UID(0, 0),
                MoveKeysLock(),
                PromiseStream<RelocateShard>(),
                makeReference<ShardsAffectedByTeamFailure>(),
                conf,
                {},
                {},
                Future<Void>(Void()),
                makeReference<AsyncVar<bool>>(true),
                IsPrimary::True,
                makeReference<AsyncVar<bool>>(false),
                makeReference<AsyncVar<bool>>(false),
                PromiseStream<GetMetricsRequest>(),
                Promise<UID>(),
                PromiseStream<Promise<int>>()));

    for (int id = 1; id <= processCount; ++id) {
        UID uid(id, 0);
        StorageServerInterface interface;
        interface.uniqueID = uid;
        interface.locality.set(LiteralStringRef("machineid"), Standalone<StringRef>(std::to_string(id)));
        interface.locality.set(LiteralStringRef("zoneid"), Standalone<StringRef>(std::to_string(id % 5)));
        interface.locality.set(LiteralStringRef("data_hall"), Standalone<StringRef>(std::to_string(id % 3)));
        collection->server_info[uid] = makeReference<TCServerInfo>(
            interface, collection.get(), ProcessClass(), true, collection->storageServerSet);
        collection->server_status.set(uid, ServerStatus(false, false, false, interface.locality));
        collection->checkAndCreateMachine(collection->server_info[uid]);
    }

    return collection;
}

std::unique_ptr<DDTeamCollection> testMachineTeamCollection(int teamSize,
                                                            Reference<IReplicationPolicy> policy,
                                                            int processCount) {
    Database database = DatabaseContext::create(
        makeReference<AsyncVar<ClientDBInfo>>(), Never(), LocalityData(), EnableLocalityLoadBalance::False);

    DatabaseConfiguration conf;
    conf.storageTeamSize = teamSize;
    conf.storagePolicy = policy;

    auto collection =
        std::unique_ptr<DDTeamCollection>(new DDTeamCollection(database,
                UID(0, 0),
                MoveKeysLock(),
                PromiseStream<RelocateShard>(),
                makeReference<ShardsAffectedByTeamFailure>(),
                conf,
                {},
                {},
                Future<Void>(Void()),
                makeReference<AsyncVar<bool>>(true),
                IsPrimary::True,
                makeReference<AsyncVar<bool>>(false),
                makeReference<AsyncVar<bool>>(false),
                PromiseStream<GetMetricsRequest>(),
                Promise<UID>(),
                PromiseStream<Promise<int>>()));

    for (int id = 1; id <= processCount; id++) {
        UID uid(id, 0);
        StorageServerInterface interface;
        interface.uniqueID = uid;
        int process_id = id;
        int dc_id = process_id / 1000;
        int data_hall_id = process_id / 100;
        int zone_id = process_id / 10;
        int machine_id = process_id / 5;

        printf("testMachineTeamCollection: process_id:%d zone_id:%d machine_id:%d ip_addr:%s\n",
               process_id,
               zone_id,
               machine_id,
               interface.address().toString().c_str());
        interface.locality.set(LiteralStringRef("processid"), Standalone<StringRef>(std::to_string(process_id)));
        interface.locality.set(LiteralStringRef("machineid"), Standalone<StringRef>(std::to_string(machine_id)));
        interface.locality.set(LiteralStringRef("zoneid"), Standalone<StringRef>(std::to_string(zone_id)));
        interface.locality.set(LiteralStringRef("data_hall"), Standalone<StringRef>(std::to_string(data_hall_id)));
        interface.locality.set(LiteralStringRef("dcid"), Standalone<StringRef>(std::to_string(dc_id)));
        collection->server_info[uid] = makeReference<TCServerInfo>(
            interface, collection.get(), ProcessClass(), true, collection->storageServerSet);

        collection->server_status.set(uid, ServerStatus(false, false, false, interface.locality));
    }

    int totalServerIndex = collection->constructMachinesFromServers();
    printf("testMachineTeamCollection: construct machines for %d servers\n", totalServerIndex);

    return collection;
}

TEST_CASE("DataDistribution/AddTeamsBestOf/UseMachineID") {
    wait(Future<Void>(Void()));

    int teamSize = 3; // replication size
    int processSize = 60;
    int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
    int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;

    Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(
        new PolicyAcross(teamSize, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
    state std::unique_ptr<DDTeamCollection> collection = testMachineTeamCollection(teamSize, policy, processSize);

    collection->addTeamsBestOf(30, desiredTeams, maxTeams);

    ASSERT(collection->sanityCheckTeams() == true);

    return Void();
}

TEST_CASE("DataDistribution/AddTeamsBestOf/NotUseMachineID") {
    wait(Future<Void>(Void()));

    int teamSize = 3; // replication size
    int processSize = 60;
    int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
    int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;

    Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(
        new PolicyAcross(teamSize, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
    state std::unique_ptr<DDTeamCollection> collection = testMachineTeamCollection(teamSize, policy, processSize);

    if (collection == nullptr) {
        fprintf(stderr, "collection is null\n");
        return Void();
    }

    collection->addBestMachineTeams(30); // Create machine teams to help debug
    collection->addTeamsBestOf(30, desiredTeams, maxTeams);
    collection->sanityCheckTeams(); // Server team may happen to be on the same machine team, although unlikely

    return Void();
}

TEST_CASE("DataDistribution/AddAllTeams/isExhaustive") {
    Reference<IReplicationPolicy> policy =
        Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
    state int processSize = 10;
    state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
    state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;
    state std::unique_ptr<DDTeamCollection> collection = testTeamCollection(3, policy, processSize);

    int result = collection->addTeamsBestOf(200, desiredTeams, maxTeams);

    // The maximum number of available server teams without considering machine locality is 120
    // The maximum number of available server teams with machine locality constraint is 120 - 40, because
    // the 40 (5*4*2) server teams whose servers come from the same machine are invalid.
    ASSERT(result == 80);

    return Void();
}

TEST_CASE("/DataDistribution/AddAllTeams/withLimit") {
    Reference<IReplicationPolicy> policy =
        Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
    state int processSize = 10;
    state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
    state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;

    state std::unique_ptr<DDTeamCollection> collection = testTeamCollection(3, policy, processSize);

    int result = collection->addTeamsBestOf(10, desiredTeams, maxTeams);

    ASSERT(result >= 10);

    return Void();
}

TEST_CASE("/DataDistribution/AddTeamsBestOf/SkippingBusyServers") {
    wait(Future<Void>(Void()));
    Reference<IReplicationPolicy> policy =
        Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
    state int processSize = 10;
    state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
    state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;
    state int teamSize = 3;
    // state int targetTeamsPerServer = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (teamSize + 1) / 2;
    state std::unique_ptr<DDTeamCollection> collection = testTeamCollection(teamSize, policy, processSize);

    collection->addTeam(std::set<UID>({ UID(1, 0), UID(2, 0), UID(3, 0) }), true);
    collection->addTeam(std::set<UID>({ UID(1, 0), UID(3, 0), UID(4, 0) }), true);

    state int result = collection->addTeamsBestOf(8, desiredTeams, maxTeams);

    ASSERT(result >= 8);

    for (auto process = collection->server_info.begin(); process != collection->server_info.end(); process++) {
        auto teamCount = process->second->getTeams().size();
        ASSERT(teamCount >= 1);
        // ASSERT(teamCount <= targetTeamsPerServer);
    }

    return Void();
}

// Due to the randomness in choosing the machine team and the server team from the machine team, it is possible that
// we may not find the remaining several (e.g., 1 or 2) available teams.
// It is hard to conclude what is the minimum number of teams the addTeamsBestOf() should create in this situation.
TEST_CASE("/DataDistribution/AddTeamsBestOf/NotEnoughServers") {
    wait(Future<Void>(Void()));

    Reference<IReplicationPolicy> policy =
        Reference<IReplicationPolicy>(new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
    state int processSize = 5;
    state int desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * processSize;
    state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;
    state int teamSize = 3;
    state std::unique_ptr<DDTeamCollection> collection = testTeamCollection(teamSize, policy, processSize);

    collection->addTeam(std::set<UID>({ UID(1, 0), UID(2, 0), UID(3, 0) }), true);
    collection->addTeam(std::set<UID>({ UID(1, 0), UID(3, 0), UID(4, 0) }), true);

    collection->addBestMachineTeams(10);
    int result = collection->addTeamsBestOf(10, desiredTeams, maxTeams);

    if (collection->machineTeams.size() != 10 || result != 8) {
        collection->traceAllInfo(true); // Debug message
    }

    // NOTE: Due to the pure randomness in selecting a machine for a machine team,
    // we cannot guarantee that all machine teams are created.
    // When we change the selectReplicas function to achieve such a guarantee, we can enable the following ASSERT
    ASSERT(collection->machineTeams.size() == 10); // Should create all machine teams

    // We need to guarantee that a server always has at least one team so that it can participate in data distribution
    for (auto process = collection->server_info.begin(); process != collection->server_info.end(); process++) {
        auto teamCount = process->second->getTeams().size();
        ASSERT(teamCount >= 1);
    }

    // If we find all available teams, result will be 8 because we prebuild 2 teams
    ASSERT(result == 8);

    return Void();
}

@@ -191,6 +191,10 @@ void TCServerInfo::removeTeam(Reference<TCTeamInfo> team) {
    }
}

void TCServerInfo::markTeamUnhealthy(int teamIndex) {
    teams[teamIndex]->setHealthy(false);
}

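// markTeamUnhealthy() simply clears the healthy flag on one of this server's teams; the GetTeam unit tests
// above call markTeamUnhealthy(0) to steer getTeam() away from a particular candidate team.
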
TCServerInfo::~TCServerInfo() {
    if (collection && ssVersionTooFarBehind.get() && !lastKnownInterface.isTss()) {
        collection->removeLaggingStorageServer(lastKnownInterface.locality.zoneId().get());

@@ -97,6 +97,12 @@ public:
    static Future<Void> updateServerMetrics(Reference<TCServerInfo> server);
    Future<Void> serverMetricsPolling();

    // FIXME: Public for testing only:
    void setServerMetrics(GetStorageMetricsReply serverMetrics) { this->serverMetrics = serverMetrics; }

    // FIXME: Public for testing only:
    void markTeamUnhealthy(int teamIndex);

    ~TCServerInfo();
};

@@ -1,464 +0,0 @@
/*
 * ClientLibManagementWorkload.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fdbrpc/IAsyncFile.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbclient/ClientLibManagement.actor.h"
#include "fdbserver/workloads/AsyncFile.actor.h"
#include "fdbclient/md5/md5.h"
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/actorcompiler.h" // This must be the last #include.

using namespace ClientLibManagement;

/**
 * Workload for testing ClientLib management operations, declared in
 * MultiVersionClientControl.actor.h
 */
struct ClientLibManagementWorkload : public TestWorkload {
    static constexpr size_t FILE_CHUNK_SIZE = 128 * 1024; // Used for test setup only

    size_t testFileSize = 0;
    RandomByteGenerator rbg;
    Standalone<StringRef> uploadedClientLibId;
    json_spirit::mObject uploadedMetadataJson;
    Standalone<StringRef> generatedChecksum;
    std::string generatedFileName;
    bool success = true;

    /*----------------------------------------------------------------
     * Interface
     */

    ClientLibManagementWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
        int minTestFileSize = getOption(options, LiteralStringRef("minTestFileSize"), 0);
        int maxTestFileSize = getOption(options, LiteralStringRef("maxTestFileSize"), 1024 * 1024);
        testFileSize = deterministicRandom()->randomInt(minTestFileSize, maxTestFileSize + 1);
    }

    std::string description() const override { return "ClientLibManagement"; }

    Future<Void> setup(Database const& cx) override { return _setup(this); }

    Future<Void> start(Database const& cx) override { return _start(this, cx); }

    Future<bool> check(Database const& cx) override { return success; }

    void getMetrics(std::vector<PerfMetric>& m) override {}

    /*----------------------------------------------------------------
     * Setup
     */

    ACTOR Future<Void> _setup(ClientLibManagementWorkload* self) {
        state Reference<AsyncFileBuffer> data = self->allocateBuffer(FILE_CHUNK_SIZE);
        state size_t fileOffset;
        state MD5_CTX sum;
        state size_t bytesToWrite;

        self->generatedFileName = format("clientLibUpload%d", self->clientId);
        int64_t flags = IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE |
                        IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_NO_AIO;
        state Reference<IAsyncFile> file =
            wait(IAsyncFileSystem::filesystem()->open(self->generatedFileName, flags, 0666));

        ::MD5_Init(&sum);

        for (fileOffset = 0; fileOffset < self->testFileSize; fileOffset += FILE_CHUNK_SIZE) {
            self->rbg.writeRandomBytesToBuffer(data->buffer, FILE_CHUNK_SIZE);
            bytesToWrite = std::min(FILE_CHUNK_SIZE, self->testFileSize - fileOffset);
            wait(file->write(data->buffer, bytesToWrite, fileOffset));

            ::MD5_Update(&sum, data->buffer, bytesToWrite);
        }
        wait(file->sync());

        self->generatedChecksum = md5SumToHexString(sum);

        return Void();
    }

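    // Note: the MD5 accumulated in _setup is the value testUploadClientLib() later supplies as
    // CLIENTLIB_ATTR_CHECKSUM, which presumably has to match the uploaded bytes; testUploadClientLibWrongChecksum()
    // (not part of the _start sequence below) keeps the random sample checksum and therefore expects
    // client_lib_invalid_binary.
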
    /*----------------------------------------------------------------
     * Tests
     */

    ACTOR static Future<Void> _start(ClientLibManagementWorkload* self, Database cx) {
        wait(testUploadClientLibInvalidInput(self, cx));
        wait(testClientLibUploadFileDoesNotExist(self, cx));
        wait(testUploadClientLib(self, cx));
        wait(testClientLibListAfterUpload(self, cx));
        wait(testDownloadClientLib(self, cx));
        wait(testClientLibDownloadNotExisting(self, cx));
        wait(testChangeClientLibStatusErrors(self, cx));
        wait(testDisableClientLib(self, cx));
        wait(testChangeStateToDownload(self, cx));
        wait(testDeleteClientLib(self, cx));
        wait(testUploadedClientLibInList(self, cx, ClientLibFilter(), false, "No filter, after delete"));
        return Void();
    }

    ACTOR static Future<Void> testUploadClientLibInvalidInput(ClientLibManagementWorkload* self, Database cx) {
        state std::vector<std::string> invalidMetadataStrs = {
            "{foo", // invalid json
            "[]", // json array
        };
        state StringRef metadataStr;

        // add garbage attribute
        json_spirit::mObject metadataJson;
        validClientLibMetadataSample(metadataJson);
        metadataJson["unknownattr"] = "someval";
        invalidMetadataStrs.push_back(json_spirit::write_string(json_spirit::mValue(metadataJson)));

        const std::string mandatoryAttrs[] = { CLIENTLIB_ATTR_PLATFORM, CLIENTLIB_ATTR_VERSION,
                                               CLIENTLIB_ATTR_CHECKSUM, CLIENTLIB_ATTR_TYPE,
                                               CLIENTLIB_ATTR_GIT_HASH, CLIENTLIB_ATTR_PROTOCOL,
                                               CLIENTLIB_ATTR_API_VERSION, CLIENTLIB_ATTR_CHECKSUM_ALG };

        for (const std::string& attr : mandatoryAttrs) {
            validClientLibMetadataSample(metadataJson);
            metadataJson.erase(attr);
            invalidMetadataStrs.push_back(json_spirit::write_string(json_spirit::mValue(metadataJson)));
        }

        for (auto& testMetadataStr : invalidMetadataStrs) {
            metadataStr = StringRef(testMetadataStr);
            wait(testExpectedError(uploadClientLibrary(cx, metadataStr, StringRef(self->generatedFileName)),
                                   "uploadClientLibrary with invalid metadata",
                                   client_lib_invalid_metadata(),
                                   &self->success,
                                   { { "Metadata", metadataStr.toString().c_str() } }));
        }

        return Void();
    }

    ACTOR static Future<Void> testClientLibUploadFileDoesNotExist(ClientLibManagementWorkload* self, Database cx) {
        state Standalone<StringRef> metadataStr;
        json_spirit::mObject metadataJson;
        validClientLibMetadataSample(metadataJson);
        metadataStr = StringRef(json_spirit::write_string(json_spirit::mValue(metadataJson)));
        wait(testExpectedError(uploadClientLibrary(cx, metadataStr, "some_not_existing_file_name"_sr),
                               "uploadClientLibrary with a not existing file",
                               file_not_found(),
                               &self->success));
        return Void();
    }

    ACTOR static Future<Void> testUploadClientLibWrongChecksum(ClientLibManagementWorkload* self, Database cx) {
        state Standalone<StringRef> metadataStr;
        validClientLibMetadataSample(self->uploadedMetadataJson);
        metadataStr = StringRef(json_spirit::write_string(json_spirit::mValue(self->uploadedMetadataJson)));
        self->uploadedClientLibId = getClientLibIdFromMetadataJson(metadataStr);
        wait(testExpectedError(uploadClientLibrary(cx, metadataStr, StringRef(self->generatedFileName)),
                               "uploadClientLibrary wrong checksum",
                               client_lib_invalid_binary(),
                               &self->success));
        wait(testUploadedClientLibInList(self, cx, ClientLibFilter(), false, "After upload with wrong checksum"));
        return Void();
    }

    ACTOR static Future<Void> testUploadClientLib(ClientLibManagementWorkload* self, Database cx) {
        state Standalone<StringRef> metadataStr;
        state std::vector<Future<ErrorOr<Void>>> concurrentUploads;
        state Future<Void> clientLibChanged = cx->onClientLibStatusChanged();

        validClientLibMetadataSample(self->uploadedMetadataJson);
        self->uploadedMetadataJson[CLIENTLIB_ATTR_CHECKSUM] = self->generatedChecksum.toString();
        // avoid clientLibId clashes, when multiple clients try to upload the same file
        self->uploadedMetadataJson[CLIENTLIB_ATTR_TYPE] = format("devbuild%d", self->clientId);
        self->uploadedMetadataJson[CLIENTLIB_ATTR_STATUS] = getStatusName(ClientLibStatus::ACTIVE);
        metadataStr = StringRef(json_spirit::write_string(json_spirit::mValue(self->uploadedMetadataJson)));
        self->uploadedClientLibId = getClientLibIdFromMetadataJson(metadataStr);

        // Test two concurrent uploads of the same library; one of them must fail and the other succeed
        for (int i1 = 0; i1 < 2; i1++) {
            Future<Void> uploadActor = uploadClientLibrary(cx, metadataStr, StringRef(self->generatedFileName));
            concurrentUploads.push_back(errorOr(uploadActor));
        }

        wait(waitForAll(concurrentUploads));

        int successCnt = 0;
        for (auto uploadRes : concurrentUploads) {
            if (uploadRes.get().isError()) {
                self->testErrorCode(
                    "concurrent client lib upload", client_lib_already_exists(), uploadRes.get().getError());
            } else {
                successCnt++;
            }
        }

        if (successCnt == 0) {
            TraceEvent(SevError, "ClientLibUploadFailed").log();
            self->success = false;
            throw operation_failed();
        } else if (successCnt > 1) {
            TraceEvent(SevError, "ClientLibConflictingUpload").log();
            self->success = false;
        }

        // Clients should be notified about upload of a library with the active status
        Optional<Void> notificationWait = wait(timeout(clientLibChanged, 100.0));
        if (!notificationWait.present()) {
            TraceEvent(SevError, "ClientLibChangeNotificationFailed").log();
            self->success = false;
        }

        return Void();
    }

    ACTOR static Future<Void> testClientLibDownloadNotExisting(ClientLibManagementWorkload* self, Database cx) {
        // Generate a random valid clientLibId
        state Standalone<StringRef> clientLibId;
        state std::string destFileName;
        json_spirit::mObject metadataJson;
        validClientLibMetadataSample(metadataJson);
        Standalone<StringRef> metadataStr = StringRef(json_spirit::write_string(json_spirit::mValue(metadataJson)));
        clientLibId = getClientLibIdFromMetadataJson(metadataStr);

        destFileName = format("clientLibDownload%d", self->clientId);
        wait(testExpectedError(downloadClientLibrary(cx, StringRef(clientLibId), StringRef(destFileName)),
                               "download not existing client library",
                               client_lib_not_found(),
                               &self->success));
        return Void();
    }

    ACTOR static Future<Void> testDownloadClientLib(ClientLibManagementWorkload* self, Database cx) {
        state std::string destFileName = format("clientLibDownload%d", self->clientId);
        wait(downloadClientLibrary(cx, self->uploadedClientLibId, StringRef(destFileName)));

        FILE* f = fopen(destFileName.c_str(), "r");
        if (f == nullptr) {
            TraceEvent(SevError, "ClientLibDownloadFileDoesNotExist").detail("FileName", destFileName);
            self->success = false;
        } else {
            fseek(f, 0L, SEEK_END);
            size_t fileSize = ftell(f);
            if (fileSize != self->testFileSize) {
                TraceEvent(SevError, "ClientLibDownloadFileSizeMismatch")
                    .detail("ExpectedSize", self->testFileSize)
                    .detail("ActualSize", fileSize);
                self->success = false;
            }
            fclose(f);
        }

        return Void();
    }

    ACTOR static Future<Void> testDeleteClientLib(ClientLibManagementWorkload* self, Database cx) {
        state Future<Void> clientLibChanged = cx->onClientLibStatusChanged();

        wait(deleteClientLibrary(cx, self->uploadedClientLibId));

        // Clients should be notified about deletion of the library, because it has "download" status
        Optional<Void> notificationWait = wait(timeout(clientLibChanged, 100.0));
        if (!notificationWait.present()) {
            TraceEvent(SevError, "ClientLibChangeNotificationFailed").log();
        }
        return Void();
    }

    ACTOR static Future<Void> testClientLibListAfterUpload(ClientLibManagementWorkload* self, Database cx) {
        state int uploadedApiVersion = self->uploadedMetadataJson[CLIENTLIB_ATTR_API_VERSION].get_int();
        state ClientLibPlatform uploadedPlatform =
            getPlatformByName(self->uploadedMetadataJson[CLIENTLIB_ATTR_PLATFORM].get_str());
        state std::string uploadedVersion = self->uploadedMetadataJson[CLIENTLIB_ATTR_VERSION].get_str();
        state ClientLibFilter filter;

        filter = ClientLibFilter();
        wait(testUploadedClientLibInList(self, cx, filter, true, "No filter"));
        filter = ClientLibFilter().filterAvailable();
        wait(testUploadedClientLibInList(self, cx, filter, true, "Filter available"));
        filter = ClientLibFilter().filterAvailable().filterCompatibleAPI(uploadedApiVersion);
        wait(testUploadedClientLibInList(self, cx, filter, true, "Filter available, the same API"));
        filter = ClientLibFilter().filterAvailable().filterCompatibleAPI(uploadedApiVersion + 1);
        wait(testUploadedClientLibInList(self, cx, filter, false, "Filter available, newer API"));
        filter = ClientLibFilter().filterCompatibleAPI(uploadedApiVersion).filterPlatform(uploadedPlatform);
        wait(testUploadedClientLibInList(self, cx, filter, true, "Filter the same API, the same platform"));
        ASSERT(uploadedPlatform != ClientLibPlatform::X86_64_WINDOWS);
        filter = ClientLibFilter().filterAvailable().filterPlatform(ClientLibPlatform::X86_64_WINDOWS);
        wait(testUploadedClientLibInList(self, cx, filter, false, "Filter available, different platform"));
        filter = ClientLibFilter().filterAvailable().filterNewerPackageVersion(uploadedVersion);
        wait(testUploadedClientLibInList(self, cx, filter, false, "Filter available, the same version"));
        filter =
            ClientLibFilter().filterAvailable().filterNewerPackageVersion("1.15.10").filterPlatform(uploadedPlatform);
        wait(testUploadedClientLibInList(
            self, cx, filter, true, "Filter available, an older version, the same platform"));
        filter = ClientLibFilter()
                     .filterAvailable()
                     .filterNewerPackageVersion(uploadedVersion)
                     .filterPlatform(uploadedPlatform);
        wait(testUploadedClientLibInList(
            self, cx, filter, false, "Filter available, the same version, the same platform"));
        filter = ClientLibFilter().filterNewerPackageVersion("100.1.1");
        wait(testUploadedClientLibInList(self, cx, filter, false, "Filter a newer version"));
        filter = ClientLibFilter().filterNewerPackageVersion("1.15.10");
        wait(testUploadedClientLibInList(self, cx, filter, true, "Filter an older version"));
        return Void();
    }

    ACTOR static Future<Void> testUploadedClientLibInList(ClientLibManagementWorkload* self,
                                                          Database cx,
                                                          ClientLibFilter filter,
                                                          bool expectInList,
                                                          const char* testDescr) {
        Standalone<VectorRef<StringRef>> allLibs = wait(listClientLibraries(cx, filter));
        bool found = false;
        for (StringRef metadataJson : allLibs) {
            Standalone<StringRef> clientLibId;
            clientLibId = getClientLibIdFromMetadataJson(metadataJson);
            if (clientLibId == self->uploadedClientLibId) {
                found = true;
            }
        }
        if (found != expectInList) {
            TraceEvent(SevError, "ClientLibInListTestFailed")
                .detail("Test", testDescr)
                .detail("ClientLibId", self->uploadedClientLibId)
                .detail("Expected", expectInList)
                .detail("Actual", found);
            self->success = false;
        }
        return Void();
    }

    ACTOR static Future<Void> testChangeClientLibStatusErrors(ClientLibManagementWorkload* self, Database cx) {
        wait(testExpectedError(changeClientLibraryStatus(cx, self->uploadedClientLibId, ClientLibStatus::UPLOADING),
                               "Setting invalid client library status",
                               client_lib_invalid_metadata(),
                               &self->success));

        wait(testExpectedError(changeClientLibraryStatus(cx, "notExistingClientLib"_sr, ClientLibStatus::DOWNLOAD),
                               "Changing not existing client library status",
                               client_lib_not_found(),
                               &self->success));
        return Void();
    }

    ACTOR static Future<Void> testDisableClientLib(ClientLibManagementWorkload* self, Database cx) {
        state std::string destFileName = format("clientLibDownload%d", self->clientId);
        state Future<Void> clientLibChanged = cx->onClientLibStatusChanged();

        // Set disabled status on the uploaded library
        wait(changeClientLibraryStatus(cx, self->uploadedClientLibId, ClientLibStatus::DISABLED));
        state ClientLibStatus newStatus = wait(getClientLibraryStatus(cx, self->uploadedClientLibId));
        if (newStatus != ClientLibStatus::DISABLED) {
            TraceEvent(SevError, "ClientLibDisableClientLibFailed")
                .detail("Reason", "Unexpected status")
                .detail("Expected", ClientLibStatus::DISABLED)
                .detail("Actual", newStatus);
            self->success = false;
        }

        // Clients should be notified about an active library being disabled
        Optional<Void> notificationWait = wait(timeout(clientLibChanged, 100.0));
        if (!notificationWait.present()) {
            TraceEvent(SevError, "ClientLibChangeNotificationFailed").log();
            self->success = false;
        }

        // It should not be possible to download a disabled client library
        wait(testExpectedError(downloadClientLibrary(cx, self->uploadedClientLibId, StringRef(destFileName)),
                               "Downloading disabled client library",
                               client_lib_not_available(),
                               &self->success));

        return Void();
    }

    ACTOR static Future<Void> testChangeStateToDownload(ClientLibManagementWorkload* self, Database cx) {
        state std::string destFileName = format("clientLibDownload%d", self->clientId);
        state Future<Void> clientLibChanged = cx->onClientLibStatusChanged();

        // Set the download status on the uploaded library
        wait(changeClientLibraryStatus(cx, self->uploadedClientLibId, ClientLibStatus::DOWNLOAD));
        state ClientLibStatus newStatus = wait(getClientLibraryStatus(cx, self->uploadedClientLibId));
        if (newStatus != ClientLibStatus::DOWNLOAD) {
            TraceEvent(SevError, "ClientLibChangeStatusFailed")
                .detail("Reason", "Unexpected status")
                .detail("Expected", ClientLibStatus::DOWNLOAD)
                .detail("Actual", newStatus);
            self->success = false;
        }

        Optional<Void> notificationWait = wait(timeout(clientLibChanged, 100.0));
        if (!notificationWait.present()) {
            TraceEvent(SevError, "ClientLibChangeNotificationFailed").log();
            self->success = false;
        }

        return Void();
    }

    /* ----------------------------------------------------------------
     * Utility methods
     */

    Reference<AsyncFileBuffer> allocateBuffer(size_t size) { return makeReference<AsyncFileBuffer>(size, false); }

    static std::string randomHexadecimalStr(int length) {
        std::string s;
        s.reserve(length);
        for (int i = 0; i < length; i++) {
            uint32_t hexDigit = static_cast<char>(deterministicRandom()->randomUInt32() % 16);
            char ch = (hexDigit >= 10 ? hexDigit - 10 + 'a' : hexDigit + '0');
            s += ch;
        }
        return s;
    }

    static void validClientLibMetadataSample(json_spirit::mObject& metadataJson) {
        metadataJson.clear();
        metadataJson[CLIENTLIB_ATTR_PLATFORM] = getPlatformName(ClientLibPlatform::X86_64_LINUX);
        metadataJson[CLIENTLIB_ATTR_VERSION] = "7.1.0";
        metadataJson[CLIENTLIB_ATTR_GIT_HASH] = randomHexadecimalStr(40);
        metadataJson[CLIENTLIB_ATTR_TYPE] = "debug";
        metadataJson[CLIENTLIB_ATTR_CHECKSUM] = randomHexadecimalStr(32);
        metadataJson[CLIENTLIB_ATTR_STATUS] = getStatusName(ClientLibStatus::DOWNLOAD);
        metadataJson[CLIENTLIB_ATTR_API_VERSION] = 710;
        metadataJson[CLIENTLIB_ATTR_PROTOCOL] = "fdb00b07001001";
        metadataJson[CLIENTLIB_ATTR_CHECKSUM_ALG] = "md5";
    }

    void testErrorCode(const char* testDescr,
                       Error expectedError,
                       Error actualError,
                       std::map<std::string, std::string> details = {},
                       UID id = UID()) {
        ASSERT(expectedError.isValid());
        ASSERT(actualError.isValid());
        if (expectedError.code() != actualError.code()) {
            TraceEvent evt(SevError, "TestErrorCodeFailed", id);
            evt.detail("TestDescription", testDescr);
            evt.detail("ExpectedError", expectedError.code());
            evt.error(actualError);
            for (auto& p : details) {
                evt.detail(p.first.c_str(), p.second);
            }
            success = false;
        }
    }
};

WorkloadFactory<ClientLibManagementWorkload> ClientLibOperationsWorkloadFactory("ClientLibManagement");