Bug fix: The change feed request UID is actually not just for debugging and can't be shared across requests, so the debugID in ReadOptions should not be used. Restored the original ChangeFeedRequest member but renamed it from debugUID to just id.

This commit is contained in:
Steve Atherton 2022-10-26 20:45:39 -07:00
parent fb44945a89
commit 56abec32f1
6 changed files with 46 additions and 51 deletions

View File

@ -9305,7 +9305,7 @@ void handleTSSChangeFeedMismatch(const ChangeFeedStreamRequest& request,
mismatchEvent.detail("EndKey", request.range.end);
mismatchEvent.detail("CanReadPopped", request.canReadPopped);
mismatchEvent.detail("PopVersion", popVersion);
mismatchEvent.detail("DebugUID", request.streamUID());
mismatchEvent.detail("DebugUID", request.id);
// mismatch info
mismatchEvent.detail("MatchesFound", matchesFound);
@ -9331,7 +9331,7 @@ void handleTSSChangeFeedMismatch(const ChangeFeedStreamRequest& request,
"TSSMismatchChangeFeedStream");
summaryEvent.detail("TSSID", tssData.tssId)
.detail("MismatchId", mismatchUID)
.detail("FeedDebugUID", request.streamUID());
.detail("FeedDebugUID", request.id);
}
}
}
@ -9856,7 +9856,8 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
Version* begin,
Version end,
int replyBufferSize,
bool canReadPopped) {
bool canReadPopped,
ReadOptions readOptions) {
state std::vector<Future<Void>> fetchers(interfs.size());
state std::vector<Future<Void>> onErrors(interfs.size());
state std::vector<MutationAndVersionStream> streams(interfs.size());
@ -9884,10 +9885,11 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
if (replyBufferSize != -1 && req.replyBufferSize < CLIENT_KNOBS->CHANGE_FEED_STREAM_MIN_BYTES) {
req.replyBufferSize = CLIENT_KNOBS->CHANGE_FEED_STREAM_MIN_BYTES;
}
UID id = deterministicRandom()->randomUniqueID();
req.options = ReadOptions(id);
debugUIDs.push_back(id);
mergeCursorUID = UID(mergeCursorUID.first() ^ id.first(), mergeCursorUID.second() ^ id.second());
req.options = readOptions;
req.id = deterministicRandom()->randomUniqueID();
debugUIDs.push_back(req.id);
mergeCursorUID = UID(mergeCursorUID.first() ^ req.id.first(), mergeCursorUID.second() ^ req.id.second());
results->streams.push_back(interfs[i].first.changeFeedStream.getReplyStream(req));
maybeDuplicateTSSChangeFeedStream(req,
@ -10102,12 +10104,10 @@ ACTOR Future<Void> singleChangeFeedStream(Reference<DatabaseContext> db,
req.canReadPopped = canReadPopped;
req.replyBufferSize = replyBufferSize;
req.options = readOptions;
if (!req.streamUID().isValid()) {
req.options.get().debugID = readOptions.debugID = deterministicRandom()->randomUniqueID();
}
req.id = deterministicRandom()->randomUniqueID();
if (DEBUG_CF_CLIENT_TRACE) {
TraceEvent(SevDebug, "TraceChangeFeedClientSingleCursor", req.streamUID())
TraceEvent(SevDebug, "TraceChangeFeedClientSingleCursor", req.id)
.detail("FeedID", rangeID)
.detail("Range", range)
.detail("Begin", *begin)
@ -10238,9 +10238,9 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
}
CODE_PROBE(true, "Change feed merge cursor");
// TODO (jslocum): validate connectionFileChanged behavior
wait(
mergeChangeFeedStream(db, interfs, results, rangeID, &begin, end, replyBufferSize, canReadPopped) ||
cx->connectionFileChanged());
wait(mergeChangeFeedStream(
db, interfs, results, rangeID, &begin, end, replyBufferSize, canReadPopped, readOptions) ||
cx->connectionFileChanged());
} else {
CODE_PROBE(true, "Change feed single cursor");
StorageServerInterface interf = locations[0].locations->getInterface(chosenLocations[0]);

View File

@ -383,7 +383,7 @@ public:
KeyRange range = allKeys,
int replyBufferSize = -1,
bool canReadPopped = true,
ReadOptions readOptions = ReadOptions());
ReadOptions readOptions = { ReadType::NORMAL, CacheResult::False });
Future<OverlappingChangeFeedsInfo> getOverlappingChangeFeeds(KeyRangeRef ranges, Version minVersion);
Future<Void> popChangeFeedMutations(Key rangeID, Version version);

View File

@ -1649,14 +1649,14 @@ struct ReadOptions {
Optional<UID> debugID;
Optional<Version> consistencyCheckStartVersion;
ReadOptions() : type(ReadType::NORMAL), cacheResult(CacheResult::True){};
ReadOptions(Optional<UID> debugID,
ReadOptions(Optional<UID> debugID = Optional<UID>(),
ReadType type = ReadType::NORMAL,
CacheResult cache = CacheResult::False,
CacheResult cache = CacheResult::True,
Optional<Version> version = Optional<Version>())
: type(type), cacheResult(cache), debugID(debugID), consistencyCheckStartVersion(version){};
ReadOptions(ReadType type, CacheResult cache = CacheResult::True) : ReadOptions({}, type, cache) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, type, cacheResult, debugID, consistencyCheckStartVersion);

View File

@ -890,21 +890,16 @@ struct ChangeFeedStreamRequest {
KeyRange range;
int replyBufferSize = -1;
bool canReadPopped = true;
UID id;
Optional<ReadOptions> options;
UID streamUID() const {
if (options.present()) {
return options.get().debugID.orDefault(UID());
}
return UID();
}
ReplyPromiseStream<ChangeFeedStreamReply> reply;
ChangeFeedStreamRequest() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, rangeID, begin, end, range, reply, spanContext, replyBufferSize, canReadPopped, options, arena);
serializer(
ar, rangeID, begin, end, range, reply, spanContext, replyBufferSize, canReadPopped, id, options, arena);
}
};

