fixing a couple bugs

This commit is contained in:
Josh Slocum 2021-09-10 12:52:33 -05:00
parent e2a51a4fe7
commit cefb66d64c
4 changed files with 21 additions and 11 deletions

View File

@ -253,19 +253,19 @@ public:
Future<Void> createSnapshot(StringRef uid, StringRef snapshot_command);
Future<Standalone<VectorRef<MutationsAndVersionRef>>> getChangeFeedMutations(
StringRef rangeID,
Key rangeID,
Version begin = 0,
Version end = std::numeric_limits<Version>::max(),
KeyRange range = allKeys);
Future<Void> getChangeFeedStream(const PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>>& results,
StringRef rangeID,
Key rangeID,
Version begin = 0,
Version end = std::numeric_limits<Version>::max(),
KeyRange range = allKeys);
Future<std::vector<std::pair<Key, KeyRange>>> getOverlappingChangeFeeds(KeyRangeRef ranges, Version minVersion);
Future<Void> popChangeFeedMutations(StringRef rangeID, Version version);
Future<Void> popChangeFeedMutations(Key rangeID, Version version);
Future<Void> getBlobGranuleRangesStream(const PromiseStream<KeyRange>& results, KeyRange range);
// TODO add optional for end version so it can do a GRV in the transaction it already has to do

View File

@ -6583,7 +6583,7 @@ Future<Void> DatabaseContext::createSnapshot(StringRef uid, StringRef snapshot_c
}
ACTOR Future<Standalone<VectorRef<MutationsAndVersionRef>>> getChangeFeedMutationsActor(Reference<DatabaseContext> db,
StringRef rangeID,
Key rangeID,
Version begin,
Version end,
KeyRange range) {
@ -6625,7 +6625,7 @@ ACTOR Future<Standalone<VectorRef<MutationsAndVersionRef>>> getChangeFeedMutatio
return Standalone<VectorRef<MutationsAndVersionRef>>(rep.mutations, rep.arena);
}
Future<Standalone<VectorRef<MutationsAndVersionRef>>> DatabaseContext::getChangeFeedMutations(StringRef rangeID,
Future<Standalone<VectorRef<MutationsAndVersionRef>>> DatabaseContext::getChangeFeedMutations(Key rangeID,
Version begin,
Version end,
KeyRange range) {
@ -6741,7 +6741,7 @@ ACTOR Future<Void> mergeChangeFeedStream(std::vector<std::pair<StorageServerInte
ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> results,
StringRef rangeID,
Key rangeID,
Version begin,
Version end,
KeyRange range) {
@ -6875,7 +6875,7 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
Future<Void> DatabaseContext::getChangeFeedStream(
const PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>>& results,
StringRef rangeID,
Key rangeID,
Version begin,
Version end,
KeyRange range) {
@ -6956,7 +6956,7 @@ Future<std::vector<std::pair<Key, KeyRange>>> DatabaseContext::getOverlappingCha
return getOverlappingChangeFeedsActor(Reference<DatabaseContext>::addRef(this), range, minVersion);
}
ACTOR static Future<Void> popChangeFeedBackup(Database cx, StringRef rangeID, Version version) {
ACTOR static Future<Void> popChangeFeedBackup(Database cx, Key rangeID, Version version) {
state Transaction tr(cx);
loop {
try {
@ -6981,7 +6981,7 @@ ACTOR static Future<Void> popChangeFeedBackup(Database cx, StringRef rangeID, Ve
}
}
ACTOR Future<Void> popChangeFeedMutationsActor(Reference<DatabaseContext> db, StringRef rangeID, Version version) {
ACTOR Future<Void> popChangeFeedMutationsActor(Reference<DatabaseContext> db, Key rangeID, Version version) {
state Database cx(db);
state Transaction tr(cx);
state Key rangeIDKey = rangeID.withPrefix(changeFeedPrefix);
@ -7029,7 +7029,7 @@ ACTOR Future<Void> popChangeFeedMutationsActor(Reference<DatabaseContext> db, St
return Void();
}
Future<Void> DatabaseContext::popChangeFeedMutations(StringRef rangeID, Version version) {
Future<Void> DatabaseContext::popChangeFeedMutations(Key rangeID, Version version) {
return popChangeFeedMutationsActor(Reference<DatabaseContext>::addRef(this), rangeID, version);
}

View File

@ -985,7 +985,8 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
oldChangeFeedDataComplete,
changeFeedInfo.prevChangeFeedId));
// add new delta file
oldChangeFeedDataComplete.reset();
// add new pending delta file
ASSERT(metadata->pendingDeltaVersion < metadata->bufferedDeltaVersion);
metadata->pendingDeltaVersion = metadata->bufferedDeltaVersion;
metadata->bytesInNewDeltaFiles += metadata->bufferedDeltaBytes;
@ -1100,6 +1101,12 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
// set this so next delta file write updates granule split metadata to done
ASSERT(changeFeedInfo.granuleSplitFrom.present());
oldChangeFeedDataComplete = changeFeedInfo.granuleSplitFrom;
if (BW_DEBUG) {
printf("Granule [%s - %s) switching to new change feed %s, exiting\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
changeFeedInfo.changeFeedId.toString().c_str());
}
}
}
} catch (Error& e) {
@ -1282,6 +1289,8 @@ ACTOR Future<GranuleChangeFeedInfo> persistAssignWorkerRange(BlobWorkerData* bwD
} else {
// else we are first, no need to check for owner conflict
// FIXME: use actual 16 bytes of UID instead of converting it to 32 character string and then that to
// bytes
wait(tr.registerChangeFeed(StringRef(info.changeFeedId.toString()), req.keyRange));
info.doSnapshot = true;
info.previousDurableVersion = invalidVersion;

View File

@ -1644,6 +1644,7 @@ ACTOR Future<ChangeFeedReply> getChangeFeedMutations(StorageServer* data, Change
auto feed = data->uidChangeFeed.find(req.rangeID);
if (feed == data->uidChangeFeed.end()) {
printf("Unknown change feed %s\n", req.rangeID.printable().c_str());
throw unknown_change_feed();
}
if (req.end <= feed->second->emptyVersion + 1) {