Merge branch 'main' of https://github.com/apple/foundationdb into feature/main/tenantCheck

This commit is contained in:
Xiaoxi Wang 2022-12-28 09:30:24 -07:00
commit 1e0f401ae4
24 changed files with 313 additions and 111 deletions

View File

@ -528,7 +528,7 @@ function(add_python_venv_test)
if(USE_SANITIZER)
set(test_env_vars "${test_env_vars};${SANITIZER_OPTIONS}")
endif()
set_tests_properties("${T_NAME}" PROPERTIES ENVIRONMENT ${test_env_vars})
set_tests_properties("${T_NAME}" PROPERTIES ENVIRONMENT "${test_env_vars}")
endfunction()
# Creates a single cluster before running the specified command (usually a ctest test)

View File

@ -178,6 +178,13 @@ else()
add_compile_options(-ggdb1)
endif()
if(CLANG)
# The default DWARF 5 format does not play nicely with GNU Binutils 2.39 and earlier, resulting
# in tools like addr2line omitting line numbers. We can consider removing this once we are able
# to use a version that has a fix.
add_compile_options(-gdwarf-4)
endif()
if(NOT FDB_RELEASE)
# Enable compression of the debug sections. This reduces the size of the binaries several times.
# We do not enable it in release builds, because CPack fails to generate debuginfo packages when

View File

@ -1538,13 +1538,11 @@ void MultiVersionTransaction::setOption(FDBTransactionOptions::Option option, Op
persistentOptions.emplace_back(option, value.castTo<Standalone<StringRef>>());
}
if (itr->first == FDBTransactionOptions::TIMEOUT) {
setTimeout(value);
}
auto tr = getTransaction();
if (tr.transaction) {
tr.transaction->setOption(option, value);
} else if (itr->first == FDBTransactionOptions::TIMEOUT) {
setTimeout(value);
}
}

View File

@ -307,6 +307,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( TENANT_CACHE_STORAGE_QUOTA_REFRESH_INTERVAL, 10 ); if( randomize && BUGGIFY ) TENANT_CACHE_STORAGE_QUOTA_REFRESH_INTERVAL = deterministicRandom()->randomInt(1, 10);
init( TENANT_CACHE_STORAGE_USAGE_TRACE_INTERVAL, 300 );
init( CP_FETCH_TENANTS_OVER_STORAGE_QUOTA_INTERVAL, 5 ); if( randomize && BUGGIFY ) CP_FETCH_TENANTS_OVER_STORAGE_QUOTA_INTERVAL = deterministicRandom()->randomInt(1, 10);
init( DD_BUILD_EXTRA_TEAMS_OVERRIDE, 10 ); if( randomize && BUGGIFY ) DD_BUILD_EXTRA_TEAMS_OVERRIDE = 2;
// TeamRemover
init( TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER, false ); if( randomize && BUGGIFY ) TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER = deterministicRandom()->random01() < 0.1 ? true : false; // false by default. disable the consistency check when it's true

View File

