From 307d049c9d713aecab5ffd9fc05295562d9a1715 Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Fri, 10 Dec 2021 16:12:06 -0600 Subject: [PATCH] Cleaning up some memory lifetime issues --- fdbclient/NativeAPI.actor.cpp | 6 +++--- fdbserver/BlobWorker.actor.cpp | 8 +++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 23107e027c..9d6e1136b0 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -7085,7 +7085,7 @@ Version ChangeFeedData::getVersion() { #define DEBUG_CF_WAIT_VERSION invalidVersion #define DEBUG_CF_VERSION(v) DEBUG_CF_START_VERSION <= v&& v <= DEBUG_CF_END_VERSION -ACTOR Future changeFeedWaitLatest(ChangeFeedData* self, Version version) { +ACTOR Future changeFeedWaitLatest(Reference self, Version version) { // first, wait on SS to have sent up through version int desired = 0; int waiting = 0; @@ -7162,7 +7162,7 @@ ACTOR Future changeFeedWaitLatest(ChangeFeedData* self, Version version) { return Void(); } -ACTOR Future changeFeedWhenAtLatest(ChangeFeedData* self, Version version) { +ACTOR Future changeFeedWhenAtLatest(Reference self, Version version) { if (DEBUG_CF_WAIT_VERSION == version) { fmt::print("CFW {0}) WhenAtLeast: LR={1}\n", version, self->lastReturnedVersion.get()); } @@ -7200,7 +7200,7 @@ ACTOR Future changeFeedWhenAtLatest(ChangeFeedData* self, Version version) } Future ChangeFeedData::whenAtLeast(Version version) { - return changeFeedWhenAtLatest(this, version); + return changeFeedWhenAtLatest(Reference::addRef(this), version); } ACTOR Future singleChangeFeedStream(StorageServerInterface interf, diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index aba8332dce..91de0695f6 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -1167,7 +1167,9 @@ ACTOR Future waitVersionCommitted(Reference bwData, // make sure the change feed has consumed mutations up through grvVersion to ensure none of them are rollbacks loop { - state Future atLeast = metadata->activeCFData.get()->whenAtLeast(grvVersion); + // if not valid, we're about to be cancelled anyway + state Future atLeast = + metadata->activeCFData.get().isValid() ? metadata->activeCFData.get()->whenAtLeast(grvVersion) : Never(); choose { when(wait(atLeast)) { break; } when(wait(metadata->activeCFData.onChange())) {} @@ -1885,6 +1887,8 @@ ACTOR Future waitForVersion(Reference metadata, Version v // if we don't have to wait for change feed version to catch up or wait for any pending file writes to complete, // nothing to do + ASSERT(metadata->activeCFData.get().isValid()); + if (v == DEBUG_BW_WAIT_VERSION) { fmt::print("{0}) [{1} - {2}) waiting for {3}\n readable:{4}\n bufferedDelta={5}\n pendingDelta={6}\n " "durableDelta={7}\n pendingSnapshot={8}\n durableSnapshot={9}\n", @@ -1900,8 +1904,6 @@ ACTOR Future waitForVersion(Reference metadata, Version v metadata->durableSnapshotVersion.get()); } - ASSERT(metadata->activeCFData.get().isValid()); - if (v <= metadata->activeCFData.get()->getVersion() && (v <= metadata->durableDeltaVersion.get() || metadata->durableDeltaVersion.get() == metadata->pendingDeltaVersion) &&