Added Streaming Read comparison for TSS

This commit is contained in:
Josh Slocum 2021-06-24 11:48:20 -05:00
parent 6c84594e2f
commit 9f1afbb709
7 changed files with 262 additions and 3 deletions

View File

@ -154,6 +154,8 @@ void DatabaseContext::addTssMapping(StorageServerInterface const& ssi, StorageSe
TSSEndpointData(tssi.id(), tssi.getKeyValues.getEndpoint(), metrics));
queueModel.updateTssEndpoint(ssi.watchValue.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.watchValue.getEndpoint(), metrics));
queueModel.updateTssEndpoint(ssi.getKeyValuesStream.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.getKeyValuesStream.getEndpoint(), metrics));
}
}
@ -166,6 +168,7 @@ void DatabaseContext::removeTssMapping(StorageServerInterface const& ssi) {
queueModel.removeTssEndpoint(ssi.getKey.getEndpoint().token.first());
queueModel.removeTssEndpoint(ssi.getKeyValues.getEndpoint().token.first());
queueModel.removeTssEndpoint(ssi.watchValue.getEndpoint().token.first());
queueModel.removeTssEndpoint(ssi.getKeyValuesStream.getEndpoint().token.first());
}
}
@ -3498,6 +3501,174 @@ ACTOR Future<RangeResult> getRange(Database cx,
}
}
// Bundles the stream that receives a duplicate of the storage server's replies with a promise that marks the TSS
// comparison as finished.
template <class StreamReply>
struct TSSDuplicateStreamData {
	PromiseStream<StreamReply> stream;
	Promise<Void> tssComparisonDone;

	// empty constructor, presumably so the type can be held in an Optional
	TSSDuplicateStreamData() {}
	TSSDuplicateStreamData(PromiseStream<StreamReply> stream) : stream(stream) {}
	~TSSDuplicateStreamData() {}

	// Whether the future of tssComparisonDone is ready
	bool done() {
		return tssComparisonDone.getFuture().isReady();
	}

