fix pr issues

This commit is contained in:
Nim Wijetunga 2021-02-11 15:51:37 -05:00
parent 270a18f089
commit 058557a3b1
3 changed files with 110 additions and 84 deletions

View File

@ -25,6 +25,7 @@
#include "fdbclient/StorageServerInterface.h"
#include "flow/genericactors.actor.h"
#include <vector>
#include <unordered_map>
#pragma once
#include "fdbclient/NativeAPI.actor.h"
@ -142,6 +143,7 @@ public:
class WatchMetadata: public ReferenceCounted<WatchMetadata> {
public:
Key key;
Optional<Value> value;
Version version;
Promise<Version> watchPromise;
@ -151,7 +153,7 @@ public:
TransactionInfo info;
TagSet tags;
WatchMetadata(Optional<Value> value, Version version, TransactionInfo info, TagSet tags);
WatchMetadata(Key key, Optional<Value> value, Version version, TransactionInfo info, TagSet tags);
};
class DatabaseContext : public ReferenceCounted<DatabaseContext>, public FastAllocated<DatabaseContext>, NonCopyable {
@ -191,9 +193,9 @@ public:
void removeWatch();
// watch map operations
Reference<WatchMetadata> getWatchMetadata(Key key) const;
void setWatchMetadata(Key key, Reference<WatchMetadata> metadata);
void deleteWatchMetadata(Key key);
Reference<WatchMetadata> getWatchMetadata(KeyRef key) const;
KeyRef setWatchMetadata(Reference<WatchMetadata> metadata);
void deleteWatchMetadata(KeyRef key);
void clearWatchMetadata();
void setOption( FDBDatabaseOptions::Option option, Optional<StringRef> value );
@ -385,7 +387,7 @@ public:
static bool debugUseTags;
static const std::vector<std::string> debugTransactionTagChoices;
std::map<Key, Reference<WatchMetadata>> watchMap;
std::unordered_map<KeyRef, Reference<WatchMetadata>> watchMap;
};
#endif

View File

