fix trace event rolling issue for data distribution

Events published with trackLatest() in the data distribution code were keyed by raw string literals, so the cached "latest" events could be lost when the trace log rolled. Introduce a reference-counted EventCacheHolder for each tracked event and pass its trackingKey to trackLatest() instead.
parent 76ab937e55
commit c10dd0df4b
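Every hunk in this commit applies the same pattern: a trackLatest() call that was keyed by a raw string literal now uses the trackingKey of a long-lived, reference-counted EventCacheHolder owned by the surrounding component. Below is a minimal before/after sketch of that pattern; the ExampleEvent name and logExample() wrapper are hypothetical, while EventCacheHolder, makeReference, trackingKey, and the TraceEvent call chain are taken from the diff itself:

    #include "flow/FastRef.h" // Reference<T>, makeReference<T>()
    #include "flow/IRandom.h" // UID
    #include "flow/Trace.h" // TraceEvent, EventCacheHolder

    void logExample(UID distributorId) {
        // Before: the "latest" event is registered under a string literal and
        // is tied to the current trace file, so it can disappear when the
        // trace log rolls.
        TraceEvent("ExampleEvent", distributorId).detail("State", "Idle").trackLatest("ExampleEvent");

        // After: a holder owns the tracking key; keeping the holder alive on
        // the owning struct keeps the tracked event valid across rolls.
        Reference<EventCacheHolder> exampleEventHolder = makeReference<EventCacheHolder>("ExampleEvent");
        TraceEvent("ExampleEvent", distributorId).detail("State", "Idle").trackLatest(exampleEventHolder->trackingKey);
    }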
fdbserver/DataDistribution.actor.cpp

@@ -727,6 +727,10 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
     PromiseStream<Promise<int>> getUnhealthyRelocationCount;
     Promise<UID> removeFailedServer;
 
+    Reference<EventCacheHolder> ddTrackerStartingEventHolder;
+    Reference<EventCacheHolder> teamCollectionInfoEventHolder;
+    Reference<EventCacheHolder> storageServerRecruitmentEventHolder;
+
     void resetLocalitySet() {
         storageServerSet = Reference<LocalitySet>(new LocalityMap<UID>());
         LocalityMap<UID>* storageServerMap = (LocalityMap<UID>*)storageServerSet.getPtr();
@@ -781,9 +785,15 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
     clearHealthyZoneFuture(true), medianAvailableSpace(SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO),
     lastMedianAvailableSpaceUpdate(0), lowestUtilizationTeam(0), highestUtilizationTeam(0),
     getShardMetrics(getShardMetrics), getUnhealthyRelocationCount(getUnhealthyRelocationCount),
-    removeFailedServer(removeFailedServer) {
+    removeFailedServer(removeFailedServer),
+    ddTrackerStartingEventHolder(makeReference<EventCacheHolder>("DDTrackerStarting")),
+    teamCollectionInfoEventHolder(makeReference<EventCacheHolder>("TeamCollectionInfo")),
+    storageServerRecruitmentEventHolder(
+        makeReference<EventCacheHolder>("StorageServerRecruitment_" + distributorId.toString())) {
     if (!primary || configuration.usableRegions == 1) {
-        TraceEvent("DDTrackerStarting", distributorId).detail("State", "Inactive").trackLatest("DDTrackerStarting");
+        TraceEvent("DDTrackerStarting", distributorId)
+            .detail("State", "Inactive")
+            .trackLatest(ddTrackerStartingEventHolder->trackingKey);
     }
 }
 
@@ -863,7 +873,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
     if (!self->primary || self->configuration.usableRegions == 1) {
         TraceEvent("DDTrackerStarting", self->distributorId)
             .detail("State", "Active")
-            .trackLatest("DDTrackerStarting");
+            .trackLatest(self->ddTrackerStartingEventHolder->trackingKey);
     }
 
     return Void();
@@ -2317,7 +2327,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
         .detail("MinMachineTeamsOnMachine", minMaxMachineTeamsOnMachine.first)
         .detail("MaxMachineTeamsOnMachine", minMaxMachineTeamsOnMachine.second)
         .detail("DoBuildTeams", doBuildTeams)
-        .trackLatest("TeamCollectionInfo");
+        .trackLatest(teamCollectionInfoEventHolder->trackingKey);
 
     return addedTeams;
 }
@@ -2354,7 +2364,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
         .detail("MinMachineTeamsOnMachine", minMaxMachineTeamsOnMachine.first)
         .detail("MaxMachineTeamsOnMachine", minMaxMachineTeamsOnMachine.second)
         .detail("DoBuildTeams", doBuildTeams)
-        .trackLatest("TeamCollectionInfo");
+        .trackLatest(teamCollectionInfoEventHolder->trackingKey);
 
     // Advance time so that we will not have multiple TeamCollectionInfo at the same time, otherwise
     // simulation test will randomly pick one TeamCollectionInfo trace, which could be the one before build teams
@@ -2478,7 +2488,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
             .detail("MinMachineTeamsOnMachine", minMaxMachineTeamsOnMachine.first)
             .detail("MaxMachineTeamsOnMachine", minMaxMachineTeamsOnMachine.second)
             .detail("DoBuildTeams", self->doBuildTeams)
-            .trackLatest("TeamCollectionInfo");
+            .trackLatest(self->teamCollectionInfoEventHolder->trackingKey);
         }
     } else {
         self->lastBuildTeamsFailed = true;
@@ -4952,7 +4962,7 @@ ACTOR Future<Void> monitorStorageServerRecruitment(DDTeamCollection* self) {
     state bool lastIsTss = false;
     TraceEvent("StorageServerRecruitment", self->distributorId)
         .detail("State", "Idle")
-        .trackLatest("StorageServerRecruitment_" + self->distributorId.toString());
+        .trackLatest(self->storageServerRecruitmentEventHolder->trackingKey);
     loop {
         if (!recruiting) {
             while (self->recruitingStream.get() == 0) {
@@ -4961,7 +4971,7 @@ ACTOR Future<Void> monitorStorageServerRecruitment(DDTeamCollection* self) {
             TraceEvent("StorageServerRecruitment", self->distributorId)
                 .detail("State", "Recruiting")
                 .detail("IsTSS", self->isTssRecruiting ? "True" : "False")
-                .trackLatest("StorageServerRecruitment_" + self->distributorId.toString());
+                .trackLatest(self->storageServerRecruitmentEventHolder->trackingKey);
             recruiting = true;
             lastIsTss = self->isTssRecruiting;
         } else {
@@ -4972,7 +4982,7 @@ ACTOR Future<Void> monitorStorageServerRecruitment(DDTeamCollection* self) {
                 TraceEvent("StorageServerRecruitment", self->distributorId)
                     .detail("State", "Recruiting")
                     .detail("IsTSS", self->isTssRecruiting ? "True" : "False")
-                    .trackLatest("StorageServerRecruitment_" + self->distributorId.toString());
+                    .trackLatest(self->storageServerRecruitmentEventHolder->trackingKey);
                 lastIsTss = self->isTssRecruiting;
             }
         }
@@ -4985,7 +4995,7 @@ ACTOR Future<Void> monitorStorageServerRecruitment(DDTeamCollection* self) {
         }
         TraceEvent("StorageServerRecruitment", self->distributorId)
             .detail("State", "Idle")
-            .trackLatest("StorageServerRecruitment_" + self->distributorId.toString());
+            .trackLatest(self->storageServerRecruitmentEventHolder->trackingKey);
         recruiting = false;
     }
 }
@@ -5846,9 +5856,17 @@ struct DataDistributorData : NonCopyable, ReferenceCounted<DataDistributorData>
     UID ddId;
     PromiseStream<Future<Void>> addActor;
     DDTeamCollection* teamCollection;
+    Reference<EventCacheHolder> initialDDEventHolder;
+    Reference<EventCacheHolder> movingDataEventHolder;
+    Reference<EventCacheHolder> totalDataInFlightEventHolder;
+    Reference<EventCacheHolder> totalDataInFlightRemoteEventHolder;
 
     DataDistributorData(Reference<AsyncVar<ServerDBInfo> const> const& db, UID id)
-        : dbInfo(db), ddId(id), teamCollection(nullptr) {}
+        : dbInfo(db), ddId(id), teamCollection(nullptr),
+          initialDDEventHolder(makeReference<EventCacheHolder>("InitialDD")),
+          movingDataEventHolder(makeReference<EventCacheHolder>("MoveData")),
+          totalDataInFlightEventHolder(makeReference<EventCacheHolder>("TotalDataInFlight")),
+          totalDataInFlightRemoteEventHolder(makeReference<EventCacheHolder>("TotalDataInFlightRemote")) {}
 };
 
 ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo> const> db, double* lastLimited) {
@@ -5966,14 +5984,14 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
             .detail("E", initData->shards.end()[-1].key)
             .detail("Src", describe(initData->shards.end()[-2].primarySrc))
             .detail("Dest", describe(initData->shards.end()[-2].primaryDest))
-            .trackLatest("InitialDD");
+            .trackLatest(self->initialDDEventHolder->trackingKey);
     } else {
         TraceEvent("DDInitGotInitialDD", self->ddId)
             .detail("B", "")
             .detail("E", "")
             .detail("Src", "[no items]")
             .detail("Dest", "[no items]")
-            .trackLatest("InitialDD");
+            .trackLatest(self->initialDDEventHolder->trackingKey);
     }
 
     if (initData->mode && ddEnabledState->isDDEnabled()) {
fdbserver/DataDistributionQueue.actor.cpp

@@ -23,6 +23,8 @@
 #include <vector>
 
 #include "flow/ActorCollection.h"
+#include "flow/FastRef.h"
+#include "flow/Trace.h"
 #include "flow/Util.h"
 #include "fdbrpc/sim_validation.h"
 #include "fdbclient/SystemData.h"
@@ -403,6 +405,9 @@ struct DDQueueData {
 
     std::map<int, int> priority_relocations;
     int unhealthyRelocations;
+
+    Reference<EventCacheHolder> movedKeyServersEventHolder;
+
     void startRelocation(int priority, int healthPriority) {
         // Although PRIORITY_TEAM_REDUNDANT has lower priority than split and merge shard movement,
         // we must count it into unhealthyRelocations; because team removers relies on unhealthyRelocations to
@@ -455,7 +460,8 @@ struct DDQueueData {
     fetchSourceLock(new FlowLock(SERVER_KNOBS->DD_FETCH_SOURCE_PARALLELISM)), activeRelocations(0),
     queuedRelocations(0), bytesWritten(0), teamSize(teamSize), singleRegionTeamSize(singleRegionTeamSize),
     output(output), input(input), getShardMetrics(getShardMetrics), lastLimited(lastLimited), lastInterval(0),
-    suppressIntervals(0), rawProcessingUnhealthy(new AsyncVar<bool>(false)), unhealthyRelocations(0) {}
+    suppressIntervals(0), rawProcessingUnhealthy(new AsyncVar<bool>(false)), unhealthyRelocations(0),
+    movedKeyServersEventHolder(makeReference<EventCacheHolder>("MovedKeyServers")) {}
 
     void validate() {
         if (EXPENSIVE_VALIDATION) {
@@ -1214,7 +1220,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
     if (rd.keys.begin == keyServersPrefix) {
         TraceEvent("MovedKeyServerKeys")
             .detail("Dest", describe(destIds))
-            .trackLatest("MovedKeyServers");
+            .trackLatest(self->movedKeyServersEventHolder->trackingKey);
     }
 
     if (!signalledTransferComplete) {
fdbserver/DataDistributionTracker.actor.cpp

@@ -24,6 +24,8 @@
 #include "fdbserver/Knobs.h"
 #include "fdbclient/DatabaseContext.h"
 #include "flow/ActorCollection.h"
+#include "flow/FastRef.h"
+#include "flow/Trace.h"
 #include "flow/actorcompiler.h" // This must be the last #include.
 
 // The used bandwidth of a shard. The higher the value is, the busier the shard is.
@@ -933,6 +935,7 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
                                  *trackerCancelled);
     state Future<Void> loggingTrigger = Void();
     state Future<Void> readHotDetect = readHotDetector(&self);
+    state Reference<EventCacheHolder> ddTrackerStatsEventHolder = makeReference<EventCacheHolder>("DDTrackerStats");
     try {
         wait(trackInitialShards(&self, initData));
         initData = Reference<InitialDataDistribution>();
@@ -946,7 +949,7 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
             .detail("Shards", self.shards.size())
             .detail("TotalSizeBytes", self.dbSizeEstimate->get())
             .detail("SystemSizeBytes", self.systemSizeEstimate)
-            .trackLatest("DDTrackerStats");
+            .trackLatest(ddTrackerStatsEventHolder->trackingKey);
 
             loggingTrigger = delay(SERVER_KNOBS->DATA_DISTRIBUTION_LOGGING_INTERVAL, TaskPriority::FlushTrace);
         }
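A note on the holder type itself: EventCacheHolder is defined in flow/Trace.h, not in this diff. The sketch below is an assumed shape, inferred from how the commit uses it (a reference-counted object owning a stable trackingKey), and may differ from the real definition:

    // Assumed sketch only; see flow/Trace.h for the actual definition.
    struct EventCacheHolder : public ReferenceCounted<EventCacheHolder> {
        std::string trackingKey; // the key handed to TraceEvent::trackLatest()

        explicit EventCacheHolder(const std::string& trackingKey) : trackingKey(trackingKey) {}

        // Destruction is the natural point to drop the cached "latest" event,
        // tying the tracked event's lifetime to the component that owns it.
        ~EventCacheHolder();
    };

Per-distributor events fold the id into the key, e.g. "StorageServerRecruitment_" + distributorId.toString(), so concurrent or successive data distributors do not overwrite each other's tracked events.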