Added change feed popped read guard for blob workers

This commit is contained in:
Josh Slocum 2022-01-28 10:44:58 -06:00
parent 10c3cc870f
commit ac1fd056dd
6 changed files with 85 additions and 29 deletions

View File

@ -295,7 +295,8 @@ public:
Key rangeID,
Version begin = 0,
Version end = std::numeric_limits<Version>::max(),
KeyRange range = allKeys);
KeyRange range = allKeys,
bool canReadPopped = true);
Future<std::vector<OverlappingChangeFeedEntry>> getOverlappingChangeFeeds(KeyRangeRef ranges, Version minVersion);
Future<Void> popChangeFeedMutations(Key rangeID, Version version);

View File

@ -7603,7 +7603,8 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
Reference<ChangeFeedData> results,
Key rangeID,
Version* begin,
Version end) {
Version end,
bool canReadPopped) {
state std::vector<Future<Void>> fetchers(interfs.size());
state std::vector<Future<Void>> onErrors(interfs.size());
state std::vector<MutationAndVersionStream> streams(interfs.size());
@ -7616,6 +7617,7 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
req.begin = *begin;
req.end = end;
req.range = it.second;
req.canReadPopped = canReadPopped;
UID debugID = deterministicRandom()->randomUniqueID();
debugIDs.push_back(debugID);
req.debugID = debugID;
@ -7651,6 +7653,13 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
i,
interfs[i].second,
debugIDs[i]);
if (DEBUG_CF_START_VERSION != invalidVersion) {
fmt::print(" [{0} - {1}): {2} {3}\n",
interfs[i].second.begin.printable(),
interfs[i].second.end.printable(),
i,
debugIDs[i].toString().substr(0, 8));
}
}
wait(onCFErrors(onErrors) || doCFMerge(results, interfs, streams, begin, end));
@ -7697,6 +7706,7 @@ ACTOR Future<Void> doSingleCFStream(KeyRange range,
Version* begin,
Version end,
UID debugID /*TODO REMOVE this parameter once BG is correctness clean*/) {
state Promise<Void> refresh = results->refresh;
ASSERT(results->streams.size() == 1);
ASSERT(results->storageData.size() == 1);
@ -7776,7 +7786,8 @@ ACTOR Future<Void> singleChangeFeedStream(Reference<DatabaseContext> db,
Reference<ChangeFeedData> results,
Key rangeID,
Version* begin,
Version end) {
Version end,
bool canReadPopped) {
state Database cx(db);
state ChangeFeedStreamRequest req;
state UID debugID = deterministicRandom()->randomUniqueID();
@ -7784,6 +7795,7 @@ ACTOR Future<Void> singleChangeFeedStream(Reference<DatabaseContext> db,
req.begin = *begin;
req.end = end;
req.range = range;
req.canReadPopped = canReadPopped;
req.debugID = debugID;
results->streams.clear();
@ -7803,6 +7815,16 @@ ACTOR Future<Void> singleChangeFeedStream(Reference<DatabaseContext> db,
results->notAtLatest.set(1);
refresh.send(Void());
if (DEBUG_CF_START_VERSION != invalidVersion) {
fmt::print("Starting single cursor {0} for [{1} - {2}) @ {3} - {4} from {5}\n",
debugID.toString().substr(0, 8),
range.begin.printable(),
range.end.printable(),
*begin,
end,
interf.id().toString().c_str());
}
wait(results->streams[0].onError() || doSingleCFStream(range, results, rangeID, begin, end, debugID));
return Void();
@ -7813,7 +7835,8 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
Key rangeID,
Version begin,
Version end,
KeyRange range) {
KeyRange range,
bool canReadPopped) {
state Database cx(db);
state Span span("NAPI:GetChangeFeedStream"_loc);
@ -7892,10 +7915,11 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
interfs.push_back(std::make_pair(locations[i].second->getInterface(chosenLocations[i]),
locations[i].first & range));
}
wait(mergeChangeFeedStream(db, interfs, results, rangeID, &begin, end) || cx->connectionFileChanged());
wait(mergeChangeFeedStream(db, interfs, results, rangeID, &begin, end, canReadPopped) ||
cx->connectionFileChanged());
} else {
StorageServerInterface interf = locations[0].second->getInterface(chosenLocations[0]);
wait(singleChangeFeedStream(db, interf, range, results, rangeID, &begin, end) ||
wait(singleChangeFeedStream(db, interf, range, results, rangeID, &begin, end, canReadPopped) ||
cx->connectionFileChanged());
}
} catch (Error& e) {
@ -7943,8 +7967,10 @@ Future<Void> DatabaseContext::getChangeFeedStream(Reference<ChangeFeedData> resu
Key rangeID,
Version begin,
Version end,
KeyRange range) {
return getChangeFeedStreamActor(Reference<DatabaseContext>::addRef(this), results, rangeID, begin, end, range);
KeyRange range,
bool canReadPopped) {
return getChangeFeedStreamActor(
Reference<DatabaseContext>::addRef(this), results, rangeID, begin, end, range, canReadPopped);
}
ACTOR Future<std::vector<OverlappingChangeFeedEntry>> singleLocationOverlappingChangeFeeds(

View File

@ -715,6 +715,7 @@ struct ChangeFeedStreamRequest {
Version begin = 0;
Version end = 0;
KeyRange range;
bool canReadPopped = true;
// TODO REMOVE once BG is correctness clean!! Useful for debugging
UID debugID;
ReplyPromiseStream<ChangeFeedStreamReply> reply;
@ -722,7 +723,7 @@ struct ChangeFeedStreamRequest {
ChangeFeedStreamRequest() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, rangeID, begin, end, range, reply, spanContext, debugID, arena);
serializer(ar, rangeID, begin, end, range, reply, spanContext, canReadPopped, debugID, arena);
}
};

View File

@ -1273,15 +1273,24 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
if (startState.parentGranule.present() && startVersion < startState.changeFeedStartVersion) {
// read from parent change feed up until our new change feed is started
// Required to have canReadPopped = false, otherwise another granule can take over the change feed, and pop
// it. That could cause this worker to think it has the full correct set of data if it then reads the data,
// until it checks the granule lock again.
// passing false for canReadPopped means we will get an exception if we try to read any popped data, killing
// this actor
readOldChangeFeed = true;
oldChangeFeedFuture = bwData->db->getChangeFeedStream(
newCFData, oldCFKey.get(), startVersion + 1, startState.changeFeedStartVersion, metadata->keyRange);
oldChangeFeedFuture = bwData->db->getChangeFeedStream(newCFData,
oldCFKey.get(),
startVersion + 1,
startState.changeFeedStartVersion,
metadata->keyRange,
false);
} else {
readOldChangeFeed = false;
changeFeedFuture =
bwData->db->getChangeFeedStream(newCFData, cfKey, startVersion + 1, MAX_VERSION, metadata->keyRange);
changeFeedFuture = bwData->db->getChangeFeedStream(
newCFData, cfKey, startVersion + 1, MAX_VERSION, metadata->keyRange, false);
}
// Start actors BEFORE setting new change feed data to ensure the change feed data is properly initialized by
@ -1406,7 +1415,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
Reference<ChangeFeedData> newCFData = makeReference<ChangeFeedData>();
changeFeedFuture = bwData->db->getChangeFeedStream(
newCFData, cfKey, startState.changeFeedStartVersion, MAX_VERSION, metadata->keyRange);
newCFData, cfKey, startState.changeFeedStartVersion, MAX_VERSION, metadata->keyRange, false);
// Start actors BEFORE setting new change feed data to ensure the change feed data is properly
// initialized by the client
@ -1525,7 +1534,8 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
oldCFKey.get(),
cfRollbackVersion + 1,
startState.changeFeedStartVersion,
metadata->keyRange);
metadata->keyRange,
false);
} else {
if (cfRollbackVersion < startState.changeFeedStartVersion) {
@ -1535,8 +1545,12 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
}
ASSERT(cfRollbackVersion >= startState.changeFeedStartVersion);
changeFeedFuture = bwData->db->getChangeFeedStream(
newCFData, cfKey, cfRollbackVersion + 1, MAX_VERSION, metadata->keyRange);
changeFeedFuture = bwData->db->getChangeFeedStream(newCFData,
cfKey,
cfRollbackVersion + 1,
MAX_VERSION,
metadata->keyRange,
false);
}
// Start actors BEFORE setting new change feed data to ensure the change feed data
@ -2049,7 +2063,7 @@ ACTOR Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v
ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, BlobGranuleFileRequest req) {
if (BW_REQUEST_DEBUG || DEBUG_BW_WAIT_VERSION == req.readVersion) {
fmt::print("BW {0} processing blobGranuleFileRequest for range [{1} -{2}) @ {3}\n",
fmt::print("BW {0} processing blobGranuleFileRequest for range [{1} - {2}) @ {3}\n",
bwData->id.toString(),
req.keyRange.begin.printable(),
req.keyRange.end.printable(),

View File

@ -85,6 +85,7 @@ bool canReplyWith(Error e) {
case error_code_watch_cancelled:
case error_code_unknown_change_feed:
case error_code_server_overloaded:
case error_code_change_feed_popped:
// getRangeAndMap related exceptions that are not retriable:
case error_code_mapper_bad_index:
case error_code_mapper_no_such_key:
@ -1792,6 +1793,8 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
throw unknown_change_feed();
}
state Reference<ChangeFeedInfo> feedInfo = feed->second;
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, req.begin)) {
printf("CFM: SS %s CF %s: got version %lld >= %lld\n",
data->thisServerID.toString().substr(0, 4).c_str(),
@ -1800,6 +1803,10 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
req.begin);
}
if (!req.canReadPopped && req.begin <= feedInfo->emptyVersion) {
throw change_feed_popped();
}
// We must copy the mutationDeque when fetching the durable bytes in case mutations are popped from memory while
// waiting for the results
state Version dequeVersion = data->version.get();
@ -1811,14 +1818,14 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
dequeVersion,
feed->second->emptyVersion,
feed->second->storageVersion,
feed->second->durableVersion,
feed->second->fetchVersion);
feedInfo->emptyVersion,
feedInfo->storageVersion,
feedInfo->durableVersion,
feedInfo->fetchVersion);
}
if (req.end > feed->second->emptyVersion + 1) {
for (auto& it : feed->second->mutations) {
if (req.end > feedInfo->emptyVersion + 1) {
for (auto& it : feedInfo->mutations) {
if (it.version >= req.end || it.version > dequeVersion || remainingLimitBytes <= 0) {
break;
}
@ -1839,8 +1846,8 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
}
}
if (req.end > feed->second->emptyVersion + 1 && feed->second->durableVersion != invalidVersion &&
req.begin <= feed->second->durableVersion) {
if (req.end > feedInfo->emptyVersion + 1 && feedInfo->durableVersion != invalidVersion &&
req.begin <= feedInfo->durableVersion) {
RangeResult res = wait(data->storage.readRange(
KeyRangeRef(changeFeedDurableKey(req.rangeID, std::max(req.begin, feed->second->emptyVersion)),
changeFeedDurableKey(req.rangeID, req.end)),
@ -1890,6 +1897,11 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
reply = memoryReply;
}
// check if pop happened concurrently with read
if (!req.canReadPopped && req.begin <= feedInfo->emptyVersion) {
throw change_feed_popped();
}
bool gotAll = remainingLimitBytes > 0 && remainingDurableBytes > 0 && data->version.get() == startVersion;
Version finalVersion = std::min(req.end - 1, dequeVersion);
if ((reply.mutations.empty() || reply.mutations.back().version < finalVersion) && remainingLimitBytes > 0 &&
@ -2007,13 +2019,14 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
wait(delay(0, TaskPriority::DefaultEndpoint));
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, req.begin)) {
printf("CFM: SS %s CF %s: got CFSQ [%s - %s) %lld - %lld\n",
printf("CFM: SS %s CF %s: got CFSQ [%s - %s) %lld - %lld, crp=%s\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
req.range.begin.printable().c_str(),
req.range.end.printable().c_str(),
req.begin,
req.end);
req.end,
req.canReadPopped ? "T" : "F");
}
try {
@ -4000,7 +4013,7 @@ ACTOR Future<Void> fetchChangeFeedApplier(StorageServer* data,
bool existing) {
state Reference<ChangeFeedData> feedResults = makeReference<ChangeFeedData>();
state Future<Void> feed = data->cx->getChangeFeedStream(
feedResults, rangeId, 0, existing ? fetchVersion + 1 : data->version.get() + 1, range);
feedResults, rangeId, 0, existing ? fetchVersion + 1 : data->version.get() + 1, range, true);
// TODO remove debugging eventually?
state Version firstVersion = invalidVersion;

View File

@ -86,6 +86,7 @@ ERROR( change_feed_cancelled, 1062, "Change feed was cancelled" )
ERROR( blob_granule_file_load_error, 1063, "Error loading a blob file during granule materialization" )
ERROR( blob_granule_transaction_too_old, 1064, "Read version is older than blob granule history supports" )
ERROR( blob_manager_replaced, 1065, "This blob manager has been replaced." )
ERROR( change_feed_popped, 1066, "Tried to read a version older than what has been popped from the change feed" )
ERROR( broken_promise, 1100, "Broken promise" )
ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )