Clean up blob worker changes.

This commit is contained in:
Suraj Gupta 2021-10-12 19:54:22 -04:00
parent ef67feed67
commit d002df3b21
1 changed file with 161 additions and 291 deletions

View File

@ -40,7 +40,7 @@
#include "flow/flow.h"
#define BW_DEBUG true
#define BW_REQUEST_DEBUG true
#define BW_REQUEST_DEBUG false
// TODO add comments + documentation
struct BlobFileIndex {
@ -109,13 +109,10 @@ struct GranuleMetadata : NonCopyable, ReferenceCounted<GranuleMetadata> {
ASSERT(resumeSnapshot.canBeSet());
resumeSnapshot.send(Void());
}
~GranuleMetadata() { printf("in dtor for GranuleMetadata\n"); }
};
// TODO: rename this struct
struct GranuleRangeMetadata {
int id = 0;
int64_t lastEpoch;
int64_t lastSeqno;
Reference<GranuleMetadata> activeMetadata;
@ -125,42 +122,16 @@ struct GranuleRangeMetadata {
GranuleRangeMetadata() : lastEpoch(0), lastSeqno(0) {}
GranuleRangeMetadata(int64_t epoch, int64_t seqno, Reference<GranuleMetadata> activeMetadata)
: lastEpoch(epoch), lastSeqno(seqno), activeMetadata(activeMetadata) {
/*
if (activeMetadata.isValid()) {
activeMetadata->cancelled.reset();
}
*/
}
~GranuleRangeMetadata() {
printf("GranuleRangeMetadata is being destroyed\n");
/*
if (activeMetadata.isValid() && activeMetadata->cancelled.canBeSet()) {
printf("Cancelling activeMetadata\n");
activeMetadata->cancelled.send(Void());
}
assignFuture.cancel();
fileUpdaterFuture.cancel();
*/
/*
if (id == 42) {
printf("GranuleRangeMetadata with id %d is being destroyed\n", id);
sleep(10);
}
*/
}
: lastEpoch(epoch), lastSeqno(seqno), activeMetadata(activeMetadata) {}
};
struct BlobWorkerData : NonCopyable, ReferenceCounted<BlobWorkerData> {
UID id;
Database db;
AsyncVar<bool> dead;
BlobWorkerStats stats;
PromiseStream<Future<Void>> addActor;
ActorCollection actors{ false };
LocalityData locality;
int64_t currentManagerEpoch = -1;
@ -178,8 +149,7 @@ struct BlobWorkerData : NonCopyable, ReferenceCounted<BlobWorkerData> {
PromiseStream<AssignBlobRangeRequest> granuleUpdateErrors;
BlobWorkerData(UID id, Database db)
: id(id), db(db), stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL), actors(false) {}
BlobWorkerData(UID id, Database db) : id(id), db(db), stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL) {}
~BlobWorkerData() { printf("Destroying blob worker data for %s\n", id.toString().c_str()); }
bool managerEpochOk(int64_t epoch) {
@ -229,7 +199,6 @@ static void checkGranuleLock(int64_t epoch, int64_t seqno, int64_t ownerEpoch, i
ASSERT(epoch < ownerEpoch || (epoch == ownerEpoch && seqno <= ownerSeqno));
// returns true if we still own the lock, false if someone else does
printf("epoch: %lld, seqno: %lld, ownerEpoch: %lld, ownerSeqno: %lld\n", epoch, seqno, ownerEpoch, ownerSeqno);
if (epoch != ownerEpoch || seqno != ownerSeqno) {
if (BW_DEBUG) {
printf("Lock assignment check failed. Expected (%lld, %lld), got (%lld, %lld)\n",
@ -578,7 +547,6 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
return BlobFileIndex(currentDeltaVersion, fname, 0, serialized.size());
} catch (Error& e) {
numIterations++;
printf("writeDeltaFile error: %s\n", e.name());
wait(tr->onError(e));
}
}
@ -586,7 +554,7 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
if (e.code() == error_code_operation_cancelled) {
throw e;
}
// TODO: do this for writeSnapshot
if (numIterations != 1 || e.code() != error_code_granule_assignment_conflict) {
throw e;
}
@ -912,15 +880,12 @@ ACTOR Future<Void> handleCompletedDeltaFile(Reference<BlobWorkerData> bwData,
// have completed
// FIXME: also have these be async, have each pop change feed wait on the prior one, wait on them before
// re-snapshotting
printf("in handleCompletedDeltaFile for BW %s\n", bwData->id.toString().c_str());
Future<Void> popFuture = bwData->db->popChangeFeedMutations(cfKey, completedDeltaFile.version);
wait(popFuture);
printf("popChangeFeedMutations returned\n");
}
while (!rollbacksInProgress.empty() && completedDeltaFile.version >= rollbacksInProgress.front().first) {
rollbacksInProgress.pop_front();
}
printf("removed rollbacks\n");
return Void();
}
@ -1071,16 +1036,12 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
state bool snapshotEligible; // just wrote a delta file or just took granule over from another worker
state bool justDidRollback = false;
printf(
"metadata %s %s\n", metadata->keyRange.begin.printable().c_str(), metadata->keyRange.end.printable().c_str());
try {
// set resume snapshot so it's not valid until we pause to ask the blob manager for a re-snapshot
metadata->resumeSnapshot.send(Void());
// before starting, make sure worker persists range assignment and acquires the granule lock
printf("before wait on assignFuture\n");
GranuleChangeFeedInfo _info = wait(assignFuture);
printf("after wait on assignFuture\n");
changeFeedInfo = _info;
wait(delay(0, TaskPriority::BlobWorkerUpdateStorage));
@ -1175,14 +1136,11 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
changeFeedInfo.granuleSplitFrom.get() /*metadata->keyRange*/);
} else {
readOldChangeFeed = false;
printf("before getChangeFeedStream, my ID is %s\n", bwData->id.toString().c_str());
changeFeedFuture = bwData->db->getChangeFeedStream(
changeFeedStream, cfKey, startVersion + 1, MAX_VERSION, metadata->keyRange);
}
loop {
printf("bw %s\n", bwData->id.toString().c_str());
// check outstanding snapshot/delta files for completion
if (inFlightBlobSnapshot.isValid() && inFlightBlobSnapshot.isReady()) {
BlobFileIndex completedSnapshot = wait(inFlightBlobSnapshot);
@ -1198,8 +1156,6 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
wait(yield(TaskPriority::BlobWorkerUpdateStorage));
}
if (!inFlightBlobSnapshot.isValid()) {
printf("!inFlightBlobSnapshot.isValid\n");
printf("inFlightDeltaFiles.size() = %d\n", inFlightDeltaFiles.size());
while (inFlightDeltaFiles.size() > 0) {
if (inFlightDeltaFiles.front().future.isReady()) {
BlobFileIndex completedDeltaFile = wait(inFlightDeltaFiles.front().future);
@ -1227,7 +1183,6 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
state Standalone<VectorRef<MutationsAndVersionRef>> mutations;
if (readOldChangeFeed) {
printf("readOldChangeFeed\n");
Standalone<VectorRef<MutationsAndVersionRef>> oldMutations = waitNext(oldChangeFeedStream.getFuture());
// TODO filter old mutations won't be necessary, SS does it already
if (filterOldMutations(
@ -1254,12 +1209,8 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
}
// process mutations
printf("mutations.size() == %d\n", mutations.size());
for (MutationsAndVersionRef d : mutations) {
state MutationsAndVersionRef deltas = d;
if (deltas.version >= 158685394) {
printf("deltas.version=%lld\n", deltas.version);
}
ASSERT(deltas.version >= metadata->bufferedDeltaVersion.get());
// Write a new delta file IF we have enough bytes, and we have all of the previous version's stuff
// there to ensure no versions span multiple delta files. Check this by ensuring the version of this
@ -1331,8 +1282,6 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
// bunch of extra delta files at some point, even if we don't consider it for a split yet
if (snapshotEligible && metadata->bytesInNewDeltaFiles >= SERVER_KNOBS->BG_DELTA_BYTES_BEFORE_COMPACT &&
!readOldChangeFeed) {
printf("snapshotEligible && metadata->bytesInNewDeltaFiles >= "
"SERVER_KNOBS->BG_DELTA_BYTES_BEFORE_COMPACT && !readOldChangeFeed\n");
if (BW_DEBUG && (inFlightBlobSnapshot.isValid() || !inFlightDeltaFiles.empty())) {
printf("Granule [%s - %s) ready to re-snapshot, waiting for outstanding %d snapshot and %d "
"deltas to "
@ -1399,19 +1348,12 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
when(wait(bwData->currentManagerStatusStream.onChange())) {}
}
/*
if (bwData->dead.get()) {
std::cout << "bw detected dead in blobGranuleUpdateFiles" << std::endl;
throw actor_cancelled();
}
*/
if (BW_DEBUG) {
printf("Granule [%s - %s)\n, hasn't heard back from BM in BW %s, re-sending status\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
bwData->id.toString().c_str());
}
// wait(yield());
}
if (BW_DEBUG) {
@ -1437,8 +1379,6 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
metadata->bytesInNewDeltaFiles = 0;
} else if (snapshotEligible &&
metadata->bytesInNewDeltaFiles >= SERVER_KNOBS->BG_DELTA_BYTES_BEFORE_COMPACT) {
printf("snapshotEligible && metadata->bytesInNewDeltaFiles >= "
"SERVER_KNOBS->BG_DELTA_BYTES_BEFORE_COMPACT\n");
// if we're in the old change feed case and can't snapshot but we have enough data to, don't
// queue too many delta files in parallel
while (inFlightDeltaFiles.size() > 10) {
@ -1469,7 +1409,6 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
// finally, after we optionally write delta and snapshot files, add new mutations to buffer
if (!deltas.mutations.empty()) {
printf("!deltas.mutations.empty()\n");
if (deltas.mutations.size() == 1 && deltas.mutations.back().param1 == lastEpochEndPrivateKey) {
// Note rollbackVerision is durable, [rollbackVersion+1 - deltas.version] needs to be tossed
// For correctness right now, there can be no waits and yields either in rollback handling
@ -1586,11 +1525,8 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
}
}
justDidRollback = false;
std::cout << "looping in blobGranuleUpdateFiles" << std::endl;
}
} catch (Error& e) {
printf("IN CATCH FOR %s blobGranuleUpdateFiles -----\n", bwData->id.toString().c_str());
printf("error is %s\n", e.name());
if (e.code() == error_code_operation_cancelled) {
throw;
}
@ -1682,19 +1618,19 @@ static Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version
// if we don't have to wait for change feed version to catch up or wait for any pending file writes to complete,
// nothing to do
if (v >= 162692991) {
printf(" [%s - %s) waiting for %lld\n readable:%s\n bufferedDelta=%lld\n pendingDelta=%lld\n "
"durableDelta=%lld\n pendingSnapshot=%lld\n durableSnapshot=%lld\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
v,
metadata->readable.isSet() ? "T" : "F",
metadata->bufferedDeltaVersion.get(),
metadata->pendingDeltaVersion,
metadata->durableDeltaVersion.get(),
metadata->pendingSnapshotVersion,
metadata->durableSnapshotVersion.get());
}
/*
printf(" [%s - %s) waiting for %lld\n readable:%s\n bufferedDelta=%lld\n pendingDelta=%lld\n "
"durableDelta=%lld\n pendingSnapshot=%lld\n durableSnapshot=%lld\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
v,
metadata->readable.isSet() ? "T" : "F",
metadata->bufferedDeltaVersion.get(),
metadata->pendingDeltaVersion,
metadata->durableDeltaVersion.get(),
metadata->pendingSnapshotVersion,
metadata->durableSnapshotVersion.get());
*/
if (metadata->readable.isSet() && v <= metadata->bufferedDeltaVersion.get() &&
(v <= metadata->durableDeltaVersion.get() ||
@ -1708,8 +1644,6 @@ static Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version
}
ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, BlobGranuleFileRequest req) {
printf(
"In handleBlobGranuleFileRequest for BW %s @ version %lld\n", bwData->id.toString().c_str(), req.readVersion);
try {
// TODO REMOVE in api V2
ASSERT(req.beginVersion == 0);
@ -1732,7 +1666,6 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData
req.keyRange.end.printable().c_str());
}
printf("lastRangeEnd < r.begin() || !isValid\n");
throw wrong_shard_server();
}
granules.push_back(r.value().activeMetadata);
@ -1747,16 +1680,11 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData
req.keyRange.end.printable().c_str());
}
printf("lastRangeEnd < req.keyRange.end\n");
throw wrong_shard_server();
}
// do work for each range
for (auto m : granules) {
if (req.readVersion >= 162692991) {
printf(
"For BW %s, granule: %s\n", bwData->id.toString().c_str(), m->keyRange.begin.printable().c_str());
}
state Reference<GranuleMetadata> metadata = m;
// try to check version_too_old, cancelled, waitForVersion without yielding first
if (metadata->readable.isSet() &&
@ -1773,7 +1701,6 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData
throw transaction_too_old();
}
if (metadata->cancelled.isSet()) {
printf("metadata->cancelled.isSet()\n");
throw wrong_shard_server();
}
@ -1785,19 +1712,10 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData
}
// rollback resets all of the version information, so we have to redo wait for version on rollback
state int rollbackCount = metadata->rollbackCount.get();
if (req.readVersion >= 162692991) {
printf("For BW %s, before choose\n", bwData->id.toString().c_str());
}
choose {
when(wait(waitForVersionFuture)) {}
when(wait(metadata->rollbackCount.onChange())) {}
when(wait(metadata->cancelled.getFuture())) {
printf("metadata->cancelled.getFuture()\n");
throw wrong_shard_server();
}
}
if (req.readVersion >= 162692991) {
printf("For BW %s, after choose\n", bwData->id.toString().c_str());
when(wait(metadata->cancelled.getFuture())) { throw wrong_shard_server(); }
}
if (rollbackCount == metadata->rollbackCount.get()) {
@ -1810,10 +1728,6 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData
}
}
if (req.readVersion >= 162692991) {
printf("For BW %s, after loop\n", bwData->id.toString().c_str());
}
// granule is up to date, do read
BlobGranuleChunkRef chunk;
@ -1911,7 +1825,6 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData
ACTOR Future<GranuleChangeFeedInfo> persistAssignWorkerRange(Reference<BlobWorkerData> bwData,
AssignBlobRangeRequest req) {
printf("in persistAssignWorkerRange\n");
ASSERT(!req.continueAssignment);
state Transaction tr(bwData->db);
state Key lockKey = granuleLockKey(req.keyRange);
@ -1923,158 +1836,149 @@ ACTOR Future<GranuleChangeFeedInfo> persistAssignWorkerRange(Reference<BlobWorke
req.keyRange.end.printable().c_str());
}
try {
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
// FIXME: could add list of futures and do the different parts that are disjoint in parallel?
info.changeFeedStartVersion = invalidVersion;
Optional<Value> prevLockValue = wait(tr.get(lockKey));
state bool hasPrevOwner = prevLockValue.present();
if (hasPrevOwner) {
std::tuple<int64_t, int64_t, UID> prevOwner = decodeBlobGranuleLockValue(prevLockValue.get());
acquireGranuleLock(
req.managerEpoch, req.managerSeqno, std::get<0>(prevOwner), std::get<1>(prevOwner));
info.changeFeedId = std::get<2>(prevOwner);
// FIXME: could add list of futures and do the different parts that are disjoint in parallel?
info.changeFeedStartVersion = invalidVersion;
Optional<Value> prevLockValue = wait(tr.get(lockKey));
state bool hasPrevOwner = prevLockValue.present();
if (hasPrevOwner) {
std::tuple<int64_t, int64_t, UID> prevOwner = decodeBlobGranuleLockValue(prevLockValue.get());
acquireGranuleLock(req.managerEpoch, req.managerSeqno, std::get<0>(prevOwner), std::get<1>(prevOwner));
info.changeFeedId = std::get<2>(prevOwner);
GranuleFiles granuleFiles = wait(loadPreviousFiles(&tr, req.keyRange));
info.existingFiles = granuleFiles;
info.doSnapshot = false;
GranuleFiles granuleFiles = wait(loadPreviousFiles(&tr, req.keyRange));
info.existingFiles = granuleFiles;
info.doSnapshot = false;
if (info.existingFiles.get().snapshotFiles.empty()) {
ASSERT(info.existingFiles.get().deltaFiles.empty());
info.previousDurableVersion = invalidVersion;
info.doSnapshot = true;
} else if (info.existingFiles.get().deltaFiles.empty()) {
info.previousDurableVersion = info.existingFiles.get().snapshotFiles.back().version;
} else {
info.previousDurableVersion = info.existingFiles.get().deltaFiles.back().version;
}
// for the non-splitting cases, this doesn't need to be 100% accurate, it just needs to be
// smaller than the next delta file write.
info.changeFeedStartVersion = info.previousDurableVersion;
} else {
// else we are first, no need to check for owner conflict
// FIXME: use actual 16 bytes of UID instead of converting it to 32 character string and then
// that to bytes
info.changeFeedId = deterministicRandom()->randomUniqueID();
wait(tr.registerChangeFeed(StringRef(info.changeFeedId.toString()), req.keyRange));
info.doSnapshot = true;
if (info.existingFiles.get().snapshotFiles.empty()) {
ASSERT(info.existingFiles.get().deltaFiles.empty());
info.previousDurableVersion = invalidVersion;
}
tr.set(lockKey, blobGranuleLockValueFor(req.managerEpoch, req.managerSeqno, info.changeFeedId));
wait(krmSetRange(
&tr, blobGranuleMappingKeys.begin, req.keyRange, blobGranuleMappingValueFor(bwData->id)));
Tuple historyKey;
historyKey.append(req.keyRange.begin).append(req.keyRange.end);
Optional<Value> parentGranulesValue =
wait(tr.get(historyKey.getDataAsStandalone().withPrefix(blobGranuleHistoryKeys.begin)));
// If anything in previousGranules, need to do the handoff logic and set ret.previousChangeFeedId,
// and the previous durable version will come from the previous granules
if (parentGranulesValue.present()) {
state Standalone<VectorRef<KeyRangeRef>> parentGranules =
decodeBlobGranuleHistoryValue(parentGranulesValue.get());
// TODO REMOVE
if (BW_DEBUG) {
printf("Decoded parent granules for [%s - %s)\n",
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str());
for (auto& pg : parentGranules) {
printf(" [%s - %s)\n", pg.begin.printable().c_str(), pg.end.printable().c_str());
}
}
// TODO change this for merge
ASSERT(parentGranules.size() == 1);
state std::pair<BlobGranuleSplitState, Version> granuleSplitState;
if (hasPrevOwner) {
std::pair<BlobGranuleSplitState, Version> _st =
wait(getGranuleSplitState(&tr, parentGranules[0], req.keyRange));
granuleSplitState = _st;
} else {
granuleSplitState = std::pair(BlobGranuleSplitState::Started, invalidVersion);
}
ASSERT(!hasPrevOwner || granuleSplitState.first > BlobGranuleSplitState::Started);
// if granule wasn't done with old change feed, load it
if (granuleSplitState.first < BlobGranuleSplitState::Done) {
Optional<Value> prevGranuleLockValue = wait(tr.get(granuleLockKey(parentGranules[0])));
ASSERT(prevGranuleLockValue.present());
std::tuple<int64_t, int64_t, UID> prevGranuleLock =
decodeBlobGranuleLockValue(prevGranuleLockValue.get());
info.prevChangeFeedId = std::get<2>(prevGranuleLock);
info.granuleSplitFrom = parentGranules[0];
if (granuleSplitState.first == BlobGranuleSplitState::Assigned) {
// was already assigned, use change feed start version
ASSERT(granuleSplitState.second != invalidVersion);
info.changeFeedStartVersion = granuleSplitState.second;
} else if (granuleSplitState.first == BlobGranuleSplitState::Started) {
wait(updateGranuleSplitState(&tr,
parentGranules[0],
req.keyRange,
info.prevChangeFeedId.get(),
BlobGranuleSplitState::Assigned));
// change feed was created as part of this transaction, changeFeedStartVersion will be
// set later
} else {
ASSERT(false);
}
}
if (info.doSnapshot) {
// only need to do snapshot if no files exist yet for this granule.
ASSERT(info.previousDurableVersion == invalidVersion);
// FIXME: store this somewhere useful for time travel reads
GranuleFiles prevFiles = wait(loadPreviousFiles(&tr, parentGranules[0]));
ASSERT(!prevFiles.snapshotFiles.empty() || !prevFiles.deltaFiles.empty());
info.blobFilesToSnapshot = prevFiles;
info.previousDurableVersion = info.blobFilesToSnapshot.get().deltaFiles.empty()
? info.blobFilesToSnapshot.get().snapshotFiles.back().version
: info.blobFilesToSnapshot.get().deltaFiles.back().version;
}
}
wait(tr.commit());
if (info.changeFeedStartVersion == invalidVersion) {
info.changeFeedStartVersion = tr.getCommittedVersion();
info.doSnapshot = true;
} else if (info.existingFiles.get().deltaFiles.empty()) {
info.previousDurableVersion = info.existingFiles.get().snapshotFiles.back().version;
} else {
ASSERT(info.changeFeedStartVersion != invalidVersion);
info.previousDurableVersion = info.existingFiles.get().deltaFiles.back().version;
}
TraceEvent("BlobWorkerPersistedAssignment", bwData->id).detail("Granule", req.keyRange);
return info;
} catch (Error& e) {
if (e.code() == error_code_granule_assignment_conflict) {
throw e;
}
wait(tr.onError(e));
// for the non-splitting cases, this doesn't need to be 100% accurate, it just needs to be
// smaller than the next delta file write.
info.changeFeedStartVersion = info.previousDurableVersion;
} else {
// else we are first, no need to check for owner conflict
// FIXME: use actual 16 bytes of UID instead of converting it to 32 character string and then
// that to bytes
info.changeFeedId = deterministicRandom()->randomUniqueID();
wait(tr.registerChangeFeed(StringRef(info.changeFeedId.toString()), req.keyRange));
info.doSnapshot = true;
info.previousDurableVersion = invalidVersion;
}
tr.set(lockKey, blobGranuleLockValueFor(req.managerEpoch, req.managerSeqno, info.changeFeedId));
wait(krmSetRange(&tr, blobGranuleMappingKeys.begin, req.keyRange, blobGranuleMappingValueFor(bwData->id)));
Tuple historyKey;
historyKey.append(req.keyRange.begin).append(req.keyRange.end);
Optional<Value> parentGranulesValue =
wait(tr.get(historyKey.getDataAsStandalone().withPrefix(blobGranuleHistoryKeys.begin)));
// If anything in previousGranules, need to do the handoff logic and set ret.previousChangeFeedId,
// and the previous durable version will come from the previous granules
if (parentGranulesValue.present()) {
state Standalone<VectorRef<KeyRangeRef>> parentGranules =
decodeBlobGranuleHistoryValue(parentGranulesValue.get());
// TODO REMOVE
if (BW_DEBUG) {
printf("Decoded parent granules for [%s - %s)\n",
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str());
for (auto& pg : parentGranules) {
printf(" [%s - %s)\n", pg.begin.printable().c_str(), pg.end.printable().c_str());
}
}
// TODO change this for merge
ASSERT(parentGranules.size() == 1);
state std::pair<BlobGranuleSplitState, Version> granuleSplitState;
if (hasPrevOwner) {
std::pair<BlobGranuleSplitState, Version> _st =
wait(getGranuleSplitState(&tr, parentGranules[0], req.keyRange));
granuleSplitState = _st;
} else {
granuleSplitState = std::pair(BlobGranuleSplitState::Started, invalidVersion);
}
ASSERT(!hasPrevOwner || granuleSplitState.first > BlobGranuleSplitState::Started);
// if granule wasn't done with old change feed, load it
if (granuleSplitState.first < BlobGranuleSplitState::Done) {
Optional<Value> prevGranuleLockValue = wait(tr.get(granuleLockKey(parentGranules[0])));
ASSERT(prevGranuleLockValue.present());
std::tuple<int64_t, int64_t, UID> prevGranuleLock =
decodeBlobGranuleLockValue(prevGranuleLockValue.get());
info.prevChangeFeedId = std::get<2>(prevGranuleLock);
info.granuleSplitFrom = parentGranules[0];
if (granuleSplitState.first == BlobGranuleSplitState::Assigned) {
// was already assigned, use change feed start version
ASSERT(granuleSplitState.second != invalidVersion);
info.changeFeedStartVersion = granuleSplitState.second;
} else if (granuleSplitState.first == BlobGranuleSplitState::Started) {
wait(updateGranuleSplitState(&tr,
parentGranules[0],
req.keyRange,
info.prevChangeFeedId.get(),
BlobGranuleSplitState::Assigned));
// change feed was created as part of this transaction, changeFeedStartVersion will be
// set later
} else {
ASSERT(false);
}
}
if (info.doSnapshot) {
// only need to do snapshot if no files exist yet for this granule.
ASSERT(info.previousDurableVersion == invalidVersion);
// FIXME: store this somewhere useful for time travel reads
GranuleFiles prevFiles = wait(loadPreviousFiles(&tr, parentGranules[0]));
ASSERT(!prevFiles.snapshotFiles.empty() || !prevFiles.deltaFiles.empty());
info.blobFilesToSnapshot = prevFiles;
info.previousDurableVersion = info.blobFilesToSnapshot.get().deltaFiles.empty()
? info.blobFilesToSnapshot.get().snapshotFiles.back().version
: info.blobFilesToSnapshot.get().deltaFiles.back().version;
}
}
wait(tr.commit());
if (info.changeFeedStartVersion == invalidVersion) {
info.changeFeedStartVersion = tr.getCommittedVersion();
} else {
ASSERT(info.changeFeedStartVersion != invalidVersion);
}
TraceEvent("BlobWorkerPersistedAssignment", bwData->id).detail("Granule", req.keyRange);
return info;
} catch (Error& e) {
if (e.code() == error_code_granule_assignment_conflict) {
throw e;
}
wait(tr.onError(e));
}
} catch (Error& e) {
printf("ERROR IN PERSIST: %s\n", e.name());
throw;
}
}
// try to use GranuleMetadata
// Begins processing a granule range that was just assigned to this blob worker.
// Persists the range assignment (acquiring the granule lock) and launches the
// long-running file-updater actor for the granule; only the assignment
// persistence is awaited before returning.
// NOTE(review): fileUpdaterFuture is stored on the GranuleRangeMetadata rather
// than added to an actor collection — presumably so it is cancelled when the
// metadata is destroyed; confirm lifetime ownership.
ACTOR Future<Void> start(Reference<BlobWorkerData> bwData, GranuleRangeMetadata* meta, AssignBlobRangeRequest req) {
ASSERT(meta->activeMetadata.isValid());
meta->activeMetadata->originalReq = req;
// Durably record that this worker owns the range before serving it.
meta->assignFuture = persistAssignWorkerRange(bwData, req);
// Kick off the actor that writes snapshot/delta files for this granule.
meta->fileUpdaterFuture = blobGranuleUpdateFiles(bwData, meta->activeMetadata, meta->assignFuture);
// bwData->actors.add(meta->fileUpdaterFuture);
// Wait only for the assignment to persist; file updating continues in the background.
wait(success(meta->assignFuture));
return Void();
}
@ -2128,7 +2032,6 @@ ACTOR Future<bool> changeBlobRange(Reference<BlobWorkerData> bwData,
bool active,
bool disposeOnCleanup,
bool selfReassign) {
printf("changeBlobRange called\n");
if (BW_DEBUG) {
printf("%s range for [%s - %s): %s @ (%lld, %lld)\n",
selfReassign ? "Re-assigning" : "Changing",
@ -2154,7 +2057,9 @@ ACTOR Future<bool> changeBlobRange(Reference<BlobWorkerData> bwData,
for (auto& r : ranges) {
if (!active) {
if (r.value().activeMetadata.isValid() && r.value().activeMetadata->cancelled.canBeSet()) {
printf("Cancelling activeMetadata\n");
if (BW_DEBUG) {
printf("Cancelling activeMetadata\n");
}
r.value().activeMetadata->cancelled.send(Void());
}
}
@ -2201,7 +2106,7 @@ ACTOR Future<bool> changeBlobRange(Reference<BlobWorkerData> bwData,
GranuleRangeMetadata newMetadata = (active && newerRanges.empty())
? constructActiveBlobRange(bwData, keyRange, epoch, seqno)
: constructInactiveBlobRange(epoch, seqno);
newMetadata.id = 42;
bwData->granuleMetadata.insert(keyRange, newMetadata);
if (BW_DEBUG) {
printf("Inserting new range [%s - %s): %s @ (%lld, %lld)\n",
@ -2304,38 +2209,15 @@ ACTOR Future<Void> handleRangeAssign(Reference<BlobWorkerData> bwData,
if (shouldStart) {
auto m = bwData->granuleMetadata.rangeContaining(req.keyRange.begin);
ASSERT(m.begin() == req.keyRange.begin && m.end() == req.keyRange.end);
printf("About to start for BW %s\n", bwData->id.toString().c_str());
wait(start(bwData, &m.value(), req));
/*
int count = 0;
// GranuleRangeMetadata& x;
for (auto& it : m) {
printf("BW %s ABOUT TO WAIT IN HANDLERANGEASSIGN\n", bwData->id.toString().c_str());
wait(start(bwData, &it.value(), req));
printf("done waiting in handleRangeAssign\n");
count++;
}
ASSERT(count == 1);
// x.id = 42;
printf("BW %s ABOUT TO WAIT IN HANDLERANGEASSIGN\n", bwData->id.toString().c_str());
// WAITING ON START BUT ITS NOT AN ACTOR!!!!!!! SO WHEN handlerangeassign gets operation_cancelled,
// it won't get propogated to start
// wait(start(bwData, x, req));
printf("done waiting in handleRangeAssign\n");
*/
}
}
if (!isSelfReassign) {
ASSERT(!req.reply.isSet());
printf("about to send reply\n");
req.reply.send(AssignBlobRangeReply(true));
printf("done sending reply\n");
}
return Void();
} catch (Error& e) {
printf("BW %s GOT ERROR %s IN HANDLERANGEASSIGN\n", bwData->id.toString().c_str(), e.name());
state Error eState = e;
if (BW_DEBUG) {
printf("AssignRange [%s - %s) got error %s\n",
req.keyRange.begin.printable().c_str(),
@ -2343,25 +2225,19 @@ ACTOR Future<Void> handleRangeAssign(Reference<BlobWorkerData> bwData,
e.name());
}
//
// if (futureAndNewGranule.get().second.isValid()) {
// wait(futureAndNewGranule.get().second->cancel(false));
//}
//
if (!isSelfReassign) {
if (canReplyWith(eState)) {
req.reply.sendError(eState);
if (canReplyWith(e)) {
req.reply.sendError(e);
}
}
throw eState;
throw;
}
}
ACTOR Future<Void> handleRangeRevoke(Reference<BlobWorkerData> bwData, RevokeBlobRangeRequest req) {
try {
bool _ =
bool _shouldStart =
wait(changeBlobRange(bwData, req.keyRange, req.managerEpoch, req.managerSeqno, false, req.dispose, false));
req.reply.send(AssignBlobRangeReply(true));
return Void();
@ -2466,8 +2342,8 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
rep.interf = bwInterf;
recruitReply.send(rep);
self->actors.add(waitFailureServer(bwInterf.waitFailure.getFuture()));
self->actors.add(runCommitVersionChecks(self));
self->addActor.send(waitFailureServer(bwInterf.waitFailure.getFuture()));
self->addActor.send(runCommitVersionChecks(self));
try {
loop choose {
@ -2477,7 +2353,7 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
req.keyRange.end.printable().c_str());*/
++self->stats.readRequests;
++self->stats.activeReadRequests;
self->actors.add(handleBlobGranuleFileRequest(self, req));
self->addActor.send(handleBlobGranuleFileRequest(self, req));
}
when(state GranuleStatusStreamRequest req = waitNext(bwInterf.granuleStatusStreamRequest.getFuture())) {
if (self->managerEpochOk(req.managerEpoch)) {
@ -2505,7 +2381,7 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
}
if (self->managerEpochOk(assignReq.managerEpoch)) {
self->actors.add(handleRangeAssign(self, assignReq, false));
self->addActor.send(handleRangeAssign(self, assignReq, false));
} else {
assignReq.reply.send(AssignBlobRangeReply(false));
}
@ -2524,13 +2400,13 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
}
if (self->managerEpochOk(revokeReq.managerEpoch)) {
self->actors.add(handleRangeRevoke(self, revokeReq));
self->addActor.send(handleRangeRevoke(self, revokeReq));
} else {
revokeReq.reply.send(AssignBlobRangeReply(false));
}
}
when(AssignBlobRangeRequest granuleToReassign = waitNext(self->granuleUpdateErrors.getFuture())) {
self->actors.add(handleRangeAssign(self, granuleToReassign, true));
self->addActor.send(handleRangeAssign(self, granuleToReassign, true));
}
when(HaltBlobWorkerRequest req = waitNext(bwInterf.haltBlobWorker.getFuture())) {
req.reply.send(Void());
@ -2540,25 +2416,19 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
break;
}
}
// when(wait(delay(10))) { throw granule_assignment_conflict(); }
when(wait(collection)) {
if (BW_DEBUG) {
printf("BW actor collection returned, exiting\n");
}
TraceEvent("BlobWorkerActorCollectionError");
ASSERT(false);
throw granule_assignment_conflict();
throw internal_error();
}
}
} catch (Error& e) {
if (BW_DEBUG) {
printf("Blob worker got error %s, exiting\n", e.name());
printf("Blob worker got error %s. Exiting...\n", e.name());
}
TraceEvent("BlobWorkerDied", self->id).error(e, true);
}
printf("cancelling actors for BW %s\n", self->id.toString().c_str());
self->actors.clear(false);
// self->dead = true;
return Void();
}