Merge pull request #4382 from sfc-gh-nwijetunga/optimize-watches-server

Optimize Watches Server
Markus Pilman 2021-03-02 14:25:21 -07:00 committed by GitHub
commit 4806299821
3 changed files with 283 additions and 102 deletions
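
Previously every watch request spawned its own watchValue_impl actor on the storage server, so N clients watching the same key paid for N polling loops and N copies of the watch state. This change deduplicates watches server-side: a per-key watchMap holds one ServerWatchMetadata entry, a single watchWaitForValueChange actor polls the key, and the version at which the watch fires is broadcast to all waiting clients through a shared Promise&lt;Version&gt; (via the new forward() helper in flow/genericactors.actor.h).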

fdbserver/storageserver.actor.cpp

@ -62,6 +62,7 @@
#include "fdbrpc/Smoother.h"
#include "fdbrpc/Stats.h"
#include "flow/TDMetric.actor.h"
#include "flow/genericactors.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -77,6 +78,7 @@ inline bool canReplyWith(Error e) {
case error_code_future_version:
case error_code_wrong_shard_server:
case error_code_process_behind:
case error_code_watch_cancelled:
//case error_code_all_alternatives_failed:
return true;
default:
@ -259,6 +261,21 @@ struct FetchInjectionInfo {
vector<VerUpdateRef> changes;
};
class ServerWatchMetadata: public ReferenceCounted<ServerWatchMetadata> {
public:
Key key;
Optional<Value> value;
Version version;
Future<Version> watch_impl;
Promise<Version> versionPromise;
Optional<TagSet> tags;
Optional<UID> debugID;
ServerWatchMetadata(Key key, Optional<Value> value, Version version, Optional<TagSet> tags, Optional<UID> debugID)
: key(key), value(value), version(version), tags(tags), debugID(debugID) {}
};
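// One ServerWatchMetadata entry is kept per watched key: every client watching the
// same key shares the single watch_impl actor, and versionPromise broadcasts the
// version at which the watch fired to all of them.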
struct StorageServer {
typedef VersionedMap<KeyRef, ValueOrClearToRef> VersionedData;
@ -286,6 +303,7 @@ private:
VersionedData versionedData;
std::map<Version, Standalone<VerUpdateRef>> mutationLog; // versions (durableVersion, version]
std::unordered_map<KeyRef, Reference<ServerWatchMetadata>> watchMap; // keep track of server watches
public:
@ -304,6 +322,12 @@ public:
Histogram::Unit::bytes_per_second)) {}
} fetchKeysHistograms;
// watch map operations
Reference<ServerWatchMetadata> getWatchMetadata(KeyRef key) const;
KeyRef setWatchMetadata(Reference<ServerWatchMetadata> metadata);
void deleteWatchMetadata(KeyRef key);
void clearWatchMetadata();
class CurrentRunningFetchKeys {
std::unordered_map<UID, double> startTimeMap;
std::unordered_map<UID, KeyRange> keyRangeMap;
@ -782,13 +806,21 @@ public:
promise.sendError(err);
}
template<class Request, class HandleFunction>
Future<Void> readGuard(const Request& request, const HandleFunction& fun) {
template<class Request>
bool shouldRead(const Request& request) {
auto rate = currentRate();
if (rate < SERVER_KNOBS->STORAGE_DURABILITY_LAG_REJECT_THRESHOLD && deterministicRandom()->random01() > std::max(SERVER_KNOBS->STORAGE_DURABILITY_LAG_MIN_RATE, rate/SERVER_KNOBS->STORAGE_DURABILITY_LAG_REJECT_THRESHOLD)) {
//request.error = future_version();
sendErrorWithPenalty(request.reply, server_overloaded(), getPenalty());
++counters.readsRejected;
return false;
}
return true;
}
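// shouldRead() factors the admission check out of readGuard() so that callers like
// serveWatchValueRequests() below can shed load without wrapping the handler in a
// read actor: when the durability-lag rate falls below
// STORAGE_DURABILITY_LAG_REJECT_THRESHOLD, requests are rejected with
// server_overloaded with a probability that grows as the rate drops (floored at
// STORAGE_DURABILITY_LAG_MIN_RATE).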
template<class Request, class HandleFunction>
Future<Void> readGuard(const Request& request, const HandleFunction& fun) {
bool read = shouldRead(request);
if (!read) {
return Void();
}
return fun(this, request);
@ -815,6 +847,28 @@ void StorageServer::byteSampleApplyMutation( MutationRef const& m, Version ver )
ASSERT(false); // Mutation of unknown type modifying byte sample
}
// watchMap Operations
Reference<ServerWatchMetadata> StorageServer::getWatchMetadata(KeyRef key) const {
const auto it = watchMap.find(key);
if (it == watchMap.end()) return Reference<ServerWatchMetadata>();
return it->second;
}
KeyRef StorageServer::setWatchMetadata(Reference<ServerWatchMetadata> metadata) {
KeyRef keyRef = metadata->key.contents();
watchMap[keyRef] = metadata;
return keyRef;
}
void StorageServer::deleteWatchMetadata(KeyRef key) {
watchMap.erase(key);
}
void StorageServer::clearWatchMetadata() {
watchMap.clear();
}
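// Note: setWatchMetadata() stores the map key as a KeyRef into metadata->key's
// arena, so the key memory stays valid for exactly as long as the map holds the
// Reference; the same KeyRef is handed to watchWaitForValueChange below.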
#ifndef __INTEL_COMPILER
#pragma endregion
#endif
@ -1091,100 +1145,102 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
// Pessimistic estimate of the number of overhead bytes used by each
// watch. Watch key references are stored in an AsyncMap<Key,bool>, and actors
// must be kept alive until the watch is finished.
static constexpr size_t WATCH_OVERHEAD_BYTES = 1000;
extern size_t WATCH_OVERHEAD_WATCHQ, WATCH_OVERHEAD_WATCHIMPL;
ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req, SpanID parent ) {
state Location spanLocation = "SS:WatchValueImpl"_loc;
ACTOR Future<Version> watchWaitForValueChange( StorageServer* data, SpanID parent, KeyRef key ) {
state Location spanLocation = "SS:watchWaitForValueChange"_loc;
state Span span(spanLocation, { parent });
try {
++data->counters.watchQueries;
state Reference<ServerWatchMetadata> metadata = data->getWatchMetadata(key);
if( req.debugID.present() )
g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.Before"); //.detail("TaskID", g_network->getCurrentTask());
if( metadata->debugID.present() )
g_traceBatch.addEvent("WatchValueDebug", metadata->debugID.get().first(), "watchValueSendReply.Before"); //.detail("TaskID", g_network->getCurrentTask());
wait(success(waitForVersionNoTooOld(data, req.version)));
if( req.debugID.present() )
g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
wait(success(waitForVersionNoTooOld(data, metadata->version)));
if( metadata->debugID.present() )
g_traceBatch.addEvent("WatchValueDebug", metadata->debugID.get().first(), "watchValueSendReply.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
state Version minVersion = data->data().latestVersion;
state Future<Void> watchFuture = data->watches.onChange(req.key);
loop {
try {
state Version latest = data->version.get();
TEST(latest >= minVersion && latest < data->data().latestVersion); // Starting watch loop with latestVersion > data->version
GetValueRequest getReq( span.context, req.key, latest, req.tags, req.debugID );
state Future<Void> getValue = getValueQ( data, getReq ); // we rely on the delay(0) at the top of getValueQ; if it is removed, we need one here
GetValueReply reply = wait( getReq.reply.getFuture() );
span = Span(spanLocation, parent);
//TraceEvent("WatcherCheckValue").detail("Key", req.key ).detail("Value", req.value ).detail("CurrentValue", v ).detail("Ver", latest);
state Version minVersion = data->data().latestVersion;
state Future<Void> watchFuture = data->watches.onChange(metadata->key);
loop {
try {
metadata = data->getWatchMetadata(key);
state Version latest = data->version.get();
TEST(latest >= minVersion && latest < data->data().latestVersion); // Starting watch loop with latestVersion > data->version
GetValueRequest getReq( span.context, metadata->key, latest, metadata->tags, metadata->debugID );
state Future<Void> getValue = getValueQ( data, getReq ); // we rely on the delay(0) at the top of getValueQ; if it is removed, we need one here
GetValueReply reply = wait( getReq.reply.getFuture() );
span = Span(spanLocation, parent);
if(reply.error.present()) {
ASSERT(reply.error.get().code() != error_code_future_version);
throw reply.error.get();
}
if(BUGGIFY) {
throw transaction_too_old();
}
DEBUG_MUTATION("ShardWatchValue", latest, MutationRef(MutationRef::DebugKey, req.key, reply.value.present() ? StringRef( reply.value.get() ) : LiteralStringRef("<null>") ) );
if( req.debugID.present() )
g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
if( reply.value != req.value ) {
req.reply.send(WatchValueReply{ latest });
return Void();
}
if( data->watchBytes > SERVER_KNOBS->MAX_STORAGE_SERVER_WATCH_BYTES ) {
TEST(true); //Too many watches, reverting to polling
data->sendErrorWithPenalty(req.reply, watch_cancelled(), data->getPenalty());
return Void();
}
++data->numWatches;
data->watchBytes += (req.key.expectedSize() + req.value.expectedSize() + WATCH_OVERHEAD_BYTES);
try {
if(latest < minVersion) {
// If the version we read is less than minVersion, then we may fail to be notified of any changes that occur up to and including minVersion.
// To prevent that, we'll check the key again once the version reaches our minVersion.
watchFuture = watchFuture || data->version.whenAtLeast(minVersion);
}
if(BUGGIFY) {
// Simulate a trigger on the watch that results in the loop going around without the value changing
watchFuture = watchFuture || delay(deterministicRandom()->random01());
}
wait(watchFuture);
--data->numWatches;
data->watchBytes -= (req.key.expectedSize() + req.value.expectedSize() + WATCH_OVERHEAD_BYTES);
} catch( Error &e ) {
--data->numWatches;
data->watchBytes -= (req.key.expectedSize() + req.value.expectedSize() + WATCH_OVERHEAD_BYTES);
throw;
}
} catch( Error &e ) {
if( e.code() != error_code_transaction_too_old ) {
throw;
}
TEST(true); // Reading a watched key failed with transaction_too_old
if(reply.error.present()) {
ASSERT(reply.error.get().code() != error_code_future_version);
throw reply.error.get();
}
if(BUGGIFY) {
throw transaction_too_old();
}
watchFuture = data->watches.onChange(req.key);
wait(data->version.whenAtLeast(data->data().latestVersion));
DEBUG_MUTATION("ShardWatchValue", latest, MutationRef(MutationRef::DebugKey, metadata->key, reply.value.present() ? StringRef( reply.value.get() ) : LiteralStringRef("<null>") ) );
if( metadata->debugID.present() )
g_traceBatch.addEvent("WatchValueDebug", metadata->debugID.get().first(), "watchValueSendReply.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
if( reply.value != metadata->value && latest >= metadata->version ) {
return latest; // fire watch
}
if( data->watchBytes > SERVER_KNOBS->MAX_STORAGE_SERVER_WATCH_BYTES ) {
TEST(true); //Too many watches, reverting to polling
throw watch_cancelled();
}
state int64_t watchBytes = (metadata->key.expectedSize() + metadata->value.expectedSize() + key.expectedSize() +
sizeof(Reference<ServerWatchMetadata>) + sizeof(ServerWatchMetadata) + WATCH_OVERHEAD_WATCHIMPL);
data->watchBytes += watchBytes;
try {
if(latest < minVersion) {
// If the version we read is less than minVersion, then we may fail to be notified of any changes that occur up to and including minVersion.
// To prevent that, we'll check the key again once the version reaches our minVersion.
watchFuture = watchFuture || data->version.whenAtLeast(minVersion);
}
if(BUGGIFY) {
// Simulate a trigger on the watch that results in the loop going around without the value changing
watchFuture = watchFuture || delay(deterministicRandom()->random01());
}
wait(watchFuture);
data->watchBytes -= watchBytes;
} catch( Error &e ) {
data->watchBytes -= watchBytes;
throw;
}
} catch( Error &e ) {
if( e.code() != error_code_transaction_too_old ) {
throw e;
}
TEST(true); // Reading a watched key failed with transaction_too_old
}
} catch (Error& e) {
if(!canReplyWith(e))
throw;
data->sendErrorWithPenalty(req.reply, e, data->getPenalty());
watchFuture = data->watches.onChange(metadata->key);
wait(data->version.whenAtLeast(data->data().latestVersion));
}
return Void();
}
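// watchWaitForValueChange is the single shared polling loop for a key: it re-reads
// the key via getValueQ() each time the watch triggers, and returns the version at
// which the observed value first differs from the registered value (or throws
// watch_cancelled once watchBytes exceeds MAX_STORAGE_SERVER_WATCH_BYTES).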
ACTOR Future<Void> watchValueQ( StorageServer* data, WatchValueRequest req ) {
state Span span("SS:watchValue"_loc, { req.spanContext });
state Future<Void> watch = watchValue_impl( data, req, span.context );
void checkCancelWatchImpl( StorageServer* data, WatchValueRequest req ) {
Reference<ServerWatchMetadata> metadata = data->getWatchMetadata(req.key.contents());
if(metadata.isValid() && metadata->versionPromise.getFutureReferenceCount() == 1) {
// last watch timed out so cancel watch_impl and delete key from the map
data->deleteWatchMetadata(req.key.contents());
metadata->watch_impl.cancel();
}
}
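// getFutureReferenceCount() == 1 means the future held by this timed-out request is
// the last one outstanding on versionPromise, i.e. no other client is still waiting
// on this key, so the shared watch_impl can be cancelled and the map entry dropped.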
ACTOR Future<Void> watchValueSendReply(StorageServer* data, WatchValueRequest req, Future<Version> resp, SpanID spanContext) {
state Span span("SS:watchValue"_loc, { spanContext });
state double startTime = now();
++data->counters.watchQueries;
++data->numWatches;
data->watchBytes += WATCH_OVERHEAD_WATCHQ;
loop {
double timeoutDelay = -1;
@ -1193,19 +1249,47 @@ ACTOR Future<Void> watchValueQ( StorageServer* data, WatchValueRequest req ) {
} else if(!BUGGIFY) {
timeoutDelay = std::max(CLIENT_KNOBS->WATCH_TIMEOUT - (now() - startTime), 0.0);
}
choose {
when( wait( watch ) ) {
return Void();
try {
choose {
when( Version ver = wait( resp ) ) {
// fire watch
req.reply.send(WatchValueReply{ ver });
checkCancelWatchImpl(data, req);
--data->numWatches;
data->watchBytes -= WATCH_OVERHEAD_WATCHQ;
return Void();
}
when( wait( timeoutDelay < 0 ? Never() : delay(timeoutDelay) ) ) {
// watch timed out
data->sendErrorWithPenalty(req.reply, timed_out(), data->getPenalty());
checkCancelWatchImpl(data, req);
--data->numWatches;
data->watchBytes -= WATCH_OVERHEAD_WATCHQ;
return Void();
}
when( wait( data->noRecentUpdates.onChange()) ) {}
}
when( wait( timeoutDelay < 0 ? Never() : delay(timeoutDelay) ) ) {
data->sendErrorWithPenalty(req.reply, timed_out(), data->getPenalty());
return Void();
}
when( wait( data->noRecentUpdates.onChange()) ) {}
} catch (Error& e) {
data->watchBytes -= WATCH_OVERHEAD_WATCHQ;
checkCancelWatchImpl(data, req);
--data->numWatches;
if (!canReplyWith(e)) throw e;
data->sendErrorWithPenalty(req.reply, e, data->getPenalty());
return Void();
}
}
}
#ifdef NO_INTELLISENSE
size_t WATCH_OVERHEAD_WATCHQ = sizeof(WatchValueSendReplyActorState<WatchValueSendReplyActor>) + sizeof(WatchValueSendReplyActor);
size_t WATCH_OVERHEAD_WATCHIMPL = sizeof(WatchWaitForValueChangeActorState<WatchWaitForValueChangeActor>) + sizeof(WatchWaitForValueChangeActor);
#else
size_t WATCH_OVERHEAD_WATCHQ = 0; // only used in IDE so value is irrelevant
size_t WATCH_OVERHEAD_WATCHIMPL = 0;
#endif
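// NO_INTELLISENSE is defined in real (actor-compiled) builds, where the generated
// WatchValueSendReplyActor/WatchWaitForValueChangeActor classes exist and sizeof()
// yields the true per-watch actor overhead; the zeros in the #else branch only keep
// the IDE parse happy and are never compiled in.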
ACTOR Future<Void> getShardState_impl( StorageServer* data, GetShardStateRequest req ) {
ASSERT( req.mode != GetShardStateRequest::NO_WAIT );
@ -3891,12 +3975,99 @@ ACTOR Future<Void> serveGetKeyRequests( StorageServer* self, FutureStream<GetKey
}
}
ACTOR Future<Void> watchValueWaitForVersion( StorageServer* self, WatchValueRequest req, PromiseStream<WatchValueRequest> stream ) {
state Span span("SS:watchValueWaitForVersion"_loc, { req.spanContext });
try {
wait(success(waitForVersionNoTooOld(self, req.version)));
stream.send(req);
} catch (Error& e) {
if(!canReplyWith(e)) throw e;
self->sendErrorWithPenalty(req.reply, e, self->getPenalty());
}
return Void();
}
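// Waiting for req.version happens here, in a per-request actor, before the request
// is forwarded to the single serveWatchValueRequestsImpl stream; this way one
// request blocked on a future version cannot stall watch registration for others.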
ACTOR Future<Void> serveWatchValueRequestsImpl( StorageServer* self, FutureStream<WatchValueRequest> stream ) {
loop {
state WatchValueRequest req = waitNext(stream);
state Reference<ServerWatchMetadata> metadata = self->getWatchMetadata(req.key.contents());
state Span span("SS:serveWatchValueRequestsImpl"_loc, { req.spanContext });
if (!metadata.isValid()) { // case 1: no watch set for the current key
metadata = makeReference<ServerWatchMetadata>(req.key, req.value, req.version, req.tags, req.debugID);
KeyRef key = self->setWatchMetadata(metadata);
metadata->watch_impl = forward(watchWaitForValueChange(self, span.context, key), metadata->versionPromise);
self->actors.add(watchValueSendReply(self, req, metadata->versionPromise.getFuture(), span.context));
}
else if (metadata->value == req.value) { // case 2: there is a watch in the map and it has the same value so just update version
if (req.version > metadata->version) {
metadata->version = req.version;
metadata->tags = req.tags;
metadata->debugID = req.debugID;
}
self->actors.add(watchValueSendReply(self, req, metadata->versionPromise.getFuture(), span.context));
}
else if (req.version > metadata->version) { // case 3: version in map has a lower version so trigger watch and create a new entry in map
self->deleteWatchMetadata(req.key.contents());
metadata->versionPromise.send(req.version);
metadata->watch_impl.cancel();
metadata = makeReference<ServerWatchMetadata>(req.key, req.value, req.version, req.tags, req.debugID);
KeyRef key = self->setWatchMetadata(metadata);
metadata->watch_impl = forward(watchWaitForValueChange(self, span.context, key), metadata->versionPromise);
self->actors.add(watchValueSendReply(self, req, metadata->versionPromise.getFuture(), span.context));
} else if (req.version < metadata->version) { // case 4: version in the map is higher so immediately trigger watch
TEST(true); // watch version in map is higher so trigger watch (case 4)
req.reply.send(WatchValueReply{ metadata->version });
} else { // case 5: watch value differs but their versions are the same (rare case) so check with the SS
TEST(true); // watch version in the map is the same but value is different (case 5)
loop {
try {
state Version latest = self->version.get();
GetValueRequest getReq( span.context, metadata->key, latest, metadata->tags, metadata->debugID );
state Future<Void> getValue = getValueQ( self, getReq );
GetValueReply reply = wait( getReq.reply.getFuture() );
metadata = self->getWatchMetadata(req.key.contents());
if (metadata.isValid() && reply.value != metadata->value) { // valSS != valMap
self->deleteWatchMetadata(req.key.contents());
metadata->versionPromise.send(req.version);
metadata->watch_impl.cancel();
}
if (reply.value == req.value) { // valSS == valreq
metadata = makeReference<ServerWatchMetadata>(req.key, req.value, req.version, req.tags, req.debugID);
KeyRef key = self->setWatchMetadata(metadata);
metadata->watch_impl = forward(watchWaitForValueChange(self, span.context, key), metadata->versionPromise);
self->actors.add(watchValueSendReply(self, req, metadata->versionPromise.getFuture(), span.context));
} else {
req.reply.send(WatchValueReply{ latest });
}
break;
} catch (Error& e) {
if( e.code() != error_code_transaction_too_old ) {
if(!canReplyWith(e)) throw e;
self->sendErrorWithPenalty(req.reply, e, self->getPenalty());
break;
}
TEST(true); // Reading a watched key failed with transaction_too_old case 5
}
}
}
}
}
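// To summarize the dispatch above: (1) no entry -> register metadata and start the
// shared watch; (2) same value -> piggyback on the existing watch, bumping the
// registered version if newer; (3) newer version, different value -> fire the old
// watch and re-register; (4) older version -> reply immediately with the version in
// the map; (5) same version, different value -> consult storage to decide which
// value is current before registering or replying.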
ACTOR Future<Void> serveWatchValueRequests( StorageServer* self, FutureStream<WatchValueRequest> watchValue ) {
state PromiseStream<WatchValueRequest> stream;
self->actors.add(serveWatchValueRequestsImpl(self, stream.getFuture()));
loop {
WatchValueRequest req = waitNext(watchValue);
// TODO: fast load balancing?
// SOMEDAY: combine watches for the same key/value into a single watch
self->actors.add(self->readGuard(req, watchValueQ));
if(self->shouldRead(req)) {
self->actors.add(watchValueWaitForVersion(self, req, stream));
}
}
}
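
Taken together, the two halves above turn watches into a broadcast: however many clients watch the same key, one watchWaitForValueChange loop runs, and the version it produces is fanned out through the shared versionPromise. Below is a minimal standalone sketch of that fan-out pattern, using std::promise/std::shared_future in place of flow's Promise/Future; the names are illustrative, not FDB code.

#include <future>
#include <iostream>
#include <map>
#include <string>

// Hypothetical stand-in for ServerWatchMetadata: one entry per watched key.
struct WatchEntry {
    std::string value;                     // value registered with the watch
    std::shared_future<long> firedVersion; // broadcast to every waiting client
};

int main() {
    std::map<std::string, std::promise<long>> impls;   // one "watch impl" per key
    std::map<std::string, WatchEntry> watchMap;

    // Two clients watch the same key: only one entry (and one impl) exists.
    std::promise<long>& impl = impls["foo"];
    watchMap.emplace("foo", WatchEntry{ "bar", impl.get_future().share() });
    std::shared_future<long> clientA = watchMap.at("foo").firedVersion;
    std::shared_future<long> clientB = watchMap.at("foo").firedVersion;

    impl.set_value(42); // the single watch impl fires once...
    std::cout << clientA.get() << " " << clientB.get() << "\n"; // ...both clients see 42
    return 0;
}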

fdbserver/workloads/WatchesSameKeyCorrectness.actor.cpp

@ -39,23 +39,19 @@ struct WatchesSameKeyWorkload : TestWorkload {
std::string description() const override { return "WatchesSameKeyCorrectness"; }
Future<Void> setup(Database const& cx) override {
if ( clientId == 0 ) {
cases.push_back( case1(cx, LiteralStringRef("foo1"), this) );
cases.push_back( case2(cx, LiteralStringRef("foo2"), this) );
cases.push_back( case3(cx, LiteralStringRef("foo3"), this) );
cases.push_back( case4(cx, LiteralStringRef("foo4"), this) );
cases.push_back( case5(cx, LiteralStringRef("foo5"), this) );
}
cases.push_back( case1(cx, LiteralStringRef("foo1"), this) );
cases.push_back( case2(cx, LiteralStringRef("foo2"), this) );
cases.push_back( case3(cx, LiteralStringRef("foo3"), this) );
cases.push_back( case4(cx, LiteralStringRef("foo4"), this) );
cases.push_back( case5(cx, LiteralStringRef("foo5"), this) );
return Void();
}
Future<Void> start(Database const& cx) override {
if (clientId == 0) return waitForAll( cases );
return Void();
return waitForAll( cases );
}
Future<bool> check(Database const& cx) override {
if ( clientId != 0 ) return true;
bool ok = true;
for( int i = 0; i < cases.size(); i++ ) {
if ( cases[i].isError() ) ok = false;

flow/genericactors.actor.h

@ -1829,6 +1829,20 @@ Future<Void> timeReply(Future<T> replyToTime, PromiseStream<double> timeOutput){
return Void();
}
ACTOR template<class T>
Future<T> forward(Future<T> from, Promise<T> to) {
try {
T res = wait(from);
to.send(res);
return res;
} catch (Error& e) {
if (e.code() != error_code_actor_cancelled) {
to.sendError(e);
}
throw e;
}
}
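// forward() is the glue used in the storage server above: it relays the result (or
// error) of `from` into the promise `to`, so the version returned by
// watchWaitForValueChange reaches every future taken from versionPromise.
// actor_cancelled is deliberately not forwarded: callers cancel watch_impl after the
// promise has already been satisfied (case 3) or after the last waiter is gone, so
// delivering it would double-send on the promise or go nowhere.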
// Monad
ACTOR template <class Fun, class T>