fix: handle the case where I change feed is added while a fetch is in progress

This commit is contained in:
Evan Tschannen 2021-09-06 13:51:53 -07:00
parent 7b23894039
commit f53ddf6720
2 changed files with 14 additions and 8 deletions

View File

@ -6702,13 +6702,18 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
loop {
loop {
try {
Optional<Value> val = wait(tr.get(rangeIDKey));
if (!val.present()) {
results.sendError(unsupported_operation());
return Void();
Version readVer = wait(tr.getReadVersion());
if (readVer < begin) {
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
} else {
Optional<Value> val = wait(tr.get(rangeIDKey));
if (!val.present()) {
results.sendError(unsupported_operation());
return Void();
}
keys = std::get<0>(decodeChangeFeedValue(val.get())) & range;
break;
}
keys = std::get<0>(decodeChangeFeedValue(val.get())) & range;
break;
} catch (Error& e) {
wait(tr.onError(e));
}
@ -6724,6 +6729,7 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
TransactionInfo(TaskPriority::DefaultEndpoint, span.context)));
if (locations.size() >= 1000) {
ASSERT(false);
results.sendError(unsupported_operation());
return Void();
}

View File

@ -3233,7 +3233,7 @@ ACTOR Future<Void> fetchChangeFeed(StorageServer* data, Key rangeId, KeyRange ra
wait(yield());
}
} catch (Error& e) {
if (e.code() != error_code_end_of_stream) {
if (e.code() != error_code_end_of_stream && e.code() != error_code_unsupported_operation) {
throw;
}
return Void();
@ -3278,7 +3278,7 @@ ACTOR Future<Void> fetchChangeFeed(StorageServer* data, Key rangeId, KeyRange ra
wait(yield());
}
} catch (Error& e) {
if (e.code() != error_code_end_of_stream) {
if (e.code() != error_code_end_of_stream && e.code() != error_code_unsupported_operation) {
throw;
}
}