TSS Mismatch Changes

This commit is contained in:
Josh Slocum 2021-06-11 16:20:38 -05:00
parent 31c12b3d5b
commit 56dadaa428
10 changed files with 380 additions and 232 deletions

View File

@ -275,7 +275,7 @@ public:
Future<Void> monitorProxiesInfoChange;
Future<Void> monitorTssInfoChange;
Future<Void> tssMismatchHandler;
PromiseStream<UID> tssMismatchStream;
PromiseStream<std::pair<UID, std::vector<DetailedTSSMismatch>>> tssMismatchStream;
Reference<CommitProxyInfo> commitProxies;
Reference<GrvProxyInfo> grvProxies;
bool proxyProvisional; // Provisional commit proxy and grv proxy are used at the same time.
@ -428,12 +428,12 @@ public:
static const std::vector<std::string> debugTransactionTagChoices;
std::unordered_map<KeyRef, Reference<WatchMetadata>> watchMap;
// Adds or updates the specified (SS, TSS) pair in the TSS mapping (if not already present).
// Requests to the storage server will be duplicated to the TSS.
// Adds or updates the specified (SS, TSS) pair in the TSS mapping (if not already present).
// Requests to the storage server will be duplicated to the TSS.
void addTssMapping(StorageServerInterface const& ssi, StorageServerInterface const& tssi);
// Removes the storage server and its TSS pair from the TSS mapping (if present).
// Requests to the storage server will no longer be duplicated to its pair TSS.
// Removes the storage server and its TSS pair from the TSS mapping (if present).
// Requests to the storage server will no longer be duplicated to its pair TSS.
void removeTssMapping(StorageServerInterface const& ssi);
};

View File

