diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp
index d111ef076e..d71bc5dc3b 100644
--- a/fdbserver/storageserver.actor.cpp
+++ b/fdbserver/storageserver.actor.cpp
@@ -62,6 +62,7 @@
 #include "fdbrpc/Smoother.h"
 #include "fdbrpc/Stats.h"
 #include "flow/TDMetric.actor.h"
+#include "flow/genericactors.actor.h"
 #include "flow/actorcompiler.h" // This must be the last #include.
 
@@ -77,6 +78,7 @@ inline bool canReplyWith(Error e) {
 	case error_code_future_version:
 	case error_code_wrong_shard_server:
 	case error_code_process_behind:
+	case error_code_watch_cancelled:
 	//case error_code_all_alternatives_failed:
 		return true;
 	default:
@@ -259,6 +261,21 @@ struct FetchInjectionInfo {
 	vector<VerUpdateRef> changes;
 };
 
+class ServerWatchMetadata: public ReferenceCounted<ServerWatchMetadata> {
+public:
+	Key key;
+	Optional<Value> value;
+	Version version;
+	Future<Version> watch_impl;
+	Promise<Version> versionPromise;
+	Optional<TagSet> tags;
+	Optional<UID> debugID;
+
+	ServerWatchMetadata(Key key, Optional<Value> value, Version version, Optional<TagSet> tags, Optional<UID> debugID)
+	  : key(key), value(value), version(version), tags(tags), debugID(debugID) {}
+};
+
 struct StorageServer {
 	typedef VersionedMap<KeyRef, ValueOrClearToRef> VersionedData;
@@ -286,6 +303,7 @@ private:
 	VersionedData versionedData;
 	std::map<Version, Standalone<VerUpdateRef>> mutationLog; // versions (durableVersion, version]
+	std::unordered_map<KeyRef, Reference<ServerWatchMetadata>> watchMap; // keep track of server watches
 
 public:
 public:
@@ -304,6 +322,12 @@
 	                  Histogram::Unit::bytes_per_second)) {}
 	} fetchKeysHistograms;
 
+	// watch map operations
+	Reference<ServerWatchMetadata> getWatchMetadata(KeyRef key) const;
+	KeyRef setWatchMetadata(Reference<ServerWatchMetadata> metadata);
+	void deleteWatchMetadata(KeyRef key);
+	void clearWatchMetadata();
+
 	class CurrentRunningFetchKeys {
 		std::unordered_map<UID, double> startTimeMap;
 		std::unordered_map<UID, KeyRangeRef> keyRangeMap;
@@ -782,13 +806,21 @@ public:
 		promise.sendError(err);
 	}
 
-	template <class Request, class HandleFunction>
-	Future<Void> readGuard(const Request& request, const HandleFunction& fun) {
+	template <class Request>
+	bool shouldRead(const Request& request) {
 		auto rate = currentRate();
 		if (rate < SERVER_KNOBS->STORAGE_DURABILITY_LAG_REJECT_THRESHOLD && deterministicRandom()->random01() > std::max(SERVER_KNOBS->STORAGE_DURABILITY_LAG_MIN_RATE, rate/SERVER_KNOBS->STORAGE_DURABILITY_LAG_REJECT_THRESHOLD)) {
-			//request.error = future_version();
 			sendErrorWithPenalty(request.reply, server_overloaded(), getPenalty());
 			++counters.readsRejected;
+			return false;
+		}
+		return true;
+	}
+
+	template <class Request, class HandleFunction>
+	Future<Void> readGuard(const Request& request, const HandleFunction& fun) {
+		bool read = shouldRead(request);
+		if (!read) {
 			return Void();
 		}
 		return fun(this, request);
@@ -815,6 +847,28 @@ void StorageServer::byteSampleApplyMutation( MutationRef const& m, Version ver )
 		ASSERT(false); // Mutation of unknown type modfying byte sample
 }
 
+// watchMap Operations
+Reference<ServerWatchMetadata> StorageServer::getWatchMetadata(KeyRef key) const {
+	const auto it = watchMap.find(key);
+	if (it == watchMap.end()) return Reference<ServerWatchMetadata>();
+	return it->second;
+}
+
+KeyRef StorageServer::setWatchMetadata(Reference<ServerWatchMetadata> metadata) {
+	KeyRef keyRef = metadata->key.contents();
+
+	watchMap[keyRef] = metadata;
+	return keyRef;
+}
+
+void StorageServer::deleteWatchMetadata(KeyRef key) {
+	watchMap.erase(key);
+}
+
+void StorageServer::clearWatchMetadata() {
+	watchMap.clear();
+}
+
 #ifndef __INTEL_COMPILER
 #pragma endregion
 #endif
@@ -1091,100 +1145,102 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
 
 // Pessimistic estimate the number of overhead bytes used by each
 // watch. Watch key references are stored in an AsyncMap<Key,bool>, and actors
 // must be kept alive until the watch is finished.
-static constexpr size_t WATCH_OVERHEAD_BYTES = 1000;
+extern size_t WATCH_OVERHEAD_WATCHQ, WATCH_OVERHEAD_WATCHIMPL;
 
-ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req, SpanID parent ) {
-	state Location spanLocation = "SS:WatchValueImpl"_loc;
+ACTOR Future<Version> watchWaitForValueChange( StorageServer* data, SpanID parent, KeyRef key ) {
+	state Location spanLocation = "SS:watchWaitForValueChange"_loc;
 	state Span span(spanLocation, { parent });
-	try {
-		++data->counters.watchQueries;
+	state Reference<ServerWatchMetadata> metadata = data->getWatchMetadata(key);
 
-		if( req.debugID.present() )
-			g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.Before"); //.detail("TaskID", g_network->getCurrentTask());
+	if( metadata->debugID.present() )
+		g_traceBatch.addEvent("WatchValueDebug", metadata->debugID.get().first(), "watchValueSendReply.Before"); //.detail("TaskID", g_network->getCurrentTask());
 
-		wait(success(waitForVersionNoTooOld(data, req.version)));
-		if( req.debugID.present() )
-			g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
+	wait(success(waitForVersionNoTooOld(data, metadata->version)));
+	if( metadata->debugID.present() )
+		g_traceBatch.addEvent("WatchValueDebug", metadata->debugID.get().first(), "watchValueSendReply.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
 
-		state Version minVersion = data->data().latestVersion;
-		state Future<Void> watchFuture = data->watches.onChange(req.key);
-		loop {
-			try {
-				state Version latest = data->version.get();
-				TEST(latest >= minVersion && latest < data->data().latestVersion); // Starting watch loop with latestVersion > data->version
-				GetValueRequest getReq( span.context, req.key, latest, req.tags, req.debugID );
-				state Future<Void> getValue = getValueQ( data, getReq ); //we are relying on the delay zero at the top of getValueQ, if removed we need one here
-				GetValueReply reply = wait( getReq.reply.getFuture() );
-				span = Span(spanLocation, parent);
-				//TraceEvent("WatcherCheckValue").detail("Key", req.key ).detail("Value", req.value ).detail("CurrentValue", v ).detail("Ver", latest);
+	state Version minVersion = data->data().latestVersion;
+	state Future<Void> watchFuture = data->watches.onChange(metadata->key);
+	loop {
+		try {
+			metadata = data->getWatchMetadata(key);
+			state Version latest = data->version.get();
+			TEST(latest >= minVersion && latest < data->data().latestVersion); // Starting watch loop with latestVersion > data->version
+			GetValueRequest getReq( span.context, metadata->key, latest, metadata->tags, metadata->debugID );
+			state Future<Void> getValue = getValueQ( data, getReq ); //we are relying on the delay zero at the top of getValueQ, if removed we need one here
+			GetValueReply reply = wait( getReq.reply.getFuture() );
+			span = Span(spanLocation, parent);
 
-				if(reply.error.present()) {
-					ASSERT(reply.error.get().code() != error_code_future_version);
-					throw reply.error.get();
-				}
-				if(BUGGIFY) {
-					throw transaction_too_old();
-				}
-
-				DEBUG_MUTATION("ShardWatchValue", latest, MutationRef(MutationRef::DebugKey, req.key, reply.value.present() ? StringRef( reply.value.get() ) : LiteralStringRef("<null>") ) );
+			if(reply.error.present()) {
+				ASSERT(reply.error.get().code() != error_code_future_version);
+				throw reply.error.get();
+			}
+			if(BUGGIFY) {
+				throw transaction_too_old();
+			}
+
+			DEBUG_MUTATION("ShardWatchValue", latest, MutationRef(MutationRef::DebugKey, metadata->key, reply.value.present() ? StringRef( reply.value.get() ) : LiteralStringRef("<null>") ) );
 
-				if( req.debugID.present() )
-					g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
+			if( metadata->debugID.present() )
+				g_traceBatch.addEvent("WatchValueDebug", metadata->debugID.get().first(), "watchValueSendReply.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
 
-				if( reply.value != req.value ) {
-					req.reply.send(WatchValueReply{ latest });
-					return Void();
-				}
-
-				if( data->watchBytes > SERVER_KNOBS->MAX_STORAGE_SERVER_WATCH_BYTES ) {
-					TEST(true); //Too many watches, reverting to polling
-					data->sendErrorWithPenalty(req.reply, watch_cancelled(), data->getPenalty());
-					return Void();
-				}
-
-				++data->numWatches;
-				data->watchBytes += (req.key.expectedSize() + req.value.expectedSize() + WATCH_OVERHEAD_BYTES);
-				try {
-					if(latest < minVersion) {
-						// If the version we read is less than minVersion, then we may fail to be notified of any changes that occur up to or including minVersion
-						// To prevent that, we'll check the key again once the version reaches our minVersion
-						watchFuture = watchFuture || data->version.whenAtLeast(minVersion);
-					}
-					if(BUGGIFY) {
-						// Simulate a trigger on the watch that results in the loop going around without the value changing
-						watchFuture = watchFuture || delay(deterministicRandom()->random01());
-					}
-					wait(watchFuture);
-					--data->numWatches;
-					data->watchBytes -= (req.key.expectedSize() + req.value.expectedSize() + WATCH_OVERHEAD_BYTES);
-				} catch( Error &e ) {
-					--data->numWatches;
-					data->watchBytes -= (req.key.expectedSize() + req.value.expectedSize() + WATCH_OVERHEAD_BYTES);
-					throw;
-				}
-			} catch( Error &e ) {
-				if( e.code() != error_code_transaction_too_old ) {
-					throw;
-				}
-
-				TEST(true); // Reading a watched key failed with transaction_too_old
+			if( reply.value != metadata->value && latest >= metadata->version ) {
+				return latest; // fire watch
 			}
 
-			watchFuture = data->watches.onChange(req.key);
-			wait(data->version.whenAtLeast(data->data().latestVersion));
+			if( data->watchBytes > SERVER_KNOBS->MAX_STORAGE_SERVER_WATCH_BYTES ) {
+				TEST(true); //Too many watches, reverting to polling
+				throw watch_cancelled();
+			}
+
+			state int64_t watchBytes = (metadata->key.expectedSize() + metadata->value.expectedSize() + key.expectedSize() +
+			                            sizeof(Reference<ServerWatchMetadata>) + sizeof(ServerWatchMetadata) + WATCH_OVERHEAD_WATCHIMPL);
+
+			data->watchBytes += watchBytes;
+			try {
+				if(latest < minVersion) {
+					// If the version we read is less than minVersion, then we may fail to be notified of any changes that occur up to or including minVersion
+					// To prevent that, we'll check the key again once the version reaches our minVersion
+					watchFuture = watchFuture || data->version.whenAtLeast(minVersion);
+				}
+				if(BUGGIFY) {
+					// Simulate a trigger on the watch that results in the loop going around without the value changing
+					watchFuture = watchFuture || delay(deterministicRandom()->random01());
+				}
+				wait(watchFuture);
+				data->watchBytes -= watchBytes;
+			} catch( Error &e ) {
+				data->watchBytes -= watchBytes;
+				throw;
+			}
+		} catch( Error &e ) {
+			if( e.code() != error_code_transaction_too_old ) {
+				throw e;
+			}
+
+			TEST(true); // Reading a watched key failed with transaction_too_old
 		}
-	} catch (Error& e) {
-		if(!canReplyWith(e))
-			throw;
-		data->sendErrorWithPenalty(req.reply, e, data->getPenalty());
+
+		watchFuture = data->watches.onChange(metadata->key);
+		wait(data->version.whenAtLeast(data->data().latestVersion));
 	}
-	return Void();
 }
 
-ACTOR Future<Void> watchValueQ( StorageServer* data, WatchValueRequest req ) {
-	state Span span("SS:watchValue"_loc, { req.spanContext });
-	state Future<Void> watch = watchValue_impl( data, req, span.context );
+void checkCancelWatchImpl( StorageServer* data, WatchValueRequest req ) {
+	Reference<ServerWatchMetadata> metadata = data->getWatchMetadata(req.key.contents());
+	if(metadata.isValid() && metadata->versionPromise.getFutureReferenceCount() == 1) {
+		// last watch timed out so cancel watch_impl and delete key from the map
+		data->deleteWatchMetadata(req.key.contents());
+		metadata->watch_impl.cancel();
+	}
+}
+
+ACTOR Future<Void> watchValueSendReply(StorageServer* data, WatchValueRequest req, Future<Version> resp, SpanID spanContext) {
+	state Span span("SS:watchValue"_loc, { spanContext });
 	state double startTime = now();
+	++data->counters.watchQueries;
+	++data->numWatches;
+	data->watchBytes += WATCH_OVERHEAD_WATCHQ;
 
 	loop {
 		double timeoutDelay = -1;
@@ -1193,19 +1249,47 @@ ACTOR Future<Void> watchValueQ( StorageServer* data, WatchValueRequest req ) {
 		} else if(!BUGGIFY) {
 			timeoutDelay = std::max(CLIENT_KNOBS->WATCH_TIMEOUT - (now() - startTime), 0.0);
 		}
-		choose {
-			when( wait( watch ) ) {
-				return Void();
+
+		try {
+			choose {
+				when( Version ver = wait( resp ) ) {
+					// fire watch
+					req.reply.send(WatchValueReply{ ver });
+					checkCancelWatchImpl(data, req);
+					--data->numWatches;
+					data->watchBytes -= WATCH_OVERHEAD_WATCHQ;
+					return Void();
+				}
+				when( wait( timeoutDelay < 0 ? Never() : delay(timeoutDelay) ) ) {
+					// watch timed out
+					data->sendErrorWithPenalty(req.reply, timed_out(), data->getPenalty());
+					checkCancelWatchImpl(data, req);
+					--data->numWatches;
+					data->watchBytes -= WATCH_OVERHEAD_WATCHQ;
+					return Void();
+				}
+				when( wait( data->noRecentUpdates.onChange()) ) {}
 			}
-			when( wait( timeoutDelay < 0 ? Never() : delay(timeoutDelay) ) ) {
-				data->sendErrorWithPenalty(req.reply, timed_out(), data->getPenalty());
-				return Void();
-			}
-			when( wait( data->noRecentUpdates.onChange()) ) {}
+		} catch (Error& e) {
+			data->watchBytes -= WATCH_OVERHEAD_WATCHQ;
+			checkCancelWatchImpl(data, req);
+			--data->numWatches;
+
+			if (!canReplyWith(e)) throw e;
+			data->sendErrorWithPenalty(req.reply, e, data->getPenalty());
+			return Void();
 		}
 	}
 }
 
+#ifdef NO_INTELLISENSE
+size_t WATCH_OVERHEAD_WATCHQ = sizeof(WatchValueSendReplyActorState<WatchValueSendReplyActor>) + sizeof(WatchValueSendReplyActor);
+size_t WATCH_OVERHEAD_WATCHIMPL = sizeof(WatchWaitForValueChangeActorState<WatchWaitForValueChangeActor>) + sizeof(WatchWaitForValueChangeActor);
+#else
+size_t WATCH_OVERHEAD_WATCHQ = 0; // only used in IDE so value is irrelevant
+size_t WATCH_OVERHEAD_WATCHIMPL = 0;
+#endif
+
 ACTOR Future<Void> getShardState_impl( StorageServer* data, GetShardStateRequest req ) {
 	ASSERT( req.mode != GetShardStateRequest::NO_WAIT );
@@ -3891,12 +3975,99 @@ ACTOR Future<Void> serveGetKeyRequests( StorageServer* self, FutureStream<GetKeyRequest> getKey )
 	}
 }
 
+ACTOR Future<Void> watchValueWaitForVersion( StorageServer* self, WatchValueRequest req, PromiseStream<WatchValueRequest> stream ) {
+	state Span span("SS:watchValueWaitForVersion"_loc, { req.spanContext });
+	try {
+		wait(success(waitForVersionNoTooOld(self, req.version)));
+		stream.send(req);
+	} catch (Error& e) {
+		if(!canReplyWith(e)) throw e;
+		self->sendErrorWithPenalty(req.reply, e, self->getPenalty());
+	}
+	return Void();
+}
+
+ACTOR Future<Void> serveWatchValueRequestsImpl( StorageServer* self, FutureStream<WatchValueRequest> stream ) {
+	loop {
+		state WatchValueRequest req = waitNext(stream);
+		state Reference<ServerWatchMetadata> metadata = self->getWatchMetadata(req.key.contents());
+		state Span span("SS:serveWatchValueRequestsImpl"_loc, { req.spanContext });
+
+		if (!metadata.isValid()) { // case 1: no watch set for the current key
+			metadata = makeReference<ServerWatchMetadata>(req.key, req.value, req.version, req.tags, req.debugID);
+			KeyRef key = self->setWatchMetadata(metadata);
+			metadata->watch_impl = forward(watchWaitForValueChange(self, span.context, key), metadata->versionPromise);
+			self->actors.add(watchValueSendReply(self, req, metadata->versionPromise.getFuture(), span.context));
+		}
+		else if (metadata->value == req.value) { // case 2: there is a watch in the map and it has the same value so just update version
+			if (req.version > metadata->version) {
+				metadata->version = req.version;
+				metadata->tags = req.tags;
+				metadata->debugID = req.debugID;
+			}
+			self->actors.add(watchValueSendReply(self, req, metadata->versionPromise.getFuture(), span.context));
+		}
+		else if (req.version > metadata->version) { // case 3: version in map has a lower version so trigger watch and create a new entry in map
+			self->deleteWatchMetadata(req.key.contents());
+			metadata->versionPromise.send(req.version);
+			metadata->watch_impl.cancel();
+
+			metadata = makeReference<ServerWatchMetadata>(req.key, req.value, req.version, req.tags, req.debugID);
+			KeyRef key = self->setWatchMetadata(metadata);
+			metadata->watch_impl = forward(watchWaitForValueChange(self, span.context, key), metadata->versionPromise);
+
+			self->actors.add(watchValueSendReply(self, req, metadata->versionPromise.getFuture(), span.context));
+		} else if (req.version < metadata->version) { // case 4: version in the map is higher so immediately trigger watch
+			TEST(true); // watch version in map is higher so trigger watch (case 4)
+			req.reply.send(WatchValueReply{ metadata->version });
+		} else { // case 5: watch value differs but their versions are the same (rare case) so check with the SS
+			TEST(true); // watch version in the map is the same but value is different (case 5)
+			loop {
+				try {
+					state Version latest = self->version.get();
+					GetValueRequest getReq( span.context, metadata->key, latest, metadata->tags, metadata->debugID );
+					state Future<Void> getValue = getValueQ( self, getReq );
+					GetValueReply reply = wait( getReq.reply.getFuture() );
+					metadata = self->getWatchMetadata(req.key.contents());
+
+					if (metadata.isValid() && reply.value != metadata->value) { // valSS != valMap
+						self->deleteWatchMetadata(req.key.contents());
+						metadata->versionPromise.send(req.version);
+						metadata->watch_impl.cancel();
+					}
+
+					if (reply.value == req.value) { // valSS == valreq
+						metadata = makeReference<ServerWatchMetadata>(req.key, req.value, req.version, req.tags, req.debugID);
+						KeyRef key = self->setWatchMetadata(metadata);
+						metadata->watch_impl = forward(watchWaitForValueChange(self, span.context, key), metadata->versionPromise);
+						self->actors.add(watchValueSendReply(self, req, metadata->versionPromise.getFuture(), span.context));
+					} else {
+						req.reply.send(WatchValueReply{ latest });
+					}
+					break;
+				} catch (Error& e) {
+					if( e.code() != error_code_transaction_too_old ) {
+						if(!canReplyWith(e)) throw e;
+						self->sendErrorWithPenalty(req.reply, e, self->getPenalty());
+						break;
+					}
+					TEST(true); // Reading a watched key failed with transaction_too_old case 5
+				}
+			}
+		}
+	}
+}
+
 ACTOR Future<Void> serveWatchValueRequests( StorageServer* self, FutureStream<WatchValueRequest> watchValue ) {
+	state PromiseStream<WatchValueRequest> stream;
+	self->actors.add(serveWatchValueRequestsImpl(self, stream.getFuture()));
+
 	loop {
 		WatchValueRequest req = waitNext(watchValue);
 		// TODO: fast load balancing?
-		// SOMEDAY: combine watches for the same key/value into a single watch
-		self->actors.add(self->readGuard(req, watchValueQ));
+		if(self->shouldRead(req)) {
+			self->actors.add(watchValueWaitForVersion(self, req, stream));
+		}
 	}
 }
diff --git a/fdbserver/workloads/WatchesSameKeyCorrectness.actor.cpp b/fdbserver/workloads/WatchesSameKeyCorrectness.actor.cpp
index 3e9f8f66bf..abd1a89e7e 100644
--- a/fdbserver/workloads/WatchesSameKeyCorrectness.actor.cpp
+++ b/fdbserver/workloads/WatchesSameKeyCorrectness.actor.cpp
@@ -39,23 +39,19 @@ struct WatchesSameKeyWorkload : TestWorkload {
 	std::string description() const override { return "WatchesSameKeyCorrectness"; }
 
 	Future<Void> setup(Database const& cx) override {
-		if ( clientId == 0 ) {
-			cases.push_back( case1(cx, LiteralStringRef("foo1"), this) );
-			cases.push_back( case2(cx, LiteralStringRef("foo2"), this) );
-			cases.push_back( case3(cx, LiteralStringRef("foo3"), this) );
-			cases.push_back( case4(cx, LiteralStringRef("foo4"), this) );
-			cases.push_back( case5(cx, LiteralStringRef("foo5"), this) );
-		}
+		cases.push_back( case1(cx, LiteralStringRef("foo1"), this) );
+		cases.push_back( case2(cx, LiteralStringRef("foo2"), this) );
+		cases.push_back( case3(cx, LiteralStringRef("foo3"), this) );
+		cases.push_back( case4(cx, LiteralStringRef("foo4"), this) );
+		cases.push_back( case5(cx, LiteralStringRef("foo5"), this) );
 		return Void();
 	}
 
 	Future<Void> start(Database const& cx) override {
-		if (clientId == 0) return waitForAll( cases );
-		return Void();
+		return waitForAll( cases );
 	}
 
 	Future<bool> check(Database const& cx) override {
-		if ( clientId != 0 ) return true;
 		bool ok = true;
 		for( int i = 0; i < cases.size(); i++ ) {
 			if ( cases[i].isError() ) ok = false;
diff --git a/flow/genericactors.actor.h b/flow/genericactors.actor.h
index a3e16bf13b..ef3417618f 100644
--- a/flow/genericactors.actor.h
+++ b/flow/genericactors.actor.h
@@ -1829,6 +1829,20 @@ Future<Void> timeReply(Future<T> replyToTime, PromiseStream<double> timeOutput) {
 	return Void();
 }
 
+ACTOR template <class T>
+Future<T> forward(Future<T> from, Promise<T> to) {
+	try {
+		T res = wait(from);
+		to.send(res);
+		return res;
+	} catch (Error& e) {
+		if (e.code() != error_code_actor_cancelled) {
+			to.sendError(e);
+		}
+		throw e;
+	}
+}
+
 // Monad
 
 ACTOR template
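For intuition, the patch keeps one watchWaitForValueChange actor per watched key and fans its result out to every pending WatchValueRequest through the shared versionPromise, with the new forward() helper bridging the two. Below is a minimal stand-alone analogue of that one-producer/many-waiters shape, written in plain C++ with std::promise/std::shared_future rather than flow's Promise/Future; the names and values are illustrative only and are not code from this patch.

#include <cstdint>
#include <future>
#include <iostream>
#include <thread>
#include <vector>

int main() {
	// Plays the role of ServerWatchMetadata::versionPromise: set exactly once,
	// when the single per-key watch loop observes a changed value.
	std::promise<int64_t> versionPromise;
	std::shared_future<int64_t> triggered = versionPromise.get_future().share();

	// One waiter per incoming WatchValueRequest, like watchValueSendReply:
	// each waits on the same shared future instead of running its own watch loop.
	std::vector<std::thread> repliers;
	for (int client = 0; client < 3; ++client) {
		repliers.emplace_back([triggered, client] {
			std::cout << "client " << client << " notified at version " << triggered.get() << "\n";
		});
	}

	versionPromise.set_value(42); // the single watch fires once for every waiter
	for (auto& t : repliers) t.join();
	return 0;
}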