diff --git a/cmake/AddFdbTest.cmake b/cmake/AddFdbTest.cmake
index 9a2f098d85..87f2a0271f 100644
--- a/cmake/AddFdbTest.cmake
+++ b/cmake/AddFdbTest.cmake
@@ -528,7 +528,7 @@ function(add_python_venv_test)
   if(USE_SANITIZER)
     set(test_env_vars "${test_env_vars};${SANITIZER_OPTIONS}")
   endif()
-  set_tests_properties("${T_NAME}" PROPERTIES ENVIRONMENT ${test_env_vars})
+  set_tests_properties("${T_NAME}" PROPERTIES ENVIRONMENT "${test_env_vars}")
 endfunction()

 # Creates a single cluster before running the specified command (usually a ctest test)
diff --git a/cmake/ConfigureCompiler.cmake b/cmake/ConfigureCompiler.cmake
index 01c94d23a1..0d8b6a83f5 100644
--- a/cmake/ConfigureCompiler.cmake
+++ b/cmake/ConfigureCompiler.cmake
@@ -178,6 +178,13 @@ else()
     add_compile_options(-ggdb1)
   endif()

+  if(CLANG)
+    # The default DWARF 5 format does not play nicely with GNU Binutils 2.39 and earlier, resulting
+    # in tools like addr2line omitting line numbers. We can consider removing this once we are able
+    # to use a version that has a fix.
+    add_compile_options(-gdwarf-4)
+  endif()
+
   if(NOT FDB_RELEASE)
     # Enable compression of the debug sections. This reduces the size of the binaries several times.
     # We do not enable it release builds, because CPack fails to generate debuginfo packages when
diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp
index 67838eb3f7..43f3274149 100644
--- a/fdbclient/MultiVersionTransaction.actor.cpp
+++ b/fdbclient/MultiVersionTransaction.actor.cpp
@@ -1538,13 +1538,11 @@ void MultiVersionTransaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {
 		persistentOptions.emplace_back(option, value.castTo<Standalone<StringRef>>());
 	}

-	if (itr->first == FDBTransactionOptions::TIMEOUT) {
-		setTimeout(value);
-	}
-
 	auto tr = getTransaction();
 	if (tr.transaction) {
 		tr.transaction->setOption(option, value);
+	} else if (itr->first == FDBTransactionOptions::TIMEOUT) {
+		setTimeout(value);
 	}
 }

diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp
index 4bdfe4bce9..1c3a9637e5 100644
--- a/fdbclient/ServerKnobs.cpp
+++ b/fdbclient/ServerKnobs.cpp
@@ -307,6 +307,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSimulated isSimulated) {
 	init( TENANT_CACHE_STORAGE_QUOTA_REFRESH_INTERVAL, 10 ); if( randomize && BUGGIFY ) TENANT_CACHE_STORAGE_QUOTA_REFRESH_INTERVAL = deterministicRandom()->randomInt(1, 10);
 	init( TENANT_CACHE_STORAGE_USAGE_TRACE_INTERVAL, 300 );
 	init( CP_FETCH_TENANTS_OVER_STORAGE_QUOTA_INTERVAL, 5 ); if( randomize && BUGGIFY ) CP_FETCH_TENANTS_OVER_STORAGE_QUOTA_INTERVAL = deterministicRandom()->randomInt(1, 10);
+	init( DD_BUILD_EXTRA_TEAMS_OVERRIDE, 10 ); if( randomize && BUGGIFY ) DD_BUILD_EXTRA_TEAMS_OVERRIDE = 2;

 	// TeamRemover
 	init( TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER, false ); if( randomize && BUGGIFY ) TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER = deterministicRandom()->random01() < 0.1 ? true : false; // false by default. disable the consistency check when it's true
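Reviewer note on the MultiVersionTransaction hunk above: TIMEOUT used to be applied locally before forwarding, so a timeout could effectively be armed twice. A rough standalone sketch of the new routing rule — `MultiVersionTxnSketch`, `Option`, and `pendingTimeout` are hypothetical stand-ins, not the real fdbclient types:

```cpp
#include <optional>
#include <string>

enum class Option { TIMEOUT, OTHER };

struct Transaction {
	void setOption(Option, const std::optional<std::string>&) {}
};

// Hypothetical, simplified sketch of the option routing after this change.
struct MultiVersionTxnSketch {
	Transaction* underlying = nullptr; // null until a client library is picked
	std::optional<std::string> pendingTimeout;

	void setOption(Option option, const std::optional<std::string>& value) {
		if (underlying) {
			// An underlying transaction exists: it owns every option, including
			// TIMEOUT, so the timeout is not armed in two places.
			underlying->setOption(option, value);
		} else if (option == Option::TIMEOUT) {
			// No underlying transaction yet (e.g. during a client-version switch):
			// remember the timeout locally so the caller still observes it.
			pendingTimeout = value;
		}
	}
};
```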
diff --git a/fdbclient/include/fdbclient/MetaclusterManagement.actor.h b/fdbclient/include/fdbclient/MetaclusterManagement.actor.h
index 83c1e30d8e..63e439b159 100644
--- a/fdbclient/include/fdbclient/MetaclusterManagement.actor.h
+++ b/fdbclient/include/fdbclient/MetaclusterManagement.actor.h
@@ -1440,7 +1440,8 @@ struct DeleteTenantImpl {
 	                                                                   Reference<typename DB::TransactionT> tr) {
 		state Optional<TenantMapEntry> tenantEntry = wait(tryGetTenantTransaction(tr, self->tenantName));

-		if (!tenantEntry.present() || tenantEntry.get().id != self->tenantId) {
+		if (!tenantEntry.present() || tenantEntry.get().id != self->tenantId ||
+		    tenantEntry.get().tenantState == TenantState::RENAMING_TO) {
 			throw tenant_not_found();
 		}

@@ -1452,11 +1453,6 @@ struct DeleteTenantImpl {
 		}

 		if (tenantEntry.get().tenantState != TenantState::REMOVING) {
-			// Disallow removing the "new" name of a renamed tenant before it completes
-			if (tenantEntry.get().tenantState == TenantState::RENAMING_TO) {
-				throw tenant_not_found();
-			}
-
 			state TenantMapEntry updatedEntry = tenantEntry.get();
 			// Check if we are deleting a tenant in the middle of a rename
 			updatedEntry.tenantState = TenantState::REMOVING;
diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h
index 608a225eb0..bf5c9c09a5 100644
--- a/fdbclient/include/fdbclient/ServerKnobs.h
+++ b/fdbclient/include/fdbclient/ServerKnobs.h
@@ -269,6 +269,7 @@ public:
 	double DD_FAILURE_TIME;
 	double DD_ZERO_HEALTHY_TEAM_DELAY;
+	int DD_BUILD_EXTRA_TEAMS_OVERRIDE; // build extra teams to allow data movement to progress. must be larger than 0

 	// Run storage enginee on a child process on the same machine with storage process
 	bool REMOTE_KV_STORE;
diff --git a/fdbrpc/AsyncFileCached.actor.cpp b/fdbrpc/AsyncFileCached.actor.cpp
index 1d56974094..340b925ef3 100644
--- a/fdbrpc/AsyncFileCached.actor.cpp
+++ b/fdbrpc/AsyncFileCached.actor.cpp
@@ -245,8 +245,7 @@ Future<Void> AsyncFileCached::changeFileSize(int64_t size) {

 	// Wait for the page truncations to finish, then truncate the underlying file
 	// Template types are being provided explicitly because they can't be automatically deduced for some reason.
-	return mapAsync<Void, std::function<Future<Void>(Void)>, Void>(
-	    waitForAll(actors), [=](Void _) -> Future<Void> { return uncached->truncate(size); });
+	return mapAsync(waitForAll(actors), [=](Void _) -> Future<Void> { return uncached->truncate(size); });
 }

 Future<Void> AsyncFileCached::flush() {
diff --git a/fdbrpc/IPAllowList.cpp b/fdbrpc/IPAllowList.cpp
index cfb8baa57d..862c871ea9 100644
--- a/fdbrpc/IPAllowList.cpp
+++ b/fdbrpc/IPAllowList.cpp
@@ -216,7 +216,7 @@ struct SubNetTest {
 			return IPAddress(arr[0]);
 		} else {
 			std::array<uint8_t, 16> res;
-			memcpy(res.data(), arr, 4);
+			memcpy(res.data(), arr, 16);
 			return IPAddress(res);
 		}
 	}
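A standalone illustration of the IPAllowList fix above: an IPv6 address is 16 bytes, so copying only 4 left three quarters of the output uninitialized. This is a hypothetical helper assuming the source is a `uint32_t[4]`, as in `SubNetTest`:

```cpp
#include <array>
#include <cstdint>
#include <cstring>

// Illustration only: copying 4 bytes fills a single uint32_t worth of an IPv6
// address and leaves the remaining 12 bytes uninitialized; the fix copies all 16.
std::array<uint8_t, 16> toV6Bytes(const uint32_t (&arr)[4]) {
	std::array<uint8_t, 16> res;
	static_assert(sizeof(arr) == 16, "IPv6 addresses are 16 bytes");
	std::memcpy(res.data(), arr, sizeof(arr)); // 16 bytes, not 4
	return res;
}
```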
diff --git a/fdbrpc/include/fdbrpc/AsyncFileChaos.h b/fdbrpc/include/fdbrpc/AsyncFileChaos.h
index 0df8a911a3..6cb0e0fca5 100644
--- a/fdbrpc/include/fdbrpc/AsyncFileChaos.h
+++ b/fdbrpc/include/fdbrpc/AsyncFileChaos.h
@@ -74,8 +74,8 @@ public:
 		// Wait for diskDelay before submitting the I/O
 		// Template types are being provided explicitly because they can't be automatically deduced for some reason.
 		// Capture file by value in case this is destroyed during the delay
-		return mapAsync<Void, std::function<Future<int>(Void)>, int>(
-		    delay(diskDelay), [=, file = file](Void _) -> Future<int> { return file->read(data, length, offset); });
+		return mapAsync(delay(diskDelay),
+		                [=, file = file](Void _) -> Future<int> { return file->read(data, length, offset); });
 	}

 	Future<Void> write(void const* data, int length, int64_t offset) override {
@@ -115,20 +115,19 @@ public:

 		// Wait for diskDelay before submitting the I/O
 		// Capture file by value in case this is destroyed during the delay
-		return mapAsync<Void, std::function<Future<Void>(Void)>, Void>(
-		    delay(getDelay()), [=, file = file](Void _) -> Future<Void> {
-			    if (pdata) {
-				    return map(holdWhile(arena, file->write(pdata, length, offset)),
-				               [corruptedBlock, file = file](auto res) {
-					               if (g_network->isSimulated()) {
-						               g_simulator->corruptedBlocks.emplace(file->getFilename(), corruptedBlock);
-					               }
-					               return res;
-				               });
-			    }
-
-			    return file->write(data, length, offset);
-		    });
+		return mapAsync(delay(getDelay()), [=, file = file](Void _) -> Future<Void> {
+			if (pdata) {
+				return map(holdWhile(arena, file->write(pdata, length, offset)),
+				           [corruptedBlock, file = file](auto res) {
+					           if (g_network->isSimulated()) {
+						           g_simulator->corruptedBlocks.emplace(file->getFilename(), corruptedBlock);
+					           }
+					           return res;
+				           });
+			}
+
+			return file->write(data, length, offset);
+		});
 	}

 	Future<Void> truncate(int64_t size) override {
@@ -138,17 +137,16 @@ public:

 		// Wait for diskDelay before submitting the I/O
 		// Capture file by value in case this is destroyed during the delay
-		return mapAsync<Void, std::function<Future<Void>(Void)>, Void>(
-		    delay(diskDelay), [size, file = file](Void _) -> Future<Void> {
-			    constexpr auto maxBlockValue =
-			        std::numeric_limits<decltype(g_simulator->corruptedBlocks)::key_type::second_type>::max();
-			    auto firstDeletedBlock =
-			        g_simulator->corruptedBlocks.lower_bound(std::make_pair(file->getFilename(), size / 4096));
-			    auto lastFileBlock =
-			        g_simulator->corruptedBlocks.upper_bound(std::make_pair(file->getFilename(), maxBlockValue));
-			    g_simulator->corruptedBlocks.erase(firstDeletedBlock, lastFileBlock);
-			    return file->truncate(size);
-		    });
+		return mapAsync(delay(diskDelay), [size, file = file](Void _) -> Future<Void> {
+			constexpr auto maxBlockValue =
+			    std::numeric_limits<decltype(g_simulator->corruptedBlocks)::key_type::second_type>::max();
+			auto firstDeletedBlock =
+			    g_simulator->corruptedBlocks.lower_bound(std::make_pair(file->getFilename(), size / 4096));
+			auto lastFileBlock =
+			    g_simulator->corruptedBlocks.upper_bound(std::make_pair(file->getFilename(), maxBlockValue));
+			g_simulator->corruptedBlocks.erase(firstDeletedBlock, lastFileBlock);
+			return file->truncate(size);
+		});
 	}

 	Future<Void> sync() override {
@@ -158,8 +156,7 @@ public:

 		// Wait for diskDelay before submitting the I/O
 		// Capture file by value in case this is destroyed during the delay
-		return mapAsync<Void, std::function<Future<Void>(Void)>, Void>(
-		    delay(diskDelay), [=, file = file](Void _) -> Future<Void> { return file->sync(); });
+		return mapAsync(delay(diskDelay), [=, file = file](Void _) -> Future<Void> { return file->sync(); });
 	}

 	Future<int64_t> size() const override {
@@ -169,8 +166,7 @@ public:

 		// Wait for diskDelay before submitting the I/O
 		// Capture file by value in case this is destroyed during the delay
-		return mapAsync<Void, std::function<Future<int64_t>(Void)>, int64_t>(
-		    delay(diskDelay), [=, file = file](Void _) -> Future<int64_t> { return file->size(); });
+		return mapAsync(delay(diskDelay), [=, file = file](Void _) -> Future<int64_t> { return file->size(); });
 	}

 	int64_t debugFD() const override { return file->debugFD(); }
diff --git a/fdbrpc/include/fdbrpc/LoadBalance.actor.h b/fdbrpc/include/fdbrpc/LoadBalance.actor.h
index 8a24efdfd7..1e12db19ee 100644
--- a/fdbrpc/include/fdbrpc/LoadBalance.actor.h
+++ b/fdbrpc/include/fdbrpc/LoadBalance.actor.h
@@ -299,14 +299,13 @@ struct RequestData : NonCopyable {
 		requestStarted = false;

 		if (backoff > 0) {
-			response = mapAsync<Void, std::function<Future<Reply>(Void)>, Reply>(
-			    delay(backoff), [this, stream, &request, model, alternatives, channel](Void _) {
-				    requestStarted = true;
-				    modelHolder = Reference<ModelHolder>(new ModelHolder(model, stream->getEndpoint().token.first()));
-				    Future<Reply> resp = stream->tryGetReply(request);
-				    maybeDuplicateTSSRequest(stream, request, model, resp, alternatives, channel);
-				    return resp;
-			    });
+			response = mapAsync(delay(backoff), [this, stream, &request, model, alternatives, channel](Void _) {
+				requestStarted = true;
+				modelHolder = Reference<ModelHolder>(new ModelHolder(model, stream->getEndpoint().token.first()));
+				Future<Reply> resp = stream->tryGetReply(request);
+				maybeDuplicateTSSRequest(stream, request, model, resp, alternatives, channel);
+				return resp;
+			});
 		} else {
 			requestStarted = true;
 			modelHolder = Reference<ModelHolder>(new ModelHolder(model, stream->getEndpoint().token.first()));
diff --git a/fdbserver/DDShardTracker.actor.cpp b/fdbserver/DDShardTracker.actor.cpp
index 47b1d3fc26..7d135d8204 100644
--- a/fdbserver/DDShardTracker.actor.cpp
+++ b/fdbserver/DDShardTracker.actor.cpp
@@ -1553,11 +1553,14 @@ FDB_DEFINE_BOOLEAN_PARAM(InOverSizePhysicalShard);
 FDB_DEFINE_BOOLEAN_PARAM(PhysicalShardAvailable);
 FDB_DEFINE_BOOLEAN_PARAM(MoveKeyRangeOutPhysicalShard);

-// Tracks storage metrics for `keys`. This function is similar to `trackShardMetrics()` and altered for physical shard.
-// This meant to be temporary. Eventually, we want a new interface to track physical shard metrics more efficiently.
-ACTOR Future<Void> trackKeyRangeInPhysicalShardMetrics(Reference<IDDTxnProcessor> db,
-                                                       KeyRange keys,
-                                                       Reference<AsyncVar<Optional<ShardMetrics>>> shardMetrics) {
+// Tracks storage metrics for `keys` and updates `physicalShardStats`, which is the stats for the physical shard owning
+// this key range. This function is similar to `trackShardMetrics()` and altered for physical shard. This is meant to
+// be temporary. Eventually, we want a new interface to track physical shard metrics more efficiently.
+ACTOR Future<Void> trackKeyRangeInPhysicalShardMetrics(
+    Reference<IDDTxnProcessor> db,
+    KeyRange keys,
+    Reference<AsyncVar<Optional<ShardMetrics>>> shardMetrics,
+    Reference<AsyncVar<Optional<StorageMetrics>>> physicalShardStats) {
 	state BandwidthStatus bandwidthStatus =
 	    shardMetrics->get().present() ? getBandwidthStatus(shardMetrics->get().get().metrics) : BandwidthStatusNormal;
 	state double lastLowBandwidthStartTime =
@@ -1591,6 +1594,20 @@ ACTOR Future<Void> trackKeyRangeInPhysicalShardMetrics(
+				// Update stats of the physical shard owning this key range.
+				if (!physicalShardStats->get().present()) {
+					physicalShardStats->set(metrics.first.get());
+				} else {
+					if (!shardMetrics->get().present()) {
+						// We collect key range stats for the first time.
+						physicalShardStats->set(physicalShardStats->get().get() + metrics.first.get());
+					} else {
+						physicalShardStats->set(physicalShardStats->get().get() - shardMetrics->get().get().metrics +
+						                        metrics.first.get());
+					}
+				}
+
 				shardMetrics->set(ShardMetrics(metrics.first.get(), lastLowBandwidthStartTime, shardCount));
 				break;
 			} else {
@@ -1605,6 +1622,14 @@ ACTOR Future<Void> trackKeyRangeInPhysicalShardMetrics(
+void PhysicalShardCollection::PhysicalShard::insertNewRangeData(const KeyRange& newRange) {
+	RangeData data;
+	data.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
+	data.trackMetrics = trackKeyRangeInPhysicalShardMetrics(txnProcessor, newRange, data.stats, stats);
+	auto it = rangeData.emplace(newRange, data);
+	ASSERT(it.second);
+}
+
 void PhysicalShardCollection::PhysicalShard::addRange(const KeyRange& newRange) {
 	if (g_network->isSimulated()) {
 		// Test that new range must not overlap with any existing range in this shard.
@@ -1613,10 +1638,7 @@ void PhysicalShardCollection::PhysicalShard::addRange(const KeyRange& newRange)
 		}
 	}

-	RangeData data;
-	data.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
-	data.trackMetrics = trackKeyRangeInPhysicalShardMetrics(txnProcessor, newRange, data.stats);
-	rangeData.emplace(newRange, data);
+	insertNewRangeData(newRange);
 }

 void PhysicalShardCollection::PhysicalShard::removeRange(const KeyRange& outRange) {
@@ -1631,10 +1653,7 @@ void PhysicalShardCollection::PhysicalShard::removeRange(const KeyRange& outRange)
 		std::vector<KeyRange> remainingRanges = range - outRange;
 		for (auto& r : remainingRanges) {
 			ASSERT(r != range);
-			RangeData data;
-			data.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
-			data.trackMetrics = trackKeyRangeInPhysicalShardMetrics(txnProcessor, r, data.stats);
-			rangeData.emplace(r, data);
+			insertNewRangeData(r);
 		}
 		// Must erase last since `remainingRanges` uses data in `range`.
 		rangeData.erase(range);
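The physical-shard stats update added above maintains a running aggregate across key ranges. A distilled, self-contained version of that bookkeeping (a simplified `Metrics` type stands in for `StorageMetrics`/`AsyncVar`; this is not the actual DD code):

```cpp
#include <optional>

struct Metrics {
	long bytes = 0;
	Metrics operator+(const Metrics& o) const { return { bytes + o.bytes }; }
	Metrics operator-(const Metrics& o) const { return { bytes - o.bytes }; }
};

// Aggregate rule used by trackKeyRangeInPhysicalShardMetrics, in isolation: the
// shard total is initialized from the first sample, grown when a range reports
// for the first time, and otherwise adjusted by (new - old).
void updateShardTotal(std::optional<Metrics>& shardTotal,
                      const std::optional<Metrics>& previousRangeMetrics,
                      const Metrics& newRangeMetrics) {
	if (!shardTotal.has_value()) {
		shardTotal = newRangeMetrics;
	} else if (!previousRangeMetrics.has_value()) {
		shardTotal = *shardTotal + newRangeMetrics; // first sample for this range
	} else {
		shardTotal = *shardTotal - *previousRangeMetrics + newRangeMetrics;
	}
}
```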
diff --git a/fdbserver/DDTeamCollection.actor.cpp b/fdbserver/DDTeamCollection.actor.cpp
index 23c9e1bcd0..de8dca399a 100644
--- a/fdbserver/DDTeamCollection.actor.cpp
+++ b/fdbserver/DDTeamCollection.actor.cpp
@@ -21,6 +21,7 @@
 #include "fdbserver/DDTeamCollection.h"
 #include "flow/Trace.h"
 #include "flow/actorcompiler.h" // This must be the last #include.
+#include <climits>

 FDB_DEFINE_BOOLEAN_PARAM(IsPrimary);
 FDB_DEFINE_BOOLEAN_PARAM(IsInitialTeam);
@@ -575,6 +576,14 @@ public:
 		state int teamsToBuild;
 		teamsToBuild = std::max(0, std::min(desiredTeams - teamCount, maxTeams - totalTeamCount));

+		if (teamCount == 0 && teamsToBuild == 0 && SERVER_KNOBS->DD_BUILD_EXTRA_TEAMS_OVERRIDE > 0) {
+			// Use DD_BUILD_EXTRA_TEAMS_OVERRIDE > 0 as the feature flag: Set to 0 to disable it
+			TraceEvent(SevWarnAlways, "BuildServerTeamsHaveTooManyUnhealthyTeams")
+			    .detail("Hint", "Building teams may get stuck and prevent DD from relocating data")
+			    .detail("BuildExtraServerTeamsOverride", SERVER_KNOBS->DD_BUILD_EXTRA_TEAMS_OVERRIDE);
+			teamsToBuild = SERVER_KNOBS->DD_BUILD_EXTRA_TEAMS_OVERRIDE;
+		}
+
 		TraceEvent("BuildTeamsBegin", self->distributorId)
 		    .detail("TeamsToBuild", teamsToBuild)
 		    .detail("DesiredTeams", desiredTeams)
@@ -583,7 +592,8 @@ public:
 		    .detail("PerpetualWigglingTeams", wigglingTeams)
 		    .detail("UniqueMachines", uniqueMachines)
 		    .detail("TeamSize", self->configuration.storageTeamSize)
-		    .detail("Servers", serverCount)
+		    .detail("Servers", self->server_info.size())
+		    .detail("HealthyServers", serverCount)
 		    .detail("CurrentTrackedServerTeams", self->teams.size())
 		    .detail("HealthyTeamCount", teamCount)
 		    .detail("TotalTeamCount", totalTeamCount)
@@ -640,6 +650,10 @@ public:
 			}
 		} else {
 			self->lastBuildTeamsFailed = true;
+			TraceEvent(SevWarnAlways, "BuildTeamsNotEnoughUniqueMachines", self->distributorId)
+			    .detail("Primary", self->primary)
+			    .detail("UniqueMachines", uniqueMachines)
+			    .detail("Replication", self->configuration.storageTeamSize);
 		}

 		self->evaluateTeamQuality();
@@ -3091,6 +3105,7 @@ public:
 			TraceEvent e("ServerStatus", self->getDistributorId());
 			e.detail("ServerUID", uid)
 			    .detail("MachineIsValid", server_info[uid]->machine.isValid())
+			    .detail("IsMachineHealthy", self->isMachineHealthy(server_info[uid]->machine))
 			    .detail("MachineTeamSize",
 			            server_info[uid]->machine.isValid() ? server_info[uid]->machine->machineTeams.size() : -1)
 			    .detail("Primary", self->isPrimary());
@@ -3135,13 +3150,13 @@ public:
 		TraceEvent("MachineInfo", self->getDistributorId())
 		    .detail("Size", machine_info.size())
 		    .detail("Primary", self->isPrimary());
-
 		state std::map<Standalone<StringRef>, Reference<TCMachineInfo>>::iterator machine = machine_info.begin();
 		state bool isMachineHealthy = false;
 		for (i = 0; i < machine_info.size(); i++) {
 			Reference<TCMachineInfo> _machine = machine->second;
-			if (!_machine.isValid() || machine_info.find(_machine->machineID) == machine_info.end() ||
-			    _machine->serversOnMachine.empty()) {
+			bool machineIDFound = machine_info.find(_machine->machineID) != machine_info.end();
+			bool zeroHealthyServersOnMachine = true;
+			if (!_machine.isValid() || !machineIDFound || _machine->serversOnMachine.empty()) {
 				isMachineHealthy = false;
 			}

@@ -3153,13 +3168,17 @@ public:
 				auto it = server_status.find(server->getId());
 				if (it != server_status.end() && !it->second.isUnhealthy()) {
 					isMachineHealthy = true;
+					zeroHealthyServersOnMachine = false;
+					break;
 				}
 			}

-			isMachineHealthy = false;
 			TraceEvent("MachineInfo", self->getDistributorId())
 			    .detail("MachineInfoIndex", i)
 			    .detail("Healthy", isMachineHealthy)
+			    .detail("MachineIDFound", machineIDFound)
+			    .detail("ZeroServersOnMachine", _machine->serversOnMachine.empty())
+			    .detail("ZeroHealthyServersOnMachine", zeroHealthyServersOnMachine)
 			    .detail("MachineID", machine->first.contents().toString())
 			    .detail("MachineTeamOwned", machine->second->machineTeams.size())
 			    .detail("ServerNumOnMachine", machine->second->serversOnMachine.size())
@@ -3274,10 +3293,15 @@ void DDTeamCollection::traceServerInfo() const {
 		    .detail("StoreType", server->getStoreType().toString())
 		    .detail("InDesiredDC", server->isInDesiredDC());
 	}
-	for (auto& [serverID, server] : server_info) {
+	i = 0;
+	for (auto& server : server_info) {
+		const UID& serverID = server.first;
+		const ServerStatus& status = server_status.get(serverID);
 		TraceEvent("ServerStatus", distributorId)
+		    .detail("ServerInfoIndex", i++)
 		    .detail("ServerID", serverID)
-		    .detail("Healthy", !server_status.get(serverID).isUnhealthy())
+		    .detail("Healthy", !status.isUnhealthy())
+		    .detail("StatusString", status.toString())
 		    .detail("MachineIsValid", get(server_info, serverID)->machine.isValid())
 		    .detail("MachineTeamSize",
 		            get(server_info, serverID)->machine.isValid()
@@ -4048,6 +4072,7 @@ void DDTeamCollection::traceAllInfo(bool shouldPrint) const {
 		}
 	}

+	// TODO: flush trace log to avoid trace buffer overflow when DD has too many servers and teams
 	TraceEvent("TraceAllInfo", distributorId).detail("Primary", primary);
 	traceConfigInfo();
 	traceServerInfo();
@@ -4215,10 +4240,13 @@ int DDTeamCollection::addBestMachineTeams(int machineTeamsToBuild) {
 			addMachineTeam(machines);
 			addedMachineTeams++;
 		} else {
-			traceAllInfo(true);
+			// When too many teams exist in simulation, traceAllInfo will buffer too many trace logs before
+			// trace has a chance to flush its buffer, which causes an assertion failure.
+			traceAllInfo(!g_network->isSimulated());
 			TraceEvent(SevWarn, "DataDistributionBuildTeams", distributorId)
 			    .detail("Primary", primary)
-			    .detail("Reason", "Unable to make desired machine Teams");
+			    .detail("Reason", "Unable to make desired machine Teams")
+			    .detail("Hint", "Check TraceAllInfo event");
 			lastBuildTeamsFailed = true;
 			break;
 		}
@@ -4507,13 +4535,34 @@ int DDTeamCollection::addTeamsBestOf(int teamsToBuild, int desiredTeams, int maxTeams) {
 	// machineTeamsToBuild mimics how the teamsToBuild is calculated in buildTeams()
 	int machineTeamsToBuild =
 	    std::max(0, std::min(desiredMachineTeams - healthyMachineTeamCount, maxMachineTeams - totalMachineTeamCount));
+	if (healthyMachineTeamCount == 0 && machineTeamsToBuild == 0 && SERVER_KNOBS->DD_BUILD_EXTRA_TEAMS_OVERRIDE > 0) {
+		// Use DD_BUILD_EXTRA_TEAMS_OVERRIDE > 0 as the feature flag: Set to 0 to disable it
+		TraceEvent(SevWarnAlways, "BuildMachineTeamsHaveTooManyUnhealthyMachineTeams")
+		    .detail("Hint", "Building teams may get stuck and prevent DD from relocating data")
+		    .detail("BuildExtraMachineTeamsOverride", SERVER_KNOBS->DD_BUILD_EXTRA_TEAMS_OVERRIDE);
+		machineTeamsToBuild = SERVER_KNOBS->DD_BUILD_EXTRA_TEAMS_OVERRIDE;
+	}
+	if (g_network->isSimulated() && deterministicRandom()->random01() < 0.1) {
+		// Test when the system has lots of unhealthy machine teams, which may prevent TC from building new teams.
+		// The scenario creates a deadlock in which DD cannot relocate data.
+		int totalMachineTeams = nChooseK(machine_info.size(), configuration.storageTeamSize);
+		TraceEvent("BuildMachineTeams")
+		    .detail("Primary", primary)
+		    .detail("CalculatedMachineTeamsToBuild", machineTeamsToBuild)
+		    .detail("OverwriteMachineTeamsToBuildForTesting", totalMachineTeams);
+
+		machineTeamsToBuild = totalMachineTeams;
+	}

 	{
 		TraceEvent te("BuildMachineTeams");
-		te.detail("TotalHealthyMachine", totalHealthyMachineCount)
+		te.detail("Primary", primary)
+		    .detail("TotalMachines", machine_info.size())
+		    .detail("TotalHealthyMachine", totalHealthyMachineCount)
 		    .detail("HealthyMachineTeamCount", healthyMachineTeamCount)
 		    .detail("DesiredMachineTeams", desiredMachineTeams)
 		    .detail("MaxMachineTeams", maxMachineTeams)
+		    .detail("TotalMachineTeams", totalMachineTeamCount)
 		    .detail("MachineTeamsToBuild", machineTeamsToBuild);
 		// Pre-build all machine teams until we have the desired number of machine teams
 		if (machineTeamsToBuild > 0 || notEnoughMachineTeamsForAMachine()) {
@@ -4817,6 +4866,7 @@ Reference<TCMachineInfo> DDTeamCollection::checkAndCreateMachine(Reference<TCServerInfo> server) {
 		machineInfo->serversOnMachine.push_back(server);
 	}
 	server->machine = machineInfo;
+	ASSERT(machineInfo->machineID == machine_id); // invariant for TC to work

 	return machineInfo;
 }
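Condensed view of the override the two DDTeamCollection hunks above introduce; it only fires when every existing team is unhealthy yet the desired/max formula yields zero new teams. This is a sketch under those assumptions, not the actual member function:

```cpp
#include <algorithm>

// Sketch of the gate added in buildTeams()/addTeamsBestOf(): when every existing
// team is unhealthy, the desired/max formula yields 0 and data distribution can
// deadlock, so a small fixed number of extra teams is built instead.
int teamsToBuildWithOverride(int desired, int healthyCount, int maxTeams, int totalCount, int overrideKnob) {
	int toBuild = std::max(0, std::min(desired - healthyCount, maxTeams - totalCount));
	if (healthyCount == 0 && toBuild == 0 && overrideKnob > 0) {
		toBuild = overrideKnob; // DD_BUILD_EXTRA_TEAMS_OVERRIDE; 0 disables the override
	}
	return toBuild;
}
```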
diff --git a/fdbserver/KeyValueStoreRocksDB.actor.cpp b/fdbserver/KeyValueStoreRocksDB.actor.cpp
index f24562a9bb..23767f318c 100644
--- a/fdbserver/KeyValueStoreRocksDB.actor.cpp
+++ b/fdbserver/KeyValueStoreRocksDB.actor.cpp
@@ -82,6 +82,9 @@ public:
 	LatencySample commitLatency;
 	LatencySample commitQueueLatency;
 	LatencySample dbWriteLatency;
+	std::vector<std::shared_ptr<LatencySample>> readLatency;
+	std::vector<std::shared_ptr<LatencySample>> scanLatency;
+	std::vector<std::shared_ptr<LatencySample>> readQueueLatency;

 	void setClosing() { this->closing = true; }
 	bool isClosing() const { return this->closing; }
@@ -116,7 +119,22 @@ SharedRocksDBState::SharedRocksDBState(UID id)
     dbWriteLatency(LatencySample("RocksDBWriteLatency",
                                  id,
                                  SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
-                                 SERVER_KNOBS->LATENCY_SKETCH_ACCURACY)) {}
+                                 SERVER_KNOBS->LATENCY_SKETCH_ACCURACY)) {
+	for (int i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; i++) {
+		readLatency.push_back(std::make_shared<LatencySample>(format("RocksDBReadLatency-%d", i),
+		                                                      id,
+		                                                      SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
+		                                                      SERVER_KNOBS->LATENCY_SKETCH_ACCURACY));
+		scanLatency.push_back(std::make_shared<LatencySample>(format("RocksDBScanLatency-%d", i),
+		                                                      id,
+		                                                      SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
+		                                                      SERVER_KNOBS->LATENCY_SKETCH_ACCURACY));
+		readQueueLatency.push_back(std::make_shared<LatencySample>(format("RocksDBReadQueueLatency-%d", i),
+		                                                           id,
+		                                                           SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
+		                                                           SERVER_KNOBS->LATENCY_SKETCH_ACCURACY));
+	}
+}

 rocksdb::ColumnFamilyOptions SharedRocksDBState::initialCfOptions() {
 	rocksdb::ColumnFamilyOptions options;
@@ -199,7 +217,7 @@ rocksdb::DBOptions SharedRocksDBState::initialDbOptions() {
 	options.statistics = rocksdb::CreateDBStatistics();
 	options.statistics->set_stats_level(rocksdb::StatsLevel(SERVER_KNOBS->ROCKSDB_STATS_LEVEL));

-	options.db_log_dir = SERVER_KNOBS->LOG_DIRECTORY;
+	options.db_log_dir = g_network->isSimulated() ? "" : SERVER_KNOBS->LOG_DIRECTORY;

 	if (SERVER_KNOBS->ROCKSDB_MUTE_LOGS) {
 		options.info_log = std::make_shared<NullRocksDBLogForwarder>();
@@ -1461,7 +1479,8 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 			if (doPerfContextMetrics) {
 				perfContextMetrics->reset();
 			}
-			double readBeginTime = timer_monotonic();
+			const double readBeginTime = timer_monotonic();
+			sharedState->readQueueLatency[threadIndex]->addMeasurement(readBeginTime - a.startTime);
 			if (a.getHistograms) {
 				metricPromiseStream->send(
 				    std::make_pair(ROCKSDB_READVALUE_QUEUEWAIT_HISTOGRAM.toString(), readBeginTime - a.startTime));
@@ -1515,16 +1534,17 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 				a.result.sendError(statusToError(s));
 			}

+			const double endTime = timer_monotonic();
 			if (a.getHistograms) {
-				double currTime = timer_monotonic();
 				metricPromiseStream->send(
-				    std::make_pair(ROCKSDB_READVALUE_ACTION_HISTOGRAM.toString(), currTime - readBeginTime));
+				    std::make_pair(ROCKSDB_READVALUE_ACTION_HISTOGRAM.toString(), endTime - readBeginTime));
 				metricPromiseStream->send(
-				    std::make_pair(ROCKSDB_READVALUE_LATENCY_HISTOGRAM.toString(), currTime - a.startTime));
+				    std::make_pair(ROCKSDB_READVALUE_LATENCY_HISTOGRAM.toString(), endTime - a.startTime));
 			}
 			if (doPerfContextMetrics) {
 				perfContextMetrics->set(threadIndex);
 			}
+			sharedState->readLatency[threadIndex]->addMeasurement(endTime - readBeginTime);
 		}

 		struct ReadValuePrefixAction : TypedAction<Reader, ReadValuePrefixAction> {
@@ -1546,7 +1566,8 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 			if (doPerfContextMetrics) {
 				perfContextMetrics->reset();
 			}
-			double readBeginTime = timer_monotonic();
+			const double readBeginTime = timer_monotonic();
+			sharedState->readQueueLatency[threadIndex]->addMeasurement(readBeginTime - a.startTime);
 			if (a.getHistograms) {
 				metricPromiseStream->send(
 				    std::make_pair(ROCKSDB_READPREFIX_QUEUEWAIT_HISTOGRAM.toString(), readBeginTime - a.startTime));
@@ -1598,16 +1619,17 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 				logRocksDBError(id, s, "ReadValuePrefix");
 				a.result.sendError(statusToError(s));
 			}
+			const double endTime = timer_monotonic();
 			if (a.getHistograms) {
-				double currTime = timer_monotonic();
 				metricPromiseStream->send(
-				    std::make_pair(ROCKSDB_READPREFIX_ACTION_HISTOGRAM.toString(), currTime - readBeginTime));
+				    std::make_pair(ROCKSDB_READPREFIX_ACTION_HISTOGRAM.toString(), endTime - readBeginTime));
 				metricPromiseStream->send(
-				    std::make_pair(ROCKSDB_READPREFIX_LATENCY_HISTOGRAM.toString(), currTime - a.startTime));
+				    std::make_pair(ROCKSDB_READPREFIX_LATENCY_HISTOGRAM.toString(), endTime - a.startTime));
 			}
 			if (doPerfContextMetrics) {
 				perfContextMetrics->set(threadIndex);
 			}
+			sharedState->readLatency[threadIndex]->addMeasurement(endTime - readBeginTime);
 		}

 		struct ReadRangeAction : TypedAction<Reader, ReadRangeAction>, FastAllocated<ReadRangeAction> {
@@ -1628,7 +1650,8 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 			if (doPerfContextMetrics) {
 				perfContextMetrics->reset();
 			}
-			double readBeginTime = timer_monotonic();
+			const double readBeginTime = timer_monotonic();
+			sharedState->readQueueLatency[threadIndex]->addMeasurement(readBeginTime - a.startTime);
 			if (a.getHistograms) {
 				metricPromiseStream->send(
 				    std::make_pair(ROCKSDB_READRANGE_QUEUEWAIT_HISTOGRAM.toString(), readBeginTime - a.startTime));
@@ -1722,16 +1745,17 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 				result.readThrough = result[result.size() - 1].key;
 			}
 			a.result.send(result);
+			const double endTime = timer_monotonic();
 			if (a.getHistograms) {
-				double currTime = timer_monotonic();
 				metricPromiseStream->send(
-				    std::make_pair(ROCKSDB_READRANGE_ACTION_HISTOGRAM.toString(), currTime - readBeginTime));
+				    std::make_pair(ROCKSDB_READRANGE_ACTION_HISTOGRAM.toString(), endTime - readBeginTime));
 				metricPromiseStream->send(
-				    std::make_pair(ROCKSDB_READRANGE_LATENCY_HISTOGRAM.toString(), currTime - a.startTime));
+				    std::make_pair(ROCKSDB_READRANGE_LATENCY_HISTOGRAM.toString(), endTime - a.startTime));
 			}
 			if (doPerfContextMetrics) {
 				perfContextMetrics->set(threadIndex);
 			}
+			sharedState->scanLatency[threadIndex]->addMeasurement(endTime - readBeginTime);
 		}
 	};
diff --git a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp
index b2f03b8e78..d3d972689c 100644
--- a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp
+++ b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp
@@ -365,7 +365,7 @@ rocksdb::Options getOptions() {
 	options.write_buffer_size = SERVER_KNOBS->ROCKSDB_CF_WRITE_BUFFER_SIZE;
 	options.statistics = rocksdb::CreateDBStatistics();
 	options.statistics->set_stats_level(rocksdb::kExceptHistogramOrTimers);
-	options.db_log_dir = SERVER_KNOBS->LOG_DIRECTORY;
+	options.db_log_dir = g_network->isSimulated() ? "" : SERVER_KNOBS->LOG_DIRECTORY;
 	return options;
 }
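The new RocksDB metrics above keep one `LatencySample` per reader thread, indexed by `threadIndex`, so the hot path records measurements without cross-thread synchronization. A minimal sketch of that layout, with a hypothetical `Sample` standing in for `LatencySample`:

```cpp
#include <memory>
#include <vector>

struct Sample {
	void addMeasurement(double) {} // stand-in for LatencySample
};

// One sample per reader thread: each thread writes only to its own slot,
// so no lock is needed on the read path.
struct ReadMetrics {
	std::vector<std::shared_ptr<Sample>> readLatency;
	explicit ReadMetrics(int readParallelism) {
		for (int i = 0; i < readParallelism; i++) {
			readLatency.push_back(std::make_shared<Sample>());
		}
	}
	void record(int threadIndex, double seconds) { readLatency[threadIndex]->addMeasurement(seconds); }
};
```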
"" : SERVER_KNOBS->LOG_DIRECTORY; return options; } diff --git a/fdbserver/QuietDatabase.actor.cpp b/fdbserver/QuietDatabase.actor.cpp index a85cc863a3..8b01c58fdb 100644 --- a/fdbserver/QuietDatabase.actor.cpp +++ b/fdbserver/QuietDatabase.actor.cpp @@ -23,6 +23,7 @@ #include #include "fdbclient/FDBOptions.g.h" +#include "fdbclient/FDBTypes.h" #include "fdbclient/SystemData.h" #include "flow/ActorCollection.h" #include "fdbrpc/simulator.h" @@ -653,6 +654,63 @@ ACTOR Future getVersionOffset(Database cx, } } +// Returns DC lag for simulation runs +ACTOR Future getDatacenterLag(Database cx, Reference const> dbInfo) { + loop { + if (!g_network->isSimulated() || g_simulator->usableRegions == 1) { + return 0; + } + + state Optional primaryLog; + state Optional remoteLog; + if (dbInfo->get().recoveryState >= RecoveryState::ALL_LOGS_RECRUITED) { + for (const auto& logset : dbInfo->get().logSystemConfig.tLogs) { + if (logset.isLocal && logset.locality != tagLocalitySatellite) { + for (const auto& tlog : logset.tLogs) { + if (tlog.present()) { + primaryLog = tlog.interf(); + break; + } + } + } + if (!logset.isLocal) { + for (const auto& tlog : logset.tLogs) { + if (tlog.present()) { + remoteLog = tlog.interf(); + break; + } + } + } + } + } + + if (!primaryLog.present() || !remoteLog.present()) { + wait(dbInfo->onChange()); + continue; + } + + ASSERT(primaryLog.present()); + ASSERT(remoteLog.present()); + + state Future onChange = dbInfo->onChange(); + loop { + state Future primaryMetrics = + brokenPromiseToNever(primaryLog.get().getQueuingMetrics.getReply(TLogQueuingMetricsRequest())); + state Future remoteMetrics = + brokenPromiseToNever(remoteLog.get().getQueuingMetrics.getReply(TLogQueuingMetricsRequest())); + + wait((success(primaryMetrics) && success(remoteMetrics)) || onChange); + if (onChange.isReady()) { + break; + } + + TraceEvent("DCLag").detail("Primary", primaryMetrics.get().v).detail("Remote", remoteMetrics.get().v); + ASSERT(primaryMetrics.get().v >= 0 && remoteMetrics.get().v >= 0); + return primaryMetrics.get().v - remoteMetrics.get().v; + } + } +} + ACTOR Future repairDeadDatacenter(Database cx, Reference const> dbInfo, std::string context) { @@ -780,6 +838,8 @@ ACTOR Future waitForQuietDatabase(Database cx, state Future dataDistributionActive; state Future storageServersRecruiting; state Future versionOffset; + state Future dcLag; + state Version maxDcLag = 30e6; auto traceMessage = "QuietDatabase" + phase + "Begin"; TraceEvent(traceMessage.c_str()).log(); @@ -817,10 +877,11 @@ ACTOR Future waitForQuietDatabase(Database cx, dataDistributionActive = getDataDistributionActive(cx, distributorWorker); storageServersRecruiting = getStorageServersRecruiting(cx, distributorWorker, distributorUID); versionOffset = getVersionOffset(cx, distributorWorker, dbInfo); + dcLag = getDatacenterLag(cx, dbInfo); wait(success(dataInFlight) && success(tLogQueueInfo) && success(dataDistributionQueueSize) && success(teamCollectionValid) && success(storageQueueSize) && success(dataDistributionActive) && - success(storageServersRecruiting) && success(versionOffset)); + success(storageServersRecruiting) && success(versionOffset) && success(dcLag)); maxVersionOffset += dbInfo->get().recoveryCount * SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT; @@ -836,7 +897,8 @@ ACTOR Future waitForQuietDatabase(Database cx, .add(evt, "MaxStorageQueueSize", storageQueueSize.get(), maxStorageServerQueueGate) .add(evt, "DataDistributionActive", dataDistributionActive.get(), true, std::equal_to<>()) .add(evt, 
"StorageServersRecruiting", storageServersRecruiting.get(), false, std::equal_to<>()) - .add(evt, "VersionOffset", versionOffset.get(), maxVersionOffset); + .add(evt, "VersionOffset", versionOffset.get(), maxVersionOffset) + .add(evt, "DatacenterLag", dcLag.get(), maxDcLag); evt.detail("RecoveryCount", dbInfo->get().recoveryCount).detail("NumSuccesses", numSuccesses); evt.log(); diff --git a/fdbserver/RocksDBCheckpointUtils.actor.cpp b/fdbserver/RocksDBCheckpointUtils.actor.cpp index bab72cf6e4..141fad8f7a 100644 --- a/fdbserver/RocksDBCheckpointUtils.actor.cpp +++ b/fdbserver/RocksDBCheckpointUtils.actor.cpp @@ -113,7 +113,7 @@ rocksdb::ColumnFamilyOptions getCFOptions() { rocksdb::Options getOptions() { rocksdb::Options options({}, getCFOptions()); options.create_if_missing = true; - options.db_log_dir = SERVER_KNOBS->LOG_DIRECTORY; + options.db_log_dir = g_network->isSimulated() ? "" : SERVER_KNOBS->LOG_DIRECTORY; return options; } diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 07a7ff548e..1d7a2506f6 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1622,6 +1622,7 @@ void peekMessagesFromMemory(Reference self, Version& endVersion) { ASSERT(!messages.getLength()); + int versionCount = 0; auto& deque = getVersionMessages(self, tag); //TraceEvent("TLogPeekMem", self->dbgid).detail("Tag", req.tag1).detail("PDS", self->persistentDataSequence).detail("PDDS", self->persistentDataDurableSequence).detail("Oldest", map1.empty() ? 0 : map1.begin()->key ).detail("OldestMsgCount", map1.empty() ? 0 : map1.begin()->value.size()); @@ -1652,6 +1653,23 @@ void peekMessagesFromMemory(Reference self, DEBUG_TAGS_AND_MESSAGE( "TLogPeek", currentVersion, StringRef((uint8_t*)data + offset, messages.getLength() - offset), self->logId) .detail("PeekTag", tag); + versionCount++; + } + + if (versionCount == 0) { + ++self->emptyPeeks; + } else { + ++self->nonEmptyPeeks; + + // TODO (version vector) check if this should be included in "status details" json + if (self->peekVersionCounts.find(tag) == self->peekVersionCounts.end()) { + UID ssID = deterministicRandom()->randomUniqueID(); + std::string s = "PeekVersionCounts " + tag.toString(); + self->peekVersionCounts.try_emplace( + tag, s, ssID, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SKETCH_ACCURACY); + } + LatencySample& sample = self->peekVersionCounts.at(tag); + sample.addMeasurement(versionCount); } } diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index 7b8fdb3b0d..db15b3a714 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -2657,16 +2657,17 @@ public: } else if (cacheEntry.reading()) { // This is very unlikely, maybe impossible in the current pager use cases // Wait for the outstanding read to finish, then start the write - cacheEntry.writeFuture = mapAsync(Void)>, Void>( - success(cacheEntry.readFuture), [=](Void) { return writePhysicalPage(reason, level, pageIDs, data); }); + cacheEntry.writeFuture = mapAsync(success(cacheEntry.readFuture), + [=](Void) { return writePhysicalPage(reason, level, pageIDs, data); }); } + // If the page is being written, wait for this write before issuing the new write to ensure the // writes happen in the correct order else if (cacheEntry.writing()) { // This is very unlikely, maybe impossible in the current pager use cases // Wait for the previous write to finish, then start new write - cacheEntry.writeFuture = mapAsync(Void)>, Void>( - 
diff --git a/fdbserver/include/fdbserver/DataDistribution.actor.h b/fdbserver/include/fdbserver/DataDistribution.actor.h
index 8679b6d9b5..da5497f961 100644
--- a/fdbserver/include/fdbserver/DataDistribution.actor.h
+++ b/fdbserver/include/fdbserver/DataDistribution.actor.h
@@ -284,7 +284,8 @@ public:
 		              StorageMetrics const& metrics,
 		              std::vector<ShardsAffectedByTeamFailure::Team> teams,
 		              PhysicalShardCreationTime whenCreated)
-		  : txnProcessor(txnProcessor), id(id), metrics(metrics), teams(teams), whenCreated(whenCreated) {}
+		  : txnProcessor(txnProcessor), id(id), metrics(metrics),
+		    stats(makeReference<AsyncVar<Optional<StorageMetrics>>>()), teams(teams), whenCreated(whenCreated) {}

 		// Adds `newRange` to this physical shard and starts monitoring the shard.
 		void addRange(const KeyRange& newRange);
@@ -297,15 +298,20 @@ public:
 		Reference<IDDTxnProcessor> txnProcessor;
 		uint64_t id; // physical shard id (never changed)
 		StorageMetrics metrics; // current metrics, updated by shardTracker
+		// todo(zhewu): combine above metrics with stats. They are redundant.
+		Reference<AsyncVar<Optional<StorageMetrics>>> stats; // Stats of this physical shard.
 		std::vector<ShardsAffectedByTeamFailure::Team> teams; // which team owns this physical shard (never changed)
 		PhysicalShardCreationTime whenCreated; // when this physical shard is created (never changed)

 		struct RangeData {
 			Future<Void> trackMetrics;
-			Reference<AsyncVar<Optional<ShardMetrics>>>
-			    stats; // TODO(zhewu): aggregate all metrics to a single physical shard metrics.
+			Reference<AsyncVar<Optional<ShardMetrics>>> stats;
 		};
 		std::unordered_map<KeyRange, RangeData> rangeData;
+
+	private:
+		// Inserts a new key range into this physical shard. `newRange` must not exist in this shard already.
+		void insertNewRangeData(const KeyRange& newRange);
 	};

 	// Generate a random physical shard ID, which is not UID().first() nor anonymousShardId.first()
diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp
index ebff2221aa..9e6f0a7964 100644
--- a/fdbserver/worker.actor.cpp
+++ b/fdbserver/worker.actor.cpp
@@ -1045,6 +1045,9 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
 			}

+			// When peer->lastLoggedTime == 0, we just started monitoring this peer and haven't logged it once yet.
+			double lastLoggedTime =
+			    peer->lastLoggedTime <= 0.0 ? peer->lastConnectTime : peer->lastLoggedTime;
 			if ((workerLocation == Primary && addressInDbAndPrimaryDc(address, dbInfo)) ||
 			    (workerLocation == Remote && addressInDbAndRemoteDc(address, dbInfo))) {
 				// Monitors intra DC latencies between servers that in the primary or remote DC's transaction
@@ -1062,7 +1065,7 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
-					    .detail("Elapsed", now() - peer->lastLoggedTime)
+					    .detail("Elapsed", now() - lastLoggedTime)
 					    .detail("Disconnected", disconnectedPeer)
 					    .detail("MinLatency", peer->pingLatencies.min())
 					    .detail("MaxLatency", peer->pingLatencies.max())
@@ -1093,7 +1096,7 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
-					    .detail("Elapsed", now() - peer->lastLoggedTime)
+					    .detail("Elapsed", now() - lastLoggedTime)
 					    .detail("Disconnected", disconnectedPeer)
 					    .detail("MinLatency", peer->pingLatencies.min())
 					    .detail("MaxLatency", peer->pingLatencies.max())
@@ -2410,7 +2413,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
 				ReplyPromise<InitializeBackupReply> backupReady = req.reply;
 				backupWorkerCache.set(req.reqId, backupReady.getFuture());
 				Future<Void> backupProcess = backupWorker(recruited, req, dbInfo);
-				backupProcess = storageCache.removeOnReady(req.reqId, backupProcess);
+				backupProcess = backupWorkerCache.removeOnReady(req.reqId, backupProcess);
 				errorForwarders.add(forwardError(errors, Role::BACKUP, recruited.id(), backupProcess));
 				TraceEvent("BackupInitRequest", req.reqId).detail("BackupId", recruited.id());
 				InitializeBackupReply reply(recruited, req.backupEpoch);
diff --git a/flow/flow.cpp b/flow/flow.cpp
index 0da59e6d76..3cb0d9fe4e 100644
--- a/flow/flow.cpp
+++ b/flow/flow.cpp
@@ -448,6 +448,27 @@ void bindDeterministicRandomToOpenssl() {
 #endif // OPENSSL_IS_BORINGSSL
 }

+int nChooseK(int n, int k) {
+	assert(n >= k && k >= 0);
+	if (k == 0) {
+		return 1;
+	}
+	if (k > n / 2) {
+		return nChooseK(n, n - k);
+	}
+
+	long ret = 1;
+
+	// To avoid integer overflow, we do n/1 * (n-1)/2 * (n-2)/3 * (n-i+1)/i, where i = k
+	for (int i = 1; i <= k; ++i) {
+		ret *= n - i + 1;
+		ret /= i;
+	}
+	ASSERT(ret <= INT_MAX);
+
+	return ret;
+}
+
 namespace {
 // Simple message for flatbuffers unittests
 struct Int {
diff --git a/flow/include/flow/flow.h b/flow/include/flow/flow.h
index a21bcfee2b..7c1fde7037 100644
--- a/flow/include/flow/flow.h
+++ b/flow/include/flow/flow.h
@@ -100,6 +100,9 @@
 extern StringRef strinc(StringRef const& str, Arena& arena);
 extern Standalone<StringRef> addVersionStampAtEnd(StringRef const& str);
 extern StringRef addVersionStampAtEnd(StringRef const& str, Arena& arena);

+// Return the number of combinations to choose k items out of n choices
+int nChooseK(int n, int k);
+
 template <class Iter>
 StringRef concatenate(Iter b, Iter const& e, Arena& arena) {
 	int rsize = 0;
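Why `nChooseK` above can divide inside the loop without losing precision: after iteration i the accumulator equals C(n, i) exactly, because C(n, i) = C(n, i-1) * (n - i + 1) / i and the left-hand side is an integer, so multiplying before dividing keeps every division exact. A small standalone check of that invariant (test values are mine, not from the diff):

```cpp
#include <cassert>

// Same loop as nChooseK, isolated: ret == C(n, i) holds after each iteration.
long choose(int n, int k) {
	long ret = 1;
	for (int i = 1; i <= k; ++i) {
		ret *= n - i + 1;
		ret /= i;
	}
	return ret;
}

int main() {
	assert(choose(5, 2) == 10);
	assert(choose(52, 5) == 2598960); // five-card poker hands
	assert(choose(30, 15) == 155117520);
}
```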
diff --git a/flow/include/flow/genericactors.actor.h b/flow/include/flow/genericactors.actor.h
index c1081fbbd9..f2fde0c90e 100644
--- a/flow/include/flow/genericactors.actor.h
+++ b/flow/include/flow/genericactors.actor.h
@@ -344,10 +344,10 @@ Future<T> storeOrThrow(T& out, Future<Optional<T>> what, Error e = key_not_found()) {
 }

 // Waits for a future to be ready, and then applies an asynchronous function to it.
-ACTOR template <class T, class F, class U = decltype(std::declval<F>()(std::declval<T>()).getValue())>
-Future<U> mapAsync(Future<T> what, F actorFunc) {
+ACTOR template <class T, class F>
+Future<decltype(std::declval<F>()(std::declval<T>()).getValue())> mapAsync(Future<T> what, F actorFunc) {
 	T val = wait(what);
-	U ret = wait(actorFunc(val));
+	decltype(std::declval<F>()(std::declval<T>()).getValue()) ret = wait(actorFunc(val));
 	return ret;
 }

diff --git a/flow/network.cpp b/flow/network.cpp
index 85c73f8e85..4a0128a6da 100644
--- a/flow/network.cpp
+++ b/flow/network.cpp
@@ -289,11 +289,9 @@ Future<Reference<IConnection>> INetworkConnections::connect(const std::string& host,
 	});

 	// Wait for the endpoint to return, then wait for connect(endpoint) and return it.
-	// Template types are being provided explicitly because they can't be automatically deduced for some reason.
-	return mapAsync<NetworkAddress, std::function<Future<Reference<IConnection>>(NetworkAddress const&)>,
-	                Reference<IConnection>>(
-	    pickEndpoint,
-	    [=](NetworkAddress const& addr) -> Future<Reference<IConnection>> { return connectExternal(addr); });
+	return mapAsync(pickEndpoint, [=](NetworkAddress const& addr) -> Future<Reference<IConnection>> {
+		return connectExternal(addr);
+	});
 }

 IUDPSocket::~IUDPSocket() {}
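For reference, the `mapAsync` signature change above is what lets every call site in this diff drop its explicit template arguments: the defaulted `U` parameter moved into the return type, leaving `T` and `F` to deduce directly from the arguments. A rough standalone analogue with plain values instead of `Future`s (not the flow library's actual machinery):

```cpp
#include <utility>

// Rough analogue of the new mapAsync shape: the result type is computed from F
// and T in the return type, so callers never spell template arguments.
template <class T, class F>
auto mapValue(T value, F func) -> decltype(std::declval<F>()(std::declval<T>()).getValue()) {
	return func(std::move(value)).getValue();
}

struct Boxed {
	int v;
	int getValue() const { return v; }
};

int main() {
	// Both template arguments deduce from the call site; no explicit <...> needed.
	int r = mapValue(20, [](int x) { return Boxed{ x + 1 }; });
	return r == 21 ? 0 : 1;
}
```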