correctness fixes

This commit is contained in:
Nim Wijetunga 2021-02-05 19:23:37 -05:00
parent b1fd8a439e
commit 83af53b1bb
1 changed files with 62 additions and 35 deletions

View File

@ -1383,6 +1383,7 @@ void DatabaseContext::setWatchMetadata(Key key, Reference<WatchMetadata> metadat
}
void DatabaseContext::deleteWatchMetadata(Key key) {
// TraceEvent("Nim_Case Delete key").detail("Key", key);
watchMap.erase(key);
}
@ -2167,7 +2168,7 @@ ACTOR Future<Version> watchValue(Future<Version> version, Key key, Optional<Valu
g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.Before"); //.detail("TaskID", g_network->getCurrentTask());
}
state WatchValueReply resp;
TraceEvent("Nim_WatcherCommitted Started");
// TraceEvent("Nim_WatcherCommitted Started");
choose {
when(WatchValueReply r = wait(
loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::watchValue,
@ -2186,12 +2187,12 @@ ACTOR Future<Version> watchValue(Future<Version> version, Key key, Optional<Valu
//cannot do this until the storage server is notified on knownCommittedVersion changes from tlog (faster than the current update loop)
Version v = wait(waitForCommittedVersion(cx, resp.version, span.context));
TraceEvent("Nim_WatcherCommitted").detail("CommittedVersion", v).detail("WatchVersion", resp.version).detail("Key", key ).detail("Value", value);
// TraceEvent("Nim_WatcherCommitted").detail("CommittedVersion", v).detail("WatchVersion", resp.version).detail("Key", key ).detail("Value", value);
// False if there is a master failure between getting the response and getting the committed version,
// Dependent on SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT
if (v - resp.version < 50000000) {
TraceEvent("Nim_WatcherCommitted Done!");
// TraceEvent("Nim_WatcherCommitted Done!");
return resp.version;
}
ver = v;
@ -2216,64 +2217,81 @@ ACTOR Future<Version> watchValue(Future<Version> version, Key key, Optional<Valu
}
ACTOR Future<Void> watchStorageServerResp(Key key, Database cx) {
try {
loop {
loop {
try {
state Reference<WatchMetadata> metadata = cx->getWatchMetadata(key);
if (!metadata) {TraceEvent("Nim_Case SS exit normal"); return Void();}
TraceEvent("Nim_Case SS enter").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount());
if (!metadata.isValid()) {
// TraceEvent("Nim_Case SS exit normal");
return Void();
}
// TraceEvent("Nim_Case SS enter").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount());
state Future<Version> watchFutureSS = watchValue(Future<Version>(metadata->version), key, metadata->value, cx, metadata->info, metadata->tags);
Version watchVersion = wait(watchFutureSS);
TraceEvent("Nim_Case SS done wait").detail("version", watchVersion);
// TraceEvent("Nim_Case SS done wait").detail("version", watchVersion);
if (watchVersion >= metadata->version) { // case 1: version_1 (SS) >= version_2 (map)
TraceEvent("Nim_Case SS 1").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount());
// TraceEvent("Nim_Case SS 1").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount());
cx->deleteWatchMetadata(key);
metadata->watchPromise.send(watchVersion);
} else {
if (metadata->watchPromise.getFutureReferenceCount() == 1) { // case 2: version_1 < version_2 and future_count == 1
TraceEvent("Nim_Case SS 2").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount());
// TraceEvent("Nim_Case SS 2").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount());
cx->deleteWatchMetadata(key);
} else {
// TraceEvent("Nim_Case SS 3").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount());
}
}
}
} catch(Error &e) {
TraceEvent("Nim_Case SS exit exception");
if (e.code() == error_code_actor_cancelled) {
TraceEvent("Nim_Case SS exit actor cancelled");
}
throw e;
}
} catch(Error &e) {
// TraceEvent("Nim_Case SS exit exception").detail("Code", e.code()).detail("key", key);
if (e.code() == error_code_actor_cancelled) {
// TraceEvent("Nim_Case SS exit actor cancelled").detail("key", key);
throw e;
}
Reference<WatchMetadata> metadata = cx->getWatchMetadata(key);
if (!metadata.isValid() || metadata->watchPromise.getFutureReferenceCount() == 1) {
return Void();
}
}
}
}
ACTOR Future<Void> sameVersionDifKey(Version ver, Key key, Optional<Value> value, Database cx,
TransactionInfo info, TagSet tags) {
state Reference<WatchMetadata> metadata = cx->getWatchMetadata(key);
state ReadYourWritesTransaction tr(cx);
try {
state Optional<Value> valSS = wait(tr.get(key));
// TraceEvent("Nim_Case 5 enter").detail("Key", key).detail("Value", value);
if (valSS != metadata->value) { // val_3 != val_1
try {
tr.setOption( FDBTransactionOptions::READ_SYSTEM_KEYS );
state Optional<Value> valSS = wait(tr.get(key));
// TraceEvent("Nim_Case 5 got value").detail("Key", key).detail("ValueSS", valSS).detail("Value", value);
state Reference<WatchMetadata> metadata = cx->getWatchMetadata(key);
if (metadata.isValid() && valSS != metadata->value) { // val_3 != val_1
// TraceEvent("Nim_Case 5 valSS != metadata_value");
cx->deleteWatchMetadata(key);
// TraceEvent("Nim_Case 5 counts").detail("version", ver).detail("PCount", metadata->watchPromise.getPromiseReferenceCount()).detail("FCount", metadata->watchPromise.getFutureReferenceCount()).detail("can be set", metadata->watchPromise.canBeSet());
metadata->watchPromise.send(ver);
metadata->watchFutureSS.cancel();
}
if (valSS == value) { // val_3 == val_2
// TraceEvent("Nim_Case 5 valSS == value");
metadata = Reference<WatchMetadata>(new WatchMetadata(value, ver, info, tags));
cx->setWatchMetadata(key, metadata);
metadata->watchFutureSS = watchStorageServerResp(key, cx);
}
wait(tr.commit());
// TraceEvent("Nim_Case 5 got here");
if (valSS != value) return Void(); // if val_3 != val_2
metadata = cx->getWatchMetadata(key);
if (metadata) { // if val_3 == val_2
if (metadata.isValid()) { // if val_3 == val_2
Future<Void> watchFuture = success(metadata->watchPromise.getFuture());
wait(watchFuture);
}
@ -2281,6 +2299,7 @@ ACTOR Future<Void> sameVersionDifKey(Version ver, Key key, Optional<Value> value
return Void();
}
catch(Error& e) {
// TraceEvent("Nim_Case 5 exception").detail("Key", key).detail("Value", value).detail("Code", e.code());
state Error err = e;
wait(tr.onError(e));
throw err;
@ -2289,11 +2308,11 @@ ACTOR Future<Void> sameVersionDifKey(Version ver, Key key, Optional<Value> value
Future<Void> getWatchFuture(Version ver, Key key, Optional<Value> value, Database cx,
TransactionInfo info, TagSet tags) {
TraceEvent("Nim_Case start").detail("Key", key).detail("Value", value).detail("Version", ver);
// TraceEvent("Nim_Case start").detail("Key", key).detail("Value", value).detail("Version", ver);
Reference<WatchMetadata> metadata = cx->getWatchMetadata(key);
if (!metadata) { // case 1: key not in map
TraceEvent("Nim_Case 1").detail("Key", key).detail("Value", value);
if (!metadata.isValid()) { // case 1: key not in map
// TraceEvent("Nim_Case 1").detail("Key", key).detail("Value", value);
metadata = Reference<WatchMetadata>(new WatchMetadata(value, ver, info, tags));
cx->setWatchMetadata(key, metadata);
@ -2304,9 +2323,9 @@ Future<Void> getWatchFuture(Version ver, Key key, Optional<Value> value, Databas
return watchFuture;
}
else if (metadata->value == value) { // case 2: val_1 == val_2
TraceEvent("Nim_Case 2").detail("Key", key).detail("Value", value);
// TraceEvent("Nim_Case 2").detail("Key", key).detail("Value", value);
if (ver > metadata->version) {
TraceEvent("Nim_Case 2 replace version").detail("Key", key).detail("Value", value);
// TraceEvent("Nim_Case 2 replace version").detail("Key", key).detail("Value", value);
metadata->version = ver;
metadata->info = info;
metadata->tags = tags;
@ -2315,7 +2334,7 @@ Future<Void> getWatchFuture(Version ver, Key key, Optional<Value> value, Databas
Future<Void> watchFuture = success(metadata->watchPromise.getFuture());
return watchFuture;
} else if(ver > metadata->version) { // case 3: val_1 != val_2 && version_2 > version_1
TraceEvent("Nim_Case 3").detail("Key", key).detail("Value", value);
// TraceEvent("Nim_Case 3").detail("Key", key).detail("Value", value);
cx->deleteWatchMetadata(key);
@ -2330,10 +2349,10 @@ Future<Void> getWatchFuture(Version ver, Key key, Optional<Value> value, Databas
Future<Void> watchFuture = success(metadata->watchPromise.getFuture());
return watchFuture;
} else if (metadata->version == ver) { // case 5: val_1 != val_2 && version_1 == version_2
TraceEvent("Nim_Case 5").detail("Key", key).detail("Value", value);
// TraceEvent("Nim_Case 5").detail("Key", key).detail("Value", value);
return sameVersionDifKey(ver, key, value, cx, info, tags);
}
TraceEvent("Nim_Case return").detail("Key", key).detail("Value", value);
// TraceEvent("Nim_Case return").detail("Key", key).detail("Value", value);
// Note: We will reach here for case 3: val_1 != val_2 && version_2 < version_1
return Future<Void>();
}
@ -2341,8 +2360,15 @@ Future<Void> getWatchFuture(Version ver, Key key, Optional<Value> value, Databas
ACTOR Future<Void> watchValueMap(Future<Version> version, Key key, Optional<Value> value, Database cx,
TransactionInfo info, TagSet tags) {
state Version ver = wait(version);
wait(getWatchFuture(ver, key, value, cx, info, tags));
return Void();
try {
Future<Void> watchFuture = getWatchFuture(ver, key, value, cx, info, tags);
// TraceEvent("Nim_Case Got Future").detail("Key", key).detail("Value", value);
wait(watchFuture);
return Void();
} catch (Error & e) {
// TraceEvent("Nim_Case major exception 2").detail("Key", key).detail("Value", value).detail("Code", e.code());
throw e;
}
}
void transformRangeLimits(GetRangeLimits limits, bool reverse, GetKeyValuesRequest &req) {
@ -3015,6 +3041,7 @@ ACTOR Future<Void> watch(Reference<Watch> watch, Database cx, TagSet tags, Trans
when(wait(cx->connectionFileChanged())) {
TEST(true); // Recreated a watch after switch
cx->watchMap.clear();
watch->watchFuture =
watchValueMap(cx->minAcceptableReadVersion, watch->key, watch->value, cx, info, tags);
}
@ -3548,7 +3575,7 @@ void Transaction::cancelWatches(Error const& e) {
void Transaction::setupWatches() {
try {
Future<Version> watchVersion = getCommittedVersion() > 0 ? getCommittedVersion() : getReadVersion();
TraceEvent("Nim_calling watchValueMap");
// TraceEvent("Nim_calling watchValueMap");
for(int i = 0; i < watches.size(); ++i)
watches[i]->setWatch(watchValueMap(watchVersion, watches[i]->key, watches[i]->value, cx, info, options.readTags));