Debugged and fixed stuck change feed issue

This commit is contained in:
Josh Slocum 2022-02-02 15:15:54 -06:00
parent f187d01b71
commit 9c8afdf35c
2 changed files with 76 additions and 16 deletions

View File

@ -7494,6 +7494,11 @@ ACTOR Future<Void> doCFMerge(Reference<ChangeFeedData> results,
wait(delay(0));
ASSERT(results->mutations.isEmpty());
// update lastReturned once the previous mutation has been consumed
if (*begin - 1 > results->lastReturnedVersion.get()) {
results->lastReturnedVersion.set(*begin - 1);
}
state int interfNum = 0;
// TODO minor optimization - could make this just a vector of indexes if each MutationAndVersionStream remembered
@ -7711,11 +7716,18 @@ ACTOR Future<Void> doSingleCFStream(KeyRange range,
ASSERT(results->streams.size() == 1);
ASSERT(results->storageData.size() == 1);
state bool atLatest = false;
// wait for any previous mutations in stream to be consumed
wait(results->mutations.onEmpty());
wait(delay(0));
ASSERT(results->mutations.isEmpty());
// update lastReturned once the previous mutation has been consumed
if (*begin - 1 > results->lastReturnedVersion.get()) {
results->lastReturnedVersion.set(*begin - 1);
}
loop {
// wait for any previous mutations in stream to be consumed
wait(results->mutations.onEmpty());
wait(delay(0));
ASSERT(results->mutations.isEmpty());
state ChangeFeedStreamReply feedReply = waitNext(results->streams[0].getFuture());
*begin = feedReply.mutations.back().version + 1;

View File

@ -1096,17 +1096,44 @@ static Version doGranuleRollback(Reference<GranuleMetadata> metadata,
return cfRollbackVersion;
}
ACTOR Future<Void> waitOnCFVersion(Reference<GranuleMetadata> metadata, Version cfVersion) {
#define DEBUG_WAIT_VERSION_COMMITTED false
ACTOR Future<Void> waitOnCFVersion(Reference<GranuleMetadata> metadata,
Version original /*TODO REMOVE, just for debugging*/,
Version waitVersion) {
if (DEBUG_BW_VERSION(original) && DEBUG_WAIT_VERSION_COMMITTED) {
fmt::print("WVC {0}: waiting for {1} \n", original, waitVersion);
}
loop {
try {
if (DEBUG_BW_VERSION(original) && DEBUG_WAIT_VERSION_COMMITTED) {
if (metadata->activeCFData.get().isValid()) {
fmt::print(
"WVC {0}: WAL (currently {1})\n", original, metadata->activeCFData.get()->getVersion());
} else {
fmt::print("WVC {0}: invalid\n", original, metadata->activeCFData.get()->getVersion());
}
}
// if not valid, we're about to be cancelled anyway
state Future<Void> atLeast =
metadata->activeCFData.get().isValid() ? metadata->activeCFData.get()->whenAtLeast(cfVersion) : Never();
state Future<Void> atLeast = metadata->activeCFData.get().isValid()
? metadata->activeCFData.get()->whenAtLeast(waitVersion)
: Never();
choose {
when(wait(atLeast)) { break; }
when(wait(metadata->activeCFData.onChange())) {}
when(wait(atLeast)) {
if (DEBUG_BW_VERSION(original) && DEBUG_WAIT_VERSION_COMMITTED) {
fmt::print("WVC {0}: got at least {1} \n", original, waitVersion);
}
break;
}
when(wait(metadata->activeCFData.onChange())) {
if (DEBUG_BW_VERSION(original) && DEBUG_WAIT_VERSION_COMMITTED) {
fmt::print("WVC {0}: cfOnChange \n", original);
}
}
}
} catch (Error& e) {
if (DEBUG_BW_VERSION(original) && DEBUG_WAIT_VERSION_COMMITTED) {
fmt::print("WVC {0}: got error {1} \n", original, e.name());
}
if (e.code() == error_code_operation_cancelled) {
throw e;
}
@ -1114,9 +1141,9 @@ ACTOR Future<Void> waitOnCFVersion(Reference<GranuleMetadata> metadata, Version
// if waiting on a parent granule change feed and we change to the child, the parent will get end_of_stream,
// which could cause this waiting whenAtLeast to get change_feed_cancelled. We should simply retry and wait
// a bit, as blobGranuleUpdateFiles will switch to the new change feed
if (e.code() != error_code_change_feed_cancelled) {
if (DEBUG_BW_VERSION(original) && DEBUG_WAIT_VERSION_COMMITTED) {
if (BW_DEBUG) {
fmt::print("waitVersionCommitted WAL got unexpected error {}\n", e.name());
fmt::print("WVC {0}: unexpected error {1}\n", original, e.name());
}
throw e;
}
@ -1124,9 +1151,13 @@ ACTOR Future<Void> waitOnCFVersion(Reference<GranuleMetadata> metadata, Version
}
}
if (DEBUG_BW_VERSION(original) && DEBUG_WAIT_VERSION_COMMITTED) {
fmt::print("WVC {0}: got \n", original);
}
// sanity check to make sure whenAtLeast didn't return early
if (cfVersion > metadata->waitForVersionReturned) {
metadata->waitForVersionReturned = cfVersion;
if (waitVersion > metadata->waitForVersionReturned) {
metadata->waitForVersionReturned = waitVersion;
}
// stop after change feed callback
@ -1138,8 +1169,14 @@ ACTOR Future<Void> waitOnCFVersion(Reference<GranuleMetadata> metadata, Version
ACTOR Future<Void> waitCommittedGrv(Reference<BlobWorkerData> bwData,
Reference<GranuleMetadata> metadata,
Version version) {
if (DEBUG_BW_VERSION(version) && DEBUG_WAIT_VERSION_COMMITTED) {
fmt::print("WVC {0}: grv start\n", version);
}
// TODO REMOVE debugs
if (version > bwData->grvVersion.get()) {
if (DEBUG_BW_VERSION(version) && DEBUG_WAIT_VERSION_COMMITTED) {
fmt::print("WVC {0}: getting grv\n", version);
}
/*if (BW_DEBUG) {
fmt::print("waitVersionCommitted waiting {0}\n", version);
}*/
@ -1154,7 +1191,10 @@ ACTOR Future<Void> waitCommittedGrv(Reference<BlobWorkerData> bwData,
}
Version grvVersion = bwData->grvVersion.get();
wait(waitOnCFVersion(metadata, grvVersion));
if (DEBUG_BW_VERSION(version) && DEBUG_WAIT_VERSION_COMMITTED) {
fmt::print("WVC {0}: got grv\n", version);
}
wait(waitOnCFVersion(metadata, version, grvVersion));
return Void();
}
@ -1163,8 +1203,14 @@ ACTOR Future<Void> waitVersionCommitted(Reference<BlobWorkerData> bwData,
Version version) {
// If GRV is way in the future, we know we can't roll back more than 5 seconds (or whatever this knob is set to)
// worth of versions
if (DEBUG_BW_VERSION(version) && DEBUG_WAIT_VERSION_COMMITTED) {
fmt::print("WVC {0}: starting\n", version);
}
wait(waitCommittedGrv(bwData, metadata, version) ||
waitOnCFVersion(metadata, version + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS));
waitOnCFVersion(metadata, version, version + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS));
if (DEBUG_BW_VERSION(version) && DEBUG_WAIT_VERSION_COMMITTED) {
fmt::print("WVC {0}: done\n", version);
}
if (version > metadata->knownCommittedVersion) {
metadata->knownCommittedVersion = version;
}
@ -1358,8 +1404,10 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
when(Standalone<VectorRef<MutationsAndVersionRef>> _mutations =
waitNext(metadata->activeCFData.get()->mutations.getFuture())) {
/*if (DEBUG_BW_VERSION(metadata->bufferedDeltaVersion)) {
fmt::print("BW got mutations after ({0}): ({1})\n",
fmt::print("BW got mutations after ({0}): {1} - {2} ({3})\n",
metadata->bufferedDeltaVersion,
_mutations.front().version,
_mutations.back().version,
_mutations.size());
}*/
mutations = _mutations;