diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 71ceae180a..89bc339b6e 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -7323,7 +7323,8 @@ ACTOR Future partialChangeFeedStream(StorageServerInterface interf, Reference feedData, Reference storageData, int idx /* TODO REMOVE this param after correctness clean */, - KeyRange range /* TODO REMOVE this param after correctness clean */) { + KeyRange range /* TODO REMOVE this param after correctness clean */, + UID debugID /*TODO REMOVE this param after correctness clean*/) { // calling lastReturnedVersion's callbacks could cause us to be cancelled state Promise refresh = feedData->refresh; @@ -7346,11 +7347,11 @@ ACTOR Future partialChangeFeedStream(StorageServerInterface interf, when(state ChangeFeedStreamReply rep = waitNext(replyStream.getFuture())) { if (DEBUG_CF_VERSION_RANGE( feedData->id, rep.mutations.front().version, rep.mutations.back().version)) { - fmt::print(" single {0} {1}: response {2} - {3} ({4}), atLatest={5}, rep.atLatest={6}, " - "notAtLatest={7}, " - "minSV={8}\n", + fmt::print(" single {0} {1} {2}: response {3} - {4} ({5}), atLatest={6}, rep.atLatest={7}, " + "notAtLatest={8}, minSV={9}\n", idx, interf.id().toString().substr(0, 4), + debugID.toString().substr(0, 8).c_str(), rep.mutations.front().version, rep.mutations.back().version, rep.mutations.size(), @@ -7607,6 +7608,7 @@ ACTOR Future mergeChangeFeedStream(Reference db, state std::vector> onErrors(interfs.size()); state std::vector streams(interfs.size()); + std::vector debugIDs; results->streams.clear(); for (auto& it : interfs) { ChangeFeedStreamRequest req; @@ -7614,6 +7616,9 @@ ACTOR Future mergeChangeFeedStream(Reference db, req.begin = *begin; req.end = end; req.range = it.second; + UID debugID = deterministicRandom()->randomUniqueID(); + debugIDs.push_back(debugID); + req.debugID = debugID; results->streams.push_back(it.first.changeFeedStream.getReplyStream(req)); } @@ -7644,7 +7649,8 @@ ACTOR Future mergeChangeFeedStream(Reference db, results, results->storageData[i], i, - interfs[i].second); + interfs[i].second, + debugIDs[i]); } wait(onCFErrors(onErrors) || doCFMerge(results, interfs, streams, begin, end)); @@ -7689,7 +7695,8 @@ ACTOR Future doSingleCFStream(KeyRange range, Reference results, Key rangeID, Version* begin, - Version end) { + Version end, + UID debugID /*TODO REMOVE this parameter once BG is correctness clean*/) { state Promise refresh = results->refresh; ASSERT(results->streams.size() == 1); ASSERT(results->storageData.size() == 1); @@ -7714,8 +7721,9 @@ ACTOR Future doSingleCFStream(KeyRange range, // empty versions can come out of order, as we sometimes send explicit empty versions when restarting a // stream. Anything with mutations should be strictly greater than lastReturnedVersion if (feedReply.mutations.front().version <= results->lastReturnedVersion.get()) { - fmt::print("out of order mutation for CF {0} from ({1}) {2}! {3} < {4}\n", + fmt::print("out of order mutation for CF {0} Req {1} from ({2}) {3}! {4} < {5}\n", rangeID.toString().substr(0, 6), + debugID.toString().substr(0, 8), results->storageData.size(), results->storageData[0]->id.toString().substr(0, 4).c_str(), feedReply.mutations.front().version, @@ -7732,8 +7740,8 @@ ACTOR Future doSingleCFStream(KeyRange range, } if (DEBUG_CF_VERSION(rangeID, feedReply.mutations.back().version)) { - fmt::print("CFLR (single): {0} ({1}), atLatest={2}, rep.atLatest={3}, notAtLatest={4}, " - "minSV={5}\n", + fmt::print("CFLR (single) {0}: {1} ({2}), atLatest={3}, rep.atLatest={4}, notAtLatest={5}, minSV={6}\n", + debugID.toString().substr(0, 8), feedReply.mutations.back().version, feedReply.mutations.size(), atLatest ? "T" : "F", @@ -7771,10 +7779,12 @@ ACTOR Future singleChangeFeedStream(Reference db, Version end) { state Database cx(db); state ChangeFeedStreamRequest req; + state UID debugID = deterministicRandom()->randomUniqueID(); req.rangeID = rangeID; req.begin = *begin; req.end = end; req.range = range; + req.debugID = debugID; results->streams.clear(); @@ -7793,7 +7803,7 @@ ACTOR Future singleChangeFeedStream(Reference db, results->notAtLatest.set(1); refresh.send(Void()); - wait(results->streams[0].onError() || doSingleCFStream(range, results, rangeID, begin, end)); + wait(results->streams[0].onError() || doSingleCFStream(range, results, rangeID, begin, end, debugID)); return Void(); } diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h index 9e40f95e1f..ea58e3b3ed 100644 --- a/fdbclient/StorageServerInterface.h +++ b/fdbclient/StorageServerInterface.h @@ -715,12 +715,14 @@ struct ChangeFeedStreamRequest { Version begin = 0; Version end = 0; KeyRange range; + // TODO REMOVE once BG is correctness clean!! Useful for debugging + UID debugID; ReplyPromiseStream reply; ChangeFeedStreamRequest() {} template void serialize(Ar& ar) { - serializer(ar, rangeID, begin, end, range, reply, spanContext, arena); + serializer(ar, rangeID, begin, end, range, reply, spanContext, debugID, arena); } }; diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 1691eb363d..b3c099f187 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -1996,7 +1996,9 @@ ACTOR Future stopChangeFeedOnMove(StorageServer* data, ChangeFeedStreamReq ACTOR Future changeFeedStreamQ(StorageServer* data, ChangeFeedStreamRequest req) { state Span span("SS:getChangeFeedStream"_loc, { req.spanContext }); state bool atLatest = false; - state UID streamUID = deterministicRandom()->randomUniqueID(); + // TODO CHANGE BACK after BG is correctness clean + // state UID streamUID = deterministicRandom()->randomUniqueID(); + state UID streamUID = req.debugID; state bool removeUID = false; state Optional blockedVersion; req.reply.setByteLimit(SERVER_KNOBS->RANGESTREAM_LIMIT_BYTES);