Merge cursor debugging and fix in BW::waitForVersion

This commit is contained in:
Josh Slocum 2021-12-09 10:51:31 -06:00
parent 845f1ade42
commit 86f6f73518
2 changed files with 48 additions and 8 deletions

View File

@ -7190,7 +7190,8 @@ ACTOR Future<Void> singleChangeFeedStream(StorageServerInterface interf,
ReplyPromiseStream<ChangeFeedStreamReply> replyStream,
Version end,
Reference<ChangeFeedData> feedData,
Reference<ChangeFeedStorageData> storageData) {
Reference<ChangeFeedStorageData> storageData,
int idx /* TODO REMOVE this param after correctness clean */) {
state bool atLatestVersion = false;
state Version nextVersion = 0;
try {
@ -7201,9 +7202,30 @@ ACTOR Future<Void> singleChangeFeedStream(StorageServerInterface interf,
}
choose {
when(state ChangeFeedStreamReply rep = waitNext(replyStream.getFuture())) {
if (DEBUG_CF_VERSION(rep.mutations.back().version)) {
fmt::print(" single {0} {1}: response {2} - {3} ({4}), atLatest={5}, rep.atLatest={6}, "
"notAtLatest={7}, "
"minSV={8}\n",
idx,
interf.id().toString().substr(0, 4),
rep.mutations.front().version,
rep.mutations.back().version,
rep.mutations.size(),
atLatestVersion ? "T" : "F",
rep.atLatestVersion ? "T" : "F",
feedData->notAtLatest.get(),
rep.minStreamVersion);
}
state int resultLoc = 0;
while (resultLoc < rep.mutations.size()) {
wait(results.onEmpty());
if (DEBUG_CF_VERSION(rep.mutations[resultLoc].version)) {
fmt::print(" single {0} {1}: onEmpty, sending {2} ({3})\n",
idx,
interf.id().toString().substr(0, 4),
rep.mutations[resultLoc].version,
rep.mutations[resultLoc].mutations.size());
}
if (rep.mutations[resultLoc].version >= nextVersion) {
results.send(rep.mutations[resultLoc]);
} else {
@ -7239,13 +7261,24 @@ ACTOR Future<Void> singleChangeFeedStream(StorageServerInterface interf,
when(wait(atLatestVersion && replyStream.isEmpty() && results.isEmpty()
? storageData->version.whenAtLeast(nextVersion)
: Future<Void>(Never()))) {
if (DEBUG_CF_VERSION(nextVersion)) {
fmt::print(" single {0} {1}: WAL {2}, sending empty {3})\n",
idx,
interf.id().toString().substr(0, 4),
nextVersion,
storageData->version.get());
}
MutationsAndVersionRef empty;
empty.version = storageData->version.get();
results.send(empty);
nextVersion = storageData->version.get() + 1;
}
when(wait(atLatestVersion && replyStream.isEmpty() && !results.isEmpty() ? results.onEmpty()
: Future<Void>(Never()))) {}
: Future<Void>(Never()))) {
if (DEBUG_CF_VERSION(nextVersion)) {
fmt::print(" single {0} {1}: got onEmpty\n", idx, interf.id().toString().substr(0, 4));
}
}
}
}
} catch (Error& e) {
@ -7295,7 +7328,7 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
for (int i = 0; i < interfs.size(); i++) {
fetchers[i] = singleChangeFeedStream(
interfs[i].first, streams[i].results, results->streams[i], end, results, results->storageData[i]);
interfs[i].first, streams[i].results, results->streams[i], end, results, results->storageData[i], i);
}
state int interfNum = 0;
while (interfNum < interfs.size()) {
@ -7319,6 +7352,9 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
if (nextStream.next.version != checkVersion) {
if (nextOut.size()) {
*begin = checkVersion + 1;
if (DEBUG_CF_VERSION(nextOut.back().version)) {
fmt::print("CFNA (merged): {0} (1)\n", nextOut.back().version);
}
ASSERT(nextOut.back().version >= results->lastReturnedVersion.get());
results->mutations.send(nextOut);
wait(results->mutations.onEmpty());
@ -7344,6 +7380,9 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
}
try {
Standalone<MutationsAndVersionRef> res = waitNext(nextStream.results.getFuture());
if (DEBUG_CF_VERSION(nextOut.back().version)) {
fmt::print(" CFNA (merge1): {0} (1)\n", res.version, res.mutations.size());
}
nextStream.next = res;
mutations.push(nextStream);
} catch (Error& e) {
@ -7562,6 +7601,8 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
results->refresh.sendError(change_feed_cancelled());
throw;
}
// TODO REMOVE
fmt::print("CFNA error {}\n", e.name());
if (results->notAtLatest.get() == 0) {
results->notAtLatest.set(1);
}

View File

@ -1909,7 +1909,7 @@ ACTOR Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v
ASSERT(metadata->durableDeltaVersion.get() >= pendingDeltaV);
if (v == DEBUG_BW_WAIT_VERSION) {
fmt::print("{0}) waiting for DDV {1} >= {2}\n", v, metadata->durableDeltaVersion.get(), pendingDeltaV);
fmt::print("{0}) got DDV {1} >= {2}\n", v, metadata->durableDeltaVersion.get(), pendingDeltaV);
}
}
@ -1924,8 +1924,7 @@ ACTOR Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v
ASSERT(metadata->durableSnapshotVersion.get() >= pendingSnapshotV);
if (v == DEBUG_BW_WAIT_VERSION) {
fmt::print(
"{0}) waiting for DSV {1} >= {2}\n", v, metadata->durableSnapshotVersion.get(), pendingSnapshotV);
fmt::print("{0}) got DSV {1} >= {2}\n", v, metadata->durableSnapshotVersion.get(), pendingSnapshotV);
}
}
@ -1933,7 +1932,7 @@ ACTOR Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v
// kick off another delta file and roll the mutations. In that case, we must return the new delta
// file instead of in memory mutations, so we wait for that delta file to complete
if (metadata->pendingDeltaVersion > v) {
if (metadata->pendingDeltaVersion >= v) {
if (v == DEBUG_BW_WAIT_VERSION) {
fmt::print("{0}) waiting for DDV again {1} < {2}\n", v, metadata->durableDeltaVersion.get(), v);
}
@ -1942,7 +1941,7 @@ ACTOR Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v
ASSERT(metadata->durableDeltaVersion.get() >= v);
if (v == DEBUG_BW_WAIT_VERSION) {
fmt::print("{0}) waiting for DDV again {1} >= {2}\n", v, metadata->durableDeltaVersion.get(), v);
fmt::print("{0}) got DDV again {1} >= {2}\n", v, metadata->durableDeltaVersion.get(), v);
}
}