Merge branch 'bw_new_empty_versions' into blob_integration

Josh Slocum 2021-12-02 15:01:31 -06:00
commit 03f341fd3a
3 changed files with 82 additions and 61 deletions

View File

@ -6808,7 +6808,8 @@ Reference<ChangeFeedStorageData> DatabaseContext::getStorageData(StorageServerIn
}
Version ChangeFeedData::getVersion() {
if (notAtLatest.get() == 0 && mutations.isEmpty()) {
// TODO uncomment?
if (notAtLatest.get() == 0 && mutations.isEmpty() /*&& storageData.size() > 0*/) {
Version v = storageData[0]->version.get();
for (int i = 1; i < storageData.size(); i++) {
if (storageData[i]->version.get() < v) {
@ -6834,7 +6835,7 @@ ACTOR Future<Void> changeFeedWhenAtLatest(ChangeFeedData* self, Version version)
}
}
choose {
when(wait(lastReturned)) { return Void(); }
when(wait(lastReturned)) { break; }
when(wait(waitForAll(allAtLeast))) {
std::vector<Future<Void>> onEmpty;
if (!self->mutations.isEmpty()) {
@ -6846,14 +6847,14 @@ ACTOR Future<Void> changeFeedWhenAtLatest(ChangeFeedData* self, Version version)
}
}
if (!onEmpty.size()) {
return Void();
break;
}
choose {
when(wait(waitForAll(onEmpty))) {
wait(delay(0));
return Void();
break;
}
when(wait(lastReturned)) { return Void(); }
when(wait(lastReturned)) { break; }
when(wait(self->refresh.getFuture())) {}
when(wait(self->notAtLatest.onChange())) {}
}
@ -6863,12 +6864,14 @@ ACTOR Future<Void> changeFeedWhenAtLatest(ChangeFeedData* self, Version version)
}
} else {
choose {
when(wait(lastReturned)) { return Void(); }
when(wait(lastReturned)) { break; }
when(wait(self->notAtLatest.onChange())) {}
when(wait(self->refresh.getFuture())) {}
}
}
}
ASSERT(self->getVersion() >= version);
return Void();
}
Future<Void> ChangeFeedData::whenAtLeast(Version version) {
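The change across these hunks is mostly control flow: each when(wait(lastReturned)) branch in changeFeedWhenAtLatest now breaks out of the actor's loop instead of returning Void(), so every exit path goes through the new ASSERT(self->getVersion() >= version) before the single return Void(). A tiny standalone illustration of that "break to a shared postcondition" shape, in plain C++ with made-up names rather than flow actors:

#include <cassert>

// Hypothetical stand-in for "another batch of mutations arrived and advanced the feed".
static long advance(long current) { return current + 1; }

// Loop until the feed reaches `target`. Before the refactor the satisfied condition
// returned directly from inside the loop; now it just breaks, so the assert below
// checks the contract (the one callers such as waitForVersion rely on) exactly once.
long whenAtLeast(long startVersion, long target) {
    long current = startVersion;
    for (;;) {
        if (current >= target) {
            break; // was: return current;
        }
        current = advance(current);
    }
    assert(current >= target); // shared postcondition for every exit path
    return current;
}

int main() {
    return whenAtLeast(5, 10) >= 10 ? 0 : 1;
}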

View File

@ -98,15 +98,13 @@ struct GranuleMetadata : NonCopyable, ReferenceCounted<GranuleMetadata> {
uint64_t bufferedDeltaBytes = 0;
// for client to know when it is safe to read a certain version and from where (check waitForVersion)
NotifiedVersion bufferedDeltaVersion; // largest delta version in currentDeltas (including empty versions)
Version bufferedDeltaVersion; // largest delta version in currentDeltas (including empty versions)
Version pendingDeltaVersion = 0; // largest version in progress writing to s3/fdb
NotifiedVersion durableDeltaVersion; // largest version persisted in s3/fdb
NotifiedVersion durableSnapshotVersion; // same as delta vars, except for snapshots
Version pendingSnapshotVersion = 0;
Version initialSnapshotVersion = invalidVersion;
AsyncVar<int> rollbackCount;
int64_t originalEpoch;
int64_t originalSeqno;
int64_t continueEpoch;
@ -118,6 +116,8 @@ struct GranuleMetadata : NonCopyable, ReferenceCounted<GranuleMetadata> {
Promise<Void> resumeSnapshot;
AsyncVar<Reference<ChangeFeedData>> activeCFData;
AssignBlobRangeRequest originalReq;
void resume() {
@ -1011,6 +1011,14 @@ struct InFlightFile {
: future(future), version(version), bytes(bytes), snapshot(snapshot) {}
};
static Reference<ChangeFeedData> newChangeFeedData(Version startVersion) {
// FIXME: should changeFeedStream guarantee that this is always set to begin-1 instead?
Reference<ChangeFeedData> r = makeReference<ChangeFeedData>();
// TODO uncomment?
// r->lastReturnedVersion.set(startVersion);
return r;
}
static Version doGranuleRollback(Reference<GranuleMetadata> metadata,
Version mutationVersion,
Version rollbackVersion,
@ -1074,9 +1082,7 @@ static Version doGranuleRollback(Reference<GranuleMetadata> metadata,
metadata->deltaArena = Arena();
metadata->currentDeltas = GranuleDeltas();
metadata->bufferedDeltaBytes = 0;
// Create new notified version so it doesn't go backwards. Do this before signaling the rollback counter so that
// the callers pick this new one up for the next waitForVersion
metadata->bufferedDeltaVersion = NotifiedVersion(cfRollbackVersion);
metadata->bufferedDeltaVersion = cfRollbackVersion;
// Track that this rollback happened, since we have to re-read mutations up to the rollback
// Add this rollback to in progress, and put all completed ones back in progress
@ -1115,9 +1121,7 @@ static Version doGranuleRollback(Reference<GranuleMetadata> metadata,
// delete all deltas in rollback range, but we can optimize here to just skip the uncommitted mutations
// directly and immediately pop the rollback out of inProgress
// Create new notified version so it doesn't go backwards. Do this before signaling the rollback counter so that
// the callers pick this new one up for the next waitForVersion
metadata->bufferedDeltaVersion = NotifiedVersion(rollbackVersion);
metadata->bufferedDeltaVersion = rollbackVersion;
cfRollbackVersion = mutationVersion;
}
@ -1128,8 +1132,6 @@ static Version doGranuleRollback(Reference<GranuleMetadata> metadata,
cfRollbackVersion);
}
metadata->rollbackCount.set(metadata->rollbackCount.get() + 1);
return cfRollbackVersion;
}
@ -1139,8 +1141,6 @@ static Version doGranuleRollback(Reference<GranuleMetadata> metadata,
ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
Reference<GranuleMetadata> metadata,
Future<GranuleStartState> assignFuture) {
state Reference<ChangeFeedData> oldChangeFeedStream = makeReference<ChangeFeedData>();
state Reference<ChangeFeedData> changeFeedStream = makeReference<ChangeFeedData>();
state std::deque<InFlightFile> inFlightFiles;
state Future<Void> oldChangeFeedFuture;
state Future<Void> changeFeedFuture;
@ -1237,26 +1237,29 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
metadata->durableDeltaVersion.set(startVersion);
metadata->pendingDeltaVersion = startVersion;
metadata->bufferedDeltaVersion.set(startVersion);
metadata->bufferedDeltaVersion = startVersion;
committedVersion.set(startVersion);
ASSERT(metadata->readable.canBeSet());
metadata->readable.send(Void());
metadata->activeCFData.set(newChangeFeedData(startVersion));
if (startState.parentGranule.present() && startVersion < startState.changeFeedStartVersion) {
// read from parent change feed up until our new change feed is started
readOldChangeFeed = true;
oldChangeFeedFuture = bwData->db->getChangeFeedStream(oldChangeFeedStream,
oldChangeFeedFuture = bwData->db->getChangeFeedStream(metadata->activeCFData.get(),
oldCFKey.get(),
startVersion + 1,
startState.changeFeedStartVersion,
metadata->keyRange);
} else {
readOldChangeFeed = false;
changeFeedFuture = bwData->db->getChangeFeedStream(
changeFeedStream, cfKey, startVersion + 1, MAX_VERSION, metadata->keyRange);
metadata->activeCFData.get(), cfKey, startVersion + 1, MAX_VERSION, metadata->keyRange);
}
ASSERT(metadata->readable.canBeSet());
metadata->readable.send(Void());
loop {
// check outstanding snapshot/delta files for completion
while (inFlightFiles.size() > 0) {
@ -1298,16 +1301,12 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
state Standalone<VectorRef<MutationsAndVersionRef>> mutations;
try {
state Standalone<VectorRef<MutationsAndVersionRef>> _mutations =
waitNext(metadata->activeCFData.get()->mutations.getFuture());
mutations = _mutations;
if (readOldChangeFeed) {
// TODO efficient way to store next in mutations?
Standalone<VectorRef<MutationsAndVersionRef>> oldMutations =
waitNext(oldChangeFeedStream->mutations.getFuture());
mutations = oldMutations;
ASSERT(mutations.back().version < startState.changeFeedStartVersion);
} else {
Standalone<VectorRef<MutationsAndVersionRef>> newMutations =
waitNext(changeFeedStream->mutations.getFuture());
mutations = newMutations;
ASSERT(mutations.front().version >= startState.changeFeedStartVersion);
}
} catch (Error& e) {
@ -1326,11 +1325,15 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
metadata->keyRange.begin.printable(),
metadata->keyRange.end.printable(),
startState.granuleID.toString(),
metadata->bufferedDeltaVersion.get());
metadata->bufferedDeltaVersion);
}
changeFeedFuture = bwData->db->getChangeFeedStream(
changeFeedStream, cfKey, startState.changeFeedStartVersion, MAX_VERSION, metadata->keyRange);
metadata->activeCFData.set(newChangeFeedData(startState.changeFeedStartVersion - 1));
changeFeedFuture = bwData->db->getChangeFeedStream(metadata->activeCFData.get(),
cfKey,
startState.changeFeedStartVersion,
MAX_VERSION,
metadata->keyRange);
}
// process mutations
@ -1339,7 +1342,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
// buffer mutations at this version. There should not be multiple MutationsAndVersionRef with the same
// version
ASSERT(deltas.version > metadata->bufferedDeltaVersion.get());
ASSERT(deltas.version > metadata->bufferedDeltaVersion);
if (!deltas.mutations.empty()) {
if (deltas.mutations.size() == 1 && deltas.mutations.back().param1 == lastEpochEndPrivateKey) {
// Note rollbackVersion is durable, [rollbackVersion+1 - deltas.version] needs to be tossed
@ -1386,6 +1389,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
.detail("Version", deltas.version)
.detail("RollbackVersion", rollbackVersion);
}
Version cfRollbackVersion = doGranuleRollback(metadata,
deltas.version,
rollbackVersion,
@ -1394,27 +1398,29 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
rollbacksCompleted);
// Reset change feeds to cfRollbackVersion
metadata->activeCFData.set(newChangeFeedData(cfRollbackVersion));
if (readOldChangeFeed) {
// It shouldn't be possible to roll back across the parent/child feed boundary,
// because the transaction creating the child change feed had to commit before we
// got here.
ASSERT(cfRollbackVersion < startState.changeFeedStartVersion);
oldChangeFeedStream = makeReference<ChangeFeedData>();
oldChangeFeedFuture =
bwData->db->getChangeFeedStream(oldChangeFeedStream,
bwData->db->getChangeFeedStream(metadata->activeCFData.get(),
oldCFKey.get(),
cfRollbackVersion + 1,
startState.changeFeedStartVersion,
metadata->keyRange);
} else {
ASSERT(cfRollbackVersion > startState.changeFeedStartVersion);
changeFeedStream = makeReference<ChangeFeedData>();
changeFeedFuture = bwData->db->getChangeFeedStream(changeFeedStream,
changeFeedFuture = bwData->db->getChangeFeedStream(metadata->activeCFData.get(),
cfKey,
cfRollbackVersion + 1,
MAX_VERSION,
metadata->keyRange);
}
justDidRollback = true;
break;
}
@ -1443,7 +1449,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
}
// update buffered version and committed version
metadata->bufferedDeltaVersion.set(deltas.version);
metadata->bufferedDeltaVersion = deltas.version;
Version knownNoRollbacksPast = std::min(deltas.version, deltas.knownCommittedVersion);
if (knownNoRollbacksPast > committedVersion.get()) {
// This is the only place it is safe to set committedVersion, as it has to come from the mutation
@ -1622,6 +1628,9 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
if (BW_DEBUG) {
printf("BGUF got error %s\n", e.name());
}
// Free last change feed data
metadata->activeCFData.set(Reference<ChangeFeedData>());
if (e.code() == error_code_operation_cancelled) {
throw;
}
@ -1786,13 +1795,15 @@ ACTOR Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v
metadata->keyRange.end.printable().c_str(),
v,
metadata->readable.isSet() ? "T" : "F",
metadata->bufferedDeltaVersion.get(),
metadata->activeCFData.get()->getVersion(),
metadata->pendingDeltaVersion,
metadata->durableDeltaVersion.get(),
metadata->pendingSnapshotVersion,
metadata->durableSnapshotVersion.get());*/
if (v <= metadata->bufferedDeltaVersion.get() &&
ASSERT(metadata->activeCFData.get().isValid());
if (v <= metadata->activeCFData.get()->getVersion() &&
(v <= metadata->durableDeltaVersion.get() ||
metadata->durableDeltaVersion.get() == metadata->pendingDeltaVersion) &&
(v <= metadata->durableSnapshotVersion.get() ||
@ -1801,31 +1812,36 @@ ACTOR Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v
}
// wait for change feed version to catch up to ensure we have all data
if (metadata->bufferedDeltaVersion.get() < v) {
wait(metadata->bufferedDeltaVersion.whenAtLeast(v));
if (metadata->activeCFData.get()->getVersion() < v) {
wait(metadata->activeCFData.get()->whenAtLeast(v));
ASSERT(metadata->activeCFData.get()->getVersion() >= v);
}
// wait for any pending delta and snapshot files as of the moment the change feed version caught up.
state Version pendingDeltaV = metadata->pendingDeltaVersion;
state Version pendingSnapshotV = metadata->pendingSnapshotVersion;
ASSERT(pendingDeltaV <= metadata->bufferedDeltaVersion.get());
if (pendingDeltaV > metadata->durableDeltaVersion.get()) {
// If there are mutations that the query needs but that are no longer buffered and have
// not yet been persisted to a delta file, wait for them
if (pendingDeltaV > metadata->durableDeltaVersion.get() && v > metadata->durableDeltaVersion.get()) {
wait(metadata->durableDeltaVersion.whenAtLeast(pendingDeltaV));
ASSERT(metadata->durableDeltaVersion.get() >= pendingDeltaV);
}
// This isn't strictly needed, but if we're in the process of re-snapshotting, we'd likely rather
// return that snapshot file than the previous snapshot file and all its delta files.
if (pendingSnapshotV > metadata->durableSnapshotVersion.get()) {
if (pendingSnapshotV > metadata->durableSnapshotVersion.get() && v > metadata->durableSnapshotVersion.get()) {
wait(metadata->durableSnapshotVersion.whenAtLeast(pendingSnapshotV));
ASSERT(metadata->durableSnapshotVersion.get() >= pendingSnapshotV);
}
// There is a race here - we wait above for the pending delta files to finish, but while we do, another
// delta file may be kicked off and the buffered mutations rolled into it. In that case, we must return
// the new delta file instead of the in-memory mutations, so we wait for that delta file to complete
if (metadata->pendingDeltaVersion != pendingDeltaV) {
wait(metadata->durableDeltaVersion.whenAtLeast(pendingDeltaV + 1));
if (metadata->pendingDeltaVersion > v) {
wait(metadata->durableDeltaVersion.whenAtLeast(v));
ASSERT(metadata->durableDeltaVersion.get() >= v);
}
return Void();
@ -1939,7 +1955,7 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData
// lazily load files for old granule if not present
chunkRange = cur->range;
if (cur->files.isError() || !cur->files.isValid()) {
if (!cur->files.isValid() || cur->files.isError()) {
cur->files = loadHistoryFiles(bwData, cur->granuleID);
}
@ -1971,6 +1987,9 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData
} else {
// this is an active granule query
loop {
if (!metadata->activeCFData.get().isValid()) {
throw wrong_shard_server();
}
Future<Void> waitForVersionFuture = waitForVersion(metadata, req.readVersion);
if (waitForVersionFuture.isReady()) {
// didn't wait, so no need to check rollback stuff
@ -1978,19 +1997,15 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData
}
// rollback resets all of the version information, so we have to redo wait for
// version on rollback
state int rollbackCount = metadata->rollbackCount.get();
choose {
when(wait(waitForVersionFuture)) {}
when(wait(metadata->rollbackCount.onChange())) {}
when(wait(waitForVersionFuture)) { break; }
when(wait(metadata->activeCFData.onChange())) {}
when(wait(metadata->cancelled.getFuture())) { throw wrong_shard_server(); }
}
if (rollbackCount == metadata->rollbackCount.get()) {
break;
} else if (BW_REQUEST_DEBUG) {
fmt::print("[{0} - {1}) @ {2} hit rollback, restarting waitForVersion\n",
req.keyRange.begin.printable(),
req.keyRange.end.printable(),
if (BW_REQUEST_DEBUG && metadata->activeCFData.get().isValid()) {
fmt::print("[{0} - {1}) @ {2} hit CF change, restarting waitForVersion\n",
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str(),
req.readVersion);
}
}
@ -2091,6 +2106,7 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData
req.reply.send(rep);
--bwData->stats.activeReadRequests;
} catch (Error& e) {
// printf("Error in BGFRequest %s\n", e.name());
if (e.code() == error_code_operation_cancelled) {
req.reply.sendError(wrong_shard_server());
throw;
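The blob worker hunks above drop the separate oldChangeFeedStream / changeFeedStream references in favour of a single AsyncVar<Reference<ChangeFeedData>> (activeCFData): on a rollback or on the parent-to-child feed switch a fresh ChangeFeedData is swapped in, and the read path restarts by waiting on activeCFData.onChange() rather than watching a rollback counter. A rough standalone sketch of that swap-and-notify idea in plain threaded C++ (all names here are hypothetical stand-ins; flow's single-threaded AsyncVar does not need the explicit generation argument):

#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

struct FeedData {
    long version = 0; // stand-in for ChangeFeedData's buffered state
};

class ActiveFeed {
public:
    // Swap in a fresh feed object and wake every waiter (what activeCFData.set() does).
    void set(std::shared_ptr<FeedData> next) {
        std::lock_guard<std::mutex> g(m_);
        feed_ = std::move(next);
        ++generation_;
        cv_.notify_all();
    }
    // Snapshot the current feed plus a generation, so a reader can later detect a swap.
    std::pair<std::shared_ptr<FeedData>, unsigned long> get() const {
        std::lock_guard<std::mutex> g(m_);
        return { feed_, generation_ };
    }
    // Block until the feed has been swapped since `seenGeneration` was observed
    // (the role played by wait(metadata->activeCFData.onChange()) above).
    void waitForChange(unsigned long seenGeneration) const {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [&] { return generation_ != seenGeneration; });
    }

private:
    mutable std::mutex m_;
    mutable std::condition_variable cv_;
    std::shared_ptr<FeedData> feed_;
    unsigned long generation_ = 0;
};

int main() {
    ActiveFeed active;
    active.set(std::make_shared<FeedData>());
    auto snapshot = active.get();
    // Simulate a rollback: another thread swaps in a new feed, which wakes the waiter;
    // the waiter would then retry its read against active.get().first.
    std::thread rollback([&] { active.set(std::make_shared<FeedData>()); });
    active.waitForChange(snapshot.second);
    rollback.join();
    return 0;
}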

View File

@ -4654,6 +4654,7 @@ void changeServerKeys(StorageServer* data,
}
}
}
data->keyChangeFeed.coalesce(f.second.contents());
auto feed = data->uidChangeFeed.find(f.first);
if (feed != data->uidChangeFeed.end()) {
feed->second->removing = true;
@ -4909,6 +4910,7 @@ private:
}
}
}
data->keyChangeFeed.coalesce(feed->second->range.contents());
data->uidChangeFeed.erase(feed);
} else {
// must be pop or stop
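The storage server hunks add keyChangeFeed.coalesce(...) after a change feed is removed from the ranges it covered, so adjacent entries of the key-range map that end up with identical (possibly empty) feed sets get merged rather than accumulating stale boundaries. A simplified sketch of that coalescing step over a plain begin-key map (hypothetical types; FDB's KeyRangeMap does this over the requested range only):

#include <map>
#include <set>
#include <string>

using FeedSet = std::set<std::string>;
// Each entry covers [its key, the next entry's key) and records the feeds on that range.
using RangeMap = std::map<std::string, FeedSet>;

// Merge neighbouring ranges that carry the same feed set by dropping the redundant
// boundary (the later begin key). Called after feeds are erased from the map's values.
void coalesce(RangeMap& ranges) {
    for (auto it = ranges.begin(); it != ranges.end();) {
        auto next = std::next(it);
        if (next != ranges.end() && next->second == it->second) {
            ranges.erase(next); // same value on both sides of the boundary: merge
        } else {
            ++it;
        }
    }
}

int main() {
    RangeMap ranges = { { "", {} }, { "a", { "feed1" } }, { "b", {} }, { "c", {} } };
    coalesce(ranges); // "b" and "c" both map to the empty set, so "c" is dropped
    return ranges.size() == 3 ? 0 : 1;
}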