Add TraceEvent to warn tLogs which has not joined after timeout
During recovery, wait for TLOG_SLOW_REJOIN_WARN_TIMEOUT_SECS and log the tLogs that has not joined yet.
This commit is contained in:
parent
58550f9e37
commit
3fbe7e69fa
|
@ -43,6 +43,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
|
|||
|
||||
// TLogs
|
||||
init( TLOG_TIMEOUT, 0.4 ); //cannot buggify because of availability
|
||||
init( TLOG_SLOW_REJOIN_WARN_TIMEOUT_SECS, 60 ); if( randomize && BUGGIFY ) TLOG_SLOW_REJOIN_WARN_TIMEOUT_SECS = deterministicRandom()->randomInt(5,10);
|
||||
init( RECOVERY_TLOG_SMART_QUORUM_DELAY, 0.25 ); if( randomize && BUGGIFY ) RECOVERY_TLOG_SMART_QUORUM_DELAY = 0.0; // smaller might be better for bug amplification
|
||||
init( TLOG_STORAGE_MIN_UPDATE_INTERVAL, 0.5 );
|
||||
init( BUGGIFY_TLOG_STORAGE_MIN_UPDATE_INTERVAL, 30 );
|
||||
|
|
|
@ -42,6 +42,7 @@ public:
|
|||
|
||||
// TLogs
|
||||
double TLOG_TIMEOUT; // tlog OR master proxy failure - master's reaction time
|
||||
double TLOG_SLOW_REJOIN_WARN_TIMEOUT_SECS; // Warns if a tlog takes too long to rejoin
|
||||
double RECOVERY_TLOG_SMART_QUORUM_DELAY; // smaller might be better for bug amplification
|
||||
double TLOG_STORAGE_MIN_UPDATE_INTERVAL;
|
||||
double BUGGIFY_TLOG_STORAGE_MIN_UPDATE_INTERVAL;
|
||||
|
|
|
@ -3025,35 +3025,52 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
logServers,
|
||||
FutureStream<struct TLogRejoinRequest> rejoinRequests) {
|
||||
state std::map<UID, ReplyPromise<TLogRejoinReply>> lastReply;
|
||||
state std::set<UID> logsWaiting;
|
||||
state double startTime = now();
|
||||
state Future<Void> warnTimeout = delay(SERVER_KNOBS->TLOG_SLOW_REJOIN_WARN_TIMEOUT_SECS);
|
||||
|
||||
for (const auto& log : logServers) {
|
||||
logsWaiting.insert(log.first->get().id());
|
||||
}
|
||||
|
||||
try {
|
||||
loop {
|
||||
TLogRejoinRequest req = waitNext(rejoinRequests);
|
||||
int pos = -1;
|
||||
for (int i = 0; i < logServers.size(); i++) {
|
||||
if (logServers[i].first->get().id() == req.myInterface.id()) {
|
||||
pos = i;
|
||||
break;
|
||||
loop choose {
|
||||
when(TLogRejoinRequest req = waitNext(rejoinRequests)) {
|
||||
int pos = -1;
|
||||
for (int i = 0; i < logServers.size(); i++) {
|
||||
if (logServers[i].first->get().id() == req.myInterface.id()) {
|
||||
pos = i;
|
||||
logsWaiting.erase(logServers[i].first->get().id());
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (pos != -1) {
|
||||
TraceEvent("TLogJoinedMe", dbgid)
|
||||
.detail("TLog", req.myInterface.id())
|
||||
.detail("Address", req.myInterface.commit.getEndpoint().getPrimaryAddress().toString());
|
||||
if (!logServers[pos].first->get().present() ||
|
||||
req.myInterface.commit.getEndpoint() !=
|
||||
logServers[pos].first->get().interf().commit.getEndpoint()) {
|
||||
TLogInterface interf = req.myInterface;
|
||||
filterLocalityDataForPolicyDcAndProcess(logServers[pos].second, &interf.filteredLocality);
|
||||
logServers[pos].first->setUnconditional(OptionalInterface<TLogInterface>(interf));
|
||||
}
|
||||
lastReply[req.myInterface.id()].send(TLogRejoinReply{ false });
|
||||
lastReply[req.myInterface.id()] = req.reply;
|
||||
} else {
|
||||
TraceEvent("TLogJoinedMeUnknown", dbgid)
|
||||
.detail("TLog", req.myInterface.id())
|
||||
.detail("Address", req.myInterface.commit.getEndpoint().getPrimaryAddress().toString());
|
||||
req.reply.send(true);
|
||||
}
|
||||
}
|
||||
if (pos != -1) {
|
||||
TraceEvent("TLogJoinedMe", dbgid)
|
||||
.detail("TLog", req.myInterface.id())
|
||||
.detail("Address", req.myInterface.commit.getEndpoint().getPrimaryAddress().toString());
|
||||
if (!logServers[pos].first->get().present() ||
|
||||
req.myInterface.commit.getEndpoint() !=
|
||||
logServers[pos].first->get().interf().commit.getEndpoint()) {
|
||||
TLogInterface interf = req.myInterface;
|
||||
filterLocalityDataForPolicyDcAndProcess(logServers[pos].second, &interf.filteredLocality);
|
||||
logServers[pos].first->setUnconditional(OptionalInterface<TLogInterface>(interf));
|
||||
when(wait(warnTimeout)) {
|
||||
for (const auto& logId : logsWaiting) {
|
||||
TraceEvent(SevWarnAlways, "TLogRejoinSlow", dbgid)
|
||||
.detail("Elapsed", startTime - now())
|
||||
.detail("LogId", logId);
|
||||
}
|
||||
lastReply[req.myInterface.id()].send(TLogRejoinReply{ false });
|
||||
lastReply[req.myInterface.id()] = req.reply;
|
||||
} else {
|
||||
TraceEvent("TLogJoinedMeUnknown", dbgid)
|
||||
.detail("TLog", req.myInterface.id())
|
||||
.detail("Address", req.myInterface.commit.getEndpoint().getPrimaryAddress().toString());
|
||||
req.reply.send(true);
|
||||
warnTimeout = Never();
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
|
|
Loading…
Reference in New Issue