Implemented less efficient version of reply stream closing

This commit is contained in:
Josh Slocum 2022-02-03 18:56:01 -06:00
parent f16142e639
commit 2592c3f0ae
3 changed files with 59 additions and 14 deletions

View File

@ -7158,9 +7158,9 @@ Version ChangeFeedData::getVersion() {
// range that surrounds wait_version enough to figure out what's going on
// DEBUG_CF_ID is optional
#define DEBUG_CF_ID ""_sr
#define DEBUG_CF_START_VERSION invalidVersion
#define DEBUG_CF_END_VERSION invalidVersion
#define DEBUG_CF_WAIT_VERSION invalidVersion
#define DEBUG_CF_START_VERSION 322240646
#define DEBUG_CF_END_VERSION 393714633
#define DEBUG_CF_WAIT_VERSION 383714633
#define DEBUG_CF_VERSION(cfId, v) \
DEBUG_CF_START_VERSION <= v&& v <= DEBUG_CF_END_VERSION && (""_sr == DEBUG_CF_ID || cfId.printable() == DEBUG_CF_ID)
@ -7459,14 +7459,15 @@ ACTOR Future<Void> partialChangeFeedStream(StorageServerInterface interf,
}
} catch (Error& e) {
// TODO REMOVE eventually, useful for debugging for now
if (DEBUG_CF_VERSION(feedData->id, nextVersion)) {
fmt::print(" single {0} {1} [{2} - {3}): CFError {4}\n",
idx,
interf.id().toString().substr(0, 4),
range.begin.printable(),
range.end.printable(),
e.name());
}
// if (DEBUG_CF_VERSION(feedData->id, nextVersion)) {
fmt::print(" single {0} {1} {2} [{3} - {4}): CFError {5}\n",
idx,
interf.id().toString().substr(0, 4),
debugID.toString().substr(0, 8).c_str(),
range.begin.printable(),
range.end.printable(),
e.name());
// }
if (e.code() == error_code_actor_cancelled) {
throw;
}
@ -7627,6 +7628,16 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
debugIDs.push_back(debugID);
req.debugID = debugID;
results->streams.push_back(it.first.changeFeedStream.getReplyStream(req));
if (debugID.toString().substr(0, 8) == "637bfa4e") {
printf(
"Good example: %s: %p\n", debugID.toString().substr(0, 8).c_str(), results->streams.back().debugAddr());
printf("\n");
}
if (debugID.toString().substr(0, 8) == "1ad27675") {
printf(
"Bad example: %s: %p\n", debugID.toString().substr(0, 8).c_str(), results->streams.back().debugAddr());
printf("\n");
}
}
for (auto& it : results->storageData) {

View File

@ -324,10 +324,12 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
AcknowledgementReceiver acknowledgements;
Endpoint requestStreamEndpoint;
bool sentError = false;
Promise<Void> onConnect;
NetNotifiedQueueWithAcknowledgements(int futures, int promises) : NotifiedQueue<T>(futures, promises) {}
NetNotifiedQueueWithAcknowledgements(int futures, int promises)
: NotifiedQueue<T>(futures, promises), onConnect(nullptr) {}
NetNotifiedQueueWithAcknowledgements(int futures, int promises, const Endpoint& remoteEndpoint)
: NotifiedQueue<T>(futures, promises), FlowReceiver(remoteEndpoint, true) {
: NotifiedQueue<T>(futures, promises), FlowReceiver(remoteEndpoint, true), onConnect(nullptr) {
// A ReplyPromiseStream will be terminated on the server side if the network connection with the client breaks
acknowledgements.failures = tagError<Void>(
makeDependent<T>(IFailureMonitor::failureMonitor()).onDisconnect(remoteEndpoint.getPrimaryAddress()),
@ -348,11 +350,17 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
// GetKeyValuesStream requests on the same endpoint will fail
IFailureMonitor::failureMonitor().endpointNotFound(requestStreamEndpoint);
}
if (onConnect.isValid() && onConnect.canBeSet()) {
onConnect.send(Void());
}
this->sendError(message.getError());
} else {
if (message.get().asUnderlyingType().acknowledgeToken.present()) {
acknowledgements = AcknowledgementReceiver(
FlowTransport::transport().loadedEndpoint(message.get().asUnderlyingType().acknowledgeToken.get()));
if (onConnect.isValid() && onConnect.canBeSet()) {
onConnect.send(Void());
}
}
if (acknowledgements.sequence != message.get().asUnderlyingType().sequence) {
TraceEvent(SevError, "StreamSequenceMismatch")
@ -439,6 +447,9 @@ public:
}
}
// TODO REMOVE
const void* debugAddr() const { return queue; }
template <class E>
void sendError(const E& exc) const {
if (queue->isRemoteEndpoint()) {
@ -485,6 +496,18 @@ public:
void setRequestStreamEndpoint(const Endpoint& endpoint) { queue->requestStreamEndpoint = endpoint; }
bool connected() { return queue->acknowledgements.getRawEndpoint().isValid() || queue->error.isValid(); }
Future<Void> onConnected() {
if (connected()) {
return Future<Void>(Void());
}
if (!queue->onConnect.isValid()) {
queue->onConnect = Promise<Void>();
}
return queue->onConnect.getFuture();
}
~ReplyPromiseStream() {
if (queue)
queue->delPromiseRef();
@ -744,6 +767,7 @@ public:
FlowTransport::transport().sendUnreliable(SerializeSource<T>(value), getEndpoint(), true);
// FIXME: defer sending the message until we know the connection is established
endStreamOnDisconnect(disc, p, getEndpoint(), peer);
holdUntilConnected(disc, p);
return p;
} else {
send(value);

View File

@ -197,6 +197,11 @@ struct PeerHolder {
}
};
ACTOR template <class X>
void holdUntilConnected(Future<Void> signal, ReplyPromiseStream<X> stream) {
wait(stream.onConnected() || signal);
}
// Implements getReplyStream, this a void actor with the same lifetime as the input ReplyPromiseStream.
// Because this actor holds a reference to the stream, normally it would be impossible to know when there are no other
// references. To get around this, there is a SAV inside the stream that has one less promise reference than it should
@ -212,7 +217,12 @@ void endStreamOnDisconnect(Future<Void> signal,
stream.setRequestStreamEndpoint(endpoint);
choose {
when(wait(signal)) { stream.sendError(connection_failed()); }
when(wait(stream.getErrorFutureAndDelPromiseRef())) {}
when(wait(stream.getErrorFutureAndDelPromiseRef())) {
// Wait for a response from the server
/*if (!stream.connected()) {
// TODO WANT TO DO holdAndConnected ACTOR HERE INSTEAD!
}*/
}
}
}