@ -383,10 +383,11 @@ ACTOR Future<Void> databaseLogger(DatabaseContext* cx) {
cx->bytesPerCommit.clear();
for (const auto& it : cx->tssMetrics) {
// TODO could skip this tss if request counter is zero? would potentially complicate elapsed calculation
// though
// TODO could skip this whole thing if tss if request counter is zero?
// That would potentially complicate elapsed calculation though
if (it.second->mismatches.getIntervalDelta()) {
cx->tssMismatchStream.send(it.first);
cx->tssMismatchStream.send(
std::pair<UID, std::vector<DetailedTSSMismatch>>(it.first, it.second->detailedMismatches));
}
// do error histograms as separate event
@ -825,13 +826,15 @@ ACTOR Future<Void> monitorCacheList(DatabaseContext* self) {
ACTOR static Future<Void> handleTssMismatches(DatabaseContext* cx) {
state Reference<ReadYourWritesTransaction> tr;
state KeyBackedMap<UID, UID> tssMapDB = KeyBackedMap<UID, UID>(tssMappingKeys.begin);
state KeyBackedMap<Tuple, std::string> tssMismatchDB = KeyBackedMap<Tuple, std::string>(tssMismatchKeys.begin);
loop {
state UID tssID = waitNext(cx->tssMismatchStream.getFuture());
// <tssid, list of detailed mismatch data>
state std::pair<UID, std::vector<DetailedTSSMismatch>> data = waitNext(cx->tssMismatchStream.getFuture());
// find ss pair id so we can remove it from the mapping
state UID tssPairID;
bool found = false;
for (const auto& it : cx->tssMapping) {
if (it.second.id() == tssID) {
if (it.second.id() == data.first) {
tssPairID = it.first;
found = true;
break;
@ -840,7 +843,7 @@ ACTOR static Future<Void> handleTssMismatches(DatabaseContext* cx) {
if (found) {
state bool quarantine = CLIENT_KNOBS->QUARANTINE_TSS_ON_MISMATCH;
TraceEvent(SevWarnAlways, quarantine ? "TSS_QuarantineMismatch" : "TSS_KillMismatch")
.detail("TSSID", tssID.toString());
.detail("TSSID", data.first.toString());
TEST(quarantine); // Quarantining TSS because it got mismatch
TEST(!quarantine); // Killing TSS because it got mismatch
@ -850,14 +853,21 @@ ACTOR static Future<Void> handleTssMismatches(DatabaseContext* cx) {
try {
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (quarantine) {
tr->set(tssQuarantineKeyFor(tssID), LiteralStringRef(""));
tr->set(tssQuarantineKeyFor(data.first), LiteralStringRef(""));
} else {
tr->clear(serverTagKeyFor(tssID));
tr->clear(serverTagKeyFor(data.first));
}
tssMapDB.erase(tr, tssPairID);
for (const DetailedTSSMismatch& d : data.second) {
// <tssid, time, mismatchid> -> mismatch data
tssMismatchDB.set(
tr,
Tuple().append(data.first.toString()).append(d.timestamp).append(d.mismatchId.toString()),
d.traceString);
}
wait(tr->commit());
break;
@ -867,7 +877,7 @@ ACTOR static Future<Void> handleTssMismatches(DatabaseContext* cx) {
tries++;
if (tries > 10) {
// Give up, it'll get another mismatch or a human will investigate eventually
TraceEvent("TSS_MismatchGaveUp").detail("TSSID", tssID.toString());
TraceEvent("TSS_MismatchGaveUp").detail("TSSID", data.first.toString());
break;
}
}

View File

@ -30,32 +30,31 @@ std::string traceChecksumValue(ValueRef s) {
return s.size() > 12 ? format("(%d)%08x", s.size(), crc32c_append(0, s.begin(), s.size())) : s.toString();
}
// point reads
template <>
bool TSS_doCompare(const GetValueRequest& req,
const GetValueReply& src,
const GetValueReply& tss,
Severity traceSeverity,
UID tssId) {
if (src.value.present() != tss.value.present() || (src.value.present() && src.value.get() != tss.value.get())) {
TraceEvent(traceSeverity, "TSSMismatchGetValue")
.suppressFor(1.0)
.detail("TSSID", tssId)
.detail("Key", req.key.printable())
.detail("Version", req.version)
.detail("SSReply", src.value.present() ? traceChecksumValue(src.value.get()) : "missing")
.detail("TSSReply", tss.value.present() ? traceChecksumValue(tss.value.get()) : "missing");
return false;
}
return true;
bool TSS_doCompare(const GetValueReply& src, const GetValueReply& tss) {
return src.value.present() == tss.value.present() && (!src.value.present() || src.value.get() == tss.value.get());
}
template <>
bool TSS_doCompare(const GetKeyRequest& req,
const GetKeyReply& src,
const GetKeyReply& tss,
Severity traceSeverity,
UID tssId) {
const char* TSS_mismatchTraceName(const GetValueRequest& req) {
return "TSSMismatchGetValue";
}
template <>
void TSS_traceMismatch(TraceEvent& event,
const GetValueRequest& req,
const GetValueReply& src,
const GetValueReply& tss) {
event.detail("Key", req.key.printable())
.detail("Version", req.version)
.detail("SSReply", src.value.present() ? traceChecksumValue(src.value.get()) : "missing")
.detail("TSSReply", tss.value.present() ? traceChecksumValue(tss.value.get()) : "missing");
}
// key selector reads
template <>
bool TSS_doCompare(const GetKeyReply& src, const GetKeyReply& tss) {
// This process is a bit complicated. Since the tss and ss can return different results if neighboring shards to
// req.sel.key are currently being moved, We validate that the results are the same IF the returned key selectors
// are final. Otherwise, we only mark the request as a mismatch if the difference between the two returned key
@ -92,107 +91,170 @@ bool TSS_doCompare(const GetKeyRequest& req,
bool tssOffsetLarger = (src.sel.offset == tss.sel.offset) ? tss.sel.orEqual : src.sel.offset < tss.sel.offset;
matches = tssKeyLarger != tssOffsetLarger;
}
if (!matches) {
TraceEvent(traceSeverity, "TSSMismatchGetKey")
.suppressFor(1.0)
.detail("TSSID", tssId)
.detail("KeySelector",
format("%s%s:%d", req.sel.orEqual ? "=" : "", req.sel.getKey().printable().c_str(), req.sel.offset))
.detail("Version", req.version)
.detail("SSReply",
format("%s%s:%d", src.sel.orEqual ? "=" : "", src.sel.getKey().printable().c_str(), src.sel.offset))
.detail(
"TSSReply",
format("%s%s:%d", tss.sel.orEqual ? "=" : "", tss.sel.getKey().printable().c_str(), tss.sel.offset));
}
return matches;
}
template <>
bool TSS_doCompare(const GetKeyValuesRequest& req,
const GetKeyValuesReply& src,
const GetKeyValuesReply& tss,
Severity traceSeverity,
UID tssId) {
if (src.more != tss.more || src.data != tss.data) {
const char* TSS_mismatchTraceName(const GetKeyRequest& req) {
return "TSSMismatchGetKey";
}
std::string ssResultsString = format("(%d)%s:\n", src.data.size(), src.more ? "+" : "");
for (auto& it : src.data) {
ssResultsString += "\n" + it.key.printable() + "=" + traceChecksumValue(it.value);
}
template <>
void TSS_traceMismatch(TraceEvent& event, const GetKeyRequest& req, const GetKeyReply& src, const GetKeyReply& tss) {
event
.detail("KeySelector",
format("%s%s:%d", req.sel.orEqual ? "=" : "", req.sel.getKey().printable().c_str(), req.sel.offset))
.detail("Version", req.version)
.detail("SSReply",
format("%s%s:%d", src.sel.orEqual ? "=" : "", src.sel.getKey().printable().c_str(), src.sel.offset))
.detail("TSSReply",
format("%s%s:%d", tss.sel.orEqual ? "=" : "", tss.sel.getKey().printable().c_str(), tss.sel.offset));
}
std::string tssResultsString = format("(%d)%s:\n", tss.data.size(), tss.more ? "+" : "");
for (auto& it : tss.data) {
tssResultsString += "\n" + it.key.printable() + "=" + traceChecksumValue(it.value);
}
// range reads
template <>
bool TSS_doCompare(const GetKeyValuesReply& src, const GetKeyValuesReply& tss) {
return src.more == tss.more && src.data == tss.data;
}
TraceEvent(traceSeverity, "TSSMismatchGetKeyValues")
.suppressFor(1.0)
.detail("TSSID", tssId)
.detail(
"Begin",
format(
"%s%s:%d", req.begin.orEqual ? "=" : "", req.begin.getKey().printable().c_str(), req.begin.offset))
.detail("End",
format("%s%s:%d", req.end.orEqual ? "=" : "", req.end.getKey().printable().c_str(), req.end.offset))
.detail("Version", req.version)
.detail("Limit", req.limit)
.detail("LimitBytes", req.limitBytes)
.detail("SSReply", ssResultsString)
.detail("TSSReply", tssResultsString);
template <>
const char* TSS_mismatchTraceName(const GetKeyValuesRequest& req) {
return "TSSMismatchGetKeyValues";
}
return false;
template <>
void TSS_traceMismatch(TraceEvent& event,
const GetKeyValuesRequest& req,
const GetKeyValuesReply& src,
const GetKeyValuesReply& tss) {
std::string ssResultsString = format("(%d)%s:\n", src.data.size(), src.more ? "+" : "");
for (auto& it : src.data) {
ssResultsString += "\n" + it.key.printable() + "=" + traceChecksumValue(it.value);
}
std::string tssResultsString = format("(%d)%s:\n", tss.data.size(), tss.more ? "+" : "");
for (auto& it : tss.data) {
tssResultsString += "\n" + it.key.printable() + "=" + traceChecksumValue(it.value);
}
event
.detail(
"Begin",
format("%s%s:%d", req.begin.orEqual ? "=" : "", req.begin.getKey().printable().c_str(), req.begin.offset))
.detail("End",
format("%s%s:%d", req.end.orEqual ? "=" : "", req.end.getKey().printable().c_str(), req.end.offset))
.detail("Version", req.version)
.detail("Limit", req.limit)
.detail("LimitBytes", req.limitBytes)
.detail("SSReply", ssResultsString)
.detail("TSSReply", tssResultsString);
}
template <>
bool TSS_doCompare(const WatchValueReply& src, const WatchValueReply& tss) {
// We duplicate watches just for load, no need to validate replies.
return true;
}
template <>
bool TSS_doCompare(const WatchValueRequest& req,
const WatchValueReply& src,
const WatchValueReply& tss,
Severity traceSeverity,
UID tssId) {
// We duplicate watches just for load, no need to validte replies.
return true;
const char* TSS_mismatchTraceName(const WatchValueRequest& req) {
ASSERT(false);
return "";
}
// no-op template specializations for metrics replies
template <>
bool TSS_doCompare(const WaitMetricsRequest& req,
const StorageMetrics& src,
const StorageMetrics& tss,
Severity traceSeverity,
UID tssId) {
void TSS_traceMismatch(TraceEvent& event,
const WatchValueRequest& req,
const WatchValueReply& src,
const WatchValueReply& tss) {
ASSERT(false);
}
// template specializations for metrics replies that should never be called because these requests aren't duplicated
// storage metrics
template <>
bool TSS_doCompare(const StorageMetrics& src, const StorageMetrics& tss) {
ASSERT(false);
return true;
}
template <>
bool TSS_doCompare(const SplitMetricsRequest& req,
const SplitMetricsReply& src,
const SplitMetricsReply& tss,
Severity traceSeverity,
UID tssId) {
const char* TSS_mismatchTraceName(const WaitMetricsRequest& req) {
ASSERT(false);
return "";
}
template <>
void TSS_traceMismatch(TraceEvent& event,
const WaitMetricsRequest& req,
const StorageMetrics& src,
const StorageMetrics& tss) {
ASSERT(false);
}
// split metrics
template <>
bool TSS_doCompare(const SplitMetricsReply& src, const SplitMetricsReply& tss) {
ASSERT(false);
return true;
}
template <>
bool TSS_doCompare(const ReadHotSubRangeRequest& req,
const ReadHotSubRangeReply& src,
const ReadHotSubRangeReply& tss,
Severity traceSeverity,
UID tssId) {
const char* TSS_mismatchTraceName(const SplitMetricsRequest& req) {
ASSERT(false);
return "";
}
template <>
void TSS_traceMismatch(TraceEvent& event,
const SplitMetricsRequest& req,
const SplitMetricsReply& src,
const SplitMetricsReply& tss) {
ASSERT(false);
}
// read hot sub range
template <>
bool TSS_doCompare(const ReadHotSubRangeReply& src, const ReadHotSubRangeReply& tss) {
ASSERT(false);
return true;
}
template <>
bool TSS_doCompare(const SplitRangeRequest& req,
const SplitRangeReply& src,
const SplitRangeReply& tss,
Severity traceSeverity,
UID tssId) {
const char* TSS_mismatchTraceName(const ReadHotSubRangeRequest& req) {
ASSERT(false);
return "";
}
template <>
void TSS_traceMismatch(TraceEvent& event,
const ReadHotSubRangeRequest& req,
const ReadHotSubRangeReply& src,
const ReadHotSubRangeReply& tss) {
ASSERT(false);
}
// split range
template <>
bool TSS_doCompare(const SplitRangeReply& src, const SplitRangeReply& tss) {
ASSERT(false);
return true;
}
template <>
const char* TSS_mismatchTraceName(const SplitRangeRequest& req) {
ASSERT(false);
return "";
}
template <>
void TSS_traceMismatch(TraceEvent& event,
const SplitRangeRequest& req,
const SplitRangeReply& src,
const SplitRangeReply& tss) {
ASSERT(false);
}
// only record metrics for data reads
template <>
@ -240,32 +302,20 @@ TEST_CASE("/StorageServerInterface/TSSCompare/TestComparison") {
std::string s_d = "d";
std::string s_e = "e";
// test getValue
GetValueRequest gvReq;
gvReq.key = StringRef(s_a);
gvReq.version = 5;
UID tssId;
GetValueReply gvReplyMissing;
GetValueReply gvReplyA(Optional<Value>(StringRef(s_a)), false);
GetValueReply gvReplyB(Optional<Value>(StringRef(s_b)), false);
ASSERT(TSS_doCompare(gvReq, gvReplyMissing, gvReplyMissing, SevInfo, tssId));
ASSERT(TSS_doCompare(gvReq, gvReplyA, gvReplyA, SevInfo, tssId));
ASSERT(TSS_doCompare(gvReq, gvReplyB, gvReplyB, SevInfo, tssId));
ASSERT(TSS_doCompare(gvReplyMissing, gvReplyMissing));
ASSERT(TSS_doCompare(gvReplyA, gvReplyA));
ASSERT(TSS_doCompare(gvReplyB, gvReplyB));
ASSERT(!TSS_doCompare(gvReq, gvReplyMissing, gvReplyA, SevInfo, tssId));
ASSERT(!TSS_doCompare(gvReq, gvReplyA, gvReplyB, SevInfo, tssId));
ASSERT(!TSS_doCompare(gvReplyMissing, gvReplyA));
ASSERT(!TSS_doCompare(gvReplyA, gvReplyB));
// test GetKeyValues
Arena a; // for all of the refs. ASAN complains if this isn't done. Could also make them all standalone i guess
GetKeyValuesRequest gkvReq;
gkvReq.begin = firstGreaterOrEqual(StringRef(a, s_a));
gkvReq.end = firstGreaterOrEqual(StringRef(a, s_b));
gkvReq.version = 5;
gkvReq.limit = 100;
gkvReq.limitBytes = 1000;
Arena a;
GetKeyValuesReply gkvReplyEmpty;
GetKeyValuesReply gkvReplyOne;
KeyValueRef v;
@ -276,16 +326,11 @@ TEST_CASE("/StorageServerInterface/TSSCompare/TestComparison") {
gkvReplyOneMore.data.push_back_deep(gkvReplyOneMore.arena, v);
gkvReplyOneMore.more = true;
ASSERT(TSS_doCompare(gkvReq, gkvReplyEmpty, gkvReplyEmpty, SevInfo, tssId));
ASSERT(TSS_doCompare(gkvReq, gkvReplyOne, gkvReplyOne, SevInfo, tssId));
ASSERT(TSS_doCompare(gkvReq, gkvReplyOneMore, gkvReplyOneMore, SevInfo, tssId));
ASSERT(!TSS_doCompare(gkvReq, gkvReplyEmpty, gkvReplyOne, SevInfo, tssId));
ASSERT(!TSS_doCompare(gkvReq, gkvReplyOne, gkvReplyOneMore, SevInfo, tssId));
// test GetKey
GetKeyRequest gkReq;
gkReq.sel = KeySelectorRef(StringRef(a, s_a), false, 1);
gkReq.version = 5;
ASSERT(TSS_doCompare(gkvReplyEmpty, gkvReplyEmpty));
ASSERT(TSS_doCompare(gkvReplyOne, gkvReplyOne));
ASSERT(TSS_doCompare(gkvReplyOneMore, gkvReplyOneMore));
ASSERT(!TSS_doCompare(gkvReplyEmpty, gkvReplyOne));
ASSERT(!TSS_doCompare(gkvReplyOne, gkvReplyOneMore));
GetKeyReply gkReplyA(KeySelectorRef(StringRef(a, s_a), false, 20), false);
GetKeyReply gkReplyB(KeySelectorRef(StringRef(a, s_b), false, 10), false);
@ -294,85 +339,58 @@ TEST_CASE("/StorageServerInterface/TSSCompare/TestComparison") {
GetKeyReply gkReplyE(KeySelectorRef(StringRef(a, s_e), false, -20), false);
// identical cases
ASSERT(TSS_doCompare(gkReq, gkReplyA, gkReplyA, SevInfo, tssId));
ASSERT(TSS_doCompare(gkReq, gkReplyB, gkReplyB, SevInfo, tssId));
ASSERT(TSS_doCompare(gkReq, gkReplyC, gkReplyC, SevInfo, tssId));
ASSERT(TSS_doCompare(gkReq, gkReplyD, gkReplyD, SevInfo, tssId));
ASSERT(TSS_doCompare(gkReq, gkReplyE, gkReplyE, SevInfo, tssId));
ASSERT(TSS_doCompare(gkReplyA, gkReplyA));
ASSERT(TSS_doCompare(gkReplyB, gkReplyB));
ASSERT(TSS_doCompare(gkReplyC, gkReplyC));
ASSERT(TSS_doCompare(gkReplyD, gkReplyD));
ASSERT(TSS_doCompare(gkReplyE, gkReplyE));
// relative offset cases
ASSERT(TSS_doCompare(gkReq, gkReplyA, gkReplyB, SevInfo, tssId));
ASSERT(TSS_doCompare(gkReq, gkReplyB, gkReplyA, SevInfo, tssId));
ASSERT(TSS_doCompare(gkReq, gkReplyA, gkReplyC, SevInfo, tssId));
ASSERT(TSS_doCompare(gkReq, gkReplyC, gkReplyA, SevInfo, tssId));
ASSERT(TSS_doCompare(gkReq, gkReplyB, gkReplyC, SevInfo, tssId));
ASSERT(TSS_doCompare(gkReq, gkReplyC, gkReplyB, SevInfo, tssId));
ASSERT(TSS_doCompare(gkReplyA, gkReplyB));
ASSERT(TSS_doCompare(gkReplyB, gkReplyA));
ASSERT(TSS_doCompare(gkReplyA, gkReplyC));
ASSERT(TSS_doCompare(gkReplyC, gkReplyA));
ASSERT(TSS_doCompare(gkReplyB, gkReplyC));
ASSERT(TSS_doCompare(gkReplyC, gkReplyB));
ASSERT(TSS_doCompare(gkReq, gkReplyC, gkReplyD, SevInfo, tssId));
ASSERT(TSS_doCompare(gkReq, gkReplyD, gkReplyC, SevInfo, tssId));
ASSERT(TSS_doCompare(gkReq, gkReplyC, gkReplyE, SevInfo, tssId));
ASSERT(TSS_doCompare(gkReq, gkReplyE, gkReplyC, SevInfo, tssId));
ASSERT(TSS_doCompare(gkReq, gkReplyD, gkReplyE, SevInfo, tssId));
ASSERT(TSS_doCompare(gkReq, gkReplyE, gkReplyD, SevInfo, tssId));
ASSERT(TSS_doCompare(gkReplyC, gkReplyD));
ASSERT(TSS_doCompare(gkReplyD, gkReplyC));
ASSERT(TSS_doCompare(gkReplyC, gkReplyE));
ASSERT(TSS_doCompare(gkReplyE, gkReplyC));
ASSERT(TSS_doCompare(gkReplyD, gkReplyE));
ASSERT(TSS_doCompare(gkReplyE, gkReplyD));
// test same offset/orEqual wrong key
ASSERT(!TSS_doCompare(gkReq,
GetKeyReply(KeySelectorRef(StringRef(a, s_a), true, 0), false),
GetKeyReply(KeySelectorRef(StringRef(a, s_b), true, 0), false),
SevInfo,
tssId));
ASSERT(!TSS_doCompare(GetKeyReply(KeySelectorRef(StringRef(a, s_a), true, 0), false),
GetKeyReply(KeySelectorRef(StringRef(a, s_b), true, 0), false)));
// this could be from different shard boundaries, so don't say it's a mismatch
ASSERT(TSS_doCompare(gkReq,
GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, 10), false),
GetKeyReply(KeySelectorRef(StringRef(a, s_b), false, 10), false),
SevInfo,
tssId));
ASSERT(TSS_doCompare(GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, 10), false),
GetKeyReply(KeySelectorRef(StringRef(a, s_b), false, 10), false)));
// test offsets and key difference don't match
ASSERT(!TSS_doCompare(gkReq,
GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, 0), false),
GetKeyReply(KeySelectorRef(StringRef(a, s_b), false, 10), false),
SevInfo,
tssId));
ASSERT(!TSS_doCompare(gkReq,
GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, -10), false),
GetKeyReply(KeySelectorRef(StringRef(a, s_b), false, 0), false),
SevInfo,
tssId));
ASSERT(!TSS_doCompare(GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, 0), false),
GetKeyReply(KeySelectorRef(StringRef(a, s_b), false, 10), false)));
ASSERT(!TSS_doCompare(GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, -10), false),
GetKeyReply(KeySelectorRef(StringRef(a, s_b), false, 0), false)));
// test key is next over in one shard, one found it and other didn't
// positive
// one that didn't find is +1
ASSERT(TSS_doCompare(gkReq,
GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, 1), false),
GetKeyReply(KeySelectorRef(StringRef(a, s_b), true, 0), false),
SevInfo,
tssId));
ASSERT(!TSS_doCompare(gkReq,
GetKeyReply(KeySelectorRef(StringRef(a, s_a), true, 0), false),
GetKeyReply(KeySelectorRef(StringRef(a, s_b), false, 1), false),
SevInfo,
tssId));
ASSERT(TSS_doCompare(GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, 1), false),
GetKeyReply(KeySelectorRef(StringRef(a, s_b), true, 0), false)));
ASSERT(!TSS_doCompare(GetKeyReply(KeySelectorRef(StringRef(a, s_a), true, 0), false),
GetKeyReply(KeySelectorRef(StringRef(a, s_b), false, 1), false)));
// negative will have zero offset but not equal set
ASSERT(TSS_doCompare(gkReq,
GetKeyReply(KeySelectorRef(StringRef(a, s_a), true, 0), false),
GetKeyReply(KeySelectorRef(StringRef(a, s_b), false, 0), false),
SevInfo,
tssId));
ASSERT(!TSS_doCompare(gkReq,
GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, 0), false),
GetKeyReply(KeySelectorRef(StringRef(a, s_b), true, 0), false),
SevInfo,
tssId));
ASSERT(TSS_doCompare(GetKeyReply(KeySelectorRef(StringRef(a, s_a), true, 0), false),
GetKeyReply(KeySelectorRef(StringRef(a, s_b), false, 0), false)));
ASSERT(!TSS_doCompare(GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, 0), false),
GetKeyReply(KeySelectorRef(StringRef(a, s_b), true, 0), false)));
// test shard boundary key returned by incomplete query is the same as the key found by the other (only possible in
// positive direction)
ASSERT(TSS_doCompare(gkReq,
GetKeyReply(KeySelectorRef(StringRef(a, s_a), true, 0), false),
GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, 1), false),
SevInfo,
tssId));
ASSERT(TSS_doCompare(GetKeyReply(KeySelectorRef(StringRef(a, s_a), true, 0), false),
GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, 1), false)));
// explictly test checksum function
std::string s12 = "ABCDEFGHIJKL";

