change to references

This commit is contained in:
Nim Wijetunga 2021-02-02 20:29:03 -05:00
parent c67aac8212
commit b6e7961b78
3 changed files with 88 additions and 79 deletions

View File

@ -140,6 +140,20 @@ public:
}
};
class WatchMetadata: public ReferenceCounted<WatchMetadata> {
public:
Optional<Value> value;
Version version;
Promise<Version> watchPromise;
Future<Version> watchFuture;
Future<Version> watchFutureSS;
TransactionInfo info;
TagSet tags;
WatchMetadata(Optional<Value> value, Version version, Future<Version> watchFutureSS, TransactionInfo info, TagSet tags);
};
class DatabaseContext : public ReferenceCounted<DatabaseContext>, public FastAllocated<DatabaseContext>, NonCopyable {
public:
static DatabaseContext* allocateOnForeignThread() {
@ -175,7 +189,12 @@ public:
// Update the watch counter for the database
void addWatch();
void removeWatch();
// watch map operations
Reference<WatchMetadata> getWatchMetadata(Key key) const;
void setWatchMetadata(Key key, Reference<WatchMetadata> metadata);
void deleteWatchMetadata(Key key);
void setOption( FDBDatabaseOptions::Option option, Optional<StringRef> value );
Error deferredError;
@ -364,7 +383,8 @@ public:
std::unique_ptr<SpecialKeyRangeReadImpl> &&impl);
static bool debugUseTags;
static const std::vector<std::string> debugTransactionTagChoices;
static const std::vector<std::string> debugTransactionTagChoices;
std::map<Key, Reference<WatchMetadata>> watchMap;
};
#endif

View File

