Cleaning up some memory lifetime issues
This commit is contained in:
parent
ff2cd691cd
commit
307d049c9d
|
@ -7085,7 +7085,7 @@ Version ChangeFeedData::getVersion() {
|
|||
#define DEBUG_CF_WAIT_VERSION invalidVersion
|
||||
#define DEBUG_CF_VERSION(v) DEBUG_CF_START_VERSION <= v&& v <= DEBUG_CF_END_VERSION
|
||||
|
||||
ACTOR Future<Void> changeFeedWaitLatest(ChangeFeedData* self, Version version) {
|
||||
ACTOR Future<Void> changeFeedWaitLatest(Reference<ChangeFeedData> self, Version version) {
|
||||
// first, wait on SS to have sent up through version
|
||||
int desired = 0;
|
||||
int waiting = 0;
|
||||
|
@ -7162,7 +7162,7 @@ ACTOR Future<Void> changeFeedWaitLatest(ChangeFeedData* self, Version version) {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> changeFeedWhenAtLatest(ChangeFeedData* self, Version version) {
|
||||
ACTOR Future<Void> changeFeedWhenAtLatest(Reference<ChangeFeedData> self, Version version) {
|
||||
if (DEBUG_CF_WAIT_VERSION == version) {
|
||||
fmt::print("CFW {0}) WhenAtLeast: LR={1}\n", version, self->lastReturnedVersion.get());
|
||||
}
|
||||
|
@ -7200,7 +7200,7 @@ ACTOR Future<Void> changeFeedWhenAtLatest(ChangeFeedData* self, Version version)
|
|||
}
|
||||
|
||||
Future<Void> ChangeFeedData::whenAtLeast(Version version) {
|
||||
return changeFeedWhenAtLatest(this, version);
|
||||
return changeFeedWhenAtLatest(Reference<ChangeFeedData>::addRef(this), version);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> singleChangeFeedStream(StorageServerInterface interf,
|
||||
|
|
|
@ -1167,7 +1167,9 @@ ACTOR Future<Void> waitVersionCommitted(Reference<BlobWorkerData> bwData,
|
|||
// make sure the change feed has consumed mutations up through grvVersion to ensure none of them are rollbacks
|
||||
|
||||
loop {
|
||||
state Future<Void> atLeast = metadata->activeCFData.get()->whenAtLeast(grvVersion);
|
||||
// if not valid, we're about to be cancelled anyway
|
||||
state Future<Void> atLeast =
|
||||
metadata->activeCFData.get().isValid() ? metadata->activeCFData.get()->whenAtLeast(grvVersion) : Never();
|
||||
choose {
|
||||
when(wait(atLeast)) { break; }
|
||||
when(wait(metadata->activeCFData.onChange())) {}
|
||||
|
@ -1885,6 +1887,8 @@ ACTOR Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v
|
|||
// if we don't have to wait for change feed version to catch up or wait for any pending file writes to complete,
|
||||
// nothing to do
|
||||
|
||||
ASSERT(metadata->activeCFData.get().isValid());
|
||||
|
||||
if (v == DEBUG_BW_WAIT_VERSION) {
|
||||
fmt::print("{0}) [{1} - {2}) waiting for {3}\n readable:{4}\n bufferedDelta={5}\n pendingDelta={6}\n "
|
||||
"durableDelta={7}\n pendingSnapshot={8}\n durableSnapshot={9}\n",
|
||||
|
@ -1900,8 +1904,6 @@ ACTOR Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v
|
|||
metadata->durableSnapshotVersion.get());
|
||||
}
|
||||
|
||||
ASSERT(metadata->activeCFData.get().isValid());
|
||||
|
||||
if (v <= metadata->activeCFData.get()->getVersion() &&
|
||||
(v <= metadata->durableDeltaVersion.get() ||
|
||||
metadata->durableDeltaVersion.get() == metadata->pendingDeltaVersion) &&
|
||||
|
|
Loading…
Reference in New Issue