Adding extra debugid to change feed stream requests to track things better

This commit is contained in:
Josh Slocum 2022-01-27 18:53:14 -06:00
parent bc7b76b407
commit c26e11c2c3
3 changed files with 26 additions and 12 deletions

View File

@ -7323,7 +7323,8 @@ ACTOR Future<Void> partialChangeFeedStream(StorageServerInterface interf,
Reference<ChangeFeedData> feedData,
Reference<ChangeFeedStorageData> storageData,
int idx /* TODO REMOVE this param after correctness clean */,
KeyRange range /* TODO REMOVE this param after correctness clean */) {
KeyRange range /* TODO REMOVE this param after correctness clean */,
UID debugID /*TODO REMOVE this param after correctness clean*/) {
// calling lastReturnedVersion's callbacks could cause us to be cancelled
state Promise<Void> refresh = feedData->refresh;
@ -7346,11 +7347,11 @@ ACTOR Future<Void> partialChangeFeedStream(StorageServerInterface interf,
when(state ChangeFeedStreamReply rep = waitNext(replyStream.getFuture())) {
if (DEBUG_CF_VERSION_RANGE(
feedData->id, rep.mutations.front().version, rep.mutations.back().version)) {
fmt::print(" single {0} {1}: response {2} - {3} ({4}), atLatest={5}, rep.atLatest={6}, "
"notAtLatest={7}, "
"minSV={8}\n",
fmt::print(" single {0} {1} {2}: response {3} - {4} ({5}), atLatest={6}, rep.atLatest={7}, "
"notAtLatest={8}, minSV={9}\n",
idx,
interf.id().toString().substr(0, 4),
debugID.toString().substr(0, 8).c_str(),
rep.mutations.front().version,
rep.mutations.back().version,
rep.mutations.size(),
@ -7607,6 +7608,7 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
state std::vector<Future<Void>> onErrors(interfs.size());
state std::vector<MutationAndVersionStream> streams(interfs.size());
std::vector<UID> debugIDs;
results->streams.clear();
for (auto& it : interfs) {
ChangeFeedStreamRequest req;
@ -7614,6 +7616,9 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
req.begin = *begin;
req.end = end;
req.range = it.second;
UID debugID = deterministicRandom()->randomUniqueID();
debugIDs.push_back(debugID);
req.debugID = debugID;
results->streams.push_back(it.first.changeFeedStream.getReplyStream(req));
}
@ -7644,7 +7649,8 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
results,
results->storageData[i],
i,
interfs[i].second);
interfs[i].second,
debugIDs[i]);
}
wait(onCFErrors(onErrors) || doCFMerge(results, interfs, streams, begin, end));
@ -7689,7 +7695,8 @@ ACTOR Future<Void> doSingleCFStream(KeyRange range,
Reference<ChangeFeedData> results,
Key rangeID,
Version* begin,
Version end) {
Version end,
UID debugID /*TODO REMOVE this parameter once BG is correctness clean*/) {
state Promise<Void> refresh = results->refresh;
ASSERT(results->streams.size() == 1);
ASSERT(results->storageData.size() == 1);
@ -7714,8 +7721,9 @@ ACTOR Future<Void> doSingleCFStream(KeyRange range,
// empty versions can come out of order, as we sometimes send explicit empty versions when restarting a
// stream. Anything with mutations should be strictly greater than lastReturnedVersion
if (feedReply.mutations.front().version <= results->lastReturnedVersion.get()) {
fmt::print("out of order mutation for CF {0} from ({1}) {2}! {3} < {4}\n",
fmt::print("out of order mutation for CF {0} Req {1} from ({2}) {3}! {4} < {5}\n",
rangeID.toString().substr(0, 6),
debugID.toString().substr(0, 8),
results->storageData.size(),
results->storageData[0]->id.toString().substr(0, 4).c_str(),
feedReply.mutations.front().version,
@ -7732,8 +7740,8 @@ ACTOR Future<Void> doSingleCFStream(KeyRange range,
}
if (DEBUG_CF_VERSION(rangeID, feedReply.mutations.back().version)) {
fmt::print("CFLR (single): {0} ({1}), atLatest={2}, rep.atLatest={3}, notAtLatest={4}, "
"minSV={5}\n",
fmt::print("CFLR (single) {0}: {1} ({2}), atLatest={3}, rep.atLatest={4}, notAtLatest={5}, minSV={6}\n",
debugID.toString().substr(0, 8),
feedReply.mutations.back().version,
feedReply.mutations.size(),
atLatest ? "T" : "F",
@ -7771,10 +7779,12 @@ ACTOR Future<Void> singleChangeFeedStream(Reference<DatabaseContext> db,
Version end) {
state Database cx(db);
state ChangeFeedStreamRequest req;
state UID debugID = deterministicRandom()->randomUniqueID();
req.rangeID = rangeID;
req.begin = *begin;
req.end = end;
req.range = range;
req.debugID = debugID;
results->streams.clear();
@ -7793,7 +7803,7 @@ ACTOR Future<Void> singleChangeFeedStream(Reference<DatabaseContext> db,
results->notAtLatest.set(1);
refresh.send(Void());
wait(results->streams[0].onError() || doSingleCFStream(range, results, rangeID, begin, end));
wait(results->streams[0].onError() || doSingleCFStream(range, results, rangeID, begin, end, debugID));
return Void();
}

View File

@ -715,12 +715,14 @@ struct ChangeFeedStreamRequest {
Version begin = 0;
Version end = 0;
KeyRange range;
// TODO REMOVE once BG is correctness clean!! Useful for debugging
UID debugID;
ReplyPromiseStream<ChangeFeedStreamReply> reply;
ChangeFeedStreamRequest() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, rangeID, begin, end, range, reply, spanContext, arena);
serializer(ar, rangeID, begin, end, range, reply, spanContext, debugID, arena);
}
};

View File

@ -1996,7 +1996,9 @@ ACTOR Future<Void> stopChangeFeedOnMove(StorageServer* data, ChangeFeedStreamReq
ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamRequest req) {
state Span span("SS:getChangeFeedStream"_loc, { req.spanContext });
state bool atLatest = false;
state UID streamUID = deterministicRandom()->randomUniqueID();
// TODO CHANGE BACK after BG is correctness clean
// state UID streamUID = deterministicRandom()->randomUniqueID();
state UID streamUID = req.debugID;
state bool removeUID = false;
state Optional<Version> blockedVersion;
req.reply.setByteLimit(SERVER_KNOBS->RANGESTREAM_LIMIT_BYTES);