Merge branch 'main' of https://github.com/apple/foundationdb into readaware

This commit is contained in:
Xiaoxi Wang 2022-05-18 10:11:18 -07:00
commit 8adf38ba08
28 changed files with 307 additions and 70 deletions

View File

@ -306,7 +306,7 @@ endif()
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadSingleThr.toml
--upgrade-path "6.3.23" "7.0.0" "7.1.3" "7.2.0"
--upgrade-path "6.3.23" "7.0.0" "7.1.5" "7.2.0"
--process-number 1
)
@ -314,7 +314,7 @@ endif()
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadSingleThr.toml
--upgrade-path "7.0.0" "7.1.3" "7.2.0"
--upgrade-path "7.0.0" "7.1.5" "7.2.0"
--process-number 1
)
@ -322,7 +322,7 @@ endif()
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
--upgrade-path "6.3.23" "7.0.0" "7.1.3" "7.2.0"
--upgrade-path "6.3.23" "7.0.0" "7.1.5" "7.2.0" "7.1.5"
--process-number 3
)
@ -330,7 +330,7 @@ endif()
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
--upgrade-path "7.0.0" "7.1.3" "7.2.0"
--upgrade-path "7.0.0" "7.1.5" "7.2.0" "7.1.5"
--process-number 3
)
@ -338,7 +338,7 @@ endif()
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
--upgrade-path "7.1.3" "7.2.0"
--upgrade-path "7.1.5" "7.2.0" "7.1.5"
--process-number 3
)

View File

@ -160,6 +160,7 @@ typedef struct mappedkeyvalue {
* take the shortcut. */
FDBGetRangeReqAndResult getRange;
unsigned char buffer[32];
fdb_bool_t boundaryAndExist;
} FDBMappedKeyValue;
#pragma pack(push, 4)

View File

@ -48,6 +48,7 @@ public:
int numClientThreads;
int numDatabases;
int numClients;
int statsIntervalMs = 0;
std::vector<std::pair<std::string, std::string>> knobs;
TestSpec testSpec;
std::string bgBasePath;

View File