@ -1372,17 +1372,17 @@ Database Database::createDatabase( std::string connFileName, int apiVersion, boo
return Database::createDatabase(rccf, apiVersion, internal, clientLocality);
}
Optional<WatchMetadata> Database::getWatchMetadata(Key key) const {
Reference<WatchMetadata> DatabaseContext::getWatchMetadata(Key key) const {
const auto it = watchMap.find(key);
if (it == watchMap.end()) return Optional<WatchMetadata>();
if (it == watchMap.end()) return Reference<WatchMetadata>();
return it->second;
}
void Database::setWatchMetadata(Key key, WatchMetadata metadata) {
void DatabaseContext::setWatchMetadata(Key key, Reference<WatchMetadata> metadata) {
watchMap[key] = metadata;
}
void Database::deleteWatchMetadata(Key key) {
void DatabaseContext::deleteWatchMetadata(Key key) {
watchMap.erase(key);
}
@ -1391,8 +1391,6 @@ WatchMetadata::WatchMetadata(Optional<Value> value, Version version, Future<Vers
watchFuture = watchPromise.getFuture();
}
WatchMetadata::WatchMetadata(): info(TransactionInfo(TaskPriority::Zero, SpanID(0, 0))) {} // needed since TransactionInfo cannot be default constructed
const UniqueOrderedOptionList<FDBTransactionOptions>& Database::getTransactionDefaults() const {
ASSERT(db);
return db->transactionDefaults;
@ -2169,6 +2167,7 @@ ACTOR Future<Version> watchValue(Future<Version> version, Key key, Optional<Valu
g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.Before"); //.detail("TaskID", g_network->getCurrentTask());
}
state WatchValueReply resp;
TraceEvent("Nim_WatcherCommitted Started");
choose {
when(WatchValueReply r = wait(
loadBalance(cx.getPtr(), ssi.second, &StorageServerInterface::watchValue,
@ -2187,11 +2186,14 @@ ACTOR Future<Version> watchValue(Future<Version> version, Key key, Optional<Valu
//cannot do this until the storage server is notified on knownCommittedVersion changes from tlog (faster than the current update loop)
Version v = wait(waitForCommittedVersion(cx, resp.version, span.context));
//TraceEvent("WatcherCommitted").detail("CommittedVersion", v).detail("WatchVersion", resp.version).detail("Key", key ).detail("Value", value);
TraceEvent("Nim_WatcherCommitted").detail("CommittedVersion", v).detail("WatchVersion", resp.version).detail("Key", key ).detail("Value", value);
// False if there is a master failure between getting the response and getting the committed version,
// Dependent on SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT
if (v - resp.version < 50000000) return resp.version;
if (v - resp.version < 50000000) {
TraceEvent("Nim_WatcherCommitted Done!");
return resp.version;
}
ver = v;
} catch (Error& e) {
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) {
@ -2214,75 +2216,81 @@ ACTOR Future<Version> watchValue(Future<Version> version, Key key, Optional<Valu
}
ACTOR Future<Void> watchStorageServerResp(Key key, Database cx) {
state Optional<WatchMetadata> metadata = cx.getWatchMetadata(key);
if (!metadata.present()) return Void();
state Reference<WatchMetadata> metadata = cx->getWatchMetadata(key);
if (!metadata) return Void();
loop {
try {
Version watchVersion = wait(metadata.get().watchFutureSS);
if (watchVersion >= metadata.get().version) { // case 1: version_1 (SS) >= version_2 (map)
cx.deleteWatchMetadata(key);
metadata.get().watchPromise.send(watchVersion);
TraceEvent("Nim_Case SS enter").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount());
Version watchVersion = wait(metadata->watchFutureSS);
TraceEvent("Nim_Case SS done wait");
if (watchVersion >= metadata->version) { // case 1: version_1 (SS) >= version_2 (map)
TraceEvent("Nim_Case SS 1").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount());
cx->deleteWatchMetadata(key);
metadata->watchPromise.send(watchVersion);
} else {
// 2 because we have one future ref in map and one in the current function
if (metadata.get().watchPromise.getFutureReferenceCount() == 2) { // case 2: version_1 < version_2 and future_count == 2
cx.deleteWatchMetadata(key);
} else { // case 3: future_count > 2
metadata.get().watchFutureSS = watchValue(Future<Version>(metadata.get().version), key, metadata.get().value, cx, metadata.get().info, metadata.get().tags);
cx.setWatchMetadata(key, metadata.get());
if (metadata->watchPromise.getFutureReferenceCount() == 1) { // case 2: version_1 < version_2 and future_count == 1
TraceEvent("Nim_Case SS 2").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount());
cx->deleteWatchMetadata(key);
} else { // case 3: future_count > 1
TraceEvent("Nim_Case SS 3").detail("Key", key).detail("Value", metadata->value).detail("numF", metadata->watchPromise.getFutureReferenceCount());
metadata->watchFutureSS = watchValue(Future<Version>(metadata->version), key, metadata->value, cx, metadata->info, metadata->tags);
}
}
// if there is no key in the map then destroy actor
metadata = cx.getWatchMetadata(key);
if (!metadata.present()) return Void();
metadata = cx->getWatchMetadata(key);
if (!metadata) return Void();
} catch(Error &e) {
// no more watches for the key so shutdown actor
metadata = cx.getWatchMetadata(key);
if (!metadata.present()) return Void();
metadata = cx->getWatchMetadata(key);
if (!metadata) return Void();
}
}
}
ACTOR Future<Void> watchValueMap(Future<Version> version, Key key, Optional<Value> value, Database cx,
TransactionInfo info, TagSet tags) {
TraceEvent("Nim_Case start").detail("Key", key).detail("Value", value);
state Version ver = wait( version );
state Optional<WatchMetadata> metadata = cx.getWatchMetadata(key);
state Reference<WatchMetadata> metadata = cx->getWatchMetadata(key);
if (!metadata.present()) { // case 1: key not in map
if (!metadata) { // case 1: key not in map
TraceEvent("Nim_Case 1").detail("Key", key).detail("Value", value);
Future<Version> watchFutureSS = watchValue(version, key, value, cx, info, tags);
WatchMetadata newMetadata = WatchMetadata(value, ver, watchFutureSS, info, tags);
cx.setWatchMetadata(key, newMetadata);
metadata = Reference<WatchMetadata>(new WatchMetadata(value, ver, watchFutureSS, info, tags));
cx->setWatchMetadata(key, metadata);
Future<Void> watchFuture = success(metadata->watchPromise.getFuture());
Future<Void> watchFuture = success(newMetadata.watchPromise.getFuture());
Future<Void> SSResp = watchStorageServerResp(key, cx);
wait(watchFuture);
} else if (metadata.get().value == value) { // case 2: val_1 == val_2
if (ver > metadata.get().version) {
metadata.get().version = ver;
metadata.get().info = info;
metadata.get().tags = tags;
cx.setWatchMetadata(key, metadata.get());
wait(SSResp && watchFuture);
}
else if (metadata->value == value) { // case 2: val_1 == val_2
TraceEvent("Nim_Case 2").detail("Key", key).detail("Value", value);
if (ver > metadata->version) {
metadata->version = ver;
metadata->info = info;
metadata->tags = tags;
}
Future<Void> watchFuture = success(metadata.get().watchPromise.getFuture());
Future<Void> watchFuture = success(metadata->watchPromise.getFuture());
wait(watchFuture);
} else if(ver > metadata.get().version) { // case 3: val_1 != val_2 && version_2 > version_1
} else if(ver > metadata->version) { // case 3: val_1 != val_2 && version_2 > version_1
TraceEvent("Nim_Case 3").detail("Key", key).detail("Value", value);
Future<Version> watchFutureSS = watchValue(version, key, value, cx, info, tags);
WatchMetadata newMetadata = WatchMetadata(value, ver, watchFutureSS, info, tags);
cx.setWatchMetadata(key, newMetadata);
Reference<WatchMetadata> newMetadata(new WatchMetadata(value, ver, watchFutureSS, info, tags));
cx->setWatchMetadata(key, newMetadata);
metadata.get().watchPromise.send(ver);
metadata.get().watchFutureSS.cancel();
metadata->watchPromise.send(ver);
metadata->watchFutureSS.cancel();
Future<Void> watchFuture = success(newMetadata.watchPromise.getFuture());
Future<Void> watchFuture = success(newMetadata->watchPromise.getFuture());
wait(watchFuture);
} else if (metadata.get().value != value && metadata.get().version == ver) { // case 5: val_1 != val_2 && version_1 == version_2
} else if (metadata->value != value && metadata->version == ver) { // case 5: val_1 != val_2 && version_1 == version_2
TraceEvent("Nim_Case 5").detail("Key", key).detail("Value", value);
state ReadYourWritesTransaction tr(cx);
try {
@ -2290,24 +2298,24 @@ ACTOR Future<Void> watchValueMap(Future<Version> version, Key key, Optional<Valu
if (valSS == value) { // val_3 == val_2
Future<Version> watchFutureSS = watchValue(version, key, value, cx, info, tags);
WatchMetadata newMetadata = WatchMetadata(value, ver, watchFutureSS, info, tags);
cx.setWatchMetadata(key, newMetadata);
Reference<WatchMetadata> newMetadata(new WatchMetadata(value, ver, watchFutureSS, info, tags));
cx->setWatchMetadata(key, newMetadata);
}
if (valSS != metadata.get().value) { // val_3 != val_1
if (valSS != value) cx.deleteWatchMetadata(key);
if (valSS != metadata->value) { // val_3 != val_1
if (valSS != value) cx->deleteWatchMetadata(key);
metadata.get().watchPromise.send(ver);
metadata.get().watchFutureSS.cancel();
metadata->watchPromise.send(ver);
metadata->watchFutureSS.cancel();
}
wait(tr.commit());
if (valSS != value) return Void();
metadata = cx.getWatchMetadata(key);
if (metadata.present()) { // if val_3 == val_2
Future<Void> watchFuture = success(metadata.get().watchPromise.getFuture());
metadata = cx->getWatchMetadata(key);
if (metadata) { // if val_3 == val_2
Future<Void> watchFuture = success(metadata->watchPromise.getFuture());
wait(watchFuture);
}
}
@ -2315,7 +2323,7 @@ ACTOR Future<Void> watchValueMap(Future<Version> version, Key key, Optional<Valu
wait(tr.onError(e));
}
}
TraceEvent("Nim_Case return").detail("Key", key).detail("Value", value);
// Note: We will reach here for case 3: val_1 != val_2 && version_2 < version_1
return Void();
}
@ -3523,7 +3531,7 @@ void Transaction::cancelWatches(Error const& e) {
void Transaction::setupWatches() {
try {
Future<Version> watchVersion = getCommittedVersion() > 0 ? getCommittedVersion() : getReadVersion();
TraceEvent("Nim_calling watchValueMap");
for(int i = 0; i < watches.size(); ++i)
watches[i]->setWatch(watchValueMap(watchVersion, watches[i]->key, watches[i]->value, cx, info, options.readTags));

View File

@ -94,13 +94,8 @@ public:
const UniqueOrderedOptionList<FDBTransactionOptions>& getTransactionDefaults() const;
Optional<WatchMetadata> getWatchMetadata(Key key) const;
void setWatchMetadata(Key key, WatchMetadata metadata);
void deleteWatchMetadata(Key key);
private:
Reference<DatabaseContext> db;
std::map<Key, WatchMetadata> watchMap;
};
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>() );
@ -174,20 +169,6 @@ struct TransactionInfo {
: taskID(taskID), spanID(spanID), useProvisionalProxies(false) {}
};
struct WatchMetadata {
Optional<Value> value;
Version version;
Promise<Version> watchPromise;
Future<Version> watchFuture;
Future<Version> watchFutureSS;
TransactionInfo info;
TagSet tags;
WatchMetadata();
WatchMetadata(Optional<Value> value, Version version, Future<Version> watchFutureSS, TransactionInfo info, TagSet tags);
};
struct TransactionLogInfo : public ReferenceCounted<TransactionLogInfo>, NonCopyable {
enum LoggingLocation { DONT_LOG = 0, TRACE_LOG = 1, DATABASE = 2 };