even more bug fixes

Josh Slocum 2021-09-25 10:30:27 -05:00
parent 3b23d6aba5
commit 8fb7c45e65
2 changed files with 52 additions and 25 deletions


@ -771,9 +771,17 @@ ACTOR Future<Void> monitorBlobWorker(BlobManagerData* bmData, BlobWorkerInterfac
 // TODO this is only for chaos testing right now!! REMOVE LATER
 ACTOR Future<Void> rangeMover(BlobManagerData* bmData) {
+    ASSERT(g_network->isSimulated());
     loop {
         wait(delay(30.0));
+        if (g_simulator.speedUpSimulation) {
+            if (BM_DEBUG) {
+                printf("Range mover stopping\n");
+            }
+            return Void();
+        }
         if (bmData->workersById.size() > 1) {
             int tries = 10;
             while (tries > 0) {


@ -1088,12 +1088,15 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
 // FIXME: not true for reassigns
 if (!changeFeedInfo.doSnapshot) {
     startVersion = changeFeedInfo.previousDurableVersion;
+    ASSERT(!metadata->files.snapshotFiles.empty());
+    metadata->pendingSnapshotVersion = metadata->files.snapshotFiles.back().version;
+    metadata->durableSnapshotVersion.set(metadata->pendingSnapshotVersion);
 } else {
     if (changeFeedInfo.blobFilesToSnapshot.present()) {
         inFlightBlobSnapshot = compactFromBlob(bwData, metadata, changeFeedInfo.blobFilesToSnapshot.get());
         startVersion = changeFeedInfo.previousDurableVersion;
-        metadata->durableSnapshotVersion =
-            changeFeedInfo.blobFilesToSnapshot.get().snapshotFiles.back().version;
+        metadata->durableSnapshotVersion.set(
+            changeFeedInfo.blobFilesToSnapshot.get().snapshotFiles.back().version);
     } else {
         ASSERT(changeFeedInfo.previousDurableVersion == invalidVersion);
         BlobFileIndex fromFDB = wait(dumpInitialSnapshotFromFDB(bwData, metadata));
@ -1101,11 +1104,11 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
         ASSERT(changeFeedInfo.changeFeedStartVersion <= fromFDB.version);
         startVersion = newSnapshotFile.version;
         metadata->files.snapshotFiles.push_back(newSnapshotFile);
-        metadata->durableSnapshotVersion = startVersion;
+        metadata->durableSnapshotVersion.set(startVersion);
     }
+    metadata->pendingSnapshotVersion = startVersion;
 }
-metadata->pendingSnapshotVersion = startVersion;
 metadata->durableDeltaVersion.set(startVersion);
 metadata->pendingDeltaVersion = startVersion;
 metadata->bufferedDeltaVersion.set(startVersion);
@ -1585,6 +1588,19 @@ ACTOR Future<Void> waitForVersionActor(Reference<GranuleMetadata> metadata, Vers
 static Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v) {
     // if we don't have to wait for change feed version to catch up or wait for any pending file writes to complete,
     // nothing to do
+    /*printf(" [%s - %s) waiting for %lld\n readable:%s\n bufferedDelta=%lld\n pendingDelta=%lld\n "
+           "durableDelta=%lld\n pendingSnapshot=%lld\n durableSnapshot=%lld\n",
+           metadata->keyRange.begin.printable().c_str(),
+           metadata->keyRange.end.printable().c_str(),
+           v,
+           metadata->readable.isSet() ? "T" : "F",
+           metadata->bufferedDeltaVersion.get(),
+           metadata->pendingDeltaVersion,
+           metadata->durableDeltaVersion.get(),
+           metadata->pendingSnapshotVersion,
+           metadata->durableSnapshotVersion.get());*/
     if (metadata->readable.isSet() && v <= metadata->bufferedDeltaVersion.get() &&
         (v <= metadata->durableDeltaVersion.get() ||
          metadata->durableDeltaVersion.get() == metadata->pendingDeltaVersion) &&
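
The readability check in this hunk is cut off by the diff context. As a rough, self-contained model of what the fast path is gating on (plain C++, illustrative only, not the actual flow/actor code; the snapshot half of the condition is an assumption, since it falls outside the hunk):

#include <cstdint>

using Version = int64_t;

// Simplified stand-in for the granule metadata fields referenced above.
struct GranuleVersions {
    bool readable;                  // stands in for metadata->readable.isSet()
    Version bufferedDeltaVersion;   // highest version applied to the in-memory buffer
    Version pendingDeltaVersion;    // highest delta version with a file write in flight
    Version durableDeltaVersion;    // highest delta version durable in blob storage
    Version pendingSnapshotVersion; // highest snapshot version with a write in flight
    Version durableSnapshotVersion; // highest snapshot version durable in blob storage
};

// True when a read at version v needs no waiting: v is already buffered, and no pending
// delta or snapshot file write that the read depends on is still outstanding.
bool canReadImmediately(const GranuleVersions& m, Version v) {
    bool deltasSettled = v <= m.durableDeltaVersion || m.durableDeltaVersion == m.pendingDeltaVersion;
    bool snapshotsSettled =
        v <= m.durableSnapshotVersion || m.durableSnapshotVersion == m.pendingSnapshotVersion;
    return m.readable && v <= m.bufferedDeltaVersion && deltasSettled && snapshotsSettled;
}

When a predicate like this is false, waitForVersion presumably falls back to the waitForVersionActor named in the hunk header and actually waits.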
@ -1814,13 +1830,13 @@ ACTOR Future<GranuleChangeFeedInfo> persistAssignWorkerRange(BlobWorkerData* bwD
         info.previousDurableVersion = info.existingFiles.get().deltaFiles.back().version;
     }
-    // for the non-splitting cases, this doesn't need to be 100% accurate, it just needs to be smaller than
-    // the next delta file write.
+    // for the non-splitting cases, this doesn't need to be 100% accurate, it just needs to be smaller
+    // than the next delta file write.
     info.changeFeedStartVersion = info.previousDurableVersion;
 } else {
     // else we are first, no need to check for owner conflict
-    // FIXME: use actual 16 bytes of UID instead of converting it to 32 character string and then that to
-    // bytes
+    // FIXME: use actual 16 bytes of UID instead of converting it to 32 character string and then that
+    // to bytes
     info.changeFeedId = deterministicRandom()->randomUniqueID();
     wait(tr.registerChangeFeed(StringRef(info.changeFeedId.toString()), req.keyRange));
     info.doSnapshot = true;
@ -1835,8 +1851,8 @@ ACTOR Future<GranuleChangeFeedInfo> persistAssignWorkerRange(BlobWorkerData* bwD
 Optional<Value> parentGranulesValue =
     wait(tr.get(historyKey.getDataAsStandalone().withPrefix(blobGranuleHistoryKeys.begin)));
-// If anything in previousGranules, need to do the handoff logic and set ret.previousChangeFeedId, and the
-// previous durable version will come from the previous granules
+// If anything in previousGranules, need to do the handoff logic and set ret.previousChangeFeedId, and
+// the previous durable version will come from the previous granules
 if (parentGranulesValue.present()) {
     state Standalone<VectorRef<KeyRangeRef>> parentGranules =
         decodeBlobGranuleHistoryValue(parentGranulesValue.get());
@ -1883,7 +1899,8 @@ ACTOR Future<GranuleChangeFeedInfo> persistAssignWorkerRange(BlobWorkerData* bwD
                                       req.keyRange,
                                       info.prevChangeFeedId.get(),
                                       BlobGranuleSplitState::Assigned));
-    // change feed was created as part of this transaction, changeFeedStartVersion will be set later
+    // change feed was created as part of this transaction, changeFeedStartVersion will be set
+    // later
 } else {
     ASSERT(false);
 }
@ -1951,16 +1968,16 @@ static bool newerRangeAssignment(GranuleRangeMetadata oldMetadata, int64_t epoch
 // TODO unit test this assignment, particularly out-of-order insertions!
 // The contract from the blob manager is:
-// If a key range [A, B) was assigned to the worker at seqno S1, no part of the keyspace that intersects [A, B] may be
-// re-assigned to the worker until the range has been revoked from this worker. This revoking can either happen by the
-// blob manager willingly relinquishing the range, or by the blob manager reassigning it somewhere else. This means that
-// if the worker gets an assignment for any range that intersects [A, B) at S3, there must have been a revoke message
-// for [A, B) with seqno S3 where S1 < S2 < S3, that was delivered out of order. This means that if there are any
-// intersecting but not fully overlapping ranges with a new range assignment, they had already been revoked. So the
-// worker will mark them as revoked, but leave the sequence number as S1, so that when the actual revoke message comes
-// in, it is a no-op, but updates the sequence number. Similarly, if a worker gets an assign message for any range that
-// already has a higher sequence number, that range was either revoked, or revoked and then re-assigned. Either way,
-// this assignment is no longer valid.
+// If a key range [A, B) was assigned to the worker at seqno S1, no part of the keyspace that intersects [A, B] may
+// be re-assigned to the worker until the range has been revoked from this worker. This revoking can either happen
+// by the blob manager willingly relinquishing the range, or by the blob manager reassigning it somewhere else. This
+// means that if the worker gets an assignment for any range that intersects [A, B) at S3, there must have been a
+// revoke message for [A, B) with seqno S3 where S1 < S2 < S3, that was delivered out of order. This means that if
+// there are any intersecting but not fully overlapping ranges with a new range assignment, they had already been
+// revoked. So the worker will mark them as revoked, but leave the sequence number as S1, so that when the actual
+// revoke message comes in, it is a no-op, but updates the sequence number. Similarly, if a worker gets an assign
+// message for any range that already has a higher sequence number, that range was either revoked, or revoked and
+// then re-assigned. Either way, this assignment is no longer valid.
 // Returns future to wait on to ensure prior work of other granules is done before responding to the manager with a
 // successful assignment And if the change produced a new granule that needs to start doing work, returns the new
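
The contract above boils down to comparing (epoch, seqno) stamps and treating an intersecting-but-older range as already revoked. A minimal illustrative model of that ordering rule (not the FoundationDB implementation; the real comparison lives in newerRangeAssignment from the hunk header above):

#include <cstdint>

// Toy per-range bookkeeping: the stamp of the last assignment/revoke seen, plus whether
// the worker currently considers the range assigned.
struct RangeStamp {
    int64_t epoch = 0;
    int64_t seqno = 0;
    bool assigned = false;
};

// An incoming message only takes effect if its (epoch, seqno) is strictly newer than the
// stamp already recorded for the range.
bool isNewer(const RangeStamp& existing, int64_t epoch, int64_t seqno) {
    return epoch > existing.epoch || (epoch == existing.epoch && seqno > existing.seqno);
}

// When a newer assignment merely intersects an existing range, the worker can infer the
// old range was already revoked and the revoke message is simply late: mark it unassigned
// but keep the old stamp, so the late revoke compares as newer and only bumps the stamp.
void markImplicitlyRevoked(RangeStamp& existing) {
    existing.assigned = false; // (epoch, seqno) intentionally left at the old values
}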
@ -1983,9 +2000,10 @@ static std::pair<Future<Void>, Reference<GranuleMetadata>> changeBlobRange(BlobW
     }
     // For each range that intersects this update:
-    // If the identical range already exists at the same assignment sequence number and it is not a self-reassign, this
-    // is a noop. Otherwise, this will consist of a series of ranges that are either older, or newer. For each older
-    // range, cancel it if it is active. Insert the current range. Re-insert all newer ranges over the current range.
+    // If the identical range already exists at the same assignment sequence number and it is not a self-reassign,
+    // this is a noop. Otherwise, this will consist of a series of ranges that are either older, or newer. For each
+    // older range, cancel it if it is active. Insert the current range. Re-insert all newer ranges over the current
+    // range.
     std::vector<Future<Void>> futures;
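
As a simplified, self-contained sketch of the insertion rule this comment describes (illustrative only; the real code operates on a KeyRangeMap of GranuleRangeMetadata and also cancels the active work of older ranges, which is elided here):

#include <cstdint>
#include <string>
#include <vector>

struct ToyRange {
    std::string begin, end; // key range [begin, end)
    int64_t epoch = 0, seqno = 0;
    bool active = false;
};

// Apply an incoming assignment over the ranges it intersects: an identical range at the
// same stamp is a no-op, strictly newer entries are layered back on top, older entries
// are dropped.
void applyAssignment(std::vector<ToyRange>& intersecting, const ToyRange& incoming) {
    std::vector<ToyRange> newer;
    for (const ToyRange& r : intersecting) {
        if (r.begin == incoming.begin && r.end == incoming.end && r.epoch == incoming.epoch &&
            r.seqno == incoming.seqno) {
            return; // identical range at the same assignment sequence number: no-op
        }
        bool rIsNewer =
            r.epoch > incoming.epoch || (r.epoch == incoming.epoch && r.seqno > incoming.seqno);
        if (rIsNewer) {
            newer.push_back(r); // re-inserted over the current range below
        }
        // older ranges are simply replaced (cancelling their active work is omitted here)
    }
    intersecting.clear();
    intersecting.push_back(incoming);
    intersecting.insert(intersecting.end(), newer.begin(), newer.end());
}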
@ -2005,7 +2023,8 @@ static std::pair<Future<Void>, Reference<GranuleMetadata>> changeBlobRange(BlobW
             if (r.value().activeMetadata.isValid()) {
                 futures.push_back(success(r.value().activeMetadata->assignFuture));
             }
-            return std::pair(waitForAll(futures), Reference<GranuleMetadata>()); // already applied, nothing to do
+            return std::pair(waitForAll(futures),
+                             Reference<GranuleMetadata>()); // already applied, nothing to do
         }
     }