Reworked all of the system data to encode granule data more efficiently for persistence
This commit is contained in:
parent
f3c44c568f
commit
b5074fd597
|
@ -117,4 +117,16 @@ struct BlobGranuleChunkRef {
|
|||
};
|
||||
|
||||
enum BlobGranuleSplitState { Unknown = 0, Started = 1, Assigned = 2, Done = 3 };
|
||||
|
||||
struct BlobGranuleHistoryValue {
|
||||
constexpr static FileIdentifier file_identifier = 991434;
|
||||
UID granuleID;
|
||||
VectorRef<std::pair<KeyRangeRef, Version>> parentGranules;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, granuleID, parentGranules);
|
||||
}
|
||||
};
|
||||
|
||||
#endif
|
|
@ -155,16 +155,34 @@ struct GranuleStatusReply : public ReplyPromiseStreamReply {
|
|||
bool doSplit;
|
||||
int64_t epoch;
|
||||
int64_t seqno;
|
||||
UID granuleID;
|
||||
Version startVersion;
|
||||
Version latestVersion;
|
||||
|
||||
GranuleStatusReply() {}
|
||||
explicit GranuleStatusReply(KeyRange range, bool doSplit, int64_t epoch, int64_t seqno)
|
||||
: granuleRange(range), doSplit(doSplit), epoch(epoch), seqno(seqno) {}
|
||||
explicit GranuleStatusReply(KeyRange range,
|
||||
bool doSplit,
|
||||
int64_t epoch,
|
||||
int64_t seqno,
|
||||
UID granuleID,
|
||||
Version startVersion,
|
||||
Version latestVersion)
|
||||
: granuleRange(range), doSplit(doSplit), epoch(epoch), seqno(seqno), granuleID(granuleID),
|
||||
startVersion(startVersion), latestVersion(latestVersion) {}
|
||||
|
||||
int expectedSize() const { return sizeof(GranuleStatusReply) + granuleRange.expectedSize(); }
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, ReplyPromiseStreamReply::sequence, granuleRange, doSplit, epoch, seqno);
|
||||
serializer(ar,
|
||||
ReplyPromiseStreamReply::acknowledgeToken, ReplyPromiseStreamReply::sequence,
|
||||
granuleRange,
|
||||
doSplit,
|
||||
epoch,
|
||||
seqno,
|
||||
granuleID,
|
||||
startVersion,
|
||||
latestVersion);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -73,6 +73,7 @@ void ClientKnobs::initialize(Randomize randomize) {
|
|||
init( KEY_SIZE_LIMIT, 1e4 );
|
||||
init( SYSTEM_KEY_SIZE_LIMIT, 3e4 );
|
||||
init( VALUE_SIZE_LIMIT, 1e5 );
|
||||
// TODO find better solution?
|
||||
init( SPLIT_KEY_SIZE_LIMIT, KEY_SIZE_LIMIT/2 );// if( randomize && BUGGIFY ) SPLIT_KEY_SIZE_LIMIT = KEY_SIZE_LIMIT - 31;//serverKeysPrefixFor(UID()).size() - 1;
|
||||
init( METADATA_VERSION_CACHE_SIZE, 1000 );
|
||||
|
||||
|
|
|
@ -7296,12 +7296,16 @@ ACTOR Future<Void> readBlobGranulesStreamActor(Reference<DatabaseContext> db,
|
|||
for (i = 0; i < blobGranuleMapping.size() - 1; i++) {
|
||||
granuleStartKey = blobGranuleMapping[i].key;
|
||||
granuleEndKey = blobGranuleMapping[i + 1].key;
|
||||
// if this was a time travel and the request returned larger bounds, skip this chunk
|
||||
if (granuleEndKey <= keyRange.begin) {
|
||||
continue;
|
||||
}
|
||||
workerId = decodeBlobGranuleMappingValue(blobGranuleMapping[i].value);
|
||||
// prune first/last granules to requested range
|
||||
if (i == 0) {
|
||||
if (keyRange.begin > granuleStartKey) {
|
||||
granuleStartKey = keyRange.begin;
|
||||
}
|
||||
if (i == blobGranuleMapping.size() - 2) {
|
||||
if (keyRange.end < granuleEndKey) {
|
||||
granuleEndKey = keyRange.end;
|
||||
}
|
||||
|
||||
|
|
|
@ -1100,40 +1100,131 @@ const KeyRangeRef blobRangeKeys(LiteralStringRef("\xff\x02/blobRange/"), Literal
|
|||
const KeyRef blobManagerEpochKey = LiteralStringRef("\xff\x02/blobManagerEpoch");
|
||||
|
||||
const Value blobManagerEpochValueFor(int64_t epoch) {
|
||||
BinaryWriter wr(Unversioned());
|
||||
BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule()));
|
||||
wr << epoch;
|
||||
return wr.toValue();
|
||||
}
|
||||
|
||||
int64_t decodeBlobManagerEpochValue(ValueRef const& value) {
|
||||
int64_t epoch;
|
||||
BinaryReader reader(value, Unversioned());
|
||||
BinaryReader reader(value, IncludeVersion());
|
||||
reader >> epoch;
|
||||
return epoch;
|
||||
}
|
||||
|
||||
// blob range file data
|
||||
// blob granule data
|
||||
const KeyRangeRef blobGranuleFileKeys(LiteralStringRef("\xff\x02/bgf/"), LiteralStringRef("\xff\x02/bgf0"));
|
||||
const KeyRangeRef blobGranuleMappingKeys(LiteralStringRef("\xff\x02/bgm/"), LiteralStringRef("\xff\x02/bgm0"));
|
||||
const KeyRangeRef blobGranuleLockKeys(LiteralStringRef("\xff\x02/bgl/"), LiteralStringRef("\xff\x02/bgl0"));
|
||||
const KeyRangeRef blobGranuleSplitKeys(LiteralStringRef("\xff\x02/bgs/"), LiteralStringRef("\xff\x02/bgs0"));
|
||||
const KeyRangeRef blobGranuleHistoryKeys(LiteralStringRef("\xff\x02/bgh/"), LiteralStringRef("\xff\x02/bgh0"));
|
||||
|
||||
const uint8_t BG_FILE_TYPE_DELTA = 'D';
|
||||
const uint8_t BG_FILE_TYPE_SNAPSHOT = 'S';
|
||||
|
||||
// uids in blob granule file/split keys are serialized big endian so that incrementUID can create a prefix range for
|
||||
// just that UID.
|
||||
|
||||
// TODO: could move this to UID or sometwhere else?
|
||||
UID incrementUID(UID uid) {
|
||||
uint64_t first = uid.first();
|
||||
uint64_t second = uid.second() + 1;
|
||||
// handle overflow increment of second
|
||||
if (second == 0) {
|
||||
first++;
|
||||
// FIXME: assume we never generate max uid, for now
|
||||
ASSERT(first != 0);
|
||||
}
|
||||
|
||||
return UID(first, second);
|
||||
}
|
||||
|
||||
void serializeUIDBigEndian(BinaryWriter& wr, UID uid) {
|
||||
wr << bigEndian64(uid.first());
|
||||
wr << bigEndian64(uid.second());
|
||||
}
|
||||
|
||||
UID deserializeUIDBigEndian(BinaryReader& reader) {
|
||||
uint64_t first;
|
||||
uint64_t second;
|
||||
reader >> first;
|
||||
reader >> second;
|
||||
return UID(bigEndian64(first), bigEndian64(second));
|
||||
}
|
||||
|
||||
const Key blobGranuleFileKeyFor(UID granuleID, uint8_t fileType, Version fileVersion) {
|
||||
ASSERT(fileType == 'D' || fileType == 'S');
|
||||
BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule()));
|
||||
wr.serializeBytes(blobGranuleFileKeys.begin);
|
||||
serializeUIDBigEndian(wr, granuleID);
|
||||
wr << fileType;
|
||||
wr << bigEndian64(fileVersion);
|
||||
return wr.toValue();
|
||||
}
|
||||
|
||||
std::tuple<UID, uint8_t, Version> decodeBlobGranuleFileKey(KeyRef const& key) {
|
||||
uint8_t fileType;
|
||||
Version fileVersion;
|
||||
BinaryReader reader(key.removePrefix(blobGranuleFileKeys.begin), AssumeVersion(ProtocolVersion::withBlobGranule()));
|
||||
UID granuleID = deserializeUIDBigEndian(reader);
|
||||
reader >> fileType;
|
||||
reader >> fileVersion;
|
||||
ASSERT(fileType == 'D' || fileType == 'S');
|
||||
return std::tuple(granuleID, fileType, bigEndian64(fileVersion));
|
||||
}
|
||||
|
||||
Key bgFilePrefixKey(UID gid) {
|
||||
BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule()));
|
||||
wr.serializeBytes(blobGranuleFileKeys.begin);
|
||||
serializeUIDBigEndian(wr, gid);
|
||||
return wr.toValue();
|
||||
}
|
||||
|
||||
const KeyRange blobGranuleFileKeyRangeFor(UID granuleID) {
|
||||
return KeyRangeRef(bgFilePrefixKey(granuleID), bgFilePrefixKey(incrementUID(granuleID)));
|
||||
}
|
||||
|
||||
const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length) {
|
||||
BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule()));
|
||||
wr << filename;
|
||||
wr << offset;
|
||||
wr << length;
|
||||
return wr.toValue();
|
||||
}
|
||||
|
||||
std::tuple<Standalone<StringRef>, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value) {
|
||||
StringRef filename;
|
||||
int64_t offset;
|
||||
int64_t length;
|
||||
BinaryReader reader(value, IncludeVersion());
|
||||
reader >> filename;
|
||||
reader >> offset;
|
||||
reader >> length;
|
||||
return std::tuple(filename, offset, length);
|
||||
}
|
||||
|
||||
const Value blobGranuleMappingValueFor(UID const& workerID) {
|
||||
BinaryWriter wr(Unversioned());
|
||||
BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule()));
|
||||
wr << workerID;
|
||||
return wr.toValue();
|
||||
}
|
||||
|
||||
UID decodeBlobGranuleMappingValue(ValueRef const& value) {
|
||||
UID workerID;
|
||||
BinaryReader reader(value, Unversioned());
|
||||
BinaryReader reader(value, IncludeVersion());
|
||||
reader >> workerID;
|
||||
return workerID;
|
||||
}
|
||||
|
||||
const Key blobGranuleLockKeyFor(KeyRangeRef const& keyRange) {
|
||||
BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule()));
|
||||
wr.serializeBytes(blobGranuleLockKeys.begin);
|
||||
wr << keyRange;
|
||||
return wr.toValue();
|
||||
}
|
||||
|
||||
const Value blobGranuleLockValueFor(int64_t epoch, int64_t seqno, UID changeFeedId) {
|
||||
BinaryWriter wr(Unversioned());
|
||||
BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule()));
|
||||
wr << epoch;
|
||||
wr << seqno;
|
||||
wr << changeFeedId;
|
||||
|
@ -1143,15 +1234,44 @@ const Value blobGranuleLockValueFor(int64_t epoch, int64_t seqno, UID changeFeed
|
|||
std::tuple<int64_t, int64_t, UID> decodeBlobGranuleLockValue(const ValueRef& value) {
|
||||
int64_t epoch, seqno;
|
||||
UID changeFeedId;
|
||||
BinaryReader reader(value, Unversioned());
|
||||
BinaryReader reader(value, IncludeVersion());
|
||||
reader >> epoch;
|
||||
reader >> seqno;
|
||||
reader >> changeFeedId;
|
||||
return std::make_tuple(epoch, seqno, changeFeedId);
|
||||
}
|
||||
|
||||
const Key blobGranuleSplitKeyFor(UID const& parentGranuleID, UID const& granuleID) {
|
||||
BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule()));
|
||||
wr.serializeBytes(blobGranuleSplitKeys.begin);
|
||||
serializeUIDBigEndian(wr, parentGranuleID);
|
||||
serializeUIDBigEndian(wr, granuleID);
|
||||
return wr.toValue();
|
||||
}
|
||||
|
||||
std::pair<UID, UID> decodeBlobGranuleSplitKey(KeyRef const& key) {
|
||||
|
||||
BinaryReader reader(key.removePrefix(blobGranuleSplitKeys.begin),
|
||||
AssumeVersion(ProtocolVersion::withBlobGranule()));
|
||||
|
||||
UID parentGranuleID = deserializeUIDBigEndian(reader);
|
||||
UID currentGranuleID = deserializeUIDBigEndian(reader);
|
||||
return std::pair(parentGranuleID, currentGranuleID);
|
||||
}
|
||||
|
||||
Key bgSplitPrefixKeyFor(UID gid) {
|
||||
BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule()));
|
||||
wr.serializeBytes(blobGranuleSplitKeys.begin);
|
||||
serializeUIDBigEndian(wr, gid);
|
||||
return wr.toValue();
|
||||
}
|
||||
|
||||
const KeyRange blobGranuleSplitKeyRangeFor(UID const& parentGranuleID) {
|
||||
return KeyRangeRef(bgSplitPrefixKeyFor(parentGranuleID), bgSplitPrefixKeyFor(incrementUID(parentGranuleID)));
|
||||
}
|
||||
|
||||
const Value blobGranuleSplitValueFor(BlobGranuleSplitState st) {
|
||||
BinaryWriter wr(Unversioned());
|
||||
BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule()));
|
||||
wr << st;
|
||||
return addVersionStampAtEnd(wr.toValue());
|
||||
}
|
||||
|
@ -1159,49 +1279,67 @@ const Value blobGranuleSplitValueFor(BlobGranuleSplitState st) {
|
|||
std::pair<BlobGranuleSplitState, Version> decodeBlobGranuleSplitValue(const ValueRef& value) {
|
||||
BlobGranuleSplitState st;
|
||||
Version v;
|
||||
BinaryReader reader(value, Unversioned());
|
||||
BinaryReader reader(value, IncludeVersion());
|
||||
reader >> st;
|
||||
reader >> v;
|
||||
return std::pair(st, v);
|
||||
}
|
||||
|
||||
// const Value blobGranuleHistoryValueFor(VectorRef<KeyRangeRef> const& parentGranules);
|
||||
// VectorRef<KeyRangeRef> decodeBlobGranuleHistoryValue(ValueRef const& value);
|
||||
|
||||
const Value blobGranuleHistoryValueFor(Standalone<VectorRef<KeyRangeRef>> const& parentGranules) {
|
||||
// FIXME: make separate version for BG
|
||||
BinaryWriter wr(IncludeVersion(ProtocolVersion::withChangeFeed()));
|
||||
wr << parentGranules;
|
||||
const Key blobGranuleHistoryKeyFor(KeyRangeRef const& range, Version version) {
|
||||
BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule()));
|
||||
wr.serializeBytes(blobGranuleHistoryKeys.begin);
|
||||
wr << range;
|
||||
wr << bigEndian64(version);
|
||||
return wr.toValue();
|
||||
}
|
||||
|
||||
Standalone<VectorRef<KeyRangeRef>> decodeBlobGranuleHistoryValue(const ValueRef& value) {
|
||||
Standalone<VectorRef<KeyRangeRef>> parentGranules;
|
||||
std::pair<KeyRange, Version> decodeBlobGranuleHistoryKey(const KeyRef& key) {
|
||||
KeyRangeRef keyRange;
|
||||
Version version;
|
||||
BinaryReader reader(key.removePrefix(blobGranuleHistoryKeys.begin),
|
||||
AssumeVersion(ProtocolVersion::withBlobGranule()));
|
||||
reader >> keyRange;
|
||||
reader >> version;
|
||||
return std::make_pair(keyRange, bigEndian64(version));
|
||||
}
|
||||
|
||||
const KeyRange blobGranuleHistoryKeyRangeFor(KeyRangeRef const& range) {
|
||||
return KeyRangeRef(blobGranuleHistoryKeyFor(range, 0), blobGranuleHistoryKeyFor(range, MAX_VERSION));
|
||||
}
|
||||
|
||||
const Value blobGranuleHistoryValueFor(Standalone<BlobGranuleHistoryValue> const& historyValue) {
|
||||
BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule()));
|
||||
wr << historyValue;
|
||||
return wr.toValue();
|
||||
}
|
||||
|
||||
Standalone<BlobGranuleHistoryValue> decodeBlobGranuleHistoryValue(const ValueRef& value) {
|
||||
Standalone<BlobGranuleHistoryValue> historyValue;
|
||||
BinaryReader reader(value, IncludeVersion());
|
||||
reader >> parentGranules;
|
||||
return parentGranules;
|
||||
reader >> historyValue;
|
||||
return historyValue;
|
||||
}
|
||||
|
||||
const KeyRangeRef blobWorkerListKeys(LiteralStringRef("\xff\x02/bwList/"), LiteralStringRef("\xff\x02/bwList0"));
|
||||
|
||||
const Key blobWorkerListKeyFor(UID workerID) {
|
||||
BinaryWriter wr(Unversioned());
|
||||
BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule()));
|
||||
wr.serializeBytes(blobWorkerListKeys.begin);
|
||||
wr << workerID;
|
||||
return wr.toValue();
|
||||
}
|
||||
|
||||
const Value blobWorkerListValue(BlobWorkerInterface const& worker) {
|
||||
return ObjectWriter::toValue(worker, IncludeVersion());
|
||||
}
|
||||
|
||||
UID decodeBlobWorkerListKey(KeyRef const& key) {
|
||||
UID workerID;
|
||||
BinaryReader reader(key.removePrefix(blobWorkerListKeys.begin), Unversioned());
|
||||
BinaryReader reader(key.removePrefix(blobWorkerListKeys.begin), AssumeVersion(ProtocolVersion::withBlobGranule()));
|
||||
reader >> workerID;
|
||||
return workerID;
|
||||
}
|
||||
|
||||
const Value blobWorkerListValue(BlobWorkerInterface const& worker) {
|
||||
return ObjectWriter::toValue(worker, IncludeVersion(ProtocolVersion::withBlobGranule()));
|
||||
}
|
||||
|
||||
BlobWorkerInterface decodeBlobWorkerListValue(ValueRef const& value) {
|
||||
BlobWorkerInterface interf;
|
||||
ObjectReader reader(value.begin(), IncludeVersion());
|
||||
|
|
|
@ -527,7 +527,10 @@ int64_t decodeBlobManagerEpochValue(ValueRef const& value);
|
|||
|
||||
// blob granule keys
|
||||
|
||||
// \xff\x02/bgf/(startKey, endKey, {snapshot|delta}, version) = [[filename]]
|
||||
extern const uint8_t BG_FILE_TYPE_DELTA;
|
||||
extern const uint8_t BG_FILE_TYPE_SNAPSHOT;
|
||||
|
||||
// \xff\x02/bgf/(granuleID, {snapshot|delta}, version) = [[filename]]
|
||||
extern const KeyRangeRef blobGranuleFileKeys;
|
||||
|
||||
// TODO could shrink the size of the mapping keyspace by using something similar to tags instead of UIDs. We'd probably
|
||||
|
@ -536,35 +539,53 @@ extern const KeyRangeRef blobGranuleFileKeys;
|
|||
// \xff\x02/bgm/[[begin]] = [[BlobWorkerUID]]
|
||||
extern const KeyRangeRef blobGranuleMappingKeys;
|
||||
|
||||
// \xff\x02/bgl/(begin,end) = (epoch, seqno, changefeed id)
|
||||
// \xff\x02/bgl/(begin,end) = (epoch, seqno, granuleID)
|
||||
extern const KeyRangeRef blobGranuleLockKeys;
|
||||
|
||||
// \xff\x02/bgs/(oldbegin,oldend,newbegin) = state
|
||||
// \xff\x02/bgs/(parentGranuleID, granuleID) = state
|
||||
extern const KeyRangeRef blobGranuleSplitKeys;
|
||||
|
||||
// \xff\x02/bgh/(start,end) = [(oldbegin, oldend)]
|
||||
// \xff\x02/bgh/(start,end,version) = { granuleID, [parentGranuleHistoryKeys] }
|
||||
extern const KeyRangeRef blobGranuleHistoryKeys;
|
||||
|
||||
// FIXME: better way to represent file type than a string ref?
|
||||
const Key blobGranuleFileKeyFor(UID granuleID, uint8_t fileType, Version fileVersion);
|
||||
std::tuple<UID, uint8_t, Version> decodeBlobGranuleFileKey(ValueRef const& value);
|
||||
const KeyRange blobGranuleFileKeyRangeFor(UID granuleID);
|
||||
|
||||
const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length);
|
||||
std::tuple<Standalone<StringRef>, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value);
|
||||
|
||||
const Value blobGranuleMappingValueFor(UID const& workerID);
|
||||
UID decodeBlobGranuleMappingValue(ValueRef const& value);
|
||||
|
||||
const Key blobGranuleLockKeyFor(KeyRangeRef const& granuleRange);
|
||||
|
||||
const Value blobGranuleLockValueFor(int64_t epochNum, int64_t sequenceNum, UID changeFeedId);
|
||||
// FIXME: maybe just define a struct?
|
||||
std::tuple<int64_t, int64_t, UID> decodeBlobGranuleLockValue(ValueRef const& value);
|
||||
|
||||
const Key blobGranuleSplitKeyFor(UID const& parentGranuleID, UID const& granuleID);
|
||||
std::pair<UID, UID> decodeBlobGranuleSplitKey(KeyRef const& key);
|
||||
const KeyRange blobGranuleSplitKeyRangeFor(UID const& parentGranuleID);
|
||||
|
||||
// these are versionstamped
|
||||
const Value blobGranuleSplitValueFor(BlobGranuleSplitState st);
|
||||
std::pair<BlobGranuleSplitState, Version> decodeBlobGranuleSplitValue(ValueRef const& value);
|
||||
|
||||
const Value blobGranuleHistoryValueFor(Standalone<VectorRef<KeyRangeRef>> const& parentGranules);
|
||||
Standalone<VectorRef<KeyRangeRef>> decodeBlobGranuleHistoryValue(ValueRef const& value);
|
||||
const Key blobGranuleHistoryKeyFor(KeyRangeRef const& range, Version version);
|
||||
std::pair<KeyRange, Version> decodeBlobGranuleHistoryKey(KeyRef const& value);
|
||||
const KeyRange blobGranuleHistoryKeyRangeFor(KeyRangeRef const& range);
|
||||
|
||||
const Value blobGranuleHistoryValueFor(Standalone<BlobGranuleHistoryValue> const& historyValue);
|
||||
Standalone<BlobGranuleHistoryValue> decodeBlobGranuleHistoryValue(ValueRef const& value);
|
||||
|
||||
// \xff/bwl/[[BlobWorkerID]] = [[BlobWorkerInterface]]
|
||||
extern const KeyRangeRef blobWorkerListKeys;
|
||||
|
||||
const Key blobWorkerListKeyFor(UID workerID);
|
||||
const Value blobWorkerListValue(BlobWorkerInterface const& interface);
|
||||
UID decodeBlobWorkerListKey(KeyRef const& key);
|
||||
const Value blobWorkerListValue(BlobWorkerInterface const& interface);
|
||||
BlobWorkerInterface decodeBlobWorkerListValue(ValueRef const& value);
|
||||
|
||||
#pragma clang diagnostic pop
|
||||
|
|
|
@ -25,7 +25,6 @@
|
|||
#include "fdbclient/KeyRangeMap.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/Tuple.h"
|
||||
#include "fdbserver/BlobManagerInterface.h"
|
||||
#include "fdbserver/BlobWorker.actor.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
|
@ -573,14 +572,13 @@ ACTOR Future<Void> monitorClientRanges(BlobManagerData* bmData) {
|
|||
}
|
||||
}
|
||||
|
||||
static Key granuleLockKey(KeyRange granuleRange) {
|
||||
Tuple k;
|
||||
k.append(granuleRange.begin).append(granuleRange.end);
|
||||
return k.getDataAsStandalone().withPrefix(blobGranuleLockKeys.begin);
|
||||
}
|
||||
|
||||
// FIXME: propagate errors here
|
||||
ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData, UID currentWorkerId, KeyRange range) {
|
||||
ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData,
|
||||
UID currentWorkerId,
|
||||
KeyRange granuleRange,
|
||||
UID granuleID,
|
||||
Version granuleStartVersion,
|
||||
Version latestVersion) {
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
|
||||
state Standalone<VectorRef<KeyRef>> newRanges;
|
||||
state int64_t newLockSeqno = -1;
|
||||
|
@ -590,7 +588,7 @@ ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData, UID currentWorkerId,
|
|||
try {
|
||||
// redo split if previous txn try failed to calculate it
|
||||
if (newRanges.empty()) {
|
||||
Standalone<VectorRef<KeyRef>> _newRanges = wait(splitRange(tr, range));
|
||||
Standalone<VectorRef<KeyRef>> _newRanges = wait(splitRange(tr, granuleRange));
|
||||
newRanges = _newRanges;
|
||||
}
|
||||
break;
|
||||
|
@ -603,14 +601,14 @@ ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData, UID currentWorkerId,
|
|||
// not large enough to split, just reassign back to worker
|
||||
if (BM_DEBUG) {
|
||||
printf("Not splitting existing range [%s - %s). Continuing assignment to %s\n",
|
||||
range.begin.printable().c_str(),
|
||||
range.end.printable().c_str(),
|
||||
granuleRange.begin.printable().c_str(),
|
||||
granuleRange.end.printable().c_str(),
|
||||
currentWorkerId.toString().c_str());
|
||||
}
|
||||
RangeAssignment raContinue;
|
||||
raContinue.isAssign = true;
|
||||
raContinue.worker = currentWorkerId;
|
||||
raContinue.keyRange = range;
|
||||
raContinue.keyRange = granuleRange;
|
||||
raContinue.assign = RangeAssignmentData(true); // continue assignment and re-snapshot
|
||||
bmData->rangesToAssign.send(raContinue);
|
||||
return Void();
|
||||
|
@ -628,7 +626,7 @@ ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData, UID currentWorkerId,
|
|||
wait(checkManagerLock(tr, bmData));
|
||||
|
||||
// acquire lock for old granule to make sure nobody else modifies it
|
||||
state Key lockKey = granuleLockKey(range);
|
||||
state Key lockKey = blobGranuleLockKeyFor(granuleRange);
|
||||
Optional<Value> lockValue = wait(tr->get(lockKey));
|
||||
ASSERT(lockValue.present());
|
||||
std::tuple<int64_t, int64_t, UID> prevGranuleLock = decodeBlobGranuleLockValue(lockValue.get());
|
||||
|
@ -638,8 +636,8 @@ ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData, UID currentWorkerId,
|
|||
bmData->id.toString().c_str(),
|
||||
std::get<0>(prevGranuleLock),
|
||||
bmData->epoch,
|
||||
range.begin.printable().c_str(),
|
||||
range.end.printable().c_str());
|
||||
granuleRange.begin.printable().c_str(),
|
||||
granuleRange.end.printable().c_str());
|
||||
}
|
||||
|
||||
if (bmData->iAmReplaced.canBeSet()) {
|
||||
|
@ -659,20 +657,24 @@ ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData, UID currentWorkerId,
|
|||
// acquire granule lock so nobody else can make changes to this granule.
|
||||
tr->set(lockKey, blobGranuleLockValueFor(bmData->epoch, newLockSeqno, std::get<2>(prevGranuleLock)));
|
||||
|
||||
Standalone<VectorRef<KeyRangeRef>> history;
|
||||
history.push_back(history.arena(), range);
|
||||
Value historyValue = blobGranuleHistoryValueFor(history);
|
||||
// set up split metadata
|
||||
for (int i = 0; i < newRanges.size() - 1; i++) {
|
||||
Tuple splitKey;
|
||||
splitKey.append(range.begin).append(range.end).append(newRanges[i]);
|
||||
tr->atomicOp(splitKey.getDataAsStandalone().withPrefix(blobGranuleSplitKeys.begin),
|
||||
UID newGranuleID = deterministicRandom()->randomUniqueID();
|
||||
|
||||
Key splitKey = blobGranuleSplitKeyFor(granuleID, newGranuleID);
|
||||
|
||||
tr->atomicOp(splitKey,
|
||||
blobGranuleSplitValueFor(BlobGranuleSplitState::Started),
|
||||
MutationRef::SetVersionstampedValue);
|
||||
|
||||
Tuple historyKey;
|
||||
historyKey.append(newRanges[i]).append(newRanges[i + 1]);
|
||||
tr->set(historyKey.getDataAsStandalone().withPrefix(blobGranuleHistoryKeys.begin), historyValue);
|
||||
Key historyKey = blobGranuleHistoryKeyFor(KeyRangeRef(newRanges[i], newRanges[i + 1]), latestVersion);
|
||||
|
||||
Standalone<BlobGranuleHistoryValue> historyValue;
|
||||
historyValue.granuleID = newGranuleID;
|
||||
historyValue.parentGranules.push_back(historyValue.arena(),
|
||||
std::pair(granuleRange, granuleStartVersion));
|
||||
|
||||
tr->set(historyKey, blobGranuleHistoryValueFor(historyValue));
|
||||
}
|
||||
|
||||
wait(tr->commit());
|
||||
|
@ -690,8 +692,8 @@ ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData, UID currentWorkerId,
|
|||
|
||||
if (BM_DEBUG) {
|
||||
printf("Splitting range [%s - %s) into (%d):\n",
|
||||
range.begin.printable().c_str(),
|
||||
range.end.printable().c_str(),
|
||||
granuleRange.begin.printable().c_str(),
|
||||
granuleRange.end.printable().c_str(),
|
||||
newRanges.size() - 1);
|
||||
for (int i = 0; i < newRanges.size() - 1; i++) {
|
||||
printf(" [%s - %s)\n", newRanges[i].printable().c_str(), newRanges[i + 1].printable().c_str());
|
||||
|
@ -703,7 +705,7 @@ ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData, UID currentWorkerId,
|
|||
RangeAssignment raRevoke;
|
||||
raRevoke.isAssign = false;
|
||||
raRevoke.worker = currentWorkerId;
|
||||
raRevoke.keyRange = range;
|
||||
raRevoke.keyRange = granuleRange;
|
||||
raRevoke.revoke = RangeRevokeData(false); // not a dispose
|
||||
bmData->rangesToAssign.send(raRevoke);
|
||||
|
||||
|
@ -824,7 +826,8 @@ ACTOR Future<Void> monitorBlobWorkerStatus(BlobManagerData* bmData, BlobWorkerIn
|
|||
rep.granuleRange.end.printable().c_str());
|
||||
}
|
||||
lastSeenSeqno.insert(rep.granuleRange, std::pair(rep.epoch, rep.seqno));
|
||||
bmData->addActor.send(maybeSplitRange(bmData, bwInterf.id(), rep.granuleRange));
|
||||
bmData->addActor.send(maybeSplitRange(
|
||||
bmData, bwInterf.id(), rep.granuleRange, rep.granuleID, rep.startVersion, rep.latestVersion));
|
||||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -52,6 +52,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
|
|||
int64_t mismatches = 0;
|
||||
int64_t initialReads = 0;
|
||||
int64_t timeTravelReads = 0;
|
||||
int64_t timeTravelTooOld = 0;
|
||||
int64_t rowsRead = 0;
|
||||
int64_t bytesRead = 0;
|
||||
std::vector<Future<Void>> clients;
|
||||
|
@ -312,7 +313,6 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
|
|||
};
|
||||
|
||||
ACTOR Future<Void> verifyGranules(Database cx, BlobGranuleVerifierWorkload* self) {
|
||||
// TODO add time travel + verification
|
||||
state double last = now();
|
||||
state double endTime = last + self->testDuration;
|
||||
state std::map<double, OldRead> timeTravelChecks;
|
||||
|
@ -340,11 +340,17 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
|
|||
timeTravelIt = timeTravelChecks.erase(timeTravelIt);
|
||||
// advance iterator before doing read, so if it gets error we don't retry it
|
||||
|
||||
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> reReadResult =
|
||||
wait(self->readFromBlob(cx, self, oldRead.range, oldRead.v));
|
||||
self->compareResult(oldRead.oldResult, reReadResult, oldRead.range, oldRead.v, false);
|
||||
|
||||
self->timeTravelReads++;
|
||||
try {
|
||||
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> reReadResult =
|
||||
wait(self->readFromBlob(cx, self, oldRead.range, oldRead.v));
|
||||
self->compareResult(oldRead.oldResult, reReadResult, oldRead.range, oldRead.v, false);
|
||||
self->timeTravelReads++;
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_transaction_too_old) {
|
||||
self->timeTravelTooOld++;
|
||||
// TODO: add debugging info for when this is a failure
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// pick a random range
|
||||
|
@ -404,6 +410,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
|
|||
state Version readVersion = wait(tr.getReadVersion());
|
||||
state int checks = 0;
|
||||
|
||||
state bool availabilityPassed = true;
|
||||
state std::vector<KeyRange> allRanges = self->granuleRanges.get();
|
||||
for (auto& range : allRanges) {
|
||||
state KeyRange r = range;
|
||||
|
@ -437,20 +444,24 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
|
|||
last.begin.printable().c_str(),
|
||||
last.end.printable().c_str());
|
||||
}
|
||||
throw e;
|
||||
availabilityPassed = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
printf("Blob Granule Verifier finished with:\n");
|
||||
printf(" %d successful final granule checks\n", checks);
|
||||
printf(" %d failed final granule checks\n", availabilityPassed ? 0 : 1);
|
||||
printf(" %lld mismatches\n", self->mismatches);
|
||||
printf(" %lld time travel too old\n", self->timeTravelTooOld);
|
||||
printf(" %lld errors\n", self->errors);
|
||||
printf(" %lld initial reads\n", self->initialReads);
|
||||
printf(" %lld time travel reads\n", self->timeTravelReads);
|
||||
printf(" %lld rows\n", self->rowsRead);
|
||||
printf(" %lld bytes\n", self->bytesRead);
|
||||
printf(" %d final granule checks\n", checks);
|
||||
// FIXME: add above as details
|
||||
TraceEvent("BlobGranuleVerifierChecked");
|
||||
return self->mismatches == 0 && checks > 0;
|
||||
return availabilityPassed && self->mismatches == 0 && checks > 0 && self->timeTravelTooOld == 0;
|
||||
}
|
||||
|
||||
Future<bool> check(Database const& cx) override {
|
||||
|
|
|
@ -140,6 +140,7 @@ public: // introduced features
|
|||
PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, SpanContext);
|
||||
PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, ChangeFeed);
|
||||
PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, TSS);
|
||||
PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, BlobGranule); // TODO make this 7.1 or 7.2?
|
||||
};
|
||||
|
||||
template <>
|
||||
|
|
Loading…
Reference in New Issue