@ -1440,7 +1440,8 @@ struct DeleteTenantImpl {
Reference<typename DB::TransactionT> tr) {
state Optional<TenantMapEntry> tenantEntry = wait(tryGetTenantTransaction(tr, self->tenantName));
if (!tenantEntry.present() || tenantEntry.get().id != self->tenantId) {
if (!tenantEntry.present() || tenantEntry.get().id != self->tenantId ||
tenantEntry.get().tenantState == TenantState::RENAMING_TO) {
throw tenant_not_found();
}
@ -1452,11 +1453,6 @@ struct DeleteTenantImpl {
}
if (tenantEntry.get().tenantState != TenantState::REMOVING) {
// Disallow removing the "new" name of a renamed tenant before it completes
if (tenantEntry.get().tenantState == TenantState::RENAMING_TO) {
throw tenant_not_found();
}
state TenantMapEntry updatedEntry = tenantEntry.get();
// Check if we are deleting a tenant in the middle of a rename
updatedEntry.tenantState = TenantState::REMOVING;

View File

@ -269,6 +269,7 @@ public:
double DD_FAILURE_TIME;
double DD_ZERO_HEALTHY_TEAM_DELAY;
int DD_BUILD_EXTRA_TEAMS_OVERRIDE; // build extra teams to allow data movement to progress. must be larger than 0
// Run the storage engine in a child process on the same machine as the storage process
bool REMOTE_KV_STORE;

View File

@ -245,8 +245,7 @@ Future<Void> AsyncFileCached::changeFileSize(int64_t size) {
// Wait for the page truncations to finish, then truncate the underlying file
// Template types are being provided explicitly because they can't be automatically deduced for some reason.
return mapAsync<Void, std::function<Future<Void>(Void)>, Void>(
waitForAll(actors), [=](Void _) -> Future<Void> { return uncached->truncate(size); });
return mapAsync(waitForAll(actors), [=](Void _) -> Future<Void> { return uncached->truncate(size); });
}
Future<Void> AsyncFileCached::flush() {

View File

@ -216,7 +216,7 @@ struct SubNetTest {
return IPAddress(arr[0]);
} else {
std::array<unsigned char, 16> res;
memcpy(res.data(), arr, 4);
memcpy(res.data(), arr, 16);
return IPAddress(res);
}
}

View File

@ -74,8 +74,8 @@ public:
// Wait for diskDelay before submitting the I/O
// Template types are being provided explicitly because they can't be automatically deduced for some reason.
// Capture file by value in case this is destroyed during the delay
return mapAsync<Void, std::function<Future<int>(Void)>, int>(
delay(diskDelay), [=, file = file](Void _) -> Future<int> { return file->read(data, length, offset); });
return mapAsync(delay(diskDelay),
[=, file = file](Void _) -> Future<int> { return file->read(data, length, offset); });
}
Future<Void> write(void const* data, int length, int64_t offset) override {
@ -115,20 +115,19 @@ public:
// Wait for diskDelay before submitting the I/O
// Capture file by value in case this is destroyed during the delay
return mapAsync<Void, std::function<Future<Void>(Void)>, Void>(
delay(getDelay()), [=, file = file](Void _) -> Future<Void> {
if (pdata) {
return map(holdWhile(arena, file->write(pdata, length, offset)),
[corruptedBlock, file = file](auto res) {
if (g_network->isSimulated()) {
g_simulator->corruptedBlocks.emplace(file->getFilename(), corruptedBlock);
}
return res;
});
}
return mapAsync(delay(getDelay()), [=, file = file](Void _) -> Future<Void> {
if (pdata) {
return map(holdWhile(arena, file->write(pdata, length, offset)),
[corruptedBlock, file = file](auto res) {
if (g_network->isSimulated()) {
g_simulator->corruptedBlocks.emplace(file->getFilename(), corruptedBlock);
}
return res;
});
}
return file->write(data, length, offset);
});
return file->write(data, length, offset);
});
}
Future<Void> truncate(int64_t size) override {
@ -138,17 +137,16 @@ public:
// Wait for diskDelay before submitting the I/O
// Capture file by value in case this is destroyed during the delay
return mapAsync<Void, std::function<Future<Void>(Void)>, Void>(
delay(diskDelay), [size, file = file](Void _) -> Future<Void> {
constexpr auto maxBlockValue =
std::numeric_limits<decltype(g_simulator->corruptedBlocks)::key_type::second_type>::max();
auto firstDeletedBlock =
g_simulator->corruptedBlocks.lower_bound(std::make_pair(file->getFilename(), size / 4096));
auto lastFileBlock =
g_simulator->corruptedBlocks.upper_bound(std::make_pair(file->getFilename(), maxBlockValue));
g_simulator->corruptedBlocks.erase(firstDeletedBlock, lastFileBlock);
return file->truncate(size);
});
return mapAsync(delay(diskDelay), [size, file = file](Void _) -> Future<Void> {
constexpr auto maxBlockValue =
std::numeric_limits<decltype(g_simulator->corruptedBlocks)::key_type::second_type>::max();
auto firstDeletedBlock =
g_simulator->corruptedBlocks.lower_bound(std::make_pair(file->getFilename(), size / 4096));
auto lastFileBlock =
g_simulator->corruptedBlocks.upper_bound(std::make_pair(file->getFilename(), maxBlockValue));
g_simulator->corruptedBlocks.erase(firstDeletedBlock, lastFileBlock);
return file->truncate(size);
});
}
Future<Void> sync() override {
@ -158,8 +156,7 @@ public:
// Wait for diskDelay before submitting the I/O
// Capture file by value in case this is destroyed during the delay
return mapAsync<Void, std::function<Future<Void>(Void)>, Void>(
delay(diskDelay), [=, file = file](Void _) -> Future<Void> { return file->sync(); });
return mapAsync(delay(diskDelay), [=, file = file](Void _) -> Future<Void> { return file->sync(); });
}
Future<int64_t> size() const override {
@ -169,8 +166,7 @@ public:
// Wait for diskDelay before submitting the I/O
// Capture file by value in case this is destroyed during the delay
return mapAsync<Void, std::function<Future<int64_t>(Void)>, int64_t>(
delay(diskDelay), [=, file = file](Void _) -> Future<int64_t> { return file->size(); });
return mapAsync(delay(diskDelay), [=, file = file](Void _) -> Future<int64_t> { return file->size(); });
}
int64_t debugFD() const override { return file->debugFD(); }

View File

@ -299,14 +299,13 @@ struct RequestData : NonCopyable {
requestStarted = false;
if (backoff > 0) {
response = mapAsync<Void, std::function<Future<Reply>(Void)>, Reply>(
delay(backoff), [this, stream, &request, model, alternatives, channel](Void _) {
requestStarted = true;
modelHolder = Reference<ModelHolder>(new ModelHolder(model, stream->getEndpoint().token.first()));
Future<Reply> resp = stream->tryGetReply(request);
maybeDuplicateTSSRequest(stream, request, model, resp, alternatives, channel);
return resp;
});
response = mapAsync(delay(backoff), [this, stream, &request, model, alternatives, channel](Void _) {
requestStarted = true;
modelHolder = Reference<ModelHolder>(new ModelHolder(model, stream->getEndpoint().token.first()));
Future<Reply> resp = stream->tryGetReply(request);
maybeDuplicateTSSRequest(stream, request, model, resp, alternatives, channel);
return resp;
});
} else {
requestStarted = true;
modelHolder = Reference<ModelHolder>(new ModelHolder(model, stream->getEndpoint().token.first()));

View File

@ -1553,11 +1553,14 @@ FDB_DEFINE_BOOLEAN_PARAM(InOverSizePhysicalShard);
FDB_DEFINE_BOOLEAN_PARAM(PhysicalShardAvailable);
FDB_DEFINE_BOOLEAN_PARAM(MoveKeyRangeOutPhysicalShard);
// Tracks storage metrics for `keys`. This function is similar to `trackShardMetrics()` and altered for physical shard.
// This meant to be temporary. Eventually, we want a new interface to track physical shard metrics more efficiently.
ACTOR Future<Void> trackKeyRangeInPhysicalShardMetrics(Reference<IDDTxnProcessor> db,
KeyRange keys,
Reference<AsyncVar<Optional<ShardMetrics>>> shardMetrics) {
// Tracks storage metrics for `keys` and updates `physicalShardStats`, which holds the stats for the physical shard
// owning this key range. This function is similar to `trackShardMetrics()` but altered for physical shards. This is
// meant to be temporary. Eventually, we want a new interface to track physical shard metrics more efficiently.
ACTOR Future<Void> trackKeyRangeInPhysicalShardMetrics(
Reference<IDDTxnProcessor> db,
KeyRange keys,
Reference<AsyncVar<Optional<ShardMetrics>>> shardMetrics,
Reference<AsyncVar<Optional<StorageMetrics>>> physicalShardStats) {
state BandwidthStatus bandwidthStatus =
shardMetrics->get().present() ? getBandwidthStatus(shardMetrics->get().get().metrics) : BandwidthStatusNormal;
state double lastLowBandwidthStartTime =
@ -1591,6 +1594,20 @@ ACTOR Future<Void> trackKeyRangeInPhysicalShardMetrics(Reference<IDDTxnProcessor
lastLowBandwidthStartTime = now();
}
bandwidthStatus = newBandwidthStatus;
// Update the current physical shard's aggregated stats.
if (!physicalShardStats->get().present()) {
physicalShardStats->set(metrics.first.get());
} else {
if (!shardMetrics->get().present()) {
// We collect key range stats for the first time.
physicalShardStats->set(physicalShardStats->get().get() + metrics.first.get());
} else {
physicalShardStats->set(physicalShardStats->get().get() - shardMetrics->get().get().metrics +
metrics.first.get());
}
}
shardMetrics->set(ShardMetrics(metrics.first.get(), lastLowBandwidthStartTime, shardCount));
break;
} else {
@ -1605,6 +1622,14 @@ ACTOR Future<Void> trackKeyRangeInPhysicalShardMetrics(Reference<IDDTxnProcessor
}
}
void PhysicalShardCollection::PhysicalShard::insertNewRangeData(const KeyRange& newRange) {
RangeData data;
data.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
data.trackMetrics = trackKeyRangeInPhysicalShardMetrics(txnProcessor, newRange, data.stats, stats);
auto it = rangeData.emplace(newRange, data);
ASSERT(it.second);
}
void PhysicalShardCollection::PhysicalShard::addRange(const KeyRange& newRange) {
if (g_network->isSimulated()) {
// Test that new range must not overlap with any existing range in this shard.
@ -1613,10 +1638,7 @@ void PhysicalShardCollection::PhysicalShard::addRange(const KeyRange& newRange)
}
}
RangeData data;
data.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
data.trackMetrics = trackKeyRangeInPhysicalShardMetrics(txnProcessor, newRange, data.stats);
rangeData.emplace(newRange, data);
insertNewRangeData(newRange);
}
void PhysicalShardCollection::PhysicalShard::removeRange(const KeyRange& outRange) {
@ -1631,10 +1653,7 @@ void PhysicalShardCollection::PhysicalShard::removeRange(const KeyRange& outRang
std::vector<KeyRangeRef> remainingRanges = range - outRange;
for (auto& r : remainingRanges) {
ASSERT(r != range);
RangeData data;
data.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
data.trackMetrics = trackKeyRangeInPhysicalShardMetrics(txnProcessor, r, data.stats);
rangeData.emplace(r, data);
insertNewRangeData(r);
}
// Must erase last since `remainingRanges` uses data in `range`.
rangeData.erase(range);

View File

@ -21,6 +21,7 @@
#include "fdbserver/DDTeamCollection.h"
#include "flow/Trace.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include <climits>
FDB_DEFINE_BOOLEAN_PARAM(IsPrimary);
FDB_DEFINE_BOOLEAN_PARAM(IsInitialTeam);
@ -575,6 +576,14 @@ public:
state int teamsToBuild;
teamsToBuild = std::max(0, std::min(desiredTeams - teamCount, maxTeams - totalTeamCount));
if (teamCount == 0 && teamsToBuild == 0 && SERVER_KNOBS->DD_BUILD_EXTRA_TEAMS_OVERRIDE > 0) {
// Use DD_BUILD_EXTRA_TEAMS_OVERRIDE > 0 as the feature flag: Set to 0 to disable it
TraceEvent(SevWarnAlways, "BuildServerTeamsHaveTooManyUnhealthyTeams")
.detail("Hint", "Build teams may stuck and prevent DD from relocating data")
.detail("BuildExtraServerTeamsOverride", SERVER_KNOBS->DD_BUILD_EXTRA_TEAMS_OVERRIDE);
teamsToBuild = SERVER_KNOBS->DD_BUILD_EXTRA_TEAMS_OVERRIDE;
}
TraceEvent("BuildTeamsBegin", self->distributorId)
.detail("TeamsToBuild", teamsToBuild)
.detail("DesiredTeams", desiredTeams)
@ -583,7 +592,8 @@ public:
.detail("PerpetualWigglingTeams", wigglingTeams)
.detail("UniqueMachines", uniqueMachines)
.detail("TeamSize", self->configuration.storageTeamSize)
.detail("Servers", serverCount)
.detail("Servers", self->server_info.size())
.detail("HealthyServers", serverCount)
.detail("CurrentTrackedServerTeams", self->teams.size())
.detail("HealthyTeamCount", teamCount)
.detail("TotalTeamCount", totalTeamCount)
@ -640,6 +650,10 @@ public:
}
} else {
self->lastBuildTeamsFailed = true;
TraceEvent(SevWarnAlways, "BuildTeamsNotEnoughUniqueMachines", self->distributorId)
.detail("Primary", self->primary)
.detail("UniqueMachines", uniqueMachines)
.detail("Replication", self->configuration.storageTeamSize);
}
self->evaluateTeamQuality();
@ -3091,6 +3105,7 @@ public:
TraceEvent e("ServerStatus", self->getDistributorId());
e.detail("ServerUID", uid)
.detail("MachineIsValid", server_info[uid]->machine.isValid())
.detail("IsMachineHealthy", self->isMachineHealthy(server_info[uid]->machine))
.detail("MachineTeamSize",
server_info[uid]->machine.isValid() ? server_info[uid]->machine->machineTeams.size() : -1)
.detail("Primary", self->isPrimary());
@ -3135,13 +3150,13 @@ public:
TraceEvent("MachineInfo", self->getDistributorId())
.detail("Size", machine_info.size())
.detail("Primary", self->isPrimary());
state std::map<Standalone<StringRef>, Reference<TCMachineInfo>>::iterator machine = machine_info.begin();
state bool isMachineHealthy = false;
for (i = 0; i < machine_info.size(); i++) {
Reference<TCMachineInfo> _machine = machine->second;
if (!_machine.isValid() || machine_info.find(_machine->machineID) == machine_info.end() ||
_machine->serversOnMachine.empty()) {
bool machineIDFound = machine_info.find(_machine->machineID) != machine_info.end();
bool zeroHealthyServersOnMachine = true;
if (!_machine.isValid() || !machineIDFound || _machine->serversOnMachine.empty()) {
isMachineHealthy = false;
}
@ -3153,13 +3168,17 @@ public:
auto it = server_status.find(server->getId());
if (it != server_status.end() && !it->second.isUnhealthy()) {
isMachineHealthy = true;
zeroHealthyServersOnMachine = false;
break;
}
}
isMachineHealthy = false;
TraceEvent("MachineInfo", self->getDistributorId())
.detail("MachineInfoIndex", i)
.detail("Healthy", isMachineHealthy)
.detail("MachineIDFound", machineIDFound)
.detail("ZeroServersOnMachine", _machine->serversOnMachine.empty())
.detail("ZeroHealthyServersOnMachine", zeroHealthyServersOnMachine)
.detail("MachineID", machine->first.contents().toString())
.detail("MachineTeamOwned", machine->second->machineTeams.size())
.detail("ServerNumOnMachine", machine->second->serversOnMachine.size())
@ -3274,10 +3293,15 @@ void DDTeamCollection::traceServerInfo() const {
.detail("StoreType", server->getStoreType().toString())
.detail("InDesiredDC", server->isInDesiredDC());
}
for (auto& [serverID, server] : server_info) {
i = 0;
for (auto& server : server_info) {
const UID& serverID = server.first;
const ServerStatus& status = server_status.get(serverID);
TraceEvent("ServerStatus", distributorId)
.detail("ServerInfoIndex", i++)
.detail("ServerID", serverID)
.detail("Healthy", !server_status.get(serverID).isUnhealthy())
.detail("Healthy", !status.isUnhealthy())
.detail("StatusString", status.toString())
.detail("MachineIsValid", get(server_info, serverID)->machine.isValid())
.detail("MachineTeamSize",
get(server_info, serverID)->machine.isValid()
@ -4048,6 +4072,7 @@ void DDTeamCollection::traceAllInfo(bool shouldPrint) const {
}
}
// TODO: flush trace log to avoid trace buffer overflow when DD has too many servers and teams
TraceEvent("TraceAllInfo", distributorId).detail("Primary", primary);
traceConfigInfo();
traceServerInfo();
@ -4215,10 +4240,13 @@ int DDTeamCollection::addBestMachineTeams(int machineTeamsToBuild) {
addMachineTeam(machines);
addedMachineTeams++;
} else {
traceAllInfo(true);
// When too many teams exist in simulation, traceAllInfo will buffer too many trace logs before
// trace has a chance to flush its buffer, which causes assertion failure.
traceAllInfo(!g_network->isSimulated());
TraceEvent(SevWarn, "DataDistributionBuildTeams", distributorId)
.detail("Primary", primary)
.detail("Reason", "Unable to make desired machine Teams");
.detail("Reason", "Unable to make desired machine Teams")
.detail("Hint", "Check TraceAllInfo event");
lastBuildTeamsFailed = true;
break;
}
@ -4507,13 +4535,34 @@ int DDTeamCollection::addTeamsBestOf(int teamsToBuild, int desiredTeams, int max
// machineTeamsToBuild mimics how the teamsToBuild is calculated in buildTeams()
int machineTeamsToBuild =
std::max(0, std::min(desiredMachineTeams - healthyMachineTeamCount, maxMachineTeams - totalMachineTeamCount));
if (healthyMachineTeamCount == 0 && machineTeamsToBuild == 0 && SERVER_KNOBS->DD_BUILD_EXTRA_TEAMS_OVERRIDE > 0) {
// Use DD_BUILD_EXTRA_TEAMS_OVERRIDE > 0 as the feature flag: Set to 0 to disable it
TraceEvent(SevWarnAlways, "BuildMachineTeamsHaveTooManyUnhealthyMachineTeams")
.detail("Hint", "Build teams may stuck and prevent DD from relocating data")
.detail("BuildExtraMachineTeamsOverride", SERVER_KNOBS->DD_BUILD_EXTRA_TEAMS_OVERRIDE);
machineTeamsToBuild = SERVER_KNOBS->DD_BUILD_EXTRA_TEAMS_OVERRIDE;
}
if (g_network->isSimulated() && deterministicRandom()->random01() < 0.1) {
// Test when the system has lots of unhealthy machine teams, which may prevent TC from building new teams.
// The scenario creates a deadlock situation in which DD cannot relocate data.
int totalMachineTeams = nChooseK(machine_info.size(), configuration.storageTeamSize);
TraceEvent("BuildMachineTeams")
.detail("Primary", primary)
.detail("CalculatedMachineTeamsToBuild", machineTeamsToBuild)
.detail("OverwriteMachineTeamsToBuildForTesting", totalMachineTeams);
machineTeamsToBuild = totalMachineTeams;
}
{
TraceEvent te("BuildMachineTeams");
te.detail("TotalHealthyMachine", totalHealthyMachineCount)
te.detail("Primary", primary)
.detail("TotalMachines", machine_info.size())
.detail("TotalHealthyMachine", totalHealthyMachineCount)
.detail("HealthyMachineTeamCount", healthyMachineTeamCount)
.detail("DesiredMachineTeams", desiredMachineTeams)
.detail("MaxMachineTeams", maxMachineTeams)
.detail("TotalMachineTeams", totalMachineTeamCount)
.detail("MachineTeamsToBuild", machineTeamsToBuild);
// Pre-build all machine teams until we have the desired number of machine teams
if (machineTeamsToBuild > 0 || notEnoughMachineTeamsForAMachine()) {
@ -4817,6 +4866,7 @@ Reference<TCMachineInfo> DDTeamCollection::checkAndCreateMachine(Reference<TCSer
machineInfo->serversOnMachine.push_back(server);
}
server->machine = machineInfo;
ASSERT(machineInfo->machineID == machine_id); // invariant for TC to work
return machineInfo;
}

View File

@ -82,6 +82,9 @@ public:
LatencySample commitLatency;
LatencySample commitQueueLatency;
LatencySample dbWriteLatency;
std::vector<std::shared_ptr<LatencySample>> readLatency;
std::vector<std::shared_ptr<LatencySample>> scanLatency;
std::vector<std::shared_ptr<LatencySample>> readQueueLatency;
void setClosing() { this->closing = true; }
bool isClosing() const { return this->closing; }
@ -116,7 +119,22 @@ SharedRocksDBState::SharedRocksDBState(UID id)
dbWriteLatency(LatencySample("RocksDBWriteLatency",
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY)) {}
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY)) {
for (int i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; i++) {
readLatency.push_back(std::make_shared<LatencySample>(format("RocksDBReadLatency-%d", i),
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY));
scanLatency.push_back(std::make_shared<LatencySample>(format("RocksDBScanLatency-%d", i),
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY));
readQueueLatency.push_back(std::make_shared<LatencySample>(format("RocksDBReadQueueLatency-%d", i),
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SKETCH_ACCURACY));
}
}
rocksdb::ColumnFamilyOptions SharedRocksDBState::initialCfOptions() {
rocksdb::ColumnFamilyOptions options;
@ -199,7 +217,7 @@ rocksdb::DBOptions SharedRocksDBState::initialDbOptions() {
options.statistics = rocksdb::CreateDBStatistics();
options.statistics->set_stats_level(rocksdb::StatsLevel(SERVER_KNOBS->ROCKSDB_STATS_LEVEL));
options.db_log_dir = SERVER_KNOBS->LOG_DIRECTORY;
options.db_log_dir = g_network->isSimulated() ? "" : SERVER_KNOBS->LOG_DIRECTORY;
if (SERVER_KNOBS->ROCKSDB_MUTE_LOGS) {
options.info_log = std::make_shared<NullRocksDBLogForwarder>();
@ -1461,7 +1479,8 @@ struct RocksDBKeyValueStore : IKeyValueStore {
if (doPerfContextMetrics) {
perfContextMetrics->reset();
}
double readBeginTime = timer_monotonic();
const double readBeginTime = timer_monotonic();
sharedState->readQueueLatency[threadIndex]->addMeasurement(readBeginTime - a.startTime);
if (a.getHistograms) {
metricPromiseStream->send(
std::make_pair(ROCKSDB_READVALUE_QUEUEWAIT_HISTOGRAM.toString(), readBeginTime - a.startTime));
@ -1515,16 +1534,17 @@ struct RocksDBKeyValueStore : IKeyValueStore {
a.result.sendError(statusToError(s));
}
const double endTime = timer_monotonic();
if (a.getHistograms) {
double currTime = timer_monotonic();
metricPromiseStream->send(
std::make_pair(ROCKSDB_READVALUE_ACTION_HISTOGRAM.toString(), currTime - readBeginTime));
std::make_pair(ROCKSDB_READVALUE_ACTION_HISTOGRAM.toString(), endTime - readBeginTime));
metricPromiseStream->send(
std::make_pair(ROCKSDB_READVALUE_LATENCY_HISTOGRAM.toString(), currTime - a.startTime));
std::make_pair(ROCKSDB_READVALUE_LATENCY_HISTOGRAM.toString(), endTime - a.startTime));
}
if (doPerfContextMetrics) {
perfContextMetrics->set(threadIndex);
}
sharedState->readLatency[threadIndex]->addMeasurement(endTime - readBeginTime);
}
struct ReadValuePrefixAction : TypedAction<Reader, ReadValuePrefixAction> {
@ -1546,7 +1566,8 @@ struct RocksDBKeyValueStore : IKeyValueStore {
if (doPerfContextMetrics) {
perfContextMetrics->reset();
}
double readBeginTime = timer_monotonic();
const double readBeginTime = timer_monotonic();
sharedState->readQueueLatency[threadIndex]->addMeasurement(readBeginTime - a.startTime);
if (a.getHistograms) {
metricPromiseStream->send(
std::make_pair(ROCKSDB_READPREFIX_QUEUEWAIT_HISTOGRAM.toString(), readBeginTime - a.startTime));
@ -1598,16 +1619,17 @@ struct RocksDBKeyValueStore : IKeyValueStore {
logRocksDBError(id, s, "ReadValuePrefix");
a.result.sendError(statusToError(s));
}
const double endTime = timer_monotonic();
if (a.getHistograms) {
double currTime = timer_monotonic();
metricPromiseStream->send(
std::make_pair(ROCKSDB_READPREFIX_ACTION_HISTOGRAM.toString(), currTime - readBeginTime));
std::make_pair(ROCKSDB_READPREFIX_ACTION_HISTOGRAM.toString(), endTime - readBeginTime));
metricPromiseStream->send(
std::make_pair(ROCKSDB_READPREFIX_LATENCY_HISTOGRAM.toString(), currTime - a.startTime));
std::make_pair(ROCKSDB_READPREFIX_LATENCY_HISTOGRAM.toString(), endTime - a.startTime));
}
if (doPerfContextMetrics) {
perfContextMetrics->set(threadIndex);
}
sharedState->readLatency[threadIndex]->addMeasurement(endTime - readBeginTime);
}
struct ReadRangeAction : TypedAction<Reader, ReadRangeAction>, FastAllocated<ReadRangeAction> {
@ -1628,7 +1650,8 @@ struct RocksDBKeyValueStore : IKeyValueStore {
if (doPerfContextMetrics) {
perfContextMetrics->reset();
}
double readBeginTime = timer_monotonic();
const double readBeginTime = timer_monotonic();
sharedState->readQueueLatency[threadIndex]->addMeasurement(readBeginTime - a.startTime);
if (a.getHistograms) {
metricPromiseStream->send(
std::make_pair(ROCKSDB_READRANGE_QUEUEWAIT_HISTOGRAM.toString(), readBeginTime - a.startTime));
@ -1722,16 +1745,17 @@ struct RocksDBKeyValueStore : IKeyValueStore {
result.readThrough = result[result.size() - 1].key;
}
a.result.send(result);
const double endTime = timer_monotonic();
if (a.getHistograms) {
double currTime = timer_monotonic();
metricPromiseStream->send(
std::make_pair(ROCKSDB_READRANGE_ACTION_HISTOGRAM.toString(), currTime - readBeginTime));
std::make_pair(ROCKSDB_READRANGE_ACTION_HISTOGRAM.toString(), endTime - readBeginTime));
metricPromiseStream->send(
std::make_pair(ROCKSDB_READRANGE_LATENCY_HISTOGRAM.toString(), currTime - a.startTime));
std::make_pair(ROCKSDB_READRANGE_LATENCY_HISTOGRAM.toString(), endTime - a.startTime));
}
if (doPerfContextMetrics) {
perfContextMetrics->set(threadIndex);
}
sharedState->scanLatency[threadIndex]->addMeasurement(endTime - readBeginTime);
}
};

View File

@ -365,7 +365,7 @@ rocksdb::Options getOptions() {
options.write_buffer_size = SERVER_KNOBS->ROCKSDB_CF_WRITE_BUFFER_SIZE;
options.statistics = rocksdb::CreateDBStatistics();
options.statistics->set_stats_level(rocksdb::kExceptHistogramOrTimers);
options.db_log_dir = SERVER_KNOBS->LOG_DIRECTORY;
options.db_log_dir = g_network->isSimulated() ? "" : SERVER_KNOBS->LOG_DIRECTORY;
return options;
}

View File

@ -23,6 +23,7 @@
#include <type_traits>
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/SystemData.h"
#include "flow/ActorCollection.h"
#include "fdbrpc/simulator.h"
@ -653,6 +654,63 @@ ACTOR Future<int64_t> getVersionOffset(Database cx,
}
}
// Returns the datacenter lag for simulation runs, computed as the `v` field of one primary TLog's queuing-metrics
// reply minus the `v` field of one remote TLog's queuing-metrics reply.
//
// Returns 0 immediately when not running in simulation, or when the simulator is configured with a single usable
// region. Otherwise the actor keeps waiting on `dbInfo` changes until both a local, non-satellite TLog and a
// non-local TLog are present, then queries both and returns the difference.
//
// dbInfo: source of the recovery state and the log system configuration; the TLogs are re-selected whenever it changes.
// cx: not referenced in this body.
ACTOR Future<Version> getDatacenterLag(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
loop {
// Nothing to measure outside simulation or with only one usable region.
if (!g_network->isSimulated() || g_simulator->usableRegions == 1) {
return 0;
}
// Both selections start out empty on every pass of the outer loop.
state Optional<TLogInterface> primaryLog;
state Optional<TLogInterface> remoteLog;
// TLogs are only selected once the recovery state has reached ALL_LOGS_RECRUITED.
if (dbInfo->get().recoveryState >= RecoveryState::ALL_LOGS_RECRUITED) {
for (const auto& logset : dbInfo->get().logSystemConfig.tLogs) {
// Primary side: a local log set that is not a satellite; take its first present TLog.
if (logset.isLocal && logset.locality != tagLocalitySatellite) {
for (const auto& tlog : logset.tLogs) {
if (tlog.present()) {
primaryLog = tlog.interf();
break;
}
}
}
// Remote side: any non-local log set; take its first present TLog.
if (!logset.isLocal) {
for (const auto& tlog : logset.tLogs) {
if (tlog.present()) {
remoteLog = tlog.interf();
break;
}
}
}
}
}
// Missing either side: wait for the next dbInfo change and start the selection over.
if (!primaryLog.present() || !remoteLog.present()) {
wait(dbInfo->onChange());
continue;
}
ASSERT(primaryLog.present());
ASSERT(remoteLog.present());
// Armed before the requests are sent so that a dbInfo change during the requests is observed.
state Future<Void> onChange = dbInfo->onChange();
// This inner loop never iterates twice: every pass ends in either `break` or `return`.
loop {
// NOTE(review): brokenPromiseToNever presumably turns a broken_promise error into a future that never
// becomes ready, leaving `onChange` as the only way out if a TLog goes away - confirm.
state Future<TLogQueuingMetricsReply> primaryMetrics =
brokenPromiseToNever(primaryLog.get().getQueuingMetrics.getReply(TLogQueuingMetricsRequest()));
state Future<TLogQueuingMetricsReply> remoteMetrics =
brokenPromiseToNever(remoteLog.get().getQueuingMetrics.getReply(TLogQueuingMetricsRequest()));
// Wake when both replies have arrived, or when dbInfo changes, whichever happens first.
wait((success(primaryMetrics) && success(remoteMetrics)) || onChange);
// dbInfo changed: the chosen TLogs may be stale, so go back to the outer loop and select again.
if (onChange.isReady()) {
break;
}
TraceEvent("DCLag").detail("Primary", primaryMetrics.get().v).detail("Remote", remoteMetrics.get().v);
ASSERT(primaryMetrics.get().v >= 0 && remoteMetrics.get().v >= 0);
return primaryMetrics.get().v - remoteMetrics.get().v;
}
}
}
ACTOR Future<Void> repairDeadDatacenter(Database cx,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
std::string context) {
@ -780,6 +838,8 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
state Future<bool> dataDistributionActive;
state Future<bool> storageServersRecruiting;
state Future<int64_t> versionOffset;
state Future<Version> dcLag;
state Version maxDcLag = 30e6;
auto traceMessage = "QuietDatabase" + phase + "Begin";
TraceEvent(traceMessage.c_str()).log();
@ -817,10 +877,11 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
dataDistributionActive = getDataDistributionActive(cx, distributorWorker);
storageServersRecruiting = getStorageServersRecruiting(cx, distributorWorker, distributorUID);
versionOffset = getVersionOffset(cx, distributorWorker, dbInfo);
dcLag = getDatacenterLag(cx, dbInfo);
wait(success(dataInFlight) && success(tLogQueueInfo) && success(dataDistributionQueueSize) &&
success(teamCollectionValid) && success(storageQueueSize) && success(dataDistributionActive) &&
success(storageServersRecruiting) && success(versionOffset));
success(storageServersRecruiting) && success(versionOffset) && success(dcLag));
maxVersionOffset += dbInfo->get().recoveryCount * SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT;
@ -836,7 +897,8 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
.add(evt, "MaxStorageQueueSize", storageQueueSize.get(), maxStorageServerQueueGate)
.add(evt, "DataDistributionActive", dataDistributionActive.get(), true, std::equal_to<>())
.add(evt, "StorageServersRecruiting", storageServersRecruiting.get(), false, std::equal_to<>())
.add(evt, "VersionOffset", versionOffset.get(), maxVersionOffset);
.add(evt, "VersionOffset", versionOffset.get(), maxVersionOffset)
.add(evt, "DatacenterLag", dcLag.get(), maxDcLag);
evt.detail("RecoveryCount", dbInfo->get().recoveryCount).detail("NumSuccesses", numSuccesses);
evt.log();

View File

@ -113,7 +113,7 @@ rocksdb::ColumnFamilyOptions getCFOptions() {
rocksdb::Options getOptions() {
rocksdb::Options options({}, getCFOptions());
options.create_if_missing = true;
options.db_log_dir = SERVER_KNOBS->LOG_DIRECTORY;
options.db_log_dir = g_network->isSimulated() ? "" : SERVER_KNOBS->LOG_DIRECTORY;
return options;
}

View File

@ -1622,6 +1622,7 @@ void peekMessagesFromMemory(Reference<LogData> self,
Version& endVersion) {
ASSERT(!messages.getLength());
int versionCount = 0;
auto& deque = getVersionMessages(self, tag);
//TraceEvent("TLogPeekMem", self->dbgid).detail("Tag", req.tag1).detail("PDS", self->persistentDataSequence).detail("PDDS", self->persistentDataDurableSequence).detail("Oldest", map1.empty() ? 0 : map1.begin()->key ).detail("OldestMsgCount", map1.empty() ? 0 : map1.begin()->value.size());
@ -1652,6 +1653,23 @@ void peekMessagesFromMemory(Reference<LogData> self,
DEBUG_TAGS_AND_MESSAGE(
"TLogPeek", currentVersion, StringRef((uint8_t*)data + offset, messages.getLength() - offset), self->logId)
.detail("PeekTag", tag);
versionCount++;
}
if (versionCount == 0) {
++self->emptyPeeks;
} else {
++self->nonEmptyPeeks;
// TODO (version vector) check if this should be included in "status details" json
if (self->peekVersionCounts.find(tag) == self->peekVersionCounts.end()) {
UID ssID = deterministicRandom()->randomUniqueID();
std::string s = "PeekVersionCounts " + tag.toString();
self->peekVersionCounts.try_emplace(
tag, s, ssID, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SKETCH_ACCURACY);
}
LatencySample& sample = self->peekVersionCounts.at(tag);
sample.addMeasurement(versionCount);
}
}

View File

@ -2657,16 +2657,17 @@ public:
} else if (cacheEntry.reading()) {
// This is very unlikely, maybe impossible in the current pager use cases
// Wait for the outstanding read to finish, then start the write
cacheEntry.writeFuture = mapAsync<Void, std::function<Future<Void>(Void)>, Void>(
success(cacheEntry.readFuture), [=](Void) { return writePhysicalPage(reason, level, pageIDs, data); });
cacheEntry.writeFuture = mapAsync(success(cacheEntry.readFuture),
[=](Void) { return writePhysicalPage(reason, level, pageIDs, data); });
}
// If the page is being written, wait for this write before issuing the new write to ensure the
// writes happen in the correct order
else if (cacheEntry.writing()) {
// This is very unlikely, maybe impossible in the current pager use cases
// Wait for the previous write to finish, then start new write
cacheEntry.writeFuture = mapAsync<Void, std::function<Future<Void>(Void)>, Void>(
cacheEntry.writeFuture, [=](Void) { return writePhysicalPage(reason, level, pageIDs, data); });
cacheEntry.writeFuture =
mapAsync(cacheEntry.writeFuture, [=](Void) { return writePhysicalPage(reason, level, pageIDs, data); });
} else {
cacheEntry.writeFuture = detach(writePhysicalPage(reason, level, pageIDs, data));
}

View File

@ -284,7 +284,8 @@ public:
StorageMetrics const& metrics,
std::vector<ShardsAffectedByTeamFailure::Team> teams,
PhysicalShardCreationTime whenCreated)
: txnProcessor(txnProcessor), id(id), metrics(metrics), teams(teams), whenCreated(whenCreated) {}
: txnProcessor(txnProcessor), id(id), metrics(metrics),
stats(makeReference<AsyncVar<Optional<StorageMetrics>>>()), teams(teams), whenCreated(whenCreated) {}
// Adds `newRange` to this physical shard and starts monitoring the shard.
void addRange(const KeyRange& newRange);
@ -297,15 +298,20 @@ public:
Reference<IDDTxnProcessor> txnProcessor;
uint64_t id; // physical shard id (never changed)
StorageMetrics metrics; // current metrics, updated by shardTracker
// todo(zhewu): combine above metrics with stats. They are redundant.
Reference<AsyncVar<Optional<StorageMetrics>>> stats; // Stats of this physical shard.
std::vector<ShardsAffectedByTeamFailure::Team> teams; // which team owns this physical shard (never changed)
PhysicalShardCreationTime whenCreated; // when this physical shard is created (never changed)
struct RangeData {
Future<Void> trackMetrics;
Reference<AsyncVar<Optional<ShardMetrics>>>
stats; // TODO(zhewu): aggregate all metrics to a single physical shard metrics.
Reference<AsyncVar<Optional<ShardMetrics>>> stats;
};
std::unordered_map<KeyRange, RangeData> rangeData;
private:
// Inserts a new key range into this physical shard. `newRange` must not exist in this shard already.
void insertNewRangeData(const KeyRange& newRange);
};
// Generate a random physical shard ID, which is not UID().first() nor anonymousShardId.first()

View File

@ -1045,6 +1045,9 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
}
bool degradedPeer = false;
bool disconnectedPeer = false;
// If peer->lastLoggedTime == 0, we just started monitoring this peer and haven't logged it once yet.
double lastLoggedTime = peer->lastLoggedTime <= 0.0 ? peer->lastConnectTime : peer->lastLoggedTime;
if ((workerLocation == Primary && addressInDbAndPrimaryDc(address, dbInfo)) ||
(workerLocation == Remote && addressInDbAndRemoteDc(address, dbInfo))) {
// Monitors intra DC latencies between servers that are in the primary or remote DC's transaction
@ -1062,7 +1065,7 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
if (disconnectedPeer || degradedPeer) {
TraceEvent("HealthMonitorDetectDegradedPeer")
.detail("Peer", address)
.detail("Elapsed", now() - peer->lastLoggedTime)
.detail("Elapsed", now() - lastLoggedTime)
.detail("Disconnected", disconnectedPeer)
.detail("MinLatency", peer->pingLatencies.min())
.detail("MaxLatency", peer->pingLatencies.max())
@ -1093,7 +1096,7 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
TraceEvent("HealthMonitorDetectDegradedPeer")
.detail("Peer", address)
.detail("Satellite", true)
.detail("Elapsed", now() - peer->lastLoggedTime)
.detail("Elapsed", now() - lastLoggedTime)
.detail("Disconnected", disconnectedPeer)
.detail("MinLatency", peer->pingLatencies.min())
.detail("MaxLatency", peer->pingLatencies.max())
@ -2410,7 +2413,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
ReplyPromise<InitializeBackupReply> backupReady = req.reply;
backupWorkerCache.set(req.reqId, backupReady.getFuture());
Future<Void> backupProcess = backupWorker(recruited, req, dbInfo);
backupProcess = storageCache.removeOnReady(req.reqId, backupProcess);
backupProcess = backupWorkerCache.removeOnReady(req.reqId, backupProcess);
errorForwarders.add(forwardError(errors, Role::BACKUP, recruited.id(), backupProcess));
TraceEvent("BackupInitRequest", req.reqId).detail("BackupId", recruited.id());
InitializeBackupReply reply(recruited, req.backupEpoch);

View File

@ -448,6 +448,27 @@ void bindDeterministicRandomToOpenssl() {
#endif // OPENSSL_IS_BORINGSSL
}
// Returns the binomial coefficient C(n, k): the number of ways to choose k items out of n choices.
// Requires n >= k >= 0. The result (and every intermediate value) must fit in an int; both conditions are
// enforced with ASSERT.
int nChooseK(int n, int k) {
	// Use the same ASSERT macro as the range check below so the precondition is enforced consistently.
	ASSERT(n >= k && k >= 0);
	if (k == 0) {
		return 1;
	}
	// C(n, k) == C(n, n - k). Recursing onto the smaller side keeps the loop short and guarantees that the
	// partial products below never decrease from one iteration to the next.
	if (k > n / 2) {
		return nChooseK(n, n - k);
	}

	// `long` is only 32 bits wide on LLP64 platforms, so accumulate in `long long`, which is at least 64 bits.
	long long ret = 1;
	// To avoid integer overflow, we do n/1 * (n-1)/2 * (n-2)/3 * ... * (n-i+1)/i, where i = k.
	// After iteration i, ret == C(n, i), so every division is exact.
	for (int i = 1; i <= k; ++i) {
		ret *= n - i + 1;
		ret /= i;
		// Check the range on every step: with ret <= INT_MAX going into the next multiplication, the product
		// is bounded by INT_MAX * INT_MAX and cannot overflow the 64-bit accumulator. Because the partial
		// results are non-decreasing for i <= k <= n / 2, this fails exactly when the final result would not
		// fit in an int.
		ASSERT(ret <= INT_MAX);
	}

	return static_cast<int>(ret);
}
namespace {
// Simple message for flatbuffers unittests
struct Int {

View File

@ -100,6 +100,9 @@ extern StringRef strinc(StringRef const& str, Arena& arena);
extern Standalone<StringRef> addVersionStampAtEnd(StringRef const& str);
extern StringRef addVersionStampAtEnd(StringRef const& str, Arena& arena);
// Returns the number of combinations for choosing k items out of n choices, i.e. the binomial coefficient C(n, k).
// The implementation asserts that n >= k >= 0 and that the result fits in an int.
int nChooseK(int n, int k);
template <typename Iter>
StringRef concatenate(Iter b, Iter const& e, Arena& arena) {
int rsize = 0;

View File

@ -344,10 +344,10 @@ Future<Void> storeOrThrow(T& out, Future<Optional<T>> what, Error e = key_not_fo
}
// Waits for the future `what` to be ready, then applies the asynchronous function `actorFunc` to its value,
// waits for the future returned by that call, and returns the value it produced.
// The result's value type is taken from `actorFunc(T).getValue()` via decltype.
// NOTE(review): two alternative template headers / signatures / `ret` declarations appear below (one with a
// defaulted third template parameter U, one spelling the decltype out directly) -- these look like both sides
// of a change; confirm which form is intended before relying on explicit template arguments at call sites.
ACTOR template <class T, class F, class U = decltype(std::declval<F>()(std::declval<T>()).getValue())>
Future<U> mapAsync(Future<T> what, F actorFunc) {
ACTOR template <class T, class F>
Future<decltype(std::declval<F>()(std::declval<T>()).getValue())> mapAsync(Future<T> what, F actorFunc) {
T val = wait(what);
U ret = wait(actorFunc(val));
decltype(std::declval<F>()(std::declval<T>()).getValue()) ret = wait(actorFunc(val));
return ret;
}

View File

@ -289,11 +289,9 @@ Future<Reference<IConnection>> INetworkConnections::connect(const std::string& h
// Wait for the endpoint to return, then wait for connect(endpoint) and return it.
// Template types are being provided explicitly because they can't be automatically deduced for some reason.
return mapAsync<NetworkAddress,
std::function<Future<Reference<IConnection>>(NetworkAddress const&)>,
Reference<IConnection>>(
pickEndpoint,
[=](NetworkAddress const& addr) -> Future<Reference<IConnection>> { return connectExternal(addr); });
return mapAsync(pickEndpoint, [=](NetworkAddress const& addr) -> Future<Reference<IConnection>> {
return connectExternal(addr);
});
}
IUDPSocket::~IUDPSocket() {}