diff --git a/fdbrpc/fdbrpc.h b/fdbrpc/fdbrpc.h index 1f40bb47a7..5a56fc120c 100644 --- a/fdbrpc/fdbrpc.h +++ b/fdbrpc/fdbrpc.h @@ -277,9 +277,9 @@ struct AcknowledgementReceiver final : FlowReceiver, FastAllocated::operator new; using FastAllocated::operator delete; - uint64_t bytesSent; - uint64_t bytesAcknowledged; - uint64_t bytesLimit; + int64_t bytesSent; + int64_t bytesAcknowledged; + int64_t bytesLimit; Promise ready; Future failures; @@ -300,7 +300,7 @@ struct AcknowledgementReceiver final : FlowReceiver, FastAllocated hold = ready; hold.sendError(message.getError()); } else { - ASSERT(message.get().bytes > bytesAcknowledged); + ASSERT(message.get().bytes > bytesAcknowledged || (message.get().bytes < 0 && bytesAcknowledged > 0)); bytesAcknowledged = message.get().bytes; if (ready.isValid() && bytesSent - bytesAcknowledged < bytesLimit) { Promise hold = ready; @@ -336,6 +336,8 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue, void destroy() override { delete this; } void receive(ArenaObjectReader& reader) override { this->addPromiseRef(); + TraceEvent(SevDebug, "NetNotifiedQueueWithAcknowledgementsReceive") + .detail("PromiseRef", this->getPromiseReferenceCount()); ErrorOr> message; reader.deserialize(message); @@ -358,25 +360,19 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue, // send an ack immediately if (acknowledgements.getRawEndpoint().isValid()) { acknowledgements.bytesAcknowledged += message.get().asUnderlyingType().expectedSize(); - // int64_t overflow: we need to reset this stream - if (acknowledgements.bytesAcknowledged > std::numeric_limits::max()) { - FlowTransport::transport().sendUnreliable( - SerializeSource>(operation_obsolete()), - acknowledgements.getEndpoint(TaskPriority::ReadSocket), - false); - } else { - FlowTransport::transport().sendUnreliable( - SerializeSource>( - AcknowledgementReply(acknowledgements.bytesAcknowledged)), - acknowledgements.getEndpoint(TaskPriority::ReadSocket), - false); - 
} + FlowTransport::transport().sendUnreliable( + SerializeSource>( + AcknowledgementReply(acknowledgements.bytesAcknowledged)), + acknowledgements.getEndpoint(TaskPriority::NoDeliverDelay), + false); } } this->send(std::move(message.get().asUnderlyingType())); } this->delPromiseRef(); + TraceEvent(SevDebug, "NetNotifiedQueueWithAcknowledgementsReceiveEnd") + .detail("PromiseRef", this->getPromiseReferenceCount()); } T pop() override { @@ -384,17 +380,10 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue, // A reply that has been queued up is being consumed, so send an ack to the server if (acknowledgements.getRawEndpoint().isValid()) { acknowledgements.bytesAcknowledged += res.expectedSize(); - if (acknowledgements.bytesAcknowledged > std::numeric_limits::max()) { - FlowTransport::transport().sendUnreliable( - SerializeSource>(operation_obsolete()), - acknowledgements.getEndpoint(TaskPriority::ReadSocket), - false); - } else { - FlowTransport::transport().sendUnreliable(SerializeSource>( - AcknowledgementReply(acknowledgements.bytesAcknowledged)), - acknowledgements.getEndpoint(TaskPriority::ReadSocket), - false); - } + FlowTransport::transport().sendUnreliable(SerializeSource>( + AcknowledgementReply(acknowledgements.bytesAcknowledged)), + acknowledgements.getEndpoint(TaskPriority::NoDeliverDelay), + false); } return res; } @@ -408,7 +397,8 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue, false); } if (isRemoteEndpoint() && !sentError && !acknowledgements.failures.isReady()) { - // The ReplyPromiseStream was cancelled before sending an error, so the storage server must have died + // Notify the client ReplyPromiseStream was cancelled before sending an error, so the storage server must + // have died FlowTransport::transport().sendUnreliable(SerializeSource>>(broken_promise()), getEndpoint(TaskPriority::NoDeliverDelay), false); @@ -431,6 +421,7 @@ public: void send(U&& value) const { if (queue->isRemoteEndpoint()) { if 
(!queue->acknowledgements.getRawEndpoint().isValid()) { + // register acknowledge receiver on sender and tell the receiver where to send acknowledge messages value.acknowledgeToken = queue->acknowledgements.getEndpoint(TaskPriority::NoDeliverDelay).token; } queue->acknowledgements.bytesSent += value.expectedSize(); @@ -710,16 +701,17 @@ public: template ReplyPromiseStream getReplyStream(const X& value) const { - Future disc = makeDependent(IFailureMonitor::failureMonitor()).onDisconnectOrFailure(getEndpoint()); - auto& p = getReplyPromiseStream(value); - Reference peer; + auto p = getReplyPromiseStream(value); if (queue->isRemoteEndpoint()) { - peer = FlowTransport::transport().sendUnreliable(SerializeSource(value), getEndpoint(), true); + Future disc = + makeDependent(IFailureMonitor::failureMonitor()).onDisconnectOrFailure(getEndpoint()); + Reference peer = + FlowTransport::transport().sendUnreliable(SerializeSource(value), getEndpoint(), true); + // FIXME: defer sending the message until we know the connection is established + endStreamOnDisconnect(disc, p, getEndpoint(), peer); } else { send(value); } - // FIXME: defer sending the message until we know the connection is established - endStreamOnDisconnect(disc, p, getEndpoint(), peer); return p; } diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp index eb3ea81326..f91a138b9e 100644 --- a/fdbserver/LogRouter.actor.cpp +++ b/fdbserver/LogRouter.actor.cpp @@ -559,7 +559,7 @@ ACTOR Future peekLogRouter(LogRouterData* self, // This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover ACTOR Future logRouterPeekStream(LogRouterData* self, TLogPeekStreamRequest req) { self->activePeekStreams++; - TraceEvent(SevDebug, "TLogPeekStream", self->dbgid).detail("Token", req.reply.getEndpoint().token); + TraceEvent(SevDebug, "LogRouterPeekStream", self->dbgid).detail("Token", req.reply.getEndpoint().token); state Version begin = req.begin; state bool onlySpilled 
= false; @@ -576,16 +576,13 @@ ACTOR Future logRouterPeekStream(LogRouterData* self, TLogPeekStreamReques wait(delay(0, g_network->getCurrentTask())); } catch (Error& e) { self->activePeekStreams--; - TraceEvent(SevDebug, "TLogPeekStreamEnd", self->dbgid).error(e, true); + TraceEvent(SevDebug, "LogRouterPeekStreamEnd", self->dbgid).error(e, true); if (e.code() == error_code_no_action_needed) { - return Void(); - } else if (e.code() == error_code_end_of_stream) { + req.reply.sendError(end_of_stream()); return Void(); + } else if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) { req.reply.sendError(e); return Void(); - } else if (e.code() == error_code_operation_obsolete) { - // reply stream is cancelled on the client - return Void(); } else { throw; } diff --git a/fdbserver/LogSystemPeekCursor.actor.cpp b/fdbserver/LogSystemPeekCursor.actor.cpp index b4e76d3eb8..4f3d7a6d0b 100644 --- a/fdbserver/LogSystemPeekCursor.actor.cpp +++ b/fdbserver/LogSystemPeekCursor.actor.cpp @@ -38,7 +38,8 @@ ACTOR Future tryEstablishPeekStream(ILogSystem::ServerPeekCursor* self) { self->peekReplyStream = self->interf->get().interf().peekStreamMessages.getReplyStream(TLogPeekStreamRequest( self->messageVersion.version, self->tag, self->returnIfBlocked, std::numeric_limits::max())); TraceEvent(SevDebug, "SPC_StreamCreated", self->randomID) - .detail("PeerAddress", self->interf->get().interf().peekStreamMessages.getEndpoint().getPrimaryAddress()); + .detail("PeerAddress", self->interf->get().interf().peekStreamMessages.getEndpoint().getPrimaryAddress()) + .detail("PeerToken", self->interf->get().interf().peekStreamMessages.getEndpoint().token); return Void(); } @@ -350,10 +351,10 @@ ACTOR Future serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T fPeekReply) : Never())) { updateCursorWithReply(self, res); - TraceEvent("SPC_GetMoreB", self->randomID) - .detail("Has", self->hasMessage()) - .detail("End", res.end) - .detail("Popped", res.popped.present() ?
res.popped.get() : 0); + // TraceEvent("SPC_GetMoreB", self->randomID) + // .detail("Has", self->hasMessage()) + // .detail("End", res.end) + // .detail("Popped", res.popped.present() ? res.popped.get() : 0); // NOTE: delay is needed here since TLog need to be scheduled to response if there are TLog and SS // on the same machine @@ -363,7 +364,7 @@ ACTOR Future serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T } } catch (Error& e) { TraceEvent(SevDebug, "SPC_GetMoreB_Error", self->randomID).detail("Error", e.what()); - if (e.code() == error_code_connection_failed) { + if (e.code() == error_code_connection_failed || e.code() == error_code_operation_obsolete) { self->peekReplyStream.reset(); } else if (e.code() == error_code_end_of_stream) { self->end.reset(self->messageVersion.version); @@ -408,20 +409,20 @@ ACTOR Future serverPeekGetMore(ILogSystem::ServerPeekCursor* self, TaskPri } Future ILogSystem::ServerPeekCursor::getMore(TaskPriority taskID) { - TraceEvent("SPC_GetMore", randomID) - .detail("HasMessage", hasMessage()) - .detail("More", !more.isValid() || more.isReady()) - .detail("MessageVersion", messageVersion.toString()) - .detail("End", end.toString()); + // TraceEvent("SPC_GetMore", randomID) + // .detail("HasMessage", hasMessage()) + // .detail("More", !more.isValid() || more.isReady()) + // .detail("MessageVersion", messageVersion.toString()) + // .detail("End", end.toString()); if (hasMessage() && !parallelGetMore) return Void(); if (!more.isValid() || more.isReady()) { - // more = serverPeekStreamGetMore(this, taskID); - if (parallelGetMore || onlySpilled || futureResults.size()) { - more = serverPeekParallelGetMore(this, taskID); + more = serverPeekStreamGetMore(this, taskID); + /*if (parallelGetMore || onlySpilled || futureResults.size()) { + more = serverPeekParallelGetMore(this, taskID); } else { - more = serverPeekGetMore(this, taskID); - } + more = serverPeekGetMore(this, taskID); + }*/ } return more; } diff --git 
a/fdbserver/OldTLogServer_4_6.actor.cpp b/fdbserver/OldTLogServer_4_6.actor.cpp index f84a4a47c8..1ea777e550 100644 --- a/fdbserver/OldTLogServer_4_6.actor.cpp +++ b/fdbserver/OldTLogServer_4_6.actor.cpp @@ -1143,12 +1143,9 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref self->activePeekStreams--; TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true); - if (e.code() == error_code_end_of_stream) { + if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) { req.reply.sendError(e); return Void(); - } else if (e.code() == error_code_operation_obsolete) { - // reply stream is cancelled on the client - return Void(); } else { throw; } diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp index b76a865bac..abc7c37517 100644 --- a/fdbserver/OldTLogServer_6_0.actor.cpp +++ b/fdbserver/OldTLogServer_6_0.actor.cpp @@ -1466,12 +1466,9 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref self->activePeekStreams--; TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true); - if (e.code() == error_code_end_of_stream) { + if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) { req.reply.sendError(e); return Void(); - } else if (e.code() == error_code_operation_obsolete) { - // reply stream is cancelled on the client - return Void(); } else { throw; } diff --git a/fdbserver/OldTLogServer_6_2.actor.cpp b/fdbserver/OldTLogServer_6_2.actor.cpp index 7f6bea5f57..c536aab0d4 100644 --- a/fdbserver/OldTLogServer_6_2.actor.cpp +++ b/fdbserver/OldTLogServer_6_2.actor.cpp @@ -1897,12 +1897,9 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref self->activePeekStreams--; TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true); - if (e.code() == error_code_end_of_stream) { + if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) { 
req.reply.sendError(e); return Void(); - } else if (e.code() == error_code_operation_obsolete) { - // reply stream is cancelled on the client - return Void(); } else { throw; } diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index 5f656f13f1..bcab98b9fc 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -253,7 +253,7 @@ public: // Refer to FDBTypes.h::TLogVersion. Defaults to the maximum supported version.
int maxTLogVersion = TLogVersion::MAX_SUPPORTED; // Set true to simplify simulation configs for easier debugging - bool simpleConfig = false; + bool simpleConfig = false; Optional generateFearless, buggify; Optional datacenters, desiredTLogCount, commitProxyCount, grvProxyCount, resolverCount, storageEngineType, stderrSeverity, machineCount, processesPerMachine, coordinators; diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 814c6ba317..3e50dc9132 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1941,12 +1941,9 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref self->activePeekStreams--; TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true); - if (e.code() == error_code_end_of_stream) { + if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) { req.reply.sendError(e); return Void(); - } else if (e.code() == error_code_operation_obsolete) { - // reply stream is cancelled on the client - return Void(); } else { throw; }