From 2592c3f0ae5fad744e46bb7b02f9da83339dfa83 Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Thu, 3 Feb 2022 18:56:01 -0600 Subject: [PATCH] Implemented less efficient version of reply stream closing --- fdbclient/NativeAPI.actor.cpp | 33 ++++++++++++++++++++++----------- fdbrpc/fdbrpc.h | 28 ++++++++++++++++++++++++++-- fdbrpc/genericactors.actor.h | 12 +++++++++++- 3 files changed, 59 insertions(+), 14 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 3506137759..ae13509d9c 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -7158,9 +7158,9 @@ Version ChangeFeedData::getVersion() { // range that surrounds wait_version enough to figure out what's going on // DEBUG_CF_ID is optional #define DEBUG_CF_ID ""_sr -#define DEBUG_CF_START_VERSION invalidVersion -#define DEBUG_CF_END_VERSION invalidVersion -#define DEBUG_CF_WAIT_VERSION invalidVersion +#define DEBUG_CF_START_VERSION 322240646 +#define DEBUG_CF_END_VERSION 393714633 +#define DEBUG_CF_WAIT_VERSION 383714633 #define DEBUG_CF_VERSION(cfId, v) \ DEBUG_CF_START_VERSION <= v&& v <= DEBUG_CF_END_VERSION && (""_sr == DEBUG_CF_ID || cfId.printable() == DEBUG_CF_ID) @@ -7459,14 +7459,15 @@ ACTOR Future partialChangeFeedStream(StorageServerInterface interf, } } catch (Error& e) { // TODO REMOVE eventually, useful for debugging for now - if (DEBUG_CF_VERSION(feedData->id, nextVersion)) { - fmt::print(" single {0} {1} [{2} - {3}): CFError {4}\n", - idx, - interf.id().toString().substr(0, 4), - range.begin.printable(), - range.end.printable(), - e.name()); - } + // if (DEBUG_CF_VERSION(feedData->id, nextVersion)) { + fmt::print(" single {0} {1} {2} [{3} - {4}): CFError {5}\n", + idx, + interf.id().toString().substr(0, 4), + debugID.toString().substr(0, 8).c_str(), + range.begin.printable(), + range.end.printable(), + e.name()); + // } if (e.code() == error_code_actor_cancelled) { throw; } @@ -7627,6 +7628,16 @@ ACTOR Future mergeChangeFeedStream(Reference db, debugIDs.push_back(debugID); req.debugID = debugID; results->streams.push_back(it.first.changeFeedStream.getReplyStream(req)); + if (debugID.toString().substr(0, 8) == "637bfa4e") { + printf( + "Good example: %s: %p\n", debugID.toString().substr(0, 8).c_str(), results->streams.back().debugAddr()); + printf("\n"); + } + if (debugID.toString().substr(0, 8) == "1ad27675") { + printf( + "Bad example: %s: %p\n", debugID.toString().substr(0, 8).c_str(), results->streams.back().debugAddr()); + printf("\n"); + } } for (auto& it : results->storageData) { diff --git a/fdbrpc/fdbrpc.h b/fdbrpc/fdbrpc.h index 5c63ec5251..666454ed25 100644 --- a/fdbrpc/fdbrpc.h +++ b/fdbrpc/fdbrpc.h @@ -324,10 +324,12 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue, AcknowledgementReceiver acknowledgements; Endpoint requestStreamEndpoint; bool sentError = false; + Promise onConnect; - NetNotifiedQueueWithAcknowledgements(int futures, int promises) : NotifiedQueue(futures, promises) {} + NetNotifiedQueueWithAcknowledgements(int futures, int promises) + : NotifiedQueue(futures, promises), onConnect(nullptr) {} NetNotifiedQueueWithAcknowledgements(int futures, int promises, const Endpoint& remoteEndpoint) - : NotifiedQueue(futures, promises), FlowReceiver(remoteEndpoint, true) { + : NotifiedQueue(futures, promises), FlowReceiver(remoteEndpoint, true), onConnect(nullptr) { // A ReplyPromiseStream will be terminated on the server side if the network connection with the client breaks acknowledgements.failures = tagError( makeDependent(IFailureMonitor::failureMonitor()).onDisconnect(remoteEndpoint.getPrimaryAddress()), @@ -348,11 +350,17 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue, // GetKeyValuesStream requests on the same endpoint will fail IFailureMonitor::failureMonitor().endpointNotFound(requestStreamEndpoint); } + if (onConnect.isValid() && onConnect.canBeSet()) { + onConnect.send(Void()); + } this->sendError(message.getError()); } else { if (message.get().asUnderlyingType().acknowledgeToken.present()) { acknowledgements = AcknowledgementReceiver( FlowTransport::transport().loadedEndpoint(message.get().asUnderlyingType().acknowledgeToken.get())); + if (onConnect.isValid() && onConnect.canBeSet()) { + onConnect.send(Void()); + } } if (acknowledgements.sequence != message.get().asUnderlyingType().sequence) { TraceEvent(SevError, "StreamSequenceMismatch") @@ -439,6 +447,9 @@ public: } } + // TODO REMOVE + const void* debugAddr() const { return queue; } + template void sendError(const E& exc) const { if (queue->isRemoteEndpoint()) { @@ -485,6 +496,18 @@ public: void setRequestStreamEndpoint(const Endpoint& endpoint) { queue->requestStreamEndpoint = endpoint; } + bool connected() { return queue->acknowledgements.getRawEndpoint().isValid() || queue->error.isValid(); } + + Future onConnected() { + if (connected()) { + return Future(Void()); + } + if (!queue->onConnect.isValid()) { + queue->onConnect = Promise(); + } + return queue->onConnect.getFuture(); + } + ~ReplyPromiseStream() { if (queue) queue->delPromiseRef(); @@ -744,6 +767,7 @@ public: FlowTransport::transport().sendUnreliable(SerializeSource(value), getEndpoint(), true); // FIXME: defer sending the message until we know the connection is established endStreamOnDisconnect(disc, p, getEndpoint(), peer); + holdUntilConnected(disc, p); return p; } else { send(value); diff --git a/fdbrpc/genericactors.actor.h b/fdbrpc/genericactors.actor.h index 46a79d29cf..11c9c38395 100644 --- a/fdbrpc/genericactors.actor.h +++ b/fdbrpc/genericactors.actor.h @@ -197,6 +197,11 @@ struct PeerHolder { } }; +ACTOR template +void holdUntilConnected(Future signal, ReplyPromiseStream stream) { + wait(stream.onConnected() || signal); +} + // Implements getReplyStream, this a void actor with the same lifetime as the input ReplyPromiseStream. // Because this actor holds a reference to the stream, normally it would be impossible to know when there are no other // references. To get around this, there is a SAV inside the stream that has one less promise reference than it should @@ -212,7 +217,12 @@ void endStreamOnDisconnect(Future signal, stream.setRequestStreamEndpoint(endpoint); choose { when(wait(signal)) { stream.sendError(connection_failed()); } - when(wait(stream.getErrorFutureAndDelPromiseRef())) {} + when(wait(stream.getErrorFutureAndDelPromiseRef())) { + // Wait for a response from the server + /*if (!stream.connected()) { + // TODO WANT TO DO holdAndConnected ACTOR HERE INSTEAD! + }*/ + } } }