@ -1372,17 +1372,19 @@ Database Database::createDatabase( std::string connFileName, int apiVersion, boo
return Database::createDatabase(rccf, apiVersion, internal, clientLocality);
}
Reference<WatchMetadata> DatabaseContext::getWatchMetadata(Key key) const {
Reference<WatchMetadata> DatabaseContext::getWatchMetadata(KeyRef key) const {
const auto it = watchMap.find(key);
if (it == watchMap.end()) return Reference<WatchMetadata>();
return it->second;
}
void DatabaseContext::setWatchMetadata(Key key, Reference<WatchMetadata> metadata) {
watchMap[key] = metadata;
KeyRef DatabaseContext::setWatchMetadata(Reference<WatchMetadata> metadata) {
KeyRef keyRef = metadata->key.contents();
watchMap[keyRef] = metadata;
return keyRef;
}
void DatabaseContext::deleteWatchMetadata(Key key) {
void DatabaseContext::deleteWatchMetadata(KeyRef key) {
watchMap.erase(key);
}
@ -1390,7 +1392,8 @@ void DatabaseContext::clearWatchMetadata() {
watchMap.clear();
}
WatchMetadata::WatchMetadata(Optional<Value> value, Version version, TransactionInfo info, TagSet tags): value(value), version(version), info(info), tags(tags) {
WatchMetadata::WatchMetadata(Key key, Optional<Value> value, Version version, TransactionInfo info, TagSet tags)
: key(key), value(value), version(version), info(info), tags(tags) {
// create dummy future
watchFuture = watchPromise.getFuture();
}
@ -2153,7 +2156,7 @@ ACTOR Future<Void> readVersionBatcher(
uint32_t flags);
ACTOR Future<Version> watchValue(Future<Version> version, Key key, Optional<Value> value, Database cx,
TransactionInfo info, TagSet tags) {
TransactionInfo info, TagSet tags) {
state Version ver = wait( version );
state Span span("NAPI:watchValue"_loc, info.spanID);
cx->validateVersion(ver);
@ -2217,13 +2220,14 @@ ACTOR Future<Version> watchValue(Future<Version> version, Key key, Optional<Valu
}
}
ACTOR Future<Void> watchStorageServerResp(Key key, Database cx) {
ACTOR Future<Void> watchStorageServerResp(KeyRef key, Database cx) {
loop {
try {
state Reference<WatchMetadata> metadata = cx->getWatchMetadata(key);
if (!metadata.isValid()) return Void();
Version watchVersion = wait(watchValue(Future<Version>(metadata->version), key, metadata->value, cx, metadata->info, metadata->tags));
Version watchVersion = wait(watchValue(Future<Version>(metadata->version), metadata->key, metadata->value,
cx, metadata->info, metadata->tags));
metadata = cx->getWatchMetadata(key);
if (!metadata.isValid()) return Void();
@ -2237,7 +2241,7 @@ ACTOR Future<Void> watchStorageServerResp(Key key, Database cx) {
cx->deleteWatchMetadata(key);
}
}
} catch(Error &e) {
} catch (Error& e) {
if (e.code() == error_code_operation_cancelled) {
throw e;
}
@ -2249,6 +2253,9 @@ ACTOR Future<Void> watchStorageServerResp(Key key, Database cx) {
cx->deleteWatchMetadata(key);
return Void();
}
cx->deleteWatchMetadata(key);
metadata->watchPromise.sendError(e);
throw e;
}
}
}
@ -2256,48 +2263,48 @@ ACTOR Future<Void> watchStorageServerResp(Key key, Database cx) {
ACTOR Future<Void> sameVersionDiffValue(Version ver, Key key, Optional<Value> value, Database cx,
TransactionInfo info, TagSet tags) {
state ReadYourWritesTransaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
state Optional<Value> valSS = wait(tr.get(key));
Reference<WatchMetadata> metadata = cx->getWatchMetadata(key.contents());
try {
tr.setOption( FDBTransactionOptions::READ_SYSTEM_KEYS );
state Optional<Value> valSS = wait(tr.get(key));
Reference<WatchMetadata> metadata = cx->getWatchMetadata(key);
if (metadata.isValid() &&
valSS != metadata->value) { // val_3 != val_1 (storage server value doesnt match value in map)
cx->deleteWatchMetadata(key.contents());
if (metadata.isValid() && valSS != metadata->value) { // val_3 != val_1 (storage server value doesnt match value in map)
cx->deleteWatchMetadata(key);
metadata->watchPromise.send(ver);
metadata->watchFutureSS.cancel();
}
metadata->watchPromise.send(ver);
metadata->watchFutureSS.cancel();
if (valSS ==
value) { // val_3 == val_2 (storage server value matches value passed into the function -> new watch)
metadata = makeReference<WatchMetadata>(key, value, ver, info, tags);
KeyRef keyRef = cx->setWatchMetadata(metadata);
metadata->watchFutureSS = watchStorageServerResp(keyRef, cx);
}
if (valSS != value) return Void(); // if val_3 != val_2
wait(success(metadata->watchPromise.getFuture())); // val_3 == val_2
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
if (valSS == value) { // val_3 == val_2 (storage server value matches value passed into the function -> new watch)
metadata = makeReference<WatchMetadata>(value, ver, info, tags);
cx->setWatchMetadata(key, metadata);
metadata->watchFutureSS = watchStorageServerResp(key, cx);
}
if (valSS != value) return Void(); // if val_3 != val_2
wait(success(metadata->watchPromise.getFuture())); // val_3 == val_2
return Void();
}
catch(Error& e) {
state Error err = e;
wait(tr.onError(e));
throw err;
}
}
Future<Void> getWatchFuture(Version ver, Key key, Optional<Value> value, Database cx,
TransactionInfo info, TagSet tags) {
Reference<WatchMetadata> metadata = cx->getWatchMetadata(key);
Reference<WatchMetadata> metadata = cx->getWatchMetadata(key.contents());
if (!metadata.isValid()) { // case 1: key not in map
metadata = makeReference<WatchMetadata>(value, ver, info, tags);
cx->setWatchMetadata(key, metadata);
metadata = makeReference<WatchMetadata>(key, value, ver, info, tags);
KeyRef keyRef = cx->setWatchMetadata(metadata);
metadata->watchFutureSS = watchStorageServerResp(key, cx);
metadata->watchFutureSS = watchStorageServerResp(keyRef, cx);
return success(metadata->watchPromise.getFuture());
}
else if (metadata->value == value) { // case 2: val_1 == val_2 (received watch with same value as key already in the map so just update)
@ -2310,15 +2317,15 @@ Future<Void> getWatchFuture(Version ver, Key key, Optional<Value> value, Databas
return success(metadata->watchPromise.getFuture());
} else if(ver > metadata->version) { // case 3: val_1 != val_2 && version_2 > version_1 (recived watch with different value and a higher version so recreate in SS)
TEST(true); // Setting a watch that has a different value than the one in the map but a higher version (newer)
cx->deleteWatchMetadata(key);
cx->deleteWatchMetadata(key.contents());
metadata->watchPromise.send(ver);
metadata->watchFutureSS.cancel();
metadata = makeReference<WatchMetadata>(value, ver, info, tags);
cx->setWatchMetadata(key, metadata);
metadata = makeReference<WatchMetadata>(key, value, ver, info, tags);
KeyRef keyRef = cx->setWatchMetadata(metadata);
metadata->watchFutureSS = watchStorageServerResp(key, cx);
metadata->watchFutureSS = watchStorageServerResp(keyRef, cx);
return success(metadata->watchPromise.getFuture());
} else if (metadata->version == ver) { // case 5: val_1 != val_2 && version_1 == version_2 (recived watch with different value but same version)
@ -2338,7 +2345,7 @@ ACTOR Future<Void> watchValueMap(Future<Version> version, Key key, Optional<Valu
wait(watchFuture);
return Void();
} catch (Error & e) {
throw e;
throw;
}
}

View File

@ -3,7 +3,7 @@
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -65,35 +65,41 @@ struct WatchesSameKeyWorkload : TestWorkload {
}
ACTOR static Future<Void> setKeyRandomValue(Database cx, Key key, Optional<Value> val) {
try {
if ( !val.present() ) val = Value( deterministicRandom()->randomUniqueID().toString() );
state ReadYourWritesTransaction tr(cx);
tr.set(key, val.get());
wait( tr.commit() );
return Void();
} catch ( Error& e ) {
throw e;
state ReadYourWritesTransaction tr(cx);
loop {
try {
if (!val.present()) val = Value(deterministicRandom()->randomUniqueID().toString());
tr.set(key, val.get());
wait(tr.commit());
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR static Future<Optional<Value>> getValue(Database cx, Key key) {
try {
state ReadYourWritesTransaction tr(cx);
Optional<Value> val = wait(tr.get(key));
return val;
} catch ( Error& e ) {
throw e;
state ReadYourWritesTransaction tr(cx);
loop {
try {
Optional<Value> val = wait(tr.get(key));
return val;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR static Future<Future<Void>> watchKey(Database cx, Key key) {
try {
state ReadYourWritesTransaction tr(cx);
state Future<Void> watchFuture = tr.watch(key);
wait( tr.commit() );
return watchFuture;
} catch ( Error& e ) {
throw e;
state ReadYourWritesTransaction tr(cx);
loop {
try {
state Future<Void> watchFuture = tr.watch(key);
wait(tr.commit());
return watchFuture;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
@ -102,22 +108,25 @@ struct WatchesSameKeyWorkload : TestWorkload {
* Tests case 2 in the design doc:
* - we get a watch that has the same value as a key in the watch map
* */
state ReadYourWritesTransaction tr(cx);
loop {
try {
state std::vector<Future<Void>> watchFutures;
state int i;
for ( i = 0; i < self->numWatches; i++ ) {
Future<Void> watchFuture = wait(watchKey(cx, key));
watchFutures.push_back(watchFuture);
watchFutures.push_back(tr.watch(key));
}
wait(tr.commit());
wait( setKeyRandomValue(cx, key, Optional<Value>()) );
for ( i = 0; i < watchFutures.size(); i++) {
wait( watchFutures[i] );
}
return Void();
} catch( Error& e ) {}
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
@ -126,6 +135,7 @@ struct WatchesSameKeyWorkload : TestWorkload {
* Tests case 3 in the design doc:
* - we get a watch that has a different value than the key in the map but the version is larger
* */
state ReadYourWritesTransaction tr(cx);
loop {
try {
state std::vector<Future<Void>> watchFutures;
@ -133,7 +143,6 @@ struct WatchesSameKeyWorkload : TestWorkload {
state int i;
state Value val = Value( deterministicRandom()->randomUniqueID().toString() );
state ReadYourWritesTransaction tr(cx);
tr.set(key, val);
for ( i = 0; i < self->numWatches; i++ ) {
watchFutures.push_back(tr.watch(key));
@ -145,7 +154,9 @@ struct WatchesSameKeyWorkload : TestWorkload {
wait( watchFutures[i] );
}
return Void();
} catch ( Error& e ) {}
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
@ -154,6 +165,7 @@ struct WatchesSameKeyWorkload : TestWorkload {
* Tests case 2 for the storage server response:
* - i.e ABA but when the storage server responds the future count == 1 so we do nothing (no refire)
* */
state ReadYourWritesTransaction tr(cx);
loop {
try {
wait ( setKeyRandomValue(cx, key, Optional<Value>()) );
@ -161,7 +173,6 @@ struct WatchesSameKeyWorkload : TestWorkload {
state Future<Void> watch1 = wait(watchKey(cx, key));
wait ( setKeyRandomValue(cx, key, Optional<Value>()) );
state ReadYourWritesTransaction tr(cx);
tr.set(key, val.get());
state Future<Void> watch2 = tr.watch(key);
wait( tr.commit() );
@ -169,7 +180,9 @@ struct WatchesSameKeyWorkload : TestWorkload {
watch1.cancel();
watch2.cancel();
return Void();
} catch ( Error& e ) {}
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
@ -178,6 +191,7 @@ struct WatchesSameKeyWorkload : TestWorkload {
* Tests case 3 for the storage server response:
* - i.e ABA but when the storage server responds the future count > 1 so we refire request to SS
* */
state ReadYourWritesTransaction tr(cx);
loop {
try {
wait ( setKeyRandomValue(cx, key, Optional<Value>()) );
@ -185,7 +199,6 @@ struct WatchesSameKeyWorkload : TestWorkload {
state Future<Void> watch1 = wait(watchKey(cx, key));
wait ( setKeyRandomValue(cx, key, Optional<Value>()) );
state ReadYourWritesTransaction tr(cx);
tr.set(key, val.get());
state Future<Void> watch2 = tr.watch(key);
wait( tr.commit() );
@ -194,7 +207,9 @@ struct WatchesSameKeyWorkload : TestWorkload {
wait( watch1 );
wait( watch2 );
return Void();
} catch ( Error& e ) {}
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
@ -203,13 +218,13 @@ struct WatchesSameKeyWorkload : TestWorkload {
* Tests case 5 in the design doc:
* - i.e values of watches are different but versions are the same
* */
state ReadYourWritesTransaction tr1(cx);
state ReadYourWritesTransaction tr2(cx);
loop {
try {
state Value val1 = Value( deterministicRandom()->randomUniqueID().toString() );
state Value val2 = Value( deterministicRandom()->randomUniqueID().toString() );
state ReadYourWritesTransaction tr1(cx);
tr1.setOption( FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE );
state ReadYourWritesTransaction tr2(cx);
tr2.setOption( FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE );
tr1.set(key, val1);
tr2.set(key, val2);
@ -222,7 +237,9 @@ struct WatchesSameKeyWorkload : TestWorkload {
wait( watch1 && watch2 );
return Void();
} catch ( Error& e ) {}
} catch (Error& e) {
wait(tr1.onError(e) && tr2.onError(e));
}
}
}