fix double destruction memory bug

This commit is contained in:
Xiaoxi Wang 2021-07-07 22:55:49 +00:00
parent 569de5259f
commit 15347773d9
2 changed files with 29 additions and 38 deletions

View File

@ -197,7 +197,7 @@ struct PeerHolder {
}
};
// Implements getRepyStream, this a void actor with the same lifetime as the input ReplyPromiseStream.
// Implements getReplyStream, this a void actor with the same lifetime as the input ReplyPromiseStream.
// Because this actor holds a reference to the stream, normally it would be impossible to know when there are no other
// references. To get around this, there is a SAV inside the stream that has one less promise reference than it should
// (caused by getErrorFutureAndDelPromiseRef()). When that SAV gets a broken promise because no one besides this void

View File

@ -238,8 +238,7 @@ ACTOR Future<Void> serverPeekParallelGetMore(ILogSystem::ServerPeekCursor* self,
if (!self->interf || self->isExhausted()) {
if (self->hasMessage())
return Void();
wait(Future<Void>(Never()));
throw internal_error();
return Never();
}
if (!self->interfaceChanged.isValid()) {
@ -315,54 +314,46 @@ ACTOR Future<Void> serverPeekParallelGetMore(ILogSystem::ServerPeekCursor* self,
}
ACTOR Future<Void> serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, TaskPriority taskID) {
if (self->isExhausted()) {
if (!self->interf || self->isExhausted()) {
if (self->hasMessage())
return Void();
wait(Future<Void>(Never()));
throw internal_error();
return Never();
}
try {
tryEstablishPeekStream(self);
loop {
try {
choose {
when(wait(self->interf->onChange())) {
self->onlySpilled = false;
self->peekReplyStream.reset();
tryEstablishPeekStream(self);
}
when(TLogPeekStreamReply res =
wait(self->peekReplyStream.present()
? brokenPromiseToNever(waitAndForward(self->peekReplyStream.get().getFuture()))
: Never())) {
updateCursorWithReply(self, res.rep);
return Void();
}
}
} catch (Error& e) {
if (e.code() == error_code_connection_failed) {
tryEstablishPeekStream(self);
loop {
try {
choose {
when(wait(self->interf->onChange())) {
self->onlySpilled = false;
self->peekReplyStream.reset();
} else {
throw;
tryEstablishPeekStream(self);
}
when(TLogPeekStreamReply res = wait(self->peekReplyStream.present()
? brokenPromiseToNever(waitAndForward(self->peekReplyStream.get().getFuture()))
: Never())) {
updateCursorWithReply(self, res.rep);
return Void();
}
}
} catch (Error& e) {
if (e.code() == error_code_connection_failed) {
self->peekReplyStream.reset();
}
else if(e.code() == error_code_end_of_stream) {
self->end.reset(self->messageVersion.version);
return Void();
}
else {
throw;
}
}
} catch (Error& e) {
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
self->end.reset(self->messageVersion.version);
self->peekReplyStream.reset();
return Void();
}
self->peekReplyStream.reset();
throw;
}
}
ACTOR Future<Void> serverPeekGetMore(ILogSystem::ServerPeekCursor* self, TaskPriority taskID) {
if (!self->interf || self->isExhausted()) {
wait(Future<Void>(Never()));
throw internal_error();
return Never();
}
try {
loop {