Fix merge issue that caused tenant serialization problems. Fix case where arena was passed by value rather than reference. Watches should check the tenant map at the latest version rather than the watch version.

This commit is contained in:
A.J. Beamon 2022-03-01 08:27:00 -08:00
parent c99c7cf55f
commit e898e59d18
2 changed files with 16 additions and 11 deletions

View File

@ -65,7 +65,6 @@ public:
template <class Ar>
void serialize(Ar& ar) {
KeyRef subspace;
serializer(ar, id);
if (ar.isDeserializing) {
serializer(ar, id, subspace);
if (id >= 0) {

View File

@ -2127,7 +2127,7 @@ ACTOR Future<Void> getShardStateQ(StorageServer* data, GetShardStateRequest req)
return Void();
}
KeyRef addPrefix(KeyRef const& key, Optional<Key> prefix, Arena arena) {
KeyRef addPrefix(KeyRef const& key, Optional<Key> prefix, Arena& arena) {
if (prefix.present()) {
return key.withPrefix(prefix.get(), arena);
} else {
@ -6855,7 +6855,7 @@ ACTOR Future<Void> watchValueWaitForVersion(StorageServer* self,
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
try {
wait(success(waitForVersionNoTooOld(self, req.version)));
Optional<TenantMapEntry> entry = self->getTenantEntry(req.version, req.tenantInfo.name);
Optional<TenantMapEntry> entry = self->getTenantEntry(latestVersion, req.tenantInfo.name);
if (entry.present()) {
req.key = req.key.withPrefix(entry.get().prefix);
}
@ -6876,21 +6876,24 @@ ACTOR Future<Void> serveWatchValueRequestsImpl(StorageServer* self, FutureStream
state Span span("SS:serveWatchValueRequestsImpl"_loc, { req.spanContext });
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
if (!metadata.isValid()) { // case 1: no watch set for the current key
// case 1: no watch set for the current key
if (!metadata.isValid()) {
metadata = makeReference<ServerWatchMetadata>(req.key, req.value, req.version, req.tags, req.debugID);
KeyRef key = self->setWatchMetadata(metadata);
metadata->watch_impl = forward(watchWaitForValueChange(self, span.context, key), metadata->versionPromise);
self->actors.add(watchValueSendReply(self, req, metadata->versionPromise.getFuture(), span.context));
} else if (metadata->value ==
req.value) { // case 2: there is a watch in the map and it has the same value so just update version
}
// case 2: there is a watch in the map and it has the same value so just update version
else if (metadata->value == req.value) {
if (req.version > metadata->version) {
metadata->version = req.version;
metadata->tags = req.tags;
metadata->debugID = req.debugID;
}
self->actors.add(watchValueSendReply(self, req, metadata->versionPromise.getFuture(), span.context));
} else if (req.version > metadata->version) { // case 3: version in map has a lower version so trigger watch and
// create a new entry in map
}
// case 3: version in map has a lower version so trigger watch and create a new entry in map
else if (req.version > metadata->version) {
self->deleteWatchMetadata(req.key.contents());
metadata->versionPromise.send(req.version);
metadata->watch_impl.cancel();
@ -6900,11 +6903,14 @@ ACTOR Future<Void> serveWatchValueRequestsImpl(StorageServer* self, FutureStream
metadata->watch_impl = forward(watchWaitForValueChange(self, span.context, key), metadata->versionPromise);
self->actors.add(watchValueSendReply(self, req, metadata->versionPromise.getFuture(), span.context));
} else if (req.version <
metadata->version) { // case 4: version in the map is higher so immediately trigger watch
}
// case 4: version in the map is higher so immediately trigger watch
else if (req.version < metadata->version) {
TEST(true); // watch version in map is higher so trigger watch (case 4)
req.reply.send(WatchValueReply{ metadata->version });
} else { // case 5: watch value differs but their versions are the same (rare case) so check with the SS
}
// case 5: watch value differs but their versions are the same (rare case) so check with the SS
else {
TEST(true); // watch version in the map is the same but value is different (case 5)
loop {
try {