Cleaned up debugging and fixed a couple of bugs

Josh Slocum 2021-08-31 12:30:43 -05:00
parent bbeec49533
commit 46adada5ff
5 changed files with 525 additions and 389 deletions
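Most of the cleanup in this commit wraps ad-hoc printf debugging in compile-time flags (BM_DEBUG for the blob manager, BW_DEBUG / BW_REQUEST_DEBUG for the blob worker, BG_REQUEST_DEBUG on the client side). As a rough, standalone sketch of that pattern (illustrative only, not code from this diff):

#include <cstdio>

// Compile-time debug switches; set to 0/false to silence (and let the compiler elide) the output.
#define BM_DEBUG 1
#define BW_DEBUG true

static void reportAssignment(const char* rangeBegin, const char* rangeEnd) {
    if (BM_DEBUG) {
        // Debug-only output, guarded so it can be turned off in one place for production builds.
        printf("BM assigning range [%s - %s)\n", rangeBegin, rangeEnd);
    }
}

int main() {
    reportAssignment("a", "b");
    return 0;
}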

View File

@ -6777,6 +6777,7 @@ Future<Void> DatabaseContext::popRangeFeedMutations(StringRef rangeID, Version v
ACTOR Future<Void> getBlobGranuleRangesStreamActor(Reference<DatabaseContext> db,
PromiseStream<KeyRange> results,
KeyRange keyRange) {
// FIXME: use streaming range read
state Database cx(db);
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
state KeyRange currentRange = keyRange;
@ -6820,191 +6821,189 @@ struct BWLocationInfo : MultiInterface<ReferencedInterface<BlobWorkerInterface>>
ACTOR Future<Void> readBlobGranulesStreamActor(Reference<DatabaseContext> db,
PromiseStream<Standalone<BlobGranuleChunkRef>> results,
KeyRange keyRange,
KeyRange range,
Version begin,
Optional<Version> end) { // end not present is just latest
state Database cx(db);
try {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
state RangeResult blobGranuleMapping;
state Version endVersion;
state Key granuleStartKey;
state Key granuleEndKey;
state int i;
state UID workerId;
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
state RangeResult blobGranuleMapping;
state Version endVersion;
state Key granuleStartKey;
state Key granuleEndKey;
state KeyRange keyRange = range;
state int i, loopCounter = 0;
state UID workerId;
loop {
try {
// FIXME NOW: handle errors, handle mapping changes
// FIXME: Use streaming parallelism?
// Read mapping and worker interfaces from DB
loopCounter++;
loop {
try {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (loopCounter == 1) {
// if retrying, use new version for mapping but original version for read version
if (end.present()) {
endVersion = end.get();
} else {
Version _end = wait(tr->getReadVersion());
endVersion = _end;
}
}
// Read mapping and worker interfaces from DB
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (end.present()) {
endVersion = end.get();
} else {
Version _end = wait(tr->getReadVersion());
endVersion = _end;
}
// Right now just read whole blob range assignments from DB
// FIXME: eventually we probably want to cache this and invalidate similarly to storage servers.
// Cache misses could still read from the DB, or we could add it to the Transaction State Store and
// have proxies serve it from memory.
RangeResult _bgMapping = wait(krmGetRanges(
tr, blobGranuleMappingKeys.begin, keyRange, 1000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
blobGranuleMapping = _bgMapping;
if (blobGranuleMapping.more) {
// TODO REMOVE
if (BG_REQUEST_DEBUG) {
printf("BG Mapping for [%s - %s) too large!\n");
}
throw unsupported_operation();
}
ASSERT(!blobGranuleMapping.more && blobGranuleMapping.size() < CLIENT_KNOBS->TOO_MANY);
// Right now just read whole blob range assignments from DB
// FIXME: eventually we probably want to cache this and invalidate similarly to storage servers.
// Cache misses could still read from the DB, or we could add it to the Transaction State Store and have
// proxies serve it from memory.
RangeResult _bgMapping = wait(krmGetRanges(
tr, blobGranuleMappingKeys.begin, keyRange, 1000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
blobGranuleMapping = _bgMapping;
if (blobGranuleMapping.more) {
// TODO REMOVE
printf("BG Mapping for [%s - %s) too large!\n");
throw unsupported_operation();
}
ASSERT(!blobGranuleMapping.more && blobGranuleMapping.size() < CLIENT_KNOBS->TOO_MANY);
if (blobGranuleMapping.size() == 0) {
printf("no blob worker assignments yet \n");
throw transaction_too_old();
}
if (BG_REQUEST_DEBUG) {
printf("Doing blob granule request @ %lld\n", endVersion);
printf("blob worker assignments:\n");
}
for (i = 0; i < blobGranuleMapping.size() - 1; i++) {
granuleStartKey = blobGranuleMapping[i].key;
granuleEndKey = blobGranuleMapping[i + 1].key;
if (!blobGranuleMapping[i].value.size()) {
printf("Key range [%s - %s) missing worker assignment!\n",
granuleStartKey.printable().c_str(),
granuleEndKey.printable().c_str());
// TODO probably new exception type instead
if (blobGranuleMapping.size() == 0) {
if (BG_REQUEST_DEBUG) {
printf("no blob worker assignments yet \n");
}
throw transaction_too_old();
}
workerId = decodeBlobGranuleMappingValue(blobGranuleMapping[i].value);
if (BG_REQUEST_DEBUG) {
printf(" [%s - %s): %s\n",
granuleStartKey.printable().c_str(),
granuleEndKey.printable().c_str(),
workerId.toString().c_str());
printf("Doing blob granule request @ %lld\n", endVersion);
printf("blob worker assignments:\n");
}
if (!cx->blobWorker_interf.count(workerId)) {
Optional<Value> workerInterface = wait(tr->get(blobWorkerListKeyFor(workerId)));
ASSERT(workerInterface.present());
cx->blobWorker_interf[workerId] = decodeBlobWorkerListValue(workerInterface.get());
for (i = 0; i < blobGranuleMapping.size() - 1; i++) {
granuleStartKey = blobGranuleMapping[i].key;
granuleEndKey = blobGranuleMapping[i + 1].key;
if (!blobGranuleMapping[i].value.size()) {
if (BG_REQUEST_DEBUG) {
printf("Key range [%s - %s) missing worker assignment!\n",
granuleStartKey.printable().c_str(),
granuleEndKey.printable().c_str());
// TODO probably new exception type instead
}
throw transaction_too_old();
}
workerId = decodeBlobGranuleMappingValue(blobGranuleMapping[i].value);
if (BG_REQUEST_DEBUG) {
printf(" decoded worker interface for %s\n", workerId.toString().c_str());
printf(" [%s - %s): %s\n",
granuleStartKey.printable().c_str(),
granuleEndKey.printable().c_str(),
workerId.toString().c_str());
}
if (!cx->blobWorker_interf.count(workerId)) {
Optional<Value> workerInterface = wait(tr->get(blobWorkerListKeyFor(workerId)));
ASSERT(workerInterface.present());
cx->blobWorker_interf[workerId] = decodeBlobWorkerListValue(workerInterface.get());
if (BG_REQUEST_DEBUG) {
printf(" decoded worker interface for %s\n", workerId.toString().c_str());
}
}
}
break;
} catch (Error& e) {
wait(tr->onError(e));
}
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
// Make request for each granule
for (i = 0; i < blobGranuleMapping.size() - 1; i++) {
granuleStartKey = blobGranuleMapping[i].key;
granuleEndKey = blobGranuleMapping[i + 1].key;
workerId = decodeBlobGranuleMappingValue(blobGranuleMapping[i].value);
// prune first/last granules to requested range
if (i == 0) {
granuleStartKey = keyRange.begin;
}
if (i == blobGranuleMapping.size() - 2) {
granuleEndKey = keyRange.end;
}
state BlobGranuleFileRequest req;
req.keyRange = KeyRangeRef(StringRef(req.arena, granuleStartKey), StringRef(req.arena, granuleEndKey));
req.beginVersion = begin;
req.readVersion = endVersion;
// Make request for each granule
for (i = 0; i < blobGranuleMapping.size() - 1; i++) {
granuleStartKey = blobGranuleMapping[i].key;
granuleEndKey = blobGranuleMapping[i + 1].key;
workerId = decodeBlobGranuleMappingValue(blobGranuleMapping[i].value);
// prune first/last granules to requested range
if (i == 0) {
granuleStartKey = keyRange.begin;
}
if (i == blobGranuleMapping.size() - 2) {
granuleEndKey = keyRange.end;
}
std::vector<Reference<ReferencedInterface<BlobWorkerInterface>>> v;
v.push_back(makeReference<ReferencedInterface<BlobWorkerInterface>>(cx->blobWorker_interf[workerId]));
state Reference<MultiInterface<ReferencedInterface<BlobWorkerInterface>>> location =
makeReference<BWLocationInfo>(v);
// use load balance with one option for now for retry and error handling
BlobGranuleFileReply rep = wait(loadBalance(location,
&BlobWorkerInterface::blobGranuleFileRequest,
req,
TaskPriority::DefaultPromiseEndpoint,
AtMostOnce::False,
nullptr));
state BlobGranuleFileRequest req;
req.keyRange = KeyRangeRef(StringRef(req.arena, granuleStartKey), StringRef(req.arena, granuleEndKey));
req.beginVersion = begin;
req.readVersion = endVersion;
std::vector<Reference<ReferencedInterface<BlobWorkerInterface>>> v;
v.push_back(makeReference<ReferencedInterface<BlobWorkerInterface>>(cx->blobWorker_interf[workerId]));
state Reference<MultiInterface<ReferencedInterface<BlobWorkerInterface>>> location =
makeReference<BWLocationInfo>(v);
// use load balance with one option for now for retry and error handling
BlobGranuleFileReply rep = wait(loadBalance(location,
&BlobWorkerInterface::blobGranuleFileRequest,
req,
TaskPriority::DefaultPromiseEndpoint,
AtMostOnce::False,
nullptr));
/*ErrorOr<BlobGranuleFileReply> _rep =
wait(cx->blobWorker_interf[workerId].blobGranuleFileRequest.tryGetReply(req));
if (_rep.isError()) {
throw _rep.getError();
}
BlobGranuleFileReply rep = _rep.get();*/
if (BG_REQUEST_DEBUG) {
printf("Blob granule request for [%s - %s) @ %lld - %lld got reply from %s:\n",
granuleStartKey.printable().c_str(),
granuleEndKey.printable().c_str(),
begin,
endVersion,
workerId.toString().c_str());
}
for (auto& chunk : rep.chunks) {
if (BG_REQUEST_DEBUG) {
printf("[%s - %s)\n",
chunk.keyRange.begin.printable().c_str(),
chunk.keyRange.end.printable().c_str());
printf(" SnapshotFile:\n %s\n",
chunk.snapshotFile.present() ? chunk.snapshotFile.get().toString().c_str() : "<none>");
printf(" DeltaFiles:\n");
for (auto& df : chunk.deltaFiles) {
printf(" %s\n", df.toString().c_str());
}
printf(" Deltas: (%d)", chunk.newDeltas.size());
if (chunk.newDeltas.size() > 0) {
printf(" with version [%lld - %lld]",
chunk.newDeltas[0].version,
chunk.newDeltas[chunk.newDeltas.size() - 1].version);
}
printf(" IncludedVersion: %lld\n", chunk.includedVersion);
printf("\n\n");
printf("Blob granule request for [%s - %s) @ %lld - %lld got reply from %s:\n",
granuleStartKey.printable().c_str(),
granuleEndKey.printable().c_str(),
begin,
endVersion,
workerId.toString().c_str());
}
Arena a;
a.dependsOn(rep.arena);
results.send(Standalone<BlobGranuleChunkRef>(chunk, a));
}
for (auto& chunk : rep.chunks) {
if (BG_REQUEST_DEBUG) {
printf("[%s - %s)\n",
chunk.keyRange.begin.printable().c_str(),
chunk.keyRange.end.printable().c_str());
/*state PromiseStream<RangeResult> results;
state Future<Void> granuleReader = readBlobGranules(req, rep, bstore, results);
try {
loop {
// printf("Waiting for result chunk\n");
RangeResult result = waitNext(results.getFuture());
printf("Result chunk (%d):\n", result.size());
int resultIdx = 0;
for (auto& it : result) {
printf(" %s=%s\n", it.key.printable().c_str(), it.value.printable().c_str());
resultIdx++;
if (resultIdx >= 10) {
break;
}
}
if (resultIdx >= 10) {
printf(" ...\n");
}
}
} catch (Error& e) {
if (e.code() != error_code_end_of_stream) {
printf("granule reader got unexpected error %s\n", e.name());
} else {
// printf("granule reader got end of stream\n");
printf("\n");
}
}*/
printf(" SnapshotFile:\n %s\n",
chunk.snapshotFile.present() ? chunk.snapshotFile.get().toString().c_str() : "<none>");
printf(" DeltaFiles:\n");
for (auto& df : chunk.deltaFiles) {
printf(" %s\n", df.toString().c_str());
}
printf(" Deltas: (%d)", chunk.newDeltas.size());
if (chunk.newDeltas.size() > 0) {
printf(" with version [%lld - %lld]",
chunk.newDeltas[0].version,
chunk.newDeltas[chunk.newDeltas.size() - 1].version);
}
printf(" IncludedVersion: %lld\n", chunk.includedVersion);
printf("\n\n");
}
Arena a;
a.dependsOn(rep.arena);
results.send(Standalone<BlobGranuleChunkRef>(chunk, a));
keyRange = KeyRangeRef(chunk.keyRange.end, keyRange.end);
}
}
results.sendError(end_of_stream());
return Void();
} catch (Error& e) {
// only print this error with exponential backoff
if (e.code() == error_code_actor_cancelled) {
throw;
}
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
e.code() == error_code_connection_failed) {
// TODO would invalidate mapping cache here if we had it
wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY));
} else {
if (BG_REQUEST_DEBUG) {
printf("blob granule file request got unexpected error %s\n", e.name());
}
results.sendError(e);
return Void();
}
// TODO add a wait here!
}
results.sendError(end_of_stream());
} catch (Error& e) {
printf("blob granule file request got error %s\n", e.name());
}
return Void();
}
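For context, the commented-out reader above illustrates the consumption side of this stream: the caller drains the PromiseStream until the producer closes it with end_of_stream(). A minimal consumer in the same flow-actor style might look like the sketch below (it assumes the FDB flow headers and actor compiler, and is not part of this diff):

ACTOR Future<Void> consumeGranuleChunks(PromiseStream<Standalone<BlobGranuleChunkRef>> results) {
    state int chunkCount = 0;
    try {
        loop {
            // Chunks arrive as readBlobGranulesStreamActor sends them, in key order.
            Standalone<BlobGranuleChunkRef> chunk = waitNext(results.getFuture());
            printf("got chunk [%s - %s)\n",
                   chunk.keyRange.begin.printable().c_str(),
                   chunk.keyRange.end.printable().c_str());
            chunkCount++;
        }
    } catch (Error& e) {
        // end_of_stream() is the normal termination signal; anything else is a real error.
        if (e.code() != error_code_end_of_stream) {
            throw;
        }
    }
    printf("consumed %d chunks\n", chunkCount);
    return Void();
}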
Future<Void> DatabaseContext::readBlobGranulesStream(const PromiseStream<Standalone<BlobGranuleChunkRef>>& results,

View File

@ -1082,10 +1082,10 @@ const KeyRangeRef configKnobKeys("\xff\xff/knobs/"_sr, "\xff\xff/knobs0"_sr);
const KeyRangeRef configClassKeys("\xff\xff/configClasses/"_sr, "\xff\xff/configClasses0"_sr);
// key to watch for changes in active blob ranges + KeyRangeMap of active blob ranges
// TODO go over which of these should be \xff/ and \xff\x02/
// Blob Manager + Worker stuff is all \xff\x02 to avoid Transaction State Store
const KeyRef blobRangeChangeKey = LiteralStringRef("\xff\x02/blobRangeChange");
const KeyRangeRef blobRangeKeys(LiteralStringRef("\xff/blobRange/"), LiteralStringRef("\xff/blobRange0"));
const KeyRef blobManagerEpochKey = LiteralStringRef("\xff\x02/blobRangeChange");
const KeyRangeRef blobRangeKeys(LiteralStringRef("\xff\x02/blobRange/"), LiteralStringRef("\xff\x02/blobRange0"));
const KeyRef blobManagerEpochKey = LiteralStringRef("\xff\x02/blobManagerEpoch");
const Value blobManagerEpochValueFor(int64_t epoch) {
BinaryWriter wr(Unversioned());
@ -1101,9 +1101,9 @@ int64_t decodeBlobManagerEpochValue(ValueRef const& value) {
}
// blob range file data
const KeyRangeRef blobGranuleFileKeys(LiteralStringRef("\xff/bgf/"), LiteralStringRef("\xff/bgf0"));
const KeyRangeRef blobGranuleMappingKeys(LiteralStringRef("\xff/bgm/"), LiteralStringRef("\xff/bgm0"));
const KeyRangeRef blobGranuleLockKeys(LiteralStringRef("\xff/bgl/"), LiteralStringRef("\xff/bgl0"));
const KeyRangeRef blobGranuleFileKeys(LiteralStringRef("\xff\x02/bgf/"), LiteralStringRef("\xff\x02/bgf0"));
const KeyRangeRef blobGranuleMappingKeys(LiteralStringRef("\xff\x02/bgm/"), LiteralStringRef("\xff\x02/bgm0"));
const KeyRangeRef blobGranuleLockKeys(LiteralStringRef("\xff\x02/bgl/"), LiteralStringRef("\xff\x02/bgl0"));
const Value blobGranuleMappingValueFor(UID const& workerID) {
BinaryWriter wr(Unversioned());
@ -1133,7 +1133,7 @@ std::pair<int64_t, int64_t> decodeBlobGranuleLockValue(const ValueRef& value) {
return std::pair(epoch, seqno);
}
const KeyRangeRef blobWorkerListKeys(LiteralStringRef("\xff/bwList/"), LiteralStringRef("\xff/bwList0"));
const KeyRangeRef blobWorkerListKeys(LiteralStringRef("\xff\x02/bwList/"), LiteralStringRef("\xff\x02/bwList0"));
const Key blobWorkerListKeyFor(UID workerID) {
BinaryWriter wr(Unversioned());
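These helpers all follow the same flow serialization pattern: a BinaryWriter constructed with Unversioned() packs the payload into a Value, and a matching BinaryReader unpacks it (as decodeBlobGranuleMappingValue and friends do). A minimal sketch of that round trip for a worker ID, using hypothetical names and assuming the flow serialization headers (not part of this diff):

// Hypothetical encoder, mirroring blobGranuleMappingValueFor above.
static Value workerIdValueFor(UID const& workerID) {
    BinaryWriter wr(Unversioned());
    wr << workerID;
    return wr.toValue();
}

// Hypothetical decoder, mirroring decodeBlobGranuleMappingValue.
static UID decodeWorkerIdValue(ValueRef const& value) {
    UID workerID;
    BinaryReader reader(value, Unversioned());
    reader >> workerID;
    return workerID;
}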

View File

@ -29,6 +29,8 @@
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // has to be last include
#define BM_DEBUG 1
// TODO add comments + documentation
void handleClientBlobRange(KeyRangeMap<bool>* knownBlobRanges,
Arena ar,
@ -37,10 +39,12 @@ void handleClientBlobRange(KeyRangeMap<bool>* knownBlobRanges,
KeyRef rangeStart,
KeyRef rangeEnd,
bool rangeActive) {
/*printf("db range [%s - %s): %s\n",
rangeStart.printable().c_str(),
rangeEnd.printable().c_str(),
rangeActive ? "T" : "F");*/
if (BM_DEBUG) {
printf("db range [%s - %s): %s\n",
rangeStart.printable().c_str(),
rangeEnd.printable().c_str(),
rangeActive ? "T" : "F");
}
KeyRange keyRange(KeyRangeRef(rangeStart, rangeEnd));
auto allRanges = knownBlobRanges->intersectingRanges(keyRange);
for (auto& r : allRanges) {
@ -49,21 +53,21 @@ void handleClientBlobRange(KeyRangeMap<bool>* knownBlobRanges,
KeyRef overlapEnd = (keyRange.end < r.end()) ? keyRange.end : r.end();
KeyRangeRef overlap(overlapStart, overlapEnd);
if (rangeActive) {
/*printf("BM Adding client range [%s - %s)\n",
overlapStart.printable().c_str(),
overlapEnd.printable().c_str());*/
if (BM_DEBUG) {
printf("BM Adding client range [%s - %s)\n",
overlapStart.printable().c_str(),
overlapEnd.printable().c_str());
}
rangesToAdd->push_back_deep(ar, overlap);
} else {
/*printf("BM Removing client range [%s - %s)\n",
overlapStart.printable().c_str(),
overlapEnd.printable().c_str());*/
if (BM_DEBUG) {
printf("BM Removing client range [%s - %s)\n",
overlapStart.printable().c_str(),
overlapEnd.printable().c_str());
}
rangesToRemove->push_back_deep(ar, overlap);
}
}
// results.emplace_back(r.range(), r.value());
// printf(" [%s - %s): %s\n", r.begin().printable().c_str(), r.end().printable().c_str(), r.value() ? "T" :
// "F");
}
knownBlobRanges->insert(keyRange, rangeActive);
}
@ -73,11 +77,13 @@ void updateClientBlobRanges(KeyRangeMap<bool>* knownBlobRanges,
Arena ar,
VectorRef<KeyRangeRef>* rangesToAdd,
VectorRef<KeyRangeRef>* rangesToRemove) {
/*printf("Updating %d client blob ranges", dbBlobRanges.size() / 2);
for (int i = 0; i < dbBlobRanges.size() - 1; i += 2) {
printf(" [%s - %s)", dbBlobRanges[i].key.printable().c_str(), dbBlobRanges[i + 1].key.printable().c_str());
if (BM_DEBUG) {
printf("Updating %d client blob ranges", dbBlobRanges.size() / 2);
for (int i = 0; i < dbBlobRanges.size() - 1; i += 2) {
printf(" [%s - %s)", dbBlobRanges[i].key.printable().c_str(), dbBlobRanges[i + 1].key.printable().c_str());
}
printf("\n");
}
printf("\n");*/
// essentially do a merge diff of the current known blob ranges and the new ranges, to assign new ranges to
// workers and revoke old ranges from workers
@ -98,21 +104,27 @@ void updateClientBlobRanges(KeyRangeMap<bool>* knownBlobRanges,
}
for (int i = 0; i < dbBlobRanges.size() - 1; i++) {
if (dbBlobRanges[i].key >= normalKeys.end) {
printf("Found invalid blob range start %s\n", dbBlobRanges[i].key.printable().c_str());
if (BM_DEBUG) {
printf("Found invalid blob range start %s\n", dbBlobRanges[i].key.printable().c_str());
}
break;
}
bool active = dbBlobRanges[i].value == LiteralStringRef("1");
if (active) {
ASSERT(dbBlobRanges[i + 1].value == StringRef());
printf("BM sees client range [%s - %s)\n",
dbBlobRanges[i].key.printable().c_str(),
dbBlobRanges[i + 1].key.printable().c_str());
if (BM_DEBUG) {
printf("BM sees client range [%s - %s)\n",
dbBlobRanges[i].key.printable().c_str(),
dbBlobRanges[i + 1].key.printable().c_str());
}
}
KeyRef endKey = dbBlobRanges[i + 1].key;
if (endKey > normalKeys.end) {
printf("Removing system keyspace from blob range [%s - %s)\n",
dbBlobRanges[i].key.printable().c_str(),
endKey.printable().c_str());
if (BM_DEBUG) {
printf("Removing system keyspace from blob range [%s - %s)\n",
dbBlobRanges[i].key.printable().c_str(),
endKey.printable().c_str());
}
endKey = normalKeys.end;
}
handleClientBlobRange(
@ -132,11 +144,16 @@ void updateClientBlobRanges(KeyRangeMap<bool>* knownBlobRanges,
}
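To make the "merge diff" idea above concrete outside the KeyRangeMap machinery, here is a toy, self-contained sketch of the overlap split that handleClientBlobRange performs (std::string in place of KeyRef, a plain vector in place of KeyRangeMap; illustrative only, not code from this diff):

#include <algorithm>
#include <cstdio>
#include <string>
#include <utility>
#include <vector>

struct KnownRange {
    std::string begin, end; // half-open [begin, end)
};

// Intersect a client-requested range with each known range and classify the overlap
// as an assignment (range becoming active) or a revoke (range becoming inactive).
static void diffClientRange(const std::vector<KnownRange>& known,
                            const std::string& reqBegin,
                            const std::string& reqEnd,
                            bool reqActive,
                            std::vector<std::pair<std::string, std::string>>& toAssign,
                            std::vector<std::pair<std::string, std::string>>& toRevoke) {
    for (const auto& r : known) {
        std::string overlapBegin = std::max(reqBegin, r.begin);
        std::string overlapEnd = std::min(reqEnd, r.end);
        if (overlapBegin >= overlapEnd) {
            continue; // no intersection with this known range
        }
        (reqActive ? toAssign : toRevoke).emplace_back(overlapBegin, overlapEnd);
    }
}

int main() {
    std::vector<KnownRange> known = { { "a", "m" }, { "m", "z" } };
    std::vector<std::pair<std::string, std::string>> toAssign, toRevoke;
    diffClientRange(known, "c", "q", /*reqActive=*/true, toAssign, toRevoke);
    for (auto& p : toAssign) {
        printf("assign [%s - %s)\n", p.first.c_str(), p.second.c_str());
    }
    return 0;
}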
void getRanges(std::vector<std::pair<KeyRangeRef, bool>>& results, KeyRangeMap<bool>& knownBlobRanges) {
printf("Getting ranges:\n");
if (BM_DEBUG) {
printf("Getting ranges:\n");
}
auto allRanges = knownBlobRanges.ranges();
for (auto& r : allRanges) {
results.emplace_back(r.range(), r.value());
printf(" [%s - %s): %s\n", r.begin().printable().c_str(), r.end().printable().c_str(), r.value() ? "T" : "F");
if (BM_DEBUG) {
printf(
" [%s - %s): %s\n", r.begin().printable().c_str(), r.end().printable().c_str(), r.value() ? "T" : "F");
}
}
}
@ -186,7 +203,9 @@ ACTOR Future<Void> nukeBlobWorkerData(BlobManagerData* bmData) {
return Void();
} catch (Error& e) {
printf("Nuking blob worker data got error %s\n", e.name());
if (BM_DEBUG) {
printf("Nuking blob worker data got error %s\n", e.name());
}
wait(tr->onError(e));
}
}
@ -196,13 +215,17 @@ ACTOR Future<Standalone<VectorRef<KeyRef>>> splitNewRange(Reference<ReadYourWrit
// TODO is it better to just pass empty metrics to estimated?
// TODO handle errors here by pulling out into its own transaction instead of the main loop's transaction, and
// retrying
printf("Splitting new range [%s - %s)\n", range.begin.printable().c_str(), range.end.printable().c_str());
if (BM_DEBUG) {
printf("Splitting new range [%s - %s)\n", range.begin.printable().c_str(), range.end.printable().c_str());
}
StorageMetrics estimated = wait(tr->getTransaction().getStorageMetrics(range, CLIENT_KNOBS->TOO_MANY));
printf("Estimated bytes for [%s - %s): %lld\n",
range.begin.printable().c_str(),
range.end.printable().c_str(),
estimated.bytes);
if (BM_DEBUG) {
printf("Estimated bytes for [%s - %s): %lld\n",
range.begin.printable().c_str(),
range.end.printable().c_str(),
estimated.bytes);
}
if (estimated.bytes > SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES) {
// printf(" Splitting range\n");
@ -228,14 +251,18 @@ ACTOR Future<Standalone<VectorRef<KeyRef>>> splitNewRange(Reference<ReadYourWrit
static UID pickWorkerForAssign(BlobManagerData* bmData) {
// FIXME: Right now just picks a random worker, this is very suboptimal
int idx = deterministicRandom()->randomInt(0, bmData->workersById.size());
printf("picked random worker %d: ", idx);
if (BM_DEBUG) {
printf("picked random worker %d: ", idx);
}
auto it = bmData->workersById.begin();
while (idx > 0) {
idx--;
it++;
}
printf("%s\n", it->first.toString().c_str());
if (BM_DEBUG) {
printf("%s\n", it->first.toString().c_str());
}
return it->first;
}
@ -247,18 +274,22 @@ ACTOR Future<Void> doRangeAssignment(BlobManagerData* bmData, RangeAssignment as
req.managerSeqno = seqNo;
req.isAssign = assignment.isAssign;
printf("BM %s %s range [%s - %s) @ (%lld, %lld)\n",
workerID.toString().c_str(),
req.isAssign ? "assigning" : "revoking",
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str(),
req.managerEpoch,
req.managerSeqno);
if (BM_DEBUG) {
printf("BM %s %s range [%s - %s) @ (%lld, %lld)\n",
workerID.toString().c_str(),
req.isAssign ? "assigning" : "revoking",
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str(),
req.managerEpoch,
req.managerSeqno);
}
try {
AssignBlobRangeReply rep = wait(bmData->workersById[workerID].assignBlobRangeRequest.getReply(req));
if (!rep.epochOk) {
printf("BM heard from BW that there is a new manager with higher epoch\n");
if (BM_DEBUG) {
printf("BM heard from BW that there is a new manager with higher epoch\n");
}
if (bmData->iAmReplaced.canBeSet()) {
bmData->iAmReplaced.send(Void());
}
@ -267,14 +298,16 @@ ACTOR Future<Void> doRangeAssignment(BlobManagerData* bmData, RangeAssignment as
// TODO confirm: using reliable delivery this should only trigger if the worker is marked as failed, right?
// So assignment needs to be retried elsewhere, and a revoke is trivially complete
if (assignment.isAssign) {
printf("BM got error assigning range [%s - %s) to worker %s, requeueing\n",
assignment.keyRange.begin.printable().c_str(),
assignment.keyRange.end.printable().c_str());
if (BM_DEBUG) {
printf("BM got error assigning range [%s - %s) to worker %s, requeueing\n",
assignment.keyRange.begin.printable().c_str(),
assignment.keyRange.end.printable().c_str());
}
// re-send revoke to queue to handle range being un-assigned from that worker before the new one
bmData->rangesToAssign.send(RangeAssignment(assignment.keyRange, false));
bmData->rangesToAssign.send(assignment);
// FIXME: improvement would be to add history of failed workers to assignment so it can try other ones first
} else {
} else if (BM_DEBUG) {
printf("BM got error revoking range [%s - %s) from worker %s, ignoring\n",
assignment.keyRange.begin.printable().c_str(),
assignment.keyRange.end.printable().c_str());
@ -348,7 +381,9 @@ ACTOR Future<int64_t> acquireManagerLock(BlobManagerData* bmData) {
wait(tr->commit());
return newEpoch;
} catch (Error& e) {
printf("Acquiring blob manager lock got error %s\n", e.name());
if (BM_DEBUG) {
printf("Acquiring blob manager lock got error %s\n", e.name());
}
wait(tr->onError(e));
}
}
@ -358,7 +393,9 @@ ACTOR Future<Void> monitorClientRanges(BlobManagerData* bmData) {
loop {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
printf("Blob manager checking for range updates\n");
if (BM_DEBUG) {
printf("Blob manager checking for range updates\n");
}
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
@ -377,9 +414,11 @@ ACTOR Future<Void> monitorClientRanges(BlobManagerData* bmData) {
updateClientBlobRanges(&bmData->knownBlobRanges, results, ar, &rangesToAdd, &rangesToRemove);
for (KeyRangeRef range : rangesToRemove) {
printf("BM Got range to revoke [%s - %s)\n",
range.begin.printable().c_str(),
range.end.printable().c_str());
if (BM_DEBUG) {
printf("BM Got range to revoke [%s - %s)\n",
range.begin.printable().c_str(),
range.end.printable().c_str());
}
bmData->rangesToAssign.send(RangeAssignment(range, false));
}
@ -393,14 +432,18 @@ ACTOR Future<Void> monitorClientRanges(BlobManagerData* bmData) {
for (auto f : splitFutures) {
Standalone<VectorRef<KeyRef>> splits = wait(f);
printf("Split client range [%s - %s) into %d ranges:\n",
splits[0].printable().c_str(),
splits[splits.size() - 1].printable().c_str(),
splits.size() - 1);
if (BM_DEBUG) {
printf("Split client range [%s - %s) into %d ranges:\n",
splits[0].printable().c_str(),
splits[splits.size() - 1].printable().c_str(),
splits.size() - 1);
}
for (int i = 0; i < splits.size() - 1; i++) {
KeyRange range = KeyRange(KeyRangeRef(splits[i], splits[i + 1]));
printf(" [%s - %s)\n", range.begin.printable().c_str(), range.end.printable().c_str());
if (BM_DEBUG) {
printf(" [%s - %s)\n", range.begin.printable().c_str(), range.end.printable().c_str());
}
bmData->rangesToAssign.send(RangeAssignment(range, true));
}
@ -408,11 +451,15 @@ ACTOR Future<Void> monitorClientRanges(BlobManagerData* bmData) {
state Future<Void> watchFuture = tr->watch(blobRangeChangeKey);
wait(tr->commit());
printf("Blob manager done processing client ranges, awaiting update\n");
if (BM_DEBUG) {
printf("Blob manager done processing client ranges, awaiting update\n");
}
wait(watchFuture);
break;
} catch (Error& e) {
printf("Blob manager got error looking for range updates %s\n", e.name());
if (BM_DEBUG) {
printf("Blob manager got error looking for range updates %s\n", e.name());
}
wait(tr->onError(e));
}
}
@ -430,20 +477,22 @@ ACTOR Future<Void> rangeMover(BlobManagerData* bmData) {
tries--;
auto randomRange = bmData->workerAssignments.randomRange();
if (randomRange.value() != UID()) {
printf("Range mover moving range [%s - %s): %s\n",
randomRange.begin().printable().c_str(),
randomRange.end().printable().c_str(),
randomRange.value().toString().c_str());
if (BM_DEBUG) {
printf("Range mover moving range [%s - %s): %s\n",
randomRange.begin().printable().c_str(),
randomRange.end().printable().c_str(),
randomRange.value().toString().c_str());
}
bmData->rangesToAssign.send(RangeAssignment(randomRange.range(), false));
bmData->rangesToAssign.send(RangeAssignment(randomRange.range(), true));
break;
}
}
if (tries == 0) {
if (tries == 0 && BM_DEBUG) {
printf("Range mover couldn't find range to move, skipping\n");
}
} else {
} else if (BM_DEBUG) {
printf("Range mover found %d workers, skipping\n", bmData->workerAssignments.size());
}
}
@ -459,14 +508,21 @@ ACTOR Future<Void> blobManager(LocalityData locality, Reference<AsyncVar<ServerD
state Future<Void> collection = actorCollection(addActor.getFuture());
// TODO remove once we have persistence + failure detection
printf("Blob manager nuking previous workers and range assignments on startup\n");
if (BM_DEBUG) {
printf("Blob manager nuking previous workers and range assignments on startup\n");
}
wait(nukeBlobWorkerData(&self));
printf("Blob manager nuked previous workers and range assignments\n");
printf("Blob manager taking lock\n");
if (BM_DEBUG) {
printf("Blob manager nuked previous workers and range assignments\n");
printf("Blob manager taking lock\n");
}
int64_t _epoch = wait(acquireManagerLock(&self));
self.epoch = _epoch;
printf("Blob manager acquired lock at epoch %lld\n", _epoch);
if (BM_DEBUG) {
printf("Blob manager acquired lock at epoch %lld\n", _epoch);
}
int numWorkers = 2;
for (int i = 0; i < numWorkers; i++) {
@ -478,12 +534,16 @@ ACTOR Future<Void> blobManager(LocalityData locality, Reference<AsyncVar<ServerD
addActor.send(monitorClientRanges(&self));
addActor.send(rangeAssigner(&self));
addActor.send(rangeMover(&self));
// TODO add back once everything is properly implemented!
// addActor.send(rangeMover(&self));
// TODO probably other things here eventually
loop choose {
when(wait(self.iAmReplaced.getFuture())) {
printf("Blob Manager exiting because it is replaced\n");
if (BM_DEBUG) {
printf("Blob Manager exiting because it is replaced\n");
}
return Void();
}
}

View File

@ -32,6 +32,9 @@
#include "flow/Arena.h"
#include "flow/actorcompiler.h" // has to be last include
#define BW_DEBUG true
#define BW_REQUEST_DEBUG false
// TODO add comments + documentation
struct BlobFileIndex {
Version version;
@ -69,15 +72,6 @@ struct GranuleMetadata : NonCopyable, ReferenceCounted<GranuleMetadata> {
assignFuture = Never();
fileUpdaterFuture = Never();
}
~GranuleMetadata() {
// only print for "active" metadata
if (lastWriteVersion != 0) {
printf("Destroying granule metadata for [%s - %s)\n",
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str());
}
}
};
// for a range that may or may not be set
@ -114,11 +108,13 @@ struct BlobWorkerData {
static void acquireGranuleLock(int64_t epoch, int64_t seqno, std::pair<int64_t, int64_t> prevOwner) {
// returns true if our lock (E, S) >= (Eprev, Sprev)
if (epoch < prevOwner.first || (epoch == prevOwner.first && seqno < prevOwner.second)) {
printf("Lock acquire check failed. Proposed (%lld, %lld) < previous (%lld, %lld)\n",
epoch,
seqno,
prevOwner.first,
prevOwner.second);
if (BW_DEBUG) {
printf("Lock acquire check failed. Proposed (%lld, %lld) < previous (%lld, %lld)\n",
epoch,
seqno,
prevOwner.first,
prevOwner.second);
}
throw granule_assignment_conflict();
}
}
@ -130,11 +126,13 @@ static void checkGranuleLock(int64_t epoch, int64_t seqno, std::pair<int64_t, in
// returns true if we still own the lock, false if someone else does
if (epoch != currentOwner.first || seqno != currentOwner.second) {
printf("Lock assignment check failed. Expected (%lld, %lld), got (%lld, %lld)\n",
epoch,
seqno,
currentOwner.first,
currentOwner.second);
if (BW_DEBUG) {
printf("Lock assignment check failed. Expected (%lld, %lld), got (%lld, %lld)\n",
epoch,
seqno,
currentOwner.first,
currentOwner.second);
}
throw granule_assignment_conflict();
}
}
@ -202,10 +200,12 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(BlobWorkerData* bwData,
getFileValue(fname, 0, serialized.size()));
wait(tr->commit());
printf("blob worker updated fdb with delta file %s of size %d at version %lld\n",
fname.c_str(),
serialized.size(),
currentDeltaVersion);
if (BW_DEBUG) {
printf("blob worker updated fdb with delta file %s of size %d at version %lld\n",
fname.c_str(),
serialized.size(),
currentDeltaVersion);
}
return BlobFileIndex(currentDeltaVersion, fname, 0, serialized.size());
} catch (Error& e) {
wait(tr->onError(e));
@ -213,7 +213,9 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(BlobWorkerData* bwData,
}
} catch (Error& e) {
// if transaction throws non-retryable error, delete s3 file before exiting
printf("deleting s3 delta file %s after error %s\n", fname.c_str(), e.name());
if (BW_DEBUG) {
printf("deleting s3 delta file %s after error %s\n", fname.c_str(), e.name());
}
bwData->bstore->deleteFile(fname);
throw e;
}
@ -244,13 +246,15 @@ ACTOR Future<BlobFileIndex> writeSnapshot(BlobWorkerData* bwData,
}
}
printf("Granule [%s - %s) read %d snapshot rows\n",
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str(),
snapshot.size());
if (snapshot.size() < 10) {
for (auto& row : snapshot) {
printf(" %s=%s\n", row.key.printable().c_str(), row.value.printable().c_str());
if (BW_DEBUG) {
printf("Granule [%s - %s) read %d snapshot rows\n",
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str(),
snapshot.size());
if (snapshot.size() < 10) {
for (auto& row : snapshot) {
printf(" %s=%s\n", row.key.printable().c_str(), row.value.printable().c_str());
}
}
}
@ -295,24 +299,30 @@ ACTOR Future<BlobFileIndex> writeSnapshot(BlobWorkerData* bwData,
}
} catch (Error& e) {
// if transaction throws non-retryable error, delete s3 file before exiting
printf("deleting s3 snapshot file %s after error %s\n", fname.c_str(), e.name());
if (BW_DEBUG) {
printf("deleting s3 snapshot file %s after error %s\n", fname.c_str(), e.name());
}
bwData->bstore->deleteFile(fname);
throw e;
}
printf("Granule [%s - %s) committed new snapshot file %s with %d bytes\n\n",
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str(),
fname.c_str(),
serialized.size());
if (BW_DEBUG) {
printf("Granule [%s - %s) committed new snapshot file %s with %d bytes\n\n",
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str(),
fname.c_str(),
serialized.size());
}
return BlobFileIndex(version, fname, 0, serialized.size());
}
ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(BlobWorkerData* bwData, Reference<GranuleMetadata> metadata) {
printf("Dumping snapshot from FDB for [%s - %s)\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str());
if (BW_DEBUG) {
printf("Dumping snapshot from FDB for [%s - %s)\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str());
}
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bwData->db);
loop {
@ -338,19 +348,23 @@ ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(BlobWorkerData* bwData, R
BlobFileIndex f = wait(snapshotWriter);
return f;
} catch (Error& e) {
printf("Dumping snapshot from FDB for [%s - %s) got error %s\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
e.name());
if (BW_DEBUG) {
printf("Dumping snapshot from FDB for [%s - %s) got error %s\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
e.name());
}
wait(tr->onError(e));
}
}
}
ACTOR Future<BlobFileIndex> compactFromBlob(BlobWorkerData* bwData, Reference<GranuleMetadata> metadata) {
printf("Compacting snapshot from blob for [%s - %s)\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str());
if (BW_DEBUG) {
printf("Compacting snapshot from blob for [%s - %s)\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str());
}
ASSERT(!metadata->snapshotFiles.empty());
ASSERT(!metadata->deltaFiles.empty());
@ -375,15 +389,17 @@ ACTOR Future<BlobFileIndex> compactFromBlob(BlobWorkerData* bwData, Reference<Gr
}
chunk.includedVersion = version;
printf("Re-snapshotting [%s - %s) @ %lld\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
version);
if (BW_DEBUG) {
printf("Re-snapshotting [%s - %s) @ %lld\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
version);
printf(" SnapshotFile:\n %s\n", chunk.snapshotFile.get().toString().c_str());
printf(" DeltaFiles:\n");
for (auto& df : chunk.deltaFiles) {
printf(" %s\n", df.toString().c_str());
printf(" SnapshotFile:\n %s\n", chunk.snapshotFile.get().toString().c_str());
printf(" DeltaFiles:\n");
for (auto& df : chunk.deltaFiles) {
printf(" %s\n", df.toString().c_str());
}
}
loop {
@ -400,10 +416,12 @@ ACTOR Future<BlobFileIndex> compactFromBlob(BlobWorkerData* bwData, Reference<Gr
} catch (Error& e) {
// TODO better error handling eventually - should retry unless the error is because another worker took over
// the range
printf("Compacting snapshot from blob for [%s - %s) got error %s\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
e.name());
if (BW_DEBUG) {
printf("Compacting snapshot from blob for [%s - %s) got error %s\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
e.name());
}
throw e;
}
}
@ -441,11 +459,13 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
// create range feed first so the version the SS start recording mutations <= the snapshot version
state std::pair<Key, Version> rangeFeedData = wait(createRangeFeed(bwData, metadata->keyRange));
printf("Successfully created range feed %s for [%s - %s) @ %lld\n",
rangeFeedData.first.printable().c_str(),
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
rangeFeedData.second);
if (BW_DEBUG) {
printf("Successfully created range feed %s for [%s - %s) @ %lld\n",
rangeFeedData.first.printable().c_str(),
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
rangeFeedData.second);
}
BlobFileIndex newSnapshotFile = wait(dumpInitialSnapshotFromFDB(bwData, metadata));
ASSERT(rangeFeedData.second <= newSnapshotFile.version);
@ -456,6 +476,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
rangeFeedStream, rangeFeedData.first, newSnapshotFile.version + 1, maxVersion, metadata->keyRange);
loop {
// TODO: Buggify delay in change feed stream
state Standalone<VectorRef<MutationsAndVersionRef>> mutations = waitNext(rangeFeedStream.getFuture());
for (auto& deltas : mutations) {
if (!deltas.mutations.empty()) {
@ -473,10 +494,12 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
// TODO handle version batch barriers
if (metadata->currentDeltaBytes >= SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES &&
metadata->currentDeltaVersion > metadata->lastWriteVersion) {
printf("Granule [%s - %s) flushing delta file after %d bytes\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
metadata->currentDeltaBytes);
if (BW_DEBUG) {
printf("Granule [%s - %s) flushing delta file after %d bytes\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
metadata->currentDeltaBytes);
}
BlobFileIndex newDeltaFile = wait(writeDeltaFile(bwData,
metadata->keyRange,
metadata->lockEpoch,
@ -494,17 +517,21 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
metadata->currentDeltas = GranuleDeltas();
metadata->currentDeltaBytes = 0;
printf("Popping range feed %s at %lld\n\n",
rangeFeedData.first.printable().c_str(),
metadata->lastWriteVersion);
if (BW_DEBUG) {
printf("Popping range feed %s at %lld\n\n",
rangeFeedData.first.printable().c_str(),
metadata->lastWriteVersion);
}
wait(bwData->db->popRangeFeedMutations(rangeFeedData.first, metadata->lastWriteVersion));
}
if (metadata->bytesInNewDeltaFiles >= SERVER_KNOBS->BG_DELTA_BYTES_BEFORE_COMPACT) {
printf("Granule [%s - %s) re-snapshotting after %d bytes\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
metadata->bytesInNewDeltaFiles);
if (BW_DEBUG) {
printf("Granule [%s - %s) re-snapshotting after %d bytes\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
metadata->bytesInNewDeltaFiles);
}
// FIXME: instead of just doing new snapshot, it should offer shard back to blob manager and get
// reassigned
// TODO: this could read from FDB read previous snapshot + delta files instead if it knew there was
@ -541,32 +568,38 @@ static void handleBlobGranuleFileRequest(BlobWorkerData* bwData, const BlobGranu
KeyRef lastRangeEnd = req.keyRange.begin;
for (auto& r : checkRanges) {
if (lastRangeEnd < r.begin()) {
printf("No blob data for [%s - %s) in request range [%s - %s), skipping request\n",
lastRangeEnd.printable().c_str(),
r.begin().printable().c_str(),
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str());
req.reply.sendError(transaction_too_old());
if (BW_REQUEST_DEBUG) {
printf("No blob data for [%s - %s) in request range [%s - %s), skipping request\n",
lastRangeEnd.printable().c_str(),
r.begin().printable().c_str(),
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str());
}
req.reply.sendError(wrong_shard_server());
return;
}
if (!r.value().activeMetadata.isValid()) {
printf("No valid blob data for [%s - %s) in request range [%s - %s), skipping request\n",
lastRangeEnd.printable().c_str(),
r.begin().printable().c_str(),
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str());
req.reply.sendError(transaction_too_old());
if (BW_REQUEST_DEBUG) {
printf("No valid blob data for [%s - %s) in request range [%s - %s), skipping request\n",
lastRangeEnd.printable().c_str(),
r.begin().printable().c_str(),
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str());
}
req.reply.sendError(wrong_shard_server());
return;
}
lastRangeEnd = r.end();
}
if (lastRangeEnd < req.keyRange.end) {
printf("No blob data for [%s - %s) in request range [%s - %s), skipping request\n",
lastRangeEnd.printable().c_str(),
req.keyRange.end.printable().c_str(),
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str());
req.reply.sendError(transaction_too_old());
if (BW_REQUEST_DEBUG) {
printf("No blob data for [%s - %s) in request range [%s - %s), skipping request\n",
lastRangeEnd.printable().c_str(),
req.keyRange.end.printable().c_str(),
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str());
}
req.reply.sendError(wrong_shard_server());
return;
}
@ -594,6 +627,13 @@ static void handleBlobGranuleFileRequest(BlobWorkerData* bwData, const BlobGranu
// if version is older than oldest snapshot file (or no snapshot files), throw too old
// FIXME: probably want a dedicated exception like blob_range_too_old or something instead
if (i < 0) {
if (BW_REQUEST_DEBUG) {
printf("Oldest snapshot file for [%s - %s) is @ %lld, later than request version %lld\n",
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str(),
metadata->snapshotFiles.size() == 0 ? 0 : metadata->snapshotFiles[0].version,
req.readVersion);
}
req.reply.sendError(transaction_too_old());
return;
}
@ -665,17 +705,21 @@ ACTOR Future<Void> persistAssignWorkerRange(BlobWorkerData* bwData, KeyRange key
wait(tr->commit());
printf("Blob worker %s persisted key range [%s - %s)\n",
bwData->id.toString().c_str(),
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str());
if (BW_DEBUG) {
printf("Blob worker %s persisted key range [%s - %s)\n",
bwData->id.toString().c_str(),
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str());
}
return Void();
} catch (Error& e) {
printf("Persisting key range [%s - %s) for blob worker %s got error %s\n",
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str(),
bwData->id.toString().c_str(),
e.name());
if (BW_DEBUG) {
printf("Persisting key range [%s - %s) for blob worker %s got error %s\n",
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str(),
bwData->id.toString().c_str(),
e.name());
}
wait(tr->onError(e));
}
}
@ -685,9 +729,11 @@ static GranuleRangeMetadata constructActiveBlobRange(BlobWorkerData* bwData,
KeyRange keyRange,
int64_t epoch,
int64_t seqno) {
printf("Creating new worker metadata for range [%s - %s)\n",
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str());
if (BW_DEBUG) {
printf("Creating new worker metadata for range [%s - %s)\n",
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str());
}
Reference<GranuleMetadata> newMetadata = makeReference<GranuleMetadata>();
newMetadata->keyRange = keyRange;
@ -722,12 +768,14 @@ static bool newerRangeAssignment(GranuleRangeMetadata oldMetadata, int64_t epoch
// already has a higher sequence number, that range was either revoked, or revoked and then re-assigned. Either way,
// this assignment is no longer valid.
static void changeBlobRange(BlobWorkerData* bwData, KeyRange keyRange, int64_t epoch, int64_t seqno, bool active) {
printf("Changing range for [%s - %s): %s @ (%lld, %lld)\n",
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str(),
active ? "T" : "F",
epoch,
seqno);
if (BW_DEBUG) {
printf("Changing range for [%s - %s): %s @ (%lld, %lld)\n",
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str(),
active ? "T" : "F",
epoch,
seqno);
}
// For each range that intersects this update:
// If the identical range already exists at the same assignment sequence number, this is a noop
@ -740,7 +788,7 @@ static void changeBlobRange(BlobWorkerData* bwData, KeyRange keyRange, int64_t e
auto ranges = bwData->granuleMetadata.intersectingRanges(keyRange);
for (auto& r : ranges) {
if (r.value().lastEpoch == epoch && r.value().lastSeqno) {
if (r.value().lastEpoch == epoch && r.value().lastSeqno == seqno) {
// applied the same assignment twice, make idempotent
ASSERT(r.begin() == keyRange.begin);
ASSERT(r.end() == keyRange.end);
@ -749,11 +797,13 @@ static void changeBlobRange(BlobWorkerData* bwData, KeyRange keyRange, int64_t e
bool thisAssignmentNewer = newerRangeAssignment(r.value(), epoch, seqno);
if (r.value().activeMetadata.isValid() && thisAssignmentNewer) {
// cancel actors for old range and clear reference
printf(" [%s - %s): @ (%lld, %lld) (cancelling)\n",
r.begin().printable().c_str(),
r.end().printable().c_str(),
r.value().lastEpoch,
r.value().lastSeqno);
if (BW_DEBUG) {
printf(" [%s - %s): @ (%lld, %lld) (cancelling)\n",
r.begin().printable().c_str(),
r.end().printable().c_str(),
r.value().lastEpoch,
r.value().lastSeqno);
}
r.value().activeMetadata->cancel();
r.value().activeMetadata.clear();
} else if (!thisAssignmentNewer) {
@ -767,20 +817,24 @@ static void changeBlobRange(BlobWorkerData* bwData, KeyRange keyRange, int64_t e
? constructActiveBlobRange(bwData, keyRange, epoch, seqno)
: constructInactiveBlobRange(epoch, seqno);
bwData->granuleMetadata.insert(keyRange, newMetadata);
printf("Inserting new range [%s - %s): %s @ (%lld, %lld)\n",
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str(),
newMetadata.activeMetadata.isValid() ? "T" : "F",
newMetadata.lastEpoch,
newMetadata.lastSeqno);
if (BW_DEBUG) {
printf("Inserting new range [%s - %s): %s @ (%lld, %lld)\n",
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str(),
newMetadata.activeMetadata.isValid() ? "T" : "F",
newMetadata.lastEpoch,
newMetadata.lastSeqno);
}
for (auto& it : newerRanges) {
printf("Re-inserting newer range [%s - %s): %s @ (%lld, %lld)\n",
it.first.begin.printable().c_str(),
it.first.end.printable().c_str(),
it.second.activeMetadata.isValid() ? "T" : "F",
it.second.lastEpoch,
it.second.lastSeqno);
if (BW_DEBUG) {
printf("Re-inserting newer range [%s - %s): %s @ (%lld, %lld)\n",
it.first.begin.printable().c_str(),
it.first.end.printable().c_str(),
it.second.activeMetadata.isValid() ? "T" : "F",
it.second.lastEpoch,
it.second.lastSeqno);
}
bwData->granuleMetadata.insert(it.first, it.second);
}
}
@ -805,10 +859,14 @@ ACTOR Future<Void> registerBlobWorker(BlobWorkerData* bwData, BlobWorkerInterfac
wait(tr->commit());
printf("Registered blob worker %s\n", interf.id().toString().c_str());
if (BW_DEBUG) {
printf("Registered blob worker %s\n", interf.id().toString().c_str());
}
return Void();
} catch (Error& e) {
printf("Registering blob worker %s got error %s\n", interf.id().toString().c_str(), e.name());
if (BW_DEBUG) {
printf("Registering blob worker %s got error %s\n", interf.id().toString().c_str(), e.name());
}
wait(tr->onError(e));
}
}
@ -824,18 +882,28 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf, Reference<AsyncVar<S
self.id = bwInterf.id();
self.locality = bwInterf.locality;
printf("Initializing blob worker s3 stuff\n");
if (BW_DEBUG) {
printf("Initializing blob worker s3 stuff\n");
}
try {
if (g_network->isSimulated()) {
printf("BW constructing simulated backup container\n");
if (BW_DEBUG) {
printf("BW constructing simulated backup container\n");
}
self.bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/");
} else {
printf("BW constructing backup container from %s\n", SERVER_KNOBS->BG_URL.c_str());
if (BW_DEBUG) {
printf("BW constructing backup container from %s\n", SERVER_KNOBS->BG_URL.c_str());
}
self.bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL);
printf("BW constructed backup container\n");
if (BW_DEBUG) {
printf("BW constructed backup container\n");
}
}
} catch (Error& e) {
printf("BW got backup container init error %s\n", e.name());
if (BW_DEBUG) {
printf("BW got backup container init error %s\n", e.name());
}
return Void();
}
@ -856,25 +924,31 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf, Reference<AsyncVar<S
}
when(AssignBlobRangeRequest _req = waitNext(bwInterf.assignBlobRangeRequest.getFuture())) {
state AssignBlobRangeRequest req = _req;
printf("Worker %s %s range [%s - %s) @ (%lld, %lld)\n",
self.id.toString().c_str(),
req.isAssign ? "assigned" : "revoked",
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str(),
req.managerEpoch,
req.managerSeqno);
if (BW_DEBUG) {
printf("Worker %s %s range [%s - %s) @ (%lld, %lld)\n",
self.id.toString().c_str(),
req.isAssign ? "assigned" : "revoked",
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str(),
req.managerEpoch,
req.managerSeqno);
}
if (req.managerEpoch < self.currentManagerEpoch) {
printf("BW %s got request from old epoch %lld, notifying manager it is out of date\n",
self.id.toString().c_str(),
req.managerEpoch);
if (BW_DEBUG) {
printf("BW %s got request from old epoch %lld, notifying manager it is out of date\n",
self.id.toString().c_str(),
req.managerEpoch);
}
req.reply.send(AssignBlobRangeReply(false));
} else {
if (req.managerEpoch > self.currentManagerEpoch) {
self.currentManagerEpoch = req.managerEpoch;
printf("BW %s found new manager epoch %lld\n",
self.id.toString().c_str(),
self.currentManagerEpoch);
if (BW_DEBUG) {
printf("BW %s found new manager epoch %lld\n",
self.id.toString().c_str(),
self.currentManagerEpoch);
}
}
// TODO with range versioning, need to persist only after it's confirmed
@ -888,7 +962,9 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf, Reference<AsyncVar<S
}
}
} catch (Error& e) {
printf("Blob worker got error %s, exiting\n", e.name());
if (BW_DEBUG) {
printf("Blob worker got error %s, exiting\n", e.name());
}
TraceEvent("BlobWorkerDied", self.id).error(e, true);
}

View File

@ -251,6 +251,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
state std::pair<RangeResult, Version> fdb = wait(self->readFromFDB(cx, range));
RangeResult blob = wait(self->readFromBlob(cx, self, range, fdb.second));
if (self->compareResult(fdb.first, blob, range, fdb.second, true)) {
// TODO: bias for immediately re-reading to catch rollback cases
double reReadTime = currentTime + deterministicRandom()->random01() * self->timeTravelLimit;
int memory = fdb.first.expectedSize();
if (reReadTime <= endTime &&