@ -21,6 +21,7 @@
#include "TesterScheduler.h"
#include "TesterUtil.h"
#include <boost/asio/detail/chrono.hpp>
#include <memory>
#include <thread>
#include <boost/asio.hpp>
@ -31,6 +32,15 @@ namespace FdbApiTester {
const TTaskFct NO_OP_TASK = []() {};
class AsioTimer : public ITimer {
public:
AsioTimer(io_context& io_ctx, chrono::steady_clock::duration time) : impl(io_ctx, time) {}
void cancel() override { impl.cancel(); }
boost::asio::steady_timer impl;
};
class AsioScheduler : public IScheduler {
public:
AsioScheduler(int numThreads) : numThreads(numThreads) {}
@ -44,6 +54,16 @@ public:
void schedule(TTaskFct task) override { post(io_ctx, task); }
std::unique_ptr<ITimer> scheduleWithDelay(int delayMs, TTaskFct task) override {
auto timer = std::make_unique<AsioTimer>(io_ctx, boost::asio::chrono::milliseconds(delayMs));
timer->impl.async_wait([task](const boost::system::error_code& e) {
if (!e) {
task();
}
});
return timer;
}
void stop() override { work = any_io_executor(); }
void join() override {

View File

@ -32,6 +32,16 @@ using TTaskFct = std::function<void(void)>;
extern const TTaskFct NO_OP_TASK;
/**
* Handle to a scheduled timer
*/
class ITimer {
public:
virtual ~ITimer() {}
virtual void cancel() = 0;
};
/**
* Scheduler for asynchronous execution of tasks on a pool of threads
*/
@ -45,6 +55,9 @@ public:
// Schedule a task for asynchronous execution
virtual void schedule(TTaskFct task) = 0;
// Schedule a task to be executed with a given delay
virtual std::unique_ptr<ITimer> scheduleWithDelay(int delayMs, TTaskFct task) = 0;
// Gracefully stop the scheduler. Waits for already running tasks to be finish
virtual void stop() = 0;

View File

@ -80,7 +80,7 @@ bool WorkloadConfig::getBoolOption(const std::string& name, bool defaultVal) con
WorkloadBase::WorkloadBase(const WorkloadConfig& config)
: manager(nullptr), tasksScheduled(0), numErrors(0), clientId(config.clientId), numClients(config.numClients),
failed(false) {
failed(false), numTxCompleted(0) {
maxErrors = config.getIntOption("maxErrors", 10);
workloadId = fmt::format("{}{}", config.name, clientId);
}
@ -89,6 +89,10 @@ void WorkloadBase::init(WorkloadManager* manager) {
this->manager = manager;
}
void WorkloadBase::printStats() {
info(fmt::format("{} transactions completed", numTxCompleted.load()));
}
void WorkloadBase::schedule(TTaskFct task) {
if (failed) {
return;
@ -106,6 +110,7 @@ void WorkloadBase::execTransaction(std::shared_ptr<ITransactionActor> tx, TTaskF
}
tasksScheduled++;
manager->txExecutor->execute(tx, [this, tx, cont, failOnError]() {
numTxCompleted++;
fdb_error_t err = tx->getErrorCode();
if (tx->getErrorCode() == error_code_success) {
cont();
@ -198,6 +203,9 @@ void WorkloadManager::workloadDone(IWorkload* workload, bool failed) {
bool done = workloads.empty();
lock.unlock();
if (done) {
if (statsTimer) {
statsTimer->cancel();
}
scheduler->stop();
}
}
@ -241,6 +249,24 @@ void WorkloadManager::readControlInput(std::string pipeName) {
}
}
void WorkloadManager::schedulePrintStatistics(int timeIntervalMs) {
statsTimer = scheduler->scheduleWithDelay(timeIntervalMs, [this, timeIntervalMs]() {
for (auto workload : getActiveWorkloads()) {
workload->printStats();
}
this->schedulePrintStatistics(timeIntervalMs);
});
}
std::vector<std::shared_ptr<IWorkload>> WorkloadManager::getActiveWorkloads() {
std::unique_lock<std::mutex> lock(mutex);
std::vector<std::shared_ptr<IWorkload>> res;
for (auto iter : workloads) {
res.push_back(iter.second.ref);
}
return res;
}
void WorkloadManager::handleStopCommand() {
std::unique_lock<std::mutex> lock(mutex);
for (auto& iter : workloads) {

View File

@ -62,6 +62,9 @@ public:
// Get workload control interface if supported, nullptr otherwise
virtual IWorkloadControlIfc* getControlIfc() = 0;
// Print workload statistics
virtual void printStats() = 0;
};
// Workload configuration
@ -100,6 +103,8 @@ public:
std::string getWorkloadId() override { return workloadId; }
void printStats() override;
protected:
// Schedule the a task as a part of the workload
void schedule(TTaskFct task);
@ -150,6 +155,9 @@ protected:
// Workload is failed, no further transactions or continuations will be scheduled by the workload
std::atomic<bool> failed;
// Number of completed transactions
std::atomic<int> numTxCompleted;
};
// Workload manager
@ -175,6 +183,9 @@ public:
return numWorkloadsFailed > 0;
}
// Schedule statistics to be printed in regular timeintervals
void schedulePrintStatistics(int timeIntervalMs);
private:
friend WorkloadBase;
@ -205,6 +216,9 @@ private:
// Handle CHECK command received from the test controller
void handleCheckCommand();
// A thread-safe operation to return a list of active workloads
std::vector<std::shared_ptr<IWorkload>> getActiveWorkloads();
// Transaction executor to be used by the workloads
ITransactionExecutor* txExecutor;
@ -225,6 +239,9 @@ private:
// Output pipe for emitting test control events
std::ofstream outputPipe;
// Timer for printing statistics in regular intervals
std::unique_ptr<ITimer> statsTimer;
};
// A workload factory

View File

@ -53,7 +53,8 @@ enum TesterOptionId {
OPT_OUTPUT_PIPE,
OPT_FDB_API_VERSION,
OPT_TRANSACTION_RETRY_LIMIT,
OPT_BLOB_GRANULE_LOCAL_FILE_PATH
OPT_BLOB_GRANULE_LOCAL_FILE_PATH,
OPT_STATS_INTERVAL
};
CSimpleOpt::SOption TesterOptionDefs[] = //
@ -77,6 +78,7 @@ CSimpleOpt::SOption TesterOptionDefs[] = //
{ OPT_FDB_API_VERSION, "--api-version", SO_REQ_SEP },
{ OPT_TRANSACTION_RETRY_LIMIT, "--transaction-retry-limit", SO_REQ_SEP },
{ OPT_BLOB_GRANULE_LOCAL_FILE_PATH, "--blob-granule-local-file-path", SO_REQ_SEP },
{ OPT_STATS_INTERVAL, "--stats-interval", SO_REQ_SEP },
SO_END_OF_OPTIONS };
void printProgramUsage(const char* execName) {
@ -118,6 +120,8 @@ void printProgramUsage(const char* execName) {
" Path to blob granule files on local filesystem\n"
" -f, --test-file FILE\n"
" Test file to run.\n"
" --stats-interval MILLISECONDS\n"
" Time interval in milliseconds for printing workload statistics (default: 0 - disabled).\n"
" -h, --help Display this help and exit.\n",
FDB_API_VERSION);
}
@ -214,6 +218,9 @@ bool processArg(TesterOptions& options, const CSimpleOpt& args) {
case OPT_BLOB_GRANULE_LOCAL_FILE_PATH:
options.bgBasePath = args.OptionArg();
break;
case OPT_STATS_INTERVAL:
processIntOption(args.OptionText(), args.OptionArg(), 0, 60000, options.statsIntervalMs);
break;
}
return true;
}
@ -335,6 +342,9 @@ bool runWorkloads(TesterOptions& options) {
}
scheduler->start();
if (options.statsIntervalMs) {
workloadMgr.schedulePrintStatistics(options.statsIntervalMs);
}
workloadMgr.run();
return !workloadMgr.failed();
} catch (const std::runtime_error& err) {

View File

@ -30,6 +30,8 @@ import glob
import random
import string
TESTER_STATS_INTERVAL_SEC = 5
def random_string(len):
return ''.join(random.choice(string.ascii_letters + string.digits) for i in range(len))
@ -66,7 +68,8 @@ def dump_client_logs(log_dir):
def run_tester(args, test_file):
cmd = [args.tester_binary,
"--cluster-file", args.cluster_file,
"--test-file", test_file]
"--test-file", test_file,
"--stats-interval", str(TESTER_STATS_INTERVAL_SEC*1000)]
if args.external_client_library is not None:
cmd += ["--external-client-library", args.external_client_library]
if args.tmp_dir is not None:

View File

@ -619,6 +619,19 @@ int workerProcessMain(Arguments const& args, int worker_id, shared_memory::Acces
selectApiVersion(args.api_version);
/* enable distributed tracing */
switch (args.distributed_tracer_client) {
case 1:
err = network::setOptionNothrow(FDB_NET_OPTION_DISTRIBUTED_CLIENT_TRACER, BytesRef(toBytePtr("network_lossy")));
break;
case 2:
err = network::setOptionNothrow(FDB_NET_OPTION_DISTRIBUTED_CLIENT_TRACER, BytesRef(toBytePtr("log_file")));
break;
}
if (err) {
logr.error("network::setOption(FDB_NET_OPTION_DISTRIBUTED_CLIENT_TRACER): {}", err.what());
}
/* enable flatbuffers if specified */
if (args.flatbuffers) {
#ifdef FDB_NET_OPTION_USE_FLATBUFFERS
@ -824,6 +837,7 @@ int initArguments(Arguments& args) {
args.json_output_path[0] = '\0';
args.bg_materialize_files = false;
args.bg_file_path[0] = '\0';
args.distributed_tracer_client = 0;
return 0;
}
@ -1002,6 +1016,8 @@ void usage() {
printf("%-24s %s\n",
" --bg_file_path=PATH",
"Read blob granule files from the local filesystem at PATH and materialize the results.");
printf(
"%-24s %s\n", " --distributed_tracer_client=CLIENT", "Specify client (disabled, network_lossy, log_file)");
}
/* parse benchmark paramters */
@ -1053,6 +1069,7 @@ int parseArguments(int argc, char* argv[], Arguments& args) {
{ "disable_ryw", no_argument, NULL, ARG_DISABLE_RYW },
{ "json_report", optional_argument, NULL, ARG_JSON_REPORT },
{ "bg_file_path", required_argument, NULL, ARG_BG_FILE_PATH },
{ "distributed_tracer_client", required_argument, NULL, ARG_DISTRIBUTED_TRACER_CLIENT },
{ NULL, 0, NULL, 0 }
};
idx = 0;
@ -1240,6 +1257,17 @@ int parseArguments(int argc, char* argv[], Arguments& args) {
case ARG_BG_FILE_PATH:
args.bg_materialize_files = true;
strncpy(args.bg_file_path, optarg, std::min(sizeof(args.bg_file_path), strlen(optarg) + 1));
case ARG_DISTRIBUTED_TRACER_CLIENT:
if (strcmp(optarg, "disabled") == 0) {
args.distributed_tracer_client = 0;
} else if (strcmp(optarg, "network_lossy") == 0) {
args.distributed_tracer_client = 1;
} else if (strcmp(optarg, "log_file") == 0) {
args.distributed_tracer_client = 2;
} else {
args.distributed_tracer_client = -1;
}
break;
}
}
@ -1307,6 +1335,10 @@ int validateArguments(Arguments const& args) {
return -1;
}
}
if (args.distributed_tracer_client < 0) {
logr.error("--disibuted_tracer_client must specify either (disabled, network_lossy, log_file)");
return -1;
}
return 0;
}

View File

@ -72,7 +72,8 @@ enum ArgKind {
ARG_DISABLE_RYW,
ARG_CLIENT_THREADS_PER_VERSION,
ARG_JSON_REPORT,
ARG_BG_FILE_PATH // if blob granule files are stored locally, mako will read and materialize them if this is set
ARG_BG_FILE_PATH, // if blob granule files are stored locally, mako will read and materialize them if this is set
ARG_DISTRIBUTED_TRACER_CLIENT
};
constexpr const int OP_COUNT = 0;
@ -161,6 +162,7 @@ struct Arguments {
char json_output_path[PATH_MAX];
bool bg_materialize_files;
char bg_file_path[PATH_MAX];
int distributed_tracer_client;
};
} // namespace mako

View File

@ -181,8 +181,8 @@ struct GetMappedRangeResult {
std::string, // value
std::string, // begin
std::string, // end
std::vector<std::pair<std::string, std::string>> // range results
>>
std::vector<std::pair<std::string, std::string>>, // range results
fdb_bool_t>>
mkvs;
// True if values remain in the key range requested.
bool more;
@ -306,6 +306,7 @@ GetMappedRangeResult get_mapped_range(fdb::Transaction& tr,
auto value = extractString(mkv.value);
auto begin = extractString(mkv.getRange.begin.key);
auto end = extractString(mkv.getRange.end.key);
bool boundaryAndExist = mkv.boundaryAndExist;
// std::cout << "key:" << key << " value:" << value << " begin:" << begin << " end:" << end << std::endl;
std::vector<std::pair<std::string, std::string>> range_results;
@ -316,7 +317,7 @@ GetMappedRangeResult get_mapped_range(fdb::Transaction& tr,
range_results.emplace_back(k, v);
// std::cout << "[" << i << "]" << k << " -> " << v << std::endl;
}
result.mkvs.emplace_back(key, value, begin, end, range_results);
result.mkvs.emplace_back(key, value, begin, end, range_results, boundaryAndExist);
}
return result;
}
@ -993,7 +994,15 @@ TEST_CASE("fdb_transaction_get_mapped_range") {
while (1) {
int beginId = 1;
int endId = 19;
const int matchIndex = deterministicRandom()->random01() > 0.5 ? MATCH_INDEX_NONE : MATCH_INDEX_ALL;
const double r = deterministicRandom()->random01();
int matchIndex = MATCH_INDEX_ALL;
if (r < 0.25) {
matchIndex = MATCH_INDEX_NONE;
} else if (r < 0.5) {
matchIndex = MATCH_INDEX_MATCHED_ONLY;
} else if (r < 0.75) {
matchIndex = MATCH_INDEX_UNMATCHED_ONLY;
}
auto result = getMappedIndexEntries(beginId, endId, tr, matchIndex);
if (result.err) {
@ -1007,13 +1016,29 @@ TEST_CASE("fdb_transaction_get_mapped_range") {
CHECK(!result.more);
int id = beginId;
bool boundary;
for (int i = 0; i < expectSize; i++, id++) {
const auto& [key, value, begin, end, range_results] = result.mkvs[i];
boundary = i == 0 || i == expectSize - 1;
const auto& [key, value, begin, end, range_results, boundaryAndExist] = result.mkvs[i];
if (matchIndex == MATCH_INDEX_ALL || i == 0 || i == expectSize - 1) {
CHECK(indexEntryKey(id).compare(key) == 0);
} else if (matchIndex == MATCH_INDEX_MATCHED_ONLY) {
// now we cannot generate a workload that only has partial results matched
// thus expecting everything matched
// TODO: create tests to generate workloads with partial secondary results present
CHECK(indexEntryKey(id).compare(key) == 0);
} else if (matchIndex == MATCH_INDEX_UNMATCHED_ONLY) {
// now we cannot generate a workload that only has partial results matched
// thus expecting everything NOT matched(except for the boundary asserted above)
// TODO: create tests to generate workloads with partial secondary results present
CHECK(EMPTY.compare(key) == 0);
} else {
CHECK(EMPTY.compare(key) == 0);
}
// TODO: create tests to generate workloads with partial secondary results present
CHECK(boundaryAndExist == boundary);
CHECK(EMPTY.compare(value) == 0);
CHECK(range_results.size() == SPLIT_SIZE);
for (int split = 0; split < SPLIT_SIZE; split++) {

View File

@ -35,6 +35,8 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
static public final int MATCH_INDEX_ALL = 0;
static public final int MATCH_INDEX_NONE = 1;
static public final int MATCH_INDEX_MATCHED_ONLY = 2;
static public final int MATCH_INDEX_UNMATCHED_ONLY = 3;
private final Database database;
private final Executor executor;

View File

@ -150,6 +150,8 @@ static const Tag cacheTag{ tagLocalitySpecial, 2 };
const int MATCH_INDEX_ALL = 0;
const int MATCH_INDEX_NONE = 1;
const int MATCH_INDEX_MATCHED_ONLY = 2;
const int MATCH_INDEX_UNMATCHED_ONLY = 3;
enum { txsTagOld = -1, invalidTagOld = -100 };
@ -762,9 +764,18 @@ struct MappedKeyValueRef : KeyValueRef {
MappedReqAndResultRef reqAndResult;
// boundary KVs are always returned so that caller can use it as a continuation,
// for non-boundary KV, it is always false.
// for boundary KV, it is true only when the secondary query succeeds(return non-empty).
// Note: only MATCH_INDEX_MATCHED_ONLY and MATCH_INDEX_UNMATCHED_ONLY modes can make use of it,
// to decide whether the boudnary is a match/unmatch.
// In the case of MATCH_INDEX_ALL and MATCH_INDEX_NONE, caller should not care if boundary has a match or not.
bool boundaryAndExist;
MappedKeyValueRef() = default;
MappedKeyValueRef(Arena& a, const MappedKeyValueRef& copyFrom) : KeyValueRef(a, copyFrom) {
const auto& reqAndResultCopyFrom = copyFrom.reqAndResult;
boundaryAndExist = copyFrom.boundaryAndExist;
if (std::holds_alternative<GetValueReqAndResultRef>(reqAndResultCopyFrom)) {
auto getValue = std::get<GetValueReqAndResultRef>(reqAndResultCopyFrom);
reqAndResult = GetValueReqAndResultRef(a, getValue);
@ -778,7 +789,7 @@ struct MappedKeyValueRef : KeyValueRef {
bool operator==(const MappedKeyValueRef& rhs) const {
return static_cast<const KeyValueRef&>(*this) == static_cast<const KeyValueRef&>(rhs) &&
reqAndResult == rhs.reqAndResult;
reqAndResult == rhs.reqAndResult && boundaryAndExist == rhs.boundaryAndExist;
}
bool operator!=(const MappedKeyValueRef& rhs) const { return !(rhs == *this); }
@ -788,7 +799,7 @@ struct MappedKeyValueRef : KeyValueRef {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ((KeyValueRef&)*this), reqAndResult);
serializer(ar, ((KeyValueRef&)*this), reqAndResult, boundaryAndExist);
}
};

View File

@ -80,6 +80,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
* and take the shortcut. */
FDBGetRangeReqAndResult getRange;
unsigned char buffer[32];
bool boundaryAndExist;
} FDBMappedKeyValue;
#pragma pack(push, 4)

View File

@ -4082,7 +4082,7 @@ int64_t inline getRangeResultFamilyBytes(MappedRangeResultRef result) {
int64_t bytes = 0;
for (const MappedKeyValueRef& mappedKeyValue : result) {
bytes += mappedKeyValue.key.size() + mappedKeyValue.value.size();
bytes += sizeof(mappedKeyValue.boundaryAndExist);
auto& reqAndResult = mappedKeyValue.reqAndResult;
if (std::holds_alternative<GetValueReqAndResultRef>(reqAndResult)) {
auto getValue = std::get<GetValueReqAndResultRef>(reqAndResult);

View File

@ -106,14 +106,14 @@ void ISimulator::displayWorkers() const {
for (auto& processInfo : machineRecord.second) {
printf(" %9s %-10s%-13s%-8s %-6s %-9s %-8s %-48s %-40s\n",
processInfo->address.toString().c_str(),
processInfo->name,
processInfo->name.c_str(),
processInfo->startingClass.toString().c_str(),
(processInfo->isExcluded() ? "True" : "False"),
(processInfo->failed ? "True" : "False"),
(processInfo->rebooting ? "True" : "False"),
(processInfo->isCleared() ? "True" : "False"),
getRoles(processInfo->address).c_str(),
processInfo->dataFolder);
processInfo->dataFolder.c_str());
}
}
@ -1588,7 +1588,7 @@ public:
.detail("Protected", protectedAddresses.count(machine->address))
.backtrace();
// This will remove all the "tracked" messages that came from the machine being killed
if (std::string(machine->name) != "remote flow process")
if (machine->name != "remote flow process")
latestEventCache.clear();
machine->failed = true;
} else if (kt == InjectFaults) {
@ -1618,7 +1618,7 @@ public:
ASSERT(false);
}
ASSERT(!protectedAddresses.count(machine->address) || machine->rebooting ||
std::string(machine->name) == "remote flow process");
machine->name == "remote flow process");
}
void rebootProcess(ProcessInfo* process, KillType kt) override {
if (kt == RebootProcessAndDelete && protectedAddresses.count(process->address)) {
@ -2465,7 +2465,7 @@ ACTOR void doReboot(ISimulator::ProcessInfo* p, ISimulator::KillType kt) {
.detail("Rebooting", p->rebooting)
.detail("Reliable", p->isReliable());
return;
} else if (std::string(p->name) == "remote flow process") {
} else if (p->name == "remote flow process") {
TraceEvent(SevDebug, "DoRebootFailed").detail("Name", p->name).detail("Address", p->address);
return;
} else if (p->getChilds().size()) {

View File

@ -59,9 +59,9 @@ public:
struct MachineInfo;
struct ProcessInfo : NonCopyable {
const char* name;
const char* coordinationFolder;
const char* dataFolder;
std::string name;
std::string coordinationFolder;
std::string dataFolder;
MachineInfo* machine;
NetworkAddressList addresses;
NetworkAddress address;
@ -182,7 +182,7 @@ public:
std::string toString() const {
return format(
"name: %s address: %s zone: %s datahall: %s class: %s excluded: %d cleared: %d",
name,
name.c_str(),
formatIpPort(addresses.address.ip, addresses.address.port).c_str(),
(locality.zoneId().present() ? locality.zoneId().get().printable().c_str() : "[unset]"),
(locality.dataHallId().present() ? locality.dataHallId().get().printable().c_str() : "[unset]"),

View File

@ -80,7 +80,12 @@ struct EncryptBaseCipherKey {
};
using EncryptBaseDomainIdCache = std::unordered_map<EncryptCipherDomainId, EncryptBaseCipherKey>;
using EncryptBaseCipherKeyIdCache = std::unordered_map<EncryptCipherBaseKeyId, EncryptBaseCipherKey>;
using EncryptBaseCipherDomainIdKeyIdCacheKey = std::pair<EncryptCipherDomainId, EncryptCipherBaseKeyId>;
using EncryptBaseCipherDomainIdKeyIdCacheKeyHash = boost::hash<EncryptBaseCipherDomainIdKeyIdCacheKey>;
using EncryptBaseCipherDomainIdKeyIdCache = std::unordered_map<EncryptBaseCipherDomainIdKeyIdCacheKey,
EncryptBaseCipherKey,
EncryptBaseCipherDomainIdKeyIdCacheKeyHash>;
struct EncryptKeyProxyData : NonCopyable, ReferenceCounted<EncryptKeyProxyData> {
public:
@ -89,7 +94,7 @@ public:
Future<Void> encryptionKeyRefresher;
EncryptBaseDomainIdCache baseCipherDomainIdCache;
EncryptBaseCipherKeyIdCache baseCipherKeyIdCache;
EncryptBaseCipherDomainIdKeyIdCache baseCipherDomainIdKeyIdCache;
std::unique_ptr<KmsConnector> kmsConnector;
@ -113,6 +118,12 @@ public:
numResponseWithErrors("EKPNumResponseWithErrors", ekpCacheMetrics),
numEncryptionKeyRefreshErrors("EKPNumEncryptionKeyRefreshErrors", ekpCacheMetrics) {}
EncryptBaseCipherDomainIdKeyIdCacheKey getBaseCipherDomainIdKeyIdCacheKey(
const EncryptCipherDomainId domainId,
const EncryptCipherBaseKeyId baseCipherId) {
return std::make_pair(domainId, baseCipherId);
}
void insertIntoBaseDomainIdCache(const EncryptCipherDomainId domainId,
const EncryptCipherBaseKeyId baseCipherId,
const StringRef baseCipherKey) {
@ -131,7 +142,8 @@ public:
// Given an cipherKey is immutable, it is OK to NOT expire cached information.
// TODO: Update cache to support LRU eviction policy to limit the total cache size.
baseCipherKeyIdCache[baseCipherId] = EncryptBaseCipherKey(domainId, baseCipherId, baseCipherKey, true);
EncryptBaseCipherDomainIdKeyIdCacheKey cacheKey = getBaseCipherDomainIdKeyIdCacheKey(domainId, baseCipherId);
baseCipherDomainIdKeyIdCache[cacheKey] = EncryptBaseCipherKey(domainId, baseCipherId, baseCipherKey, true);
}
template <class Reply>
@ -193,8 +205,10 @@ ACTOR Future<Void> getCipherKeysByBaseCipherKeyIds(Reference<EncryptKeyProxyData
}
for (const auto& item : dedupedCipherIds) {
const auto itr = ekpProxyData->baseCipherKeyIdCache.find(item.first);
if (itr != ekpProxyData->baseCipherKeyIdCache.end()) {
const EncryptBaseCipherDomainIdKeyIdCacheKey cacheKey =
ekpProxyData->getBaseCipherDomainIdKeyIdCacheKey(item.second, item.first);
const auto itr = ekpProxyData->baseCipherDomainIdKeyIdCache.find(cacheKey);
if (itr != ekpProxyData->baseCipherDomainIdKeyIdCache.end()) {
ASSERT(itr->second.isValid());
cachedCipherDetails.emplace_back(
itr->second.domainId, itr->second.baseCipherId, itr->second.baseCipherKey, keyIdsReply.arena);

View File

@ -160,16 +160,17 @@ ACTOR Future<int> spawnSimulated(std::vector<std::string> paramList,
}
}
state int result = 0;
child = g_pSimulator->newProcess("remote flow process",
self->address.ip,
0,
self->address.isTLS(),
self->addresses.secondaryAddress.present() ? 2 : 1,
self->locality,
ProcessClass(ProcessClass::UnsetClass, ProcessClass::AutoSource),
self->dataFolder,
self->coordinationFolder, // do we need to customize this coordination folder path?
self->protocolVersion);
child = g_pSimulator->newProcess(
"remote flow process",
self->address.ip,
0,
self->address.isTLS(),
self->addresses.secondaryAddress.present() ? 2 : 1,
self->locality,
ProcessClass(ProcessClass::UnsetClass, ProcessClass::AutoSource),
self->dataFolder.c_str(),
self->coordinationFolder.c_str(), // do we need to customize this coordination folder path?
self->protocolVersion);
wait(g_pSimulator->onProcess(child));
state Future<ISimulator::KillType> onShutdown = child->onShutdown();
state Future<ISimulator::KillType> parentShutdown = self->onShutdown();

View File

@ -54,7 +54,8 @@ const char* BASE_CIPHER_TAG = "baseCipher";
const char* CIPHER_KEY_DETAILS_TAG = "cipher_key_details";
const char* ENCRYPT_DOMAIN_ID_TAG = "encrypt_domain_id";
const char* ERROR_TAG = "error";
const char* ERROR_DETAIL_TAG = "details";
const char* ERROR_MSG_TAG = "errMsg";
const char* ERROR_CODE_TAG = "errCode";
const char* KMS_URLS_TAG = "kms_urls";
const char* QUERY_MODE_TAG = "query_mode";
const char* REFRESH_KMS_URLS_TAG = "refresh_kms_urls";
@ -282,7 +283,8 @@ void parseKmsResponse(Reference<RESTKmsConnectorCtx> ctx,
// "url1", "url2", ...
// ],
// "error" : { // Optional, populated by the KMS, if present, rest of payload is ignored.
// "details": <details>
// "errMsg" : <message>
// "errCode": <code>
// }
// }
@ -296,12 +298,26 @@ void parseKmsResponse(Reference<RESTKmsConnectorCtx> ctx,
// Check if response has error
if (doc.HasMember(ERROR_TAG)) {
if (doc[ERROR_TAG].HasMember(ERROR_DETAIL_TAG) && doc[ERROR_TAG][ERROR_DETAIL_TAG].IsString()) {
Standalone<StringRef> errRef = makeString(doc[ERROR_TAG][ERROR_DETAIL_TAG].GetStringLength());
memcpy(mutateString(errRef),
doc[ERROR_TAG][ERROR_DETAIL_TAG].GetString(),
doc[ERROR_TAG][ERROR_DETAIL_TAG].GetStringLength());
TraceEvent("KMSErrorResponse", ctx->uid).detail("ErrorDetails", errRef.toString());
Standalone<StringRef> errMsgRef;
Standalone<StringRef> errCodeRef;
if (doc[ERROR_TAG].HasMember(ERROR_MSG_TAG) && doc[ERROR_TAG][ERROR_MSG_TAG].IsString()) {
errMsgRef = makeString(doc[ERROR_TAG][ERROR_MSG_TAG].GetStringLength());
memcpy(mutateString(errMsgRef),
doc[ERROR_TAG][ERROR_MSG_TAG].GetString(),
doc[ERROR_TAG][ERROR_MSG_TAG].GetStringLength());
}
if (doc[ERROR_TAG].HasMember(ERROR_CODE_TAG) && doc[ERROR_TAG][ERROR_CODE_TAG].IsString()) {
errMsgRef = makeString(doc[ERROR_TAG][ERROR_CODE_TAG].GetStringLength());
memcpy(mutateString(errMsgRef),
doc[ERROR_TAG][ERROR_CODE_TAG].GetString(),
doc[ERROR_TAG][ERROR_CODE_TAG].GetStringLength());
}
if (!errCodeRef.empty() || !errMsgRef.empty()) {
TraceEvent("KMSErrorResponse", ctx->uid)
.detail("ErrorMsg", errMsgRef.empty() ? "" : errMsgRef.toString())
.detail("ErrorCode", errCodeRef.empty() ? "" : errCodeRef.toString());
} else {
TraceEvent("KMSErrorResponse_EmptyDetails", ctx->uid).log();
}
@ -1194,7 +1210,7 @@ void testKMSErrorResponse(Reference<RESTKmsConnectorCtx> ctx) {
rapidjson::Value errorTag(rapidjson::kObjectType);
// Add 'error_detail'
rapidjson::Value eKey(ERROR_DETAIL_TAG, doc.GetAllocator());
rapidjson::Value eKey(ERROR_MSG_TAG, doc.GetAllocator());
rapidjson::Value detailInfo;
detailInfo.SetString("Foo is always bad", doc.GetAllocator());
errorTag.AddMember(eKey, detailInfo, doc.GetAllocator());

View File

@ -3749,10 +3749,11 @@ ACTOR Future<GetMappedKeyValuesReply> mapKeyValues(StorageServer* data,
state int sz = input.data.size();
state int i = 0;
for (; i < sz; i++) {
KeyValueRef* it = &input.data[i];
state KeyValueRef* it = &input.data[i];
state MappedKeyValueRef kvm;
state bool isBoundary = i == 0 || i == sz - 1;
// need to keep the boundary, so that caller can use it as a continuation.
if ((i == 0 || i == sz - 1) || matchIndex == MATCH_INDEX_ALL) {
if (isBoundary || matchIndex == MATCH_INDEX_ALL) {
kvm.key = it->key;
kvm.value = it->value;
}
@ -3768,11 +3769,19 @@ ACTOR Future<GetMappedKeyValuesReply> mapKeyValues(StorageServer* data,
// Use the mappedKey as the prefix of the range query.
GetRangeReqAndResultRef getRange =
wait(quickGetKeyValues(data, mappedKey, input.version, &(result.arena), pOriginalReq));
if (!getRange.result.empty() && matchIndex == MATCH_INDEX_MATCHED_ONLY ||
getRange.result.empty() && matchIndex == MATCH_INDEX_UNMATCHED_ONLY) {
kvm.key = it->key;
kvm.value = it->value;
}
kvm.boundaryAndExist = isBoundary && !getRange.result.empty();
kvm.reqAndResult = getRange;
} else {
GetValueReqAndResultRef getValue =
wait(quickGetValue(data, mappedKey, input.version, &(result.arena), pOriginalReq));
kvm.reqAndResult = getValue;
kvm.boundaryAndExist = isBoundary && getValue.result.present();
}
result.data.push_back(result.arena, kvm);
}

View File

@ -494,7 +494,7 @@ void printSimulatedTopology() {
printf("%sAddress: %s\n", indent.c_str(), p->address.toString().c_str());
indent += " ";
printf("%sClass: %s\n", indent.c_str(), p->startingClass.toString().c_str());
printf("%sName: %s\n", indent.c_str(), p->name);
printf("%sName: %s\n", indent.c_str(), p->name.c_str());
}
}

View File

@ -72,7 +72,7 @@ class WorkloadProcessState {
locality,
ProcessClass(ProcessClass::TesterClass, ProcessClass::AutoSource),
dataFolder.c_str(),
parent->coordinationFolder,
parent->coordinationFolder.c_str(),
parent->protocolVersion);
self->childProcess->excludeFromRestarts = true;
wait(g_simulator.onProcess(self->childProcess, TaskPriority::DefaultYield));

View File

@ -43,8 +43,9 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
Arena arena;
uint64_t minDomainId;
uint64_t maxDomainId;
std::unordered_map<uint64_t, StringRef> cipherIdMap;
std::vector<uint64_t> cipherIds;
using CacheKey = std::pair<int64_t, uint64_t>;
std::unordered_map<CacheKey, StringRef, boost::hash<CacheKey>> cipherIdMap;
std::vector<CacheKey> cipherIds;
int numDomains;
std::vector<uint64_t> domainIds;
static std::atomic<int> seed;
@ -207,8 +208,9 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
self->cipherIdMap.clear();
self->cipherIds.clear();
for (auto& item : rep.baseCipherDetails) {
self->cipherIdMap.emplace(item.baseCipherId, StringRef(self->arena, item.baseCipherKey));
self->cipherIds.emplace_back(item.baseCipherId);
CacheKey cacheKey = std::make_pair(item.encryptDomainId, item.baseCipherId);
self->cipherIdMap.emplace(cacheKey, StringRef(self->arena, item.baseCipherKey));
self->cipherIds.emplace_back(cacheKey);
}
state int numIterations = deterministicRandom()->randomInt(512, 786);
@ -221,7 +223,7 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
req.debugId = deterministicRandom()->randomUniqueID();
}
for (int i = idx; i < nIds && i < self->cipherIds.size(); i++) {
req.baseCipherIds.emplace_back(std::make_pair(self->cipherIds[i], 1));
req.baseCipherIds.emplace_back(std::make_pair(self->cipherIds[i].second, self->cipherIds[i].first));
}
if (req.baseCipherIds.empty()) {
// No keys to query; continue
@ -238,9 +240,10 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
ASSERT_EQ(rep.numHits, expectedHits);
// Valdiate the 'cipherKey' content against the one read while querying by domainIds
for (auto& item : rep.baseCipherDetails) {
const auto itr = self->cipherIdMap.find(item.baseCipherId);
CacheKey cacheKey = std::make_pair(item.encryptDomainId, item.baseCipherId);
const auto itr = self->cipherIdMap.find(cacheKey);
ASSERT(itr != self->cipherIdMap.end());
Standalone<StringRef> toCompare = self->cipherIdMap[item.baseCipherId];
Standalone<StringRef> toCompare = self->cipherIdMap[cacheKey];
if (toCompare.compare(item.baseCipherKey) != 0) {
TraceEvent("Mismatch")
.detail("Id", item.baseCipherId)
@ -264,8 +267,8 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
// Prepare a lookup with valid and invalid keyIds - SimEncryptKmsProxy should throw encrypt_key_not_found()
std::vector<std::pair<uint64_t, int64_t>> baseCipherIds;
for (auto id : self->cipherIds) {
baseCipherIds.emplace_back(std::make_pair(id, 1));
for (auto item : self->cipherIds) {
baseCipherIds.emplace_back(std::make_pair(item.second, item.first));
}
baseCipherIds.emplace_back(std::make_pair(SERVER_KNOBS->SIM_KMS_MAX_KEYS + 10, 1));
EKPGetBaseCipherKeysByIdsRequest req(deterministicRandom()->randomUniqueID(), baseCipherIds);

View File

@ -154,9 +154,22 @@ struct GetMappedRangeWorkload : ApiWorkload {
// indexEntryKey(expectedId) " << printable(indexEntryKey(expectedId)) << std::endl;
if (matchIndex == MATCH_INDEX_ALL || isBoundary) {
ASSERT(it->key == indexEntryKey(expectedId));
} else if (matchIndex == MATCH_INDEX_MATCHED_ONLY) {
// now we cannot generate a workload that only has partial results matched
// thus expecting everything matched
// TODO: create tests to generate workloads with partial secondary results present
ASSERT(it->key == indexEntryKey(expectedId));
} else if (matchIndex == MATCH_INDEX_UNMATCHED_ONLY) {
// now we cannot generate a workload that only has partial results matched
// thus expecting everything NOT matched(except for the boundary asserted above)
// TODO: create tests to generate workloads with partial secondary results present
ASSERT(it->key == EMPTY);
} else {
ASSERT(it->key == EMPTY);
}
// TODO: create tests to generate workloads with partial secondary results present
ASSERT(it->boundaryAndExist == isBoundary);
ASSERT(it->value == EMPTY);
if (self->SPLIT_RECORDS) {
@ -417,7 +430,15 @@ struct GetMappedRangeWorkload : ApiWorkload {
Key mapper = getMapper(self);
// The scanned range cannot be too large to hit get_mapped_key_values_has_more. We have a unit validating the
// error is thrown when the range is large.
int matchIndex = deterministicRandom()->random01() > 0.5 ? MATCH_INDEX_NONE : MATCH_INDEX_ALL;
const double r = deterministicRandom()->random01();
int matchIndex = MATCH_INDEX_ALL;
if (r < 0.25) {
matchIndex = MATCH_INDEX_NONE;
} else if (r < 0.5) {
matchIndex = MATCH_INDEX_MATCHED_ONLY;
} else if (r < 0.75) {
matchIndex = MATCH_INDEX_UNMATCHED_ONLY;
}
wait(self->scanMappedRange(cx, 10, 490, mapper, self, matchIndex));
return Void();
}

View File

@ -72,13 +72,13 @@ struct SaveAndKillWorkload : TestWorkload {
std::map<std::string, ISimulator::ProcessInfo*> allProcessesMap;
for (const auto& [_, process] : rebootingProcesses) {
if (allProcessesMap.find(process->dataFolder) == allProcessesMap.end() &&
std::string(process->name) != "remote flow process") {
process->name != "remote flow process") {
allProcessesMap[process->dataFolder] = process;
}
}
for (const auto& process : processes) {
if (allProcessesMap.find(process->dataFolder) == allProcessesMap.end() &&
std::string(process->name) != "remote flow process") {
process->name != "remote flow process") {
allProcessesMap[process->dataFolder] = process;
}
}
@ -106,18 +106,22 @@ struct SaveAndKillWorkload : TestWorkload {
ini.SetValue(machineIdString,
format("ipAddr%d", process->address.port - 1).c_str(),
process->address.ip.toString().c_str());
ini.SetValue(machineIdString, format("%d", process->address.port - 1).c_str(), process->dataFolder);
ini.SetValue(
machineIdString, format("c%d", process->address.port - 1).c_str(), process->coordinationFolder);
machineIdString, format("%d", process->address.port - 1).c_str(), process->dataFolder.c_str());
ini.SetValue(machineIdString,
format("c%d", process->address.port - 1).c_str(),
process->coordinationFolder.c_str());
j++;
} else {
ini.SetValue(machineIdString,
format("ipAddr%d", process->address.port - 1).c_str(),
process->address.ip.toString().c_str());
int oldValue = machines.find(machineId)->second;
ini.SetValue(machineIdString, format("%d", process->address.port - 1).c_str(), process->dataFolder);
ini.SetValue(
machineIdString, format("c%d", process->address.port - 1).c_str(), process->coordinationFolder);
machineIdString, format("%d", process->address.port - 1).c_str(), process->dataFolder.c_str());
ini.SetValue(machineIdString,
format("c%d", process->address.port - 1).c_str(),
process->coordinationFolder.c_str());
machines.erase(machines.find(machineId));
machines.insert(std::pair<std::string, int>(machineId, oldValue + 1));
}

View File

@ -21,6 +21,8 @@ from local_cluster import LocalCluster, random_secret_string
SUPPORTED_PLATFORMS = ["x86_64"]
SUPPORTED_VERSIONS = [
"7.2.0",
"7.1.5",
"7.1.4",
"7.1.3",
"7.1.2",
"7.1.1",
@ -73,6 +75,7 @@ LOCAL_OLD_BINARY_REPO = "/opt/foundationdb/old/"
CURRENT_VERSION = "7.2.0"
HEALTH_CHECK_TIMEOUT_SEC = 5
PROGRESS_CHECK_TIMEOUT_SEC = 30
TESTER_STATS_INTERVAL_SEC = 5
TRANSACTION_RETRY_LIMIT = 100
MAX_DOWNLOAD_ATTEMPTS = 5
RUN_WITH_GDB = False
@ -398,6 +401,8 @@ class UpgradeTest:
self.tmp_dir,
"--transaction-retry-limit",
str(TRANSACTION_RETRY_LIMIT),
"--stats-interval",
str(TESTER_STATS_INTERVAL_SEC*1000)
]
if RUN_WITH_GDB:
cmd_args = ["gdb", "-ex", "run", "--args"] + cmd_args