Merge pull request #11112 from sfc-gh-jslocum/stuck_watch_fix_main

Stuck watch bug fix
This commit is contained in:
Jingyu Zhou 2024-01-05 10:41:10 -08:00 committed by GitHub
commit 75f7814ad1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 49 additions and 11 deletions

View File

@ -4005,12 +4005,22 @@ ACTOR Future<Version> watchValue(Database cx, Reference<const WatchParameters> p
// than the current update loop)
Version v = wait(waitForCommittedVersion(cx, resp.version, span.context));
// False if there is a master failure between getting the response and getting the committed version,
// Dependent on SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT
if (v - resp.version < 50000000) {
// False if there is a master failure between getting the response
// and getting the committed version, Dependent on
// SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT. Set to around half of the
// max versions in flight in an attempt to reliably recognize when
// a recovery has occurred, but avoid triggering if it just takes a
// little while to get the committed version.
bool buggifyRetry = g_network->isSimulated() && !g_simulator->speedUpSimulation && BUGGIFY_WITH_PROB(0.1);
CODE_PROBE(buggifyRetry, "Watch buggifying version gap retry");
if (v - resp.version < 50'000'000 && !buggifyRetry) {
return resp.version;
}
ver = v;
if (watchValueID.present()) {
g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.Retry");
}
} catch (Error& e) {
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) {
cx->invalidateCache(parameters->tenant.prefix, parameters->key);

View File

@ -2551,6 +2551,7 @@ ACTOR Future<Version> watchWaitForValueChange(StorageServer* data, SpanContext p
metadata->debugID.get().first(),
"watchValueSendReply.Before"); //.detail("TaskID", g_network->getCurrentTask());
state Version originalMetadataVersion = metadata->version;
wait(success(waitForVersionNoTooOld(data, metadata->version)));
if (metadata->debugID.present())
g_traceBatch.addEvent("WatchValueDebug",
@ -2607,8 +2608,21 @@ ACTOR Future<Version> watchWaitForValueChange(StorageServer* data, SpanContext p
metadata->debugID.get().first(),
"watchValueSendReply.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
if (reply.value != metadata->value && latest >= metadata->version) {
return latest; // fire watch
// If the version we read is less than minVersion, then we may fail to be notified of any changes that occur
// up to or including minVersion. To prevent that, we'll check the key again once the version reaches our
// minVersion.
Version waitVersion = minVersion;
if (reply.value != metadata->value) {
if (latest >= metadata->version) {
return latest; // fire watch
} else if (metadata->version > originalMetadataVersion) {
// another watch came in and raced in case 2 and updated the version. simply just wait and read
// again at the higher version to confirm
CODE_PROBE(true, "racing watches for same value at different versions", probe::decoration::rare);
if (metadata->version > waitVersion) {
waitVersion = metadata->version;
}
}
}
if (data->watchBytes > SERVER_KNOBS->MAX_STORAGE_SERVER_WATCH_BYTES) {
@ -2622,17 +2636,18 @@ ACTOR Future<Version> watchWaitForValueChange(StorageServer* data, SpanContext p
data->watchBytes += watchBytes;
try {
if (latest < minVersion) {
// If the version we read is less than minVersion, then we may fail to be notified of any changes
// that occur up to or including minVersion To prevent that, we'll check the key again once the
// version reaches our minVersion
watchFuture = watchFuture || data->version.whenAtLeast(minVersion);
if (latest < waitVersion) {
// if we need to wait for a higher version because of a race, wait for that version
watchFuture = watchFuture || data->version.whenAtLeast(waitVersion);
}
if (BUGGIFY) {
// Simulate a trigger on the watch that results in the loop going around without the value changing
watchFuture = watchFuture || delay(deterministicRandom()->random01());
}
if (metadata->debugID.present())
g_traceBatch.addEvent(
"WatchValueDebug", metadata->debugID.get().first(), "watchValueSendReply.WaitChange");
wait(watchFuture);
data->watchBytes -= watchBytes;
} catch (Error& e) {
@ -13323,11 +13338,24 @@ ACTOR Future<Void> serveWatchValueRequestsImpl(StorageServer* self, FutureStream
}
// case 2: there is a watch in the map and it has the same value so just update version
else if (metadata->value == req.value) {
if (req.debugID.present()) {
if (metadata->debugID.present()) {
g_traceBatch.addAttach(
"WatchRequestCase2", req.debugID.get().first(), metadata->debugID.get().first());
} else {
g_traceBatch.addEvent(
"WatchValueDebug", metadata->debugID.get().first(), "watchValueSendReply.Case2");
}
}
if (req.version > metadata->version) {
metadata->version = req.version;
metadata->tags = req.tags;
metadata->debugID = req.debugID;
if (req.debugID.present()) {
metadata->debugID = req.debugID;
}
}
self->actors.add(watchValueSendReply(self, req, metadata->versionPromise.getFuture(), span.context));
}
// case 3: version in map has a lower version so trigger watch and create a new entry in map