View File

@ -364,6 +364,8 @@ UID decodeTssQuarantineKey(KeyRef const& key) {
return serverID;
}
const KeyRangeRef tssMismatchKeys(LiteralStringRef("\xff/tssMismatch/"), LiteralStringRef("\xff/tssMismatch0"));
const KeyRangeRef serverTagKeys(LiteralStringRef("\xff/serverTag/"), LiteralStringRef("\xff/serverTag0"));
const KeyRef serverTagPrefix = serverTagKeys.begin;

View File

@ -124,6 +124,10 @@ extern const KeyRangeRef tssQuarantineKeys;
const Key tssQuarantineKeyFor(UID serverID);
UID decodeTssQuarantineKey(KeyRef const&);
// \xff/tssMismatch/[[Tuple<TSSStorageUID, timestamp, mismatchUID>]] := [[TraceEventString]]
// For recording tss mismatch details in the system keyspace
extern const KeyRangeRef tssMismatchKeys;
// "\xff/serverTag/[[serverID]]" = "[[Tag]]"
// Provides the Tag for the given serverID. Used to access a
// storage server's corresponding TLog in order to apply mutations.

View File

@ -77,16 +77,22 @@ struct LoadBalancedReply {
Optional<LoadBalancedReply> getLoadBalancedReply(const LoadBalancedReply* reply);
Optional<LoadBalancedReply> getLoadBalancedReply(const void*);
ACTOR template <class Req, class Resp>
ACTOR template <class Req, class Resp, class Interface, class Multi>
Future<Void> tssComparison(Req req,
Future<ErrorOr<Resp>> fSource,
Future<ErrorOr<Resp>> fTss,
TSSEndpointData tssData) {
TSSEndpointData tssData,
uint64_t srcEndpointId,
Reference<MultiInterface<Multi>> ssTeam,
RequestStream<Req> Interface::*channel) {
state double startTime = now();
state Future<Optional<ErrorOr<Resp>>> fTssWithTimeout = timeout(fTss, FLOW_KNOBS->LOAD_BALANCE_TSS_TIMEOUT);
state int finished = 0;
state double srcEndTime;
state double tssEndTime;
// we want to record ss/tss errors to metrics
state int srcErrorCode = error_code_success;
state int tssErrorCode = error_code_success;
loop {
choose {
@ -108,11 +114,6 @@ Future<Void> tssComparison(Req req,
}
}
}
// we want to record ss/tss errors to metrics
int srcErrorCode = error_code_success;
int tssErrorCode = error_code_success;
++tssData.metrics->requests;
if (src.isError()) {
@ -137,15 +138,82 @@ Future<Void> tssComparison(Req req,
// apples
tssData.metrics->recordLatency(req, srcEndTime - startTime, tssEndTime - startTime);
// expect mismatches in drop mutations mode.
Severity traceSeverity =
(g_network->isSimulated() && g_simulator.tssMode == ISimulator::TSSMode::EnabledDropMutations)
? SevWarnAlways
: SevError;
if (!TSS_doCompare(req, src.get(), tss.get().get(), traceSeverity, tssData.tssId)) {
if (!TSS_doCompare(src.get(), tss.get().get())) {
TEST(true); // TSS Mismatch
++tssData.metrics->mismatches;
state TraceEvent mismatchEvent(
(g_network->isSimulated() && g_simulator.tssMode == ISimulator::TSSMode::EnabledDropMutations)
? SevWarnAlways
: SevError,
TSS_mismatchTraceName(req));
mismatchEvent.detail("TSSID", tssData.tssId);
if (FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_VERIFY_SS && ssTeam->size() > 1) {
TEST(true); // checking TSS mismatch against rest of storage team
// if there is more than 1 SS in the team, attempt to verify that the other SS servers have the same
// data
state std::vector<Future<ErrorOr<Resp>>> restOfTeamFutures;
restOfTeamFutures.reserve(ssTeam->size() - 1);
for (int i = 0; i < ssTeam->size(); i++) {
RequestStream<Req> const* si = &ssTeam->get(i, channel);
if (si->getEndpoint().token.first() !=
srcEndpointId) { // don't re-request to SS we already have a response from
resetReply(req);
restOfTeamFutures.push_back(si->tryGetReply(req));
}
}
wait(waitForAllReady(restOfTeamFutures));
int numError = 0;
int numMatchSS = 0;
int numMatchTSS = 0;
int numMatchNeither = 0;
for (Future<ErrorOr<Resp>> f : restOfTeamFutures) {
if (!f.canGet() || f.get().isError()) {
numError++;
} else {
Optional<LoadBalancedReply> fLB = getLoadBalancedReply(&f.get().get());
if (fLB.present() && fLB.get().error.present()) {
numError++;
} else if (TSS_doCompare(src.get(), f.get().get())) {
numMatchSS++;
} else if (TSS_doCompare(tss.get().get(), f.get().get())) {
numMatchTSS++;
} else {
numMatchNeither++;
}
}
}
mismatchEvent.detail("TeamCheckErrors", numError)
.detail("TeamCheckMatchSS", numMatchSS)
.detail("TeamCheckMatchTSS", numMatchTSS)
.detail("TeamCheckMatchNeither", numMatchNeither);
}
if (tssData.metrics->shouldRecordDetailedMismatch()) {
TSS_traceMismatch(mismatchEvent, req, src.get(), tss.get().get());
TEST(FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL); // Tracing Full TSS Mismatch
TEST(!FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL); // Tracing Partial TSS Mismatch and storing
// the rest in FDB
if (!FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL) {
mismatchEvent.disable();
UID mismatchUID = deterministicRandom()->randomUniqueID();
tssData.metrics->recordDetailedMismatchData(mismatchUID, mismatchEvent.getFields().toString());
// record a summarized trace event instead
TraceEvent summaryEvent((g_network->isSimulated() &&
g_simulator.tssMode == ISimulator::TSSMode::EnabledDropMutations)
? SevWarnAlways
: SevError,
TSS_mismatchTraceName(req));
summaryEvent.detail("TSSID", tssData.tssId).detail("MismatchId", mismatchUID);
}
} else {
// don't record trace event
mismatchEvent.disable();
}
}
} else if (tssLB.present() && tssLB.get().error.present()) {
tssErrorCode = tssLB.get().error.get().code();
@ -169,7 +237,7 @@ Future<Void> tssComparison(Req req,
}
// Stores state for a request made by the load balancer
template <class Request>
template <class Request, class Interface, class Multi>
struct RequestData : NonCopyable {
typedef ErrorOr<REPLY_TYPE(Request)> Reply;
@ -187,7 +255,9 @@ struct RequestData : NonCopyable {
static void maybeDuplicateTSSRequest(RequestStream<Request> const* stream,
Request& request,
QueueModel* model,
Future<Reply> ssResponse) {
Future<Reply> ssResponse,
Reference<MultiInterface<Multi>> alternatives,
RequestStream<Request> Interface::*channel) {
if (model) {
// Send parallel request to TSS pair, if it exists
Optional<TSSEndpointData> tssData = model->getTssData(stream->getEndpoint().token.first());
@ -198,34 +268,43 @@ struct RequestData : NonCopyable {
// FIXME: optimize to avoid creating new netNotifiedQueue for each message
RequestStream<Request> tssRequestStream(tssData.get().endpoint);
Future<ErrorOr<REPLY_TYPE(Request)>> fTssResult = tssRequestStream.tryGetReply(request);
model->addActor.send(tssComparison(request, ssResponse, fTssResult, tssData.get()));
model->addActor.send(tssComparison(request,
ssResponse,
fTssResult,
tssData.get(),
stream->getEndpoint().token.first(),
alternatives,
channel));
}
}
}
// Initializes the request state and starts it, possibly after a backoff delay
void startRequest(double backoff,
bool triedAllOptions,
RequestStream<Request> const* stream,
Request& request,
QueueModel* model) {
void startRequest(
double backoff,
bool triedAllOptions,
RequestStream<Request> const* stream,
Request& request,
QueueModel* model,
Reference<MultiInterface<Multi>> alternatives, // alternatives and channel passed through for TSS check
RequestStream<Request> Interface::*channel) {
modelHolder = Reference<ModelHolder>();
requestStarted = false;
if (backoff > 0) {
response = mapAsync<Void, std::function<Future<Reply>(Void)>, Reply>(
delay(backoff), [this, stream, &request, model](Void _) {
delay(backoff), [this, stream, &request, model, alternatives, channel](Void _) {
requestStarted = true;
modelHolder = Reference<ModelHolder>(new ModelHolder(model, stream->getEndpoint().token.first()));
Future<Reply> resp = stream->tryGetReply(request);
maybeDuplicateTSSRequest(stream, request, model, resp);
maybeDuplicateTSSRequest(stream, request, model, resp, alternatives, channel);
return resp;
});
} else {
requestStarted = true;
modelHolder = Reference<ModelHolder>(new ModelHolder(model, stream->getEndpoint().token.first()));
response = stream->tryGetReply(request);
maybeDuplicateTSSRequest(stream, request, model, response);
maybeDuplicateTSSRequest(stream, request, model, response, alternatives, channel);
}
requestProcessed = false;
@ -363,8 +442,8 @@ Future<REPLY_TYPE(Request)> loadBalance(
bool atMostOnce = false, // if true, throws request_maybe_delivered() instead of retrying automatically
QueueModel* model = nullptr) {
state RequestData<Request> firstRequestData;
state RequestData<Request> secondRequestData;
state RequestData<Request, Interface, Multi> firstRequestData;
state RequestData<Request, Interface, Multi> secondRequestData;
state Optional<uint64_t> firstRequestEndpoint;
state Future<Void> secondDelay = Never();
@ -577,7 +656,7 @@ Future<REPLY_TYPE(Request)> loadBalance(
firstRequestEndpoint = Optional<uint64_t>();
} else if (firstRequestData.isValid()) {
// Issue a second request, the first one is taking a long time.
secondRequestData.startRequest(backoff, triedAllOptions, stream, request, model);
secondRequestData.startRequest(backoff, triedAllOptions, stream, request, model, alternatives, channel);
state bool firstFinished = false;
loop choose {
@ -606,7 +685,7 @@ Future<REPLY_TYPE(Request)> loadBalance(
}
} else {
// Issue a request, if it takes too long to get a reply, go around the loop
firstRequestData.startRequest(backoff, triedAllOptions, stream, request, model);
firstRequestData.startRequest(backoff, triedAllOptions, stream, request, model, alternatives, channel);
firstRequestEndpoint = stream->getEndpoint().token.first();
loop {

View File

@ -29,6 +29,15 @@
#include "fdbrpc/Stats.h"
// refcounted + noncopyable because both DatabaseContext and individual endpoints share ownership
struct DetailedTSSMismatch {
UID mismatchId;
double timestamp;
std::string traceString;
DetailedTSSMismatch(UID mismatchId, double timestamp, std::string traceString)
: mismatchId(mismatchId), timestamp(timestamp), traceString(traceString) {}
};
struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
CounterCollection cc;
Counter requests;
@ -49,6 +58,8 @@ struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
std::unordered_map<int, uint64_t> ssErrorsByCode;
std::unordered_map<int, uint64_t> tssErrorsByCode;
std::vector<DetailedTSSMismatch> detailedMismatches;
void ssError(int code) {
++ssErrors;
ssErrorsByCode[code]++;
@ -62,6 +73,16 @@ struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
template <class Req>
void recordLatency(const Req& req, double ssLatency, double tssLatency);
// only record a small number of the detailed mismatches per client per metrics window
bool shouldRecordDetailedMismatch() {
++mismatches;
return (mismatches.getIntervalDelta() < 5);
}
void recordDetailedMismatchData(UID mismatchUID, std::string traceString) {
detailedMismatches.push_back(DetailedTSSMismatch(mismatchUID, now(), traceString));
}
void clear() {
SSgetValueLatency.clear();
SSgetKeyLatency.clear();
@ -73,6 +94,8 @@ struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
tssErrorsByCode.clear();
ssErrorsByCode.clear();
detailedMismatches.clear();
}
TSSMetrics()
@ -81,9 +104,13 @@ struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
SSgetKeyValuesLatency(1000), TSSgetValueLatency(1000), TSSgetKeyLatency(1000), TSSgetKeyValuesLatency(1000) {}
};
// part of the contract of this function is that if there is a mismatch, the implementation needs to record a trace
// event with the specified severity and tssId in the event.
template <class Rep>
bool TSS_doCompare(const Rep& src, const Rep& tss);
template <class Req>
const char* TSS_mismatchTraceName(const Req& req);
template <class Req, class Rep>
bool TSS_doCompare(const Req& req, const Rep& src, const Rep& tss, Severity traceSeverity, UID tssId);
void TSS_traceMismatch(TraceEvent& event, const Req& req, const Rep& src, const Rep& tss);
#endif

View File

@ -235,6 +235,8 @@ void FlowKnobs::initialize(bool randomize, bool isSimulated) {
init( BASIC_LOAD_BALANCE_BUCKETS, 40 ); //proxies bin recent GRV requests into 40 time bins
init( BASIC_LOAD_BALANCE_COMPUTE_PRECISION, 10000 ); //determines how much of the LB usage is holding the CPU usage of the proxy
init( LOAD_BALANCE_TSS_TIMEOUT, 5.0 );
init( LOAD_BALANCE_TSS_MISMATCH_VERIFY_SS, true ); if( randomize && BUGGIFY ) LOAD_BALANCE_TSS_MISMATCH_VERIFY_SS = false; // Whether the client should validate the SS teams all agree on TSS mismatch
init( LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL, false ); if( randomize && BUGGIFY ) LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL = true; // If true, saves the full details of the mismatch in a trace event. If false, saves them in the DB and the trace event references the DB row.
// Health Monitor
init( FAILURE_DETECTION_DELAY, 4.0 ); if( randomize && BUGGIFY ) FAILURE_DETECTION_DELAY = 1.0;

View File

@ -251,6 +251,8 @@ public:
double BASIC_LOAD_BALANCE_MIN_REQUESTS;
double BASIC_LOAD_BALANCE_MIN_CPU;
double LOAD_BALANCE_TSS_TIMEOUT;
bool LOAD_BALANCE_TSS_MISMATCH_VERIFY_SS;
bool LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL;
// Health Monitor
int FAILURE_DETECTION_DELAY;

View File

@ -463,12 +463,14 @@ public:
bool isEnabled() const { return enabled; }
TraceEvent &setErrorKind(ErrorKind errorKind);
TraceEvent& setErrorKind(ErrorKind errorKind);
explicit operator bool() const { return enabled; }
void log();
void disable() { enabled = false; } // Disables the trace event so it doesn't get
~TraceEvent(); // Actually logs the event
// Return the number of invocations of TraceEvent() at the specified logging level.
@ -476,6 +478,8 @@ public:
std::unique_ptr<DynamicEventMetric> tmpEventMetric; // This just just a place to store fields
const TraceEventFields& getFields() const { return fields; }
private:
bool initialized;
bool enabled;
@ -491,7 +495,7 @@ private:
int maxFieldLength;
int maxEventLength;
int timeIndex;
int errorKindIndex { -1 };
int errorKindIndex{ -1 };
void setSizeLimits();