Change Feed TSS Support (#8384)

* Change Feed TSS Support

* bug fixing for rare mismatch cases

* Adding rollback handling to tss change feed comparison
This commit is contained in:
Josh Slocum 2022-10-12 10:23:51 -05:00 committed by GitHub
parent fec05e5bc6
commit 96574bacaa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 342 additions and 22 deletions

View File

@ -32,6 +32,7 @@
#include <vector>
#include "boost/algorithm/string.hpp"
#include "flow/CodeProbe.h"
#include "fmt/format.h"
@ -48,6 +49,7 @@
#include "fdbclient/ClusterConnectionFile.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/IKnobCollection.h"
@ -188,6 +190,8 @@ void DatabaseContext::addTssMapping(StorageServerInterface const& ssi, StorageSe
TSSEndpointData(tssi.id(), tssi.getMappedKeyValues.getEndpoint(), metrics));
queueModel.updateTssEndpoint(ssi.getKeyValuesStream.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.getKeyValuesStream.getEndpoint(), metrics));
queueModel.updateTssEndpoint(ssi.changeFeedStream.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.changeFeedStream.getEndpoint(), metrics));
// non-data requests duplicated for load
queueModel.updateTssEndpoint(ssi.watchValue.getEndpoint().token.first(),
@ -198,6 +202,12 @@ void DatabaseContext::addTssMapping(StorageServerInterface const& ssi, StorageSe
TSSEndpointData(tssi.id(), tssi.getReadHotRanges.getEndpoint(), metrics));
queueModel.updateTssEndpoint(ssi.getRangeSplitPoints.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.getRangeSplitPoints.getEndpoint(), metrics));
queueModel.updateTssEndpoint(ssi.overlappingChangeFeeds.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.overlappingChangeFeeds.getEndpoint(), metrics));
// duplicated to ensure feed data cleanup
queueModel.updateTssEndpoint(ssi.changeFeedPop.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.changeFeedPop.getEndpoint(), metrics));
}
}
@ -9013,6 +9023,273 @@ void DatabaseContext::setDesiredChangeFeedVersion(Version v) {
}
}
// Because two storage servers, depending on the shard map, can have different representations of a clear at the same
// version depending on their shard maps at the time of the mutation, it is non-trivial to directly compare change feed
// streams. Instead we compare the presence of data at each version. This both saves on cpu cost of validation, and
// because historically most change feed corruption bugs are the absence of entire versions, not a subset of mutations
// within a version.
struct ChangeFeedTSSValidationData {
PromiseStream<Version> ssStreamSummary;
ReplyPromiseStream<ChangeFeedStreamReply> tssStream;
Future<Void> validatorFuture;
std::deque<std::pair<Version, Version>> rollbacks;
Version popVersion = invalidVersion;
bool done = false;
ChangeFeedTSSValidationData() {}
ChangeFeedTSSValidationData(ReplyPromiseStream<ChangeFeedStreamReply> tssStream) : tssStream(tssStream) {}
void updatePopped(Version newPopVersion) { popVersion = std::max(popVersion, newPopVersion); }
bool checkRollback(const MutationsAndVersionRef& m) {
if (m.mutations.size() == 1 && m.mutations.back().param1 == lastEpochEndPrivateKey) {
if (rollbacks.empty() || rollbacks.back().second < m.version) {
Version rollbackVersion;
BinaryReader br(m.mutations.back().param2, Unversioned());
br >> rollbackVersion;
if (!rollbacks.empty()) {
ASSERT(rollbacks.back().second <= rollbackVersion);
}
rollbacks.push_back({ rollbackVersion, m.version });
}
return true;
} else {
return false;
}
}
bool shouldAddMutation(const MutationsAndVersionRef& m) {
return !done && !m.mutations.empty() && !checkRollback(m);
}
bool isRolledBack(Version v) {
return !rollbacks.empty() && rollbacks.front().first < v && rollbacks.front().second > v;
}
void send(const ChangeFeedStreamReply& ssReply) {
if (done) {
return;
}
updatePopped(ssReply.popVersion);
for (auto& it : ssReply.mutations) {
if (shouldAddMutation(it)) {
ssStreamSummary.send(it.version);
}
}
}
void complete() {
done = true;
// destroy TSS stream to stop server actor
tssStream.reset();
}
};
void handleTSSChangeFeedMismatch(const ChangeFeedStreamRequest& request,
const TSSEndpointData& tssData,
int64_t matchesFound,
Version lastMatchingVersion,
Version ssVersion,
Version tssVersion,
Version popVersion) {
if (request.canReadPopped) {
// There is a known issue where this can return different data between an SS and TSS when a feed was popped but
// the SS restarted before the pop could be persisted, for reads that can read popped data. As such, only count
// this as a mismatch when !req.canReadPopped
return;
}
CODE_PROBE(true, "TSS mismatch in stream comparison");
if (tssData.metrics->shouldRecordDetailedMismatch()) {
TraceEvent mismatchEvent(
(g_network->isSimulated() && g_simulator->tssMode == ISimulator::TSSMode::EnabledDropMutations)
? SevWarnAlways
: SevError,
"TSSMismatchChangeFeedStream");
mismatchEvent.setMaxEventLength(FLOW_KNOBS->TSS_LARGE_TRACE_SIZE);
// request info
mismatchEvent.detail("TSSID", tssData.tssId);
mismatchEvent.detail("FeedID", request.rangeID);
mismatchEvent.detail("BeginVersion", request.begin);
mismatchEvent.detail("EndVersion", request.end);
mismatchEvent.detail("StartKey", request.range.begin);
mismatchEvent.detail("EndKey", request.range.end);
mismatchEvent.detail("CanReadPopped", request.canReadPopped);
mismatchEvent.detail("PopVersion", popVersion);
mismatchEvent.detail("DebugUID", request.debugUID);
// mismatch info
mismatchEvent.detail("MatchesFound", matchesFound);
mismatchEvent.detail("LastMatchingVersion", lastMatchingVersion);
mismatchEvent.detail("SSVersion", ssVersion);
mismatchEvent.detail("TSSVersion", tssVersion);
CODE_PROBE(FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL,
"Tracing Full TSS Feed Mismatch in stream comparison");
CODE_PROBE(!FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL,
"Tracing Partial TSS Feed Mismatch in stream comparison and storing the rest in FDB");
if (!FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL) {
mismatchEvent.disable();
UID mismatchUID = deterministicRandom()->randomUniqueID();
tssData.metrics->recordDetailedMismatchData(mismatchUID, mismatchEvent.getFields().toString());
// record a summarized trace event instead
TraceEvent summaryEvent(
(g_network->isSimulated() && g_simulator->tssMode == ISimulator::TSSMode::EnabledDropMutations)
? SevWarnAlways
: SevError,
"TSSMismatchChangeFeedStream");
summaryEvent.detail("TSSID", tssData.tssId)
.detail("MismatchId", mismatchUID)
.detail("FeedDebugUID", request.debugUID);
}
}
}
ACTOR Future<Void> changeFeedTSSValidator(ChangeFeedStreamRequest req,
Optional<ChangeFeedTSSValidationData>* data,
TSSEndpointData tssData) {
state bool ssDone = false;
state bool tssDone = false;
state std::deque<Version> ssSummary;
state std::deque<Version> tssSummary;
ASSERT(data->present());
state int64_t matchesFound = 0;
state Version lastMatchingVersion = req.begin - 1;
loop {
// If SS stream gets error, whole stream data gets reset, so it's ok to cancel this actor
if (!ssDone && ssSummary.empty()) {
try {
Version next = waitNext(data->get().ssStreamSummary.getFuture());
ssSummary.push_back(next);
} catch (Error& e) {
if (e.code() != error_code_end_of_stream) {
data->get().complete();
if (e.code() != error_code_operation_cancelled) {
tssData.metrics->ssError(e.code());
}
throw e;
}
ssDone = true;
if (tssDone) {
data->get().complete();
return Void();
}
}
}
if (!tssDone && tssSummary.empty()) {
try {
choose {
when(ChangeFeedStreamReply nextTss = waitNext(data->get().tssStream.getFuture())) {
data->get().updatePopped(nextTss.popVersion);
for (auto& it : nextTss.mutations) {
if (data->get().shouldAddMutation(it)) {
tssSummary.push_back(it.version);
}
}
}
// if ss has result, tss needs to return it
when(wait((ssDone || !ssSummary.empty()) ? delay(2.0 * FLOW_KNOBS->LOAD_BALANCE_TSS_TIMEOUT)
: Never())) {
++tssData.metrics->tssTimeouts;
data->get().complete();
return Void();
}
}
} catch (Error& e) {
if (e.code() == error_code_operation_cancelled) {
throw e;
}
if (e.code() == error_code_end_of_stream) {
tssDone = true;
if (ssDone) {
data->get().complete();
return Void();
}
} else {
tssData.metrics->tssError(e.code());
data->get().complete();
return Void();
}
}
}
// handle rollbacks and concurrent pops
while (!ssSummary.empty() &&
(ssSummary.front() < data->get().popVersion || data->get().isRolledBack(ssSummary.front()))) {
ssSummary.pop_front();
}
while (!tssSummary.empty() &&
(tssSummary.front() < data->get().popVersion || data->get().isRolledBack(tssSummary.front()))) {
tssSummary.pop_front();
}
while (!ssSummary.empty() && !tssSummary.empty()) {
CODE_PROBE(true, "Comparing TSS change feed data");
if (ssSummary.front() != tssSummary.front()) {
CODE_PROBE(true, "TSS change feed mismatch");
handleTSSChangeFeedMismatch(req,
tssData,
matchesFound,
lastMatchingVersion,
ssSummary.front(),
tssSummary.front(),
data->get().popVersion);
data->get().complete();
return Void();
}
matchesFound++;
lastMatchingVersion = ssSummary.front();
ssSummary.pop_front();
tssSummary.pop_front();
while (!data->get().rollbacks.empty() && data->get().rollbacks.front().second <= lastMatchingVersion) {
data->get().rollbacks.pop_front();
}
}
ASSERT(!ssDone || !tssDone); // both shouldn't be done, otherwise we shouldn't have looped
if ((ssDone && !tssSummary.empty()) || (tssDone && !ssSummary.empty())) {
CODE_PROBE(true, "TSS change feed mismatch at end of stream");
handleTSSChangeFeedMismatch(req,
tssData,
matchesFound,
lastMatchingVersion,
ssDone ? -1 : ssSummary.front(),
tssDone ? -1 : tssSummary.front(),
data->get().popVersion);
data->get().complete();
return Void();
}
}
}
void maybeDuplicateTSSChangeFeedStream(ChangeFeedStreamRequest& req,
const RequestStream<ChangeFeedStreamRequest>& stream,
QueueModel* model,
Optional<ChangeFeedTSSValidationData>* tssData) {
if (model) {
Optional<TSSEndpointData> tssPair = model->getTssData(stream.getEndpoint().token.first());
if (tssPair.present()) {
CODE_PROBE(true, "duplicating feed stream to TSS");
resetReply(req);
RequestStream<ChangeFeedStreamRequest> tssRequestStream(tssPair.get().endpoint);
*tssData = Optional<ChangeFeedTSSValidationData>(
ChangeFeedTSSValidationData(tssRequestStream.getReplyStream(req)));
// tie validator actor to the lifetime of the stream being active
tssData->get().validatorFuture = changeFeedTSSValidator(req, tssData, tssPair.get());
}
}
}
ChangeFeedStorageData::~ChangeFeedStorageData() {
if (context) {
context->changeFeedUpdaters.erase(interfToken);
@ -9134,7 +9411,8 @@ ACTOR Future<Void> partialChangeFeedStream(StorageServerInterface interf,
Version end,
Reference<ChangeFeedData> feedData,
Reference<ChangeFeedStorageData> storageData,
UID debugUID) {
UID debugUID,
Optional<ChangeFeedTSSValidationData>* tssData) {
// calling lastReturnedVersion's callbacks could cause us to be cancelled
state Promise<Void> refresh = feedData->refresh;
@ -9178,6 +9456,9 @@ ACTOR Future<Void> partialChangeFeedStream(StorageServerInterface interf,
if (rep.popVersion > feedData->popVersion) {
feedData->popVersion = rep.popVersion;
}
if (tssData->present()) {
tssData->get().updatePopped(rep.popVersion);
}
if (lastEmpty != invalidVersion && !results.isEmpty()) {
for (auto& it : feedData->storageData) {
@ -9192,6 +9473,10 @@ ACTOR Future<Void> partialChangeFeedStream(StorageServerInterface interf,
while (resultLoc < rep.mutations.size()) {
wait(results.onEmpty());
if (rep.mutations[resultLoc].version >= nextVersion) {
if (tssData->present() && tssData->get().shouldAddMutation(rep.mutations[resultLoc])) {
tssData->get().ssStreamSummary.send(rep.mutations[resultLoc].version);
}
results.send(rep.mutations[resultLoc]);
if (DEBUG_CF_CLIENT_TRACE) {
@ -9388,6 +9673,11 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
state std::vector<Future<Void>> fetchers(interfs.size());
state std::vector<Future<Void>> onErrors(interfs.size());
state std::vector<MutationAndVersionStream> streams(interfs.size());
state std::vector<Optional<ChangeFeedTSSValidationData>> tssDatas;
tssDatas.reserve(interfs.size());
for (int i = 0; i < interfs.size(); i++) {
tssDatas.push_back({});
}
CODE_PROBE(interfs.size() > 10, "Large change feed merge cursor");
CODE_PROBE(interfs.size() > 100, "Very large change feed merge cursor");
@ -9395,12 +9685,12 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
state UID mergeCursorUID = UID();
state std::vector<UID> debugUIDs;
results->streams.clear();
for (auto& it : interfs) {
for (int i = 0; i < interfs.size(); i++) {
ChangeFeedStreamRequest req;
req.rangeID = rangeID;
req.begin = *begin;
req.end = end;
req.range = it.second;
req.range = interfs[i].second;
req.canReadPopped = canReadPopped;
// divide total buffer size among sub-streams, but keep individual streams large enough to be efficient
req.replyBufferSize = replyBufferSize / interfs.size();
@ -9412,7 +9702,11 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
mergeCursorUID =
UID(mergeCursorUID.first() ^ req.debugUID.first(), mergeCursorUID.second() ^ req.debugUID.second());
results->streams.push_back(it.first.changeFeedStream.getReplyStream(req));
results->streams.push_back(interfs[i].first.changeFeedStream.getReplyStream(req));
maybeDuplicateTSSChangeFeedStream(req,
interfs[i].first.changeFeedStream,
db->enableLocalityLoadBalance ? &db->queueModel : nullptr,
&tssDatas[i]);
}
results->maxSeenVersion = invalidVersion;
@ -9449,7 +9743,8 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
end,
results,
results->storageData[i],
debugUIDs[i]);
debugUIDs[i],
&tssDatas[i]);
}
wait(waitForAny(onErrors) || mergeChangeFeedStreamInternal(results, interfs, streams, begin, end, mergeCursorUID));
@ -9503,7 +9798,8 @@ ACTOR Future<Void> singleChangeFeedStreamInternal(KeyRange range,
Reference<ChangeFeedData> results,
Key rangeID,
Version* begin,
Version end) {
Version end,
Optional<ChangeFeedTSSValidationData>* tssData) {
state Promise<Void> refresh = results->refresh;
ASSERT(results->streams.size() == 1);
@ -9538,6 +9834,9 @@ ACTOR Future<Void> singleChangeFeedStreamInternal(KeyRange range,
if (feedReply.popVersion > results->popVersion) {
results->popVersion = feedReply.popVersion;
}
if (tssData->present()) {
tssData->get().updatePopped(feedReply.popVersion);
}
// don't send completely empty set of mutations to promise stream
bool anyMutations = false;
@ -9552,6 +9851,10 @@ ACTOR Future<Void> singleChangeFeedStreamInternal(KeyRange range,
// stream. Anything with mutations should be strictly greater than lastReturnedVersion
ASSERT(feedReply.mutations.front().version > results->lastReturnedVersion.get());
if (tssData->present()) {
tssData->get().send(feedReply);
}
results->mutations.send(
Standalone<VectorRef<MutationsAndVersionRef>>(feedReply.mutations, feedReply.arena));
@ -9603,6 +9906,7 @@ ACTOR Future<Void> singleChangeFeedStream(Reference<DatabaseContext> db,
bool canReadPopped) {
state Database cx(db);
state ChangeFeedStreamRequest req;
state Optional<ChangeFeedTSSValidationData> tssData;
req.rangeID = rangeID;
req.begin = *begin;
req.end = end;
@ -9636,7 +9940,11 @@ ACTOR Future<Void> singleChangeFeedStream(Reference<DatabaseContext> db,
}
refresh.send(Void());
wait(results->streams[0].onError() || singleChangeFeedStreamInternal(range, results, rangeID, begin, end));
maybeDuplicateTSSChangeFeedStream(
req, interf.changeFeedStream, cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr, &tssData);
wait(results->streams[0].onError() ||
singleChangeFeedStreamInternal(range, results, rangeID, begin, end, &tssData));
return Void();
}
@ -9982,6 +10290,8 @@ ACTOR Future<Void> popChangeFeedMutationsActor(Reference<DatabaseContext> db, Ke
return Void();
}
auto model = cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr;
bool foundFailed = false;
for (int i = 0; i < locations.size() && !foundFailed; i++) {
for (int j = 0; j < locations[i].locations->size() && !foundFailed; j++) {
@ -9990,6 +10300,15 @@ ACTOR Future<Void> popChangeFeedMutationsActor(Reference<DatabaseContext> db, Ke
.isFailed()) {
foundFailed = true;
}
// for now, if any of popping SS has a TSS pair, just always use backup method
if (model && model
->getTssData(locations[i]
.locations->get(j, &StorageServerInterface::changeFeedPop)
.getEndpoint()
.token.first())
.present()) {
foundFailed = true;
}
}
}

View File

@ -342,7 +342,7 @@ void TSS_traceMismatch(TraceEvent& event,
// change feed
template <>
bool TSS_doCompare(const OverlappingChangeFeedsReply& src, const OverlappingChangeFeedsReply& tss) {
ASSERT(false);
// We duplicate for load, no need to validate replies
return true;
}

View File

@ -2863,20 +2863,6 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
}
}
if (DEBUG_CF_TRACE) {
TraceEvent(SevDebug, "ChangeFeedMutationsDone", data->thisServerID)
.detail("FeedID", req.rangeID)
.detail("StreamUID", streamUID)
.detail("Range", req.range)
.detail("Begin", req.begin)
.detail("End", req.end)
.detail("FirstVersion", reply.mutations.empty() ? invalidVersion : reply.mutations.front().version)
.detail("LastVersion", reply.mutations.empty() ? invalidVersion : reply.mutations.back().version)
.detail("Count", reply.mutations.size())
.detail("GotAll", gotAll)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
}
if (DEBUG_CF_MISSING(req.rangeID, req.range, req.begin, reply.mutations.back().version) && !req.canReadPopped) {
bool foundVersion = false;
bool foundKey = false;
@ -2928,6 +2914,21 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
reply.popVersion = feedInfo->emptyVersion + 1;
if (DEBUG_CF_TRACE) {
TraceEvent(SevDebug, "ChangeFeedMutationsDone", data->thisServerID)
.detail("FeedID", req.rangeID)
.detail("StreamUID", streamUID)
.detail("Range", req.range)
.detail("Begin", req.begin)
.detail("End", req.end)
.detail("FirstVersion", reply.mutations.empty() ? invalidVersion : reply.mutations.front().version)
.detail("LastVersion", reply.mutations.empty() ? invalidVersion : reply.mutations.back().version)
.detail("PopVersion", reply.popVersion)
.detail("Count", reply.mutations.size())
.detail("GotAll", gotAll)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
}
// If the SS's version advanced at all during any of the waits, the read from memory may have missed some
// mutations, so gotAll can only be true if data->version didn't change over the course of this actor
return std::make_pair(reply, gotAll);