Merge pull request #2718 from ajbeamon/dd-better-rebalance-logging
Better Data Distribution Rebalancing Logging
This commit is contained in:
commit
45dcbecb16
|
@ -2544,8 +2544,8 @@ ACTOR void checkWrites( Database cx, Future<Void> committed, Promise<Void> outCo
|
|||
} else {
|
||||
Optional<Value> val = wait( tr.get( it->range().begin ) );
|
||||
if( !val.present() || val.get() != m.setValue ) {
|
||||
TraceEvent evt = TraceEvent(SevError, "CheckWritesFailed")
|
||||
.detail("Class", "Set")
|
||||
TraceEvent evt(SevError, "CheckWritesFailed");
|
||||
evt.detail("Class", "Set")
|
||||
.detail("Key", it->range().begin)
|
||||
.detail("Expected", m.setValue);
|
||||
if( !val.present() )
|
||||
|
|
|
@ -1127,8 +1127,10 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
|
|||
}
|
||||
|
||||
// Move a random shard of sourceTeam's to destTeam if sourceTeam has much more data than destTeam
|
||||
ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<IDataDistributionTeam> sourceTeam, Reference<IDataDistributionTeam> destTeam, bool primary ) {
|
||||
ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<IDataDistributionTeam> sourceTeam,
|
||||
Reference<IDataDistributionTeam> destTeam, bool primary, TraceEvent *traceEvent ) {
|
||||
if(g_network->isSimulated() && g_simulator.speedUpSimulation) {
|
||||
traceEvent->detail("CancelingDueToSimulationSpeedup", true);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1138,6 +1140,9 @@ ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<ID
|
|||
state int64_t averageShardBytes = wait(req.getFuture());
|
||||
state std::vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team( sourceTeam->getServerIDs(), primary ) );
|
||||
|
||||
traceEvent->detail("AverageShardBytes", averageShardBytes)
|
||||
.detail("ShardsInSource", shards.size());
|
||||
|
||||
if( !shards.size() )
|
||||
return false;
|
||||
|
||||
|
@ -1159,28 +1164,28 @@ ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<ID
|
|||
|
||||
int64_t sourceBytes = sourceTeam->getLoadBytes(false);
|
||||
int64_t destBytes = destTeam->getLoadBytes();
|
||||
if( sourceBytes - destBytes <= 3 * std::max<int64_t>( SERVER_KNOBS->MIN_SHARD_BYTES, metrics.bytes ) || metrics.bytes == 0 )
|
||||
|
||||
bool sourceAndDestTooSimilar = sourceBytes - destBytes <= 3 * std::max<int64_t>(SERVER_KNOBS->MIN_SHARD_BYTES, metrics.bytes);
|
||||
traceEvent->detail("SourceBytes", sourceBytes)
|
||||
.detail("DestBytes", destBytes)
|
||||
.detail("ShardBytes", metrics.bytes)
|
||||
.detail("SourceAndDestTooSimilar", sourceAndDestTooSimilar);
|
||||
|
||||
if( sourceAndDestTooSimilar || metrics.bytes == 0 ) {
|
||||
return false;
|
||||
}
|
||||
|
||||
{
|
||||
//verify the shard is still in sabtf
|
||||
std::vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team( sourceTeam->getServerIDs(), primary ) );
|
||||
for( int i = 0; i < shards.size(); i++ ) {
|
||||
if( moveShard == shards[i] ) {
|
||||
TraceEvent(priority == SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM ? "BgDDMountainChopper" : "BgDDValleyFiller", self->distributorId)
|
||||
.detail("SourceBytes", sourceBytes)
|
||||
.detail("DestBytes", destBytes)
|
||||
.detail("ShardBytes", metrics.bytes)
|
||||
.detail("AverageShardBytes", averageShardBytes)
|
||||
.detail("SourceTeam", sourceTeam->getDesc())
|
||||
.detail("DestTeam", destTeam->getDesc());
|
||||
|
||||
self->output.send( RelocateShard( moveShard, priority ) );
|
||||
return true;
|
||||
}
|
||||
//verify the shard is still in sabtf
|
||||
shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team( sourceTeam->getServerIDs(), primary ) );
|
||||
for( int i = 0; i < shards.size(); i++ ) {
|
||||
if( moveShard == shards[i] ) {
|
||||
traceEvent->detail("ShardStillPresent", true);
|
||||
self->output.send( RelocateShard( moveShard, priority ) );
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
traceEvent->detail("ShardStillPresent", false);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1191,6 +1196,15 @@ ACTOR Future<Void> BgDDMountainChopper( DDQueueData* self, int teamCollectionInd
|
|||
state double lastRead = 0;
|
||||
state bool skipCurrentLoop = false;
|
||||
loop {
|
||||
state bool moved = false;
|
||||
state TraceEvent traceEvent("BgDDMountainChopper", self->distributorId);
|
||||
traceEvent.suppressFor(5.0)
|
||||
.detail("PollingInterval", rebalancePollingInterval);
|
||||
|
||||
if(*self->lastLimited > 0) {
|
||||
traceEvent.detail("SecondsSinceLastLimited", now() - *self->lastLimited);
|
||||
}
|
||||
|
||||
try {
|
||||
state Future<Void> delayF = delay(rebalancePollingInterval, TaskPriority::DataDistributionLaunch);
|
||||
if ((now() - lastRead) > SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL) {
|
||||
|
@ -1203,6 +1217,9 @@ ACTOR Future<Void> BgDDMountainChopper( DDQueueData* self, int teamCollectionInd
|
|||
}
|
||||
skipCurrentLoop = val.present();
|
||||
}
|
||||
|
||||
traceEvent.detail("Enabled", !skipCurrentLoop);
|
||||
|
||||
wait(delayF);
|
||||
if (skipCurrentLoop) {
|
||||
// set loop interval to avoid busy wait here.
|
||||
|
@ -1210,18 +1227,31 @@ ACTOR Future<Void> BgDDMountainChopper( DDQueueData* self, int teamCollectionInd
|
|||
std::max(rebalancePollingInterval, SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL);
|
||||
continue;
|
||||
}
|
||||
|
||||
traceEvent.detail("QueuedRelocations", self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM]);
|
||||
if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM] <
|
||||
SERVER_KNOBS->DD_REBALANCE_PARALLELISM) {
|
||||
state Optional<Reference<IDataDistributionTeam>> randomTeam = wait(brokenPromiseToNever(
|
||||
self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, false, true, false))));
|
||||
|
||||
traceEvent.detail("DestTeam", printable(randomTeam.map<std::string>([](const Reference<IDataDistributionTeam>& team){
|
||||
return team->getDesc();
|
||||
})));
|
||||
|
||||
if (randomTeam.present()) {
|
||||
state Optional<Reference<IDataDistributionTeam>> loadedTeam =
|
||||
wait(brokenPromiseToNever(self->teamCollections[teamCollectionIndex].getTeam.getReply(
|
||||
GetTeamRequest(true, true, false, true))));
|
||||
|
||||
traceEvent.detail("SourceTeam", printable(loadedTeam.map<std::string>([](const Reference<IDataDistributionTeam>& team){
|
||||
return team->getDesc();
|
||||
})));
|
||||
|
||||
if (loadedTeam.present()) {
|
||||
bool moved =
|
||||
bool _moved =
|
||||
wait(rebalanceTeams(self, SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM, loadedTeam.get(),
|
||||
randomTeam.get(), teamCollectionIndex == 0));
|
||||
randomTeam.get(), teamCollectionIndex == 0, &traceEvent));
|
||||
moved = _moved;
|
||||
if (moved) {
|
||||
resetCount = 0;
|
||||
} else {
|
||||
|
@ -1244,10 +1274,16 @@ ACTOR Future<Void> BgDDMountainChopper( DDQueueData* self, int teamCollectionInd
|
|||
rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
|
||||
resetCount = SERVER_KNOBS->DD_REBALANCE_RESET_AMOUNT;
|
||||
}
|
||||
|
||||
traceEvent.detail("ResetCount", resetCount);
|
||||
tr.reset();
|
||||
} catch (Error& e) {
|
||||
traceEvent.error(e, true); // Log actor_cancelled because it's not legal to suppress an event that's initialized
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
|
||||
traceEvent.detail("Moved", moved);
|
||||
traceEvent.log();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1258,6 +1294,15 @@ ACTOR Future<Void> BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex)
|
|||
state double lastRead = 0;
|
||||
state bool skipCurrentLoop = false;
|
||||
loop {
|
||||
state bool moved = false;
|
||||
state TraceEvent traceEvent("BgDDValleyFiller", self->distributorId);
|
||||
traceEvent.suppressFor(5.0)
|
||||
.detail("PollingInterval", rebalancePollingInterval);
|
||||
|
||||
if(*self->lastLimited > 0) {
|
||||
traceEvent.detail("SecondsSinceLastLimited", now() - *self->lastLimited);
|
||||
}
|
||||
|
||||
try {
|
||||
state Future<Void> delayF = delay(rebalancePollingInterval, TaskPriority::DataDistributionLaunch);
|
||||
if ((now() - lastRead) > SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL) {
|
||||
|
@ -1270,6 +1315,9 @@ ACTOR Future<Void> BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex)
|
|||
}
|
||||
skipCurrentLoop = val.present();
|
||||
}
|
||||
|
||||
traceEvent.detail("Enabled", !skipCurrentLoop);
|
||||
|
||||
wait(delayF);
|
||||
if (skipCurrentLoop) {
|
||||
// set loop interval to avoid busy wait here.
|
||||
|
@ -1277,17 +1325,30 @@ ACTOR Future<Void> BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex)
|
|||
std::max(rebalancePollingInterval, SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL);
|
||||
continue;
|
||||
}
|
||||
|
||||
traceEvent.detail("QueuedRelocations", self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM]);
|
||||
if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM] <
|
||||
SERVER_KNOBS->DD_REBALANCE_PARALLELISM) {
|
||||
state Optional<Reference<IDataDistributionTeam>> randomTeam = wait(brokenPromiseToNever(
|
||||
self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, false, false, true))));
|
||||
|
||||
traceEvent.detail("SourceTeam", printable(randomTeam.map<std::string>([](const Reference<IDataDistributionTeam>& team){
|
||||
return team->getDesc();
|
||||
})));
|
||||
|
||||
if (randomTeam.present()) {
|
||||
state Optional<Reference<IDataDistributionTeam>> unloadedTeam = wait(brokenPromiseToNever(
|
||||
self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, true, true, false))));
|
||||
|
||||
traceEvent.detail("DestTeam", printable(unloadedTeam.map<std::string>([](const Reference<IDataDistributionTeam>& team){
|
||||
return team->getDesc();
|
||||
})));
|
||||
|
||||
if (unloadedTeam.present()) {
|
||||
bool moved =
|
||||
bool _moved =
|
||||
wait(rebalanceTeams(self, SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, randomTeam.get(),
|
||||
unloadedTeam.get(), teamCollectionIndex == 0));
|
||||
unloadedTeam.get(), teamCollectionIndex == 0, &traceEvent));
|
||||
moved = _moved;
|
||||
if (moved) {
|
||||
resetCount = 0;
|
||||
} else {
|
||||
|
@ -1310,10 +1371,16 @@ ACTOR Future<Void> BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex)
|
|||
rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
|
||||
resetCount = SERVER_KNOBS->DD_REBALANCE_RESET_AMOUNT;
|
||||
}
|
||||
|
||||
traceEvent.detail("ResetCount", resetCount);
|
||||
tr.reset();
|
||||
} catch (Error& e) {
|
||||
traceEvent.error(e, true); // Log actor_cancelled because it's not legal to suppress an event that's initialized
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
|
||||
traceEvent.detail("Moved", moved);
|
||||
traceEvent.log();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -653,6 +653,50 @@ void removeTraceRole(std::string role) {
|
|||
g_traceLog.removeRole(role);
|
||||
}
|
||||
|
||||
TraceEvent::TraceEvent() : initialized(true), enabled(false), logged(true) {}
|
||||
|
||||
TraceEvent::TraceEvent(TraceEvent &&ev) {
|
||||
enabled = ev.enabled;
|
||||
err = ev.err;
|
||||
fields = std::move(ev.fields);
|
||||
id = ev.id;
|
||||
initialized = ev.initialized;
|
||||
logged = ev.logged;
|
||||
maxEventLength = ev.maxEventLength;
|
||||
maxFieldLength = ev.maxFieldLength;
|
||||
severity = ev.severity;
|
||||
tmpEventMetric = ev.tmpEventMetric;
|
||||
trackingKey = ev.trackingKey;
|
||||
type = ev.type;
|
||||
|
||||
ev.initialized = true;
|
||||
ev.enabled = false;
|
||||
ev.logged = true;
|
||||
ev.tmpEventMetric = nullptr;
|
||||
}
|
||||
|
||||
TraceEvent& TraceEvent::operator=(TraceEvent &&ev) {
|
||||
enabled = ev.enabled;
|
||||
err = ev.err;
|
||||
fields = std::move(ev.fields);
|
||||
id = ev.id;
|
||||
initialized = ev.initialized;
|
||||
logged = ev.logged;
|
||||
maxEventLength = ev.maxEventLength;
|
||||
maxFieldLength = ev.maxFieldLength;
|
||||
severity = ev.severity;
|
||||
tmpEventMetric = ev.tmpEventMetric;
|
||||
trackingKey = ev.trackingKey;
|
||||
type = ev.type;
|
||||
|
||||
ev.initialized = true;
|
||||
ev.enabled = false;
|
||||
ev.logged = true;
|
||||
ev.tmpEventMetric = nullptr;
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
TraceEvent::TraceEvent( const char* type, UID id ) : id(id), type(type), severity(SevInfo), initialized(false), enabled(true), logged(false) {
|
||||
g_trace_depth++;
|
||||
setMaxFieldLength(0);
|
||||
|
@ -729,7 +773,9 @@ bool TraceEvent::init() {
|
|||
}
|
||||
|
||||
detail("Severity", int(severity));
|
||||
detailf("Time", "%.6f", getCurrentTime());
|
||||
detail("Time", "0.000000");
|
||||
timeIndex = fields.size() - 1;
|
||||
|
||||
detail("Type", type);
|
||||
if(g_network && g_network->isSimulated()) {
|
||||
NetworkAddress local = g_network->getLocalAddress();
|
||||
|
@ -937,6 +983,8 @@ void TraceEvent::log() {
|
|||
init();
|
||||
try {
|
||||
if (enabled) {
|
||||
fields.mutate(timeIndex).second = format("%.6f", TraceEvent::getCurrentTime());
|
||||
|
||||
if (this->severity == SevError) {
|
||||
severity = SevInfo;
|
||||
backtrace();
|
||||
|
@ -1150,6 +1198,10 @@ std::string TraceEventFields::getValue(std::string key) const {
|
|||
}
|
||||
}
|
||||
|
||||
TraceEventFields::Field& TraceEventFields::mutate(int index) {
|
||||
return fields.at(index);
|
||||
}
|
||||
|
||||
namespace {
|
||||
void parseNumericValue(std::string const& s, double &outValue, bool permissive = false) {
|
||||
double d = 0;
|
||||
|
@ -1275,6 +1327,9 @@ void TraceEventFields::validateFormat() const {
|
|||
}
|
||||
|
||||
std::string traceableStringToString(const char* value, size_t S) {
|
||||
ASSERT_WE_THINK(S > 0 && value[S - 1] == '\0');
|
||||
if(g_network) {
|
||||
ASSERT_WE_THINK(S > 0 && value[S - 1] == '\0');
|
||||
}
|
||||
|
||||
return std::string(value, S - 1); // Exclude trailing \0 byte
|
||||
}
|
||||
|
|
|
@ -80,6 +80,8 @@ public:
|
|||
int64_t getInt64(std::string key, bool permissive=false) const;
|
||||
double getDouble(std::string key, bool permissive=false) const;
|
||||
|
||||
Field &mutate(int index);
|
||||
|
||||
std::string toString() const;
|
||||
void validateFormat() const;
|
||||
template<class Archiver>
|
||||
|
@ -373,11 +375,15 @@ struct SpecialTraceMetricType
|
|||
TRACE_METRIC_TYPE(double, double);
|
||||
|
||||
struct TraceEvent {
|
||||
TraceEvent();
|
||||
TraceEvent( const char* type, UID id = UID() ); // Assumes SevInfo severity
|
||||
TraceEvent( Severity, const char* type, UID id = UID() );
|
||||
TraceEvent( struct TraceInterval&, UID id = UID() );
|
||||
TraceEvent( Severity severity, struct TraceInterval& interval, UID id = UID() );
|
||||
|
||||
TraceEvent( TraceEvent &&ev );
|
||||
TraceEvent& operator=( TraceEvent &&ev );
|
||||
|
||||
static void setNetworkThread();
|
||||
static bool isNetworkThread();
|
||||
|
||||
|
@ -489,6 +495,7 @@ private:
|
|||
|
||||
int maxFieldLength;
|
||||
int maxEventLength;
|
||||
int timeIndex;
|
||||
|
||||
void setSizeLimits();
|
||||
|
||||
|
|
Loading…
Reference in New Issue