Added blob granule reassignment and splitting

Josh Slocum 2021-09-03 15:13:26 -05:00
parent 4671fe57ab
commit eb76343dfb
13 changed files with 1392 additions and 297 deletions

View File

@@ -108,4 +108,5 @@ struct BlobGranuleChunkRef {
}
};
enum BlobGranuleSplitState { Unknown = 0, Started = 1, Assigned = 2, Done = 3 };
#endif
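The new BlobGranuleSplitState enum is the persistent state machine for a split: each child granule's record under the new blobGranuleSplitKeys range (added in SystemData below) advances through these states. A minimal standalone sketch of the forward-only transition rule this implies (simplified and hypothetical, not code from this commit):

#include <cassert>

enum BlobGranuleSplitState { Unknown = 0, Started = 1, Assigned = 2, Done = 3 };

// Assumption for illustration: a split record only moves forward;
// re-persisting the current state is a benign retry (e.g. after commit_unknown_result).
bool isValidSplitStateTransition(BlobGranuleSplitState from, BlobGranuleSplitState to) {
    return to >= from;
}

int main() {
    assert(isValidSplitStateTransition(Started, Assigned));
    assert(!isValidSplitStateTransition(Done, Started));
    return 0;
}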

View File

@@ -179,6 +179,7 @@ static void applyDelta(std::map<KeyRef, ValueRef>* dataMap, Arena& ar, KeyRangeR
if (m.param1 < keyRange.begin || m.param1 >= keyRange.end) {
return;
}
// TODO: we don't need atomics here since eager reads handle it
std::map<KeyRef, ValueRef>::iterator it = dataMap->find(m.param1);
if (m.type != MutationRef::SetValue) {
Optional<StringRef> oldVal;
@@ -242,15 +243,24 @@ static void applyDeltas(std::map<KeyRef, ValueRef>* dataMap,
Arena& arena,
GranuleDeltas deltas,
KeyRangeRef keyRange,
-Version readVersion) {
+Version readVersion,
+Version* lastFileEndVersion) {
if (!deltas.empty()) {
// check that consecutive delta file versions are disjoint
ASSERT(*lastFileEndVersion < deltas.front().version);
}
for (MutationsAndVersionRef& delta : deltas) {
if (delta.version > readVersion) {
-break;
+*lastFileEndVersion = readVersion;
+return;
}
for (auto& m : delta.mutations) {
applyDelta(dataMap, arena, keyRange, m);
}
}
if (!deltas.empty()) {
*lastFileEndVersion = deltas.back().version;
}
}
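The new lastFileEndVersion out-parameter lets applyDeltas assert that consecutive delta files cover disjoint, strictly increasing version ranges, recording either the version of the last deltas applied or readVersion when it stops early. A standalone sketch of the invariant being enforced (stand-in types, not code from this commit):

#include <cassert>
#include <vector>

struct DeltaFile {
    long long firstVersion; // version of the first mutation batch in the file
    long long lastVersion;  // version of the last mutation batch in the file
};

// Mirrors the new ASSERT: each delta file must begin strictly after the
// previous file ended.
void checkDeltaFilesDisjoint(const std::vector<DeltaFile>& files) {
    long long lastFileEndVersion = -1; // stand-in for invalidVersion
    for (const DeltaFile& f : files) {
        assert(lastFileEndVersion < f.firstVersion);
        lastFileEndVersion = f.lastVersion;
    }
}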
// TODO: improve the interface of this function so that it doesn't need
@@ -272,6 +282,7 @@ ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
try {
state std::map<KeyRef, ValueRef> dataMap;
state Version lastFileEndVersion = invalidVersion;
Future<Arena> readSnapshotFuture;
if (chunk.snapshotFile.present()) {
@@ -301,13 +312,13 @@ ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
for (Future<Standalone<GranuleDeltas>> deltaFuture : readDeltaFutures) {
Standalone<GranuleDeltas> result = wait(deltaFuture);
arena.dependsOn(result.arena());
-applyDeltas(&dataMap, arena, result, keyRange, readVersion);
+applyDeltas(&dataMap, arena, result, keyRange, readVersion, &lastFileEndVersion);
wait(yield());
}
if (BG_READ_DEBUG) {
printf("Applying %d memory deltas\n", chunk.newDeltas.size());
}
-applyDeltas(&dataMap, arena, chunk.newDeltas, keyRange, readVersion);
+applyDeltas(&dataMap, arena, chunk.newDeltas, keyRange, readVersion, &lastFileEndVersion);
wait(yield());
RangeResult ret;

View File

@@ -33,6 +33,8 @@ struct BlobWorkerInterface {
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream<struct BlobGranuleFileRequest> blobGranuleFileRequest;
RequestStream<struct AssignBlobRangeRequest> assignBlobRangeRequest;
RequestStream<struct RevokeBlobRangeRequest> revokeBlobRangeRequest;
RequestStream<struct GranuleStatusStreamRequest> granuleStatusStreamRequest;
struct LocalityData locality;
UID myId;
@@ -95,23 +97,88 @@ struct AssignBlobRangeReply {
}
};
-struct AssignBlobRangeRequest {
+struct RevokeBlobRangeRequest {
constexpr static FileIdentifier file_identifier = 4844288;
Arena arena;
KeyRangeRef keyRange;
int64_t managerEpoch;
int64_t managerSeqno;
-bool isAssign; // true if assignment, false if revoke
+bool dispose;
ReplyPromise<AssignBlobRangeReply> reply;
RevokeBlobRangeRequest() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, keyRange, managerEpoch, managerSeqno, dispose, reply, arena);
}
};
struct AssignBlobRangeRequest {
constexpr static FileIdentifier file_identifier = 905381;
Arena arena;
KeyRangeRef keyRange;
int64_t managerEpoch;
int64_t managerSeqno;
// If continueAssignment is true, this is just to instruct the worker that it still owns the range, so it should
// re-snapshot it and continue. If continueAssignment is false and previousGranules is empty, this is either the
// initial assignment to construct a previously non-existent granule, or a reassignment. Depending on what state
// exists for the granule currently, the worker will either start a new granule, or just pick up from where the
// previous worker left off.
// For a split or merge, continueAssignment==false.
// For a split, previousGranules will contain one granule that contains keyRange. For a merge, previousGranules will
// contain two or more granules, the union of which will be keyRange.
bool continueAssignment;
VectorRef<KeyRangeRef> previousGranules; // only set if there is a granule boundary change
ReplyPromise<AssignBlobRangeReply> reply;
AssignBlobRangeRequest() {}
template <class Ar>
void serialize(Ar& ar) {
-serializer(ar, keyRange, managerEpoch, managerSeqno, isAssign, reply, arena);
+serializer(ar, keyRange, managerEpoch, managerSeqno, continueAssignment, previousGranules, reply, arena);
}
};
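As a hypothetical illustration of the split case described in the comment above (field values invented, not code from this commit), handing the first child [a, b) of a splitting granule [a, d) to a worker would look like:

AssignBlobRangeRequest req;
req.keyRange = KeyRangeRef(StringRef(req.arena, LiteralStringRef("a")),
                           StringRef(req.arena, LiteralStringRef("b")));
req.managerEpoch = 5;           // this manager's lock epoch
req.managerSeqno = 102;         // monotonically increasing within the epoch
req.continueAssignment = false; // boundary change, not a re-snapshot of the same range
// one previous granule means a split; two or more would mean a merge
req.previousGranules.push_back_deep(req.arena, KeyRangeRef(LiteralStringRef("a"), LiteralStringRef("d")));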
// TODO: could eventually add other types of metrics to report back to the manager here (and reply per granule)
struct GranuleStatusReply : public ReplyPromiseStreamReply {
constexpr static FileIdentifier file_identifier = 7563104;
KeyRange granuleRange;
bool doSplit;
int64_t epoch;
int64_t seqno;
GranuleStatusReply() {}
explicit GranuleStatusReply(KeyRange range, bool doSplit, int64_t epoch, int64_t seqno)
: granuleRange(range), doSplit(doSplit), epoch(epoch), seqno(seqno) {}
int expectedSize() const { return sizeof(GranuleStatusReply) + granuleRange.expectedSize(); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, granuleRange, doSplit, epoch, seqno);
}
};
// The manager makes one of these requests per worker; the worker sends all of its granule status updates back through this stream.
struct GranuleStatusStreamRequest {
constexpr static FileIdentifier file_identifier = 2289677;
int64_t managerEpoch;
ReplyPromiseStream<GranuleStatusReply> reply;
GranuleStatusStreamRequest() {}
explicit GranuleStatusStreamRequest(int64_t managerEpoch) : managerEpoch(managerEpoch) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, managerEpoch, reply);
}
};
#endif

View File

@@ -6581,7 +6581,7 @@ Future<Standalone<VectorRef<MutationsAndVersionRef>>> DatabaseContext::getRangeF
ACTOR Future<Void> getRangeFeedStreamActor(Reference<DatabaseContext> db,
PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> results,
-StringRef rangeID,
+Key rangeID,
Version begin,
Version end,
KeyRange range) {

View File

@@ -752,11 +752,14 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
// Blob granules
init( BG_URL, "" );
// TODO CHANGE BACK
-init( BG_SNAPSHOT_FILE_TARGET_BYTES, 10000000 );
-// init( BG_SNAPSHOT_FILE_TARGET_BYTES, 1000000 );
+// init( BG_SNAPSHOT_FILE_TARGET_BYTES, 10000000 );
+init( BG_SNAPSHOT_FILE_TARGET_BYTES, 1000000 );
init( BG_DELTA_BYTES_BEFORE_COMPACT, BG_SNAPSHOT_FILE_TARGET_BYTES/2 );
init( BG_DELTA_FILE_TARGET_BYTES, BG_DELTA_BYTES_BEFORE_COMPACT/10 );
// TODO should discuss proper value for this
init( BLOB_WORKER_TIMEOUT, 10.0 ); if( randomize && BUGGIFY ) BLOB_WORKER_TIMEOUT = 1.0;
// clang-format on
if (clientKnobs) {

View File

@@ -704,6 +704,8 @@ public:
int BG_DELTA_FILE_TARGET_BYTES;
int BG_DELTA_BYTES_BEFORE_COMPACT;
double BLOB_WORKER_TIMEOUT; // Blob Manager's reaction time to a blob worker failure
ServerKnobs(Randomize, ClientKnobs*, IsSimulated);
void initialize(Randomize, ClientKnobs*, IsSimulated);
};

View File

@@ -1104,6 +1104,7 @@ int64_t decodeBlobManagerEpochValue(ValueRef const& value) {
const KeyRangeRef blobGranuleFileKeys(LiteralStringRef("\xff\x02/bgf/"), LiteralStringRef("\xff\x02/bgf0"));
const KeyRangeRef blobGranuleMappingKeys(LiteralStringRef("\xff\x02/bgm/"), LiteralStringRef("\xff\x02/bgm0"));
const KeyRangeRef blobGranuleLockKeys(LiteralStringRef("\xff\x02/bgl/"), LiteralStringRef("\xff\x02/bgl0"));
const KeyRangeRef blobGranuleSplitKeys(LiteralStringRef("\xff\x02/bgs/"), LiteralStringRef("\xff\x02/bgs0"));
const Value blobGranuleMappingValueFor(UID const& workerID) {
BinaryWriter wr(Unversioned());
@@ -1118,19 +1119,35 @@ UID decodeBlobGranuleMappingValue(ValueRef const& value) {
return workerID;
}
-const Value blobGranuleLockValueFor(int64_t epoch, int64_t seqno) {
+const Value blobGranuleLockValueFor(int64_t epoch, int64_t seqno, UID changeFeedId) {
BinaryWriter wr(Unversioned());
wr << epoch;
wr << seqno;
wr << changeFeedId;
return wr.toValue();
}
-std::pair<int64_t, int64_t> decodeBlobGranuleLockValue(const ValueRef& value) {
+std::tuple<int64_t, int64_t, UID> decodeBlobGranuleLockValue(const ValueRef& value) {
int64_t epoch, seqno;
UID changeFeedId;
BinaryReader reader(value, Unversioned());
reader >> epoch;
reader >> seqno;
-return std::pair(epoch, seqno);
+reader >> changeFeedId;
+return std::make_tuple(epoch, seqno, changeFeedId);
}
const Value blobGranuleSplitValueFor(BlobGranuleSplitState st) {
BinaryWriter wr(Unversioned());
wr << st;
return wr.toValue();
}
BlobGranuleSplitState decodeBlobGranuleSplitValue(const ValueRef& value) {
BlobGranuleSplitState st;
BinaryReader reader(value, Unversioned());
reader >> st;
return st;
}
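A hypothetical round trip through the new encoders (not code from this commit) shows what the lock value now carries: the (epoch, seqno) pair that lets a worker reject requests from stale owners, plus the change feed id so it survives reassignment to a new worker:

UID changeFeedId = deterministicRandom()->randomUniqueID();
Value lockValue = blobGranuleLockValueFor(7 /*epoch*/, 42 /*seqno*/, changeFeedId);
std::tuple<int64_t, int64_t, UID> lock = decodeBlobGranuleLockValue(lockValue);
ASSERT(std::get<0>(lock) == 7 && std::get<1>(lock) == 42 && std::get<2>(lock) == changeFeedId);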
const KeyRangeRef blobWorkerListKeys(LiteralStringRef("\xff\x02/bwList/"), LiteralStringRef("\xff\x02/bwList0"));

View File

@@ -25,7 +25,7 @@
// Functions and constants documenting the organization of the reserved keyspace in the database beginning with "\xFF"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/BlobWorkerInterface.h" // TODO move the functions that depend on this out of here and into BlobWorkerInterface.h
#include "fdbclient/BlobWorkerInterface.h" // TODO move the functions that depend on this out of here and into BlobWorkerInterface.h to remove this depdendency
#include "fdbclient/StorageServerInterface.h"
// Don't warn on constants being defined in this file.
@@ -535,16 +535,23 @@ extern const KeyRangeRef blobGranuleFileKeys;
// \xff/bgm/[[begin]] = [[BlobWorkerUID]]
extern const KeyRangeRef blobGranuleMappingKeys;
-// \xff/bgl/(begin,end) = (epoch, seqno)
+// \xff/bgl/(begin,end) = (epoch, seqno, changefeed id)
extern const KeyRangeRef blobGranuleLockKeys;
-const Value blobGranuleLockValueFor(int64_t epochNum, int64_t sequenceNum);
-std::pair<int64_t, int64_t> decodeBlobGranuleLockValue(ValueRef const& value);
+// \xff/bgs/(oldbegin,oldend,newbegin) = state
+extern const KeyRangeRef blobGranuleSplitKeys;
const Value blobGranuleMappingValueFor(UID const& workerID);
UID decodeBlobGranuleMappingValue(ValueRef const& value);
-// \xff/blobWorkerList/[[BlobWorkerID]] = [[BlobWorkerInterface]]
+const Value blobGranuleLockValueFor(int64_t epochNum, int64_t sequenceNum, UID changeFeedId);
+// FIXME: maybe just define a struct?
+std::tuple<int64_t, int64_t, UID> decodeBlobGranuleLockValue(ValueRef const& value);
+const Value blobGranuleSplitValueFor(BlobGranuleSplitState st);
+BlobGranuleSplitState decodeBlobGranuleSplitValue(ValueRef const& value);
+// \xff/bwl/[[BlobWorkerID]] = [[BlobWorkerInterface]]
extern const KeyRangeRef blobWorkerListKeys;
const Key blobWorkerListKeyFor(UID workerID);

View File

@@ -25,9 +25,11 @@
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/Tuple.h"
#include "fdbserver/BlobManagerInterface.h"
#include "fdbserver/BlobWorker.actor.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/WaitFailure.h"
#include "flow/IRandom.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // has to be last include
@@ -160,23 +162,37 @@ void getRanges(std::vector<std::pair<KeyRangeRef, bool>>& results, KeyRangeMap<b
}
}
// Assigns and revokes have slightly different semantics. A revoke is idempotent for the worker it targets and is
// retried against that same worker. An assign is not tied to a specific worker: an initial worker is chosen, and if
// it fails, the range is revoked from that worker and put back in the queue, where it will eventually be assigned
// to a different worker.
-struct RangeAssignment {
-KeyRange keyRange;
-bool isAssign;
+struct RangeAssignmentData {
+bool continueAssignment;
+std::vector<KeyRange> previousRanges;
-RangeAssignment() {}
-explicit RangeAssignment(KeyRange keyRange, bool isAssign) : keyRange(keyRange), isAssign(isAssign) {}
+RangeAssignmentData() : continueAssignment(false) {}
+RangeAssignmentData(bool continueAssignment, std::vector<KeyRange> previousRanges)
+: continueAssignment(continueAssignment), previousRanges(previousRanges) {}
};
+struct RangeRevokeData {
+bool dispose;
+RangeRevokeData() {}
+RangeRevokeData(bool dispose) : dispose(dispose) {}
+};
+struct RangeAssignment {
+bool isAssign;
+KeyRange keyRange;
+Optional<UID> worker;
+// I tried doing this with a union and it was just kind of messy
+Optional<RangeAssignmentData> assign;
+Optional<RangeRevokeData> revoke;
+};
// TODO: track worker's reads/writes eventually
struct BlobWorkerStats {
int numGranulesAssigned;
-BlobWorkerStats(int numGranulesAssigned=0): numGranulesAssigned(numGranulesAssigned) {}
+BlobWorkerStats(int numGranulesAssigned = 0) : numGranulesAssigned(numGranulesAssigned) {}
};
struct BlobManagerData {
@@ -185,6 +201,7 @@ struct BlobManagerData {
std::unordered_map<UID, BlobWorkerInterface> workersById;
std::unordered_map<UID, BlobWorkerStats> workerStats; // mapping between workerID -> workerStats
std::unordered_map<UID, Future<Void>> workerMonitors;
KeyRangeMap<UID> workerAssignments;
KeyRangeMap<bool> knownBlobRanges;
@@ -222,7 +239,7 @@ ACTOR Future<Void> nukeBlobWorkerData(BlobManagerData* bmData) {
}
}
-ACTOR Future<Standalone<VectorRef<KeyRef>>> splitNewRange(Reference<ReadYourWritesTransaction> tr, KeyRange range) {
+ACTOR Future<Standalone<VectorRef<KeyRef>>> splitRange(Reference<ReadYourWritesTransaction> tr, KeyRange range) {
// TODO is it better to just pass empty metrics to estimated?
// TODO handle errors here by pulling out into its own transaction instead of the main loop's transaction, and
// retrying
@@ -264,8 +281,8 @@ ACTOR Future<Standalone<VectorRef<KeyRef>>> splitNewRange(Reference<ReadYourWrit
static UID pickWorkerForAssign(BlobManagerData* bmData) {
int minGranulesAssigned = INT_MAX;
std::vector<UID> eligibleWorkers;
-for (auto const &worker : bmData->workerStats) {
+for (auto const& worker : bmData->workerStats) {
UID currId = worker.first;
int granulesAssigned = worker.second.numGranulesAssigned;
@@ -282,33 +299,57 @@ static UID pickWorkerForAssign(BlobManagerData* bmData) {
ASSERT(eligibleWorkers.size() > 0);
int idx = deterministicRandom()->randomInt(0, eligibleWorkers.size());
if (BM_DEBUG) {
printf("picked worker %s, which has a minimal number (%d) of granules assigned\n",
eligibleWorkers[idx].toString().c_str(), minGranulesAssigned);
printf("picked worker %s, which has a minimal number (%d) of granules assigned\n",
eligibleWorkers[idx].toString().c_str(),
minGranulesAssigned);
}
return eligibleWorkers[idx];
}
ACTOR Future<Void> doRangeAssignment(BlobManagerData* bmData, RangeAssignment assignment, UID workerID, int64_t seqNo) {
-AssignBlobRangeRequest req;
-req.keyRange =
-KeyRangeRef(StringRef(req.arena, assignment.keyRange.begin), StringRef(req.arena, assignment.keyRange.end));
-req.managerEpoch = bmData->epoch;
-req.managerSeqno = seqNo;
-req.isAssign = assignment.isAssign;
if (BM_DEBUG) {
printf("BM %s %s range [%s - %s) @ (%lld, %lld)\n",
workerID.toString().c_str(),
req.isAssign ? "assigning" : "revoking",
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str(),
req.managerEpoch,
req.managerSeqno);
assignment.isAssign ? "assigning" : "revoking",
assignment.keyRange.begin.printable().c_str(),
assignment.keyRange.end.printable().c_str(),
bmData->epoch,
seqNo);
}
try {
-AssignBlobRangeReply rep = wait(bmData->workersById[workerID].assignBlobRangeRequest.getReply(req));
+state AssignBlobRangeReply rep;
if (assignment.isAssign) {
ASSERT(assignment.assign.present());
ASSERT(!assignment.revoke.present());
AssignBlobRangeRequest req;
req.keyRange = KeyRangeRef(StringRef(req.arena, assignment.keyRange.begin),
StringRef(req.arena, assignment.keyRange.end));
req.managerEpoch = bmData->epoch;
req.managerSeqno = seqNo;
req.continueAssignment = assignment.assign.get().continueAssignment;
for (auto& it : assignment.assign.get().previousRanges) {
req.previousGranules.push_back_deep(req.arena, it);
}
AssignBlobRangeReply _rep = wait(bmData->workersById[workerID].assignBlobRangeRequest.getReply(req));
rep = _rep;
} else {
ASSERT(!assignment.assign.present());
ASSERT(assignment.revoke.present());
RevokeBlobRangeRequest req;
req.keyRange = KeyRangeRef(StringRef(req.arena, assignment.keyRange.begin),
StringRef(req.arena, assignment.keyRange.end));
req.managerEpoch = bmData->epoch;
req.managerSeqno = seqNo;
req.dispose = assignment.revoke.get().dispose;
AssignBlobRangeReply _rep = wait(bmData->workersById[workerID].revokeBlobRangeRequest.getReply(req));
rep = _rep;
}
if (!rep.epochOk) {
if (BM_DEBUG) {
printf("BM heard from BW that there is a new manager with higher epoch\n");
@@ -327,13 +368,37 @@ ACTOR Future<Void> doRangeAssignment(BlobManagerData* bmData, RangeAssignment as
assignment.keyRange.end.printable().c_str());
}
// re-send the revoke to the queue so the range is un-assigned from the old worker before the new one takes over
-bmData->rangesToAssign.send(RangeAssignment(assignment.keyRange, false));
+RangeAssignment revokeOld;
+revokeOld.isAssign = false;
+revokeOld.worker = workerID;
+revokeOld.keyRange = assignment.keyRange;
+revokeOld.revoke = RangeRevokeData(false);
+bmData->rangesToAssign.send(revokeOld);
+// send assignment back to queue as is, clearing designated worker if present
+assignment.worker.reset();
+bmData->rangesToAssign.send(assignment);
+// FIXME: improvement would be to add history of failed workers to assignment so it can try other ones first
-} else if (BM_DEBUG) {
-printf("BM got error revoking range [%s - %s) from worker %s, ignoring\n",
-assignment.keyRange.begin.printable().c_str(),
-assignment.keyRange.end.printable().c_str());
+} else {
+if (BM_DEBUG) {
+printf("BM got error revoking range [%s - %s) from worker %s",
+assignment.keyRange.begin.printable().c_str(),
+assignment.keyRange.end.printable().c_str(),
+workerID.toString().c_str());
+}
+if (assignment.revoke.get().dispose) {
+if (BM_DEBUG) {
+printf(", retrying for dispose\n");
+}
+// send assignment back to queue as is, clearing designated worker if present
+assignment.worker.reset();
+bmData->rangesToAssign.send(assignment);
+} else {
+if (BM_DEBUG) {
+printf(", ignoring\n");
+}
+}
+}
}
return Void();
@@ -354,12 +419,17 @@ ACTOR Future<Void> rangeAssigner(BlobManagerData* bmData) {
auto currentAssignments = bmData->workerAssignments.intersectingRanges(assignment.keyRange);
int count = 0;
for (auto& it : currentAssignments) {
-ASSERT(it.value() == UID());
+if (assignment.assign.get().continueAssignment) {
+ASSERT(assignment.worker.present());
+ASSERT(it.value() == assignment.worker.get());
+} else {
+ASSERT(it.value() == UID());
+}
count++;
}
ASSERT(count == 1);
-workerId = pickWorkerForAssign(bmData);
+workerId = assignment.worker.present() ? assignment.worker.get() : pickWorkerForAssign(bmData);
bmData->workerAssignments.insert(assignment.keyRange, workerId);
bmData->workerStats[workerId].numGranulesAssigned += 1;
@@ -377,7 +447,8 @@ ACTOR Future<Void> rangeAssigner(BlobManagerData* bmData) {
// It is fine for multiple disjoint sub-ranges to have the same sequence number since they were part of
// the same logical change
bmData->workerStats[it.value()].numGranulesAssigned -= 1;
-addActor.send(doRangeAssignment(bmData, assignment, it.value(), seqNo));
+if (!assignment.worker.present() || assignment.worker.get() == it.value())
+addActor.send(doRangeAssignment(bmData, assignment, it.value(), seqNo));
}
bmData->workerAssignments.insert(assignment.keyRange, UID());
@@ -385,14 +456,37 @@ ACTOR Future<Void> rangeAssigner(BlobManagerData* bmData) {
}
}
ACTOR Future<Void> checkManagerLock(Reference<ReadYourWritesTransaction> tr, BlobManagerData* bmData) {
Optional<Value> currentLockValue = wait(tr->get(blobManagerEpochKey));
ASSERT(currentLockValue.present());
int64_t currentEpoch = decodeBlobManagerEpochValue(currentLockValue.get());
if (currentEpoch != bmData->epoch) {
ASSERT(currentEpoch > bmData->epoch);
printf("BM %s found new epoch %d > %d in lock check\n",
bmData->id.toString().c_str(),
currentEpoch,
bmData->epoch);
if (bmData->iAmReplaced.canBeSet()) {
bmData->iAmReplaced.send(Void());
}
// TODO different error?
throw granule_assignment_conflict();
}
tr->addReadConflictRange(singleKeyRange(blobManagerEpochKey));
return Void();
}
// TODO eventually CC should probably do this and pass it as part of recruitment?
ACTOR Future<int64_t> acquireManagerLock(BlobManagerData* bmData) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
-tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
-tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
loop {
+tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
try {
// TODO verify: this should automatically have a read conflict range for blobManagerEpochKey, right?
Optional<Value> oldEpoch = wait(tr->get(blobManagerEpochKey));
state int64_t newEpoch;
if (oldEpoch.present()) {
@@ -414,6 +508,8 @@ ACTOR Future<int64_t> acquireManagerLock(BlobManagerData* bmData) {
}
}
// FIXME: this does all logic in one transaction. Adding a giant range to an existing database to hybridize would
// require doing a ton of storage metrics calls, which we should likely split up across multiple transactions.
ACTOR Future<Void> monitorClientRanges(BlobManagerData* bmData) {
loop {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
@@ -445,14 +541,18 @@ ACTOR Future<Void> monitorClientRanges(BlobManagerData* bmData) {
range.end.printable().c_str());
}
-bmData->rangesToAssign.send(RangeAssignment(range, false));
+RangeAssignment ra;
+ra.isAssign = false;
+ra.keyRange = range;
+ra.revoke = RangeRevokeData(true); // dispose=true
+bmData->rangesToAssign.send(ra);
}
state std::vector<Future<Standalone<VectorRef<KeyRef>>>> splitFutures;
// Divide new ranges up into equal chunks by using SS byte sample
for (KeyRangeRef range : rangesToAdd) {
// assert that this range contains no currently assigned ranges in this blob manager's mapping
-splitFutures.push_back(splitNewRange(tr, range));
+splitFutures.push_back(splitRange(tr, range));
}
for (auto f : splitFutures) {
@@ -470,7 +570,11 @@ ACTOR Future<Void> monitorClientRanges(BlobManagerData* bmData) {
printf(" [%s - %s)\n", range.begin.printable().c_str(), range.end.printable().c_str());
}
-bmData->rangesToAssign.send(RangeAssignment(range, true));
+RangeAssignment ra;
+ra.isAssign = true;
+ra.keyRange = range;
+ra.assign = RangeAssignmentData(); // continue=false, no previous granules
+bmData->rangesToAssign.send(ra);
}
}
@@ -491,6 +595,213 @@ ACTOR Future<Void> monitorClientRanges(BlobManagerData* bmData) {
}
}
static Key granuleLockKey(KeyRange granuleRange) {
Tuple k;
k.append(granuleRange.begin).append(granuleRange.end);
return k.getDataAsStandalone().withPrefix(blobGranuleLockKeys.begin);
}
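Because the key is Tuple-encoded, lock keys sort by (begin, end) under the \xff\x02/bgl/ prefix. Hypothetical usage (not code from this commit):

// lock key for granule ["a" - "b"): blobGranuleLockKeys.begin + pack(("a", "b"))
Key lockKey = granuleLockKey(KeyRangeRef(LiteralStringRef("a"), LiteralStringRef("b")));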
// FIXME: propagate errors here
ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData, UID currentWorkerId, KeyRange range) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
state Standalone<VectorRef<KeyRef>> newRanges;
state int64_t newLockSeqno = -1;
// first get ranges to split
loop {
try {
// redo the split calculation if a previous transaction attempt failed
if (newRanges.empty()) {
Standalone<VectorRef<KeyRef>> _newRanges = wait(splitRange(tr, range));
newRanges = _newRanges;
}
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
if (newRanges.size() == 2) {
// not large enough to split, just reassign back to worker
if (BM_DEBUG) {
printf("Not splitting existing range [%s - %s). Continuing assignment to %s\n",
range.begin.printable().c_str(),
range.end.printable().c_str(),
currentWorkerId.toString().c_str());
}
RangeAssignment raContinue;
raContinue.isAssign = true;
raContinue.worker = currentWorkerId;
raContinue.keyRange = range;
raContinue.assign =
RangeAssignmentData(true, std::vector<KeyRange>()); // continue, no "previous" range to do handover
bmData->rangesToAssign.send(raContinue);
return Void();
}
// Need to split range. Persist intent to split and split metadata to DB BEFORE sending split requests
loop {
try {
tr->reset();
tr->setOption(FDBTransactionOptions::Option::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::Option::ACCESS_SYSTEM_KEYS);
ASSERT(newRanges.size() >= 2);
// make sure we're still manager when this transaction gets committed
wait(checkManagerLock(tr, bmData));
// acquire lock for old granule to make sure nobody else modifies it
state Key lockKey = granuleLockKey(range);
Optional<Value> lockValue = wait(tr->get(lockKey));
ASSERT(lockValue.present());
std::tuple<int64_t, int64_t, UID> prevGranuleLock = decodeBlobGranuleLockValue(lockValue.get());
if (std::get<0>(prevGranuleLock) > bmData->epoch) {
printf("BM %s found a higher epoch %d than %d for granule lock of [%s - %s)\n",
bmData->id.toString().c_str(),
std::get<0>(prevGranuleLock),
bmData->epoch,
range.begin.printable().c_str(),
range.end.printable().c_str());
if (bmData->iAmReplaced.canBeSet()) {
bmData->iAmReplaced.send(Void());
}
return Void();
}
if (newLockSeqno == -1) {
newLockSeqno = bmData->seqNo;
bmData->seqNo++;
ASSERT(newLockSeqno > std::get<1>(prevGranuleLock));
} else {
// previous transaction could have succeeded but got commit_unknown_result
ASSERT(newLockSeqno >= std::get<1>(prevGranuleLock));
}
tr->set(lockKey, blobGranuleLockValueFor(bmData->epoch, newLockSeqno, std::get<2>(prevGranuleLock)));
// set up split metadata
for (int i = 0; i < newRanges.size() - 1; i++) {
Tuple key;
key.append(range.begin).append(range.end).append(newRanges[i]);
tr->set(key.getDataAsStandalone().withPrefix(blobGranuleSplitKeys.begin),
blobGranuleSplitValueFor(BlobGranuleSplitState::Started));
// acquire granule lock so nobody else can make changes to this granule.
}
wait(tr->commit());
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
if (BM_DEBUG) {
printf("Splitting range [%s - %s) into:\n", range.begin.printable().c_str(), range.end.printable().c_str());
for (int i = 0; i < newRanges.size() - 1; i++) {
printf(" [%s - %s)\n", newRanges[i].printable().c_str(), newRanges[i + 1].printable().c_str());
}
}
// transaction committed, send range assignments
// revoke from current worker
RangeAssignment raRevoke;
raRevoke.isAssign = false;
raRevoke.worker = currentWorkerId;
raRevoke.keyRange = range;
raRevoke.revoke = RangeRevokeData(false); // not a dispose
bmData->rangesToAssign.send(raRevoke);
std::vector<KeyRange> originalRange;
originalRange.push_back(range);
for (int i = 0; i < newRanges.size() - 1; i++) {
// reassign new range and do handover of previous range
RangeAssignment raAssignSplit;
raAssignSplit.isAssign = true;
raAssignSplit.keyRange = KeyRangeRef(newRanges[i], newRanges[i + 1]);
raAssignSplit.assign = RangeAssignmentData(false, originalRange);
// don't care who this range gets assigned to
bmData->rangesToAssign.send(raAssignSplit);
}
return Void();
}
ACTOR Future<Void> monitorBlobWorker(BlobManagerData* bmData, BlobWorkerInterface bwInterf) {
try {
state PromiseStream<Future<Void>> addActor;
state Future<Void> collection = actorCollection(addActor.getFuture());
state Future<Void> waitFailure = waitFailureClient(bwInterf.waitFailure, SERVER_KNOBS->BLOB_WORKER_TIMEOUT);
state ReplyPromiseStream<GranuleStatusReply> statusStream =
bwInterf.granuleStatusStreamRequest.getReplyStream(GranuleStatusStreamRequest(bmData->epoch));
state KeyRangeMap<std::pair<int64_t, int64_t>> lastSeenSeqno;
loop choose {
when(wait(waitFailure)) {
// FIXME: actually handle this!!
if (BM_DEBUG) {
printf("BM %lld detected BW %s is dead\n", bmData->epoch, bwInterf.id().toString().c_str());
}
return Void();
}
when(GranuleStatusReply _rep = waitNext(statusStream.getFuture())) {
GranuleStatusReply rep = _rep;
if (BM_DEBUG) {
printf("BM %lld got status of [%s - %s) @ (%lld, %lld) from BW %s: %s\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str(),
rep.epoch,
rep.seqno,
bwInterf.id().toString().c_str(),
rep.doSplit ? "split" : "");
}
if (rep.epoch > bmData->epoch) {
if (BM_DEBUG) {
printf("BM heard from BW that there is a new manager with higher epoch\n");
}
if (bmData->iAmReplaced.canBeSet()) {
bmData->iAmReplaced.send(Void());
}
}
// TODO maybe this won't be true eventually, but right now the only time the blob worker reports back is
// to split the range.
ASSERT(rep.doSplit);
// FIXME: only evaluate for split if this worker currently owns the granule in this blob manager's
// mapping
auto lastReqForGranule = lastSeenSeqno.rangeContaining(rep.granuleRange.begin);
if (rep.granuleRange.begin == lastReqForGranule.begin() &&
rep.granuleRange.end == lastReqForGranule.end() && rep.epoch == lastReqForGranule.value().first &&
rep.seqno == lastReqForGranule.value().second) {
if (BM_DEBUG) {
printf("Manager %lld received repeat status for the same granule [%s - %s) @ %lld, ignoring.",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str());
}
} else {
if (BM_DEBUG) {
printf("Manager %lld evaluating [%s - %s) for split\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str());
}
lastSeenSeqno.insert(rep.granuleRange, std::pair(rep.epoch, rep.seqno));
addActor.send(maybeSplitRange(bmData, bwInterf.id(), rep.granuleRange));
}
}
}
} catch (Error& e) {
// FIXME: forward errors somewhere from here
if (BM_DEBUG) {
printf("BM got unexpected error %s monitoring BW %s\n", e.name(), bwInterf.id().toString().c_str());
}
throw e;
}
}
// TODO this is only for chaos testing right now!! REMOVE LATER
ACTOR Future<Void> rangeMover(BlobManagerData* bmData) {
loop {
@@ -509,8 +820,19 @@ ACTOR Future<Void> rangeMover(BlobManagerData* bmData) {
randomRange.value().toString().c_str());
}
-bmData->rangesToAssign.send(RangeAssignment(randomRange.range(), false));
-bmData->rangesToAssign.send(RangeAssignment(randomRange.range(), true));
+RangeAssignment revokeOld;
+revokeOld.isAssign = false;
+revokeOld.keyRange = randomRange.range();
+revokeOld.worker = randomRange.value();
+revokeOld.revoke = RangeRevokeData(false);
+bmData->rangesToAssign.send(revokeOld);
+RangeAssignment assignNew;
+assignNew.isAssign = true;
+assignNew.keyRange = randomRange.range();
+assignNew.assign =
+RangeAssignmentData(false, std::vector<KeyRange>()); // not a continue, no boundary change
+bmData->rangesToAssign.send(assignNew);
break;
}
}
@@ -555,6 +877,7 @@ ACTOR Future<Void> blobManager(LocalityData locality, Reference<AsyncVar<ServerD
bwInterf.initEndpoints();
self.workersById.insert({ bwInterf.id(), bwInterf });
self.workerStats.insert({ bwInterf.id(), BlobWorkerStats() });
self.workerMonitors.insert({ bwInterf.id(), monitorBlobWorker(&self, bwInterf) });
addActor.send(blobWorker(bwInterf, dbInfo));
}

File diff suppressed because it is too large

View File

@@ -1561,6 +1561,16 @@ ACTOR Future<RangeFeedReply> getRangeFeedMutations(StorageServer* data, RangeFee
if (data->version.get() < req.begin) {
wait(data->version.whenAtLeast(req.begin));
}
// TODO REMOVE this super hacky fix once evan finishes change feeds, the above wait doesn't work apparently
state int waitCnt = 0;
while (data->uidRangeFeed.count(req.rangeID) == 0) {
wait(delay(0.05));
waitCnt++;
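// (waitCnt & (waitCnt - 1)) == 0 only when waitCnt is a power of two, so this logs with exponential backoff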
if ((waitCnt & (waitCnt - 1)) == 0) {
printf("Waiting for change feed %s %d times\n", req.rangeID.printable().c_str(), waitCnt);
}
}
ASSERT(data->uidRangeFeed.count(req.rangeID) > 0);
auto& feedInfo = data->uidRangeFeed[req.rangeID];
/*printf("SS processing range feed req %s for version [%lld - %lld)\n",
req.rangeID.printable().c_str(),
@@ -3878,6 +3888,12 @@ private:
rangeFeedInfo->range = rangeFeedRange;
rangeFeedInfo->id = rangeFeedId;
rangeFeedInfo->emptyVersion = currentVersion - 1;
printf("SS %s creating change feed %s for [%s - %s) at version %lld\n",
data->thisServerID.toString().c_str(),
rangeFeedId.toString().c_str(),
rangeFeedRange.begin.printable().c_str(),
rangeFeedRange.end.printable().c_str(),
currentVersion);
data->uidRangeFeed[rangeFeedId] = rangeFeedInfo;
auto rs = data->keyRangeFeed.modify(rangeFeedRange);
for (auto r = rs.begin(); r != rs.end(); ++r) {

View File

@@ -59,10 +59,11 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
BlobGranuleVerifierWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
doSetup = !clientId; // only do this on the "first" client
// FIXME: don't do the delay in setup, as that delays the start of all workloads
minDelay = getOption(options, LiteralStringRef("minDelay"), 0.0);
maxDelay = getOption(options, LiteralStringRef("minDelay"), 60.0);
maxDelay = getOption(options, LiteralStringRef("maxDelay"), 0.0);
testDuration = getOption(options, LiteralStringRef("testDuration"), 120.0);
timeTravelLimit = getOption(options, LiteralStringRef("timeTravelLimit"), 60.0);
timeTravelLimit = getOption(options, LiteralStringRef("timeTravelLimit"), testDuration);
timeTravelBufferSize = getOption(options, LiteralStringRef("timeTravelBufferSize"), 100000000);
threads = getOption(options, LiteralStringRef("threads"), 1);
ASSERT(threads >= 1);
@@ -96,6 +97,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
wait(krmSetRange(tr, blobRangeKeys.begin, KeyRange(normalKeys), LiteralStringRef("1")));
wait(tr->commit());
printf("Successfully set up blob granule range for normalKeys\n");
TraceEvent("BlobGranuleVerifierSetup");
return Void();
} catch (Error& e) {
wait(tr->onError(e));
@@ -106,7 +108,6 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
std::string description() const override { return "BlobGranuleVerifier"; }
Future<Void> setup(Database const& cx) override {
if (doSetup) {
-/// TODO make only one client do this!!! others wait
double initialDelay = deterministicRandom()->random01() * (maxDelay - minDelay) + minDelay;
printf("BGW setup initial delay of %.3f\n", initialDelay);
return setUpBlobRange(cx, delay(initialDelay));
@@ -223,6 +224,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
state std::map<double, OldRead> timeTravelChecks;
state int64_t timeTravelChecksMemory = 0;
TraceEvent("BlobGranuleVerifierStart");
printf("BGV thread starting\n");
// wait for first set of ranges to be loaded
@@ -264,16 +266,18 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
self->bytesRead += fdb.first.expectedSize();
self->initialReads++;
-// TODO increase frequency a lot!! just for initial testing
-wait(poisson(&last, 5.0));
-// wait(poisson(&last, 0.1));
} catch (Error& e) {
-printf("BGVerifier got error %s\n", e.name());
if (e.code() == error_code_operation_cancelled) {
-return Void();
+throw;
}
+if (e.code() != error_code_transaction_too_old && e.code() != error_code_wrong_shard_server) {
+printf("BGVerifier got unexpected error %s\n", e.name());
+}
self->errors++;
}
+// TODO increase frequency a lot!! just for initial testing
+wait(poisson(&last, 5.0));
+// wait(poisson(&last, 0.1));
}
}
@@ -329,6 +333,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
printf(" %lld rows\n", self->rowsRead);
printf(" %lld bytes\n", self->bytesRead);
printf(" %d final granule checks\n", checks);
TraceEvent("BlobGranuleVerifierChecked");
return self->mismatches == 0 && checks > 0;
}

View File

@@ -0,0 +1,21 @@
[[test]]
testTitle = 'BlobGranuleCorrectnessTestLarge'
[[test.workload]]
testName = 'ReadWrite'
testDuration = 200.0
transactionsPerSecond = 1000
writesPerTransactionA = 0
readsPerTransactionA = 10
writesPerTransactionB = 10
readsPerTransactionB = 1
alpha = 0.5
nodeCount = 2000000
valueBytes = 128
discardEdgeMeasurements = false
warmingDelay = 10.0
setup = false
[[test.workload]]
testName = 'BlobGranuleVerifier'
testDuration = 200.0