View File

@ -1188,7 +1188,7 @@ ACTOR Future<RangeResult> tryFetchRange(Database cx,
state RangeResult output;
state KeySelectorRef begin = firstGreaterOrEqual(keys.begin);
state KeySelectorRef end = firstGreaterOrEqual(keys.end);
state ReadOptions options = ReadOptions(Optional<UID>(), ReadType::FETCH);
state ReadOptions options = ReadOptions(ReadType::FETCH, CacheResult::False);
if (*isTooOld)
throw transaction_too_old();

View File

@ -2600,7 +2600,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
if (DEBUG_CF_TRACE) {
TraceEvent(SevDebug, "TraceChangeFeedMutationsBegin", data->thisServerID)
.detail("FeedID", req.rangeID)
.detail("StreamUID", req.streamUID())
.detail("StreamUID", req.id)
.detail("Range", req.range)
.detail("Begin", req.begin)
.detail("End", req.end)
@ -2638,7 +2638,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
if (DEBUG_CF_TRACE) {
TraceEvent(SevDebug, "TraceChangeFeedMutationsDetails", data->thisServerID)
.detail("FeedID", req.rangeID)
.detail("StreamUID", req.streamUID())
.detail("StreamUID", req.id)
.detail("Range", req.range)
.detail("Begin", req.begin)
.detail("End", req.end)
@ -2741,7 +2741,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
"is {4}) (emptyVersion={5}, emptyBefore={6})!\n",
data->thisServerID.toString().substr(0, 4),
req.rangeID.printable().substr(0, 6),
req.streamUID().toString().substr(0, 8),
req.id.toString().substr(0, 8),
memoryReply.mutations[memoryVerifyIdx].version,
version,
feedInfo->emptyVersion,
@ -2783,7 +2783,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
"disk! (durable validation = {4})\n",
data->thisServerID.toString().substr(0, 4),
req.rangeID.printable().substr(0, 6),
req.streamUID().toString().substr(0, 8),
req.id.toString().substr(0, 8),
version,
durableValidationVersion);
@ -2864,7 +2864,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
if (!ok) {
TraceEvent("ChangeFeedMutationsPopped", data->thisServerID)
.detail("FeedID", req.rangeID)
.detail("StreamUID", req.streamUID())
.detail("StreamUID", req.id)
.detail("Range", req.range)
.detail("Begin", req.begin)
.detail("End", req.end)
@ -2881,7 +2881,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
for (auto& m : mutations.mutations) {
DEBUG_MUTATION("ChangeFeedSSRead", mutations.version, m, data->thisServerID)
.detail("ChangeFeedID", req.rangeID)
.detail("StreamUID", req.streamUID())
.detail("StreamUID", req.id)
.detail("ReqBegin", req.begin)
.detail("ReqEnd", req.end)
.detail("ReqRange", req.range);
@ -2908,7 +2908,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
fmt::print("ERROR: SS {0} CF {1} SQ {2} missing {3} @ {4} from request for [{5} - {6}) {7} - {8}\n",
data->thisServerID.toString().substr(0, 4),
req.rangeID.printable().substr(0, 6),
req.streamUID().toString().substr(0, 8),
req.id.toString().substr(0, 8),
foundVersion ? "key" : "version",
DEBUG_CF_MISSING_VERSION,
req.range.begin.printable(),
@ -2929,7 +2929,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
fmt::print("DBG: SS {0} CF {1} SQ {2} correct @ {3} from request for [{4} - {5}) {6} - {7}\n",
data->thisServerID.toString().substr(0, 4),
req.rangeID.printable().substr(0, 6),
req.streamUID().toString().substr(0, 8),
req.id.toString().substr(0, 8),
DEBUG_CF_MISSING_VERSION,
req.range.begin.printable(),
req.range.end.printable(),
@ -2943,7 +2943,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
if (DEBUG_CF_TRACE) {
TraceEvent(SevDebug, "ChangeFeedMutationsDone", data->thisServerID)
.detail("FeedID", req.rangeID)
.detail("StreamUID", req.streamUID())
.detail("StreamUID", req.id)
.detail("Range", req.range)
.detail("Begin", req.begin)
.detail("End", req.end)
@ -2968,7 +2968,7 @@ ACTOR Future<Void> stopChangeFeedOnMove(StorageServer* data, ChangeFeedStreamReq
return Void();
}
state Promise<Void> moved;
feed->second->triggerOnMove(req.range, req.streamUID(), moved);
feed->second->triggerOnMove(req.range, req.id, moved);
try {
wait(moved.getFuture());
} catch (Error& e) {
@ -2977,7 +2977,7 @@ ACTOR Future<Void> stopChangeFeedOnMove(StorageServer* data, ChangeFeedStreamReq
auto feed = data->uidChangeFeed.find(req.rangeID);
if (feed != data->uidChangeFeed.end()) {
feed->second->removeOnMoveTrigger(req.range, req.streamUID());
feed->second->removeOnMoveTrigger(req.range, req.id);
}
return Void();
}
@ -3020,7 +3020,7 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
if (DEBUG_CF_TRACE) {
TraceEvent(SevDebug, "TraceChangeFeedStreamStart", data->thisServerID)
.detail("FeedID", req.rangeID)
.detail("StreamUID", req.streamUID())
.detail("StreamUID", req.id)
.detail("Range", req.range)
.detail("Begin", req.begin)
.detail("End", req.end)
@ -3042,7 +3042,7 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
if (DEBUG_CF_TRACE) {
TraceEvent(SevDebug, "TraceChangeFeedStreamSentInitialEmpty", data->thisServerID)
.detail("FeedID", req.rangeID)
.detail("StreamUID", req.streamUID())
.detail("StreamUID", req.id)
.detail("Range", req.range)
.detail("Begin", req.begin)
.detail("End", req.end)
@ -3054,12 +3054,12 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
loop {
Future<Void> onReady = req.reply.onReady();
if (atLatest && !onReady.isReady() && !removeUID) {
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][req.streamUID()] =
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][req.id] =
blockedVersion.present() ? blockedVersion.get() : data->prevVersion;
if (DEBUG_CF_TRACE) {
TraceEvent(SevDebug, "TraceChangeFeedStreamBlockedOnReady", data->thisServerID)
.detail("FeedID", req.rangeID)
.detail("StreamUID", req.streamUID())
.detail("StreamUID", req.id)
.detail("Range", req.range)
.detail("Begin", req.begin)
.detail("End", req.end)
@ -3075,13 +3075,13 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
Future<std::pair<ChangeFeedStreamReply, bool>> feedReplyFuture =
getChangeFeedMutations(data, req, false, atLatest);
if (atLatest && !removeUID && !feedReplyFuture.isReady()) {
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][req.streamUID()] =
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][req.id] =
blockedVersion.present() ? blockedVersion.get() : data->prevVersion;
removeUID = true;
if (DEBUG_CF_TRACE) {
TraceEvent(SevDebug, "TraceChangeFeedStreamBlockedMutations", data->thisServerID)
.detail("FeedID", req.rangeID)
.detail("StreamUID", req.streamUID())
.detail("StreamUID", req.id)
.detail("Range", req.range)
.detail("Begin", req.begin)
.detail("End", req.end)
@ -3104,10 +3104,10 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
Version minVersion = removeUID ? data->version.get() : data->prevVersion;
if (removeUID) {
if (gotAll || req.begin == req.end) {
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()].erase(req.streamUID());
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()].erase(req.id);
removeUID = false;
} else {
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][req.streamUID()] =
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][req.id] =
feedReply.mutations.back().version;
}
}
@ -3155,7 +3155,7 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
auto it = data->changeFeedClientVersions.find(req.reply.getEndpoint().getPrimaryAddress());
if (it != data->changeFeedClientVersions.end()) {
if (removeUID) {
it->second.erase(req.streamUID());
it->second.erase(req.id);
}
if (it->second.empty()) {
data->changeFeedClientVersions.erase(it);
@ -6005,7 +6005,6 @@ ACTOR Future<Void> tryGetRangeFromBlob(PromiseStream<RangeResult> results,
.detail("Error", e.what());
tr->reset();
tr->setVersion(fetchVersion);
tr->trState->taskID = TaskPriority::FetchKeys;
throw;
}
return Void();
@ -6836,6 +6835,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.trState->readOptions = readOptions;
tr.trState->taskID = TaskPriority::FetchKeys;
// fetchVersion = data->version.get();
// A quick fix:
@ -6884,7 +6884,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
while (!shard->updates.empty() && shard->updates[0].version <= fetchVersion)
shard->updates.pop_front();
tr.setVersion(fetchVersion);
tr.trState->taskID = TaskPriority::FetchKeys;
state PromiseStream<RangeResult> results;
state Future<Void> hold;
if (SERVER_KNOBS->FETCH_USING_BLOB) {
@ -9574,7 +9574,7 @@ ACTOR Future<Void> applyByteSampleResult(StorageServer* data,
state int totalFetches = 0;
state int totalKeys = 0;
state int totalBytes = 0;
state ReadOptions readOptions(UID(), ReadType::NORMAL, CacheResult::False);
state ReadOptions readOptions(ReadType::NORMAL, CacheResult::False);
loop {
RangeResult bs = wait(storage->readRange(KeyRangeRef(begin, end),