Fixing error handling and race for blob worker status stream to manager

This commit is contained in:
Josh Slocum 2022-02-23 15:55:16 -06:00
parent 0714b7a250
commit 2701e44564
1 changed files with 16 additions and 2 deletions

View File

@ -866,7 +866,12 @@ ACTOR Future<BlobFileIndex> checkSplitAndReSnapshot(Reference<BlobWorkerData> bw
loop { loop {
loop { loop {
try { try {
wait(bwData->currentManagerStatusStream.get().onReady()); loop {
choose {
when(wait(bwData->currentManagerStatusStream.get().onReady())) { break; }
when(wait(bwData->currentManagerStatusStream.onChange())) {}
}
}
bwData->currentManagerStatusStream.get().send(GranuleStatusReply(metadata->keyRange, bwData->currentManagerStatusStream.get().send(GranuleStatusReply(metadata->keyRange,
true, true,
writeHot, writeHot,
@ -877,7 +882,16 @@ ACTOR Future<BlobFileIndex> checkSplitAndReSnapshot(Reference<BlobWorkerData> bw
reSnapshotVersion)); reSnapshotVersion));
break; break;
} catch (Error& e) { } catch (Error& e) {
wait(bwData->currentManagerStatusStream.onChange()); if (e.code() == error_code_operation_cancelled) {
throw e;
}
// if we got broken promise while waiting, the old stream was killed, so we don't need to wait on
// change, just retry
if (e.code() == error_code_broken_promise) {
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
} else {
wait(bwData->currentManagerStatusStream.onChange());
}
} }
} }