memory error (Simple config)

This commit is contained in:
Xiaoxi Wang 2021-07-22 15:45:59 -07:00
parent 68b08a3224
commit cd32478b52
9 changed files with 55 additions and 75 deletions

View File

@ -277,9 +277,9 @@ struct AcknowledgementReceiver final : FlowReceiver, FastAllocated<Acknowledgeme
using FastAllocated<AcknowledgementReceiver>::operator new;
using FastAllocated<AcknowledgementReceiver>::operator delete;
uint64_t bytesSent;
uint64_t bytesAcknowledged;
uint64_t bytesLimit;
int64_t bytesSent;
int64_t bytesAcknowledged;
int64_t bytesLimit;
Promise<Void> ready;
Future<Void> failures;
@ -300,7 +300,7 @@ struct AcknowledgementReceiver final : FlowReceiver, FastAllocated<Acknowledgeme
Promise<Void> hold = ready;
hold.sendError(message.getError());
} else {
ASSERT(message.get().bytes > bytesAcknowledged);
ASSERT(message.get().bytes > bytesAcknowledged || (message.get().bytes < 0 && bytesAcknowledged > 0));
bytesAcknowledged = message.get().bytes;
if (ready.isValid() && bytesSent - bytesAcknowledged < bytesLimit) {
Promise<Void> hold = ready;
@ -336,6 +336,8 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
void destroy() override { delete this; }
void receive(ArenaObjectReader& reader) override {
this->addPromiseRef();
TraceEvent(SevDebug, "NetNotifiedQueueWithAcknowledgementsReceive")
.detail("PromiseRef", this->getPromiseReferenceCount());
ErrorOr<EnsureTable<T>> message;
reader.deserialize(message);
@ -358,25 +360,19 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
// send an ack immediately
if (acknowledgements.getRawEndpoint().isValid()) {
acknowledgements.bytesAcknowledged += message.get().asUnderlyingType().expectedSize();
// int64_t overflow: we need to reset this stream
if (acknowledgements.bytesAcknowledged > std::numeric_limits<int64_t>::max()) {
FlowTransport::transport().sendUnreliable(
SerializeSource<ErrorOr<AcknowledgementReply>>(operation_obsolete()),
acknowledgements.getEndpoint(TaskPriority::ReadSocket),
false);
} else {
FlowTransport::transport().sendUnreliable(
SerializeSource<ErrorOr<AcknowledgementReply>>(
AcknowledgementReply(acknowledgements.bytesAcknowledged)),
acknowledgements.getEndpoint(TaskPriority::ReadSocket),
false);
}
FlowTransport::transport().sendUnreliable(
SerializeSource<ErrorOr<AcknowledgementReply>>(
AcknowledgementReply(acknowledgements.bytesAcknowledged)),
acknowledgements.getEndpoint(TaskPriority::NoDeliverDelay),
false);
}
}
this->send(std::move(message.get().asUnderlyingType()));
}
this->delPromiseRef();
TraceEvent(SevDebug, "NetNotifiedQueueWithAcknowledgementsReceiveEnd")
.detail("PromiseRef", this->getPromiseReferenceCount());
}
T pop() override {
@ -384,17 +380,10 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
// A reply that has been queued up is being consumed, so send an ack to the server
if (acknowledgements.getRawEndpoint().isValid()) {
acknowledgements.bytesAcknowledged += res.expectedSize();
if (acknowledgements.bytesAcknowledged > std::numeric_limits<int64_t>::max()) {
FlowTransport::transport().sendUnreliable(
SerializeSource<ErrorOr<AcknowledgementReply>>(operation_obsolete()),
acknowledgements.getEndpoint(TaskPriority::ReadSocket),
false);
} else {
FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<AcknowledgementReply>>(
AcknowledgementReply(acknowledgements.bytesAcknowledged)),
acknowledgements.getEndpoint(TaskPriority::ReadSocket),
false);
}
FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<AcknowledgementReply>>(
AcknowledgementReply(acknowledgements.bytesAcknowledged)),
acknowledgements.getEndpoint(TaskPriority::NoDeliverDelay),
false);
}
return res;
}
@ -408,7 +397,8 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
false);
}
if (isRemoteEndpoint() && !sentError && !acknowledgements.failures.isReady()) {
// The ReplyPromiseStream was cancelled before sending an error, so the storage server must have died
// Notify the client ReplyPromiseStream was cancelled before sending an error, so the storage server must
// have died
FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(broken_promise()),
getEndpoint(TaskPriority::NoDeliverDelay),
false);
@ -431,6 +421,7 @@ public:
void send(U&& value) const {
if (queue->isRemoteEndpoint()) {
if (!queue->acknowledgements.getRawEndpoint().isValid()) {
// register acknowledge receiver on sender and tell the receiver where to send acknowledge messages
value.acknowledgeToken = queue->acknowledgements.getEndpoint(TaskPriority::NoDeliverDelay).token;
}
queue->acknowledgements.bytesSent += value.expectedSize();
@ -710,16 +701,17 @@ public:
template <class X>
ReplyPromiseStream<REPLYSTREAM_TYPE(X)> getReplyStream(const X& value) const {
Future<Void> disc = makeDependent<T>(IFailureMonitor::failureMonitor()).onDisconnectOrFailure(getEndpoint());
auto& p = getReplyPromiseStream(value);
Reference<Peer> peer;
auto p = getReplyPromiseStream(value);
if (queue->isRemoteEndpoint()) {
peer = FlowTransport::transport().sendUnreliable(SerializeSource<T>(value), getEndpoint(), true);
Future<Void> disc =
makeDependent<T>(IFailureMonitor::failureMonitor()).onDisconnectOrFailure(getEndpoint());
Reference<Peer> peer =
FlowTransport::transport().sendUnreliable(SerializeSource<T>(value), getEndpoint(), true);
// FIXME: defer sending the message until we know the connection is established
endStreamOnDisconnect(disc, p, getEndpoint(), peer);
} else {
send(value);
}
// FIXME: defer sending the message until we know the connection is established
endStreamOnDisconnect(disc, p, getEndpoint(), peer);
return p;
}

View File

@ -559,7 +559,7 @@ ACTOR Future<TLogPeekReply> peekLogRouter(LogRouterData* self,
// This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover
ACTOR Future<Void> logRouterPeekStream(LogRouterData* self, TLogPeekStreamRequest req) {
self->activePeekStreams++;
TraceEvent(SevDebug, "TLogPeekStream", self->dbgid).detail("Token", req.reply.getEndpoint().token);
TraceEvent(SevDebug, "LogRouterPeekStream", self->dbgid).detail("Token", req.reply.getEndpoint().token);
state Version begin = req.begin;
state bool onlySpilled = false;
@ -576,16 +576,13 @@ ACTOR Future<Void> logRouterPeekStream(LogRouterData* self, TLogPeekStreamReques
wait(delay(0, g_network->getCurrentTask()));
} catch (Error& e) {
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", self->dbgid).error(e, true);
TraceEvent(SevDebug, "LogRouterPeekStreamEnd", self->dbgid).error(e, true);
if (e.code() == error_code_no_action_needed) {
return Void();
} else if (e.code() == error_code_end_of_stream) {
req.reply.sendError(end_of_stream());
} else if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
return Void();
} else if (e.code() == error_code_operation_obsolete) {
// reply stream is cancelled on the client
return Void();
} else {
throw;
}

View File

@ -38,7 +38,8 @@ ACTOR Future<Void> tryEstablishPeekStream(ILogSystem::ServerPeekCursor* self) {
self->peekReplyStream = self->interf->get().interf().peekStreamMessages.getReplyStream(TLogPeekStreamRequest(
self->messageVersion.version, self->tag, self->returnIfBlocked, std::numeric_limits<int>::max()));
TraceEvent(SevDebug, "SPC_StreamCreated", self->randomID)
.detail("PeerAddress", self->interf->get().interf().peekStreamMessages.getEndpoint().getPrimaryAddress());
.detail("PeerAddress", self->interf->get().interf().peekStreamMessages.getEndpoint().getPrimaryAddress())
.detail("PeerToken", self->interf->get().interf().peekStreamMessages.getEndpoint().token);
return Void();
}
@ -350,10 +351,10 @@ ACTOR Future<Void> serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T
fPeekReply)
: Never())) {
updateCursorWithReply(self, res);
TraceEvent("SPC_GetMoreB", self->randomID)
.detail("Has", self->hasMessage())
.detail("End", res.end)
.detail("Popped", res.popped.present() ? res.popped.get() : 0);
// TraceEvent("SPC_GetMoreB", self->randomID)
// .detail("Has", self->hasMessage())
// .detail("End", res.end)
// .detail("Popped", res.popped.present() ? res.popped.get() : 0);
// NOTE: delay is needed here since TLog need to be scheduled to response if there are TLog and SS
// on the same machine
@ -363,7 +364,7 @@ ACTOR Future<Void> serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T
}
} catch (Error& e) {
TraceEvent(SevDebug, "SPC_GetMoreB_Error", self->randomID).detail("Error", e.what());
if (e.code() == error_code_connection_failed) {
if (e.code() == error_code_connection_failed || e.code() == error_code_operation_obsolete) {
self->peekReplyStream.reset();
} else if (e.code() == error_code_end_of_stream) {
self->end.reset(self->messageVersion.version);
@ -408,20 +409,20 @@ ACTOR Future<Void> serverPeekGetMore(ILogSystem::ServerPeekCursor* self, TaskPri
}
Future<Void> ILogSystem::ServerPeekCursor::getMore(TaskPriority taskID) {
TraceEvent("SPC_GetMore", randomID)
.detail("HasMessage", hasMessage())
.detail("More", !more.isValid() || more.isReady())
.detail("MessageVersion", messageVersion.toString())
.detail("End", end.toString());
// TraceEvent("SPC_GetMore", randomID)
// .detail("HasMessage", hasMessage())
// .detail("More", !more.isValid() || more.isReady())
// .detail("MessageVersion", messageVersion.toString())
// .detail("End", end.toString());
if (hasMessage() && !parallelGetMore)
return Void();
if (!more.isValid() || more.isReady()) {
// more = serverPeekStreamGetMore(this, taskID);
if (parallelGetMore || onlySpilled || futureResults.size()) {
more = serverPeekParallelGetMore(this, taskID);
more = serverPeekStreamGetMore(this, taskID);
/*if (parallelGetMore || onlySpilled || futureResults.size()) {
more = serverPeekParallelGetMore(this, taskID);
} else {
more = serverPeekGetMore(this, taskID);
}
more = serverPeekGetMore(this, taskID);
}*/
}
return more;
}

View File

@ -1143,12 +1143,9 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true);
if (e.code() == error_code_end_of_stream) {
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
return Void();
} else if (e.code() == error_code_operation_obsolete) {
// reply stream is cancelled on the client
return Void();
} else {
throw;
}

View File

@ -1466,12 +1466,9 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true);
if (e.code() == error_code_end_of_stream) {
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
return Void();
} else if (e.code() == error_code_operation_obsolete) {
// reply stream is cancelled on the client
return Void();
} else {
throw;
}

View File

@ -1897,12 +1897,9 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true);
if (e.code() == error_code_end_of_stream) {
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
return Void();
} else if (e.code() == error_code_operation_obsolete) {
// reply stream is cancelled on the client
return Void();
} else {
throw;
}

View File

@ -636,7 +636,9 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
wait(delay(5.0));
// The quiet database check (which runs at the end of every test) will always time out due to active data movement.
// To get around this, quiet Database will disable the perpetual wiggle in the setup phase.
printf("------- 1 -------\n");
wait(setPerpetualStorageWiggle(cx, false, LockAware::True));
printf("------- 2 -------\n");
// Require 3 consecutive successful quiet database checks spaced 2 second apart
state int numSuccesses = 0;

View File

@ -253,7 +253,7 @@ public:
// Refer to FDBTypes.h::TLogVersion. Defaults to the maximum supported version.
int maxTLogVersion = TLogVersion::MAX_SUPPORTED;
// Set true to simplify simulation configs for easier debugging
bool simpleConfig = false;
bool simpleConfig = true;
Optional<bool> generateFearless, buggify;
Optional<int> datacenters, desiredTLogCount, commitProxyCount, grvProxyCount, resolverCount, storageEngineType,
stderrSeverity, machineCount, processesPerMachine, coordinators;

View File

@ -1941,12 +1941,9 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true);
if (e.code() == error_code_end_of_stream) {
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
return Void();
} else if (e.code() == error_code_operation_obsolete) {
// reply stream is cancelled on the client
return Void();
} else {
throw;
}