	// Sets tssComparisonDone if it can still be set; otherwise does nothing
	void setDone() {
		if (!tssComparisonDone.canBeSet()) {
			return;
		}
		tssComparisonDone.send(Void());
	}
};
// Compares the duplicated storage server (SS) reply stream against the TSS reply stream for the same streaming
// request, one pair of replies per loop iteration, and emits a mismatch trace event if a pair differs.
// Error tracking here is weird, and latency doesn't really mean the same thing here as it does with normal tss
// comparisons, so this is pretty much just counting mismatches
//
// request: the streaming request; used here only to name and describe the mismatch trace event
// streamData: streamData.stream supplies the SS replies; streamData.setDone() is called before every return and on
// actor cancellation
// tssReplyStream: the replies from the TSS
// tssData: supplies the TSS id and the metrics object updated by this actor
ACTOR template <class Request>
static Future<Void> tssStreamComparison(Request request,
TSSDuplicateStreamData<REPLYSTREAM_TYPE(Request)> streamData,
ReplyPromiseStream<REPLYSTREAM_TYPE(Request)> tssReplyStream,
TSSEndpointData tssData) {
state bool ssEndOfStream = false;
state bool tssEndOfStream = false;
state Optional<REPLYSTREAM_TYPE(Request)> ssReply = Optional<REPLYSTREAM_TYPE(Request)>();
state Optional<REPLYSTREAM_TYPE(Request)> tssReply = Optional<REPLYSTREAM_TYPE(Request)>();
loop {
// reset replies
ssReply = Optional<REPLYSTREAM_TYPE(Request)>();
tssReply = Optional<REPLYSTREAM_TYPE(Request)>();
// taken before waiting on the SS reply: the TSS timeout computed below is measured from this point
state double startTime = now();
// wait for ss response
try {
REPLYSTREAM_TYPE(Request) _ssReply = waitNext(streamData.stream.getFuture());
ssReply = _ssReply;
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
streamData.setDone();
throw;
}
if (e.code() == error_code_end_of_stream) {
// ssReply stays empty, so the comparison below is skipped and the actor returns at the end of this
// iteration
ssEndOfStream = true;
} else {
// any other SS error is only counted; ssReply stays empty as well
tssData.metrics->ssError(e.code());
}
TEST(e.code() != error_code_end_of_stream); // SS got error in TSS stream comparison
}
// whatever is left of LOAD_BALANCE_TSS_TIMEOUT since startTime, never negative
state double sleepTime = std::max(startTime + FLOW_KNOBS->LOAD_BALANCE_TSS_TIMEOUT - now(), 0.0);
// wait for tss response
try {
choose {
when(REPLYSTREAM_TYPE(Request) _tssReply = waitNext(tssReplyStream.getFuture())) {
tssReply = _tssReply;
}
when(wait(delay(sleepTime))) {
// on timeout tssReply stays empty, which ends the comparison at the bottom of the loop
++tssData.metrics->tssTimeouts;
TEST(true); // Got TSS timeout in stream comparison
}
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
streamData.setDone();
throw;
}
if (e.code() == error_code_end_of_stream) {
// tssReply stays empty, so the comparison below is skipped and the actor returns at the end of this
// iteration
tssEndOfStream = true;
} else {
// any other TSS error is only counted; tssReply stays empty as well
tssData.metrics->tssError(e.code());
}
TEST(e.code() != error_code_end_of_stream); // TSS got error in TSS stream comparison
}
// counted for every iteration except the one in which both streams reported end_of_stream
if (!ssEndOfStream || !tssEndOfStream) {
++tssData.metrics->streamComparisons;
}
// if both are successful, compare
if (ssReply.present() && tssReply.present()) {
// compare results
// FIXME: this code is pretty much identical to LoadBalance.h
// TODO could add team check logic in if we added synchronous way to turn this into a fixed getRange request
// and send it to the whole team and compare? I think it's fine to skip that for streaming though
// NOTE(review): both replies are present here, and the end-of-stream flags are only set on paths that leave
// the corresponding reply empty, so within this function the condition in the TEST below looks unreachable
// and the end-of-stream guard in the following `if` looks always true -- confirm
TEST(ssEndOfStream != tssEndOfStream); // SS or TSS stream finished early!
// skip tss comparison if both are end of stream
if ((!ssEndOfStream || !tssEndOfStream) && !TSS_doCompare(ssReply.get(), tssReply.get())) {
TEST(true); // TSS mismatch in stream comparison
TraceEvent mismatchEvent(
(g_network->isSimulated() && g_simulator.tssMode == ISimulator::TSSMode::EnabledDropMutations)
? SevWarnAlways
: SevError,
TSS_mismatchTraceName(request));
mismatchEvent.setMaxEventLength(FLOW_KNOBS->TSS_LARGE_TRACE_SIZE);
mismatchEvent.detail("TSSID", tssData.tssId);
if (tssData.metrics->shouldRecordDetailedMismatch()) {
TSS_traceMismatch(mismatchEvent, request, ssReply.get(), tssReply.get());
TEST(FLOW_KNOBS
->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL); // Tracing Full TSS Mismatch in stream comparison
TEST(!FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL); // Tracing Partial TSS Mismatch in stream
// comparison and storing the rest in FDB
if (!FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL) {
// the detailed event is disabled; its fields are handed to the metrics object under a random id, and
// the summary event below carries that id
mismatchEvent.disable();
UID mismatchUID = deterministicRandom()->randomUniqueID();
tssData.metrics->recordDetailedMismatchData(mismatchUID, mismatchEvent.getFields().toString());
// record a summarized trace event instead
TraceEvent summaryEvent((g_network->isSimulated() &&
g_simulator.tssMode == ISimulator::TSSMode::EnabledDropMutations)
? SevWarnAlways
: SevError,
TSS_mismatchTraceName(request));
summaryEvent.detail("TSSID", tssData.tssId).detail("MismatchId", mismatchUID);
}
} else {
// don't record trace event
mismatchEvent.disable();
}
// the comparison stops at the first mismatch
streamData.setDone();
return Void();
}
}
if (!ssReply.present() || !tssReply.present() || ssEndOfStream || tssEndOfStream) {
// if both streams don't still have more data, stop comparison
streamData.setDone();
return Void();
}
}
}
// Currently only used for GetKeyValuesStream but could easily be plugged for other stream types.
// When `model` is set and has TSS data for ssStream's endpoint, this resets req's reply, sends req to the TSS
// endpoint, hands a tssStreamComparison actor to model->addActor, and returns the duplicate stream data.
// In every other case it returns an empty Optional.
// User of the stream has to forward the SS's responses to the returned promise stream, if it is set.
template <class Request>
Optional<TSSDuplicateStreamData<REPLYSTREAM_TYPE(Request)>>
maybeDuplicateTSSStreamFragment(Request& req, QueueModel* model, RequestStream<Request> const* ssStream) {
	typedef TSSDuplicateStreamData<REPLYSTREAM_TYPE(Request)> DuplicateData;

	// no queue model means there is nowhere to look up a TSS mapping
	if (!model) {
		return Optional<DuplicateData>();
	}

	Optional<TSSEndpointData> tssEndpoint = model->getTssData(ssStream->getEndpoint().token.first());
	if (!tssEndpoint.present()) {
		return Optional<DuplicateData>();
	}

	TEST(true); // duplicating stream to TSS
	resetReply(req);
	// FIXME: optimize to avoid creating new netNotifiedQueueWithAcknowledgements for each stream duplication
	RequestStream<Request> tssRequests(tssEndpoint.get().endpoint);
	ReplyPromiseStream<REPLYSTREAM_TYPE(Request)> tssReplies = tssRequests.getReplyStream(req);
	PromiseStream<REPLYSTREAM_TYPE(Request)> ssReplyCopies;
	DuplicateData duplicate(ssReplyCopies);
	model->addActor.send(tssStreamComparison(req, duplicate, tssReplies, tssEndpoint.get()));
	return Optional<DuplicateData>(duplicate);
}
// Streams all of the KV pairs in a target key range into a ParallelStream fragment
ACTOR Future<Void> getRangeStreamFragment(ParallelStream<RangeResult>::Fragment* results,
Database cx,
@ -3518,6 +3689,7 @@ ACTOR Future<Void> getRangeStreamFragment(ParallelStream<RangeResult>::Fragment*
loop {
const KeyRange& range = locations[shard].first;
state Optional<TSSDuplicateStreamData<GetKeyValuesStreamReply>> tssDuplicateStream;
state GetKeyValuesStreamRequest req;
req.version = version;
req.begin = firstGreaterOrEqual(range.begin);
@ -3526,6 +3698,9 @@ ACTOR Future<Void> getRangeStreamFragment(ParallelStream<RangeResult>::Fragment*
req.limit = reverse ? -CLIENT_KNOBS->REPLY_BYTE_LIMIT : CLIENT_KNOBS->REPLY_BYTE_LIMIT;
req.limitBytes = std::numeric_limits<int>::max();
// keep shard's arena around in case of async tss comparison
req.arena.dependsOn(range.arena());
ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse);
// FIXME: buggify byte limits on internal functions that use them, instead of globally
@ -3589,6 +3764,12 @@ ACTOR Future<Void> getRangeStreamFragment(ParallelStream<RangeResult>::Fragment*
locations[shard]
.second->get(useIdx, &StorageServerInterface::getKeyValuesStream)
.getReplyStream(req);
tssDuplicateStream = maybeDuplicateTSSStreamFragment(
req,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr,
&locations[shard].second->get(useIdx, &StorageServerInterface::getKeyValuesStream));
state bool breakAgain = false;
loop {
wait(results->onEmpty());
@ -3596,6 +3777,9 @@ ACTOR Future<Void> getRangeStreamFragment(ParallelStream<RangeResult>::Fragment*
choose {
when(wait(cx->connectionFileChanged())) {
results->sendError(transaction_too_old());
if (tssDuplicateStream.present() && !tssDuplicateStream.get().done()) {
tssDuplicateStream.get().stream.sendError(transaction_too_old());
}
return Void();
}
@ -3605,9 +3789,15 @@ ACTOR Future<Void> getRangeStreamFragment(ParallelStream<RangeResult>::Fragment*
} catch (Error& e) {
++cx->transactionPhysicalReadsCompleted;
if (e.code() == error_code_broken_promise) {
if (tssDuplicateStream.present() && !tssDuplicateStream.get().done()) {
tssDuplicateStream.get().stream.sendError(connection_failed());
}
throw connection_failed();
}
if (e.code() != error_code_end_of_stream) {
if (tssDuplicateStream.present() && !tssDuplicateStream.get().done()) {
tssDuplicateStream.get().stream.sendError(e);
}
throw;
}
rep = GetKeyValuesStreamReply();
@ -3617,6 +3807,17 @@ ACTOR Future<Void> getRangeStreamFragment(ParallelStream<RangeResult>::Fragment*
"TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.After");
RangeResult output(RangeResultRef(rep.data, rep.more), rep.arena);
if (tssDuplicateStream.present() && !tssDuplicateStream.get().done()) {
// shallow copy the reply with an arena depends, and send it to the duplicate stream for TSS
GetKeyValuesStreamReply replyCopy;
replyCopy.version = rep.version;
replyCopy.more = rep.more;
replyCopy.cached = rep.cached;
replyCopy.arena.dependsOn(rep.arena);
replyCopy.data.append(replyCopy.arena, rep.data.begin(), rep.data.size());
tssDuplicateStream.get().stream.send(replyCopy);
}
int64_t bytes = 0;
for (const KeyValueRef& kv : output) {
bytes += kv.key.size() + kv.value.size();
@ -3674,6 +3875,9 @@ ACTOR Future<Void> getRangeStreamFragment(ParallelStream<RangeResult>::Fragment*
output.readThrough = reverse ? keys.begin : keys.end;
results->send(std::move(output));
results->finish();
if (tssDuplicateStream.present() && !tssDuplicateStream.get().done()) {
tssDuplicateStream.get().stream.sendError(end_of_stream());
}
return Void();
}
keys = KeyRangeRef(begin, end);
@ -3700,6 +3904,10 @@ ACTOR Future<Void> getRangeStreamFragment(ParallelStream<RangeResult>::Fragment*
break;
}
} catch (Error& e) {
// send errors to tss duplicate stream, including actor_cancelled
if (tssDuplicateStream.present() && !tssDuplicateStream.get().done()) {
tssDuplicateStream.get().stream.sendError(e);
}
if (e.code() == error_code_actor_cancelled) {
throw;
}

View File

@ -145,6 +145,47 @@ void TSS_traceMismatch(TraceEvent& event,
.detail("Version", req.version)
.detail("Limit", req.limit)
.detail("LimitBytes", req.limitBytes)
.setMaxFieldLength(FLOW_KNOBS->TSS_LARGE_TRACE_SIZE * 4 / 10)
.detail("SSReply", ssResultsString)
.detail("TSSReply", tssResultsString);
}
// streaming range reads
// Two streaming replies match when their `more` flags agree and their data compares equal.
template <>
bool TSS_doCompare(const GetKeyValuesStreamReply& src, const GetKeyValuesStreamReply& tss) {
	if (src.more != tss.more) {
		return false;
	}
	return src.data == tss.data;
}
// Name of the trace event used for a streaming range read mismatch; the request itself is not inspected.
template <>
const char* TSS_mismatchTraceName(const GetKeyValuesStreamRequest& req) {
	const char* traceName = "TSSMismatchGetKeyValuesStream";
	return traceName;
}
// TODO this is all duplicated from above, simplify?
// Adds the request's selectors, version and limits, plus a rendering of both replies, to `event`.
template <>
void TSS_traceMismatch(TraceEvent& event,
                       const GetKeyValuesStreamRequest& req,
                       const GetKeyValuesStreamReply& src,
                       const GetKeyValuesStreamReply& tss) {
	// Renders a reply as "(<count>)" plus "+" when more is set, followed by one "key=checksum" line per pair
	auto describeReply = [](const GetKeyValuesStreamReply& reply) {
		std::string rendered = format("(%d)%s:\n", reply.data.size(), reply.more ? "+" : "");
		for (auto& kv : reply.data) {
			rendered += "\n" + kv.key.printable() + "=" + traceChecksumValue(kv.value);
		}
		return rendered;
	};
	// Renders a key selector as "[=]<key>:<offset>"
	auto describeSelector = [](const auto& selector) {
		return format(
		    "%s%s:%d", selector.orEqual ? "=" : "", selector.getKey().printable().c_str(), selector.offset);
	};

	std::string ssText = describeReply(src);
	std::string tssText = describeReply(tss);

	event.detail("Begin", describeSelector(req.begin));
	event.detail("End", describeSelector(req.end));
	event.detail("Version", req.version);
	event.detail("Limit", req.limit);
	event.detail("LimitBytes", req.limitBytes);
	// the field length limit is set just before the two reply fields are added
	event.setMaxFieldLength(FLOW_KNOBS->TSS_LARGE_TRACE_SIZE * 4 / 10);
	event.detail("SSReply", ssText);
	event.detail("TSSReply", tssText);
}
@ -290,6 +331,9 @@ void TSSMetrics::recordLatency(const ReadHotSubRangeRequest& req, double ssLaten
// SplitRangeRequest specialization: the body is empty, so no latency is recorded for this request type
template <>
void TSSMetrics::recordLatency(const SplitRangeRequest& req, double ssLatency, double tssLatency) {}
// GetKeyValuesStreamRequest specialization: the body is empty, so no latency is recorded for streaming range reads
template <>
void TSSMetrics::recordLatency(const GetKeyValuesStreamRequest& req, double ssLatency, double tssLatency) {}
// -------------------
TEST_CASE("/StorageServerInterface/TSSCompare/TestComparison") {

View File

@ -147,6 +147,7 @@ Future<Void> tssComparison(Req req,
? SevWarnAlways
: SevError,
TSS_mismatchTraceName(req));
mismatchEvent.setMaxEventLength(FLOW_KNOBS->TSS_LARGE_TRACE_SIZE);
mismatchEvent.detail("TSSID", tssData.tssId);
if (FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_VERIFY_SS && ssTeam->size() > 1) {

View File

@ -41,6 +41,7 @@ struct DetailedTSSMismatch {
struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
CounterCollection cc;
Counter requests;
Counter streamComparisons;
Counter ssErrors;
Counter tssErrors;
Counter tssTimeouts;
@ -99,9 +100,10 @@ struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
}
TSSMetrics()
: cc("TSSClientMetrics"), requests("Requests", cc), ssErrors("SSErrors", cc), tssErrors("TSSErrors", cc),
tssTimeouts("TSSTimeouts", cc), mismatches("Mismatches", cc), SSgetValueLatency(1000), SSgetKeyLatency(1000),
SSgetKeyValuesLatency(1000), TSSgetValueLatency(1000), TSSgetKeyLatency(1000), TSSgetKeyValuesLatency(1000) {}
: cc("TSSClientMetrics"), requests("Requests", cc), streamComparisons("StreamComparisons", cc),
ssErrors("SSErrors", cc), tssErrors("TSSErrors", cc), tssTimeouts("TSSTimeouts", cc),
mismatches("Mismatches", cc), SSgetValueLatency(1000), SSgetKeyLatency(1000), SSgetKeyValuesLatency(1000),
TSSgetValueLatency(1000), TSSgetKeyLatency(1000), TSSgetKeyValuesLatency(1000) {}
};
template <class Rep>

View File

@ -533,6 +533,8 @@ public:
}
}
// Replaces this stream with a default-constructed ReplyPromiseStream<T>
void reset() { *this = ReplyPromiseStream<T>(); }
private:
NetNotifiedQueueWithAcknowledgements<T>* queue;
SAV<Void>* errors;

View File

@ -239,6 +239,7 @@ void FlowKnobs::initialize(Randomize _randomize, IsSimulated _isSimulated) {
init( LOAD_BALANCE_TSS_TIMEOUT, 5.0 );
init( LOAD_BALANCE_TSS_MISMATCH_VERIFY_SS, true ); if( randomize && BUGGIFY ) LOAD_BALANCE_TSS_MISMATCH_VERIFY_SS = false; // Whether the client should validate the SS teams all agree on TSS mismatch
init( LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL, false ); if( randomize && BUGGIFY ) LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL = true; // If true, saves the full details of the mismatch in a trace event. If false, saves them in the DB and the trace event references the DB row.
init( TSS_LARGE_TRACE_SIZE, 50000 );
// Health Monitor
init( FAILURE_DETECTION_DELAY, 4.0 ); if( randomize && BUGGIFY ) FAILURE_DETECTION_DELAY = 1.0;

View File

@ -280,6 +280,7 @@ public:
double LOAD_BALANCE_TSS_TIMEOUT;
bool LOAD_BALANCE_TSS_MISMATCH_VERIFY_SS;
bool LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL;
int TSS_LARGE_TRACE_SIZE;
// Health Monitor
int FAILURE_DETECTION_DELAY;